Spark RDD

Oeljeklaus 2019-12-15

scala> val rdd1 = sc.parallelize(List(63,45,89,23,144,777,888))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:15

查看该RDD的分区数量
scala> rdd1.partitions.length
res0: Int = 1

创建时指定分区数量
scala> val rdd1 = sc.parallelize(List(63,45,89,23,144,777,888),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:15
查看分区数量
scala> rdd1.partitions.length
res1: Int = 3

map/filter

scala> val rdd1 = sc.parallelize(List(1,2,100,3,4))
scala> val rdd2 = rdd1.map(x => x*2).collect
rdd2: Array[Int] = Array(2, 4, 200, 6, 8)
从小到大
scala> val rdd3 = rdd1.map(_*2).sortBy(x =>x,true).collect
rdd3: Array[Int] = Array(2, 4, 6, 8, 200)
从大到小
scala> val rdd3 = rdd1.map(_*2).sortBy(x =>x,false).collect
rdd3: Array[Int] = Array(200, 8, 6, 4, 2)

过滤出大于等于50的元素
scala> val rdd1 = sc.parallelize(List(1,2,100,3,4))
scala> val rdd2 = rdd1.filter(_>50).collect
rdd2: Array[Int] = Array(100)
scala> val rdd2 = rdd1.filter(x => x>50).collect
rdd2: Array[Int] = Array(100)
过滤出偶数
scala> val rdd2 = rdd1.filter(_%2==0).collect
rdd2: Array[Int] = Array(2, 100, 4)

map/flatMap

scala> val rdd1 = sc.parallelize(Array("w u","j i a","d o n g"))
按空格分隔
scala> rdd1.map(_.split(" ")).collect
res28: Array[Array[String]] = Array(Array(w, u), Array(j, i, a), Array(d, o, n, g))
分隔并压平
scala> val rdd2 = rdd1.flatMap(_.split(" "))
scala> rdd2.collect
res5: Array[String] = Array(w, u, j, i, a, d, o, n, g)

scala> val rdd1 = sc.parallelize(List(List("w u","j i a","d o n g"),List("j i a n g","r u i")))
scala> val rdd2 = rdd1.map(_.map(_.split(" "))).collect
rdd2: Array[List[Array[String]]] = Array(List(Array(w, u), Array(j, i, a), Array(d, o, n, g)), List(Array(j, i, a, n, g), Array(r, u, i)))

scala> val rdd2 = rdd1.map(_.flatMap(_.split(" "))).collect
rdd2: Array[List[String]] = Array(List(w, u, j, i, a, d, o, n, g), List(j, i, a, n, g, r, u, i))

scala> val rdd2 = rdd1.flatMap(_.flatMap(_.split(" "))).collect
rdd2: Array[String] = Array(w, u, j, i, a, d, o, n, g, j, i, a, n, g, r, u, i)

union/intersecttion/distinct

scala> val rdd1 = sc.parallelize(List(1,2,3,4))
scala> val rdd2 = sc.parallelize(List(5,6,4,3))
求并集
scala> val rdd3 = rdd1.union(rdd2)
rdd3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 4, 3) 
求交集
scala> val rdd4 = rdd1.intersection(rdd2).collect
rdd4: Array[Int] = Array(4, 3)
去重
scala> val rdd5 = rdd3.distinct
rdd5: Array[Int] = Array(1, 2, 3, 4, 5, 6)

sortBy

对rdd1里的每个元素乘以2,然后排序
scala> val rdd1 = sc.parallelize(List(1,2,100,3,4))
scala> val rdd2 = rdd1.map(_*2).sortBy(x => x,true)//为什么sortBy里面用下划线不行?
scala> rdd2.collect
res21: Array[Int] = Array(2, 4, 6, 8, 200)

注意区别一下两种情况,根据=>右边的情况sort
scala> rdd1.sortBy(x=>x,true).collect
res22: Array[Int] = Array(1, 2, 3, 4,100)

rdd1.sortBy(x=>100-x,true).collect
res23: Array[Int] = Array(100, 4, 3, 2, 1)

groupByKey/groupBy/reduceByKey/sortByKey

scala> val rdd1 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:15
scala> val rdd2 = rdd1.groupByKey()
rdd2: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[30] at groupByKey at <console>:17
scala> val rdd2 = rdd1.groupByKey() 大专栏  Spark RDD.collect
rdd2: Array[(String, Iterable[Int])] = Array((class1,CompactBuffer(50, 90)), (class2,CompactBuffer(80, 70)))
scala> rdd2.foreach(score => {println(score._1);score._2.foreach(singlescore => println(singlescore))})
class1
50
90
class2
80
70
scala> val rdd5 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",50)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[47] at parallelize at <console>:24
scala> val rdd2=rdd1.groupBy(x=>x._2).collect
rdd2: Array[(Int, Iterable[(String, Int)])] = Array((80,CompactBuffer((class2,80))), (50,CompactBuffer((class1,50), (class1,50))), (70,CompactBuffer((class2,70))))

scala> val rdd1 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:15
scala> val rdd2 = rdd1.reduceByKey(_+_).collect
rdd2: Array[(String, Int)] = Array((class1,140), (class2,150))

scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",2),("kitty",3)))
scala> val rdd2 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7)))
scala> val rdd3 = rdd1.union(rdd2)
按key进行聚合
scala> val rdd4 = rdd3.reduceByKey(_+_)
scala> rdd4.collect
res23: Array[(String, Int)] = Array((tom,9), (jerry,11), (shuke,7), (kitty,3))
按value的降序排序
scala> val rdd5 = rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))
scala> rdd5.collect
res24: Array[(String, Int)] = Array((jerry,11), (tom,9), (shuke,7), (kitty,3))



scala> val rdd1 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:15
scala> val rdd2 = rdd1.sortByKey().collect
rdd2: Array[(String, Int)] = Array((class1,50), (class1,90), (class2,80), (class2,70))
scala> rdd2.foreach(score => println(score._1+":"+score._2))
class1:50
class1:90
class2:80
class2:70

join/leftOuterJoin/rightOuterJoin/union

顺序影响结果
scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",2),("kitty",3)))
scala> val rdd2 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7)))
scala> val rdd3 = rdd1.join(rdd2).collect
rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (jerry,(2,9)))
scala> rdd2.join(rdd1).collect
res5: Array[(String, (Int, Int))] = Array((tom,(8,1)), (jerry,(9,2)))

scala> val rdd2 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7),("tom",2)))
scala> val rdd3 = rdd1.join(rdd2).collect
rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (tom,(1,2)), (jerry,(2,9)))

scala> val rdd3 = rdd1.leftOuterJoin(rdd2).collect
rdd3: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (tom,(1,Some(2))), (jerry,(2,Some(9))), (kitty,(3,None)))
scala> val rdd3 = rdd1.rightOuterJoin(rdd2).collect
rdd3: Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),8)), (tom,(Some(1),2)), (jerry,(Some(2),9)), (shuke,(None,7)))

scala> val rdd3 = rdd1.union(rdd2).collect
rdd3: Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))

scala> val rdd3 = rdd1.union(rdd2)
scala> val rdd4 = rdd3.groupByKey
scala> rdd4.collect
res11: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 8, 2)), (jerry,CompactBuffer(2, 9)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)
求每个单词出现的次数
scala> val rdd5 = rdd3.groupByKey.map(x=>(x._1,x._2.sum))
scala> rdd5.collect
res12: Array[(String, Int)] = Array((tom,11), (jerry,11), (shuke,7), (kitty,3))
scala> rdd3.groupByKey.mapValues(_.sum).collect
res14: Array[(String, Int)] = Array((tom,11), (jerry,11), (shuke,7), (kitty,3))

相关推荐