Example: WordCount

val lines = sc.textFile("local file path or HDFS URL")        // see Code 1 for details
val words = lines.flatMap(line => line.split(" "))            // also returns a MapPartitionsRDD
val pairs = words.map(word => (word, 1))                      // likewise returns a MapPartitionsRDD
val counts = pairs.reduceByKey(_ + _)                         // see Code 2 for details
counts.foreach(count => println(count._1 + ":" + count._2))   // see Code 4
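For readers who want to run the example end to end, below is a minimal, self-contained driver. It is only a sketch: the local[*] master, the application name, and the input path data/input.txt are illustrative assumptions, not part of the original snippet.

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // The local[*] master and the input path are assumptions for local testing only.
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("data/input.txt")                    // Code 1: HadoopRDD -> MapPartitionsRDD
    val words = lines.flatMap(line => line.split(" "))           // MapPartitionsRDD
    val pairs = words.map(word => (word, 1))                     // MapPartitionsRDD
    val counts = pairs.reduceByKey(_ + _)                        // Code 2/3: implicit conversion to PairRDDFunctions
    counts.foreach(count => println(count._1 + ":" + count._2))  // Code 4: the action triggers SparkContext.runJob

    sc.stop()
  }
}

Calling counts.toDebugString before the action prints the same recursive lineage that the spark.logLineage branch in Code 4 would log.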
Code 1

/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
/**
 * First, the call to hadoopFile() creates a HadoopRDD whose elements are actually (key, value)
 * pairs: the key is the offset of each line in the HDFS/text file, and the value is the text
 * line itself. Then map() is called on the HadoopRDD to drop the key and keep only the value,
 * which yields a MapPartitionsRDD whose elements are simply the lines of text.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

Code 2

// RDD.scala itself has no reduceByKey method, so calling reduceByKey triggers a Scala implicit
// conversion: the compiler searches the enclosing scope, finds the rddToPairRDDFunctions()
// implicit conversion defined in RDD, and the call is then dispatched to reduceByKey in
// PairRDDFunctions.
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  new PairRDDFunctions(rdd)  // see Code 3 for details
}

Code 3

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

Code 4

// foreach triggers a job: several runJob overloads eventually delegate to this runJob method.
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // Call runJob on the DAGScheduler that was created when this SparkContext was initialized.
  // The RDD on which the action is being executed is passed into DAGScheduler.runJob.
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
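To make the implicit-conversion mechanism in Code 2 concrete, here is a simplified, self-contained sketch. MiniRDD, MiniPairFunctions, and the other names are invented stand-ins, not Spark classes; the sketch only mimics how the compiler rewrites pairs.reduceByKey(...) into a call on a wrapper object supplied by an implicit conversion.

import scala.language.implicitConversions

// MiniRDD and MiniPairFunctions are made-up types for illustration, not Spark classes.
class MiniRDD[T](val data: Seq[T])

class MiniPairFunctions[K, V](self: MiniRDD[(K, V)]) {
  // Simplified stand-in for PairRDDFunctions.reduceByKey: group by key, then reduce the values.
  def reduceByKey(func: (V, V) => V): MiniRDD[(K, V)] =
    new MiniRDD(self.data.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).reduce(func)) }.toSeq)
}

object MiniPairSyntax {
  // Analogue of rddToPairRDDFunctions: the compiler inserts this conversion when
  // reduceByKey is called on a MiniRDD whose elements are pairs.
  implicit def toPairFunctions[K, V](rdd: MiniRDD[(K, V)]): MiniPairFunctions[K, V] =
    new MiniPairFunctions(rdd)
}

object ImplicitDemo {
  import MiniPairSyntax._

  def main(args: Array[String]): Unit = {
    val pairs = new MiniRDD(Seq(("hello", 1), ("spark", 1), ("hello", 1)))
    val counts = pairs.reduceByKey(_ + _)  // compiles only because of the implicit conversion in scope
    counts.data.foreach { case (word, n) => println(word + ":" + n) }
  }
}

The lookup rule is the same one Spark relies on: because the implicit conversion is in scope and MiniRDD lacks reduceByKey, the compiler rewrites the call as toPairFunctions(pairs).reduceByKey(_ + _).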