Spark Broadcast Variables and Accumulators

zhixingheyitian 2020-07-19

Broadcast Variables

A broadcast variable is a read-only shared variable that is shipped to each executor once and cached there, instead of being re-serialized with every task.

package bigdata

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    val i: Int = 3
    // i is registered as a broadcast variable; executors read it via broadcast.value
    val broadcast = sparkContext.broadcast(i)
    rdd.map(_ + broadcast.value).foreach(println)   // prints 4..9, one per line (order depends on partition scheduling)
    sparkContext.stop()
  }
}

Broadcast variable persistence:

A broadcast variable keeps occupying executor memory. When it is no longer needed, it can be removed with the unpersist method:

broadcast.unpersist()

At this point, if a task uses the broadcast variable again, the data is simply pulled from the driver once more.

You can also destroy a broadcast variable completely with the destroy method; after that call, any task that tries to use the broadcast variable again will throw an exception:

broadcast.destroy()
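
Putting the lifecycle together, a minimal sketch (reusing sparkContext, rdd and broadcast from the first example; the comments describe the expected behaviour):

// Normal use: the value is cached on each executor after the first fetch.
rdd.map(_ + broadcast.value).collect()

// Drop the cached copies on the executors; the variable itself stays valid.
broadcast.unpersist()
rdd.map(_ + broadcast.value).collect()   // works again, the value is re-fetched from the driver

// destroy removes the value everywhere, including on the driver.
broadcast.destroy()
// rdd.map(_ + broadcast.value).collect() // would now fail with a SparkException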

Within a reasonable data-size range, broadcast variables let a job avoid a shuffle and keep the computation as local as possible; Spark's map-side join is implemented with broadcast variables.
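
As an illustration of that point (this snippet is not from the original post; sparkContext is reused from the example above, and the names smallTable and largeRdd are invented here), a hand-rolled map-side join broadcasts the small table so the large RDD can be joined without a shuffle:

// A small lookup table (id -> name) that fits in memory is broadcast to every executor.
val smallTable: Map[Int, String] = Map(1 -> "a", 2 -> "b", 3 -> "c")
val smallTableBc = sparkContext.broadcast(smallTable)

// The large side stays as an RDD; each record is joined locally against the broadcast map,
// so no shuffle stage is needed.
val largeRdd: RDD[(Int, Double)] = sparkContext.parallelize(List((1, 1.0), (2, 2.0), (4, 4.0)), 2)
val joined: RDD[(Int, (Double, String))] = largeRdd.flatMap { case (id, v) =>
  smallTableBc.value.get(id).map(name => (id, (v, name)))
}
joined.foreach(println)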

Accumulators
Unlike broadcast variables, which are read-only, an accumulator is a shared variable that only supports add operations.

package bigdata

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    // A built-in long accumulator, registered under the name "acc"
    val aAccumulator: LongAccumulator = sparkContext.longAccumulator("acc")
    rdd.map(x => {
      aAccumulator.add(x)   // updated on the executors as each element is processed
      x + 1
    }).foreach(print)
    print(aAccumulator)     // the merged value is read back on the driver
    sparkContext.stop()
  }
}

Output (the two partitions print concurrently, so the order of the incremented values can vary between runs):

567234

LongAccumulator(id: 0, name: Some(acc), value: 21)

Custom accumulator:

package bigdata

import org.apache.spark.util.AccumulatorV2

case class SumAandB(A: Long, B: Long)

// Accumulator that sums the A and B fields of SumAandB records independently.
class MyAccumulator extends AccumulatorV2[SumAandB, SumAandB] {
  private var a = 0L
  private var b = 0L

  // True when the accumulator still holds its initial, empty value.
  override def isZero: Boolean = a == 0 && b == 0

  // Creates a new accumulator holding the same partial sums.
  override def copy(): AccumulatorV2[SumAandB, SumAandB] = {
    val accumulator = new MyAccumulator
    accumulator.a = this.a
    accumulator.b = this.b
    accumulator
  }

  // Resets the accumulator back to its zero value.
  override def reset(): Unit = {
    a = 0
    b = 0
  }

  // Called on the executors for every record added to the accumulator.
  override def add(v: SumAandB): Unit = {
    a += v.A
    b += v.B
  }

  // Called on the driver to merge the partial results coming back from tasks.
  override def merge(other: AccumulatorV2[SumAandB, SumAandB]): Unit = other match {
    case e: MyAccumulator =>
      a += e.a
      b += e.b
    case _ =>
  }

  override def value: SumAandB = SumAandB(a, b)
}

Using the custom accumulator:

package bigdata

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    // A custom accumulator must be registered with the SparkContext before use.
    val myAccumulator = new MyAccumulator
    sparkContext.register(myAccumulator, "myAccumulator")

    val rdd: RDD[SumAandB] = sparkContext.parallelize(List(SumAandB(1, 2), SumAandB(0, 2), SumAandB(3, 2)), 2)
    rdd.map(s => {
      myAccumulator.add(s)   // updated on the executors for every record
      s
    }).foreach(println)
    print(myAccumulator)     // the merged value is read back on the driver
    sparkContext.stop()
  }
}
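
For the three input records above, the accumulator's final value should be SumAandB(4,6): A sums to 1 + 0 + 3 = 4 and B to 2 + 2 + 2 = 6, with the per-partition partial sums combined on the driver by merge.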