概念
函数声明:
def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,partitioner, mapSideCombine, serializer)(null)}
作用: 针对每个K, 将V进行合并成C, 得到
createCombiner: 将读进来的数据进行初始化,当前的值作为参数,可以对这个值做一一些转换操作.转换为我们想要的数据格式. 比如说我RDD类型的keyvalue类型.我需要对这些值做个转换.
combineByKey会遍历分区中的每个key-value对. 如果第一次碰到这个key, 则调用createCombiner函数,传入value, 得到一个C类型的值.(如果不是第一次碰到这个 key, 则不会调用这个方法)
mergeValue: 如果不是第一个遇到这个key, 则调用这个函数进行合并操作. 分区内合并
mergeCombiners 跨分区合并相同的key的值(C). 跨分区合并
案例
求出每一个学生的平均成绩
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object Transformation_combineByKey {def main(args: Array[String]): Unit = {//创建SparkConf并设置App名称val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")//创建SparkContext,该对象是提交Spark App的入口val sc: SparkContext = new SparkContext(conf)//需求:求出每一个学生的平均成绩//创建RDDval scoreRDD: RDD[(String, Int)] = sc.makeRDD(List(("zhangsan", 90), ("lisi", 60), ("zhangsan", 96), ("lisi", 62), ("zhangsan", 100), ("lisi", 50)))//createCombiner: V => C, 对RDD中当前key取出第一个value做一个初始化//mergeValue: (C, V) => C, 分区内计算规则,主要在分区内进行,将当前分区的value值,合并到初始化得到的c上面//mergeCombiners: (C, C) => C 分区间计算规则// 0---("zhangsan",90),("zhangsan",95)// 1---("zhangsan",100)val combineRDD: RDD[(String, (Int, Int))] = scoreRDD.combineByKey((_, 1), //对取出的第一个value做初始化(t1: (Int, Int), v) => {(t1._1 + v, t1._2 + 1)},(t2: (Int, Int), t3: (Int, Int)) => {(t2._1 + t3._1, t2._2 + t3._2)})//求平均成绩val resRDD: RDD[(String, Int)] = combineRDD.map {case (name, (score, count)) => {(name, score / count)}}resRDD.collect().foreach(println)/* 输出:(zhangsan,95)(lisi,57)*/// 关闭连接sc.stop()}}
