spb 2020-02-18
Spark Streaming source code flow analysis.
Below is my own walkthrough of how a Spark Streaming program runs; the process may be a bit detailed and a bit messy in places.
You can read the flow I describe while following along in the source code step by step, so it won't feel too chaotic.
After walking through the source once, your understanding of Spark Streaming will be much clearer.
This article is a record of my own source-reading process; if any part of my understanding is off, corrections and discussion are welcome.
The discussion is built around the following WordCount code:
```scala
// Create a SparkConf and set the master to local mode
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("socket-streaming")
// Instantiate the StreamingContext
val ssc = new StreamingContext(conf, Seconds(2))
// Create a ReceiverInputDStream
val lines = ssc.socketTextStream("localhost", 1234)
// Apply the processing logic and output
lines
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .print()
// Start the streaming computation
ssc.start()
// Wait for the execution to stop
ssc.awaitTermination()
```
Once started, this code receives messages from port 1234, splits each sentence into words on spaces, counts the words, and computes and prints the result every two seconds.
Next, using our WordCount code as a guide, we will walk through the source code in order: starting the streaming engine, receiving and storing data, processing data, and outputting data.
We start from `val ssc = new StreamingContext(conf, Seconds(2))`, which instantiates the StreamingContext.
Let's first look at some of the important fields in StreamingContext.
```scala
// The SparkContext instance (the Spark context). It can be passed in directly,
// created from the SparkConf, or recovered from a checkpoint.
private[streaming] val sc: SparkContext = {
  if (_sc != null) {
    _sc
  } else if (isCheckpointPresent) {
    SparkContext.getOrCreate(_cp.createSparkConf())
  } else {
    throw new SparkException("Cannot create StreamingContext without a SparkContext")
  }
}

// DStreamGraph manages the dependencies between DStreams.
// When it is created, the StreamingContext instance is bound to it.
private[streaming] val graph: DStreamGraph = {
  if (isCheckpointPresent) {
    _cp.graph.setContext(this)
    _cp.graph.restoreCheckpointData()
    _cp.graph
  } else {
    require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
    val newGraph = new DStreamGraph()
    newGraph.setBatchDuration(_batchDur)
    newGraph
  }
}

// JobScheduler generates and schedules jobs.
// The StreamingContext instance is also bound to it.
private[streaming] val scheduler = new JobScheduler(this)

// The batch interval: batchDuration
```
All of these fields are initialized when the StreamingContext is instantiated.
While we are here, let's also take a look at some important fields in DStreamGraph and JobScheduler.
First, the important fields in DStreamGraph:
```scala
// inputStreams is the collection of input sources.
// Each input source has a corresponding receiver used to receive data.
private val inputStreams = new ArrayBuffer[InputDStream[_]]()

// outputStreams is a collection of DStreams.
// The operators we call ultimately produce DStreams linked by dependencies,
// and output-operator DStreams are registered here.
private val outputStreams = new ArrayBuffer[DStream[_]]()
```
Next, the important fields in JobScheduler:
```scala
// The generated jobs, stored as a Map keyed by Time with JobSet values
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

// A thread pool used to execute jobs
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

// JobGenerator is used to generate jobs
private val jobGenerator = new JobGenerator(this)

// The driver-side manager for all Receivers
var receiverTracker: ReceiverTracker = null

// The event loop that handles JobScheduler-related events.
// It is essentially backed by a LinkedBlockingDeque queue.
private var eventLoop: EventLoop[JobSchedulerEvent] = null
```
Next, `val lines = ssc.socketTextStream("localhost", 1234)` is executed.
As shown below, socketTextStream() calls socketStream(), which creates a new SocketInputDStream; the SocketInputDStream is used to receive data.
```scala
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
```
Tracing SocketInputDStream's inheritance chain, we find that it extends ReceiverInputDStream, which in turn extends InputDStream.
InputDStream contains the line `ssc.graph.addInputStream(this)`, which adds the InputDStream to the inputStreams collection of the DStreamGraph.
So when the SocketInputDStream is constructed, the InputDStream is registered with the DStreamGraph. (It took me quite a while to find this; I had never known exactly when the InputDStream got added.)
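For orientation, that registration line sits directly in the class body of InputDStream, so it runs as a side effect of constructing any concrete input stream such as SocketInputDStream. The following is a heavily trimmed sketch from memory, so field order and details may differ from the actual source:

```scala
// Heavily trimmed sketch: the registration statement lives in the class body of
// InputDStream, so it executes whenever any concrete input DStream is constructed.
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
  extends DStream[T](_ssc) {

  // ... other fields and rate-controller setup omitted ...

  // Register this input stream with the DStreamGraph at construction time
  ssc.graph.addInputStream(this)

  // An id assigned by the StreamingContext, later used to match receivers and blocks to this stream
  val id = ssc.getNewInputStreamId()
}
```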
Next, the following lines of code are executed:
```scala
lines
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .print()
```
Each operator call above produces a DStream that depends on the previous one: FlatMappedDStream, MappedDStream, and ShuffledDStream.
Only when print() (an output operator) is called is the DStream registered into the DStreamGraph's outputStreams; only then can the DStreamGraph generate jobs from the dependency chain.
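To see how a transformation builds this chain, it helps to glance at what map() does. Roughly speaking (quoted from memory of the Spark source and lightly trimmed, so treat it as a sketch rather than the exact code), map() simply wraps the current DStream as the parent of a new MappedDStream, whose compute delegates back to its parent:

```scala
// DStream.map: wrap this DStream as the parent of a new MappedDStream
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}

// MappedDStream: remembers its parent and applies mapFunc to the parent's RDD per batch
class MappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)
  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}
```

flatMap and reduceByKey follow the same pattern with FlatMappedDStream and ShuffledDStream.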
Next, let's follow print():
```scala
// The following methods are called in sequence
def print(): Unit = ssc.withScope {
  print(10)
}

def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}
```
Here register() is called, which registers the DStream into the DStreamGraph's outputStreams.
At this point, our business logic has been wrapped up inside DStreams.
Next comes ssc.start(), which starts the StreamingContext.
StreamingContext's start method mainly calls scheduler.start(), which starts the JobScheduler.
Next, let's look at the JobScheduler's start method:
```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  // The event loop receives and dispatches JobSchedulerEvent events
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Start the event loop so it can receive and process events
  eventLoop.start()

  // Add listeners
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  // Start the listener bus
  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)

  val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
    case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
    case _ => null
  }

  // Manages executor allocation
  executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
    executorAllocClient,
    receiverTracker,
    ssc.conf,
    ssc.graph.batchDuration.milliseconds,
    clock)
  executorAllocationManager.foreach(ssc.addStreamingListener)
  // Start the ReceiverTracker
  receiverTracker.start()
  // Start the JobGenerator
  jobGenerator.start()
  executorAllocationManager.foreach(_.start())
  logInfo("Started JobScheduler")
}
```
JobScheduler mainly starts the ReceiverTracker and the JobGenerator.
The ReceiverTracker tells the executors to start Receivers, manages their execution, and communicates with them.
The JobGenerator is responsible for generating and executing jobs.
These two classes correspond to the two halves of the pipeline: receiving and storing data, and generating and executing jobs.
Let's look at receiving and storing data first.
We start with ReceiverTracker.start().
```scala
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    // Set up the RPC endpoint
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    // Launch the receivers
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

// Launch the receivers
private def launchReceivers(): Unit = {
  // Get the receivers from the inputStreams
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }

  runDummySparkJob()

  // Send the StartAllReceivers message
  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}
```
The ReceiverTracker first sets up an RPC endpoint for communication, so it can listen for and reply to Receiver-related messages.
It then calls launchReceivers(). The receiverInputStreams inside launchReceivers is the collection of input streams obtained from the DStreamGraph; a Receiver is obtained from each input stream, and then the StartAllReceivers message is sent.
Note that StartAllReceivers is sent to the endpoint, i.e. to the ReceiverTrackerEndpoint instance, which effectively means the tracker is sending the message to itself.
The receive method of ReceiverTrackerEndpoint handles incoming messages with pattern matching. When StartAllReceivers arrives, it uses the scheduling policy to pick suitable locations for launching the Receivers, and then calls startReceiver() of the same class:
```scala
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    // Compute suitable locations for the receivers
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
}
```
Next, the startReceiver method:
```scala
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // The function that starts the receiver on a worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
      }
    }

  // Create an RDD from the scheduled locations so the receiver can run inside a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }

  // Submit the job that starts the receiver to Spark core
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())

  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    ...
  }(ThreadUtils.sameThread)
  logInfo(s"Receiver ${receiver.streamId} started")
}
```
The startReceiver method first wraps the receiver-starting function and an RDD, then submits them to Spark core for execution.
The startReceiverFunc above encapsulates the creation and startup of a ReceiverSupervisor.
The ReceiverSupervisor is the executor-side manager of a Receiver, responsible for supervising and managing the Receiver running on the executor.
Next, let's trace the ReceiverSupervisor's start method.
```scala
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

// The onStart method in ReceiverSupervisorImpl
override protected def onStart() {
  registeredBlockGenerators.asScala.foreach { _.start() }
}

// ReceiverSupervisor's method that starts the Receiver
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      receiverState = Started
      // Start the receiver and begin receiving data
      receiver.onStart()
    } else {
      ...
    }
  } catch {
  }
}
```
In the onStart method we can see registeredBlockGenerators, a collection of BlockGenerators.
BlockGenerator is a fairly important class on the Receiver side: it writes each individual record we receive into a buffer, then periodically packages the buffer into blocks, which are stored and reported to the driver.
Let's look at its fields and methods in detail:
```scala
// listener is the BlockGeneratorListener passed in when the BlockGenerator is created;
// it listens for block-related events: onAddData, onGenerateBlock, onPushBlock
listener: BlockGeneratorListener

// An ArrayBuffer that temporarily holds the received records
@volatile private var currentBuffer = new ArrayBuffer[Any]

// A queue used to store and retrieve the packaged Blocks
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)

// A timer that periodically packages the data in currentBuffer into a Block
// and pushes it into blocksForPushing
private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

// The capacity of the blocksForPushing queue
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)

// A thread that takes Blocks out of blocksForPushing, stores them,
// and reports them to the ReceiverTracker
private val blockPushingThread = new Thread() {
  override def run() { keepPushingBlocks() }
}

// Periodically generate a block and push it into blocksForPushing
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }

    if (newBlock != null) {
      blocksForPushing.put(newBlock) // put is blocking when queue is full
    }
  } catch {
  }
}

// Keep pushing blocks
private def keepPushingBlocks() {
  ...
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      logDebug(s"Pushing block $block")
      // Call this class's pushBlock method
      pushBlock(block)
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException => logInfo("Block pushing thread was interrupted")
    case e: Exception => reportError("Error in block pushing thread", e)
  }
}

// Push a block
private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}
```
In short, the BlockGenerator uses an ArrayBuffer to keep accumulating incoming records and periodically packages the buffer contents into a Block. A separate ArrayBlockingQueue acts as a producer/consumer queue for Blocks, with one side putting and the other taking; this is how record-at-a-time reception and storage is implemented.
Now look at pushBlock, which calls listener.onPushBlock().
The listener is passed in when the BlockGenerator is constructed; here it is the defaultBlockGeneratorListener from ReceiverSupervisorImpl.
```scala
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit = { }

  def onGenerateBlock(blockId: StreamBlockId): Unit = { }

  def onError(message: String, throwable: Throwable) {
    reportError(message, throwable)
  }

  // Called when a block is pushed; it in turn calls ReceiverSupervisorImpl.pushArrayBuffer()
  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
    pushArrayBuffer(arrayBuffer, None, Some(blockId))
  }
}

// Store an ArrayBuffer of received data as a data block into Spark's memory
def pushArrayBuffer(
    arrayBuffer: ArrayBuffer[_],
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  // Call pushAndReportBlock()
  pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
}

// Store the block data, then report it to the driver
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  // This is where the data is actually stored
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  // Report the store result to the driver
  if (!trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))) {
    throw new SparkException("Failed to add block to receiver tracker.")
  }
  logDebug(s"Reported block $blockId")
}
```
listener.onPushBlock calls pushArrayBuffer(), which calls pushAndReportBlock() to store the data and then report it to the driver.
Note that the BlockGenerator handles record-at-a-time reception and block generation; we will come back to this shortly.
Having looked inside the BlockGenerator, let's return to ReceiverSupervisor.start():
```scala
def start() {
  onStart()
  startReceiver()
}
```
The onStart() method starts the BlockGenerator, which starts the block-generating timer and the block-pushing thread:
```scala
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
```
The startReceiver() method calls receiver.onStart() to begin receiving data:
```scala
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      receiverState = Started
      // Start the receiver and begin receiving data
      receiver.onStart()
    } else {
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
  }
}
```
Taking the SocketInputDStream from our demo as an example, it creates a SocketReceiver instance. Below is SocketReceiver's onStart method:
```scala
def onStart() {
  try {
    // Open the socket and start listening
    socket = new Socket(host, port)
  } catch {
  }
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}

def receive() {
  try {
    // Receive data
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      // Store each received record
      store(iterator.next())
    }
  } catch {
    ...
  } finally {
    onStop()
  }
}
```
As we can see, onStart starts a thread that keeps receiving data and calls store() to store each record it receives.
The store() method is defined in Receiver; let's follow it:
```scala
def store(dataItem: T) {
  supervisor.pushSingle(dataItem)
}

/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def store(dataBuffer: ArrayBuffer[T]) {
  supervisor.pushArrayBuffer(dataBuffer, None, None)
}

/**
 * Store an ArrayBuffer of received data as a data block into Spark's memory.
 * The metadata will be associated with this block of data
 * for being used in the corresponding InputDStream.
 */
def store(dataBuffer: ArrayBuffer[T], metadata: Any) {
  supervisor.pushArrayBuffer(dataBuffer, Some(metadata), None)
}

/** Store an iterator of received data as a data block into Spark's memory. */
def store(dataIterator: Iterator[T]) {
  supervisor.pushIterator(dataIterator, None, None)
}
```
There are several overloads with different parameters.
SocketReceiver calls `store(dataItem: T)`, which calls pushSingle to append the record to the BlockGenerator's currentBuffer. The BlockGenerator then periodically packages currentBuffer into a Block and goes through pushBlock, pushArrayBuffer, and pushAndReportBlock to store the data and report to the driver.
`store(dataItem: T)` is exactly the record-at-a-time receive-and-store path mentioned earlier.
The other overloads also eventually call pushAndReportBlock to store the data and report to the driver; we won't trace them further here.
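For reference, the single-record path is short. The sketch below is a simplified rendering of ReceiverSupervisorImpl.pushSingle and BlockGenerator.addData, reconstructed from memory with details trimmed, so treat it as illustrative rather than verbatim:

```scala
// ReceiverSupervisorImpl: a single record is handed straight to the BlockGenerator
def pushSingle(data: Any): Unit = {
  defaultBlockGenerator.addData(data)
}

// BlockGenerator: the record is appended to currentBuffer under the lock;
// waitToPush() applies rate limiting before the record is accepted
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
```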
That concludes data reception and storage. Next, we return to the JobGenerator to analyze job generation and execution.
Shifting our attention back to the JobGenerator, let's first look at a few important fields in it:
```scala
// The event loop for job-generation messages
private var eventLoop: EventLoop[JobGeneratorEvent]

// A timer that, at every batch interval, posts a GenerateJobs message to the eventLoop
private val timer = new RecurringTimer(
  clock,
  ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))),
  "JobGenerator"
)
```
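RecurringTimer itself is essentially a daemon thread that wakes up once per period and invokes a callback. The sketch below is a minimal illustration of that pattern, not Spark's actual RecurringTimer (which also abstracts the clock and supports a graceful stop):

```scala
// Minimal sketch of a recurring timer: call `callback(time)` every `periodMs` milliseconds.
// This illustrates the pattern only; it is not Spark's RecurringTimer.
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false

  private val thread = new Thread(s"SimpleRecurringTimer - $name") {
    setDaemon(true)
    override def run(): Unit = {
      // Align the first trigger to the next period boundary
      var nextTime = (System.currentTimeMillis() / periodMs + 1) * periodMs
      while (!stopped) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs)
        callback(nextTime)        // e.g. post GenerateJobs(new Time(nextTime))
        nextTime += periodMs
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true }
}
```

With the batch duration as the period and eventLoop.post(GenerateJobs(...)) as the callback, this is the behavior the timer field above provides.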
Next, the JobGenerator's start method:
```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return

  checkpointWriter

  // The eventLoop's onReceive callback calls processEvent(event) to handle events
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  // Start the event loop
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
```
The start method starts the eventLoop and calls startFirstTime().
Once started, the eventLoop runs a thread that continuously receives messages and reacts to each one accordingly.
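EventLoop is a small generic utility: events are posted into a blocking queue, and a dedicated daemon thread drains the queue and hands each event to onReceive. Below is a stripped-down sketch of the idea (the real class has more careful start/stop semantics), just to make the mechanism concrete:

```scala
import java.util.concurrent.LinkedBlockingDeque

// Stripped-down sketch of the EventLoop pattern: a blocking queue plus a consumer thread.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = eventQueue.take()   // blocks until an event is available
          try {
            onReceive(event)
          } catch {
            case t: Throwable => onError(t)
          }
        }
      } catch {
        case _: InterruptedException => // interrupted while waiting; exit the loop
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}
```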
Looking at startFirstTime(), it starts the DStreamGraph and the timer that periodically posts the job-generation message:
```scala
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  // Start the DStreamGraph
  graph.start(startTime - graph.batchDuration)
  // Start the timer
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
```
We won't follow DStreamGraph's start method; there is nothing especially important in it.
Once the timer is started, it periodically posts a `GenerateJobs(new Time(longTime))` message.
When the eventLoop receives it, it calls processEvent to handle it, as follows:
```scala
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
```
Now the generateJobs journey begins.
First, processEvent handles the GenerateJobs message by calling JobGenerator.generateJobs().
Here is JobGenerator's generateJobs method:
```scala
// Generate jobs for the given time
private def generateJobs(time: Time) {
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    // Ask the receiverTracker to allocate received blocks to this batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // Generate jobs in the DStreamGraph using the allocated blocks
    graph.generateJobs(time)
  } match {
    // If the jobs were generated successfully, submit them via jobScheduler.submitJobSet
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    // Otherwise report the error
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  // Finally, trigger a checkpoint
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
It first calls receiverTracker.allocateBlocksToBatch() to assign the data to be processed to the current batch, then calls DStreamGraph.generateJobs() to produce the job sequence; if that succeeds, it submits the jobs via jobScheduler.submitJobSet.
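For context, allocateBlocksToBatch ends up in ReceivedBlockTracker, which takes every block the receivers have reported since the last allocation and pins it to this batch time. The sketch below is a rough, heavily trimmed reconstruction (the write-ahead-log step and error handling are omitted), so check the actual ReceivedBlockTracker source for the exact logic:

```scala
// Rough sketch of ReceivedBlockTracker.allocateBlocksToBatch (simplified): drain the
// per-stream queues of unallocated blocks and record them against this batch time.
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    // Snapshot the unallocated blocks of every input stream
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(_ => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    // (In the real code the allocation is also written to the write-ahead log first)
    timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
    lastAllocatedBatchTime = batchTime
  }
}
```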
Let's first follow DStreamGraph.generateJobs():
```scala
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    // Generate a job for each outputStream
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
```
Here the outputStreams are traversed to generate jobs. outputStreams holds the DStreams corresponding to the output operators we called, i.e. the DStreams that were registered into the DStreamGraph's outputStreams when an output operator was invoked, as described earlier.
Taking our original WordCount code as an example, it ultimately adds a ForEachDStream to outputStreams.
So ForEachDStream.generateJob() is what gets called to generate the job.
Here is ForEachDStream's generateJob method:
```scala
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```
generateJob calls parent.getOrCompute() to produce an RDD; if that succeeds, it constructs a Job from the RDD and our processing function and returns it.
Pay attention to parent here: it is simply a reference to the upstream DStream that this DStream depends on.
```scala
lines
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .print()
```
In our code, this parent is the ShuffledDStream produced by reduceByKey; the ShuffledDStream's parent is the MappedDStream produced by map; the MappedDStream's parent is the FlatMappedDStream produced by flatMap.
And the FlatMappedDStream's parent is the SocketInputDStream.
Let's follow parent.getOrCompute(); at this point parent refers to the ShuffledDStream.
```scala
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // generatedRDDs is a HashMap of already-generated RDDs, keyed by time
  generatedRDDs.get(time).orElse {
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Call compute to generate the RDD; almost every DStream subclass implements it
        SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }
      // Persist or checkpoint the generated RDD, then add it to generatedRDDs
      rddOption.foreach { case newRDD =>
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
```
DStream defines generatedRDDs to store the RDDs that have already been generated.
getOrCompute first looks up the current batch's RDD in generatedRDDs; if it is not there, compute() is executed to generate it.
Following our code, the compute method being called is ShuffledDStream's:
```scala
override def compute(validTime: Time): Option[RDD[(K, C)]] = {
  parent.getOrCompute(validTime) match {
    case Some(rdd) =>
      Some(rdd.combineByKey[C](
        createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
    case None => None
  }
}
```
We see that it again calls parent.getOrCompute to produce the RDD.
So getOrCompute and compute are called recursively along the dependency chain, all the way back to the very first DStream.
In our code the first DStream is the SocketInputDStream. Its compute method would be called here, but SocketInputDStream does not define one, so what actually gets called is the compute method of its parent class, ReceiverInputDStream:
```scala
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Get the block information allocated to the current batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the RDD from the batch time and block information, and return it
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
```
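To make the recursion concrete, here is a stand-alone sketch of the RDD lineage that one batch of our WordCount effectively builds once the chain bottoms out. The BlockRDD is simulated with parallelize here (in the real flow it comes from ReceiverInputDStream.compute), and the object name is made up for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BatchLineageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lineage-sketch"))

    // In the real flow this RDD is a BlockRDD built by ReceiverInputDStream.compute
    // from the blocks allocated to the batch; here we simulate it.
    val blockRDD = sc.parallelize(Seq("hello spark", "hello streaming"))

    val words      = blockRDD.flatMap(_.split(" "))  // what FlatMappedDStream.compute produces
    val pairs      = words.map((_, 1))               // what MappedDStream.compute produces
    val wordCounts = pairs.reduceByKey(_ + _)        // what ShuffledDStream.compute produces (via combineByKey)

    // ForEachDStream.generateJob then wraps a call like foreachFunc(wordCounts, time) into a Job,
    // whose execution prints the first elements, just as print() defined.
    wordCounts.take(10).foreach(println)

    sc.stop()
  }
}
```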
Once this chain of calls has produced the RDD, we come back to ForEachDStream's generateJob method:
```scala
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```
The generated RDD and our processing function are wrapped into a job, which is returned to DStreamGraph.generateJobs().
DStreamGraph.generateJobs() in turn returns the jobs to JobGenerator.generateJobs().
At this point, our job has been fully generated.
Next, JobGenerator.generateJobs() executes `jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))` to submit the jobs.
```scala
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
```
Each job is wrapped in a JobHandler for processing. JobHandler is a Runnable executed by the thread pool, and its run method calls job.run to run the job.
Below is Job's run method; the func() inside it is the processing function we wrapped earlier.
```scala
def run() {
  _result = Try(func())
}
```
Once the JobHandler is handed to the thread pool for execution, our job is up and running.
When the job runs, it executes the wrapped func() and produces the corresponding output.
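For completeness, JobHandler itself is roughly the following shape. This is a simplified rendering of the inner class in JobScheduler, assuming the enclosing class's eventLoop and clock and omitting the local-property bookkeeping, so treat it as a sketch: it posts JobStarted, runs the job, then posts JobCompleted so the scheduler can track batch progress.

```scala
// Simplified sketch of JobScheduler's inner JobHandler class (local-property handling
// omitted; relies on the enclosing JobScheduler's eventLoop and clock).
private class JobHandler(job: Job) extends Runnable {
  override def run(): Unit = {
    if (eventLoop != null) {
      eventLoop.post(JobStarted(job, clock.getTimeMillis()))
      job.run()                                          // executes the wrapped func()
      eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
    }
  }
}
```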
end...
That wraps up this walkthrough of the Spark Streaming source code flow.
Keep coding, keep reading, keep grinding. Good luck.
My personal WeChat official account is 码农峰; it regularly pushes industry news and publishes original technical articles. You are welcome to follow it.