IT影风 2019-11-02
Spark Streaming integration with Flume. See the official integration guide (http://spark.apache.org/docs/2.2.0/streaming-flume-integration.html). The required Maven coordinates are:
groupId = org.apache.spark
artifactId = spark-streaming-flume_2.11
version = 2.2.0
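If the project is built with sbt rather than Maven, a roughly equivalent dependency declaration (a sketch, assuming the same Spark 2.2.0 / Scala 2.11 versions as above) would be:

// build.sbt -- sbt equivalent of the Maven coordinates above
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.2.0"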
As we know, using Flume is essentially a matter of writing its configuration file. Here a local netcat source is used to simulate incoming data; the configuration for this exercise is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop
a1.sources.r1.port = 5900

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop
a1.sinks.k1.port = 5901
#a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
#a1.channels.c1.capacity = 1000
#a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The idea: the Flume agent pushes events through its Avro sink to a receiver that the Spark Streaming application starts on the sink's hostname and port (the push-based approach).
The verification code is as follows; it simply performs a word count:
package flume_streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
 * @Author: SmallWild
 * @Date: 2019/11/2 9:42
 * @Desc: push-based Flume integration (flumePushWordCount)
 */
object flumePushWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Invalid arguments. Usage: flumePushWordCount <hostname> <port>")
      System.exit(1)
    }
    // command-line arguments
    val Array(hostname, port) = args
    // never use local[1]: the receiver occupies the only thread and no batch gets processed
    val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("flumePushWordCount")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
    // set the log level
    ssc.sparkContext.setLogLevel("WARN")
    // simple word count over the Flume event bodies
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

The verification steps are as follows:
1) Package the project:
mvn clean package -DskipTests

2) Submit with spark-submit (local mode is used here):
./spark-submit --class flume_streaming.flumePushWordCount \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
/smallwild/app/SparkStreaming-1.0.jar hadoop 5901

3) Start the Flume agent (the --name must match the agent name a1 defined in the configuration above):
flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/<agent-config-file>.conf -Dflume.root.logger=INFO,console

4) Send test data: here data is sent to the local netcat source on port 5900
telnet hadoop 5900

5) Verify: check whether the Streaming application prints the corresponding word counts.
Verification result: the application correctly counts the words in each batch of data sent from the port.

The second, pull-based approach is largely the same as the one above. The required dependency coordinates are:
groupId = org.apache.spark
artifactId = spark-streaming-flume-sink_2.11
version = 2.2.0

groupId = org.scala-lang
artifactId = scala-library
version = 2.11.8

groupId = org.apache.commons
artifactId = commons-lang3
version = 3.5
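For reference, a rough sbt equivalent of these three coordinates (a sketch under the same version assumptions) would be:

// build.sbt -- sbt equivalents of the Maven coordinates above
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming-flume-sink_2.11" % "2.2.0",
  "org.scala-lang" % "scala-library" % "2.11.8",
  "org.apache.commons" % "commons-lang3" % "3.5"
)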
The difference from the previous approach lies in the sink: instead of the built-in Avro sink, Flume must be configured with the custom SparkSink, and the spark-streaming-flume-sink jar (together with scala-library and commons-lang3) has to be on the Flume agent's classpath so that the sink class can be loaded:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop
a1.sources.r1.port = 5900

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = hadoop
a1.sinks.k1.port = 5901
#a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
#a1.channels.c1.capacity = 1000
#a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The processing logic is roughly the same as before; the only change is to create the stream with the polling API shown below:
import org.apache.spark.streaming.flume._

val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])
The approach is basically the same as before, except that Flume buffers events in the SparkSink and the Spark Streaming application pulls them from it, rather than having Flume push data to a receiver (see the sketch below).
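For completeness, a minimal pull-based counterpart of the earlier program might look like the following sketch. The object name flumePullWordCount is made up for illustration; apart from the createPollingStream call, the logic mirrors flumePushWordCount:

package flume_streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Durations, StreamingContext}

// Hypothetical pull-based variant: only the stream creation differs from flumePushWordCount
object flumePullWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: flumePullWordCount <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args
    val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("flumePullWordCount")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    // pull events from the SparkSink instead of receiving pushed Avro events
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

It would be submitted the same way as before, with <hostname> <port> pointing at the SparkSink's host and port (hadoop 5901 in the configuration above).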
This wraps up the two ways of integrating Spark Streaming with Flume in practice.