[项目] 电信数据运营

paopaozhuli 2020-06-26

 数据生产

import java.io.*;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

public class ProductLog {
    // 存放生产的电话号码
    private List<String> phoneList = new ArrayList<String>();
    private Map<String, String> phoneNameMap = new HashMap<>();
    String startTime = "2020-01-01";
    String endTime = "2020-12-31";

    public void initPhone() {
        //20个随机电话
        phoneList.add("17078388295");
        phoneList.add("13980337439");
        phoneList.add("14575535933");
        phoneList.add("19902496992");
        phoneList.add("18549641558");
        phoneList.add("17005930322");
        phoneList.add("18468618874");
        phoneList.add("18576581848");
        phoneList.add("15978226424");
        phoneList.add("15542823911");
        phoneList.add("17526304161");
        phoneList.add("15422018558");
        phoneList.add("17269452013");
        phoneList.add("17764278604");
        phoneList.add("15711910344");
        phoneList.add("15714728273");
        phoneList.add("16061028454");
        phoneList.add("16264433631");
        phoneList.add("17601615878");
        phoneList.add("15897468949");

        //随机电话对应的姓名
        phoneNameMap.put("17078388295", "李雁");
        phoneNameMap.put("13980337439", "卫艺");
        phoneNameMap.put("14575535933", "仰莉");
        phoneNameMap.put("19902496992", "陶欣悦");
        phoneNameMap.put("18549641558", "施梅梅");
        phoneNameMap.put("17005930322", "金虹霖");
        phoneNameMap.put("18468618874", "魏明艳");
        phoneNameMap.put("18576581848", "华贞");
        phoneNameMap.put("15978226424", "华啟倩");
        phoneNameMap.put("15542823911", "仲采绿");
        phoneNameMap.put("17526304161", "卫丹");
        phoneNameMap.put("15422018558", "戚丽红");
        phoneNameMap.put("17269452013", "何翠柔");
        phoneNameMap.put("17764278604", "钱溶艳");
        phoneNameMap.put("15711910344", "钱琳");
        phoneNameMap.put("15714728273", "缪静欣");
        phoneNameMap.put("16061028454", "焦秋菊");
        phoneNameMap.put("16264433631", "吕访琴");
        phoneNameMap.put("17601615878", "沈丹");
        phoneNameMap.put("15897468949", "褚美丽");
    }

    // 生产数据
    // caller,callee,buildTime,duration
    // 主叫,被叫,通话建立时间,通话持续时间
    public String product() {
        String caller;
        String callee;

        // 生成主叫的随机索引
        int callerIndex = (int) (Math.random() * phoneList.size());
        // 通过随机索引获得主叫电话号码
        caller = phoneList.get(callerIndex);

        while (true) {
            int calleeIndex = (int) (Math.random() * phoneList.size());
            callee = phoneList.get(calleeIndex);
            // 去重判断
            if (!caller.equals(callee)) break;
        }

        // 随机产生通话建立时间
        String buildTime = randomBuildTime(startTime,endTime);

        // 随机产生通话持续时间
        DecimalFormat df = new DecimalFormat("0000");
        String duration = df.format((int) (30 * 60 * Math.random()));

        StringBuilder sb = new StringBuilder();
        sb.append(caller + ",").append(callee + ",").append(buildTime + ",").append(duration);

        return sb.toString();
    }

    // 随机生成时间
    private String randomBuildTime(String startTime, String endTime) {
        try {
            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
            Date startDate = sdf1.parse(startTime);
            Date endDate = sdf1.parse(endTime);

            // 生成时间字符串
            if(endDate.getTime() <= startDate.getTime()){return null;}

            // (结束 - 起始) * 随机[0,1) + 起始
            long randomTS = startDate.getTime() + (long)((endDate.getTime() - startDate.getTime())*Math.random());
            Date resultDate = new Date(randomTS);
            SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String resultTimeString = sdf2.format(resultDate);

            return resultTimeString;
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return null;
    }

    public void writeLog(String filePath){
        try {
            OutputStreamWriter osw = new OutputStreamWriter(new FileOutputStream(filePath,true), "UTF-8");
            while(true){
                try {
                    Thread.sleep(500);
                    String log = product();
                    System.out.println(log);
                    osw.write(log+"\n");
                    osw.flush();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        args = new String[]{"F:\\idea-workspace\\CT_BD\\data\\calllog.csv"};
        ProductLog productLog = new ProductLog();
        productLog.initPhone();
        productLog.product();
        productLog.writeLog(args[0]);
    }
}

producer.sh

#!/bin/bash
java -cp /root/temp/CT_producer-1.0-SNAPSHOT.jar ProductLog /root/temp/calllog.csv

数据消费

  • Flume用于监控目标文件的变化,并把信息传递到Kafka

Flume配置

#定义agent名, source、channel、sink的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#具体定义source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /root/temp/calllog.csv
a1.sources.r1.shell = /bin/bash -c

#具体定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#具体定义sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = bigdata111:9092
a1.sinks.k1.topic = call
a1.sinks.k1.batchSize = 20
  • 启动kafka生产者:bin/kafka-server-start.sh config/server.properties &
  • 创建主题:bin/kafka-topics.sh --create --zookeeper bigdata111:2181 -replication-factor 1 --partitions 3 --topic calllog
  • 启动kafka消费者:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mydemo1 --from-beginning
  • 启动flume:bin/flume-ng agent -c conf/ -n a1 -f /root/temp/flume-kafka.conf
  • 生产数据:sh producer.sh

[项目] 电信数据运营

数据存储

  • 将产生的数据实时存储在HBase中
  • 编写调用HBaseAPI的相关方法,将从Kafka中读取出来的数据写入HBase中
  • HBase的描述器:命名空间描述器、表描述器、列族描述器
  • 协处理器:主叫插入f1后,被叫插入f2。修改程序和虚拟机中的 hbase-site.xml  

数据分析

  • 逻辑简单,代码量大
  • 按照时间范围(年月日),统计出所属时间范围内所有手机号码的通话次数总和及通话时长总和
    • 维度:某个视角,如按时间维度,统计2018年全年的通话记录,表示为2018年*月*日
    • 通过Mapper将数据按照不同维度聚合给Reducer
    • 通过Reducer拿到按各个维度聚合过来的数据,汇总输出
    • 将Reducer输出通过outputformat输出到Mysql表
  • 表结构设计
    • contacts:存放手机号,联系人姓名
    • call:存放某个时间维度下通话次数及通话时长总和
    • dimension_data:存放时间
  • 数据形式:联系人维度,时间维度
电话号码:123456 Chen
年:2020
月:12
日:31
  • HBase-->Mysql
    • Sqoop
    • 自定义输出:map,reducer,outputformat,runner
  • 下载 lombok 插件,并在maven中添加依赖

数据展示

参考

flume

https://www.freesion.com/article/4812259552/

电信项目

https://blog.csdn.net/liu16659/article/details/81133090?

相关推荐

crazyhulu / 0评论 2019-07-24