需求:筛选出 省份中 点击率前三的广告
数据源 时间戳 省份 城市 用户 广告
object Spark24_RDD_Req {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("R17")val sc = new SparkContext(conf)val rdd: RDD[String] = sc.textFile("datas/agent.log")// TODO 筛选出 省份中 点击率前三的广告//时间戳 省份 城市 用户 广告// 1. 对原始数据结构进行转换 ==> ((省份,广告),1)val mapRDD: RDD[((String, String), Int)] = rdd.map(lines => {val line: Array[String] = lines.split(" ")((line(1), line(4)), 1)})// 2. 对相同key 进行聚合 ==> ((省份,广告), sum )val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey((a, b) => a + b)// 3. 在对数据进行转换 ==> (省份,(广告,sum))val mapTwoRDD: RDD[(String, (String, Int))] = reduceRDD.map(item => (item._1._1, (item._1._2, item._2)))// 4. 分组 ==> (省份,[(广告,sum) 。。。。])val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapTwoRDD.groupByKey()// 5. 分组内排序,选出各自分组的前三val resRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues(//Iterable 无法排序 必须转成 List/Array 才能排序items => items.toList.sortBy(item => item._2)(Ordering.Int.reverse).take(3))resRDD.collect().foreach(println)sc.stop()}}

