深入MapReduce计算引擎

发布时间 2023-08-13 20:58:34作者: nangk

深入MapReduce计算引擎

MapReduce整体处理过程

MapReduce的运行需要经过input(作业输入)--mapper(业务处理接口)--shuffle(map到reduce之间的数据传输环节)--reducer(业务处理接口)--output(作业输出)

整个过程由Driver作为主入口,如下示例代码:

Driver code

Configuration conf= new Configuration();
Job job = new Job(conf,"My Word Count Program");
job.setJarByClass(WordCount.class);

job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path outputPath = new Path(args[1]);
//Configuring the input/output path from the filesystem into the job
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

Input

作业输入的核心是InputFormat类,作用是规范作业输入和读取数据的格式。

InputFormat涉及三个接口:InputFormat,InputSplit,RecordReader。

InputFormat Interface

包含两个方法

  • getSplit():获取逻辑分片(InputSplit),逻辑分片用于指定输入到每个Mapper任务的文件大小;
  • getRecordReader():获取记录读取器,记录读取器根据逻辑分配读取文件相应位置的数据,以kv的形式传给Mapper。

InputSplit Interface

包含两个方法:

  • getLength():获取每个分片的大小;
  • getLocations():获取每个分片所在的位置。

RecordReader Interface

包含五个方法:

  • close():close the record reader;
  • getCurrentKey():获取当前的key;
  • getCurrentValue():获取当前的value;
  • nextKeyValue():读取下一个key-value对;
  • getProgress():读取当前逻辑分片的进度。

Mapper

Mapper类负责MapReduce计算引擎在Map阶段的业务逻辑处理。其输入输出均是kv形式,具体的业务逻辑由用户自主开发。核心是Map()。

以下为wordcount的mapper类:

mapper类除了提供map方法外,还提供了其他一些api

run()方法的源码如下:

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }

调用run(),通过run()调用setup(),初始化一些信息;再进行一个while循环,将数据一行行循环调用map()方法,最后跳出循环cleanup()释放资源。

扩展:

1. Mapper阶段是一种只移动数据计算逻辑,而不移动数据的模式;mapper中的逻辑会分发到集群的各个节点,并读取节点的本地数据进行处理,最后再写入本地。

2. MapReduce只简单定义了业务处理的输入输出规范,没有丰富的api供业务逻辑开发。相较于Spark来说不易使用。

Reducer

Recuder也是一个业务逻辑实现类,但是输入不同与Mapper,由于Reduce阶段将key相同的value聚集,因此输入的value是集合类型;而输出则同Mapper一致,是kv对形式。Reducer的核心是reduce()。

以下为wordcount的reducer类:

Reducer除了reduce(),同样提供了cleanup() run() setup()。

MapReduce Shuffle

shuffle在官方定义是Mapper输出到Reducer输入的整个,而《Hive性能调优实战》的作者认为shuffle是Mapper类的map()的输出到Reducer类的reduce()的输入整个过程。

下图是MapReduce整体环节的拆解:

可以清晰得看到,map方法结束到reduce方法开始中间经历的步骤。

  1. 在map()方法中,会调用context.write()会将数据计算分区后写入到内存缓冲,当写入数据达到缓冲区(缓冲区大小mapreduce.task.io.sort.mb=100MB)的80%(mapreduce.map.sort.spill.percent=0.8)后,会重新启动一个线程将缓冲区数据写入到hdfs临时目录中。

  2. 在写入hdfs临时目录时,会将数据进行排序,当整个Map阶段结束后,再将临时文件合并成一个文件。

    排序的好处:如果不进行排序,后续Reducer读取该份数据时就需要频繁搜索磁盘,将顺序读变为随机读,会极大降低效率。

    1. Combiner可以进行Map端的聚合(Map端聚合通常指实现Combiner类)。所谓Map端聚合就是在节点先对本地的Mapper进行一次数据聚合,此次数据聚合可以重新编写Conbiner类,也可以使用Reducer类实现业务逻辑。示例如下:

      job.setCombinerClass(IntSumReducer.class)
      

      Map端聚合的好处,限制MapReduce任务的一大瓶颈就是网络传输和io读写。先进行Map端聚合可以减少Suffle过程的数据量,减轻系统磁盘和网络的压力。

Output

Output作业输出主要包含两个类:OutputFormat和OutputCommitter

OutputFormat

包含三个方法:

  • checkOutputSpecs():校验作业的输出规范;
  • getOutputCommitter():获取OutputCommitter对象;
  • getRecordWriter():获取RecordWriter对象,通过该对象将数据写入到HDFS。

OutputCommitter

主要工作:

  • 初始化期间,做作业运行的准备工作;如创建临时目录;
  • 作业完成后,清理作业遗留的文件目录;
  • 检查任务是否需要提交;
  • 提交输出任务,任务完成后就需要提交任务;
  • 丢弃任务提交,任务失败或终止,清理输出。

参考资料:

  1. 《Hive性能调优实战》,林志煌编著。