Spark算子总结

发布时间 2023-04-20 13:50:36作者: MrSponge


Spark的算子分为两大类:transform(转换算子)和action(行动算子)

transform算子:map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy、filter、sample、distinct、coalesce、repartition、sortBy、intersection、union、subtract、zip、partitionBy、reduceByKey、groupByKey、aggregateByKey、foldByKey、combineByKey、reduceByKey、aggregateByKey、foldByKey、combineByKey、join、leftOuterJoin、cogroup...

action算子:reduce、collect、count、first、take、takeOrdering、top、aggregate、fold、countByKey、save、forach...

算子区别的定义

transform:不会触发作业的执行

action:会触发作业的执行

transform算子介绍

map

  • 函数签名
    • def map[U: ClassTag](f: Y=>U): RDD[U]

函数说明
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark01_RDD_Operator_Transform_Par {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - map
    // 先说结论:
    // 1. rdd的计算一个分区内的数据是一个一个执行逻辑
    //    只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据
    //    分区内数据的执行是有序的
    // 2. 不同分区内的数据是无序的
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 1)

    val mapRDD: RDD[Int] = rdd.map(
      num => {
        println(">>>>>>" + num)
        num
      }
    )
    val mapRDD1: RDD[Int] = mapRDD.map(
      num => {
        println("######" + num)
        num
      }
    )
    mapRDD1.collect()
    /* 未设置分区的结果
    >>>>>>3
    >>>>>>2
    ######3
    >>>>>>4
    ######4
    ######2
    >>>>>>1
    ######1
     */

    /* 设置分区后的结果
    >>>>>>1
    ######1
    >>>>>>2
    ######2
    >>>>>>3
    ######3
    >>>>>>4
    ######4
     */
     
    sc.stop()
  }
}

mapPartitions

  • 函数签名
    • def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPatitioning: Boolean = false): RDD[U]

函数说明

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处 理,哪怕是过滤数据。

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark02_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - mapPartitions:可以以分区为单位进行批处理
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    // mapPartitions: 可以以分区为单位进行数据转换操作
    //                  但是会将整个分区的数据加载到内存进行引用
    //                  如果处理完的数据是不会被释放掉,存在对象的引用
    //                  在内存较小,数据量较大的场合下,容易出现内存溢出
    val mapRDD: RDD[Int] = rdd.mapPartitions(
      iter => {
        println(">>>>>>>")
        iter.map(_ * 2)
      }
    )

    mapRDD.collect().foreach(println)
    /*
    >>>>>>>
    >>>>>>>
    2
    4
    6
    8
     */

    sc.stop()
  }
}

求分区内的最大数

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark02_RDD_Operator_Transform_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - mapPartitions 求各个分区内的最大数
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    // [1,2],[3,4]
    // [2],[4]
    val mappRDD: RDD[Int] = rdd.mapPartitions(
      iter => {
        List(iter.max).iterator// mapPartitions需要返回的是一个迭代器对象
      }
    )

    mappRDD.collect().foreach(println)

    sc.stop()
  }
}

mapPartitionsWithIndex

  • 函数签名
    • def mapPatitionsWithIndex[U: ClassTag](f:(Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

函数说明

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处 理,哪怕是过滤数据,在处理时同时可以获取当前分区索引

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO 获取索引1的数据
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark03_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - mapPartitionsWithIndex: 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处
    //                                    理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    val mapRDD: RDD[Int] = rdd.mapPartitionsWithIndex(
      (index, iter) => {;
        if (index == 1) { // 如果分区索引为1,及第二个分区
          iter
        } else {
          Nil.iterator // 空迭代器,Nil表示空集合
        }
      }
    )

    mapRDD.collect().foreach(println)
    /*
    3
    4
     */
    sc.stop()
  }
}

分区索引下标从0开始

打印索引

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark03_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - mapPartitionsWithIndex: 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处
    //                                    理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

    // 用mapPartitionsWithIndex查看数据在哪个分区
    val mpiRDD: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex(
      (index, iter) => {
        iter.map(
          num => (index, num) // 或者:(index, _)
        )
      }
    )

    mpiRDD.collect().foreach(println)
    /*
    (1,1)
    (3,2)
    (5,3)
    (7,4)
     */
    sc.stop()
  }
}

flatMap

  • 函数签名
    • def flatMap[U: ClassTag](f: T=> TraversableOnce[U]): RDD[U]

函数说明
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark04_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - flatMap
    val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))

    val flatRDD: RDD[Int] = rdd.flatMap(
      iter => iter
    )
    val flatRDD1: RDD[Int] = rdd.flatMap(
      iter => iter.map(num => num)
    )
    //flatRDD.collect().foreach(println)
    flatRDD1.collect().foreach(println)
    /*
    1
    2
    3
    4
     */

    sc.stop()
  }
}

flatMap进行模式匹配

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark04_RDD_Operator_Transform2 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - flatMap 模式匹配
    val rdd: RDD[Any] = sc.makeRDD(List(List(1,2),3,List(4,5)))
    println("rdd:" + rdd.toDebugString)

    val flatRDD: RDD[Any] = rdd.flatMap(
      data => {
        data match {
          case list: List[_] => list
          case other => List(other)
        }
      }
    )
    rdd.collect().foreach(println)
    /*
    List(1, 2)
    3
    List(4, 5)
     */
    flatRDD.collect().foreach(println)
    /*
    1
    2
    3
    4
    5
     */

    sc.stop()
  }
}

glom

  • 函数签名
    • def glom(): RDD[Array[T]]

函数说明
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark05_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - glom 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    // 转换过程
    // List => Int
    // Int => Array
    val glomRDD: RDD[Array[Int]] = rdd.glom()

    glomRDD.collect().foreach(datas => println(datas.mkString(",")))
    /*
    [1, 2]
    [3, 4]
     */
    sc.stop()
  }
}

求所以分区最大值的和

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark05_RDD_Operator_Transform_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - glom 求各个分区内的最大数之和
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    val glomRDD: RDD[Array[Int]] = rdd.glom()
    val value: RDD[Int] = glomRDD.map(
      arrays => {
        arrays.max
      }
    )
    println(value.sum()) // 6.0
    println(value.collect().sum) // 6
    sc.stop()
  }
}

groupBy

  • 函数签名
    • def groupBy[K](f: T=> K)(implicit kt: ClassTag[K]):RDD[(K, Iterable[T])]

函数说明

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样 的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark06_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - groupBy
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    // groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
    // 相同的key值的数据会放在一个组中
    def groupFunction(num: Int): Int = {
      num % 2
    }

    val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
    /*
    (0,CompactBuffer(2, 4))
    (1,CompactBuffer(1, 3))
     */

    groupRDD.collect().foreach(println)

    sc.stop()
  }
}

按开头第一个字母分组

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark06_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - groupBy 按开头第一个字母分组
    val rdd: RDD[String] = sc.makeRDD(List("Hello","Spark","Hello","World"), 2)

    // groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
    // 相同的key值的数据会放在一个组中
    val groupRDD: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0))

    groupRDD.collect().foreach(println)
    /*
    (H,CompactBuffer(Hello, Hello))
    (S,CompactBuffer(Spark))
    (W,CompactBuffer(World))
     */
    println(groupRDD.toDebugString)
    /*
    (2) ShuffledRDD[2] at groupBy at Spark06_RDD_Operator_Transform.scala:21 []
     +-(2) MapPartitionsRDD[1] at groupBy at Spark06_RDD_Operator_Transform.scala:21 []
        |  ParallelCollectionRDD[0] at makeRDD at Spark06_RDD_Operator_Transform.scala:17 []
     */
    sc.stop()

  }
}

filter

  • 函数签名
    • def filter(f: T=>Boolean): RDD[T]

函数说明

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。 当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark07_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - filter
    //                  filter会根据分区来过滤,过滤之后的分区是不会改变的,
    //                  但过滤后的分区内的数据量会发生改变,可能会出现数据倾斜
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

    val filterRDD: RDD[Int] = rdd.filter(_ % 2 == 0)
    /*
    2
    4
     */
    filterRDD.collect().foreach(println)

    sc.stop()

  }

}

sample

  • 函数签名
    • def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

函数说明
根据指定的规则从数据集中抽取数据

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark08_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - sample:根据指定的规则从数据集中抽取数据
    val dataRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

    // 抽取数据不放回(伯努利算法)
    // 伯努利算法:又叫 0、1分布。例如扔硬币,要么正面,要么反面。
    // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于就不要
    // 第一个参数:抽取的数据是否放回,false:不放回,true:放回
    // 第二个参数:表示数据源中每条数据被抽取的概率,范围在[0, 1]之间,0:全不取;1:全取
    // 第三个参数:随机数种子,如果不传递第三个参数,那么使用的是系统时间
    val dataRDD1: RDD[Int] = dataRDD.sample(false, 0.4)

    // 抽取数据放回(泊松算法)
    // 第一个参数:抽取的数据是否放回,true:放回;false:不放回
    // 第二个参数:重复数据的几率,范围大于等于0。表示每一个元素被期望抽取到的次数(依然会出现有些数据没被抽取到,有些数据会抽取多次)
    // 第三个参数:随机数种子,如果不传递第三个参数,那么使用的是系统时间
    val dataRDD2: RDD[Int] = dataRDD.sample(true, 2)

    dataRDD1.collect().foreach(x => println("dataRDD1:" + x))
    /* 结果是随机的
    dataRDD1:1
    dataRDD1:3
     */

    dataRDD2.collect().foreach(x => println("dataRDD2:" + x))
    /* 结果是随机的
    dataRDD2:1
    dataRDD2:1
    dataRDD2:2
    dataRDD2:2
    dataRDD2:3
    dataRDD2:4
    dataRDD2:4
    dataRDD2:4
     */


    sc.stop()
  }
}

采用不同的算法取决于第一个withReplacement参数。不同算法的第二个参数表达的意思也有点偏差

一般这种可以用来判断造成数据倾斜的数据???

distinct

  • 函数签名
    • def distinct()(implicit ord: Ordering[T] = null): RDD[T]
    • def distinct(numPatitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

函数说明
将数据集中重复的数据去重

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark09_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - distinct
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))

    val dataRDD1: RDD[Int] = rdd.distinct()
    val dataRDD2: RDD[Int] = rdd.distinct(2)

    println("dataRDD1:" + dataRDD1.collect().mkString(",")) // dataRDD1:1,2,3,4
    println("dataRDD2:" + dataRDD2.collect().mkString(",")) //dataRDD2:4,2,1,3

    sc.stop()

  }

}

scala的distinct底层用的是HashSet进行去重。而Spark的底层用的是reduceKeyBy进行去重。

coalesce

  • 函数签名
    • def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

函数说明

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark10_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - coalesce
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)

    val coalRDD: RDD[Int] = rdd.coalesce(2)
    coalRDD.saveAsTextFile("/output")
    /* 因为没有开启shuffle,所以[1,2]一个分区、[3,4,5,6]一个分区
    [1, 2]
    [3, 4, 5, 6]
     */
    val coalRDD2: RDD[Int] = rdd.coalesce(2, true)
    coalRDD2.saveAsTextFile("/output")
    /*
    因为开启了shuffle,所以分区内的数据会被打乱整合,然后分区,
    最终数据会均匀分散到2个文件中
     */
    sc.stop()

  }
}

coalesce默认是不开启shuffle的,即不会打乱分区,而是将一整个分区内的元素合并到其他分区里。
如果想扩大分区是一定要启动shuffle的,这与下面的repartition算子相似。

repartition

  • 函数签名
    • def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

函数说明

该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的 RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程。

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark11_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - repartition
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)

    val reparRDD: RDD[Int] = rdd.repartition(4)
    sc.stop()

  }

}

repartition的底层默认调用的是coalesce

因此为了方便记忆、操作。一般用coalesce进行缩小分区,repartition进行扩大分区。

sortBy

  • 函数签名
    • def sortBy[K](f: (T)=>K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

函数说明

该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理 的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一 致。中间存在 shuffle 的过程

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark12_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - sortBy 开启shuffle,但分区数不变
    val rdd: RDD[Int] = sc.makeRDD(List(6, 3, 5, 2, 1, 4), 2)
    val sortByRDD: RDD[Int] = rdd.sortBy(x => x)
    val dataRDD: RDD[Array[Int]] = sortByRDD.glom()
    dataRDD.collect().foreach(data=>println(data.mkString(",")))
    /*
    1,2,3
    4,5,6
     */

    // 默认情况是升序,修改 ascending 参数为 false 则为降序
    val sortByRDD2: RDD[Int] = rdd.sortBy(x => x, false)
    val dataRDD2: RDD[Array[Int]] = sortByRDD2.glom()
    dataRDD2.collect().foreach(data=>println(data.mkString(",")))
    /*
    6,5,4
    3,2,1
     */
    sc.stop()

  }
}

intersection、union、subtract、zip

分别对应交集、并集、差集、拉链

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark13_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - intersection、union、subtract、zip
    val rdd1: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))
    val rdd2: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

    // 交集
    val interRDD: RDD[Int] = rdd1.intersection(rdd2)
    println(interRDD.collect().mkString(",")) // 3,4

    // 并集
    val unionRDD: RDD[Int] = rdd1.union(rdd2)
    println(unionRDD.collect().mkString(",")) // 3,4,5,6,1,2,3,4

    // 差集
    val subRDD: RDD[Int] = rdd1.subtract(rdd2)
    println(subRDD.collect().mkString(",")) // 5,6

    // 拉链
    val zipRDD: RDD[(Int, Int)] = rdd1.zip(rdd2)
    println(zipRDD.collect().mkString(",")) // (3,1),(4,2),(5,3),(6,4)

    sc.stop()

  }

}

注意:上面的intersection、union、subtract都是要两边的数据类型相同,而zip没有这个限制

scala中的拉链与Spark中的拉链也有些许区别。
spark中的拉链虽然没有数据类型的限制,但有分区和分区内的元素限制。须具备相同的分区数,并且分区内的元素个数必须相等

partitionBy

  • 函数签名
    • def partitionBy(partitioner: Partitioner): RDD[(K, V)]

函数说明
将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark14_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - partitionBy:Key-Value类型
    val rdd: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))
    // rdd.partitionBy()// 单独的Value类型是不能使用partitionBy方法的

    // 将Vaule转换成Key-Value类型
    val mapRDD: RDD[(Int, Int)] = rdd.map((_, 1))
    // partitionBy根据指定的分区规则对数据进行重分区
    val parRDD: RDD[(Int, Int)] = mapRDD.partitionBy(new HashPartitioner(2))
    val glomRDD: RDD[Array[(Int, Int)]] = parRDD.glom()
    glomRDD.collect().foreach(data => println(data.mkString(",")))
    /* 4和6一个分区,3和5一个分区
    (4,1),(6,1)
    (3,1),(5,1)
     */

    sc.stop()
  }

}


nonNegativeMod方法的内容如下:

reduceByKey

  • 函数签名
    • def reduceByKey(func: (V, V) => V): RDD[(K, V)]
    • def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

函数说明
可以将数据按照相同的 Key 对 Value 进行聚合

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark15_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - reduceByKey:Key-Value类型
    val rdd: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2), ("b",3), ("b",4)))

    // reduceByKey : 相同的key的数据进行value数据的聚合操作
    //                如果只有一个key,那么是不会参与计算的
    val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey((x, y) => (x + y))
    reduceRDD.collect().foreach(println)
    /*
    (a,3)
    (b,7)
     */

    sc.stop()

  }

}

groupByKey

  • 函数签名
    • def groupByKey(): RDD[(K, Iterable[V])]
    • def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
    • def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

函数说明
将数据源的数据根据 key 对 value 进行分组

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark16_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - groupByKey:Key-Value类型
    val rdd: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2), ("b",3), ("b",4)))
    // groupByKey :将数据源中的数据,相同key的数据分组在一个组中,形成一个对偶元组
    //              元组中的第一个元素是key
    //              元组中的第二个元素是相同key的value的集合
    val groupByKeRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()

    groupByKeRDD.collect().foreach(println)
    /*
    (a,CompactBuffer(1, 2))
    (b,CompactBuffer(3, 4))
     */

    val groupRDD: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
    groupRDD.collect().foreach(println)
    /*
    (a,CompactBuffer((a,1), (a,2)))
    (b,CompactBuffer((b,3), (b,4))) 
     */
    sc.stop()

  }

}

reduceByKey与groupByKey的区别:
reduceByKey与groupByKey都会进行shuffle操作,总所周知,Spark的shuffle因为要落盘,是非常耗性能的,而reduceByKey会在shuffle阶段前进行一个预处理操作,类似读写文件操作中的缓冲区,可以极大的提高工作效率。

aggregateByKey

  • 函数签名
    • def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U)=> U): RDD[(K, U)]

aggregateByKey函数是柯里化

函数说明
将数据根据不同的规则进行分区内计算和分区间计算

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark17_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - aggregateByKey:Key-Value类型
    val rdd: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2), ("b",3), ("b",4)))

    val aggByKeyRDD: RDD[(String, Int)] = rdd.aggregateByKey(1)(_ + _, _ + _)
    aggByKeyRDD.collect().foreach(println)
    /*
    (a,3)
    (b,7)
     */

    // TODO : 取出每个分区内相同 key 的最大值然后分区间相加
    //aggregateByKey算子是函数柯里化,存在两个参数列表
    // 1. 第一个参数列表中的参数表示初始值
    //      主要用于当碰见第一个key的时候,和value进行分区内计算
    // 2. 第二个参数列表中含有两个参数
    //    2.1 第一个参数表示分区内的计算规则
    //    2.2 第二个参数表示分区间的计算规则
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("c", 3),
      ("b", 4), ("c", 5), ("c", 6)
    ), 2)
    /*分区情况
    0:("a", 1), ("a", 2), ("c", 3)
    1:("b", 4), ("c", 5), ("c", 6)
     */
    val resultRDD: RDD[(String, Int)] = rdd2.aggregateByKey(10)(
      (x, y) => math.max(x, y),
      (x, y) => x + y
    )
    /*内部规则
    (1,10),(2,10),(3,10) 取max => (a, 10), (c, 10)
    (4,10),(5,10),(6,10) 取max => (b, 10), (c, 10)
     */
    resultRDD.collect().foreach(println)
    /*
    (b,10)
    (a,10)
    (c,20)
     */
    sc.stop()

  }

}

aggregateByKey内部计算逻辑(当zeroValue为0时):

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark17_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // TODO : 取出每个分区内相同 key 的最大值然后分区间相加
    //aggregateByKey算子是函数柯里化,存在两个参数列表
    // 1. 第一个参数列表中的参数表示初始值
    //      主要用于当碰见第一个key的时候,和value进行分区内计算
    // 2. 第二个参数列表中含有两个参数
    //    2.1 第一个参数表示分区内的计算规则
    //    2.2 第二个参数表示分区间的计算规则
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("b", 3),
      ("b", 4), ("b", 5), ("a", 6)
    ), 2)
    /*分区情况
    0:("a", 1), ("a", 2), ("b", 3)
    1:("b", 4), ("b", 5), ("a", 6)
     */
    val resultRDD: RDD[(String, Int)] = rdd2.aggregateByKey(0)(
      (x, y) => math.max(x, y),
      (x, y) => x + y
    )
    resultRDD.collect().foreach(println)
    /*
    (b,8)
    (a,8)
     */
    sc.stop()

  }
}

foldByKey

当分区内与分区间的计算规则一样,那就可以用foldByKey代替aggregateByKey的使用

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark17_RDD_Operator_Transform2 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - foldByKey:Key-Value类型
    //      当分区内与分区间的计算规则相同时,spark提供了简化的方法
    val rdd: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2), ("b",3), ("b",4)))

    val aggByKeyRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(_ + _, _ + _)
    aggByKeyRDD.collect().foreach(println)
    /*
    (a,3)
    (b,7)
     */
    rdd.foldByKey(0)(_+_).collect().foreach(println)
    /*
    (a,3)
    (b,7)
     */

    sc.stop()
  }
}

foldByKey又与reduceByKey相比多了一个初始值的设置

combineByKey

  • 函数签名
    • def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K,C)]

函数说明

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于 aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark19_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO transform算子 - combineByKey

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("b", 3),
      ("b", 4), ("b", 5), ("a", 6)
    ), 2)

    // combineByKey : 方法需要三个参数
    // 第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
    // 第二个参数表示:分区内的计算规则
    // 第三个参数表示:分区间的计算规则
    val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
      v => (v, 1), // 指定不同key的第一个value元素格式
      (t: (Int, Int), v) => { // 因为需要动态识别,所以要事先标明数据类型,否则有肯能会出错
        (t._1 + v, t._2 + 1) // t._1 + v:value的总数
      },
      (t1: (Int, Int), t2: (Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )
    val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
      case (sum, count) => sum / count
    }
    resultRDD.collect().foreach(println)
    /*
    (b,4)
    (a,3)
     */
    sc.stop()
  }
}

几种key-value算子

reduceByKey、aggregateByKey、foldByKey、combineByKey这几种算子的底层,调用的都是combineByKeyWithClassTag方法。只不过传递的参数不一样,进而实现了不同的功能。

join

  • 函数签名
    • def join[W](other: RDD[(K, W)]): RDD[(K, (V,W))]

函数说明
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark20_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO transform算子 - join
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
    val rdd2: RDD[(String, String)] = sc.makeRDD(List(("b", "B"), ("a", "A"), ("d", "D"), ("a", "a")))
    val joinRDD: RDD[(String, (Int, String))] = rdd1.join(rdd2)
    joinRDD.collect().foreach(println)
    /* 此时的c是没有的
    (a,(1,A))
    (a,(1,a))
    (b,(2,B))
     */

    sc.stop()
  }
}

与zip有点区别,zip不会根据key来进行连接。

在它的底层实现原理中,join会遍历每个元素,如果key值相同,就会进行匹配。就是会进行笛卡尔积
总结:

  • 两个不同数据源的够s,相同的key的value会连接起来,形成元组
  • 如果两个数据源中key没有匹配上,那么数据不会出现在结果中
  • 如果两个数据源中key有多个相同的,会依此匹配,可能会出现笛卡尔积,数据量就会几何形增长,会导致性能降低

leftOuterJoin

  • 函数签名
    • def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

函数说明
类似SQL语句的左外连接

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark21_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO transform算子 - leftOuterJoin、rightOuterJoin
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
    val rdd2: RDD[(String, String)] = sc.makeRDD(List(("b", "B"), ("a", "A"), ("d", "D"), ("a", "a")))
    val leftJoinRDD: RDD[(String, (Int, Option[String]))] = rdd1.leftOuterJoin(rdd2)
    leftJoinRDD.collect().foreach(println)
    /*
    (a,(1,Some(A)))
    (a,(1,Some(a)))
    (b,(2,Some(B)))
    (c,(3,None))
     */

    sc.stop()
  }
}

None表示右表没有数据

cogroup

  • 函数签名
  • def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

函数说明
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD

package com.pzb.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/3/16-15:17
 */
object Spark22_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO transform算子 - cogroup = connect + group
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
    val rdd2: RDD[(String, String)] = sc.makeRDD(List(("b", "B"), ("a", "A"), ("d", "D"), ("a", "a")))

    val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[String]))] = rdd1.cogroup(rdd2)
    cogroupRDD.collect().foreach(println)
    /*
    (a,(CompactBuffer(1),CompactBuffer(A, a)))
    (b,(CompactBuffer(2),CompactBuffer(B)))
    (c,(CompactBuffer(3),CompactBuffer()))
    (d,(CompactBuffer(),CompactBuffer(D)))
     */

    sc.stop()
  }
}

其它的连接方法只能连接一个rdd,而这种可以连接多种

Action算子介绍

这些算子之所以被称为行动算子,是因为作业会随着这些的算子触发而提交作业。
以collect为例:

回想下,transform算子的内部是不是都是返回 new RDD

几种简单行动算子

reduce、collect、count、first、take、takeOrdering、top。

package com.pzb.rdd.operator.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/4/1-21:10
 */
object Spark01_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

    // TODO - 行动算子 - reduce
    // 所谓的行动算子,其实就是触发作业(Job)执行方法
    // 底层代码调用的是环境对象的runJob方法
    // 底层代码中会创建ActionJob,并提交执行
    val i: Int = rdd.reduce(_ + _)
    println(i) // 这里的输出也不再需要collect了
    /*
    10
     */
    // collect:采集 -方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
    val ints: Array[Int] = rdd.collect()
    println(ints.mkString(",")) // 1,2,3,4

    // count:返回RDD数据源中的元素个数
    val cnt: Long = rdd.count()
    println(cnt) // 4

    // first:获取数据源中数据的第一个
    val first: Int = rdd.first()
    println(first) // 1

    // take:取RDD数据源中的多少个
    val ints1: Array[Int] = rdd.take(3)
    println(ints1.mkString(",")) // 1,2,3

    // takeOrdered:数据排序后(默认升序),取N个数据
    val rdd1: RDD[Int] = sc.makeRDD(List(4, 2, 3, 1))
    val ints2: Array[Int] = rdd1.takeOrdered(3)
    // TODO 若想实现降序,则调用翻转方法
    val ints3: Array[Int] = rdd1.takeOrdered(3)(Ordering[Int].reverse)
    println(ints2.mkString(",")) // 1,2,3
    println(ints3.mkString(",")) // 4,3,2

    // top:数据排序后(默认降序),取N个数据
    val ints4: Array[Int] = rdd1.top(3)
    println(ints4.mkString(",")) // 4,3,2
    sc.stop()
  }
}

aggregate

  • 函数签名
    • def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

函数说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
有点类似转换算子中的aggregateByKey,但有些许出入。

package com.pzb.rdd.operator.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/4/1-21:10
 */
object Spark02_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    // TODO - 行动算子
    val result: Int = rdd.aggregate(0)(_ + _, _ + _)
    println(result) // 10

    // aggregateByKey : 初始值只会参与分区内计算
    // aggregate : 初始值不仅会参与分区内计算,还会参与分区间计算
    val result2: Int = rdd.aggregate(10)(_ + _, _ + _)
    // 计算过程:[10 + 1 + 2] + [10 + 3 + 4] + 10 
    println(result2) // 40
    sc.stop()
  }
}

两者的区别:

  • aggregateByKey : 初始值只会参与分区内计算
  • aggregate : 初始值不仅会参与分区内计算,还会参与分区间计算

fold

  • 函数签名
    • def fold(zeroValue: T)(op: (T, T) => T): T

函数说明
折叠操作,aggregate的简化版操作。类似aggregateByKey与foldByKey之间的差别

package com.pzb.rdd.operator.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/4/1-21:10
 */
object Spark03_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    // TODO - 行动算子
    val result: Int = rdd.fold(10)(_ + _)
    println(result) // 40
    sc.stop()
  }
}

countByKey

  • 函数签名
    • def countByKey(): Map[K, Long]

函数说明
统计每种key的个数

package com.pzb.rdd.operator.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/4/1-21:10
 */
object Spark04_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 2, 1), 2)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("a", 3)
    ), 2)

    // TODO - countByKey - 统计每个key的个数
    val stringToLong: collection.Map[String, Long] = rdd2.countByKey()
    println(stringToLong) // Map(a -> 3)
    val intToLong: collection.Map[Int, Long] = rdd.countByValue()
    println(intToLong) // Map(4 -> 1, 2 -> 2, 1 -> 2, 3 -> 1)

    sc.stop()
  }
}

save相关算子

  • 函数签名
    • def saveAsTextFile(path: String): Unit
    • def saveAsObjectFile(path: String): Unit
    • def saveAsSequenceFile(path: String, code: Option[Class[_ <: CompressionCode]]=None): Unit

函数说明
将数据保存到不同格式的文件中

package com.pzb.rdd.operator.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO
 * @author 海绵先生
 * @date 2023/4/1-21:10
 */
object Spark05_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("a", 3)
    ), 2)

    // TODO - save - 保存文件
    rdd.saveAsTextFile("output2")
    rdd.saveAsObjectFile("output3")
    // saveAsSequenceFile方法要求数据的格式必须为K-V类型
    rdd.saveAsSequenceFile("output4")

    sc.stop()
  }
}

具体区别百度吧~~~

SparkCore中的几种save算子介绍

foreach

  • 函数签名
    • def foreach(f: T => Unit): Unit = withScope{
      val cleanF = sc.clean(f)
      sc.runJob(this,(iter: Iterator[T]) => iter.foreach(cleanF))
      }

函数说明
分布式遍历RDD中的每一个元素,调用指定函数

package com.pzb.rdd.operator.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
* @Description TODO
* @author 海绵先生
* @date 2023/4/1-21:10
*/
object Spark06_RDD_Operator_Action {
 def main(args: Array[String]): Unit = {
   val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
   val sc = new SparkContext(sparkConf)

   val rdd: RDD[(String, Int)] = sc.makeRDD(List(
     ("a", 1), ("a", 2), ("a", 3)
   ), 2)

   // TODO - foreach - 分布式遍历 RDD中的每一个元素
   // 因为有collect,所以此处的foreach是在Driver端内存集合的循环遍历方法
   rdd.collect().foreach(println)
   /*
   (a,1)
   (a,2)
   (a,3)
    */
   println("****************")
   // 此处的foreach 是在Executor端内存数据打印,所以是乱序的
   rdd.foreach(println)
   /*
   (a,2)
   (a,3)
   (a,1)
    */

   sc.stop()
 }
}

算子:Operator(操作)

  • RDD的方法和Scala集合对象的方法不一样。
  • 集合对象的方法都是在同一个节点的内存中完成的。
  • RDD的方法可以将计算逻辑发送到Executor端(分布式执行)。
  • 为了区分不同的处理效果,所以将RDD的方法称之为算子。
  • RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。

理解collect算子

使用foreach小细节

collect算子会把数据拉回到Driver端,而算子内部的代码是在Executor端执行了,因此在单独使用foreach时,可能会遇到序列化的错误问题。

下面是使用foreach时因序列化发生的错误问题:

package com.pzb.rdd.operator.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO 当使用foreach遇到序列化问题时
 * @author 海绵先生
 * @date 2023/4/1-21:10
 */
object Spark06_RDD_Operator_Action_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3))

    val user = new User()
    // TODO - foreach
    // 报错,发生异常:Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    //                at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
    //                Caused by: java.io.NotSerializableException: com.pzb.rdd.operator.action.Spark06_RDD_Operator_Action_Test$User
    rdd.foreach(
      data=>{
        println("age = " + (user.age + data))
      }
    )

    sc.stop()
  }
  class User{
    val age: Int = 30
  }
}

这是因为Driver和Executor端的问题。user是在算子外声明的,其数据存在Driver端,而foreach是在Executor端执行的,因此数据的交互需要传输;两端的数据交流是需要通过网络传输完成的,因此需要序列化

正确用法:

package com.pzb.rdd.operator.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description TODO 当使用foreach遇到序列化问题时
 * @author 海绵先生
 * @date 2023/4/1-21:10
 */
object Spark06_RDD_Operator_Action_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3))

    val user = new User()
    // TODO - foreach
    // 报错,发生异常:Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    //                at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
    //                Caused by: java.io.NotSerializableException: com.pzb.rdd.operator.action.Spark06_RDD_Operator_Action_Test$User
    rdd.foreach(
      data=>{
        println("age = " + (user.age + data))
      }
    )
    /*
    age = 31
    age = 33
    age = 32
     */

    sc.stop()
  }
  // 类通过继承Serializable类实现简单的自动序列化
  class User extends Serializable {
    val age: Int = 30
  }
}

除了继承类,还可以使用样例类,默认自动继承;