yanqianglifei 2020-02-22
目录
本文介绍一下rdd的基本属性概念、rdd的转换/行动操作、rdd的宽/窄依赖。
RDD:Resilient Distributed Dataset 弹性分布式数据集,是Spark中的基本抽象。
RDD表示可以并行操作的元素的不变分区集合。
RDD提供了许多基本的函数(map、filter、reduce等)供我们进行数据处理。
通常来说,每个RDD有5个主要的属性组成:
分区列表
RDD是由多个分区组成的,分区是逻辑上的概念。RDD的计算是以分区为单位进行的。
用于计算每个分区的函数
作用于每个分区数据的计算函数。
对其他RDD的依赖关系列表
RDD中保存了对于父RDD的依赖,根据依赖关系组成了Spark的DAG(有向无环图),实现了spark巧妙、容错的编程模型
针对键值型RDD的分区器
分区器针对键值型RDD而言的,将key传入分区器获取唯一的分区id。在shuffle中,分区器有很重要的体现。
对每个分区进行计算的首选位置列表
根据数据本地性的特性,获取计算的首选位置列表,尽可能的把计算分配到靠近数据的位置,减少数据的网络传输。
//创建此RDD的SparkContext def sparkContext: SparkContext = sc // 唯一的id val id: Int = sc.newRddId() // rdd友善的名字 @transient var name: String = _ // 分区器 val partitioner: Option[Partitioner] = None // 获取依赖列表 // dependencies和partitions中都用到了checkpointRDD,如果进行了checkpoint,checkpointRDD表示进行checkpoint后的rdd final def dependencies: Seq[Dependency[_]] = { // 一对一的窄依赖 checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse { if (dependencies_ == null) { dependencies_ = getDependencies } dependencies_ } } // 获取分区列表 final def partitions: Array[Partition] = { checkpointRDD.map(_.partitions).getOrElse { if (partitions_ == null) { partitions_ = getPartitions partitions_.zipWithIndex.foreach { case (partition, index) => require(partition.index == index, s"partitions($index).partition == ${partition.index}, but it should equal $index") } } partitions_ } } // 获取分区的首选位置 final def preferredLocations(split: Partition): Seq[String] = { checkpointRDD.map(_.getPreferredLocations(split)).getOrElse { getPreferredLocations(split) } } // 对应到每个分区的计算函数 def compute(split: Partition, context: TaskContext): Iterator[T]
主要就是围绕上面5个重要属性的一些操作
// 返回仅包含满足过滤条件的元素的新RDD。 def filter(f: T => Boolean): RDD[T] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[T, T]( this, (context, pid, iter) => iter.filter(cleanF), preservesPartitioning = true) } // 通过将函数应用于此RDD的所有元素来返回新的RDD。 def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) } // 首先向该RDD的所有元素应用函数,然后将结果展平,以返回新的RDD。 def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) }
我们可以发现几乎每个算子都会以当前RDD和对应的计算函数创建新的RDD,每个子RDD都持有父RDD的引用。
这就印证了RDD的不变性,也表明了RDD的计算是通过对RDD进行转换实现的。
val words = Seq("hello spark", "hello scala", "hello java") val rdd = sc.makeRDD(words) rdd .flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) .foreach(println(_))
上面是一个简单的RDD的操作,我们先调用makeRDD创建了一个RDD,之后对rdd进行一顿算子调用。
首先调用flatMap,flatMap内部会以当前rdd
和我们传入的_.split(" ")
构建新的MapPartitionsRDD;
之后map,map以上步生成的MapPartitionsRDD
和我们传入的(_, 1)
构造新的MapPartitionsRDD;
之后reduceByKey,reduceByKey构造新的RDD;
走到foreach,foreach是行动操作,触发计算,输出。
从上面知道RDD是懒执行的,只有遇到行动算子才执行计算。
转换操作:在内部对根据父RDD创建新的RDD,不执行计算
行动操作:内部会调用sc.runJob
,提交作业、划分阶段、执行作业。
foreach、foreachPartition、collect、reduce、count
除行动操作外,都是转换操作
宽窄依赖是shuffle和划分调度的重要依据。
先看看spark中与依赖有关的几个类(一层一层继承关系):
Dependency依赖的顶级父类 NarrowDependency 窄依赖 OneToOneDependency 表示父RDD和子RDD分区之间的一对一依赖关系的窄依赖 RangeDependency 表示父RDD和子RDD中分区范围之间的一对一依赖关系的窄依赖 ShuffleDependency 宽依赖
先说宽窄依赖的概念:
窄依赖:父RDD的每个分区只被一个子RDD分区使用
宽依赖:父RDD的每个分区都有可能被多个子RDD分区使用
其实就是父RDD的一个分区会被传到几个子RDD分区的区别。如果被传到一个子RDD分区,就可以不需要移动数据(移动计算);如果被传到多个子RDD分区,就需要进行数据的传输。
接下来看看Dependency内部的一些属性及方法:
// 依赖对应的rdd,其实就是当前rdd的父rdd。宽依赖和窄依赖都有这个属性 def rdd: RDD[T] // 获取子分区对应的父分区(窄依赖的方法) def getParents(partitionId: Int): Seq[Int] // 以下是宽依赖的属性及方法 // 对应键值RDD的分区器 val partitioner: Partitioner // 在数据传输时的序列化方法 val serializer: Serializer = SparkEnv.get.serializer // 键的排序方式 val keyOrdering: Option[Ordering[K]] = None // 一组用于聚合数据的功能 val aggregator: Option[Aggregator[K, V, C]] = None // 是否需要map端预聚合 val mapSideCombine: Boolean = false // 当前宽依赖的id val shuffleId: Int = _rdd.context.newShuffleId() // 向管理员注册一个shuffle,并获取一个句柄,以将其传递给任务 val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( shuffleId, _rdd.partitions.length, this)
窄依赖:map、filter、union、mapPartitions、join(当分区器是HashPartitioner)
宽依赖:sortByKey、join(分区器不是HashPartitioner时)
最后说一下reduceByKey,顺便说一下为什么当分区器HashPartitioner时就是窄依赖。
reduceByKey是用来将key分组后,执行我们传入的函数。
它是窄依赖,它内部默认会使用HashPartitioner分区。
同一个key进去HashPartitioner得到的分区id是一样的,这样进行计算前后同一个key得到的分区都一样,父RDD的分区就只被子RDD的一个分区依赖,就不需要移动数据。
所以join、reduceByKey在分区器是HashPartitioner时是窄依赖。
end. 个人理解,如有偏差,欢迎交流指正。
扶我起来,我还能学。
个人公众号:码农峰,定时推送行业资讯,持续发布原创技术文章,欢迎大家关注。