paopaozhuli 2020-06-26
数据生产
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Generates synthetic call-log records and appends them to a CSV file.
 *
 * <p>Each record has the form {@code caller,callee,buildTime,duration}: two different phone
 * numbers taken from a fixed table, the time the call was established
 * ({@code yyyy-MM-dd HH:mm:ss}) and the call duration in seconds, zero-padded to four digits.
 */
public class ProductLog {

    /** Output file used by {@link #main(String[])} when no path is given on the command line. */
    private static final String DEFAULT_LOG_PATH = "F:\\idea-workspace\\CT_BD\\data\\calllog.csv";

    /** Pause between two generated records, in milliseconds. */
    private static final long WRITE_INTERVAL_MILLIS = 500L;

    /** Exclusive upper bound of a generated call duration, in seconds (30 minutes). */
    private static final int MAX_DURATION_SECONDS = 30 * 60;

    /** The 20 sample subscribers as {phone number, subscriber name} pairs. */
    private static final String[][] SUBSCRIBERS = {
        {"17078388295", "李雁"},
        {"13980337439", "卫艺"},
        {"14575535933", "仰莉"},
        {"19902496992", "陶欣悦"},
        {"18549641558", "施梅梅"},
        {"17005930322", "金虹霖"},
        {"18468618874", "魏明艳"},
        {"18576581848", "华贞"},
        {"15978226424", "华啟倩"},
        {"15542823911", "仲采绿"},
        {"17526304161", "卫丹"},
        {"15422018558", "戚丽红"},
        {"17269452013", "何翠柔"},
        {"17764278604", "钱溶艳"},
        {"15711910344", "钱琳"},
        {"15714728273", "缪静欣"},
        {"16061028454", "焦秋菊"},
        {"16264433631", "吕访琴"},
        {"17601615878", "沈丹"},
        {"15897468949", "褚美丽"},
    };

    // Phone numbers that records are drawn from; filled by initPhone().
    private final List<String> phoneList = new ArrayList<String>();

    // Phone number -> subscriber name; filled by initPhone().
    private final Map<String, String> phoneNameMap = new HashMap<>();

    // Bounds (yyyy-MM-dd) of the generated call-establishment times. Both are parsed as midnight
    // at the start of the given day, so no timestamp on the endTime day itself is ever produced.
    String startTime = "2020-01-01";
    String endTime = "2020-12-31";

    /**
     * Loads the sample subscribers into the phone list and the phone-to-name map.
     *
     * <p>Both collections are reset first, so calling this method more than once does not
     * duplicate entries.
     */
    public void initPhone() {
        phoneList.clear();
        phoneNameMap.clear();
        for (String[] subscriber : SUBSCRIBERS) {
            phoneList.add(subscriber[0]);
            phoneNameMap.put(subscriber[0], subscriber[1]);
        }
    }

    /**
     * Produces one random call record.
     *
     * @return a line of the form {@code caller,callee,buildTime,duration}
     * @throws IllegalStateException if fewer than two phone numbers are loaded, e.g. because
     *     {@link #initPhone()} has not been called
     * @throws IllegalArgumentException if {@code startTime}/{@code endTime} are not valid
     *     {@code yyyy-MM-dd} dates or do not describe a non-empty range
     */
    public String product() {
        int phoneCount = phoneList.size();
        if (phoneCount < 2) {
            throw new IllegalStateException(
                    "Need at least two phone numbers to build a call record but have " + phoneCount
                            + "; call initPhone() first");
        }

        // Random index of the caller.
        int callerIndex = (int) (Math.random() * phoneCount);
        // Draw the callee from the remaining phoneCount - 1 slots and step over the caller's
        // slot. This never picks the caller and needs no retry loop.
        int calleeIndex = (int) (Math.random() * (phoneCount - 1));
        if (calleeIndex >= callerIndex) {
            calleeIndex++;
        }
        String caller = phoneList.get(callerIndex);
        String callee = phoneList.get(calleeIndex);

        // Random call-establishment time.
        String buildTime = randomBuildTime(startTime, endTime);

        // Random call duration in [0, MAX_DURATION_SECONDS), zero-padded to four digits.
        String duration =
                new DecimalFormat("0000").format((int) (MAX_DURATION_SECONDS * Math.random()));

        return caller + "," + callee + "," + buildTime + "," + duration;
    }

    /**
     * Picks a random instant in {@code [startTime, endTime)}.
     *
     * @param startTime inclusive lower bound, {@code yyyy-MM-dd}
     * @param endTime exclusive upper bound, {@code yyyy-MM-dd}
     * @return the chosen instant formatted as {@code yyyy-MM-dd HH:mm:ss}
     * @throws IllegalArgumentException if either bound cannot be parsed or the end is not after
     *     the start
     */
    private String randomBuildTime(String startTime, String endTime) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
        long startMillis;
        long endMillis;
        try {
            startMillis = dayFormat.parse(startTime).getTime();
            endMillis = dayFormat.parse(endTime).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException(
                    "Expected yyyy-MM-dd dates but got start=" + startTime + ", end=" + endTime, e);
        }
        if (endMillis <= startMillis) {
            throw new IllegalArgumentException(
                    "End date " + endTime + " must be after start date " + startTime);
        }

        // (end - start) * random[0,1) + start
        long randomMillis = startMillis + (long) ((endMillis - startMillis) * Math.random());
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(randomMillis));
    }

    /**
     * Appends one generated record to {@code filePath} every 500 ms until the calling thread is
     * interrupted. Every record is also echoed to standard output and flushed to the file
     * immediately.
     *
     * <p>On interruption the writer is closed, the thread's interrupt flag is left set and the
     * method returns normally.
     *
     * @param filePath file to append to, written as UTF-8; created if it does not exist
     * @throws IllegalStateException if the file cannot be opened or written to
     */
    public void writeLog(String filePath) {
        try (Writer out = new OutputStreamWriter(
                new FileOutputStream(filePath, true), StandardCharsets.UTF_8)) {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(WRITE_INTERVAL_MILLIS);
                String log = product();
                System.out.println(log);
                out.write(log + "\n");
                out.flush();
            }
        } catch (InterruptedException e) {
            // Restore the flag so the caller can see that a stop was requested.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            throw new IllegalStateException("Failed to write call log to " + filePath, e);
        }
    }

    /**
     * Starts the generator.
     *
     * @param args optional; {@code args[0]} is the output file. When absent, a built-in default
     *     path is used.
     */
    public static void main(String[] args) {
        String filePath = (args != null && args.length > 0) ? args[0] : DEFAULT_LOG_PATH;
        ProductLog productLog = new ProductLog();
        productLog.initPhone();
        productLog.writeLog(filePath);
    }
}
producer.sh
#!/bin/bash
# Starts the ProductLog generator from the producer jar, passing the call-log
# CSV path as its only argument.
java -cp /root/temp/CT_producer-1.0-SNAPSHOT.jar ProductLog /root/temp/calllog.csv
数据消费
Flume配置
# Agent a1: names of its source, channel and sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: follow the call-log file with tail, starting from the beginning of the file
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /root/temp/calllog.csv
a1.sources.r1.shell = /bin/bash -c
# Deliver the source's events to channel c1
a1.sources.r1.channels = c1

# Channel: in-memory buffer between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink: publish events to the Kafka topic "call"
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = bigdata111:9092
a1.sinks.k1.topic = call
a1.sinks.k1.batchSize = 20
# Drain events from channel c1
a1.sinks.k1.channel = c1
数据存储
数据分析
电话号码:123456 Chen 年:2020 月:12 日:31
数据展示
参考
flume
https://www.freesion.com/article/4812259552/
电信项目