Spark中RDD的特殊算子和重要概念

发布时间 2023-07-04 19:36:42作者: _泡泡

RDD特殊的算子

cache、persist

将数据缓存到内存,第一次触发Action,才会将数据放入内存,以后在触发Action,可以复用前面内存中缓存的数据,可以提升技术效率
cache和persist的使用场景:一个application多次触发Action,为了复用前面RDD的数据,避免反复读取HDFS(数据源)中的数据和重复计算,可以将数据缓存到内存或磁盘【executor所在的磁盘】,第一次触发action才放入到内存或磁盘,以后会缓存的RDD进行操作可以复用缓存的数据。
一个RDD多次触发Action缓存才有意义,如果将数据缓存到内存,内存不够,以分区位单位,只缓存部分分区的数据,cache底层调用persist,可以指定更加丰富的存储基本,支持多种StageLevel,可以将数据序列化,默认放入内存使用的是java对象存储,但是占用空间大,优点速度快,也可以使用其他的序列化方式
cache和persist方法,严格来说,不是Transformation,应为没有生成新的RDD,只是标记当前rdd要cache或persist

checkpoint

checkpoint使用场景:适合复杂的计算【机器学习、迭代计算】,为了避免中间结果数据丢失重复计算,可以将宝贵的中间结果保存到hdfs中,保证中间结果安全。
在调用rdd的checkpint方法之前,一定要指定checkpoint的目录sc.setCheckPointDir,指的HDFS存储目录,为保证中间结果安全,将数据保存到HDFS中
第一次触发Action,才做checkpoint,会额外触发一个job,这个job的目的就是将结果保存到HDFS中
如果RDD做了checkpoint,这个RDD以前的依赖关系就不在使用了,触发多次Action,checkpoint才有意义,多用于迭代计算
checkpoint严格的说,不是Transformation,只是标记当前RDD要做checkpoint

Spark中的一些重要概念

Application

使用SparkSubmit提交的个计算应用,一个Application中可以触发多次Action,触发一次Action产生一个Job,一个Application中可以有一到多个Job

Job

Driver向Executor提交的作业,触发一次Acition形成一个完整的DAG,一个DAG对应一个Job,一个Job中有一到多个Stage,一个Stage中有一到多个Task

DAG

概念:有向无环图,是对多个RDD转换过程和依赖关系的描述,触发Action就会形成一个完整的DAG,一个DAG对应一个Job

Stage

概念:任务执行阶段,Stage执行是有先后顺序的,先执行前的,在执行后面的,一个Stage对应一个TaskSet,一个TaskSet中的Task的数量取决于Stage中最后一个RDD分区的数量

Task

概念:Spark中任务最小的执行单元,Task分类两种,即ShuffleMapTask和ResultTask
Task其实就是类的实例,有属性(从哪里读取数据),有方法(如何计算),Task的数量决定并行度,同时也要考虑可用的cores

TaskSet

保存同一种计算逻辑多个Task的集合,一个TaskSet中的Task计算逻辑都一样,计算的数据不一样

任务执行原理分析

WordCount程序有多个RDD

该Job中,有多少个RDD,多少个Stage,多少个TaskSet,多个Task,Task的类型有哪些?

 object WordCount {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    val sc: SparkContext = new SparkContext(conf)
    
    sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_+_)
      .saveAsTextFile(args(1))
    
    sc.stop()
  }
}

RDD的数量分析

• 读取hdfs中的目录有两个输入切片,最原始的HadoopRDD的分区为2,以后没有改变RDD的分区数量,RDD的分区数量都是2
• 在调用reduceByKey方法时,有shuffle产生,要划分Stage,所以有两个Stage
• 第一个Stage的并行度为2,所以有2个Task,并且为ShuffleMapTask。第二个Stage的并行度也为2,所以也有2个Task,并且为ResultTask,所以一共有4个Task

spark的任务上次的逻辑计划图

image

下面的的物理执行计划图,会生成Task,生成的Task如下

image

Stage和Task的类型

Stage有两种类型,分别是ShuffleMapStage和ResultStage,ShuffleMapStage生成的Task叫做ShuffleMapTask,ResultStage生成的Task叫做ResultTask

• ShuffleMapTask
1.可以读取各种数据源的数据
2.可以读取Shuffle的中间结果(Shuffle Read)
3.为shuffle做准备,即应用分区器,将数据溢写磁盘(ShuffleWrite),后面一定还会有其他的Stage

• ResultTask
1.可以读取各种数据源的数据
2.可以读取Shuffle的中间结果(Shuffle Read)
3.是整个job中最后一个阶段对应的Task,一定会产生结果数据(就是将产生的结果返回的Driver或写入到外部的存储系统)

多种情况:
第一种:
image

第二种
image

第三种
image