Given the dataset below, we need to perform the following operations:
```scala
import spark.implicits._ // for toDF; assumes a SparkSession named spark, e.g. in spark-shell

val df = spark.sparkContext.parallelize(Seq(("1", 15), ("1", 120), ("1", 16), ("1", 14), ("2", 17), ("2", 12), ("3", 13), ("3", 14))).toDF("lot", "cnt")
df.show(false)
```

```text
+---+---+
|lot|cnt|
+---+---+
|1  |15 |
|1  |120|
|1  |16 |
|1  |14 |
|2  |17 |
|2  |12 |
|3  |13 |
|3  |14 |
+---+---+
```
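1. Build a JSON string from each row.
2. Group by lot.
3. Collect the JSON strings of each group into an array.
4. Sort each array by the cnt field, numerically (the code below sorts in descending order).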
Approach 1: to_json + groupBy + collect_list + sort_array (conclusion first: it does not work ❌)
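After collect_list, the array elements are JSON strings. sort_array therefore orders them lexicographically, character by character, rather than by the numeric value of cnt, so the result is wrong. For example, {"lot":"1","cnt":120} sorts before {"lot":"1","cnt":14} because the first differing characters are '2' and '4'.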
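The effect can be reproduced without Spark. The sketch below sorts the four JSON strings of lot 1 in descending string order, which gives 16, 15, 14, 120: the same wrong order that sort_array produces. It assumes that Spark's string comparison agrees with Java's String ordering. That holds for these ASCII-only strings. The object name is illustrative.

```scala
// Plain Scala (no Spark): sorting JSON strings compares them character by character.
object LexicographicJsonSort {
  // The four JSON strings that to_json produces for lot "1".
  val jsons: List[String] = List(
    """{"lot":"1","cnt":15}""",
    """{"lot":"1","cnt":120}""",
    """{"lot":"1","cnt":16}""",
    """{"lot":"1","cnt":14}"""
  )

  // Descending string order, analogous to sort_array(..., false) on an array<string>.
  val descending: List[String] = jsons.sorted(Ordering[String].reverse)

  def main(args: Array[String]): Unit =
    descending.foreach(println)
}
```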
Approach 2: groupByKey + mapGroups + JSON.parseObject + sortBy (conclusion first: it works ✔️)
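With groupByKey + mapGroups, the rows of each group (one group per lot, not per partition) are passed to our own function as an iterator. Inside that function, we use fastjson's JSON.parseObject to parse the sort field cnt out of each JSON string. We then sort the group's JSON elements by that integer. mapGroups does no partial aggregation, and this code materializes each group with toList. As a result, every group must fit in executor memory.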
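The sketch below shows, in plain Scala with no Spark and no fastjson, the parse-then-sort step that the mapGroups lambda performs. The helper cntOf is a hypothetical, regex-based stand-in for JSON.parseObject(j).getIntValue("cnt"). It only handles flat JSON objects such as those produced by to_json(struct($"lot", $"cnt")). The object and method names are illustrative.

```scala
// Plain-Scala sketch of the per-group logic inside mapGroups (no Spark, no fastjson).
object ParseThenSort {
  // Hypothetical stand-in for JSON.parseObject(j).getIntValue("cnt"):
  // grabs the integer after "cnt": and assumes flat JSON such as {"lot":"1","cnt":120}.
  private val CntPattern = """"cnt":(-?\d+)""".r

  def cntOf(json: String): Int =
    CntPattern.findFirstMatchIn(json) match {
      case Some(m) => m.group(1).toInt
      case None    => throw new IllegalArgumentException(s"no cnt field in $json")
    }

  // Group (lot, json) pairs by lot, then sort each group by the parsed integer, descending,
  // mirroring .toList.sortBy(...).reverse in the mapGroups lambda.
  def sortGroups(rows: Seq[(String, String)]): Map[String, List[String]] =
    rows.groupBy(_._1).map { case (lot, pairs) =>
      lot -> pairs.map(_._2).toList.sortBy(j => cntOf(j)).reverse
    }

  def main(args: Array[String]): Unit = {
    val data = Seq(("1", 15), ("1", 120), ("1", 16), ("1", 14), ("2", 17), ("2", 12), ("3", 13), ("3", 14))
    // Build JSON strings with the same shape as to_json(struct($"lot", $"cnt")).
    val rows = data.map { case (lot, cnt) => lot -> s"""{"lot":"$lot","cnt":$cnt}""" }
    for ((lot, jsons) <- sortGroups(rows).toSeq.sortBy(_._1))
      println(s"$lot -> ${jsons.mkString("[", ", ", "]")}")
  }
}
```

The key point is that the sort key is the parsed Int rather than the JSON string, so 120 correctly lands ahead of 16.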
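```scala
import com.alibaba.fastjson.JSON // fastjson, used to read cnt back out of each JSON string
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for $"col", toDF and Dataset encoders; assumes a SparkSession named spark

val df = spark.sparkContext.parallelize(Seq(("1", 15), ("1", 120), ("1", 16), ("1", 14), ("2", 17), ("2", 12), ("3", 13), ("3", 14))).toDF("lot", "cnt")

println("========= row_number ========")
// For reference: rank rows within each lot by cnt, descending
val frame = df.withColumn("rk", row_number().over(Window.partitionBy($"lot").orderBy($"cnt".desc_nulls_last)))
frame.show(false)

println("========= sort_array + collect_list========")
// Approach 1: the array elements are JSON strings, so sort_array compares them as strings
val frame1 = frame
  .select($"lot", to_json(struct($"lot", $"cnt")).as("v"))
  .groupBy("lot")
  .agg(sort_array(collect_list("v"), false).as("v1"))
frame1.printSchema()
frame1.show(false)

println("========= groupByKey + flatMapGroups========") // label as printed; the code below uses mapGroups
// Approach 2: sort each group's JSON strings by the parsed integer cnt, descending
frame
  .select($"lot", to_json(struct($"lot", $"cnt")).as("v"))
  .groupByKey((row: Row) => row.getAs[String]("lot"))
  .mapGroups((k, vs) =>
    (k, vs.map(_.getAs[String]("v")).toList.sortBy(j => JSON.parseObject(j).getIntValue("cnt")).reverse))
  .toDF("lot", "v")
  .show(false)
```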
Output:
```text
========= row_number ========
+---+---+---+
|lot|cnt|rk |
+---+---+---+
|3  |14 |1  |
|3  |13 |2  |
|1  |120|1  |
|1  |16 |2  |
|1  |15 |3  |
|1  |14 |4  |
|2  |17 |1  |
|2  |12 |2  |
+---+---+---+

========= sort_array + collect_list========
root
 |-- lot: string (nullable = true)
 |-- v1: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---+-----------------------------------------------------------------------------------------+
|lot|v1                                                                                       |
+---+-----------------------------------------------------------------------------------------+
|3  |[{"lot":"3","cnt":14}, {"lot":"3","cnt":13}]                                             |
|1  |[{"lot":"1","cnt":16}, {"lot":"1","cnt":15}, {"lot":"1","cnt":14}, {"lot":"1","cnt":120}]|
|2  |[{"lot":"2","cnt":17}, {"lot":"2","cnt":12}]                                             |
+---+-----------------------------------------------------------------------------------------+

========= groupByKey + flatMapGroups========
+---+-----------------------------------------------------------------------------------------+
|lot|v                                                                                        |
+---+-----------------------------------------------------------------------------------------+
|3  |[{"lot":"3","cnt":14}, {"lot":"3","cnt":13}]                                             |
|1  |[{"lot":"1","cnt":120}, {"lot":"1","cnt":16}, {"lot":"1","cnt":15}, {"lot":"1","cnt":14}]|
|2  |[{"lot":"2","cnt":17}, {"lot":"2","cnt":12}]                                             |
+---+-----------------------------------------------------------------------------------------+
```
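If you would rather stick to built-in functions, approach 1 can likely be repaired by sorting before serializing. This variant is not part of the original post. It is a sketch that assumes Spark 3.0+, where functions.transform accepts a Scala lambda. sort_array compares struct fields from left to right, so putting cnt first makes it the sort key. transform then turns each struct back into a JSON string with lot before cnt, which should match the shape of approach 2's output. The object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Sketch only, assuming Spark 3.0+ (functions.transform with a Scala lambda).
object StructSortThenJson {
  // sort_array compares struct fields left to right, so putting cnt first makes it the sort key.
  // transform then turns every struct back into a JSON string with lot before cnt.
  def sortedJsonByLot(df: DataFrame): DataFrame =
    df.groupBy("lot")
      .agg(sort_array(collect_list(struct(col("cnt"), col("lot"))), false).as("arr"))
      .select(
        col("lot"),
        transform(col("arr"), x => to_json(struct(x.getField("lot").as("lot"), x.getField("cnt").as("cnt")))).as("v")
      )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("struct-sort-then-json").getOrCreate()
    import spark.implicits._
    val df = Seq(("1", 15), ("1", 120), ("1", 16), ("1", 14), ("2", 17), ("2", 12), ("3", 13), ("3", 14)).toDF("lot", "cnt")
    sortedJsonByLot(df).show(false)
    spark.stop()
  }
}
```

Compared with approach 2, this keeps the work inside Catalyst expressions and needs no fastjson. However, collect_list still gathers each group into a single array, so very large groups remain a memory concern.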
