zhixingheyitian 2020-07-19
Broadcast Variables
A broadcast variable ships a read-only value from the driver to every executor once, so that tasks can read it locally instead of receiving a fresh copy with every task:
package bigdata

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    val i: Int = 3
    // broadcast the value once to every executor
    val broadcast: Broadcast[Int] = sparkContext.broadcast(i)
    rdd.map(_ + broadcast.value).foreach(println)
    sparkContext.stop()
  }
}
Unpersisting and destroying broadcast variables:
A broadcast variable keeps occupying memory on the executors; once it is no longer needed, it can be removed with the unpersist method:
broadcast.unpersist()
After that, if a task uses the broadcast variable again, the data is simply fetched and broadcast again.
You can also call the destroy method to remove a broadcast variable for good; after it has been destroyed, any task that tries to use it again will throw an exception:
broadcast.destroy()
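A minimal sketch of the difference between the two calls, reusing the broadcast and rdd values from the first example (assumed to still be in scope):
// after unpersist, the broadcast can still be used: executors simply re-fetch the value
broadcast.unpersist()
rdd.map(_ + broadcast.value).foreach(println)

// after destroy, the broadcast is gone for good; any further use fails
broadcast.destroy()
// rdd.map(_ + broadcast.value).foreach(println)   // would now throw org.apache.spark.SparkException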
Within a certain data-size range, broadcast variables let a job avoid a shuffle and keep the computation as local as possible; Spark's map-side join is implemented with broadcast variables.
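As a rough sketch of that idea (the data and names below are invented for illustration), a map-side join broadcasts the small table so that each task can do a local lookup against it, and no shuffle is triggered:
package bigdata

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastJoin {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("broadcastJoin").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)

    // small lookup table, broadcast once to every executor
    val smallTable: Map[Int, String] = Map(1 -> "a", 2 -> "b", 3 -> "c")
    val smallTableBc = sparkContext.broadcast(smallTable)

    // large data set stays distributed across partitions
    val bigRdd: RDD[(Int, Double)] = sparkContext.parallelize(Seq((1, 9.9), (2, 3.5), (4, 4.2)), 2)

    // each task looks keys up in its local copy of the broadcast map -- no shuffle
    val joined: RDD[(Int, (Double, String))] = bigRdd.flatMap { case (k, v) =>
      smallTableBc.value.get(k).map(name => (k, (v, name)))
    }
    joined.foreach(println)

    sparkContext.stop()
  }
}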
Accumulators
Unlike broadcast variables, which are read-only, an accumulator is a shared variable that only supports adding to it:
package bigdata

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    // built-in long accumulator, registered under the name "acc"
    val aAccumulator: LongAccumulator = sparkContext.longAccumulator("acc")
    rdd.map(x => {
      aAccumulator.add(x)   // each task adds its elements; the driver sees the merged total
      x + 1
    }).foreach(print)
    print(aAccumulator)
    sparkContext.stop()
  }
}
Output:
567234
LongAccumulator(id: 0, name: Some(acc), value: 21)
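One hedged caveat, not from the original text: the example above updates the accumulator inside a transformation (map). Spark only guarantees that accumulator updates made inside actions are applied exactly once; if a task belonging to a transformation is re-executed (after a failure or a speculative retry), its updates may be counted more than once. Updating from an action avoids this, as in this sketch (reusing the sparkContext from the example above):
// same sum, but the update happens inside an action (foreach),
// where Spark applies each successful task's updates exactly once
val sumAcc = sparkContext.longAccumulator("sumAcc")
sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2).foreach(x => sumAcc.add(x))
println(sumAcc.value)   // 21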
Custom accumulators:
package bigdata

import org.apache.spark.util.AccumulatorV2

case class SumAandB(A: Long, B: Long)

// accumulator that sums the A and B fields of SumAandB records independently
class MyAccumulator extends AccumulatorV2[SumAandB, SumAandB] {
  private var a = 0L
  private var b = 0L

  override def isZero: Boolean = a == 0 && b == 0

  override def copy(): AccumulatorV2[SumAandB, SumAandB] = {
    val accumulator = new MyAccumulator
    accumulator.a = this.a
    accumulator.b = this.b
    accumulator
  }

  override def reset(): Unit = {
    a = 0
    b = 0
  }

  override def add(v: SumAandB): Unit = {
    a += v.A
    b += v.B
  }

  // merge partial sums coming from different tasks
  override def merge(other: AccumulatorV2[SumAandB, SumAandB]): Unit = other match {
    case e: MyAccumulator =>
      a += e.a
      b += e.b
    case _ =>
  }

  override def value: SumAandB = SumAandB(a, b)
}
Using the custom accumulator:
package bigdata

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    // custom accumulators must be registered with the SparkContext before use
    val myAccumulator = new MyAccumulator
    sparkContext.register(myAccumulator, "myAccumulator")
    val rdd: RDD[SumAandB] = sparkContext.parallelize(List(SumAandB(1, 2), SumAandB(0, 2), SumAandB(3, 2)), 2)
    rdd.map(s => {
      myAccumulator.add(s)
      s
    }).foreach(println)
    print(myAccumulator)
    sparkContext.stop()
  }
}
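Running this should print the three SumAandB records (their order depends on which partition prints first), followed by the accumulator itself; since 1 + 0 + 3 = 4 and 2 + 2 + 2 = 6, its value should end up as SumAandB(4,6).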