10-MapReduce(2)

发布时间 2023-07-03 17:57:08作者: tree6x7

1. MR 基本原理

1.1 MapTask 并行度机制

MapTask 的并行度指的是 map 阶段有多少个并行的 MapTask 共同处理任务。

a. 数据切片

map 阶段的任务处理并行度势必影响到整个 Job 的处理速度。

1G 的数据,启动 8 个 MapTask,可以提高集群的并发处理能力。那么 1K 的数据,也启动 8 个 MapTask,会提高集群性能吗?MapTask 并行任务是否越多越好呢?哪些因素影响了 MapTask 并行度?

一个 MapReduce Job 的 map 阶段并行度由客户端在提交 Job 时决定,即客户端提交 Job 之前会对待处理数据进行「逻辑切片」,切片完成会形成切片规划文件(job.split),每个逻辑切片最终对应启动一个 MapTask。

数据单位区分:

  • 数据块
    • Block 是 HDFS 物理上把数据分成一块一块。
    • 数据块是 HDFS 存储数据单位
  • 数据切片
    • 数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
    • 数据切片是 MapReduce 程序输入数据的计算单位,一个切片会对应启动一个 MapTask。

FileInputFormat 中默认的切片机制:

b. 源码解析

Job 提交流程源码解析:

FileInputFormat 切片源码解析:

bytesRemaining/splitSize > 1.1不满足的话,那么最后所有剩余的会作为一个切片。从而不会形成例如 129M 文件规划成两个切片的局面。

1.2 常见 InputFormat 实现

在运行 MapReduce 程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce 是如何读取这些数据的呢?

FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat 和自定义 InputFormat 等。

a. FileInputFormat

(1)切片机制

  1. 简单地按照文件的内容长度进行切片;
  2. 切片大小,默认等于 block 大小;
  3. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片。

(2)举例说明

  1. 输入数据有两个文件
    file1.txt    320M
    file2.txt    10M
    
  2. 经过 FileInputFormat 的切片机制运算后,形成的切片信息如下:
    file1.txt.split1  --  0~128
    file1.txt.split2  --  128~256
    file1.txt.split3  --  256~320
    file2.txt.split1  --  0~10M
    

(3)切片大小的参数配置

  1. 源码中计算切片大小的公式
    // mapreduce.input.fileinputformat.split.minsize = 1
    // mapreduce.input.fileinputformat.split.maxsize = Long.MAXValue
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
    
  2. 切片大小设置
    • maxsize(切片最大值):参数如果调得比 blockSize 小,则会让切片变小,而且就等于配置的这个参数的值;
    • minsize(切片最小值):参数调的比 blockSize 大,则可以让切片变得比 blockSize 还要大。
  3. 获取切片信息 API
    // 获取切片的文件名称
    String name = inputSplit.getPath().getName();
    // 根据文件类型获取切片信息
    FileSplit inputSplit = (FileSplit) context.getInputSplit();
    

b. TextInputFormat

TextInputFormat 是默认的 FileInputFormat 实现类。按行读取每条记录。

  • key 是存储该行在整个文件中的起始字节偏移量,LongWritable 类型。
  • value 是这行的内容,不包括任何行终止符(换行符和回车符),Text 类型。

以下是一个示例,比如,一个分片包含了如下 4 条文本记录。

Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

每条记录表示为以下键/值对:

(0, Rich learning form)
(20,Intelligent learning engine)
(49,Learning more convenient)
(74,From the real demand for more close to the enterprise)

c. CombineTextInputFormat

应用场景:将输入的大量小文件合并成一个切片统一处理。

比如有 4 个 1~5M 不等的小文件。如果不做任何处理,会发现切片个数为 4。但如果在 Driver 类中增加如下代码,运行程序并观察运行的切片数量。

// 如果不设置 InputFormat,默认用的是 TextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);

// 虚拟存储切片最大值设置为20M
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);

1.3 ReduceTask 并行度机制

ReduceTask 并行度同样影响整个 Job 的执行并发度和执行效率,与 MapTask 的并发数由切片数决定不同,ReduceTask 数量的决定是可以直接手动设置

如果数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜。

注:ReduceTask 数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有 1 个 ReduceTask。

a. 输出文件数

默认情况下不管 map 阶段有多少个并发执行 task,到 reduce 阶段,所有的结果都将由一个 task 来处理,并且最终结果输出到一个文件中。

改变 ReduceTask 个数

通过 Job 提供的方法,可以修改 ReduceTask 的个数。默认情况下不设置,ReduceTask 个数为 1。

设置 job.setNumReduceTasks(xxx) 后:

输出结果文件个数和 ReduceTask 个数关系

通过修改不同 ReduceTask 个数值,得出输出结果文件的个数和 ReduceTask 个数是一种对等关系。也就是说有几个 ReduceTask,最终程序就输出几个文件。

b. 数据分区

默认情况下,MapReduce 是只有一个 ReduceTask 来进行数据的处理。这就使得不管输入的数据量多大,最终的结果都是输出到一个文件中。当改变 ReduceTask 个数的时候,作为 MapTask 就会涉及到分区的问题,即:MapTask 输出的结果如何分配给各个 ReduceTask 来处理?

MapReduce 默认分区规则是 HashPartitioner。分区的结果和 map 输出的 key 有关。

e.g. part-r-00000 存放的就是 key 取模后为 0 的,part-r-00001 存放的是 key 取模后为 1 的 ...

NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                  ) throws IOException, ClassNotFoundException {
  collector = createSortingCollector(job, reporter);
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}

【小结】

  • ReduceTask 个数的改变导致了数据分区的产生,而不是有数据分区导致了 ReduceTask 个数改变!
  • 数据分区的核心是「分区规则」,即如何分配数据给各个 ReduceTask。默认的规则可以保证只要 map 阶段输出的 key 一样,数据就一定可以分区到同一个 ReduceTask,但是不能保证数据平均分区;
  • ReduceTask 个数的改变还会导致输出结果文件不再是一个整体,而是输出到多个文件中。
  • ReduceTask 个数与 MapTask 输出的分区文件个数:
    • ReduceTask.cnt > Partition.cnt,则会多产生几个空的输出文件 part-r-000xx;
    • ReduceTask.cnt < Partition.cnt,则有一部分分区数据无处安放,会抛异常!
    • ReduceTask.cnt = 1,则不管 MapTask 端输出多少个分区文件,最终结果都交给这一个 ReduceTask,最终也就只会产生 1 个结果文件 part-r-00000。

1.4 Combiner 规约

a. 产生背景

MapReduce 是一种具有两个执行阶段的分布式计算程序,Map 阶段和 Reduce 阶段之间会涉及到跨网络数据传递。每一个 MapTask 都可能会产生大量的本地输出,这就导致跨网络传输数据量变大,网络 IO 性能低。

比如 WordCount 单词统计案例,假如文件中有 1000 个单词,其中 999 个为 hello,这将产生 999 个 <hello,1> 的键值对在网络中传递,性能及其低下。

b. 数据规约

「数据归约」是指在尽可能保持数据原貌的前提下,最大限度地精简数据量。

Combiner 是 MapReduce 程序中除了 Mapper 和 Reducer 之外的一种组件,默认情况下不启用。

Combiner 中文叫做数据规约,是 MapReduce 的一种优化手段。Combiner 的作用就是对 map 端的输出先做一次局部合并,以减少在 map 和 reduce 节点之间的数据传输量。

Combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件。但 Combiner 组件的父类就是 Reducer,所以也可以说 Combiner 本质就是 Reducer。

Combiner 和 Reducer 的区别在于运行的位置:

  • Combiner 是在每一个 MapTask 所在的节点本地运行,是“局部聚合”;
  • Reducer 是对所有 MapTask 的输出结果计算,是“全局聚合”。

c. 使用说明

Combiner 具体实现步骤:

  1. 自定义 CustomCombiner 类,继承 Reducer 并重写 reduce();
  2. job.setCombinerClass(CustomCombiner.class)

使用注意事项:

  • Combiner 能够应用的前提是不能影响最终的业务逻辑,而且 Combiner 的输出 kv 应该跟 Reducer 的输入 kv 类型要对应起来。
  • 下述场景禁止使用 Combiner,因为这样不仅优化了网络传输数据量,还改变了最终的执行结果:
    • 业务和数据个数相关的
    • 业务和整体排序相关的
  • Combiner 组件不是禁用,而是慎用。用的好提高程序性能,用不好,改变程序结果且不易发现。

1.5 OutputFormat

a. 概述

OutputFormat 是 MapReduce 输出的基类,所有实现 MapReduce 输出的类都实现了 OutputFormat 接口。下面我们介绍几种常见的 OutputFormat 实现类。

默认输出格式是 TextOutputFormat。那可不可以自定义 OutputFormat 呢?

  1. 自定义一个类继承 FileOutputFormat;
  2. 改写 RecordWriter,具体改写输出数据的方法 write()

b. 案例

过滤输入的 log 日志,包含 tree 的网站输出到一个文件,不包含 tree 的行内容输出到另一个文件。

LogMapper

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}

LogReducer

public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}

LogOutputFormat

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        LogRecordWriter lrw = new LogRecordWriter(job);
        return lrw;
    }
}

LogRecordWriter

public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    FSDataOutputStream out1 = null;
    FSDataOutputStream out2 = null;

    public LogRecordWriter(TaskAttemptContext job) {
        try {
            FileSystem fileSystem = FileSystem.get(job.getConfiguration());
            out1 = fileSystem.create(new Path("/.../outputformat/1.log"));
            out2 = fileSystem.create(new Path("/.../outputformat/2.log"));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();
        if (log.contains("tree")) {
            out1.writeBytes(key + "\n");
        } else {
            out2.writeBytes(key + "\n");
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(out1);
        IOUtils.closeStream(out2);
    }
}

LogDriver

public class LogDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 设置自定义的 OutputFormat
        job.setOutputFormatClass(LogOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("/.../outputformat/input"));
        // 虽然我们自定义了 OutputFormat,但是因为我们的 OutputFormat 继承自 FileOutputFormat
        // 而 FileOutputFormat 要输出一个 _SUCCESS 文件,所以在这还得指定一个输出目录
        FileOutputFormat.setOutputPath(job, new Path("/.../outputformat/output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}

2. MR 执行流程

2.1 MapTask 工作机制

整个 map 阶段流程大体如下图所示。

  • Input File 通过 split 被逻辑切分为多个 split 文件,每个逻辑切片最终对应启动一个 MapTask,当前 split 通过 LineRecordReader 按行读取内容给自定义 Mapper 进行处理;
  • 数据被 map 处理结束之后交给 OutputCollector 收集器,对其结果 key 进行分区(默认使用 hash 分区),然后写入 buffer,每个 MapTask 都有一个内存缓冲区,存储着 map 的输出结果,当缓冲区快满的时候(80%)需要将缓冲区的数据以一个临时文件的方式 spill 溢出到磁盘;
  • 当整个 MapTask 结束后再对磁盘中这个 MapTask 产生的所有临时文件做合并,生成最终的正式输出文件,然后等待 ReduceTask 来拉数据。

(1)首先,读取数据组件 InputFormat(默认 TextInputFormat)会通过 getSplits() 方法对输入目录中文件进行逻辑切片规划得到 splits,有多少个 split 就对应启动多少个 MapTask。split 与 block 的对应关系默认是一对一。

(2)将输入文件切分为 splits 之后,由 RecordReader 对象(默认 TextInputFormat.LineRecordReader)进行读取,以 \n 作为分隔符,读取一行数据,返回 <key,value>。key 表示每行首字符偏移值,value 表示这一行文本内容。读取 split 返回 <key,value>,进入用户自己继承的 Mapper 类中,执行用户重写的 map() 方法。上一步每解析出来的一个 <k,v>,调用一次 map() 方法。

(3)map 逻辑走完之后,将 map 的每条结果通过 context.write 进行 collect 数据收集。在 collect 中,会先对其进行分区处理,默认使用 HashPartitioner。

MapReduce 提供 Partitioner 接口,它的作用就是根据 key 或 value 及 reduce 的数量来决定当前的这对输出数据最终应该交由哪个 ReduceTask 处理。默认对 key hash 后再以 ReduceTask 数量取模。默认的取模方式只是为了平均 reduce 的处理能力,如果用户自己对 Partitioner 有需求,可以订制并设置到 Job 上。

(4)接下来,会将数据写入内存,内存中这片区域叫做「环形缓冲区」,缓冲区的作用是批量收集 map 结果,减少磁盘 IO 的影响。我们的 key/value 对以及 partition 的结果都会被写入缓冲区。当然写入之前,key 与 value 值都会被序列化成字节数组。

  • 环形缓冲区其实是一个数组,数组中存放着 key、value 的序列化数据和 key、value 的元数据信息,包括 partition、key 的起始位置、value 的起始位置以及 value 的长度。

  • 环形结构是一个抽象概念。缓冲区是有大小限制,默认是 100MB。当 MapTask 的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。

  • 这个从内存往磁盘写数据的过程被称为 Spill,中文可译为“溢写”。这个溢写是由单独线程来完成,不影响往缓冲区写 map 结果的线程。溢写线程启动时不应该阻止 map 的结果输出,所以整个缓冲区有个溢写的比例 spill.percent,这个比例默认是 0.8,也就是当缓冲区的数据已经达到阈值(bufferSize x spillPercent = 100MB x 0.8 = 80MB),溢写线程启动,锁定这 80MB 的内存,执行溢写过程。MapTask 的输出结果还可以往剩下的 20MB 内存中写,互不影响。

(5)当溢写线程启动后,需要对这 80MB 空间内的 key 做排序(QuickSort)。排序是 MR 模型默认的行为,这里的排序也是对序列化的字节做的排序。

如果 Job 设置过 Combiner,那么现在就是使用 Combiner 的时候了。将有相同 key 的 kv 对的 value 加起来,减少溢写到磁盘的数据量。Combiner 会优化 MapReduce 的中间结果,所以它在整个模型中会多次使用。

那哪些场景才能使用 Combiner呢?从这里分析,Combiner 的输出是 Reducer 的输入,Combiner 绝不能改变最终的计算结果。Combiner 只应该用于那种 reduce 的输入 key/value 与输出 key/value 类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner 的使用一定得慎重,如果用好,它对 Job 执行效率有帮助,反之会影响 reduce 的最终结果。

(6)每次溢写会在磁盘上生成一个临时文件(写之前判断是否有 Combiner),如果 map 的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行 merge 合并(MergeSort),因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个 ReduceTask/Partition 对应数据的偏移量。

至此 map 整个阶段结束。

2.2 ReduceTask 工作机制

reduce 大致分为 copy、sort、reduce 三个阶段,重点在前两个阶段。

(1)copy 阶段

  • reduce 进程启动一些数据 copy 线程,通过 HTTP 方式请求 MapTask 获取属于自己的文件。
  • 具体包含一个 eventFetcher 来获取已完成的 map 列表,由 Fetcher 线程去 copy 数据,在此过程中会启动两个 merge 线程,分别为 inMemoryMerger 和 onDiskMerger,分别将内存中的数据 merge 到磁盘和将磁盘中的数据进行 merge。

(2)merge 阶段

  • copy 过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比 map 端的更为灵活。merge 有 3 种形式:内存到内存、内存到磁盘、磁盘到磁盘。

  • 默认情况下第 1 种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的 merge。与 map 端类似,这也是「溢写」的过程,这个过程中如果你有设置 Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。

  • 第 2 种 merge 方式一直在运行,直到没有 map 端的数据时才结束,然后启动第 3 种磁盘到磁盘的 merge 方式生成最终的文件。

(3)sort 阶段

把分散的数据合并成一个大的数据后(finalMerge),还会再对合并后的数据排序(默认按 key 的字典序排序)。

(4)对排序后的键值对调用 reduce()

  • 默认是 key 相等的键值对组成一组,调用一次 reduce() 方法。所谓分组就是纯粹的前后两个 key 比较(base 经历过排序),如果相等就继续判断下一个是否和当前相等。如果 reduce 的 key 是自定义 bean,我们只需要 bean 里面的某个属性相同就认为这样的 key 是相同的,并把 value 放同一个 values,进行一次 reduce() 的处理,这时我们就需要自定义 GroupComparator 来“欺骗”reducer 了。
  • reduce 处理的结果会调用默认输出组件 TextOutputFormat 写入到指定的目录文件中。TextOutputFormat 默认是一次输出写一行,key&value 之间以制表符 \t 分割。

2.3 shuffle 机制

shuffle 的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。而在 MapReduce 中,shuffle 更像是洗牌的逆过程,指的是将 map 端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便 reduce 端接收处理。

shuffle 是 Mapreduce 的核心,它分布在 Mapreduce 的 map 阶段和 reduce 阶段。一般把从 map 产生输出开始到 reduce 取得数据作为输入之前的过程称作 shuffle

shuffle 是 MapReduce 程序的核心与精髓,但也是 MapReduce 被诟病最多的地方所在:MapReduce 相比较于 Spark、Flink 计算引擎慢的原因,跟 shuffle 机制有很大的关系。因为 shuffle 频繁涉及到数据在内存、磁盘之间的多次往复。

a. map:shuffle

  • Partition 阶段】将 MapTask 的结果收集输出到默认大小为 100M 的环形缓冲区,保存之前会对 key 进行分区的计算(默认 Hash 分区)。
  • Spill 阶段】当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了 Combiner,还会将有相同分区号和 key 的数据进行排序。
  • Merge 阶段】把所有溢出的临时文件进行一次合并操作,以确保一个 MapTask 最终只产生一个中间数据文件。

b. reduce:shuffle

  • Copy 阶段】ReduceTask 启动 Fetcher 线程到已经完成 MapTask 的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
  • Merge 阶段】在 ReduceTask 远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
  • Sort 阶段】在对数据进行合并的同时,会进行排序操作,由于 MapTask 阶段已经对数据进行了局部的排序,ReduceTask 只需保证 copy 的数据的最终整体有效性即可。

2.4 小结

a. re:工作机制

>>>MapTask<<<

(1)Read 阶段:MapTask 通过 InputFormat 获得的 RecordReader(底层是 LineRecordReader),从输入 InputSplit 中解析出一个个 key/value。

(2)Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map() 处理,并产生一系列新的 key/value。

(3)Collect 阶段:在用户编写 map() 中,当数据处理完成后,一般会调用 OutputCollector.collect() 输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中。

(4)Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

  1. 利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号 Partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序;
  2. 按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.out(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作;
  3. 将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。

(5)Merge 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

  • 当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件 output/file.out 中,同时生成相应的索引文件 output/file.out.index。
  • 在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 mapreduce.task.io.sort.factor(默认 10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
  • 让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

>>>ReduceTask<<<

(1)Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Sort 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照 MapReduce 语义,用户编写 reduce() 输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。

(3)Reduce 阶段:reduce() 将计算结果写到 HDFS 上。

b. key 的重要性

在 MapReduce 编程中,核心是牢牢把握住每个阶段的输入输出 key 是什么!因为 MR 中很多默认行为都跟 key 相关。

  • 排序:key 的字典序 a-z 正序
  • 分区:key.hashcode % reducetask 个数
  • 分组:key 相同的分为一组

最重要的是,如果觉得默认的行为不满足业务需求,MapReduce 还支持自定义排序、分区、分组的规则,这将使得编程更加灵活和方便。

c. 区分常用 API

自定义规约

job.setCombinerClass(xxx.class);

自定义分区规则

job.setPartitionerClass(xxx.class);

自定义分区内排序规则

job.setSortComparatorClass(xxx.class);

自定义分组

job.setGroupingComparatorClass(xxx.class);

3. COVID-19 统计案例

现有一份某日美国各县的新冠疫情统计数据,包括累计确诊病例、累计死亡病例。使用 MapReduce 对疫情数据进行各种分析统计。

3.1 自定义对象序列化

每个州累计确诊案例数、累计死亡案例数

  1. 自定义对象 CovidCountBean,用于封装每个州的确诊病例数和死亡病例数;
  2. 注意自定义对象需要实现 Writable 接口;
  3. 以 state 作为 map 阶段输出的 key、CovidCountBean 作为 value,这样属于同一个州的数据就会变成一组进行 reduce 处理,进行累加即可得出每个州累计确诊病例。

a. CovidCntBean

public class CovidCntBean implements Writable {

    private long cases;
    private long deaths;

    // ~ 省略 get/set 方法 ~

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(cases);
        out.writeLong(deaths);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        cases = in.readLong();
        deaths = in.readLong();
    }

    @Override
    public String toString() {
        return String.format("%d\t%d", cases, deaths);
    }

}

b. CovidSumMapper

public class CovidSumMapper extends Mapper<LongWritable, Text, Text, CovidCntBean> {

    private Text outK = new Text();
    private CovidCntBean outV = new CovidCntBean();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, CovidCntBean>.Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        // 日期, 县, 州, (县编码,) 累计确诊病例, 累计死亡病例
        outK.set(fields[2]);
        outV.setCases(Long.parseLong(fields[fields.length - 2]));
        outV.setDeaths(Long.parseLong(fields[fields.length - 1]));
        context.write(outK, outV);
    }
}

c. CovidSumReducer

public class CovidSumReducer extends Reducer<Text, CovidCntBean, Text, CovidCntBean> {
    private CovidCntBean outV = new CovidCntBean();

    @Override
    protected void reduce(Text key, Iterable<CovidCntBean> values, Reducer<Text, CovidCntBean, Text, CovidCntBean>.Context context) throws IOException, InterruptedException {
        long totalCases = 0, totalDeaths = 0;
        // key相同的会放到一组
        for (CovidCntBean bean : values) {
            totalCases += bean.getCases();
            totalDeaths += bean.getDeaths();
        }
        outV.setCases(totalCases);
        outV.setDeaths(totalDeaths);
        context.write(key, outV);
    }
}

d. CovidSumDriver

public class CovidSumDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // 1. 创建配置对象
        Configuration config = new Configuration();
        config.set("mapreduce.framework.name", "local");
        // 2. 使用工具类 ToolRunner 提交作业
        int status = ToolRunner.run(config, new CovidSumDriver(), args);
        // 3. 退出客户端
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. 创建Job实例
        Job job = Job.getInstance(getConf(), CovidSumDriver.class.getSimpleName());
        // 2. 设置MR程序运行的主类
        job.setJarByClass(CovidSumDriver.class);
        // 3. 设置MR程序的Mapper类
        job.setMapperClass(CovidSumMapper.class);
        // 4. 设置MR程序的Reducer类
        job.setReducerClass(CovidSumReducer.class);
        // 5. 指定Map阶段输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CovidCntBean.class);
        // 6. 指定Reduce阶段输出的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CovidCntBean.class);
        // 7. 配置本次Job的输入/输出数据路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

运行结果:

3.2 自定义排序

每个州的确诊案例数进行倒序排序

如果你的需求中需要根据某个属性进行排序,不妨把这个属性作为 key。因为 MapReduce 中 key 有默认排序行为的。

  1. 如果你的需求是正序,并且数据类型是 Hadoop 封装好的类型。这种情况下不需要任何修改,直接使用 Hadoop 类型作为 key 即可。因为 Hadoop 封装好的类型已经实现了排序规则;
  2. 如果你的需求是倒序或数据类型是自定义对象,则需要重写排序规则:实体类实现 Comparable 接口重写 compareTo() 方法。

本例是以州为统计单位做降序排,所以输入数据直接用 3.1#output 数据。

a. CovidCntDescBean

public class CovidCntDescBean implements WritableComparable<CovidCntDescBean> {

    private long cases;
    private long deaths;

    // ~ 省略 get/set 方法 ~

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(cases);
        out.writeLong(deaths);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        cases = in.readLong();
        deaths = in.readLong();
    }

    @Override
    public String toString() {
        return String.format("%d\t%d", cases, deaths);
    }

    @Override
    public int compareTo(CovidCntDescBean o) {
        // 倒序排序
        return (int) (o.getCases() - this.cases);
    }
}

b. CovidDescMapper

public class CovidDescMapper extends Mapper<LongWritable, Text, CovidCntDescBean, Text> {

    private CovidCntDescBean outK = new CovidCntDescBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CovidCntDescBean, Text>.Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
        outV.set(split[0]);
        outK.setCases(Long.parseLong(split[1]));
        outK.setDeaths(Long.parseLong(split[2]));
        context.write(outK, outV);
    }
}

c. CovidDescReducer

public class CovidDescReducer extends Reducer<CovidCntDescBean, Text, Text, CovidCntDescBean> {

    @Override
    protected void reduce(CovidCntDescBean key, Iterable<Text> values, Reducer<CovidCntDescBean, Text, Text, CovidCntDescBean>.Context context) throws IOException, InterruptedException {
        // 排序好之后,reduce 会进行分组操作,分组规则是判断 key 是否相等。
        // 本业务中,使用自定义对象作为 key 并且没有重写equals&hash,默认就会比较对象的地址,所以每个 kv 都是一个单独的分组
        // 不过也没关系,在case1已经分组统计过了,本次MR只是为了倒序排序
        // <{cases=1101  deaths=13}, California>, ...
        context.write(values.iterator().next(), key);
    }
}

d. CovidDescDriver

public class CovidDescDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // 1. 创建配置对象
        Configuration config = new Configuration();
        config.set("mapreduce.framework.name", "local");
        // 2. 使用工具类 ToolRunner 提交作业
        int status = ToolRunner.run(config, new CovidDescDriver(), args);
        // 3. 退出客户端
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. 创建Job实例
        Job job = Job.getInstance(getConf(), CovidDescDriver.class.getSimpleName());
        // 2. 设置MR程序运行的主类
        job.setJarByClass(CovidDescDriver.class);
        // 3. 设置MR程序的Mapper类
        job.setMapperClass(CovidDescMapper.class);
        // 4. 设置MR程序的Reducer类
        job.setReducerClass(CovidDescReducer.class);
        // 5. 指定Map阶段输出的kv类型
        job.setMapOutputKeyClass(CovidCntDescBean.class);
        job.setMapOutputValueClass(Text.class);
        // 6. 指定Reduce阶段输出的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CovidCntDescBean.class);
        // 7. 配置本次Job的输入/输出数据路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

运行结果:

3.3 自定义分区

将不同州的输出到不同文件中,属于同一个州的各个县输出到同一个结果文件中。

输出到不同文件中 → ReduceTask 有多个(>1) → 默认只有1个,如何有多个?→ 通过 job.setNumReduceTasks(N) 设置 → 当有多个 ReduceTask 意味着数据分区 → 默认分区规则是什么?HashPartitioner → 默认分区规则符合你的业务需求么?→ 符合,直接使用 | 不符合,自定义分区。

a. CovidPartitionMapper

public class CovidPartitionMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        // 以 state 作为 key 参与分区,通过自定义分区器实现:同一个州的数据 - 同一个分区 - 同一个 ReduceTask
        String state = split[2];
        outKey.set(state);
        context.write(outKey, value);
    }
}

b. CovidPartitionReducer

public class CovidPartitionReducer extends Reducer<Text, Text, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value, NullWritable.get());
        }
    }
}

c. CovidStatePartitioner

public class CovidStatePartitioner extends Partitioner<Text, Text> {

    public static Map<String, Integer> registerState = new HashMap<>();

    static {
        // 事先找出所有州
        registerState.put("Kansas", 0);
        registerState.put("Kentucky", 1);
        registerState.put("Virginia", 2);
        registerState.put("Washington", 3);
        registerState.put("Louisiana", 4);
        registerState.put("Iowa", 5);
        registerState.put("Indiana", 6);
        registerState.put("Florida", 7);
        registerState.put("Louisiana", 8);
        registerState.put("Colorado", 9);
    }

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        Integer code = registerState.get(key.toString());
        return code != null ? code : 10;
    }
}

d. CovidPartitionDriver

public class CovidPartitionDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // 1. 创建配置对象
        Configuration config = new Configuration();
        config.set("mapreduce.framework.name", "local");
        // 2. 使用工具类 ToolRunner 提交作业
        int status = ToolRunner.run(config, new CovidPartitionDriver(), args);
        // 3. 退出客户端
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. 创建Job实例
        Job job = Job.getInstance(getConf(), CovidPartitionDriver.class.getSimpleName());
        // 2. 设置MR程序运行的主类
        job.setJarByClass(CovidPartitionDriver.class);
        // 3. 设置MR程序的Mapper类
        job.setMapperClass(CovidPartitionMapper.class);
        // 4*. 设置Partition自定义分区类!
        job.setPartitionerClass(CovidStatePartitioner.class);
        // 5. 设置MR程序的Reducer类
        job.setReducerClass(CovidPartitionReducer.class);
        // 6*. 设置ReducerTask数目。因为 ReduceTask 个数的改变,影响的输出文件个数!
        // - ReduceTask.cnt > Partition.cnt 会产生差值个空文件
        // - ReduceTask.cnt < Partition.cnt 会抛异常
        // - ReduceTask.cnt == 1 只会产生1个文件
        job.setNumReduceTasks(11);
        // 7. 指定Map阶段输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 8. 指定Reduce阶段输出的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 9. 配置本次Job的输入/输出数据路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

运行结果:

3.4 自定义分组

各州累计病例数最多 top3 县

  1. 在 map 阶段将“州 state、县 county、县确诊病例 cases”通过自定义对象封装,作为 key 输出;
  2. 重写对象的排序规则,首先根据州的正序排序,如果州相等,按照确诊病例数 cases 倒序排序,发送到 reduce;
  3. 在 reduce 端利用自定义分组规则,将州相同的分为一组,然后取前 N 个即是 TopN。

a. CovidBean

public class CovidBean implements WritableComparable<CovidBean> {

    private String state;
    private String county;
    private long cases;
    private long deaths;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(state);
        out.writeUTF(county);
        out.writeLong(cases);
        out.writeLong(deaths);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        state = in.readUTF();
        county = in.readUTF();
        cases = in.readLong();
        deaths = in.readLong();
    }

    @Override
    public int compareTo(CovidBean o) {
        // 首先根据state正序排序,如果state相等,按照确诊病例数cases倒序排序
        int result = state.compareTo(o.state);
        return result == 0 ? (int) (o.cases - cases) : result;
    }

    public CovidBean() {}

    public void fillBean(String state, String county, long cases, long deaths) {
        this.state = state;
        this.county = county;
        this.cases = cases;
        this.deaths = deaths;
    }

    // ~ 省略 get/set 方法 ~

    @Override
    public String toString() {
        return String.format("%s\t%s\t%d\t%d\t", state, county, cases, deaths);
    }
}

b. CovidTopNMapper

public class CovidTopNMapper extends Mapper<LongWritable, Text, CovidBean, NullWritable> {

    private CovidBean outK = new CovidBean();
    private NullWritable outV = NullWritable.get();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CovidBean, NullWritable>.Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        outK.fillBean(split[2], split[1], Long.parseLong(split[split.length - 2]), Long.parseLong(split[split.length - 1]));
        context.write(outK, outV);
    }
}

c. CovidTopNReducer

public class CovidTopNReducer extends Reducer<CovidBean, NullWritable, CovidBean, NullWritable> {
    private final int TOP_N = 3;

    @Override
    protected void reduce(CovidBean key, Iterable<NullWritable> values, Reducer<CovidBean, NullWritable, CovidBean, NullWritable>.Context context) throws IOException, InterruptedException {
        // 不遍历迭代器,此时key就是分组中的第一个key,也就是该州确诊病例数最多的县对应的数据
        int nums = 0;
        // key&value 引用的对象不变,但在迭代过程中,kv对象封装内容各自都在变化(对象复用)。
        for (NullWritable value : values) {
            if (nums++ < TOP_N) {
                context.write(key, value);
            } else {
                return;
            }
        }
    }
}

查看 ReduceContextImpl 源码:

private RawKeyValueIterator input;
private Counter inputValueCounter;
private Counter inputKeyCounter;
private RawComparator<KEYIN> comparator;
private KEYIN key;                                  // current key
private VALUEIN value;                              // current value
private boolean firstValue = false;                 // first value in key
private boolean nextKeyIsSame = false;              // more w/ this key
private boolean hasMore;                            // more in file
protected Progressable reporter;
private Deserializer<KEYIN> keyDeserializer;
private Deserializer<VALUEIN> valueDeserializer;
private DataInputBuffer buffer = new DataInputBuffer();
private BytesWritable currentRawKey = new BytesWritable();
private ValueIterable iterable = new ValueIterable();
private DataInputBuffer buffer = new DataInputBuffer();
private BytesWritable currentRawKey = new BytesWritable();
private ValueIterable iterable = new ValueIterable();
private boolean isMarked = false;
private BackupStore<KEYIN,VALUEIN> backupStore;
private final SerializationFactory serializationFactory;
private final Class<KEYIN> keyClass;
private final Class<VALUEIN> valueClass;
private final Configuration conf;
private final TaskAttemptID taskid;
private int currentKeyLength = -1;
private int currentValueLength = -1;

/**
 * 这就是 reduce() 入参迭代器的实现类
 */
protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
  @Override
  public VALUEIN next() {
    // ...
    nextKeyValue();
    return value;
    // ...
  }
}

/**
 * Advance to the next key/value pair.
 */
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!hasMore) {
    key = null;
    value = null;
    return false;
  }
  firstValue = !nextKeyIsSame;
  DataInputBuffer nextKey = input.getKey();
  currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
                    nextKey.getLength() - nextKey.getPosition());
  buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
  // ===== ↓↓↓ Writable·key 对象复用 ↓↓↓ =====
  key = keyDeserializer.deserialize(key);
  DataInputBuffer nextVal = input.getValue();
  buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength() - nextVal.getPosition());
  // ===== ↓↓↓ Writable·val 对象复用 ↓↓↓ =====
  value = valueDeserializer.deserialize(value);

  currentKeyLength = nextKey.getLength() - nextKey.getPosition();
  currentValueLength = nextVal.getLength() - nextVal.getPosition();

  if (isMarked) {
    backupStore.write(nextKey, nextVal);
  }

  hasMore = input.next();
  if (hasMore) {
    nextKey = input.getKey();
    nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                       currentRawKey.getLength(),
                                       nextKey.getData(),
                                       nextKey.getPosition(),
                                       nextKey.getLength() - nextKey.getPosition()
                                      ) == 0;
  } else {
    nextKeyIsSame = false;
  }
  inputValueCounter.increment(1);
  return true;
}

d. CovidGroupComparator

public class CovidGroupComparator extends WritableComparator {

    protected CovidGroupComparator() {
        super(CovidBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CovidBean instance1 = (CovidBean) a;
        CovidBean instance2 = (CovidBean) b;
        return instance1.getState().compareTo(instance2.getState());
    }
}

e. CovidTopNDriver

public class CovidTopNDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // 1. 创建配置对象
        Configuration config = new Configuration();
        config.set("mapreduce.framework.name", "local");
        // 2. 使用工具类 ToolRunner 提交作业
        int status = ToolRunner.run(config, new CovidTopNDriver(), args);
        // 3. 退出客户端
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. 创建Job实例
        Job job = Job.getInstance(getConf(), CovidTopNDriver.class.getSimpleName());
        // 2. 设置MR程序运行的主类
        job.setJarByClass(CovidTopNDriver.class);
        // 3. 设置MR程序的Mapper类
        job.setMapperClass(CovidTopNMapper.class);
        // 4*. 设置MR自定义分组!
        job.setGroupingComparatorClass(CovidGroupComparator.class);
        // 5. 设置MR程序的Reducer类
        job.setReducerClass(CovidTopNReducer.class);
        // 6. 指定Map阶段输出的kv类型
        job.setMapOutputKeyClass(CovidBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 7. 指定Reduce阶段输出的kv类型
        job.setOutputKeyClass(CovidBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 8. 配置本次Job的输入/输出数据路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

运行结果: