// Word count via groupBy: every occurrence of a word is grouped under the word
// itself, then each group's size is the count. Demo only — shuffles raw words.
def wordCount1(sc: SparkContext) = {
  val lines = sc.makeRDD(List("hello scala", "hello spark"))
  val tokens: RDD[String] = lines.flatMap(line => line.split(" "))
  val grouped: RDD[(String, Iterable[String])] = tokens.groupBy(identity)
  val counts: RDD[(String, Int)] = grouped.mapValues(_.size)
}
// Word count via groupByKey: pair each word with 1, collect the 1s per key,
// then count how many were gathered for each word.
def wordCount2(sc: SparkContext) = {
  val lines = sc.makeRDD(List("hello scala", "hello spark"))
  val tokens: RDD[String] = lines.flatMap(_.split(" "))
  val pairs: RDD[(String, Int)] = tokens.map(token => (token, 1))
  val grouped: RDD[(String, Iterable[Int])] = pairs.groupByKey()
  val counts: RDD[(String, Int)] = grouped.mapValues(_.size)
}
// Word count via reduceByKey: sums the 1s per key, combining on the map side
// before the shuffle.
def wordCount3(sc: SparkContext) = {
  val lines = sc.makeRDD(List("hello scala", "hello spark"))
  val tokens: RDD[String] = lines.flatMap(_.split(" "))
  val pairs: RDD[(String, Int)] = tokens.map(token => (token, 1))
  val counts: RDD[(String, Int)] = pairs.reduceByKey((left, right) => left + right)
}
// Word count via aggregateByKey: zero value 0, with addition used both as the
// within-partition sequence op and the cross-partition combine op.
def wordCount4(sc: SparkContext) = {
  val lines = sc.makeRDD(List("hello scala", "hello spark"))
  val tokens: RDD[String] = lines.flatMap(_.split(" "))
  val pairs: RDD[(String, Int)] = tokens.map(token => (token, 1))
  val counts: RDD[(String, Int)] =
    pairs.aggregateByKey(0)((acc, v) => acc + v, (a, b) => a + b)
}
// Word count via foldByKey: like aggregateByKey but a single function serves
// as both sequence and combine op.
def wordCount5(sc: SparkContext) = {
  val lines = sc.makeRDD(List("hello scala", "hello spark"))
  val tokens: RDD[String] = lines.flatMap(_.split(" "))
  val pairs: RDD[(String, Int)] = tokens.map(token => (token, 1))
  val counts: RDD[(String, Int)] = pairs.foldByKey(0)((acc, v) => acc + v)
}
// Word count via combineByKey: the first value for a key becomes the combiner
// as-is; subsequent values and other partitions' combiners are added in.
def wordCount6(sc: SparkContext) = {
  val lines = sc.makeRDD(List("hello scala", "hello spark"))
  val tokens: RDD[String] = lines.flatMap(_.split(" "))
  val pairs: RDD[(String, Int)] = tokens.map(token => (token, 1))
  val counts: RDD[(String, Int)] = pairs.combineByKey(
    (first: Int) => first,               // createCombiner: seed with the first 1
    (acc: Int, v: Int) => acc + v,       // mergeValue: fold a value into the combiner
    (left: Int, right: Int) => left + right // mergeCombiners: merge across partitions
  )
}
// Word count via countByKey: an action — counts occurrences of each key and
// returns the result to the driver as a local Map.
def wordCount7(sc: SparkContext) = {
  val lines = sc.makeRDD(List("hello scala", "hello spark"))
  val tokens: RDD[String] = lines.flatMap(_.split(" "))
  val pairs: RDD[(String, Int)] = tokens.map(token => (token, 1))
  val counts: collection.Map[String, Long] = pairs.countByKey()
}
// Word count via countByValue: an action — counts distinct elements directly,
// no (word, 1) pairing step needed.
def wordCount8(sc: SparkContext) = {
  val lines = sc.makeRDD(List("hello scala", "hello spark"))
  val tokens: RDD[String] = lines.flatMap(_.split(" "))
  val counts: collection.Map[String, Long] = tokens.countByValue()
}
// Word count via reduce: wrap each word in a one-entry mutable map, then merge
// the maps pairwise by summing per-word counts. The left map is mutated and
// reused as the accumulator.
def wordCount9(sc: SparkContext) = {
  val lines = sc.makeRDD(List("hello scala", "hello spark"))
  val tokens: RDD[String] = lines.flatMap(_.split(" "))
  val singletonMaps: RDD[mutable.Map[String, Long]] =
    tokens.map(token => mutable.Map[String, Long](token -> 1L))
  val counts: mutable.Map[String, Long] = singletonMaps.reduce { (acc, other) =>
    // Fold every entry of the right map into the left one (insert or increment).
    other.foreach { case (word, cnt) =>
      acc(word) = acc.getOrElse(word, 0L) + cnt
    }
    acc
  }
}
// Word count via aggregate: an action with an empty mutable map as the zero
// value, a seqOp that folds single words into a per-partition map, and a
// combOp that merges the partition maps on the driver.
def wordCount10(sc: SparkContext) = {
  val lines = sc.makeRDD(List("hello scala", "hello spark"))
  val tokens: RDD[String] = lines.flatMap(_.split(" "))
  val counts: mutable.Map[String, Long] = tokens.aggregate(mutable.Map[String, Long]())(
    // seqOp: insert the word or increment its existing count, reusing the map.
    (acc, word) => {
      acc(word) = acc.getOrElse(word, 0L) + 1L
      acc
    },
    // combOp: drain the right map into the left one, summing counts per word.
    (left, right) => {
      right.foreach { case (word, cnt) =>
        left(word) = left.getOrElse(word, 0L) + cnt
      }
      left
    }
  )
}