登峰小蚁 2020-01-12
combineByKey的强大之处,在于提供了三个函数操作来操作一个函数。第一个函数,是对元数据处理,从而获得一个键值对。第二个函数,是对键值键值对进行一对一的操作,即一个键值对对应一个输出,且这里是根据key进行整合。第三个函数是对key相同的键值对进行操作,有点像reduceByKey,但真正实现又有着很大的不同。
在Spark入门(五)--Spark的reduce和reduceByKey中,我们用reduce进行求平均值。用combineByKey我们则可以求比平均值更为丰富的事情。现在有一个数据集,每一行数据包括一个a-z字母和一个整数,其中字母和整数之间以空格分隔。现在要求得每个字母的平均数。这个场景有点像多个学生,每个学生多门成绩,求得学生的平均分。但这里将问题简化,其中数据集放在grades中。数据集以及下面的代码都可以在github上下载。
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.{SparkConf, SparkContext} object SparkCombineByKey { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey") val sc = new SparkContext(conf) sc.textFile("./grades").map(line=>{ val splits = line.split(" ") (splits(0),splits(1).toInt) }).combineByKey( value => (value,1), (x:(Int,Int),y)=>(x._1+y,x._2+1), (x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2) ).map(x=>(x._1,x._2._1/x._2._2)).foreach(println) } }
(d,338451) (e,335306) (a,336184) (i,346279) (b,333069) (h,334343) (f,341380) (j,320145) (g,334042) (c,325022)
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.sources.In; import scala.Tuple2; public class SparkCombineByKeyJava { public static void main(String[] args){ SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKeyJava"); JavaSparkContext sc = new JavaSparkContext(conf); combineByKeyJava(sc); combineByKeyJava8(sc); } public static void combineByKeyJava(JavaSparkContext sc){ JavaPairRDD<String,Integer> splitData = sc.textFile("./grades").mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { String[] splits = s.split(" "); return new Tuple2<>(splits[0],Integer.parseInt(splits[1])); } }); splitData.combineByKey(new Function<Integer, Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2<>(integer, 1); } }, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> integerIntegerTuple2, Integer integer) throws Exception { return new Tuple2<>(integerIntegerTuple2._1 + integer, integerIntegerTuple2._2 + 1); } }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> integerIntegerTuple2, Tuple2<Integer, Integer> integerIntegerTuple22) throws Exception { return new Tuple2<>(integerIntegerTuple2._1+integerIntegerTuple22._1,integerIntegerTuple2._2+integerIntegerTuple22._2); } }).map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Tuple2<String,Double>>() { @Override public Tuple2<String,Double> call(Tuple2<String, Tuple2<Integer, Integer>> stringTuple2Tuple2) throws Exception { return new Tuple2<>(stringTuple2Tuple2._1,stringTuple2Tuple2._2._1*1.0/stringTuple2Tuple2._2._2); } }).foreach(new VoidFunction<Tuple2<String, Double>>() { @Override public void call(Tuple2<String, Double> stringDoubleTuple2) throws Exception { System.out.println(stringDoubleTuple2._1+" "+stringDoubleTuple2._2); } }); } public static void combineByKeyJava8(JavaSparkContext sc){ JavaPairRDD<String,Integer> splitData = sc.textFile("./grades").mapToPair(line -> { String[] splits = line.split(" "); return new Tuple2<>(splits[0],Integer.parseInt(splits[1])); }); splitData.combineByKey( x->new Tuple2<>(x,1), (x,y)->new Tuple2<>(x._1+y,x._2+1), (x,y)->new Tuple2<>(x._1+y._1,x._2+y._2) ).map(x->new Tuple2(x._1,x._2._1*1.0/x._2._2)).foreach(x->System.out.println(x._1+" "+x._2)); } }
d 338451.6 e 335306.7480769231 a 336184.95321637427 i 346279.497029703 b 333069.8589473684 h 334343.75 f 341380.94444444444 j 320145.7618069815 g 334042.37605042016 c 325022.4183673469
在开始python之前,我们先观察java和scala两个程序。我们发现java7的代码非常冗余,而java8和scala则相比起来非常干净利落。当然,我们难说好坏,但是这也表现出当代语言开始从繁就简的一个转变。到了python这一特点就体现的更加淋漓尽致。
但我们不光说语言,我们分析这个求平均的实现方式,由于java中对数值做了一个处理,因此有保留小数,而scala则没有,但至少可以判断两者的结果是一致的。当然,这不是重点,重点是,这个combinByKey非常复杂,有三个函数。我们很难观察到每个过程做了什么。因此我们在这里,对scala程序进行进一步的输出,从而观察combineByKey到底做了什么。
import org.apache.spark.{SparkConf, SparkContext} object SparkCombineByKey { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey") val sc = new SparkContext(conf) sc.textFile("./grades").map(line=>{ val splits = line.split(" ") (splits(0),splits(1).toInt) }).combineByKey( value => { println("这是第一个函数") println("将所有的值遍历,并放在元组中,标记1") println(value) (value,1) }, (x:(Int,Int),y)=>{ println("这是第二个函数") println("将x中的第一个值进行累加求和,第二个值加一,求得元素总个数") println("x:"+x.toString()) println("y:"+y) (x._1+y,x._2+1) }, (x:(Int,Int),y:(Int,Int))=>{ (x._1+y._1,x._2+y._2) } ).map(x=>(x._1,x._2._1/x._2._2)).foreach(println) } }
这是第一个函数 将所有的值遍历,并放在元组中,标记1 222783 这是第一个函数 将所有的值遍历,并放在元组中,标记1 48364 这是第一个函数 将所有的值遍历,并放在元组中,标记1 204950 这是第一个函数 将所有的值遍历,并放在元组中,标记1 261777 ... ... ... 这是第二个函数 将x中的第一个值进行累加求和,第二个值加一,求得元素总个数 x:(554875,2) y:357748 这是第二个函数 将x中的第一个值进行累加求和,第二个值加一,求得元素总个数 x:(912623,3) y:202407 这是第一个函数 将所有的值遍历,并放在元组中,标记1 48608 这是第二个函数 将x中的第一个值进行累加求和,第二个值加一,求得元素总个数 x:(1115030,4) y:69003 这是第一个函数 将所有的值遍历,并放在元组中,标记1 476893 ... ... ... (d,338451) (e,335306) (a,336184) (i,346279) (b,333069) (h,334343) (f,341380) (j,320145) (g,334042) (c,325022)
这里我们发现了,函数的顺序并不先全部执行完第一个函数,再执行第二个函数。而是分区并行,即第一个分区执行完第一个函数,并不等待其他分区执行完第一个函数,而是紧接着执行第二个函数,最后在第三个函数进行处理。在本地单机下,该并行特点并不能充分发挥,但在集群环境中,各个分区在不同节点计算,然后处理完结果汇总处理。这样,当数据量十分庞大时,集群节点数越多,该优势就表现地越明显。
此外还有一个非常值得关注的特点,当我们把foreach(println)这句话去掉时
foreach(println)
我们运行程序,发现程序没有任何输出。这是由于spark的懒加载特点,spark只用在对数据执行具体操作时,如输出、保存等才会执行计算。这看起来有点不合理,但实际上这样做在很多场景下能大幅度提升效率,但如果没有处理好,可能会导致spark每次执行操作都会从头开始计算该过程。因此当一个操作结果需要被频繁或者多次调用的时候,我们应该将结果存下来。
from pyspark import SparkConf,SparkContext conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey") sc = SparkContext(conf=conf) sc.textFile("./grades") .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1]))) .combineByKey( lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1]) ).map(lambda x:(x[0],x[1][0]/x[1][1])).foreach(print)
(‘b‘, 333069.8589473684) (‘f‘, 341380.94444444444) (‘j‘, 320145.7618069815) (‘h‘, 334343.75) (‘a‘, 336184.95321637427) (‘g‘, 334042.37605042016) (‘d‘, 338451.6) (‘e‘, 335306.7480769231) (‘c‘, 325022.4183673469)
sortByKey非常简单,也非常常用。这里依然采用上述文本,将处理后的结果,进行排序,得到平均值最大的字母。在实际运用中我们这里可以看成求得按照成绩排序,或者按照姓名排序。
import org.apache.spark.{SparkConf, SparkContext} object SparkSortByKey { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey") val sc = new SparkContext(conf) val result = sc.textFile("./grades").map(line=>{ val splits = line.split(" ") (splits(0),splits(1).toInt) }).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2) ).map(x=>(x._1,x._2._1/x._2._2)) //按照名字排序,顺序 result.sortByKey(true).foreach(println) //按照名字排序,倒序 result.sortByKey(false).foreach(println) val result1 = sc.textFile("./grades").map(line=>{ val splits = line.split(" ") (splits(0),splits(1).toInt) }).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2) ).map(x=>(x._2._1/x._2._2,x._1)) //按照成绩排序,顺序 result1.sortByKey(true).foreach(println) //按照成绩排序,倒序 result1.sortByKey(false).foreach(println) } }
from pyspark import SparkConf,SparkContext conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey") sc = SparkContext(conf=conf) result = sc.textFile("./grades") .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1]))) .combineByKey( lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1]) ).map(lambda x:(x[0],x[1][0]/x[1][1])) result.sortByKey(True).foreach(print) result.sortByKey(False).foreach(print) result1 = sc.textFile("./grades") .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1]))) .combineByKey( lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1]) ).map(lambda x:(x[1][0]/x[1][1],x[0])) result1.sortByKey(True).foreach(print) result1.sortByKey(False).foreach(print)
(a,336184) (b,333069) (c,325022) (d,338451) (e,335306) (f,341380) (g,334042) (h,334343) (i,346279) (j,320145) (j,320145) (i,346279) (h,334343) (g,334042) (f,341380) (e,335306) (d,338451) (c,325022) (b,333069) (a,336184) (320145,j) (325022,c) (333069,b) (334042,g) (334343,h) (335306,e) (336184,a) (338451,d) (341380,f) (346279,i) (346279,i) (341380,f) (338451,d) (336184,a) (335306,e) (334343,h) (334042,g) (333069,b) (325022,c) (320145,j)
数据集以及代码都可以在github上下载。