paopaozhuli 2020-06-26
数据生产
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
 * Generates synthetic call records for the telecom data demo.
 *
 * <p>Each record is one CSV line {@code caller,callee,buildTime,duration}:
 * <ul>
 *   <li>two different phone numbers drawn from a fixed phone book of 20 entries;</li>
 *   <li>a random call start time between {@link #startTime} and {@link #endTime};</li>
 *   <li>a call length in seconds, zero-padded to four digits.</li>
 * </ul>
 */
public class ProductLog {
    /** Fixed (phone number, subscriber name) pairs forming the calling population. */
    private static final String[][] PHONE_BOOK = {
        {"17078388295", "李雁"},
        {"13980337439", "卫艺"},
        {"14575535933", "仰莉"},
        {"19902496992", "陶欣悦"},
        {"18549641558", "施梅梅"},
        {"17005930322", "金虹霖"},
        {"18468618874", "魏明艳"},
        {"18576581848", "华贞"},
        {"15978226424", "华啟倩"},
        {"15542823911", "仲采绿"},
        {"17526304161", "卫丹"},
        {"15422018558", "戚丽红"},
        {"17269452013", "何翠柔"},
        {"17764278604", "钱溶艳"},
        {"15711910344", "钱琳"},
        {"15714728273", "缪静欣"},
        {"16061028454", "焦秋菊"},
        {"16264433631", "吕访琴"},
        {"17601615878", "沈丹"},
        {"15897468949", "褚美丽"},
    };

    /** Output file used by {@link #main} when no path is given on the command line. */
    private static final String DEFAULT_LOG_PATH = "F:\\idea-workspace\\CT_BD\\data\\calllog.csv";

    /** Exclusive upper bound of a generated call duration, in seconds (30 minutes). */
    private static final int MAX_DURATION_SECONDS = 30 * 60;

    /** Pause between two records written by {@link #writeLog}, in milliseconds. */
    private static final long WRITE_INTERVAL_MS = 500;

    // Phone numbers usable as caller/callee; filled by initPhone().
    private final List<String> phoneList = new ArrayList<String>();
    // Phone number -> subscriber name; filled by initPhone() but not read within this class.
    private final Map<String, String> phoneNameMap = new HashMap<>();

    /** First day of the call start-time range ({@code yyyy-MM-dd}, JVM default time zone). */
    String startTime = "2020-01-01";
    /**
     * Upper bound of the range ({@code yyyy-MM-dd}): generated times lie strictly before
     * 00:00 of this day, so the day itself is never produced.
     */
    String endTime = "2020-12-31";

    /**
     * Loads the fixed phone book into the number list and the number-to-name map.
     *
     * <p>Previous contents are cleared first, so calling this more than once does not
     * duplicate numbers in the list.
     */
    public void initPhone() {
        phoneList.clear();
        phoneNameMap.clear();
        for (String[] entry : PHONE_BOOK) {
            phoneList.add(entry[0]);
            phoneNameMap.put(entry[0], entry[1]);
        }
    }

    /**
     * Produces one random call record.
     *
     * @return a line {@code caller,callee,buildTime,duration}, where caller and callee
     *     are different numbers, buildTime is {@code yyyy-MM-dd HH:mm:ss}, and duration
     *     is seconds in [0, 1800) as four digits. buildTime is the text {@code null} when
     *     the configured date range is empty or unparseable.
     * @throws IllegalStateException if fewer than two phone numbers are loaded, for
     *     example when {@link #initPhone()} has not been called
     */
    public String product() {
        int count = phoneList.size();
        if (count < 2) {
            // Without this guard an empty list throws IndexOutOfBoundsException and a
            // single-number list loops forever looking for a different callee.
            throw new IllegalStateException("need at least 2 phone numbers, have " + count);
        }
        String caller = phoneList.get(randomIndex(count));
        String callee;
        do {
            callee = phoneList.get(randomIndex(count));
        } while (caller.equals(callee)); // a number never calls itself

        String buildTime = randomBuildTime(startTime, endTime);
        // Locale.ROOT guarantees ASCII digits in the CSV whatever the default locale is.
        String duration = String.format(Locale.ROOT, "%04d",
                (int) (MAX_DURATION_SECONDS * Math.random()));
        return caller + "," + callee + "," + buildTime + "," + duration;
    }

    /** Returns a uniformly random index in [0, bound). */
    private static int randomIndex(int bound) {
        return (int) (Math.random() * bound);
    }

    /**
     * Picks a uniformly random instant in [startTime 00:00, endTime 00:00) and formats it.
     *
     * <p>Both dates are parsed and the result is formatted in the JVM default time zone.
     *
     * @param startTime first day of the range, {@code yyyy-MM-dd}
     * @param endTime   day whose midnight bounds the range (excluded), {@code yyyy-MM-dd}
     * @return the time as {@code yyyy-MM-dd HH:mm:ss}, or {@code null} if either date
     *     cannot be parsed or the range is empty
     */
    private String randomBuildTime(String startTime, String endTime) {
        // Locale.ROOT pins the Gregorian calendar and ASCII digits; with a default locale
        // such as th_TH, SimpleDateFormat would switch to the Buddhist calendar.
        SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.ROOT);
        long startMs;
        long endMs;
        try {
            startMs = dayFormat.parse(startTime).getTime();
            endMs = dayFormat.parse(endTime).getTime();
        } catch (ParseException e) {
            e.printStackTrace();
            return null;
        }
        if (endMs <= startMs) {
            return null;
        }
        // start + (end - start) * random[0, 1)
        long randomMs = startMs + (long) ((endMs - startMs) * Math.random());
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(new Date(randomMs));
    }

    /**
     * Appends a new random record to {@code filePath} every 500 ms until the current
     * thread is interrupted. Each record is also echoed to standard output.
     *
     * <p>The file is opened in append mode and written as UTF-8 with {@code \n} line
     * endings. Every line is flushed immediately so a tailing reader sees it promptly.
     * A failed write is reported and generation continues with the next record.
     *
     * @param filePath path of the CSV file to append to
     */
    public void writeLog(String filePath) {
        try (Writer out = new OutputStreamWriter(
                new FileOutputStream(filePath, true), StandardCharsets.UTF_8)) {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(WRITE_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Restore the flag for the caller and stop generating.
                    Thread.currentThread().interrupt();
                    break;
                }
                String log = product();
                System.out.println(log);
                try {
                    out.write(log + "\n");
                    out.flush();
                } catch (IOException e) {
                    // Best effort: report and keep producing records.
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            // The file could not be opened, or closing it failed.
            e.printStackTrace();
        }
    }

    /**
     * Entry point: {@code java ProductLog [outputFile]}.
     *
     * <p>Writes call records to {@code outputFile} until interrupted. The hard-coded
     * {@link #DEFAULT_LOG_PATH} is used only when no argument is supplied.
     *
     * @param args optional single argument: the output CSV path
     */
    public static void main(String[] args) {
        String filePath = (args != null && args.length > 0) ? args[0] : DEFAULT_LOG_PATH;
        ProductLog productLog = new ProductLog();
        productLog.initPhone();
        productLog.writeLog(filePath);
    }
}
// producer.sh
#!/bin/bash
# Launch the call-record generator (class ProductLog) from the packaged jar.
# exec replaces this shell with the JVM so signals sent to the script reach Java directly.
exec java -cp /root/temp/CT_producer-1.0-SNAPSHOT.jar ProductLog /root/temp/calllog.csv
数据消费
Flume配置
# Names of the agent's source, channel and sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: follow the call-log file by name, starting from its first byte
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /root/temp/calllog.csv
a1.sources.r1.shell = /bin/bash -c

# Channel: in-memory buffer
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink: publish each event to the Kafka topic "call"
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# NOTE(review): brokerList/topic are the legacy Kafka-sink property names; newer Flume
# releases document kafka.bootstrap.servers / kafka.topic instead. Confirm against the
# deployed Flume version.
a1.sinks.k1.brokerList = bigdata111:9092
a1.sinks.k1.topic = call
a1.sinks.k1.batchSize = 20

# Bind the source and sink to the channel; Flume rejects components not wired to a channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
![[项目] 电信数据运营](https://cdn.ancii.com/article/image/v1/sw/wV/kP/PkwwVsGDmjDG9swnOi7SkjVAsMQAarn73E9S3mmSmcD4Mopou0kJz6HhldxFYvixRkATMGG5UfQjhD_g6cq6bw.png)
数据存储
数据分析
电话号码：123456，姓名：Chen，年：2020，月：12，日：31
数据展示
参考
flume
https://www.freesion.com/article/4812259552/
电信项目