hell0kitty 2020-01-25
eg:Wordcount案例
val lines = sc.textFile("本地URL or HDFS URL")//详解见代码1
val words = lines.flatMap(line => line.split(" "))//也会返回一个MapPartitionsRDD
val pairs = words.map(word => (word , 1))//同样也是返回一个MapPartitionsRDD
val counts = pairs.reduceByKey(_+_)//详解见代码2
counts.foreach(count => printLn(count._1 + ":" + count._2))//见代码4代码1
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
/**
*首先,hadoopFile()方法的调用,会创建一个HadoopRDD,其中的元素,其实是(key,value) pair RDD . key 是hdfs或文本文件的每一行的offset, value 就是文本行
*然后对HadoopRDD调用map()方法,会剔除key,只保留value,然后会获得一个MapPartitionsRDD,MapPartitionsRDD内部的元素,其实就是一行一行的文本行
*/
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
//因为RDD.scala类中是没有ReduceByKey方法的,因此它会调用ReduceByKey方法时,会触发scala的隐式转换;此时就会在作用域内,寻找隐式转换,会在RDD中找到rddToPairRDDFunctions()隐式转换,然后再去PairRDDFunctions类里面调用ReduceByKey方法
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)//代码详见代码3
}
代码3
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
代码4
//通过foreach方法进行runjob的多次重载到本RunJob方法
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD‘s recursive dependencies:\n" + rdd.toDebugString)
}
// 调用SparkContext,之前初始化时创建的DAGScheduler的Runjob方法
// 会把当前执行action操作的RDD传到DAGScheduler的runjob方法中
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}