累加器概念
https://www.yuque.com/docs/share/5314de22-204e-4731-86a7-4454a5ac0274?# 《Spark之累加器的概念和使用》
自定义累加器
自定义一个以 Map 作为结果类型的累加器，
统计 List 中所有元素的累加和（sum）、总个数（count）和平均值（avg）。
package com.add

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

/** Demo driver: computes sum, count and average of a small integer list with [[MyMapAccumulator]]. */
object MyMapAccumulator {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("MyAcc").setMaster("local[4]")
    val sc: SparkContext = new SparkContext(conf)
    try {
      val list1 = List(30, 50, 70, 60, 10, 20, 10, 30, 40, 50)
      val rdd1: RDD[Int] = sc.parallelize(list1, 2)
      val acc = new MyMapAccumulator
      sc.register(acc)
      // Each Int element is widened to Double to match add(v: Double).
      rdd1.foreach(x => acc.add(x))
      println(acc.value) // prints: Map(sum -> 370.0, count -> 10, avg -> 37.0)
    } finally {
      // Always release the SparkContext, even if the job fails.
      sc.stop()
    }
  }
}

/**
 * Accumulator whose value is a map with three entries:
 * "sum" (Double), "count" (Long) and "avg" (Double, sum / count),
 * e.g. Map("sum" -> 1000.0, "count" -> 10, "avg" -> 100.0).
 *
 * Only the running sum and count are stored. The average is derived on every call to
 * [[value]], so reading the value never changes the accumulator's state.
 * (Spark itself reads `value`, e.g. from `toString`.)
 */
class MyMapAccumulator extends AccumulatorV2[Double, Map[String, Any]] {
  private var sum: Double = 0D
  private var count: Long = 0L

  /** True when no value has been added (directly or via merge). */
  override def isZero: Boolean = count == 0L

  /** Returns a new accumulator holding the same sum and count as this one. */
  override def copy(): AccumulatorV2[Double, Map[String, Any]] = {
    println("copy...")
    val acc = new MyMapAccumulator
    acc.sum = sum
    acc.count = count
    acc
  }

  /** Resets this accumulator back to its zero state. */
  override def reset(): Unit = {
    println("reset...")
    sum = 0D
    count = 0L
  }

  /**
   * Adds one value: it is added to the sum and the count grows by one.
   *
   * @param v the value to accumulate
   */
  override def add(v: Double): Unit = {
    sum += v
    count += 1L
  }

  /**
   * Merges another partition's accumulator into this one.
   *
   * @param other accumulator to merge; must be a [[MyMapAccumulator]]
   * @throws UnsupportedOperationException if `other` is a different accumulator type
   */
  override def merge(other: AccumulatorV2[Double, Map[String, Any]]): Unit = other match {
    case o: MyMapAccumulator =>
      sum += o.sum
      count += o.count
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  /**
   * Current sum, count and average.
   * For an empty accumulator, avg is NaN (0.0 / 0).
   */
  override def value: Map[String, Any] =
    Map[String, Any]("sum" -> sum, "count" -> count, "avg" -> sum / count)
}
运行输出：
Map(sum -> 370.0, count -> 10, avg -> 37.0)
