09-MapReduce(1)

发布时间 2023-07-03 17:47:00作者: tree6x7

1. MapReduce 概述

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。

MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。

1.1 引入

MapReduce 的思想核心是“先分再合,分而治之”。

所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,然后把各部分的结果组成整个问题的最终结果。

  • Map 负责“拆分”:即把复杂的任务分解为若干个“简单的子任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
  • Reduce 负责“合并”:即对 Map 阶段的结果进行全局汇总。

“分布式计算”的概念:

  • 分布式计算是一种计算方法,和集中式计算是相对的。
  • 随着计算技术的发展,有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成。
  • 分布式计算将该应用分解成许多小的部分,分配给多台计算机进行处理。这样可以节约整体计算时间,大大提高计算效率。

模拟实现分布式计算:

  • 大数据场景下,数据通常采用分布式存储。如果采用传统的集中式计算,首先需要下载分布式存储的数据,数据传输性能效率如何保证?
  • 其次,数据量大的情况下,按部就班的串行读取数据集中计算,耗时如何?
  • 如果采用分布式计算,是移动数据的代价高,还是移动计算程序的代价高?

1.2 设计构思

(1)如何应对大数据处理场景?

对相互间不具有计算依赖关系的大数据计算任务,实现并行最自然的办法就是采取 MapReduce 分而治之的策略。

首先 Map 阶段进行拆分,把大数据拆分成若干份小数据,多个程序同时并行计算产生中间结果;然后是 Reduce 聚合阶段,通过程序对并行的结果进行最终的汇总计算,得出最终的结果。

不可拆分的计算任务或相互间有依赖关系的数据无法进行并行计算!

(2)构建抽象编程模型

MapReduce 借鉴了函数式语言中的思想,用 Map 和 Reduce 两个函数提供了高层的并行编程抽象模型。

  • map:对一组数据元素进行某种重复式的处理;
  • reduce:对 Map 的中间结果进行某种进一步的结果整理。

MapReduce 中定义了如下的 Map 和 Reduce 两个抽象的编程接口,由用户去编程实现:

 map  	 (k1; v1)  → (k2; v2)
reduce  (k2; [v2]) → (k3; v3)

通过以上两个编程接口,大家可以看出 MapReduce 处理的数据类型是 <key,value> 键值对。

(3)统一架构、隐藏底层细节

如何提供统一的计算框架?

如果没有统一封装底层细节,那么程序员则需要考虑诸如数据存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce 设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。

MapReduce 最大的亮点在于通过抽象模型和计算框架把“需要做什么(what need to do)”与“具体怎么做(how to do)”分开了,为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的业务程序代码。

至于如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:从分布代码的执行,到大到数千小到单个节点集群的自动调度使用。

MapReduce 局限性

MapReduce 虽然有很多的优势,也有相对得局限性,局限性不代表不能做,而是在有些场景下实现的效果比较差,并不适合用 MapReduce 来处理,主要表现在以下结果方面:

  • 实时计算性能差:MapReduce 主要应用于离线作业,无法作到秒级或者是亚秒级得数据响应。
  • 不能进行流式计算:流式计算特点是数据是源源不断得计算,并且数据是动态的;而 MapReduce 作为一个离线计算框架,主要是针对静态数据集,数据是不能动态变化的。

1.4 编程设计

a. 架构体系

一个完整的 MapReduce 程序在分布式运行时有 3 类实例进程:

  1. MRAppMaster:负责整个程序的过程调度及状态协调
  2. MapTask:负责 map 阶段的整个数据处理流程
  3. ReduceTask:负责 reduce 阶段的整个数据处理流程

MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段。如果用户的业务逻辑非常复杂,那就只能多个 MapReduce 程序串行运行。

b. 编程规范

用户编写的程序代码分成三个部分:Mapper、Reducer、Driver(客户端提交作业驱动程序)。

  • 用户自定义的 Mapper 和 Reducer 都要继承各自的父类。
    • Mapper 中的业务逻辑写在 map()
    • Reducer 的业务逻辑写在 reduce()
  • 整个程序需要一个 Driver 来进行提交,提交的是一个描述了各种必要信息的 Job 对象。

整个 MapReduce 程序中,数据都是以「kv 键值对」的形式流转的。在实际编程解决各种业务问题中,需要考虑每个阶段的输入输出 kv 分别是什么。

MapReduce 内置了很多默认属性,比如排序、分组等,都和数据的 k 有关,所以说 kv 的类型数据确定及其重要的。

c. 执行流程

虽然 MapReduce 从外表看起来就两个阶段 Map 和 Reduce,但是内部却包含了很多默认组件和默认的行为。

  • 组件
    • 读取数据组件 InputFormat
    • 输出数据组件 OutputFormat
  • 行为
    • 排序
    • 分区
    • 分组

整个 MapReduce 工作流程可以分为 3 个阶段:map、shuffle、reduce。

(1)map 阶段

负责把从数据源读取来到数据进行处理,默认情况下读取数据返回的是 kv 键值对类型,经过自定义 map() 处理之后,输出的也应该是 kv 键值对类型。

(2)shuffle 阶段

map 输出的数据会经过分区、排序、分组等自带动作进行重组,相当于洗牌的逆过程。这是 MapReduce 的核心所在。

  • 默认分区规则:key 相同的分在同一个分区,同一个分区被同一个 reduce 处理;
  • 默认排序规则:根据 key 字典序排序;
  • 默认分组规则:key 相同的分为一组,一组调用 reduce 处理一次。

(3)reduce 阶段

负责针对 shuffle 好的数据进行聚合处理。输出的结果也应该是 kv 键值对。

2. Writable 序列化机制

2.1 序列化机制

  • 序列化(Serialization)是将「结构化对象」转换成「字节流」以便于进行网络传输或写入持久存储的过程;
  • 反序列化(Deserialization)是将「字节流」转换为「一系列结构化对象」的过程,重新创建该对象。

a. Java 序列化

Java 中一切都是对象。开发程序中,经常会涉及到下述场景:

  • 跨进程、跨网络传递对象
  • 将对象数据持久化存储

这就需要有一种可以在两端传输数据的协议。Java 序列化机制就是为了解决这个问题而产生。

Java 对象序列化的机制,把对象表示成一个二进制的字节数组,里面包含了对象的数据、对象的类型信息、对象内部的数据的类型信息等等。通过保存或则转移这些二进制数组达到持久化、传递的目的。

要实现序列化,需要实现 java.io.Serializable 接口。反序列化是和序列化相反的过程,就是把二进制数组转化为对象的过程。

b. Hadoop 序列化

Hadoop 的序列化没有采用 Java 的序列化机制,而是实现了自己的序列化机制 Writable。

Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息、Header、继承体系等),不便于在网络中高效传输。所以,Hadoop 自己开发了一套序列化机制(Writable)。

但在 Hadoop 的序列化机制中,用户可以复用对象,这样就减少了 Java 对象的分配和回收,提高了应用效率

Hadoop 通过 Writable 接口实现的序列化机制,接口提供两个方法 write()readFields()

/**
 * A serializable object which implements a simple, efficient, serialization 
 * protocol, based on {@link DataInput} and {@link DataOutput}.
 *
 * Any key or value type in the Hadoop Map-Reduce framework implements this interface.
 * 
 * Implementations typically implement a static read(DataInput) method
 * which constructs a new instance, calls {@link #readFields(DataInput)} 
 * and returns the instance.
 * 
 * Example:
 *     public class MyWritable implements Writable {
 *       // Some data
 *       private int counter;
 *       private long timestamp;
 *
 *       MyWritable() { } // Default constructor to allow (de)serialization
 *
 *       @Override
 *       public void write(DataOutput out) throws IOException {
 *         out.writeInt(counter);
 *         out.writeLong(timestamp);
 *       }
 *
 *       @Override
 *       public void readFields(DataInput in) throws IOException {
 *         counter = in.readInt();
 *         timestamp = in.readLong();
 *       }
 *
 *       public static MyWritable read(DataInput in) throws IOException {
 *         MyWritable w = new MyWritable();
 *         w.readFields(in);
 *         return w;
 *       }
 *     }
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {
  /** 
   * Serialize the fields of this object to out.
   * 
   * @param out DataOuput to serialize this object into.
   * @throws IOException
   */
  void write(DataOutput out) throws IOException;

  /** 
   * Deserialize the fields of this object from in.  
   * 
   * For efficiency, implementations should attempt to re-use storage in the 
   * existing object where possible.
   * 
   * @param in DataInput to deseriablize this object from.
   * @throws IOException
   */
  void readFields(DataInput in) throws IOException;
}

Hadoop 没有提供对象比较功能,所以和 Java#Comparable 接口合并,提供一个接口 WritableComparable,该接口可用于用户自定义对象的比较规则。

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {}

Hadoop 提供了如下内容的数据类型,这些数据类型都实现了 WritableComparable 接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较。

Hadoop 数据类型 Java 数据类型
BooleanWritable boolean
ByteWritable byte
IntWritable int
FloatWritable float
LongWritable long
DoubleWritable double
Text String
MapWritable Map
ArrayWritable array
NullWritable null

2.2 自定义类序列化

在 Hadoop 框架内部传递一个 bean 对象,那么该对象就需要实现序列化接口。具体实现 bean 对象序列化步骤如下 7 步。

(1)必须实现 Writable 接口

(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造;

public FlowBean() {
    super();
}

(3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
}

(4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
}

(5)注意反序列化的顺序和序列化的顺序完全一致;

(6)要想把结果显示在文件中,需要重写 toString(),可用“\t”分开,方便后续用;

(7)如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为 MapReduce 框架中的「shuffle 过程」要求对 key 必须能排序。

@Override
public int compareTo(FlowBean o) {
    return this.sumFlow > o.getSumFlow() ? -1 : 1; // 倒序排列,从大到小
}

2.3 序列化案例

先看 wordCount 案例再看这个案例。

a. 需求分析

需求:统计每一个手机号耗费的总上行流量、总下行流量、总流量。

# 输入数据格式
id		手机号码		网络ip    域名(可能为空)		上行流量		下行流量		网络状态码
# 输出数据格式
手机号码		上行流量		下行流量		总流量

map 阶段

  1. 读取一行数据,按字符切分字段;
  2. 抽取手机号、上行流量、下行流量;
  3. 以手机号为 key,bean 对象为 value 输出,即 context.write(手机号, bean)
  4. bean 对象要想能够传输,必须实现序列化接口。

reduce 阶段

  1. 累加上行流量和下行流量得到总流量

b. 代码实现

(1)FlowBean

/**
 * 1. 实现 Writable 接口:重写序列化/反序列化方法
 * 2. 重写空参构造
 * 3. 重写toString()
 */
public class FlowBean implements Writable, Serializable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {}

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }
  
    // 忽略 upFlow、downFlow 的 get/set

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return String.format("%d\t%d\t%d", upFlow, downFlow, sumFlow);
    }
}

(2)FlowMapper

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        // 1. 获取一行
        String line = value.toString();
        // 2. 切割 \t
        String[] split = line.split("\t");
        // 3. 抓取需要的数据 (id  手机号码  网络ip  域名(可能为空)  上行流量[-3]  下行流量[-2]  网络状态码[-1])
        String phone = split[1];
        String upFlow = split[split.length - 3];
        String downFlow = split[split.length - 2];
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(upFlow));
        outV.setDownFlow(Long.parseLong(downFlow));
        outV.setSumFlow();
        // 4. 写出
        context.write(outK, outV);
    }
}

(3)FlowReducer

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        long totalUp = 0, totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();
        context.write(key, outV);
    }
}

(4)FlowDriver

public class FlowDriver {
    public static void main(String[] args) throws Exception {
        // 1. 获取Job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. 关联本Driver类
        job.setJarByClass(FlowDriver.class);

        // 3. 关联Mapper和Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. 设置Map端输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. 设置程序的输入输出路径
        FileInputFormat.setInputPaths(job, new Path("/.../inputflow"));
        FileOutputFormat.setOutputPath(job, new Path("/.../flowoutput"));

        // 7. 提交Job
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

3. WordCount 案例

3.1 需求分析

WordCount 中文叫做单词统计、词频统计,指的是统计指定文件中,每个单词出现的总次数。

# 输入数据 1.txt
hello hadoop hello hello
hadoop allen hadoop
...

# 输出结果
hello 3
hadoop 3
allen 1

虽然 WordCount 业务及其简单,但是希望能够通过案例感受背后 MapReduce 的执行流程和默认的行为机制,这才是关键。

3.2 编程思路

  • map 阶段:把输入的数据经过切割,全部标记 1。因此输出就是 <单词,1>
  • shuffle 阶段:经过默认的排序分区分组,key 相同的单词会作为一组数据构成新的 kv 对;
  • reduce 阶段:处理 shuffle 完的一组数据,该组数据就是该单词所有的键值对。对所有的 1 进行累加求和,就是单词的总次数。

3.3 代码实现

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io.tree6x7</groupId>
    <artifactId>WordCountDemo</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>io.tree6x7.mapreduce.wordcount.WordCountDriver1</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

a. WordCountMapper

package io.tree6x7.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author tree6x7
 * @description Map
 * @createTime 2023/4/5
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * - KEYIN      map阶段输入的key类型       LongWritable(每一行起始位置的偏移量)
 * - VALUEIN    map阶段输入的value类型     Text(一行内容)
 * - KEYOUT     map阶段输出的key类型       Text(单词)
 * - VALUEOUT   map阶段输出的value类型     IntWritable(词频#1)
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // 复用
    private final static IntWritable ONE = new IntWritable(1);
    private Text outKey = new Text();

    /**
     * Called once for each key/value pair in the input split.
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // 1. String API 丰富
        String line = value.toString();
        // 2. 切割
        String[] words = line.split("\\s+");
        // 3. 遍历输出
        for (String word : words) {
            outKey.set(word);
            context.write(outKey, ONE);
        }
    }
}

b. WordCountReducer

package io.tree6x7.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author tree6x7
 * @description reduce
 * @createTime 2023/4/5
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * - KEYIN      reduce阶段输入的key类型       Text(单词)
 * - VALUEIN    reduce阶段输入的value类型     IntWritable(词频#1)
 * - KEYOUT     reduce阶段输出的key类型       Text(单词)
 * - VALUEOUT   reduce阶段输出的value类型     IntWritable(总词频)
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable outValue = new IntWritable();

    /**
     * 1. 排序
     * 默认规则:字典序排序
     * 2. 分组
     * 默认规则:key 相同的分为一组
     * 3. 分组之后,同一组的数据组成一个新的 kv,调用一次 reduce()。reduce() 基于分组调用的,一个分组调用一次。
     * ~ 新key    该组共同的 key
     * ~ 新value  该组所有的 value 组成的一个迭代器
     * ~ e.g. <hadoop,1>,<hadoop,1> => <hadoop, Iterable{1,1}>
     *        <ljq,1><ljq,1><ljq,1> => <ljq, Iterable{1,1,1}>
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int cnt = 0;
        for (IntWritable value : values) {
            cnt += value.get();
        }
        outValue.set(cnt);
        context.write(key, outValue);
    }
}

c. WordCountDriver1

package io.tree6x7.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author tree6x7
 * @description 驱动类,构造Job实例:指定各种组件属性、提交作业。
 * @createTime 2023/4/5
 */
public class WordCountDriver1 {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // 0. 运行模式(默认local)
        config.set("mapreduce.framework.name", "local");
        // 1. 创建Job实例
        Job job = Job.getInstance(config, WordCountDriver1.class.getSimpleName());
        // 2. 设置MR程序运行的主类
        job.setJarByClass(WordCountDriver1.class);
        // 3. 设置MR程序的Mapper类
        job.setMapperClass(WordCountMapper.class);
        // 4. 设置MR程序的Reducer类
        job.setReducerClass(WordCountReducer.class);
        // 5. 指定Map阶段输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 6. 指定Reduce阶段输出的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 7. 配置本次Job的输入/输出数据路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 8. 提交本次作业
        // true => 'Monitor a job and print status in real-time as progress is made and tasks fail.'
        boolean retFlag = job.waitForCompletion(true);
        // 9. 退出程序和 Job 结果绑定
        System.exit(retFlag ? 0 : 1);
    }
}

d. WordCountDriver2

package io.tree6x7.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author tree6x7
 * @description 使用 ToolRunner 提交 MR 作业
 * @createTime 2023/4/5
 */
public class WordCountDriver2 extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // 1. 创建配置对象
        Configuration config = new Configuration();
        config.set("mapreduce.framework.name", "local");
        // 2. 使用工具类 ToolRunner 提交作业
        int status = ToolRunner.run(config, new WordCountDriver2(), args);
        // 3. 退出客户端
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. 创建Job实例
        Job job = Job.getInstance(getConf(), WordCountDriver2.class.getSimpleName());
        // 2. 设置MR程序运行的主类
        job.setJarByClass(WordCountDriver2.class);
        // 3. 设置MR程序的Mapper类
        job.setMapperClass(WordCountMapper.class);
        // 4. 设置MR程序的Reducer类
        job.setReducerClass(WordCountReducer.class);
        // 5. 指定Map阶段输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 6. 指定Reduce阶段输出的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 7. 配置本次Job的输入/输出数据路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

4. 运行模式

所谓的运行模式是指:MapReduce 程序是单机运行还是分布式运行?MapReduce 程序需要的运算资源是 YARN 分配还是本机系统自己分配?

运行在何种模式取决于 mapred-default.xml 中的参数 mapreduce.framework.name

  • yarn:YARN 集群模式
  • local:本地模式(默认)

如果代码中、运行的环境中有配置,会默认覆盖 default 配置。

4.1 集群模式运行

MR 程序提交给 YARN 集群,分发到多个节点上分布式并发执行。处理的数据和输出结果通常位于 HDFS。

需要配置参数(如果 Hadoop 集群已配置,可以不用写):

mapreduce.framework.name=yarn
yarn.resourcemanager.hostname=hadoop103

提交集群的实现步骤:

  1. 确保 Hadoop 集群启动(HDFS 集群、YARN 集群)
  2. 将程序打成 jar 包并上传到 Hadoop 集群的任意一个节点
  3. 执行启动命令
    hadoop jar <jar包名称> <程序启动类> <参数>
    yarn jar <jar包名称> <程序启动类> <参数>
    

集群模式运行 WordCountDemo 过程:

[liujiaqi@hadoop102 ~]$ hadoop jar WordCountDemo-1.0.jar /data/wordcount/input /data/wordcount/output
2023-04-05 21:46:59,089 INFO client.RMProxy: Connecting to ResourceManager at hadoop103/192.168.6.103:8032
# 这个WARN就是在提示使用Tool方式跑Job
2023-04-05 21:46:59,292 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2023-04-05 21:46:59,297 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/liujiaqi/.staging/job_1680701460208_0001
2023-04-05 21:46:59,336 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-04-05 21:46:59,398 INFO input.FileInputFormat: Total input files to process : 1
2023-04-05 21:46:59,409 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-04-05 21:46:59,423 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-04-05 21:46:59,432 INFO mapreduce.JobSubmitter: number of splits:1
2023-04-05 21:46:59,505 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-04-05 21:46:59,530 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1680701460208_0001
2023-04-05 21:46:59,530 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-04-05 21:46:59,598 INFO conf.Configuration: resource-types.xml not found
2023-04-05 21:46:59,598 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2023-04-05 21:46:59,686 INFO impl.YarnClientImpl: Submitted application application_1680701460208_0001
2023-04-05 21:46:59,706 INFO mapreduce.Job: The url to track the job: http://hadoop103:8088/proxy/application_1680701460208_0001/
2023-04-05 21:46:59,706 INFO mapreduce.Job: Running job: job_1680701460208_0001
2023-04-05 21:47:03,743 INFO mapreduce.Job: Job job_1680701460208_0001 running in uber mode : false
2023-04-05 21:47:03,744 INFO mapreduce.Job:  map 0% reduce 0%
2023-04-05 21:47:07,793 INFO mapreduce.Job:  map 100% reduce 0%
2023-04-05 21:47:12,859 INFO mapreduce.Job:  map 100% reduce 100%
2023-04-05 21:47:12,877 INFO mapreduce.Job: Job job_1680701460208_0001 completed successfully
2023-04-05 21:47:12,942 INFO mapreduce.Job: Counters: 53
        File System Counters
                FILE: Number of bytes read=5753
                FILE: Number of bytes written=446171
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=3255
                HDFS: Number of bytes written=345
                HDFS: Number of read operations=8
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=1023
                Total time spent by all reduces in occupied slots (ms)=2075
                Total time spent by all map tasks (ms)=1023
                Total time spent by all reduce tasks (ms)=2075
                Total vcore-milliseconds taken by all map tasks=1023
                Total vcore-milliseconds taken by all reduce tasks=2075
                Total megabyte-milliseconds taken by all map tasks=1047552
                Total megabyte-milliseconds taken by all reduce tasks=2124800
        Map-Reduce Framework
                Map input records=35
                Map output records=434
                Map output bytes=4879
                Map output materialized bytes=5753
                Input split bytes=113
                Combine input records=0
                Combine output records=0
                Reduce input groups=38
                Reduce shuffle bytes=5753
                Reduce input records=434
                Reduce output records=38
                Spilled Records=868
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=49
                CPU time spent (ms)=410
                Physical memory (bytes) snapshot=484417536
                Virtual memory (bytes) snapshot=4851183616
                Total committed heap usage (bytes)=381157376
                Peak Map Physical memory (bytes)=289017856
                Peak Map Virtual memory (bytes)=2416971776
                Peak Reduce Physical memory (bytes)=195399680
                Peak Reduce Virtual memory (bytes)=2434211840
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=3142
        File Output Format Counters 
                Bytes Written=345

打开 YARN 页面查看 Job 运行情况:http://hadoop103:8088/cluster

HDFS 查看运行结果:http://hadoop102:9870/

4.2 本地模式运行

MR 程序是被提交给 LocalJobRunner 在本地以单进程的形式运行,而处理的数据及输出结果可以在本地文件系统,也可以在 HDFS 上。

右键直接运行 main() 方法所在的主类即可。本地模式非常便于进行业务逻辑的 debug。

如果 Driver 类配置了 config.set("mapreduce.framework.name", "local"),那么就算使用 hadoop/yarn 命令启动 jar,也会本地运行。

如何区分 MapReduce 的运行模式?

(1)登录 YARN 集群查看,是否有程序执行过的记录。
(2)通过查看执行日志,提示是否为本地模式运行:Job 作业的编号中是否有 local 关键字,如果有,就是本地模式。