/*** 排序的wordcount程序* @author Administrator**/public class SortWordCount {public static void main(String[] args) {// 创建SparkConf和JavaSparkContextSparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);// 创建lines RDDJavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt");// 执行我们之前做过的单词计数JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {private static final long serialVersionUID = 1L;@Overridepublic Iterable<String> call(String t) throws Exception {return Arrays.asList(t.split(" "));}});JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String t) throws Exception {return new Tuple2<String, Integer>(t, 1);}});JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});// 到这里为止,就得到了每个单词出现的次数// 但是,问题是,我们的新需求,是要按照每个单词出现次数的顺序,降序排序// wordCounts RDD内的元素是什么?应该是这种格式的吧:(hello, 3) (you, 2)// 我们需要将RDD转换成(3, hello) (2, you)的这种格式,才能根据单词出现次数进行排序把!// 进行key-value的反转映射JavaPairRDD<Integer, String> countWords = wordCounts.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> t)throws Exception {return new Tuple2<Integer, String>(t._2, t._1);}});// 按照key进行排序JavaPairRDD<Integer, String> sortedCountWords = countWords.sortByKey(false);// 再次将value-key进行反转映射JavaPairRDD<String, Integer> sortedWordCounts = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> t)throws 
Exception {return new Tuple2<String, Integer>(t._2, t._1);}});// 到此为止,我们获得了按照单词出现次数排序后的单词计数// 打印出来sortedWordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t._1 + " appears " + t._2 + " times.");}});// 关闭JavaSparkContextsc.close();}
Scala version
/**
 * Word count sorted by descending occurrence count (Scala version).
 *
 * Reads a text file, counts word occurrences, then sorts the (word, count)
 * pairs by count descending: sortByKey only sorts by key, so the pairs are
 * swapped to (count, word), sorted, and swapped back before printing.
 */
object SortWordCount {

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("SortWordCount")
      .setMaster("local")
    val sc = new SparkContext(conf)
    try {
      // Allow the input file as the first CLI argument; default to the
      // original hard-coded path for compatibility.
      val inputPath =
        if (args.length > 0) args(0)
        else "C://Users//Administrator//Desktop//spark.txt"

      val lines = sc.textFile(inputPath, 1)
      val words = lines.flatMap { line => line.split(" ") }
      val pairs = words.map { word => (word, 1) }
      val wordCounts = pairs.reduceByKey(_ + _)
      // sortByKey sorts by key only, so invert to (count, word) first
      val countWords = wordCounts.map { case (word, count) => (count, word) }
      val sortedCountWords = countWords.sortByKey(false)
      // Swap back to (word, count) for display
      val sortedWordCounts =
        sortedCountWords.map { case (count, word) => (word, count) }
      // "appears" (was "appear"): fixes the grammar and matches the
      // Java version's output format.
      sortedWordCounts.foreach(wc =>
        println(wc._1 + " appears " + wc._2 + " times."))
    } finally {
      // The original never stopped the context; always release it,
      // even if a job fails.
      sc.stop()
    }
  }
}
