11-MapReduce(3)

发布时间 2023-07-03 19:33:31作者: tree6x7

1. Counter 计数器

1.1 概述

在执行 MapReduce 程序的时候,控制台输出信息中通常有下面所示片段内容:

    File System Counters
        FILE: Number of bytes read=136988
        FILE: Number of bytes written=589973
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=3245
        Map output records=3245
        Map output bytes=81674
        Map output materialized bytes=88170
        Input split bytes=136
        Combine input records=0
        Spilled Records=3245
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=17
        Total committed heap usage (bytes)=1029177344
    File Input Format Counters 
        Bytes Read=136795
  INFO - Counters: 30
    File System Counters
        FILE: Number of bytes read=450348
        FILE: Number of bytes written=1269337
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=3245
        Map output records=3245
        Map output bytes=81674
        Map output materialized bytes=88170
        Input split bytes=136
        Combine input records=0
        Combine output records=0
        Reduce input groups=55
        Reduce shuffle bytes=88170
        Reduce input records=3245
        Reduce output records=55
        Spilled Records=6490
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=17
        Total committed heap usage (bytes)=2058354688
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=136795
    File Output Format Counters 
        Bytes Written=1221

可以发现,输出信息中的核心词是 Counters,中文叫做“计数器”。在进行 MapReduce 运算过程中,许多时候,用户希望了解程序的运行情况。Hadoop 内置的计数器功能收集作业的主要统计信息,可以帮助用户理解程序的运行情况,辅助用户诊断故障。

这些记录了该程序运行过程的的一些信息的计数,如 Map input records=3245,表示 Map 有 3245 条记录。可以看出来这些内置计数器可以被分为若干个组,即对于大多数的计数器来说,Hadoop 使用的组件分为若干类。

1.2 内置计数器

Hadoop 为每个 MapReduce 作业维护一些内置的计数器,这些计数器报告各种指标,例如和 MapReduce 程序执行中每个阶段输入输出的数据量相关的计数器,可以帮助用户进行判断程序逻辑是否生效、正确。

Hadoop 内置计数器根据功能进行分组。每个组包括若干个不同的计数器,分别是:MapReduce 任务计数器(Map-Reduce Framework)、文件系统计数器(File System Counters)、作业计数器(Job Counters)、输入文件任务计数器(File Input Format Counters)、输出文件计数器(File Output Format Counters)。

需要注意的是,内置的计数器都是 MapReduce 程序中全局的计数器,跟 MapReduce 分布式运算没有关系,不是所谓的每个局部的统计信息。

(1)Map-Reduce Framework Counters

计数器名称 说明
MAP_INPUT_RECORDS 所有 mapper 已处理的输入记录数
MAP_OUTPUT_RECORDS 所有 mapper 产生的输出记录数
MAP_OUTPUT_BYTES 所有 mapper 产生的未经压缩的输出数据的字节数
MAP_OUTPUT_MATERIALIZED_BYTES 所有 mapper 输出后确实写到磁盘上字节数
COMBINE_INPUT_RECORDS 所有 combiner(如果有)已处理的输入记录数
COMBINE_OUTPUT_RECORDS 所有 combiner(如果有)已产生的输出记录数
REDUCE_INPUT_GROUPS 所有 reducer 已处理分组的个数
REDUCE_INPUT_RECORDS 所有 reducer 已经处理的输入记录的个数。每当某个 reducer 的迭代器读一个值时,该计数器的值增加。
REDUCE_OUTPUT_RECORDS 所有 reducer 输出记录数
REDUCE_SHUFFLE_BYTES shuffle 时复制到 reducer 的字节数
SPILLED_RECORDS 所有 map 和 reduce 任务溢出到磁盘的记录数
CPU_MILLISECONDS 一个任务的总 CPU 时间,以毫秒为单位,可由 /proc/cpuinfo 获取。
PHYSICAL_MEMORY_BYTES 一个任务所用的物理内存,以字节数为单位,可由 /proc/meminfo 获取。
VIRTUAL_MEMORY_BYTES 一个任务所用虚拟内存的字节数,由 /proc/meminfo 获取。

(2)File System Counters Counters

计数器名称 说明
BYTES_READ 程序从文件系统中读取的字节数
BYTES_WRITTEN 程序往文件系统中写入的字节数
READ_OPS 文件系统中进行的读操作的数量(例如 open 操作、filestatus 操作)
LARGE_READ_OPS 文件系统中进行的大规模读操作的数量
WRITE_OPS 文件系统中进行的写操作的数量(例如 create 操作、append 操作)

(3)Job Counters

计数器名称 说明
Launched map tasks 启动的 map 任务数,包括以“推测执行”方式启动的任务。
Launched reduce tasks 启动的 reduce 任务数,包括以“推测执行”方式启动的任务。
Data-local map tasks 与输人数据在同一节点上的 map 任务数
Total time spent by all maps in occupied slots (ms) 所有 map 任务在占用的插槽中花费的总时间(毫秒)
Total time spent by all reduces in occupied slots (ms) 所有 reduce 任务在占用的插槽中花费的总时间(毫秒)
Total time spent by all map tasks (ms) 所有 MapTask 花费的时间
Total time spent by all reduce tasks (ms) 所有 ReduceTask 花费的时间

(4)File Input/Output Format Counters

计数器名称 说明
读取的字节数(BYTES_READ) 由 map 任务通过 FilelnputFormat 读取的字节数
写的字节数(BYTES_WRITTEN) 由 map 任务(针对仅含 map 的作业)或者 reduce 任务通过 FileOutputFormat 写的字节数

1.3 自定义计数器

虽然 Hadoop 内置的计数器比较全面,给作业运行过程的监控带了方便,但是对于一些业务中的特定要求(统计过程中对某种情况发生进行计数统计)MapReduce 还是提供了用户编写自定义计数器的方法。最重要的是,计数器是全局的统计,避免了用户自己维护全局变量的不利性。

自定义计数器的使用分为 2 步:

  1. 首先通过 context.getCounter() 方法获取一个全局计数器,创建的时候需要指定计数器所属的组名和计数器的名字;

      /**
       * Get the Counter for the given groupName and counterName.
       * @param counterName counter name
       * @return the Counter for the given groupName and counterName
       */
      public Counter getCounter(String groupName, String counterName);
    
  2. 然后在程序中需要使用计数器的地方,调用 counter 提供的方法即可,如 +1 操作。

2. DB 操作

2.1 概述

通常企业会使用关系型数据来存储业务相关的数据,但随着数据的规模越来越大,尤其是像 MySQL 这种,在单表超过 5 千万条记录时,尽管对表使用了特定的存储引擎和索引优化,但依然不可避免的存在性能下降问题。

此时,我们可以通过使用 MapReduce 从 MySQL 中定期迁移使用频率较低的历史数据到 HDFS 中,一方面可以降低对 MySQL 的存储和计算负载,另一方面,通过分布式计算引擎可以更加高效的处理过去的历史数据。

对于 MapReduce 框架来说,使用 Inputformat 进行数据读取操作,读取的数据首先由 Mapper 处理,然后交给 Reducer 处理,最终使用 OutputFormat 进行数据的输出操作。默认情况下,输入输出的组件实现都是针对文本数据处理的,分别是 TextInputFormat、TextOutputFormat。

为了方便 MapReduce 直接访问关系型数据库,Hadoop 提供了 DBInputFormat 和 DBOutputFormat 两个类。其中 DBInputFormat 负责从数据库中读取数据,而 DBOutputFormat 负责把数据最终写入数据库中。

  • DBInputFormat 类用于从 SQL 表读取数据。DBInputFormat 底层一行一行读取表中的数据,返回键值对。其中 key 是 LongWritable 类型,表中数据的记录行号,从 0 开始;value 是 DBWritable 类型,表示该行数据对应的对象类型。
  • DBOutputFormat 将 reduce 输出发送到 SQL 表。DBOutputFormat 接收键值对,其中 key 必须具有扩展 DBWritable 的类型。

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface DBWritable

/* Objects that are read from/written to a database should implement DBWritable. DBWritable, is similar to Writable except that the write(PreparedStatement) method takes a PreparedStatement, and readFields(ResultSet) takes a ResultSet. */

/* Implementations are responsible for writing the fields of the object to PreparedStatement, and reading the fields of the object from the ResultSet. */

// Example:

// If we have the following table in the database :
 CREATE TABLE MyTable (
   counter        INTEGER NOT NULL,
   timestamp      BIGINT  NOT NULL,
 );
 
// then we can read/write the tuples from/to the table with :
public class MyWritable implements Writable, DBWritable {
   private int counter;
   private long timestamp;
       
   // Writable#write() implementation
   public void write(DataOutput out) throws IOException {
     out.writeInt(counter);
     out.writeLong(timestamp);
   }
       
   // Writable#readFields() implementation
   public void readFields(DataInput in) throws IOException {
     counter = in.readInt();
     timestamp = in.readLong();
   }
       
   public void write(PreparedStatement statement) throws SQLException {
     statement.setInt(1, counter);
     statement.setLong(2, timestamp);
   }
       
   public void readFields(ResultSet resultSet) throws SQLException {
     counter = resultSet.getInt(1);
     timestamp = resultSet.getLong(2);
   } 
}

2.2 读取 DB

在 mysql 创建表 itheima_goods 并加载数据到表中。要求使用 MapReduce 程序将表中的数据导出存放在指定的文件下。

因为涉及到 Java 操作 mysql,因此需要在 pom.xml 中额外添加 mysql-jdbc 驱动。

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.32</version>
</dependency>

a. GoodsBean

定义 GoodsBean 的实体类,用于封装插入表中的数据(对象属性跟表的字段一一对应即可)。并且需要实现序列化机制 Writable。此外,从数据库读取/写入数据库的对象应实现 DBWritable。DBWritable 与 Writable 相似,区别在于 write(PreparedStatement) 方法采用 PreparedStatement,而 readFields(ResultSet) 采用 ResultSet。

public class GoodsBean implements Writable, DBWritable {
    /**
     * 商品ID
     */
    private long goodsId;
    /**
     * 商品编号
     */
    private String goodsSn;
    /**
     * 商品名称
     */
    private String goodsName;
    /**
     * 市场价
     */
    private double marketPrice;
    /**
     * 门店价
     */
    private double shopPrice;
    /**
     * 总销售量
     */
    private long saleNum;

    // ... 省略 get/set ...
  
    public void fillFields(long goodsId, String goodsSn, String goodsName, double marketPrice, double shopPrice, long saleNum) {
        this.goodsId = goodsId;
        this.goodsSn = goodsSn;
        this.goodsName = goodsName;
        this.marketPrice = marketPrice;
        this.shopPrice = shopPrice;
        this.saleNum = saleNum;
    }

    @Override
    public String toString() {
        return goodsId + "\t" + goodsSn + "\t" + goodsName + "\t" + marketPrice + "\t" + shopPrice + "\t" + saleNum;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(goodsId);
        out.writeUTF(goodsSn);
        out.writeUTF(goodsName);
        out.writeDouble(marketPrice);
        out.writeDouble(shopPrice);
        out.writeLong(saleNum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        goodsId = in.readLong();
        goodsSn = in.readUTF();
        goodsName = in.readUTF();
        marketPrice = in.readDouble();
        shopPrice = in.readDouble();
        saleNum = in.readLong();
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, goodsId);
        statement.setString(2, goodsSn);
        statement.setString(3, goodsName);
        statement.setDouble(4, marketPrice);
        statement.setDouble(5, shopPrice);
        statement.setLong(6, saleNum);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        goodsId = resultSet.getLong(1);
        goodsSn = resultSet.getString(2);
        goodsName = resultSet.getString(3);
        marketPrice = resultSet.getDouble(4);
        shopPrice = resultSet.getDouble(5);
        saleNum = resultSet.getLong(5);
    }
}

b. ReadDBMapper

public class ReadDBMapper extends Mapper<LongWritable, GoodsBean, LongWritable, Text> {

    Text outValue = new Text();

    @Override
    protected void map(LongWritable key, GoodsBean value, Mapper<LongWritable, GoodsBean, LongWritable, Text>.Context context) throws IOException, InterruptedException {
        outValue.set(value.toString());
        context.write(key, outValue);
    }
}

c. ReadDBDriver

public class ReadDBDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration config = new Configuration();
        DBConfiguration.configureDB(config, 
                                    "com.mysql.cj.jdbc.Driver", 
                                    "jdbc:mysql://192.168.6.160:3306/stock_db", 
                                    "root", 
                                    "123456");

        Job job = Job.getInstance(config, ReadDBDriver.class.getSimpleName());
        job.setJarByClass(ReadDBDriver.class);
        job.setMapperClass(ReadDBMapper.class);
        // MR 程序可以没有 reduce 阶段
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path("/Users/xxx/Downloads/mysqlout"));
        job.setInputFormatClass(DBInputFormat.class);
        DBInputFormat.setInput(job, GoodsBean.class,
                "SELECT goodsId,goodsSn,goodsName,marketPrice,shopPrice,saleNum FROM itheima_goods",
                "SELECT count(goodsId) FROM itheima_goods");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new ReadDBDriver(), args);
        System.exit(status);
    }
}

2.3 写入 DB

a. WriteDBMapper

public class WriteDBMapper extends Mapper<LongWritable, Text, NullWritable, GoodsBean> {
    private NullWritable outK = NullWritable.get();
    private GoodsBean outV = new GoodsBean();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, GoodsBean>.Context context) throws IOException, InterruptedException {
        // 获取两个计数器,用于统计合法/非法数据条数
        Counter sc = context.getCounter("mr2sql", "parseSuccessCounter");
        Counter fc = context.getCounter("mr2sql", "parseFailedCounter");

        String[] fields = value.toString().split("\\s+");

        if (fields.length > 6) {
            // 合法数据 —— 提取字段封装成对象
            outV.fillFields(Long.parseLong(fields[1]), fields[2], fields[3], Double.parseDouble(fields[4]),
                                    Double.parseDouble(fields[5]), Long.parseLong(fields[6]));
            context.write(outK, outV);
            sc.increment(1);
        } else {
            // 非法数据 —— 计入统计
            fc.increment(1);
        }
    }
}

b. WriteDBReducer

在使用 DBOutputFormat 时,要求程序最终输出的 key 必须是继承自 DBWritable 的类型,value 则没有具体要求。

public class WriteDBReducer extends Reducer<NullWritable, GoodsBean, GoodsBean, NullWritable> {

    @Override
    protected void reduce(NullWritable key, Iterable<GoodsBean> values, Reducer<NullWritable, GoodsBean, GoodsBean, NullWritable>.Context context) throws IOException, InterruptedException {
        for (GoodsBean value : values) {
            context.write(value, key);
        }
    }
}

c. WriteDBDriver

public class WriteDBDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration config = new Configuration();

        DBConfiguration.configureDB(config, 
                                    "com.mysql.cj.jdbc.Driver", 
                                    "jdbc:mysql://192.168.6.160:3306/stock_db?useUnicode=true&characterEncoding=utf-8", 
                                    "root", 
                                    "123456");

        Job job = Job.getInstance(config, WriteDBDriver.class.getSimpleName());

        job.setJarByClass(WriteDBDriver.class);
      
        job.setMapperClass(WriteDBMapper.class);        
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(GoodsBean.class);

        job.setReducerClass(WriteDBReducer.class);
        FileInputFormat.setInputPaths(job, new Path("/Users/xxx/Downloads/mysqlout"));
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, "itheima_goods_bak", "goodsId", "goodsSn", "goodsName", "marketPrice", "shopPrice", "saleNum");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new WriteDBDriver(), args);
        System.exit(status);
    }

}

3. Join 关联

3.1 概述

在实际的数据库应用中,我们经常需要从多个数据表中读取数据,这时我们就可以使用 SQL 语句中的连接(JOIN),在两个或多个数据表中查询数据。

在使用 MapReduce 框架进行数据处理的过程中,也会涉及到从多个数据集读取数据,进行 join 关联的操作,只不过此时需要使用 Java 代码并根据 MapReduce 的编程规范进行业务的实现。

但是由于 MapReduce 的分布式设计理念的特殊性,因此对于 MapReduce 实现 join 操作具备了一定的特殊性。特殊主要体现在:究竟在 MapReduce 中的什么阶段进行数据集的关联操作,map 阶段还是 reduce 阶段?之间的区别又是什么?

整个 MapReduce 的 join 分为两类:Map-side Join、Reduce-side Join。

3.2 Reduce-side Join

a. 说明

顾名思义,在 reduce 阶段执行 join 关联操作。这也是最容易想到和实现的 join 方式。因为通过 shuffle 过程就可以将相关的数据分到相同的分组中,这将为后面的 join 操作提供了便捷。

基本上,Reduce Side Join 大致步骤如下:

  • Mapper 分别读取不同的数据集;
  • Mapper 的输出中,通常以 join 的字段作为输出的 key;
  • 不同数据集的数据经过 shuffle,key 一样的会被分到同一分组处理;
  • 在 reducer 中根据业务需求把数据进行关联整合汇总,最终输出。

【弊端】reduce 端 join 最大的问题是整个 join 的工作是在 reduce 阶段完成的,但是通常情况下 MapReduce 中 reduce 的并行度是极小的(默认是 1 个),这就使得所有的数据都挤压到 reduce 阶段处理,压力颇大。虽然可以设置 reduce 的并行度,但是又会导致最终结果被分散到多个不同文件中。并且在数据从 Mapper 到 Reducer 的过程中,shuffle 阶段十分繁琐,数据集大时成本极高。

b. 案例

有两份结构化的文件:itheima_goods(商品信息)、itheima_order_goods(订单信息)。要求使用 MapReduce 统计出每笔订单中对应的具体的商品名称信息。

# itheima_goods
goodsId(商品id) | goodsSn(商品编号) | goodsName(商品名称)
# itheima_order_goods
orderId(订单ID) |  goodsId(商品ID) | payPrice(实际支付价格)

思路分析:

  • 使用 Mapper 处理订单数据和商品数据,输出的时候以 goodsId 作为 key。相同 goodsId 的商品和订单项会到同一个 reducer 的同一个分组,在分组中进行订单和商品信息的关联合并。
  • 在 MapReduce 程序中可以通过 context 获取到当前处理的切片所属的文件名称。根据文件名来判断当前处理的是订单数据还是商品数据,以此来进行不同逻辑的输出。
  • join 处理完之后,最后可以再通过 MapReduce 程序排序功能,将属于同一笔订单的所有商品信息汇聚在一起。

ReduceJoinMapper

public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    public static final String ORDER_PREFIX = "order#";
    public static final String GOODS_PREFIX = "goods#";

    Text outK = new Text();
    Text outV = new Text();
  
    String fileName = null;
    StringBuilder builder = new StringBuilder(128);

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        // 获取当前处理的切片所属文件名称
        FileSplit split = (FileSplit) context.getInputSplit();
        fileName = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        // 清空 builder
        builder.setLength(0);
        // 切割处理读取的一行数据
        String[] split = value.toString().split("\\|");
        // 判断当前处理的文件
        if (fileName.contains("order")) {
            outK.set(split[1]);
            builder.append(split[0]).append("\t").append(split[2]);
            outV.set(builder.insert(0, ORDER_PREFIX).toString());
        } else {
            outK.set(split[0]);
            builder.append(split[1]).append("\t").append(split[2]);
            outV.set(builder.insert(0, GOODS_PREFIX).toString());
        }
        // outK:goodsId
        context.write(outK, outV);
    }
}

ReduceJoinReducer

public class ReduceJoinReducer extends Reducer<Text, Text, Text, Text> {

    private List<String> goodsList = new ArrayList<>();
    private List<String> orderList = new ArrayList<>();

    Text outV = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        // 按 goodsId 分组,一次 reduce 一个商品所关联的所有订单
        for (Text value : values) {
            String valueStr = value.toString();
            if (valueStr.startsWith(ReduceJoinMapper.GOODS_PREFIX)) {
                goodsList.add(valueStr.split("#")[1]);
            } else {
                orderList.add(valueStr.split("#")[1]);
            }
        }

        int orderSize = orderList.size();
        int goodsSize = goodsList.size();

        for (int i = 0; i < orderSize; i++) {
            for (int j = 0; j < goodsSize; j++) {
                outV.set(orderList.get(i) + "\t" + goodsList.get(j));
                context.write(key, outV);
            }
        }
        // 复用
        orderList.clear();
        goodsList.clear();
    }
}

ReduceJoinDriver

public class ReduceJoinDriver {
    public static void main(String[] args) throws Exception {
        // 1. 获取Job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. 关联本Driver类
        job.setJarByClass(ReduceJoinDriver.class);

        // 3. 关联Mapper和Reducer
        job.setMapperClass(ReduceJoinMapper.class);
        job.setReducerClass(ReduceJoinReducer.class);

        // 4. 设置Map端输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5. 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 6. 设置程序的输入输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. 提交Job
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

ReducerJoinSortDemo:对上面 Job 的输出再进行排序

public class ReducerJoinSortDemo {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. 获取Job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. 关联本Driver类
        job.setJarByClass(ReducerJoinSortDemo.class);

        // 3. 关联Mapper和Reducer
        job.setMapperClass(ReduceJoinSortMapper.class);
        job.setReducerClass(ReduceJoinSortReducer.class);

        // 4. 设置Map端输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5. 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 6. 设置程序的输入输出路径(输入路径是上一个MR的输出)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. 提交Job
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }

    public static class ReduceJoinSortMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text outK = new Text();
        private Text outV = new Text();


        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, 
                           Text, Text>.Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            outK.set(fields[1]);
            outV.set(fields[1] + "\t" + fields[0] + "\t" + fields[3] + "\t" + fields[4] + "\t" + fields[2]);
            context.write(outK, outV);
        }
    }

    public static class ReduceJoinSortReducer extends Reducer<Text, Text, Text, NullWritable> {

        private NullWritable outV = NullWritable.get();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, 
                              Text, NullWritable>.Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(value, outV);
            }
        }
    }
}

3.3 分布式缓存

DistributedCache 是 Hadoop 框架提供的一种机制,可以将 Job 指定的文件,在 Job 执行前,先行分发到 Task 执行的机器上,并有相关机制对 cache 文件进行管理(所以必须在 YARN 模式下使用)。

DistributedCache 能够缓存应用程序所需的文件 (包括文本、档案文件、jar 文件等)。

MapReduce 框架在作业所有 Task 执行之前会把指定的分布式缓存文件拷贝到各个 Task 运行的节点上。

使用步骤:

  1. 添加缓存文件,可以使用 MapReduce 的 API 添加需要缓存的文件/目录(可以在路径末尾阶段追加 # + '别名',在 map 阶段可以使用该别名)。

    /* 缓存压缩包文件到 task 运行节点的工作目录 */
    job.addCacheArchive(URI uri);
    /* 缓存普通文件到 task 运行节点的工作目录 */
    job.addCacheFile(URI uri);
    ////////////////////////////////////////////////
    String cache = "hdfs://x.x.x.x:8020/cache/file";
    cache += "#myfile";
    job.addCacheFile(new Path(cache).toUri(), conf);
    
  2. MapReduce 程序中读取缓存文件:在 Mapper 或 Reducer 的 setup() 中,用 BufferedReader 获取分布式缓存中的文件内容。BufferedReader 是带缓冲区的字符流,能够减少访问磁盘的次数,提高文件读取性能;并且可以一次性读取一行字符。

    protected void setup(Context context) throw IOException,InterruptedException {
        // 读取缓存文件中的内容(也可直接根据别名读取)
        BufferReader br = new BufferedReader(new InputStreamReader(new FileInputStream("file"), "utf-8"));
        // ...
    }
    

3.4 Map-side Join

a. 说明

Map-side Join 就是在 map 阶段执行 join 关联操作,并且程序也没有了 reduce 阶段,避免了 shuffle 时候的繁琐。实现的关键是使用 MapReduce 的分布式缓存。

尤其是涉及到一大一小数据集的处理场景时,map 端的 join 将会发挥出得天独厚的优势。

Map-side Join 的大致思路如下:

  1. 首先分析 join 处理的数据集,使用分布式缓存技术将小的数据集进行分布式缓存;
  2. MapReduce 框架在执行的时候会自动将缓存的数据分发到各个 MapTask 运行的机器上;
  3. 程序只运行 Mapper,在 Mapper 初始化的时候从分布式缓存中读取小数据集数据,然后和自己读取的大数据集进行 join 关联,输出最终的结果;
  4. 整个 join 的过程没有 shuffle,没有 Reducer。

【优势】减少 shuffle 时候的数据传输成本,并且 Mapper 的并行度可以根据输入数据量自动调整,充分发挥分布式计算的优势。

b. 案例

需求同上。

思路分析:

  • Map-side Join 是指在 Mapper 任务中加载特定数据集,此案例中把商品数据进行分布式缓存,使用 Mapper 读取订单数据和缓存的商品数据进行连接。
  • 通常为了方便使用,会在 Mapper 的初始化方法 setup() 中读取分布式缓存文件加载的程序的内存中,便于后续 Mapper 处理数据。
  • 因为在 Mapper 阶段已经完成了数据的关联操作,因此程序不需要进行 reduce。需要在 Job 中将 ReduceTask 的个数设置为 0,也就是 Mapper 的输出就是程序最终的输出。

MapJoinMapper

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    Map<String, String> goodsMap = new HashMap<>();
    Text outK = new Text();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // 读取分布式缓存文件(关于路径,直接指定文件名称即可)
        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("itheima_goods.txt"), "utf-8"));
        String line = null;
        while ((line = br.readLine()) != null) {
            String[] fields = line.split("\\|");
            // 把读取的缓存内容添加到map中
            goodsMap.put(fields[0], fields[1] + "\t" + fields[2]);
        }
        IOUtils.closeStream(br);
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\|");
        String goodsInfo = goodsMap.get(fields[1]);
        outK.set(value + "\t" + goodsInfo);
        context.write(outK, NullWritable.get());
    }
}

MapJoinDriver

public class MapJoinDriver {
    public static void main(String[] args) throws Exception {
        // 1. 获取Job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. 关联本Driver类
        job.setJarByClass(MapJoinDriver.class);

        // 3. 关联Mapper和Reducer
        job.setMapperClass(MapJoinMapper.class);
        job.setNumReduceTasks(0);

        // 4. 设置Map端输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5. 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 6. 添加分布式缓存文件
        job.addCacheFile(new URI("/data/join/cache/itheima_goods.txt"));

        // 7. 设置输入/输出目录
        FileInputFormat.setInputPaths(job, new Path("/data/join/input"));
        FileOutputFormat.setOutputPath(job, new Path("/data/join/output"));

        // 8. 提交Job
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

分布式缓存的使用必须使用 MapReduce 的 YARN 模式运行!

将 jar 包上传集群执行命令:hadoop jar xxx.jar,注意保证 YARN 集群提前启动成功~

4. MR 工作流

使用 Hadoop 里面的 MapReduce 来处理海量数据是非常简单方便的,但有时候我们的应用程序,往往需要多个 MR 作业来计算结果。

比如说一个最简单的使用 MR 提取海量搜索日志的 TopN 的问题,注意,这里面其实涉及了两个 MR 作业,第一个是词频统计,第两个是排序求 TopN,这显然是需要两个 MapReduce 作业来完成的。其他的比如一些数据挖掘类的作业,常常需要迭代组合好几个作业才能完成,这类作业类似于 DAG 类的任务,各个作业之间是具有先后,或相互依赖的关系,比如说,这一个作业的输入,依赖上一个作业的输出等等。

具有依赖式的作业提交后,Hadoop 会根据依赖的关系,先后执行的 Job 任务,每个任务的运行都是独立的。

在 Hadoop 里实际上提供了 JobControl 类来组合一个具有依赖关系的作业,在新版的 API 里,又新增了 ControlledJob 类,细化了任务的分配,通过这两个类,我们就可以轻松的完成类似 DAG 作业的模式,这样我们就可以通过一个提交来完成原来需要提交 2 次的任务,大大简化了任务的繁琐度。

  • JobControl
    • 工作流 Job 控制器,一次可以提交、管理多个 Job;
    • JobControl 类实现了线程 Runnable 接口,需要实例化一个线程来让它启动。
  • ControlledJob
    • 可以将普通作业包装成受控作业。并且支持设置依赖关系;
    • Hadoop 会根据依赖的关系,先后执行 Job 任务,每个任务的运行都是独立的。

【案例】针对 MR Reduce-side Join 方式处理订单和商品数据之间的关联,需要进行两步程序处理,首先把两个数据集进行 join 操作,然后针对 join 的结果进行排序,保证同一笔订单的商品数据聚集在一起。

两个程序带有依赖关系,可以使用工作流进行任务的设定,依赖的绑定,一起提交执行。

public class MRJobFlow {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();

        // 1. 将普通作业包装成受控作业
        ControlledJob controlledJob1 = new ControlledJob(conf);
        controlledJob1.setJob(getJob1(conf));
        ControlledJob controlledJob2 = new ControlledJob(conf);
        controlledJob2.setJob(getJob2(conf));

        // 2. 设置作业间的依赖关系
        controlledJob2.addDependingJob(controlledJob1);

        // 3. 创建主控制器,控制job1、job2一起提交
        JobControl mainCtrl = new JobControl("myctrl");
        mainCtrl.addJob(controlledJob1);
        mainCtrl.addJob(controlledJob2);

        // 4. 另起线程执行 JobControl
        Thread t = new Thread(mainCtrl);
        t.start();

        // 5. 监测工作流完成情况
        while (true) {
            if (mainCtrl.allFinished()) {
                System.out.println("SUCCESS=>" + mainCtrl.getSuccessfulJobList());
                mainCtrl.stop();
                break;
            }
        }
    }

    private static Job getJob1(Configuration conf) throws IOException {
        // 1. 创建Job对象
        Job job = Job.getInstance(conf);

        // 2. 关联本Driver类
        job.setJarByClass(ReduceJoinDriver.class);

        // 3. 关联Mapper和Reducer
        job.setMapperClass(ReduceJoinMapper.class);
        job.setReducerClass(ReduceJoinReducer.class);

        // 4. 设置Map端输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5. 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 6. 设置程序的输入输出路径
        FileInputFormat.setInputPaths(job, new Path("/.../reduce-join/input"));
        FileOutputFormat.setOutputPath(job, new Path("/.../reduce-join/output"));

        return job;
    }

    private static Job getJob2(Configuration conf) throws IOException {
        // 1. 创建Job对象
        Job job = Job.getInstance(conf);

        // 2. 关联本Driver类
        job.setJarByClass(ReducerJoinSort.class);

        // 3. 关联Mapper和Reducer
        job.setMapperClass(ReducerJoinSort.ReduceJoinSortMapper.class);
        job.setReducerClass(ReducerJoinSort.ReduceJoinSortReducer.class);

        // 4. 设置Map端输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5. 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 6. 设置程序的输入输出路径
        FileInputFormat.setInputPaths(job, new Path("/.../reduce-join/output"));
        FileOutputFormat.setOutputPath(job, new Path("/.../reduce-join/sort"));
        return job;
    }

}