六、MapReduce

发布时间 2023-12-04 20:49:35作者: SIKeborn

一、MapReduce设计理念

 

map--->映射

 

reduce--->归纳

 

mapreduce必须构建在hdfs之上的一种大数据离线计算框架

 

在线:实时数据处理

 

离线:数据处理时效性没有在线那么强,但是相对也需要很快得到结果

 

mapreduce不会马上得到结果,他会有一定的延时(磁盘IO)

 

如果数据量小,使用mapreduce反而不合适

 

杀鸡焉用宰牛刀

 

原始数据-->map(Key,Value)-->Reduce

 

分布式计算

 

将大的数据切分成多个小数据,交给更多的节点参与运算

 

计算向数据靠拢

 

将计算传递给有数据的节点上进行工作

 

二、MapReduce架构特点

MapReduce1.x

 

 

JobTracker

   主节点,单点,负责调度所有的作业和监控整个集群的资源负载。

TaskTracker

   从节点,自身节点资源管理和JobTracker进行心跳联系,汇报资源和获取task。

Client

   以作业为单位,规划作业计算分布,提交作业资源到HDFS,最终提交作业到JobTracker。

Slot(槽):

属于JobTracker分配的资源(计算能力、IO能力等)。

不管任务大小,资源是恒定的,不灵活但是好管理。

Task(MapTask-->ReduceTask):

开始按照MR的流程执行业务。

当任务完成时,JobTracker告诉TaskTracker回收资源。

MapReduce1.x的弊端

  1.JobTracker负载过重,存在单点故障。

  2.资源管理和计算调度强耦合,其它计算框架难以复用其资源管理。

  3.不同框架对资源不能全局管理。

 

MapReduce2.x

 

 

Client: 客户端发送MR任务到集群,其中客户端有很多种类,例如hadoop jar

ResourceManager: 资源协调框架的管理者,分为主节点和备用节点(防止单点故障,主备的切换基于ZK的管理),它时刻与NodeManager保持心跳,接受NodeManager的汇报(NodeManager当前节点的资源情况)。

当有外部框架要使用资源的时候直接访问ResourceManager即可。

如果是有MR任务,先去ResourceManager申请资源,ResourceManager根据汇报分配资源,例如资源在NodeManager1,那么NodeManager1要负责开辟资源。

Yarn(NodeManager): Yarn(Yet Another Resource Negotiator,另一种资源协调者),统一管理资源。以后其他的计算框架可以直接访问yarn获取当前集群的空闲节点。

每个DataNode上默认有一个NodeManager,NodeManager汇报自己的信息到ResourceManager。

Container: 它是动态分配的,2.X资源的代名词。

ApplicationMaster: 我们本次任务的主导者,负责调度本次被分配的资源Container。当所有的节点任务全部完成,applicaion告诉ResourceManager请求杀死当前ApplicationMaster线程,本次任务的所有资源都会被释放。

Task(MapTask--ReduceTask): 开始按照MR的流程执行业务,当任务完成时,ApplicationMaster接收当前节点的反馈。

 

YARN【Yet Another Resource Negotiator】:Hadoop 2.0新引入的资源管理系统,直接从MRv1演化而来的。

核心思想:将MRv1中JobTracker的资源管理和任务调度两个功能分开,分别由ResourceManager和ApplicationMaster进程实现:

   ResourceManager:负责整个集群的资源管理和调度。

   ApplicationMaster:负责应用程序相关的事务,比如任务调度、任务监控和容错等。

YARN的引入,使得多个计算框架可运行在一个集群中 每个应用程序对应一个ApplicationMaster 目前多个计算框架可以运行在YARN上,比如MapReduce、Spark、Storm等。

 

三、MR的计算流程

计算1T数据中每个单词出现的次数---->wordcount

 

 

1 原始数据File(可以从网上找一篇英文的文章)

The books chronicle the adventures of the adolescent wizard Harry Potter and his best friends Ron Weasley and Hermione Granger, all of whom are students at Hogwarts School of Witchcraft and Wizardry. 

1T数据被切分成块存放在HDFS上,每一个块有128M大小

2 数据块Block

block块是hdfs上数据存储的一个单元,同一个文件中块的大小都是相同的

因为数据存储到HDFS上不可变,所以有可能块的数量和集群的计算能力不匹配

我们需要一个动态调整本次参与计算节点数量的一个单位

我们可以动态的改变这个单位–-->参与的节点

3 切片Split

目的:动态地控制计算单元的数量

切片是一个逻辑概念

在不改变现在数据存储的情况下,可以控制参与计算的节点数目

通过切片大小可以达到控制计算节点数量的目的

有多少个切片就会执行多少个Map任务

一般切片大小为Block的整数倍(2 1/2)

防止多余创建和很多的数据连接

如果Split大小 > Block大小 ,计算节点少了

如果Split大小 < Block大小 ,计算节点多了

默认情况下,Split切片的大小等于Block的大小 ,默认128M,如果读取到最后一个block块的时候,与前一个blokc块组合起来的大小小于128M*1.1的话,他们结合生一个split切片,生成一个map任务

一个切片对应一个MapTask

4 MapTask

map默认从所属切片读取数据,每次读取一行(默认读取器)到内存中(map种的逻辑作用在每一行上)

我们可以根据自己书写的分词逻辑(空格,逗号等分隔),计算每个单词出现的次数(wordcount)

这时会产生(Map<String,Integer>)临时数据,存放到内存中the books chronicle the adventures of the adolescent wizard Harry Potter and his best friends Ron Weasley and Hermione Granger, all of whom are students at Hogwarts School of Witchcraft and Wizardry


the 1
books 1
chronicle 1
the 1
adventures 1
of 1
... Wizardry 1

 

 

但是内存的大小是有限的,如果每个任务随机的去占用内存,会导致内存不可控。多个任务同时执行有可能内存溢出(OOM)

如果把数据都直接放到硬盘,效率太低

所以想个方案,内存和硬盘结合,我们要做的就是在OOM和效率低之间提供一个有效方案,可以先往内存中写入一部分数据,然后写出到硬盘

环形缓冲区(KV-Buffer)

可以循环利用这块内存区域,减少数据溢写时map的停止时间

每一个Map可以独享的一个内存区域

在内存中构建一个环形数据缓冲区(kvBuffer),默认大小为100M

设置缓冲区的阈值为80%(设置阈值的目的是为了同时写入和写出),当缓冲区的数据达到80M开始向外溢写到硬盘

溢写的时候还有20M的空间可以被使用效率并不会被减缓

而且将数据循环写到硬盘,不用担心OOM问题

说完这个先说溢写,合并,拉取(分析出问题得到结论),再说中间的分区排序

6 分区Partition(环形缓冲区做的)

根据Key直接计算出对应的Reduce

分区的数量和Reduce的数量是相等的

hash(key) % partation(reduce的数量) = num

默认分区的算法是Hash然后取余

Object的hashCode()—equals()

如果两个对象equals,那么两个对象的hashcode一定相等

如果两个对象的hashcode相等,但是对象不一定equlas

7 排序Sort(环形缓冲区做的,快速排序,对前面分区后的编号进行排序,使得相同编号的在一起)

对要溢写的数据进行排序(QuickSort)

按照先Partation后Key的顺序排序–>相同分区在一起,相同Key的在一起

我们将来溢写出的小文件也都是有序的

 

8 溢写Spill

将内存中的数据循环写到硬盘,不用担心OOM问题

每次会产生一个80M的文件

如果本次Map产生的数据较多,可能会溢写多个文件

 

9 合并Merge

因为溢写会产生很多有序(分区 key)的小文件,而且小文件的数目不确定

后面向reduce传递数据带来很大的问题

所以将小文件合并成一个大文件,将来拉取的数据直接从大文件拉取即可

合并小文件的时候同样进行排序(归并 排序),最终产生一个有序的大文件

0 组合器Combiner

a. 集群的带宽限制了mapreduce作业的数量,因此应该尽量避免map和reduce任务之间的数据传输,hadoop允许用户对map的输出数据进行处理,用户可自定义combiner函数(如同map函数和reduce函数一般),其逻辑一般和reduce函数一样,combiner的输入是map的输出,combiner的输出作为reduce的输入,很多情况下可以i直接将reduce函数作为conbiner函数来试用(job.setCombinerClass(FlowCountReducer.class))。

b. combiner属于优化方案,所以无法确定combiner函数会调用多少次,可以在环形缓存区溢出文件时调用combiner函数,也可以在溢出的小文件合并成大文件时调用combiner,但是要保证不管调用多少次,combiner函数都不影响最终的结果,所以不是所有处理逻辑都可以i使用combiner组件,有些逻辑如果试用了conbiner函数会改变最后reduce的输出结果(如求几个数的平均值,就不能先用conbiner求一次各个map输出结果的平均值,再求这些平均值的平均值,那样会导致结果的错误)。

c. combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量:

原先传给reduce的数据时a1 a1 a1 a1 a1

第一次combiner组合后变成a(1,1,1,1,1)

第二次combiner后传给reduce的数据变为a(5,5,6,7,23,...)

 

11 拉取Fetch

我们需要将Map的临时结果拉取到Reduce节点

第一种方式:两两合并
第二种方式:相同的进一个reduce
第三种对第二种优化,排序
第四种对第三种优化:如果一个reduce处理两种key,而key分布一个首一个尾,解决不连续的问题,给个编号,这个编号怎么算呢,`回到分区,排序`

原则(用统计姓氏的例子画图理解)

相同的Key必须拉取到同一个Reduce节点

但是一个Reduce节点可以有多个Key

未排序前拉取数据的时候必须对Map产生的最终的合并文件做全序遍历

而且每一个reduce都要做一个全序遍历

如果map产生的大文件是有序的,每一个reduce只需要从文件中读取自己所需的即可

 

12 合并Merge

因为reduce拉取的时候,会从多个map拉取数据

那么每个map都会产生一个小文件,这些小文件(文件与文件之间无序,文件内部有序)

为了方便计算(没必要读取N个小文件),需要合并文件

归并算法合并成2个

相同的key都在一起

13 归并Reduce

将文件中的数据读取到内存中

一次性将相同的key全部读取到内存中

直接将相同的key得到结果–>最终结果

14 写出Output

每个reduce将自己计算的最终结果都会存放到HDFS上

 

15 MapReduce过程截图(扑克牌案例)

 

 

四、分词器

#maven添加依赖,父依赖 pom.xml
<!-- https://mvnrepository.com/artifact/com.hankcs/hanlp --> <dependency> <groupId>com.hankcs</groupId> <artifactId>hanlp</artifactId> <version>portable-1.7.8</version> </dependency>

 

#子依赖 pom.xml
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <configuration> <descriptorRefs> <!-- 打包出来的带依赖jar包名称 --> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <!--下面是为了使用 mvn package命令,如果不加则使用mvn assembly--> <executions> <execution> <id>make-assemble</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>

 

 

 

Combiner

job.setCombinerClass

combiner发生在map端的reduce操作。

作用是减少map端的输出,减少shuffle过程中网络传输的数据量,提高作业的执行效率。

combiner仅仅是单个map task的reduce,没有对全部map的输出做reduce。 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。

使用combiner,先完成的map会在本地聚合,提升速度。

注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以,Combine适合于等幂操作,比如累加,最大值等。求平均数不适合

 

五、MapReduce源码分析(高级部分)

快捷键

ctrl+alt+方向键:查看上一个或者下一个方法
ctrl+shift+alt+c: 拷贝方法的全名   com.shujia.airPM25.Pm25Avg#main  
ctrl+alt+b:查看当前接口的实现类

 

1 Split

带着问题看源码:

1、map的数量和切片的数量一样?

2、split的大小可以自己调节吗?算法是什么?

 源代码的分析从提交任务开始

job.waitForCompletion(true);

 

 org.apache.hadoop.mapreduce.Job#waitForCompletion

/**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {
    //判断当前的状态
    if (state == JobState.DEFINE) {
        //=============关键代码================
      submit();
    }
    //监控任务的运行状态
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
      //返回任务状态
    return isSuccessful();
  }

 

 

 org.apache.hadoop.mapreduce.Job#submit

public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    //确认当前任务的状态
    ensureState(JobState.DEFINE);
    //mapreduce1.x和2.x,但是2的时候将1的好多方法进行了优化
    setUseNewAPI();
    //获取当前任务所运行的集群
    connect();
    //创建Job的提交器
    final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,  ClassNotFoundException {
        //提交任务到系统去执行 
        //Internal method for submitting jobs to the system
        //===========关键代码============
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    //任务的状态修改为运行
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

 

org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal

//validate the jobs output specs 
    //检查一下输出路径存不存在呀,有没有权限之类的
    checkSpecs(job);
    //生成并设置新的JobId
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    //获取任务的提交目录
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    //===========关键代码============ 197行
    int maps = writeSplits(job, submitJobDir);
    //设置map的数量,其中map的数量就等于切片的数量
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

 

org.apache.hadoop.mapreduce.JobSubmitter#writeSplits

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {
    //获取作业的配置文件
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    //今后我们看源码的时候,想都不要想,看新的方式
    if (jConf.getUseNewMapper()) {
      //===========关键代码============
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

 

 

org.apache.hadoop.mapreduce.JobSubmitter#writeNewSplits

 private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {
    //获取集群配置
    Configuration conf = job.getConfiguration();
    //通过反射工具获取文件读取器对象
    //===========关键代码============ job 的实现类
    //org.apache.hadoop.mapreduce.lib.input.TextInputFormat --> input
    //job->org.apache.hadoop.mapreduce.task.JobContextImpl#getInputFormatClass
    InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    //获取到切片
    //===========关键代码============ getSplits
    List<InputSplit> splits = input.getSplits(job);
    //转成数组
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    
    //返回的是数组的长度,对应着切片的数量,回到197行验证
    return array.length;
  }

 

 

org.apache.hadoop.mapreduce.task.JobContextImpl#getInputFormatClass

 /**
   * Get the {@link InputFormat} class for the job.
   * 
   * @return the {@link InputFormat} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends InputFormat<?,?>> getInputFormatClass() throws ClassNotFoundException {
      return (Class<? extends InputFormat<?,?>>) 
      //getClass的操作是如果有值返回值,没有的话使用默认值
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
  }

 

org.apache.hadoop.mapreduce.lib.input.FileInputFormat#getSplits

/** 
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    //开始计算两个变量(一个切片最少有一个字节,一个最小切片值也是1)
    //1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    //Long.MAX_VALUE
    long maxSize = getMaxSplitSize(job);

    // generate splits
    //创建一个List存放切片
    List<InputSplit> splits = new ArrayList<InputSplit>();
    //获取本次计算中所有的要计算的文件
    List<FileStatus> files = listStatus(job);
    //首先取出一个文件
    for (FileStatus file: files) {
      //获取文件路径
      Path path = file.getPath();
      //获取文件大小
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          //获取文件块的信息
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        //判断文件是否可以被切分,比如文件被压缩了,需要解压缩才可以
        if (isSplitable(job, path)) {
          //获取单个块的大小
          long blockSize = file.getBlockSize();
          //开始计算切片大小,这里可以验证切片大小与block大小一样
          //思考如何生成256M的切片
          
            
          //如果切片小于blocksize-->将maxsize小于blocksize
          //如果切片大于blocksize-->将minsize大于blocksize
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          //将文件大小分配给bytesRemaining
          long bytesRemaining = length;
          //private static final double SPLIT_SLOP = 1.1;   // 10% slop
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            //制作切片
            //封装切片对象并将其存放到list中
            //makeSplit(路径,偏移量,切片大小,块的位置,备份的位置);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          //如果最后一个文件过小没有大于1.1,就与上一个一起生成切片
          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          //如果文件不可切,就生成一个切片
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    //返回切片(后面的代码我们跟不进去了,是yarn上面的了)
    return splits;
  }

 

计算切片大小逻辑

org.apache.hadoop.mapreduce.lib.input.FileInputFormat#computeSplitSize

protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    //blockSize--128M
    //maxSize--56M ----> 56M
    //minSize--256M ----> 256M
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

 

 

2 Map源码-MapTask

带着问题:

1、map读取数据按照行读取数据?验证一下

2、如果切片把一行数据放在了两个切片中呢?怎么办?

3、map里面的第一个参数类型是LongWritable?哪里指定的?

 

org.apache.hadoop.mapred.MapTask

 

 

@Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    //判断是否为Map任务
    if (isMapTask()) {
      // If there are no reducers then there won't be any sort. Hence the map 
      // phase will govern the entire attempt's progress.
      //判断reduce数量是否等于0,有可能等于0的如果我们只是清洗数据,就不需要
      if (conf.getNumReduceTasks() == 0) {
        //map所占的比例100%,没有reducce就不用分区了
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        //如果有reduce的话分区排序
        mapPhase = getProgress().addPhase("map", 0.667f);
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
    //任务报告一下,说明我要处理多少数据
    TaskReporter reporter = startReporter(umbilical);
 
    //使用新api
    boolean useNewApi = job.getUseNewMapper();
    //===========关键代码============
    //使用新api进行初始化
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    if (useNewApi) {
      //===========关键代码============
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

 

 

org.apache.hadoop.mapred.Task#initialize

 public void initialize(JobConf job, JobID id, 
                         Reporter reporter,
                         boolean useNewApi) throws IOException, 
                                                   ClassNotFoundException,
                                                   InterruptedException {
    //获取作业的上下文
    jobContext = new JobContextImpl(job, id, reporter);
    //获取任务的上下文
    taskContext = new TaskAttemptContextImpl(job, taskId, reporter);
    if (getState() == TaskStatus.State.UNASSIGNED) {
      setState(TaskStatus.State.RUNNING);
    }
    if (useNewApi) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("using new api for output committer");
      }
      //创建了一个outputFormat对象
      outputFormat = ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
      committer = outputFormat.getOutputCommitter(taskContext);
    } else {
      committer = conf.getOutputCommitter();
    }
    Path outputPath = FileOutputFormat.getOutputPath(conf);
    if (outputPath != null) {
      if ((committer instanceof FileOutputCommitter)) {
        FileOutputFormat.setWorkOutputPath(conf, 
          ((FileOutputCommitter)committer).getTaskAttemptPath(taskContext));
      } else {
        FileOutputFormat.setWorkOutputPath(conf, outputPath);
      }
    }
    committer.setupTask(taskContext);
    Class<? extends ResourceCalculatorProcessTree> clazz =
        conf.getClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
            null, ResourceCalculatorProcessTree.class);
    pTree = ResourceCalculatorProcessTree
            .getResourceCalculatorProcessTree(System.getenv().get("JVM_PID"), clazz, conf);
    LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
    if (pTree != null) {
      pTree.updateProcessTree();
      initCpuCumulativeTime = pTree.getCumulativeCpuTime();
    }
  }

 

 

org.apache.hadoop.mapred.MapTask#runNewMapper

@SuppressWarnings("unchecked")
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    // make a mapper--com.shujia.MyMapper
   //对应自己写的map类  TaskAttemptContextImpl
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format
    //org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split
    //获取切片
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    //===========关键代码============ NewTrackingRecordReader
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    
    // get an output object
    if (job.getNumReduceTasks() == 0) {
      //如果reduce数量等于0,直接输出
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      //如果reduce数量不等于0,待会来看,看下面的初始化
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try {
      //===========关键代码============
      //初始化的时候有意识的将第一行省略了
      input.initialize(split, mapperContext);
      //实际上调用的就是我们自己重写的map方法
      //===========关键代码============
      mapper.run(mapperContext);
      mapPhase.complete();
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
  }

 

 

org.apache.hadoop.mapred.MapTask.NewTrackingRecordReader#NewTrackingRecordReader

NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
    org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
    TaskReporter reporter,
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
    throws InterruptedException, IOException {
  this.reporter = reporter;
  this.inputRecordCounter = reporter
      .getCounter(TaskCounter.MAP_INPUT_RECORDS);
  this.fileInputByteCounter = reporter
      .getCounter(FileInputFormatCounter.BYTES_READ);

  List <Statistics> matchedStats = null;
  if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
    matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
        .getPath(), taskContext.getConfiguration());
  }
  fsStats = matchedStats;

  long bytesInPrev = getInputBytes(fsStats);
  
  //===========关键代码============
  //真正工作的人是谁,创建一个记录读取器
  //返回的是一个行记录读取器
  this.real = inputFormat.createRecordReader(split, taskContext);
  long bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}

 

 

org.apache.hadoop.mapreduce.RecordReader

 

@Override
public RecordReader<LongWritable, Text> 
  createRecordReader(InputSplit split,
                     TaskAttemptContext context) {
  String delimiter = context.getConfiguration().get(
      "textinputformat.record.delimiter");
  byte[] recordDelimiterBytes = null;
  if (null != delimiter)
    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
  return new LineRecordReader(recordDelimiterBytes);
}

 

 

org.apache.hadoop.mapreduce.lib.input.LineRecordReader#initialize

public void initialize(InputSplit genericSplit,
                       TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
  //获取开始的位置
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();

  // open the file and seek to the start of the split
  //获取分布式文件系统
  final FileSystem fs = file.getFileSystem(job);
  //获取一个输入流
  fileIn = fs.open(file);
  
  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  if (null!=codec) {
    isCompressedInput = true;  
    decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) {
      final SplitCompressionInputStream cIn =
        ((SplittableCompressionCodec)codec).createInputStream(
          fileIn, decompressor, start, end,
          SplittableCompressionCodec.READ_MODE.BYBLOCK);
      in = new CompressedSplitLineReader(cIn, job,
          this.recordDelimiterBytes);
      start = cIn.getAdjustedStart();
      end = cIn.getAdjustedEnd();
      filePosition = cIn;
    } else {
      in = new SplitLineReader(codec.createInputStream(fileIn,
          decompressor), job, this.recordDelimiterBytes);
      filePosition = fileIn;
    }
  } else {
    //读取偏移量
    fileIn.seek(start);
    in = new UncompressedSplitLineReader(
        fileIn, job, this.recordDelimiterBytes, split.getLength());
    filePosition = fileIn;
  }
  // If this is not the first split, we always throw away first record
  // because we always (except the last split) read one extra line in
  // next() method.
  //解决第二个问题 从第二行开始读,把切片的第一行将给上一个切片去读
  if (start != 0) {
    //返回的start正好是下一行数据的开头
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  }
  this.pos = start;
}

 

 

org.apache.hadoop.mapreduce.lib.map.WrappedMapper

 @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return mapContext.nextKeyValue();
    }

 

org.apache.hadoop.mapreduce.task.MapContextImpl#MapContextImpl

@Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    //最终调用的是LineRecordReader中的nextKeyValue方法
    return reader.nextKeyValue();
  }

 

org.apache.hadoop.mapreduce.lib.input.LineRecordReader

public boolean nextKeyValue() throws IOException {
  if (key == null) {
    key = new LongWritable();
  }
  //设置当前的偏移量
  key.set(pos);
  if (value == null) {
    value = new Text();
  }
  int newSize = 0;
  // We always read one extra line, which lies outside the upper
  // split limit i.e. (end - 1)
  //循环读取数据
  while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
    if (pos == 0) {
      newSize = skipUtfByteOrderMark();
    } else {
      //pos是当前数据的定位,value是数据
      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
      pos += newSize;
    }

    if ((newSize == 0) || (newSize < maxLineLength)) {
      break;
    }

    // line too long. try again
    LOG.info("Skipped line of size " + newSize + " at pos " + 
             (pos - newSize));
  }
  if (newSize == 0) {
    //如果本次啥也没有读到,返回false
    key = null;
    value = null;
    return false;
  } else {
    //读到了返回true
    return true;
  }
}

//看完这里回到map方法,key就是数据的偏移量,value就是一行数据,context上下文,写到环形缓冲区,map结束

 

3 KV-Buffer

通过context中的write方法进行写

带着问题:

1、分区的数量和reduce数量一样?

2、环形缓冲区内存大小100M?80%溢写?可以自己设置吗?

3、排序是快速排序?

4、怎么分区的?Hash?

org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl

  /**
   * Generate an output key/value pair.
   */
  public void write(KEYOUT key, VALUEOUT value
                    ) throws IOException, InterruptedException {
    output.write(key, value);
  }

通过参数的个数

org.apache.hadoop.mapred.MapTask.NewOutputCollector#NewOutputCollector

@SuppressWarnings("unchecked")
    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
      //========关键代码=============
      collector = createSortingCollector(job, reporter);
      partitions = jobContext.getNumReduceTasks();
      if (partitions > 1) {
        //========关键代码============= 分区器
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }
    }

org.apache.hadoop.mapred.MapTask#createSortingCollector

private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
          createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
    MapOutputCollector.Context context =
      new MapOutputCollector.Context(this, job, reporter);

    //========关键代码==========
    Class<?>[] collectorClasses = job.getClasses(
      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
    int remainingCollectors = collectorClasses.length;
    Exception lastException = null;
    for (Class clazz : collectorClasses) {
      try {
        if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
          throw new IOException("Invalid output collector class: " + clazz.getName() +
            " (does not implement MapOutputCollector)");
        }
        Class<? extends MapOutputCollector> subclazz =
          clazz.asSubclass(MapOutputCollector.class);
        LOG.debug("Trying map output collector class: " + subclazz.getName());
        MapOutputCollector<KEY, VALUE> collector =
          ReflectionUtils.newInstance(subclazz, job);
        
        //====================进行初始化===============
        collector.init(context);
        LOG.info("Map output collector class = " + collector.getClass().getName());
          
        //返回MapOutputBuffer
        return collector;
      } catch (Exception e) {
        String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
        if (--remainingCollectors > 0) {
          msg += " (" + remainingCollectors + " more collector(s) to try)";
        }
        lastException = e;
        LOG.warn(msg, e);
      }
    }
    throw new IOException("Initialization of all the collectors failed. " +
      "Error in last collector was :" + lastException.getMessage(), lastException);
  }

 

org.apache.hadoop.mapred.MapTask.MapOutputBuffer#init

 public void init(MapOutputCollector.Context context
                    ) throws IOException, ClassNotFoundException {
      job = context.getJobConf();
      reporter = context.getReporter();
      mapTask = context.getMapTask();
      mapOutputFile = mapTask.getMapOutputFile();
      sortPhase = mapTask.getSortPhase();
      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
      partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

      //sanity checks
      //溢写
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }
     //默认排序器是快速排序
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);
      // buffers and accounting
     
      //100左移2位 ×2^20
      int maxMemUsage = sortmb << 20;
      //对16进行取余,让这个数字变成16的整数倍
      maxMemUsage -= maxMemUsage % METASIZE;
      //环形缓冲区100M
      //并且设置环形缓冲区的一些初始值
      kvbuffer = new byte[maxMemUsage];
      bufvoid = kvbuffer.length;
      kvmeta = ByteBuffer.wrap(kvbuffer)
         .order(ByteOrder.nativeOrder())
         .asIntBuffer();
      setEquator(0);
      bufstart = bufend = bufindex = equator;
      kvstart = kvend = kvindex;

      //双重索引,大家课下可以自己了解
      maxRec = kvmeta.capacity() / NMETA;
      softLimit = (int)(kvbuffer.length * spillper);
      bufferRemaining = softLimit;
      LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
      LOG.info("soft limit at " + softLimit);
      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

      // k/v serialization
      //如果是自己自定义的类型,需要自定义排序器
      comparator = job.getOutputKeyComparator();
      keyClass = (Class<K>)job.getMapOutputKeyClass();
      valClass = (Class<V>)job.getMapOutputValueClass();
      //
      serializationFactory = new SerializationFactory(job);
      keySerializer = serializationFactory.getSerializer(keyClass);
      keySerializer.open(bb);
      valSerializer = serializationFactory.getSerializer(valClass);
      valSerializer.open(bb);

      // output counters
      mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
      mapOutputRecordCounter =
        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
      fileOutputByteCounter = reporter
          .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

      // compression
      if (job.getCompressMapOutput()) {
        Class<? extends CompressionCodec> codecClass =
          job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
      } else {
        codec = null;
      }

      // combiner
      final Counters.Counter combineInputCounter =
        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
      combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                             combineInputCounter,
                                             reporter, null);
      if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =
          reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
      } else {
        combineCollector = null;
      }
      spillInProgress = false;
      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
      } finally {
        spillLock.unlock();
      }
      if (sortSpillException != null) {
        throw new IOException("Spill thread failed to initialize",
            sortSpillException);
      }
    }

org.apache.hadoop.mapreduce.task.JobContextImpl#getPartitionerClass 默认是hash分区

 @SuppressWarnings("unchecked")
  public Class<? extends Partitioner<?,?>> getPartitionerClass() 
     throws ClassNotFoundException {
    return (Class<? extends Partitioner<?,?>>) 
      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  }

 

 

org.apache.hadoop.mapreduce.lib.partition.HashPartitioner

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

output-->NewOutputCollector--->write

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));
    }

 

MapOutputBuffer

 /**
     * Serialize the key, value to intermediate storage.
     * When this method returns, kvindex must refer to sufficient unused
     * storage to store one METADATA.
     */
    public synchronized void collect(K key, V value, final int partition
                                     ) throws IOException {
      reporter.progress();
      if (key.getClass() != keyClass) {
        throw new IOException("Type mismatch in key from map: expected "
                              + keyClass.getName() + ", received "
                              + key.getClass().getName());
      }
      if (value.getClass() != valClass) {
        throw new IOException("Type mismatch in value from map: expected "
                              + valClass.getName() + ", received "
                              + value.getClass().getName());
      }
      if (partition < 0 || partition >= partitions) {
        throw new IOException("Illegal partition for " + key + " (" +
            partition + ")");
      }
      checkSpillException();
      bufferRemaining -= METASIZE;
      if (bufferRemaining <= 0) {
        // start spill if the thread is not running and the soft limit has been
        // reached
        spillLock.lock();
        try {
          do {
            if (!spillInProgress) {
              final int kvbidx = 4 * kvindex;
              final int kvbend = 4 * kvend;
              // serialized, unspilled bytes always lie between kvindex and
              // bufindex, crossing the equator. Note that any void space
              // created by a reset must be included in "used" bytes
              final int bUsed = distanceTo(kvbidx, bufindex);
              final boolean bufsoftlimit = bUsed >= softLimit;
              if ((kvbend + METASIZE) % kvbuffer.length !=
                  equator - (equator % METASIZE)) {
                // spill finished, reclaim space
                resetSpill();
                bufferRemaining = Math.min(
                    distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                    softLimit - bUsed) - METASIZE;
                continue;
              } else if (bufsoftlimit && kvindex != kvend) {
                // spill records, if any collected; check latter, as it may
                // be possible for metadata alignment to hit spill pcnt
                //====开始溢写====================
                startSpill();
                final int avgRec = (int)
                  (mapOutputByteCounter.getCounter() /
                  mapOutputRecordCounter.getCounter());
                // leave at least half the split buffer for serialization data
                // ensure that kvindex >= bufindex
                final int distkvi = distanceTo(bufindex, kvbidx);
                final int newPos = (bufindex +
                  Math.max(2 * METASIZE - 1,
                          Math.min(distkvi / 2,
                                   distkvi / (METASIZE + avgRec) * METASIZE)))
                  % kvbuffer.length;
                setEquator(newPos);
                bufmark = bufindex = newPos;
                final int serBound = 4 * kvend;
                // bytes remaining before the lock must be held and limits
                // checked is the minimum of three arcs: the metadata space, the
                // serialization space, and the soft limit
                bufferRemaining = Math.min(
                    // metadata max
                    distanceTo(bufend, newPos),
                    Math.min(
                      // serialization max
                      distanceTo(newPos, serBound),
                      // soft limit
                      softLimit)) - 2 * METASIZE;
              }
            }
          } while (false);
        } finally {
          spillLock.unlock();
        }
      }

4 溢写Spill

output--->NewOutputCollector

  @Override
  public void close(TaskAttemptContext context
                    ) throws IOException,InterruptedException {
    try {
      //===============关键代码===============
      collector.flush();
    } catch (ClassNotFoundException cnf) {
      throw new IOException("can't find class ", cnf);
    }
    collector.close();
  }
}

collector--->MapOutputBuffer

public void flush() throws IOException, ClassNotFoundException,
       InterruptedException {
  LOG.info("Starting flush of map output");
  if (kvbuffer == null) {
    LOG.info("kvbuffer is null. Skipping flush.");
    return;
  }
  spillLock.lock();
  try {
    while (spillInProgress) {
      reporter.progress();
      spillDone.await();
    }
    checkSpillException();

    final int kvbend = 4 * kvend;
    if ((kvbend + METASIZE) % kvbuffer.length !=
        equator - (equator % METASIZE)) {
      // spill finished
      resetSpill();
    }
    if (kvindex != kvend) {
      kvend = (kvindex + NMETA) % kvmeta.capacity();
      bufend = bufmark;
      LOG.info("Spilling map output");
      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
               "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
               "); kvend = " + kvend + "(" + (kvend * 4) +
               "); length = " + (distanceTo(kvend, kvstart,
                     kvmeta.capacity()) + 1) + "/" + maxRec);
      sortAndSpill();
    }
  } catch (InterruptedException e) {
    throw new IOException("Interrupted while waiting for the writer", e);
  } finally {
    spillLock.unlock();
  }
  assert !spillLock.isHeldByCurrentThread();
  // shut down spill thread and wait for it to exit. Since the preceding
  // ensures that it is finished with its work (and sortAndSpill did not
  // throw), we elect to use an interrupt instead of setting a flag.
  // Spilling simultaneously from this thread while the spill thread
  // finishes its work might be both a useful way to extend this and also
  // sufficient motivation for the latter approach.
  try {
    spillThread.interrupt();
    spillThread.join();
  } catch (InterruptedException e) {
    throw new IOException("Spill failed", e);
  }
  // release sort buffer before the merge
  kvbuffer = null;
  
  //当最后一个数据写出后,开始对溢写的小文件进行合并
  mergeParts();
  Path outputPath = mapOutputFile.getOutputFile();
  fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}

 

1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value。

(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。 溢写阶段详情: 步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。 步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

(5)Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。 当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

 

5 Reduce

run方法

@Override
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, InterruptedException, ClassNotFoundException {
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  //reduce的三个阶段
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase  = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  // start thread that will handle communication with parent
  TaskReporter reporter = startReporter(umbilical);
  
  boolean useNewApi = job.getUseNewReducer();
  //初始化信息
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }
  
  // Initialize the codec
  codec = initCodec();
  RawKeyValueIterator rIter = null;
  ShuffleConsumerPlugin shuffleConsumerPlugin = null;
  
  Class combinerClass = conf.getCombinerClass();
  CombineOutputCollector combineCollector = 
    (null != combinerClass) ? 
   new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

  Class<? extends ShuffleConsumerPlugin> clazz =
        job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
         
  shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
  LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

  ShuffleConsumerPlugin.Context shuffleContext = 
    new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                super.lDirAlloc, reporter, codec, 
                combinerClass, combineCollector, 
                spilledRecordsCounter, reduceCombineInputCounter,
                shuffledMapsCounter,
                reduceShuffleBytes, failedShuffleCounter,
                mergedMapOutputsCounter,
                taskStatus, copyPhase, sortPhase, this,
                mapOutputFile, localMapFiles);
    
  //=================关键代码=========================
  shuffleConsumerPlugin.init(shuffleContext);

  rIter = shuffleConsumerPlugin.run();

  // free up the data structures
  mapOutputFilesOnDisk.clear();
  
  sortPhase.complete();                         // sort is complete
  setPhase(TaskStatus.Phase.REDUCE); 
  statusUpdate(umbilical);
  Class keyClass = job.getMapOutputKeyClass();
  Class valueClass = job.getMapOutputValueClass();
  RawComparator comparator = job.getOutputValueGroupingComparator();

  if (useNewApi) {
    runNewReducer(job, umbilical, reporter, rIter, comparator, 
                  keyClass, valueClass);
  } else {
    runOldReducer(job, umbilical, reporter, rIter, comparator, 
                  keyClass, valueClass);
  }

  shuffleConsumerPlugin.close();
  done(umbilical, reporter);
}