Ten Ways to Implement wordCount with Spark Operators

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable

// groupBy
  def wordCount1(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello scala", "hello spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    // Group every occurrence under the word itself as the key.
    val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
    // Each word's count is just the size of its group.
    val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
  }
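groupBy is the most literal reading of "group, then count", but also the most expensive: every occurrence of every word is shuffled across the cluster and materialized in the Iterable before the sizes are taken.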

  

// groupByKey
  def wordCount2(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello scala", "hello spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordToOne: RDD[(String, Int)] = words.map((_, 1))
    // Gather all the 1s for each word, then count them.
    val groupByKey: RDD[(String, Iterable[Int])] = wordToOne.groupByKey()
    val wordCount: RDD[(String, Int)] = groupByKey.mapValues(iter => iter.size)
  }
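groupByKey improves on groupBy only slightly: the shuffled values are 1s instead of the words themselves, but there is still no map-side combining, so one record per word occurrence crosses the network.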

  

// reduceByKey
  def wordCount3(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello scala", "hello spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordToOne: RDD[(String, Int)] = words.map((_, 1))
    // Sum the 1s per key, combining within each partition before the shuffle.
    val wordCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
  }
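reduceByKey is the idiomatic choice: it combines the 1s within each partition before shuffling, so only one partial count per word per partition is sent over the network.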

  

// aggregateByKey
  def wordCount4(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello scala", "hello spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordToOne: RDD[(String, Int)] = words.map((_, 1))
    // Zero value 0; the first _ + _ aggregates within a partition, the second merges partitions.
    val wordCount: RDD[(String, Int)] = wordToOne.aggregateByKey(0)(_ + _, _ + _)
  }
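aggregateByKey takes a zero value plus two functions: one aggregates values within a partition, the other merges partial results across partitions. With `_ + _` on both sides it degenerates to foldByKey; it pays off when the two sides differ, e.g. a max within partitions combined with a sum across them.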

  

// foldByKey
  def wordCount5(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello scala", "hello spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordToOne: RDD[(String, Int)] = words.map((_, 1))
    // Like reduceByKey, but starting from an explicit zero value.
    val wordCount: RDD[(String, Int)] = wordToOne.foldByKey(0)(_ + _)
  }
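foldByKey is the special case of aggregateByKey where the intra-partition and inter-partition functions are the same: fold the zero value into each partition's sums, then merge.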

  

// combineByKey
  def wordCount6(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello scala", "hello spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordToOne: RDD[(String, Int)] = words.map((_, 1))
    val wordCount: RDD[(String, Int)] = wordToOne.combineByKey(
      v => v,                    // createCombiner: the first 1 seen for a key
      (x: Int, y: Int) => x + y, // mergeValue: fold further 1s in, within a partition
      (x: Int, y: Int) => x + y  // mergeCombiners: merge partial counts across partitions
    )
  }
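combineByKey is the most general of the *ByKey aggregations; reduceByKey, foldByKey, and aggregateByKey are all built on the same underlying combine mechanism. Its three parameters are createCombiner, mergeValue, and mergeCombiners, as annotated above.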

  

// countByKey
  def wordCount7(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello scala", "hello spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordToOne: RDD[(String, Int)] = words.map((_, 1))
    // An action: counts how many pairs exist per key and returns a Map to the driver.
    val wordCount: collection.Map[String, Long] = wordToOne.countByKey()
  }
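Unlike the six methods above, countByKey is an action: the result is an ordinary Map collected back to the driver rather than another RDD, so it is only appropriate when the set of distinct keys fits in driver memory.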

  

// countByValue
  def wordCount8(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello scala", "hello spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    // An action: counts each distinct element directly, no (word, 1) pairing needed.
    val wordCount: collection.Map[String, Long] = words.countByValue()
  }
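countByValue counts each distinct element of the RDD directly, so the map to (word, 1) can be skipped altogether; like countByKey it is an action that returns a driver-side Map.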

  

// reduce
  def wordCount9(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello scala", "hello spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    // Turn every word into a singleton map: word -> 1.
    val mapWord: RDD[mutable.Map[String, Long]] = words.map(
      word => mutable.Map[String, Long]((word, 1L))
    )
    // Merge the maps pairwise, adding up counts for words present in both.
    val wordCount: mutable.Map[String, Long] = mapWord.reduce(
      (map1, map2) => {
        map2.foreach {
          case (word, count) =>
            val newCount = map1.getOrElse(word, 0L) + count
            map1.update(word, newCount)
        }
        map1
      }
    )
  }
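The reduce version recasts counting as map merging: each word becomes a singleton map, and reduce merges them pairwise. It works on any RDD, not just pair RDDs, but allocating one mutable.Map per word makes it more of a curiosity than a practical solution.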

  

// aggregate
  def wordCount10(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello scala", "hello spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))

    val wordCount: mutable.Map[String, Long] = words.aggregate(mutable.Map[String, Long]())(
      (map, word) => {
        // Add the word to the map. `word -> value` is syntactic sugar for the
        // tuple (word, value); += inserts it, updating the count if the word
        // is already present and creating a new entry otherwise.
        map += (word -> (map.getOrElse(word, 0L) + 1L))
      },
      (map1, map2) => {
        // Merge the two per-partition maps.
        map2.foreach {
          case (word, count) =>
            val newCount = map1.getOrElse(word, 0L) + count
            map1.update(word, newCount)
        }
        map1
      }
    )
  }
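aggregate removes the per-word allocation that makes the reduce version wasteful: the zero value gives each partition one mutable map to accumulate into, and the second function merges the per-partition maps.

None of the ten methods prints or collects anything, so here is a minimal driver sketch for trying them out. It is not part of the original post: the object name and the local[*] master are assumptions, and it inlines the reduceByKey variant so the result can be shown. It relies on the imports added at the top.

object WordCountDemo {
  def main(args: Array[String]): Unit = {
    // Assumed local setup; adjust the master URL for a real cluster.
    val conf = new SparkConf().setMaster("local[*]").setAppName("wordCount")
    val sc = new SparkContext(conf)
    // Inlined wordCount3 (reduceByKey), collected so the counts can be printed.
    sc.makeRDD(List("hello scala", "hello spark"))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println) // (scala,1), (hello,2), (spark,1), in some order
    sc.stop()
  }
}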