Spark中RDD的Action算子

发布时间 2023-07-03 21:15:57作者: _泡泡

RDD的Action算子

Action算子会触发Job的生成,底层调用的是sparkContext.runJob方法,根据最后一个RDD,从后往前,切分Stage,生成Task
image

saveAsTextFile

将数据以文本的形式保存到文件系统中,一个分区对应一个结果文件,可以指定hdfs文件系统,也可以指定本地文件系统(本地文件系统要写file://协议),数据的写入是下Executor中Task写入的,是多个Task并行写入的。

val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
rdd1.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out2")

collect

每个分区对应的Task,将数据在Executor中,将数据以集合的形式保存到内存中,然后将每个分区对应的数据以数组形式通过网络收集回Driver端,数据按照分区编号有序返回
image


val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 4)
val rdd2 = rdd1.map(_ * 10)
//调用collect方法,是一个Action
val res: Array[Int] = rdd2.collect()
println(res.toBuffer)

collect底层实现:

def collect(): Array[T] = withScope {
  //this代表最后一个RDD,即触发Action的RDD
  //(iter: Iterator[T]) => iter.toArray 函数代表对最后一个进行的处理逻辑,即将每个分区对应的迭代器中的数据迭代处出来,放到内存中
  //最后将没法分区对应的数组通过网络传输到Driver端
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  //在Driver端,将多个数组合并成一个数组
  Array.concat(results: _*)
}

使用collect方法的注意事项:
如果Driver的内存相对较小,并且每个分区对应的数据比较大,通过网络传输的数据,返回到Driver,当返回到Driver端的数据达到了一定大小,就不收集了,即将一部分无法收集的数据丢弃
如果需要将大量的数据收集到Driver端,那么可以在提交任务的时候指定Driver的内存大小 (--driver-memory 2g)

aggregate

aggregate方式是Action,可以将多个分区的数据进行聚合运算,例如进行相加,比较大小等
aggregate方法可以指定一个初始值,初始值在每个分区进行聚合时会应用一次,全局聚合时会在使用一次

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 4)

//f1是在Executor端执行的
val f1 = (a: Int, b: Int) => {
  println("f1 function invoked ~~~~")
  a + b
}

//f2实在Driver端执行的
val f2 = (m: Int, n: Int) => {
  println("f2 function invoked !!!!")
  m + n
}

//返回的结果为55
val r1: Int = rdd1.aggregate(0)(f1, f2)

//返回的结果为50055
val r2: Int = rdd1.aggregate(10000)(f1, f2)
val rdd1 = sc.parallelize(List("a", "b", "c", "d"), 2)
val r: String = rdd1.aggregate("&")(_ + _, _ + _)

//返回的回的有两种:应为task的分布式并行运行的,先返回的结果在前面
// &&cd&ab 或 &&ab&cd```

##  reduce
将数据先在每个分区内进行局部聚合,然后将每个分区返回的结果在Driver端进行全局聚合
```Scala

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 4)
val f1 = (a: Int, b: Int) => {
  println("f1 function invoked ~~~~")
  a + b
}
//f1这个函数即在Executor中执行,又在Driver端执行
//reduce方法局部聚合的逻辑和全局聚合的逻辑是一样的
//局部聚合是在每个分区内完成(Executor)
//全局聚合实在Driver完成的
val r = rdd1.reduce(f1)```

##  sum
 sum方法是Action,实现的逻辑只能是相加
```Scala
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 4)
//sum底层调用的是fold,该方法是一个柯里化方法,第一个括号传入的初始值是0.0
//第二个括号传入的函数(_ + _) ,局部聚合和全局聚合都是相加
val r = rdd1.sum()

fold

fold跟reduce类似,只不过fold是一个柯里化方法,第一个参数可以指定一个初始值

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 4)
//fold与reduce方法类似,该方法是一个柯里化方法,第一个括号传入的初始值是0.0
//第二个括号传入的函数(_ + _) ,局部聚合和全局聚合都是相加
val r = rdd1.fold(0)(_ + _)

min、max

将整个RDD中全部对应的数据求最大值或最小值,底层的实现是:现在每个分区内求最大值或最小值,然后将每个分区返回的数据在Driver端再进行比较(min、max没有shuffle)

val rdd1 = sc.parallelize(List(5,7 ,9,6,1 ,8,2, 4,3,10), 4)
//没有shuffle
val r: Int = rdd1.max()

count

返回rdd元素的数量,先在每个分区内求数据的条数,然后再将每个分区返回的条数在Driver进行求和

val rdd1 = sc.parallelize(List(5,7 ,9,6,1 ,8,2, 4,3,10), 4)
//在每个分区内先计算每个分区对应的数据条数(使用的是边遍历,边计数)
//然后再将每个分区返回的条数,在Driver进行求和
val r: Long = rdd1.count()

take

返回一个由数据集的前n个元素组成的数组,即从RDD的0号分区开始取数据,take可能触发一到多次Action(可能生成多个Job)因为首先从0号分区取数据,如果取够了,就直接返回,没有取够,再触发Action,从后面的分区继续取数据,直到取够指定的条数为止

val rdd1 = sc.parallelize(List(5,7 ,9,6,1 ,8,2, 4,3,10), 4)
//可能会触发一到多次Action
val res: Array[Int] = rdd1.take(2)

first

返回RDD中的第一个元素,类似于take(1),first返回的不是数组

val rdd1 = sc.parallelize(List(5,7 ,9,6,1 ,8,2, 4,3,10), 4)
//返回RDD中对应的第一条数据
val r: Int = rdd1.first()

top

将RDD中数据按照降序或者指定的排序规则,返回前n个元素

val rdd1 = sc.parallelize(List(
  5, 7, 6, 4,
  9, 6, 1, 7,
  8, 2, 8, 5,
  4, 3, 10, 9
), 4)

val res1: Array[Int] = rdd1.top(2)
//指定排序规则,如果没有指定,使用默认的排序规则
implicit val ord = Ordering[Int].reverse
val res2: Array[Int] = rdd1.top(2)
val res3: Array[Int] = rdd1.top(2)(Ordering[Int].reverse)
top底层调用的使用takeOrdered
Scala
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  takeOrdered(num)(ord.reverse)
}

takeOrdered

top底层丢的是takeOrdered,takeOrdered更灵活,可以传指定排序规则。底层是先在每个分区内求topN,然后将每个分区返回的结果再在Diver端求topN
在每个分区内进行排序,使用的是有界优先队列,特点是数据添加到其中,就会按照指定的排序规则排序,并且允许数据重复,最多只存放最大或最小的N个元素

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // Priority keeps the largest elements, so let's reverse the ordering.
      //使用有界优先队列
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= collectionUtils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      mapRDDs.reduce { (queue1, queue2) =>
        queue1 ++= queue2 //将多个有界优先队列进行++= ,返回两个有界优先队列最大的N个
        queue1
      }.toArray.sorted(ord)
    }
  }
}

foreach

将数据一条一条的取出来进行处理,函数没有返回

val sc = SparkUtil.getContext("FlowCount", true)

val rdd1 = sc.parallelize(List(
  5, 7, 6, 4,
  9, 6, 1, 7,
  8, 2, 8, 5,
  4, 3, 10, 9
), 4)

rdd1.foreach(e => {
  println(e * 10) //函数是在Executor中执行
})```
使用foreach将数据写入到MySQL中,不好 ,效率低
```Scala
rdd1.foreach(e => {
  //但是不好,为什么?
  //每写一条数据用一个连接对象,效率太低了
  val connection = DriverManager.getConnection("jdbc:mysql://node-1.51doit.cn:3306/doit35?characterEncoding=utf-8", "root", "123456")
  val preparedStatement = connection.prepareStatement("Insert into tb_res values (?)")
  preparedStatement.setInt(1, e)
  preparedStatement.executeUpdate()
})

foreachPartition

和foreach类似,只不过是以分区位单位,一个分区对应一个迭代器,应用外部传的函数,函数没有返回值,通常使用该方法将数据写入到外部存储系统中,一个分区获取一个连接,效率更高

rdd1.foreachPartition(it => {
  //先创建好一个连接对象
  val connection = DriverManager.getConnection("jdbc:mysql://node-1.51doit.cn:3306/doit35?characterEncoding=utf-8", "root", "123456")
  val preparedStatement = connection.prepareStatement("Insert into tb_res values (?)")
  //一个分区中的多条数据用一个连接进行处理
  it.foreach(e => {
    preparedStatement.setInt(1, e)
    preparedStatement.executeUpdate()
  })
  //用完后关闭连接
  preparedStatement.close()
  connection.close()
})