onwaygoahead 2020-05-05
package com.bawei.stream

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark Streaming job that pulls events from a Flume agent, splits each event
 * body into words, and prints a running per-word count for every batch.
 */
object StreamFlume {

  /** Directory where the streaming checkpoint data is written. */
  private val CheckpointDir = "C:\\Users\\Desktop\\checkpoint2"

  /** Host of the Flume agent the job polls. */
  private val FlumeHost = "192.168.182.147"

  /** Port of the Flume agent the job polls. */
  private val FlumePort = 8888

  /** Length of one micro-batch, in seconds. */
  private val BatchIntervalSeconds = 5L

  /**
   * State-update function for `updateStateByKey`: adds the counts seen in the
   * current batch to the running total of a key.
   *
   * @param newValues    counts emitted for the key in the current batch (may be empty)
   * @param runningCount total accumulated so far, or `None` the first time the key is seen
   * @return the new running total, always defined
   */
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(runningCount.getOrElse(0) + newValues.sum)

  /**
   * Decodes a Flume event body as UTF-8 text.
   *
   * Only the readable region of the buffer (position to limit) is decoded, and
   * a duplicate is used so the caller's buffer position is left untouched.
   * This avoids depending on the JVM default charset and on `ByteBuffer.array()`,
   * which exposes the whole backing array and is unavailable on some buffers.
   */
  private def decodeBody(body: ByteBuffer): String =
    StandardCharsets.UTF_8.decode(body.duplicate()).toString

  /** Splits a line on runs of whitespace and drops empty tokens. */
  private def tokenize(line: String): Seq[String] =
    line.split("\\s+").filter(_.nonEmpty).toSeq

  /**
   * Entry point: builds the streaming pipeline and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Configure Spark. The master is only defaulted to local[2] when none was
    // supplied externally (e.g. via spark-submit), so the job can also run on a cluster.
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreaming_Flume_Poll")
      .setIfMissing("spark.master", "local[2]")

    // Build the SparkContext and reduce log noise.
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // Build the StreamingContext with the batch interval.
    val scc: StreamingContext = new StreamingContext(sc, Seconds(BatchIntervalSeconds))

    // Set the checkpoint directory; the stateful word count below relies on it.
    scc.checkpoint(CheckpointDir)

    // Flume agent address(es); more than one can be listed here.
    val address = Seq(new InetSocketAddress(FlumeHost, FlumePort))

    // Pull data from Flume.
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(scc, address, StorageLevel.MEMORY_AND_DISK)

    // The payload lives in the event body; convert it to a String.
    val lineStream: DStream[String] = flumeStream.map(e => decodeBody(e.event.getBody))

    // Running word count across batches.
    val result: DStream[(String, Int)] = lineStream
      .flatMap(line => tokenize(line))
      .map(word => (word, 1))
      .updateStateByKey(updateFunction)

    result.print()

    scc.start()
    scc.awaitTermination()
  }
}