14-Yarn(1)

发布时间 2023-07-29 21:30:26作者: tree6x7

1. 框架概述

1.1 发展简史

数据、程序、运算资源(内存、CPU)三者组在一起,完成了数据的计算处理过程。在单机环境下,这些都不是太大问题。为了应对海量数据的场景,Hadoop 出现并提供了分而治之的分布式处理思想。通过对 Hadoop 版本演进的简单回顾,可以让我们知道 YARN 的产生和发展简史,洞悉 YARN 发展进程。

很多 Hadoop 的早期用户使用 Hadoop 的方式与在众多主机上运行桌面应用程序类似:

  1. 在少量几个节点上手工建立一个集群;
  2. 将数据载入 Hadoop 分布式文件系统(HDFS);
  3. 通过运行 MapReduce 任务来运算并获得结果;
  4. 然后拆掉集群。

这种方式的一部分原因是没有在 Hadoop HDFS 上持久存储数据的迫切需求,另一部分原因是没有共享数据和计算结果的动机。

a. Ad Hoc 集群

Ad Hoc 应当理解为专用、特定的意思(数仓领域中常理解为“即席查询”)。Ad Hoc 集群时代标志着 Hadoop 集群的起源,集群以 Ad Hoc、单用户方式建立。

后来,随着私人集群的使用和 Hadoop 容错性的提高,持久的 HDFS 集群出现,并且实现了 HDFS 集群的共享,把常用和感兴趣的数据集载入 HDFS 共享集群中。

当共享 HDFS 成为现实,还没实现共享的计算平台就成为关切对象。

不同于 HDFS,为多个组织的多个用户简单设置一个共享 MapReduce 集群并非易事。尤其是集群下的物理资源的共享很不理想。

b. HOD 集群

为了解决集群条件下的多租户问题, Yahoo 发展并且部署了称为“Hadoop on Demand”的平台。Hadoop On Demand(HOD)是一个能在大规模物理集群上供应虚拟 Hadoop 集群的系统。在已经分配的节点上,HOD 会启动 MapReduce 和 HDFS 守护进程来响应用户数据和应用的请求。

特点:用户可以使用 HOD 来同时分配多个 MapReduce 集群。

缺点:无法支持数据本地化、资源回收效率低、无动态扩容缩容能力、多租户共享延迟高等。

c. 共享计算集群

共享 MapReduce 计算集群和与之协同工作的共享 HDFS 是 Hadoop 1.x 版本里的主要架构。

这种共享计算架构的主要组件如下所示:

  • JobTracker:一个中央守护进程,负责运行集群上的所有作业。
  • TaskTracker:系统里的从进程,根据 JobTracker 的指令来执行任务。

共享计算集群的主要弊端有 JobTracker 可扩展性瓶颈(在内存中保存用户作业的数据)、JobTracker 身兼多职(作业数据管理、作业状态记录、作业调度)、可靠性和可用性欠缺(单点故障)、计算模型的单一(不是所有问题都能 MapReduce)。

MapReduce 框架本身也经历了很多变化。但 MapReduce 被绑定到了集群的管理层,证明 MapReduce 的变化演变是比较困难的。

d. YARN 集群

针对共享计算集群,JobTracker 需要彻底地重写,才能解决扩展性的主要问题。但是,这种重写即使成功了,也不一定能解决平台和用户代码的耦合问题,也不能解决用户对 !MapReduce 编程模型的需求。如果不做重大的重新设计,集群可用性会继续被捆绑到整个系统的稳定性上。

拆分 MapReduce,剥离出资源管理成为单独框架,YARN 闪亮登场,一款被设计用以解决以往架构的需求和缺陷的资源管理和调度软件。MapReduce 专注于数据处理,两者解耦合。

YARN 被设计用以解决以往架构的需求和缺陷的资源管理和调度软件。

对 YARN 的需求:

  • 可扩展性:可以平滑的扩展至数万节点和并发的应用。
  • 可维护性:保证集群软件的升级与用户应用程序完全解耦。
  • 多租户:需要支持在同一集群中多个租户并存,同时支持多个租户间细颗粒度地共享单个节点。
  • 位置感知:将计算移至数据所在位置。
  • 高集群使用率:实现底层物理资源的高使用率。
  • 安全和可审计的操作:继续以安全的、可审计的方式使用集群资源。
  • 可靠性和可用性:具有高度可靠的用户交互、并支持高可用性
  • 对编程模型多样化的支持:支持多样化的编程模型,需要演进为不仅仅以 MapReduce 为中心。
  • 灵活的资源模型:支持各个节点的动态资源配置以及灵活的资源模型。
  • 向后兼容:保持现有的 MapReduce 应用程序的向后兼容性。

1.2 YARN 简述

Apache Hadoop YARN (Yet Another Resource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统和调度平台,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。

  • 资源管理系统:集群的硬件资源,和程序运行相关,比如内存、CPU 等。
  • 调度平台:多个程序同时申请计算资源如何分配,调度的规则(算法)。
  • 通用:不仅仅支持 MapReduce 程序,理论上支持各种计算程序。YARN 不关心你干什么,只关心你要资源,在有的情况下给你,用完之后还我。

可以把 Hadoop YARN 理解为相当于一个分布式的操作系统平台,而 MapReduce 等计算程序则相当于运行于操作系统之上的应用程序,YARN 为这些程序提供运算所需的资源(内存、CPU 等)。

Hadoop 能有今天这个地位,YARN 可以说是功不可没。因为有了 YARN ,更多计算框架可以接入到 HDFS 中,而不单单是 MapReduce,正是因为 YARN 的包容,使得其他计算框架能专注于计算性能的提升。

HDFS 可能不是最优秀的大数据存储系统,但却是应用最广泛的大数据存储系统, YARN 功不可没。

YARN 支持各种数据处理引擎对 HDFS 中的数据进行批处理、交互式和流处理。在不同的场景中使用不同的框架,常见的包括 MapReduce、Spark、Storm、Flink 等 Application。这种架构可以更好、更优雅地进行扩展。因此从一开始就内置了高可用性、安全性和多租户支持更多用户在大型集群上使用,新架构还将提高创新性,敏捷性和硬件利用率。

此外,YARN 提供以下功能:

  • 多租户:可以使用多个开放源代码和专有数据访问引擎来批量、交互式和实时访问同一数据集。多租户数据处理可提高企业在 Hadoop 投资上的回报。
  • Docker 容器化:可以使用 Docker 容器化来并行运行同一应用程序的多个版本。
  • 集群利用率:可以动态分配群集资源以提高资源利用率。
  • 多种资源类型:可以使用多种资源类型,例如 CPU 和内存。
  • 可扩展性:提高了数据中心的处理能力。YARN 的 ResourceManager 仅专注于调度,并在集群扩展到管理数 PB 数据的数千个节点时保持同步。
  • 兼容性:Hadoop 1.x 的 MapReduce 应用程序可在 YARN 上运行,而不会破坏现有流程。YARN 与 Hadoop 的先前稳定版本保持API兼容性。

1.3 YARN 与 MRv1 区别

由于 MRv1(第一代 MapReduce)在扩展性、可靠性、资源利用率和多框架等方面存在明显不足, Apache 开始尝试对 MapReduce 进行升级改造,于是诞生了更加先进的下一代 MapReduce 计算框架 MRv2。

并且在MRv2中,将资源管理任务调度模块单独抽离出来,构建成了一个独立的通用资源管理系统 YARN,而 MRv2 则专注于数据的计算处理了。

  • 在 Hadoop1 中,MapReduce(MRv1)负责数据计算、资源管理。
  • 在 Hadoop2 中,MapReduce(MRv2)负责数据计算,YARN 负责资源管理。

a. MRv1 架构

在 Hadoop 1.0 中 MapReduce 框架和 HDFS 一样,MapReduce 也是采用 Master/Slave的 架构,其架构如下图所示:

MapReduce 包含 4 个组成部分,分别为 Client、JobTracker、TaskTracker、Task。

  • 【Client 客户端】每一个 Job 都会在用户端通过 Client 类,将应用程序以及参数配置 Configuration 打包成 Jar 文件存储在 HDFS,并把路径提交到 JobTracker 的 Master 服务,然后由 Master 创建每一个 Task(即 MapTask 和 ReduceTask),将它们分发到各个 TaskTracker 服务中去执行。
  • 【JobTracker 管理主节点】JobTracker 负责资源监控和作业调度。JobTracker 监控所有的 TaskTracker 与 Job 的健康状况,一旦发现失败,就将相应的任务转移到其它节点;同时 JobTracker 会跟踪任务的执行进度,资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在 Hadoop 中,任务调度器是一个可插拔的模块,用于可以根据自己的需要设计相应的调度器。
  • 【TaskTracker 执行从节点】TaskTracker 会周期性地通过「心跳」将本节点上资源的使用情况和任务的运行进度汇报给 JobTracker,同时执行 JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker 使用「slot」等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等) 。一个 Task 获取到一个 slot 之后才有机会运行,而 Hadoop 调度器的作用就是将各个 TaskTracker 上的空闲 slot 分配给 Task 使用。slot 分为 MapSlot 和 ReduceSlot 两种,分别提供 MapTask 和 ReduceTask 使用。TaskTracker 通过 slot 数目(可配置参数)限定 Task 的并发度。
  • 【Task 计算任务】Task 分为 MapTask 和 ReduceTask 两种,均由 TaskTracker 启动。HDFS 以固定大小的 block 为基本单位存储数据,而对于 MapReduce 而言,其处理单位是 split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意的是,split 的多少决定了 MapTask 的数目,因为每一个 split 只会交给一个 MapTask 处理。

JobTracker 负责资源和任务的管理与调度, TaskTracker 负责单个节点的资源管理和任务执行。 MRv1将资源管理和应用程序管理两部分混杂在一起,使得它在扩展性、容错性和多框架支持等方面存在明显缺陷。

Hadoop YARN 是在 MRv1 基础上演化而来的,它克服了 MRv1 中的各种局限性,概括为以下几个方面:

  • 扩展性差:在 MRv1 中, JobTracker 同时兼备了资源管理和作业控制两个功能,这成为系统的一个最大瓶颈,严重制约了 Hadoop 集群扩展性。
  • 可靠性差:MRv1 采用了 Master/Slave 结构,其中 Master 存在单点故障问题,一旦它出现故障将导致整个集群不可用。
  • 资源利用率低: MRv1 采用了基于槽位的资源分配模型,槽位是一种粗粒度的资源划分单位,通常一个任务不会用完槽位对应的资源,且其他任务也无法使用这些空闲资源。此外, Hadoop 将槽位分为 Map Slot 和 Reduce Slot 两种,且不允许它们之间共享,常常会导致一种槽位资源紧张而另外一种闲置(比如一个作业刚刚提交时,只会运行 Map Task,此时 Reduce Slot 闲置)。
  • 无法支持多种计算框架:随着互联网高速发展, MapReduce 这种基于磁盘的离线计算框架已经不能满足应用要求,从而出现了一些新的计算框架,包括内存计算框架、流式计算框架和迭代式计算框架等,而 MRv1 不能支持多种计算框架并存。

b. YARN 架构

为了克服以上几个缺点, Apache 开始尝试对 Hadoop 进行升级改造,进而诞生了更加先进的下一代 MapReduce 计算框架 MRv2。正是由于 MRv2 将资源管理功能抽象成了一个独立的通用系统 YARN,直接导致下一代 MapReduce 的核心从单一的计算框架 MapReduce 转移为通用的资源管理系统 YARN。

YARN 实际上是一个弹性计算平台,它的目标已经不再局限于支持 MapReduce 一种计算框架,而是朝着对多种框架进行统一管理的方向发展。

Hadoop2.0 即第二代 Hadoop,由分布式存储系统 HDFS、并行计算框架 MapReduce 和分布式资源管理系统 YARN 三个系统组成,其中 YARN 是一个资源管理系统,负责集群资源管理和调度,MapReduce 则是运行在 YARN 上的离线处理框架,称为 MRv2(MapReduce 的第 2 版)。

MRv1 主要由编程模型(由新旧 API 组成)、数据处理引擎(由 MapTask 和 ReduceTask 组成)和运行时环境(由一个 JobTracker 和若干个 TaskTracker 组成)三部分组成,为了保证编程模型的向后兼容性, MRv2 重用了 MRv1 中的编程模型和数据处理引擎,但运行时环境被完全重写,具体如下:

  • MRv2 重用了 MRv1 中的编程模型和数据处理引擎。
  • 为了能够让用户应用程序平滑迁移到 Hadoop 2.0 中, MRv2 应尽可能保证编程接口的向后兼容性,但由于 MRv2 本身进行了改进和优化,它在向后兼容性方面存在少量问题。MapReduce 应用程序编程接口有两套,分别是新 API(mapredue)和旧 API(mapred) , MRv2 可做到以下兼容性 :采用 MRv1 旧 API 编写的应用程序,可直接使用之前的 JAR 包将程序运行在 MRv2 上;但采用 MRv1 新 API 编写的应用程序则不可以,需要使用 MRv2 编程库重新编译并修改不兼容的参数和返回值。
  • MRv1 的运行时环境主要由两类服务组成,分别是 JobTracker 和 TaskTracker。JobTracker 负责资源和任务的管理与调度,TaskTracker 负责单个节点的资源管理和任务执行。MRv1 将资源管理和应用程序管理两部分混杂在一起,使得它在扩展性、容错性和多框架支持等方面存在明显缺陷。MRv2 则通过将资源管理和应用程序管理两部分剥离开,分别由 YARN 和 ApplicationMaster 负责,其中, YARN 专管资源管理和调度,而 ApplicationMaster 则负责与具体应用程序相关的任务切分、任务调度和容错等。

2. YARN 集群部署

如何管理集群资源?如何给任务合理分配资源?

YARN 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而 MapReduce 等运算程序则相当于运行于操作系统之上的应用程序。

Apache Hadoop YARN 是一种开源的分布式资源管理和作业调度技术,它是作为 Apache Hadoop 的核心组件之一,负责将系统资源(计算、存储和网络资源)分配给运行在 Hadoop 集群中的各种应用程序,并对运行在各集群节点上的任务进行调度。在生产环境中,通常采用分布式模式安装部署 YARN 集群。

2.1 集群规划

a. 集群角色

YARN 集群是一个标准的 Master/Slave 结构(主从结构),其中 ResourceManager(RM) 为 Master, NodeManager(NM) 为 Slave。常见的是一主多从集群,也可以搭建 RM 的 HA 高可用集群。

  • ResourceManager(RM)
    • RM 是 YARN 中的主角色,决定系统中所有应用程序之间资源分配的最终权限,即最终仲裁者;
    • RM 接收用户的作业提交,并通过 NodeManager 分配、管理各个机器上的计算资源(资源以 Container 形式给予);
    • 此外 RM 还具有一个可插拔组件 scheduler,负责为各种正在运行的应用程序分配资源,根据策略进行调度。
  • NodeManager(NM)
    • NM 是 YARN 中的从角色,一台机器上一个,负责管理本机器上的计算资源。
    • NM 根据 RM 命令,启动 Container 容器、监视容器的资源使用情况,并向 RM 主角色汇报资源使用情况。

b. 部署规划

理论上 YARN 集群可以部署在任意机器上,但是实际中,通常把 NodeManager 和 DataNode 部署在同一台机器上(有数据的地方就有可能产生计算,移动程序的成本比移动数据的成本低)。

作为 Apache Hadoop 的一部分,通常会把 YARN 集群和 HDFS 集群一起搭建。

服务器 运行角色
node1 NameNode|DataNode|ResourceManager|NodeManager
node2 SecondaryNameNode|DataNode|NodeManager
node3 DataNode|NodeManager

2.2 集群部署

a. 基础环境准备

(1)三台机器创建软件目录相关结构

目录 作用
/export/software 软件压缩包存储
/export/server 软件安装
/export/data 测试数据

(2)关闭防火墙

sudo systemctl stop firewalld
sudo systemctl disable firewalld

(3)三台机器禁用 SELINUX

sudo vim /etc/selinux/config
# 将 SELINUX=enforcing 修改为 SELINUX=disabled,重启虚拟机使之生效。

(4)在三台机器上配置 IP 地址与主机名称映射

192.168.88.100 node1 
192.168.88.101 node2 
192.168.88.102 node3

(5)配置三台机器之间 SSH 无密钥登录,方便后续集群服务启动

# 生成密钥:一直回车,默认生成密钥文件
ssh-keygen -t rsa
# 执行命令,将公钥复制到其余 3 台机器上
ssh-copy-id node1
ssh-copy-id node2
ssh-copy-id node3

(6)安装 JDK

# 在三台机器上安装 JDK8,安装之前先卸载 OPENJDK,相关命令如下
rpm -qa|grep java
rpm -e --nodeps java-1.8.0-openjdk-xxx

# 上传JDK8软件包并解压至 /export/server 目录,在 /etc/profile 配置环境变量
export JAVA_HOME=/export/server/jdk
export PATH=:$PATH:$JAVA_HOME/bin

# 执行如下命令生效
source /etc/profile

b. HDFS 集群部署

(1)解压 Hadoop 软件包到 /export/server/

(2)配置环境变量

export HADOOP_HOME=/export/server/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

修改完后 source /etc/profile(三台机器都要配置环境变量,便使之生效,方便后续直接使用命令)

(3)hadoop-env.sh

在 Hadoop 环境变量脚本配置 JDK 和 Hadoop 安装目录。

执行命令:vim /export/server/hadoop/etc/hadoop/hadoop-env.sh

修改内容如下:

export JAVA_HOME=/export/server/jdk
export HADOOP_HOME=/export/server/hadoop

由于 Hadoop3 开始,执行 start-dfs.sh 脚本启动 HDFS 服务时,默认使用 !root 普通用户,如果使用 root 用户启动,需要在 hadoop-env.sh 添加如下变量:

export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root 
export YARN_RESOURCEMANAGER_USER=root 
export YARN_NODEMANAGER_USER=root

(4)core-site.xml

配置 Hadoop Common 模块公共属性,编辑 core-site.xml 文件。

执行命令:vim /export/server/hadoop/etc/hadoop/core-site.xml

增加配置内容:

<!-- 默认文件系统的名称(通过URI中schema区分不同文件系统) -->
<!-- file:///本地文件系统 | hdfs:// Hadoop分布式文件系统-->
<!-- HDFS文件系统访问地址 http://nn_host:8020。-->
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://node1:8020</value>
</property>
<!-- Hadoop 本地数据存储目录 format时自动生成 -->
<property>
    <name>hadoop.tmp.dir</name>
    <value>/export/data/hadoop-3.1.4</value>
</property>
<!-- 在WebUI访问HDFS使用的用户名 -->
<property>
    <name>hadoop.http.staticuser.user</name>
    <value>root</value>
</property>

(5)hdfs-site.xml

配置 HDFS 分布式文件系统相关属性。

执行命令:vim /export/server/hadoop/etc/hadoop/hdfs-site.xml

增加配置内容:

<!-- 设定SNN运行主机和端口 -->
<property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>node2:9868</value>
</property>

(6)配置 HDFS 集群中从节点 DataNode 所运行机器

执行命令:vim /export/server/hadoop/etc/hadoop/workers

增加配置内容:

node1
node2
node3

(7)将 node1 上配置好的 HDFS 分发到 node2 和 node3

(8)第一次启动 HDFS 文件之前需要先格式 HDFS 文件系统

$ hdfs namenode -format

(9)启动 HDFS 集群

在 node1 上启动 HDFS 集群服务(NameNode 和 DataNodes):

$ hdfs --daemon start namenode
$ hdfs --workers --daemon start datanode

查看各个机器上服务是否启动,命令如下:

$ for i in `seq 1 3`; do echo node$i; ssh node$i `which jps`; done

查看 HDFS WEB UI:http://node1:9870/

c. YARN 集群部署

在 HDFS 集群基础之上配置 YARN 集群,目前不考虑 RM HA 高可用,服务规划如下:

node1 node2 node3
NodeManager ResourceManager|NodeManager NodeManager

(1)yarn-site.xml

<!-- yarn集群主角色RM运行机器 -->
<property>
  <name>yarn.resourcemanager.hostname</name>
  <value>node1</value>
</property>
<!-- NodeManager上运行的附属服务,需配置成mapreduce_shuffle才可运行MR程序。-->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle</value>
</property>
<!-- 容器虚拟内存与物理内存之间的比率 -->
<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>4</value>
</property>
<!-- 开启yarn日志聚集功能,收集每个容器的日志集中存储在一个地方 -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<!-- 日志保留时间设置为1天 -->
<property>
  <name>yarn.log-aggregation.retain-seconds</name>
  <value>86400</value>
</property>
<property>
  <name>yarn.log.server.url</name>
  <value>http://node1:19888/jobhistory/logs</value>
</property>

(2)mapred-site.xml

Hadoop YARN 集群运行以后,运行 MapReduce 程序,所以需要配置 MapReduce 框架运行 YARN 上时相关属性。

<!-- MR 程序默认运行方式(yarn集群模式 local本地模式) -->
<property>
  <name>mapreduce.framework.name</name>
  <value>yarn</value>
</property>
<!-- JobHistory 服务配置 (注意19888是WEBUI访问端口) -->
<property>
  <name>mapreduce.jobhistory.address</name>
  <value>node1:10020</value>
</property>
<property>
  <name>mapreduce.jobhistory.webapp.address</name>
  <value>node1:19888</value>
</property>
<!-- MR App Master 环境变量 -->
<property>
  <name>yarn.app.mapreduce.am.env</name>
  <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>
<!-- MR MapTask 环境变量 -->
<property>
  <name>mapreduce.map.env</name>
  <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>
<!-- MR ReduceTask 环境变量 -->
<property>
  <name>mapreduce.reduce.env</name>
  <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>

(3)同步配置到 node2、node3

(4)启动 YARN 集群

在启动YARN集群之前,首先启动 HDFS 集群,再进行启动。在 node2 上启动 YARN 服务组件,相关命令如下:

$ yarn --daemon start resourcemanager
$ yarn --workers --daemon start nodemanager

查看各个机器上服务是否启动,命令如下:

$ for i in `seq 1 3`; do echo node$i; ssh node$i `which jps`; done

查看 YARN WEB UI 界面:http://node2:8088/

(5)启动 MR 历史服务

当 MapReduce 运行在 YARN 上完成以后,可以从历史服务查看 MR 运行状况,在 node1 上启动:

$ mapred --daemon start historyserver

(6)将官方提供词频统计 WordCount 程序运行在 YARN 集群上

  1. 准备数据

    vim input.data
    # 内容如下
    # hadoop hive hive spark flink
    # hadoop flink flink spark flink spark hdfs
    # hadoop flink flink spark
    
  2. 数据文件上传 HDFS

    $ hdfs dfs -mkdir -p /data
    $ hdfs dfs -put input.txt /data
    
  3. 提交程序运行

    $ yarn jar ${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar wordcount /data/input.txt /data/output
    
  4. 查看 YARN WEB UI 监控页面,可以看到提交运行 MapReduce 程序

2.3 RM 重启机制

ResourceManager 负责资源管理和应用的调度,是 YARN 的核心组件,存在单点故障的问题。ResourceManager 重启机制是使 RM 在重启动时能够使 Yarn 集群正常工作的特性,并且使 RM 出现的失败不被用户知道(重启机制并不是自动帮我们重启的意思,所以不能解决单点故障问题)。

不开启 RM 重启机制现象:如果 RM 出现故障重启之后,之前的信息将会消失。正在执行的作业也会失败。

a. 两种重新启动类型

Non-work-preserving RM restart 不保留工作的 RM 重启,在 Hadoop 2.4.0 版本实现。

在 Hadoop 2.4.0 版本实现,当 Client 提交一个 application 给 RM 时,RM 会将该 application 的相关信息存储起来,具体存储的位置是可以在配置文件中指定的,可以存储到本地文件系统上,也可以存储到 HDFS 或 Zookeeper 上,此外 RM 也会保存 application 的最终状态信息(failed/killed/finished),如果是在安全环境下运行,RM 还会保存相关证书文件。

当 RM 被关闭后,NodeManager(以下简称 NM)和 Client 由于发现连接不上 RM,会不断的向 RM 发送消息,以便于能及时确认 RM 是否已经恢复正常。当 RM 重新启动后,它会发送一条 re-sync(重新同步)命令给所有的 NM 和 ApplicationMaster(以下简称 AM),NM 收到重新同步的命令后会杀死所有的正在运行的 containers 并重新向 RM 注册,从 RM 的角度来看,每台重新注册的 NM 跟一台新加入到集群中 NM 是一样的。

AM 收到重新同步的命令后会自行将自己杀掉。接下来,RM 会将存储的关于 application 的相关信息读取出来,将在 RM 关闭之前最终状态为正在运行中的 application 重新提交运行

Work-preserving RM restart 保留工作的 RM 重启,在 Hadoop 2.6.0 版本中实现。

从 Hadoop 2.6.0 开始增强了 RM 重启功能,与不保留工作不同的地方在于,RM 会记录下 container 的整个生命周期的数据,包括 application 运行的相关数据,资源申请状况,队列资源使用状况等数据。

当 RM 重启之后,会读取之前存储的关于 application 的运行状态的数据,同时发送 re-sync 的命令,与第一种方式不同的是,NM 在接收到重新同步的命令后并不会杀死正在运行的 containers,而是继续运行 containers 中的任务,同时将 containers 的运行状态发送给 RM,之后,RM 根据自己所掌握的数据重构 container 实例和相关的 application 运行状态,如此一来,就实现了在 RM 重启之后,接着 RM 关闭时任务的执行状态继续执行

小结:

  • 不保留工作 RM 重启机制只保存了 application 提交的信息和最终执行状态,并不保存运行过程中的相关数据,所以 RM 重启后,会先杀死正在执行的任务,再重新提交,从零开始执行任务。
  • 保留工作 RM 重启机制保存了 application 运行中的状态数据,所以在 RM 重启之后,不需要杀死之前的任务,而是接着原来执行到的进度继续执行。

b. 基于 ZK 转态管理

在 YARN 用户配置文件 yarn-site.xml 中启用 RM 重启功能,使用 Zookeeper 进行转态数据存储。

<property>
    <name>hadoop.zk.address</name>
    <value>node1:2181,node2:2181,node3:2181</value>
</property>
<property>
    <name>yarn.resourcemanager.recovery.enabled</name>
    <value>true</value>
</property>
<property>
    <name>yarn.resourcemanager.store.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>

如果启用了 RM 的重启机制,升级为 Active 状态的 RM 会初始化 RM 内部状态和恢复先前活动 RM 留下的状态,这依赖于 RM 的重启特性。而之前提交到 RM 托管的作业会发起新的尝试请求。用户提交的应用可以考虑进行周期性的 CheckPoint 来避免任务丢失。

RM 的重启机制本质上是将 RM 内部的状态信息写入外部存储,在 RM 启动时会初始化状态信息的目录,当 Application 运行时会将相关的状态写入对应的目录下。如果 RM 发生故障切换,新的 Active 状态的 RM 会通过外部存储进行恢复。RM 状态存储的实现是 RMStateStore 抽象类,YARN 对 RMStateStore 提供了几种实例:

从类继承图可以看出提供5种方式存储状态,比对如下:

状态存储方式 说明
Memory MemoryRMStateStore 是基于内存的状态存储实现,使用 RMState 对象存储 RM 所有的状态。
ZooKeeper ZKRMStateStore 是基于 ZooKeeper 的状态存储实现,支持 RM 的 HA,只有基于 ZooKeeper 的状态存储支持隔离机制,能避免出现裂脑情况发生,允许有多个处于 Active 状态的 RM 同时编辑状态存储。建议在 YARN 的 HA 中使用。
FileSystem FileSystemRMStateStore 支持 HDFS 和基于本地 FS 的状态存储实现。不支持隔离机制。
LevelDB LeveldbRMStateStore 是基于 LevelDB 的状态存储实现,它比基于 HDFS 和 ZooKeeper 的状态存储库更轻巧。LevelDB 支持更好的原子操作,每个状态更新的 I/O 操作更少,文件系统上的文件总数也少得多。不支持隔离机制。
Null NullRMStateStore 是一个空实现。

以 ZKRMStateStore 为例,活动 RM 的所有状态信息存储在 ZooKeeper 的 /rmstore/ZKRMStateRoot 下,主要有 ReservationSystemRoot、RMAppRoot、AMRMTokenSecretManagerRoot、EpochNode、RMDTSecretManagerRoot 和RMVersionNode 共 6 个 Znode。

这 6 个 Znode 的结构和作用主要涵盖了 RM 资源预留信息、应用信息,应用的 Token 信息,RM 版本信息,如下:

各种类型 Znode 节点说明如下:

Znode 名称 说明
ReservationSystemRoot RM 的资源预留系统,对应的实现是 ReservationSystem 接口的子类。
RMAppRoot Application 信息,对应的实现是 RMApp 接口的子类。
AMRMTokenSecretManagerRoot ApplicationAttempt 的 Token 信息,RM 会将每个 Token 保存在本地的内存中,直到应用程序运行完成为止,并保存到 ZooKeeper 存储以供重新启动。对应的实现是 AMRMTokenSecretManager 类。
EpochNode RM 的保存工作重启的时间信息。每次 RM 重新启动时,epoch 都会增加。它用于确保 ContainerId 的唯一性。对应的实现是 Epoch 抽象类。
RMDTSecretManagerRoot 一个特定于 RM 的委托令牌保密管理器。保密管理器负责生成和接受每个令牌的密码。
RMVersionNode RM 的版本信息

2.4 YARN · HA

ResourceManager(RM)负责管理群集中的资源和调度应用程序(如 MR、Spark 等)。在 Hadoop 2.4 之前,YARN 群集中的 ResourceManager 存在 SPOF(Single Point of Failure 单点故障)。

为了解决 ResourceManager 的单点问题,YARN 设计了一套 Active/Standby 模式的 ResourceManager HA 高可用架构。

在运行期间有多个 ResourceManager 同时存在来增加冗余进而消除这个单点故障,并且只能有一个 ResourceManager 处于 Active 状态,其他的则处于 Standby 状态,当 Active 节点无法正常工作,其余 Standby 状态的几点则会通过竞争选举产生新的 Active 节点。

a. 高可用 HA 架构

ResourceManager 的 HA 通过 Active/Standby 体系实现,其底层通过 ZooKeeper 集群来存储 RM 的状态信息、应用程序的状态。如果 Active 状态的 RM 遇到故障,会通过切换 Standby 状态的 RM 为 Active 来继续为集群提供正常服务。

故障转移机制支持自动故障转移和手动故障转移两种方式实现。在生产环境中,自动故障转移应用更为广泛。

(1)手动故障转移

当没有启用自动故障转移时,管理员必须手动将一个 RM 转换为活动状态。要从一个 RM 到另一个 RM 进行故障转移,需要先把 Active 状态的 RM 转换为 Standby 状态的 RM,然后再将 Standby 状态的 RM 转换为 Active 状态的 RM。这些操作可用 yarn rmadmin 命令来完成。

(2)自动故障转移

RM 可以选择嵌入基于 Zookeeper 的 ActiveStandbyElector(org.apache.hadoop.ha.ActiveStandbyElector)来实现自动故障转移,以确定哪个 RM 应该是 Active。当 Active 状态的 RM 发生故障或无响应时,另一个 RM 被自动选为 Active,然后接管服务。YARN 的故障转移不需要像 HDFS 那样运行单独的 ZKFC 守护程序,因为 ActiveStandbyElector 是一个嵌入在 RM 中充当故障检测器和 Leader 选举的线程,而不是单独的 ZKFC 守护进程。

当有多个 RM 时,Clients 和 NMs 通过读取 yarn-site.xml 配置找到所有 ResourceManager。Clients、AM 和 NM 会轮训所有的 ResourceManager 并进行连接,直到找着 Active 状态的 RM。如果 Active 状态的 RM 也出现故障,它们就会继续查找,直到找着新的 Active 状态的 RM。

b. 故障转移原理

YARN 这个 Active/Standby 模式的 RM HA 架构在运行期间,会有多个 RM 同时存在,但只能有一个 RM 处于 Active 状态,其他的 RM 则处于 Standby 状态,当 Active 节点无法正常提供服务,其余 Standby 状态的 RM 则会通过竞争选举产生新的 Active 节点。以基于 ZooKeeper 这个自动故障切换为例,切换步骤如下:

  1. 主备切换,RM 使用基于 ZooKeeper 实现的 ActiveStandbyElector 组件来确定 RM 的状态是 Active 或 Standby。
  2. 创建锁节点,在 ZooKeeper 上会创建一个叫做 ActiveStandbyElectorLock 的锁节点,所有的 RM 在启动的时候,都会去竞争写这个临时的 Lock 节点,而 ZooKeeper 能保证只有一个 RM 创建成功。创建成功的 RM 就切换为 Active 状态,并将信息同步存入到 ActiveBreadCrumb 这个永久节点,那些没有成功的 RM 则切换为 Standby 状态。
  3. 注册 Watcher 监听,所有 Standby 状态的 RM 都会向 /yarn-leader-election/cluster1/ActiveStandbyElectorLock 节点注册一个节点变更的 Watcher 监听,利用临时节点的特性,能够快速感知到 Active 状态的 RM 的运行情况。
  4. 准备切换,当 Active 状态的 RM 出现故障(如宕机或网络中断),其在 ZooKeeper 上创建的 Lock 节点随之被删除,这时其它各个 Standby 状态的 RM 都会受到 ZooKeeper 服务端的 Watcher 事件通知,然后开始竞争写 Lock 子节点,创建成功的变为 Active 状态,其他的则是 Standby 状态。

Fencing(隔离):

在分布式环境中,机器经常出现假死的情况(常见的是 GC 耗时过长、网络中断或 CPU 负载过高)而导致无法正常对外进行及时响应。如果有一个处于 Active 状态的 RM 出现假死,其他的 RM 刚选举出来新的 Active 状态的 RM,这时假死的 RM 又恢复正常,还认为自己是 Active 状态,这就是分布式系统的脑裂现象,即存在多个处于 Active 状态的 RM,可以使用「隔离机制」来解决此类问题。

YARN 的 Fencing 机制是借助 ZooKeeper 数据节点的 ACL 权限控制来实现不同 RM 之间的隔离。这个地方改进的一点是,创建的根 ZNode 必须携带 ZooKeeper 的 ACL 信息,目的是为了独占该节点,以防止其他 RM 对该 ZNode 进行更新。借助这个机制,假死之后的 RM 会试图去更新 ZooKeeper 的相关信息,但发现没有权限去更新节点数据,就把自己切换为 Standby 状态。

c. HA 配置及测试

Zookeeper 集群配置安装部署启动完成以后,可以参考官方文档配置 YARN HA:

http://hadoop.apache.org/docs/r3.1.4/hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html

注意:先关闭 YARN 集群,执行命令:stop-yarn.sh 即可。

YARN HA 高可用部署配置,node2 和 node3 为 RM 服务节点。

[yarn-site.xml]

<!-- 删除这个配置 -->
<property>
  <name>yarn.resourcemanager.hostname</name>
  <value>node2</value>
</property>

<!-- 启用RM HA -->
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<!-- RM HA集群标识ID -->
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>cluster1</value>
</property>
<!-- RM HA集群中各RM的逻辑标识 -->
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<!-- RM1运行主机 -->
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>node1</value>
</property>
<!-- RM2运行主机 -->
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>node2</value>
</property>
<!-- RM1-WebUI地址 -->
<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>node1:8088</value>
</property>
<!-- RM2-WebUI地址 -->
<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>node2:8088</value>
</property>
<!-- 开启自动故障转移 -->
<property>
  <name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>
<!-- ZK集群的地址 -->
<property>
  <name>hadoop.zk.address</name>
  <value>node1:2181,node2:2181,node3:2181</value>
</property>
<!-- 开启RM状态恢复重启机制 -->
<property>
  <name>yarn.resourcemanager.recovery.enabled</name>
  <value>true</value>
</property>
<!-- 使用ZK集群存储RM状态数据 -->
<property>
  <name>yarn.resourcemanager.store.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<!-- NodeManager上运行的附属服务需配置成mapreduce_shuffle才可运行MR程序 -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle</value>
</property>
<!-- 容器虚拟内存与物理内存之间的比率 -->
<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>4</value>
</property>
<!-- 开启yarn日志聚集功能,收集每个容器的日志集中存储在一个地方 -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<!-- 日志保留时间设置为1天 -->
<property>
  <name>yarn.log-aggregation.retain-seconds</name>
  <value>86400</value>
</property>
<property>
  <name>yarn.log.server.url</name>
  <value>http://node1:19888/jobhistory/logs</value>
</property>

同步配置文件到其他节点。

当配置 YARN 为 HA 时,使用 start-yarn.sh 命令就会启动所有的 RM。此外可以使用 YARN 提供的 yarn rmadmin -getAllServiceState 查看 RM 运行状态。

验证故障切换:

(1)查看 HA 状态

当 node3 节点的 RM 为 Active 状态、node2 节点的 RM 为 Standby 状态时,访问 http://node2:8088 会自动跳转到 http://node3:8088 中,表示 YARN HA 正确配置。

(2)自动故障切换

强制杀死 node3 节点的 RM,基于 ZooKeeper 的 ActiveStandbyElector 自动故障转移策略将 node2 节点的 RM 选举为 Active 状态,表示故障转移配置正确。

(3)手动故障切换

在非自动故障切换的 YARN 集群下进行手动故障切换可以使用命令进行故障转移切换。手动故障切换命令 yarn rmadmin -failover rm1 rm2 是 RM1(node2)故障转移到 RM2(node3)。

3. YARN 架构及原理

3.1 体系架构

YARN 总体上仍然是 Master/Slave 结构(主从结构),在整个资源管理框架中, ResourceManager 为 Master, NodeManager 为 Slave, ResourceManager 负责对各个 NodeManager 上的资源进行统一管理和调度。

当用户提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的 ApplicationMaster,它负责向 ResourceManager 申请资源,并要求 NodeManger 启动可以占用一定资源的任务。由于不同的 ApplicationMaster 被分布到不同的节点上,因此它们之间不会相互影响。

上图描述了 YARN 的基本组成结构, YARN 主要由 ResourceManager、 NodeManager、ApplicationMaster(图中给出了 MapReduce 和 MPI 两种计算框架的 ApplicationMaster,分别为 MR AppMstr 和 MPI AppMstr)和 Container 等几个组件构成。

组件 说明
ResourceManager 负责整个集群的资源管理和分配,处理客户端和 AM 的请求,为 Container 分配资源(将任务调度到不同的NM上运行),同时通过 NM 的心跳获取所有 Container 的运行状态,并进行必要的失败重试。
NodeManager 集群资源(CPU 和内存)的实际拥有者,接收 RM 和 AM 的请求,启动具体的 Container,并通过心跳向 RM 汇报 Container 的运行情况。
Application 对应 1.X 版本中的 Job,它可以是一个 MapReduce 应用,也可以是一个 Spark 应用或 Flink 应用等。
ApplicationMaster ApplicationMaster 实际上是特定计算框架的一个实例,每种计算框架都有自己独特的 ApplicationMaster,负责与 ResourceManager 协商资源,并和 NodeManager 协同来执行和监控 Container。
Container Container 是 YARN 中的一个抽象概念,它是任务运行所需资源、环境变量、启动参数等的一个封装和抽象。一个 Application 中可以分为两类 Container:一类是上面提到的 AM,一类是具体任务的 Container,常见的任务 Container 有 MR 中的 map 任务/reduce 任务、Spark 中的 executor、Flink 的 taskManager。

3.2 组件功能

a. ResourceManager

YARN 集群中的主角色,决定系统中所有应用程序之间资源分配的最终权限,即最终仲裁者。接收用户的作业提交,并通过 NM 分配、管理各个机器上的计算资源。主要由两个组件构成:「调度器」和「应用程序管理器」。

  • 调度器(Scheduler):根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。
    • 需要注意的是,该调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作,比如不负责监控或者跟踪应用的执行状态等,也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务,这些均交由应用程序相关的 ApplicationMaster 完成。
    • 调度器仅根据各个应用程序的资源需求进行资源分配,而「资源分配单位」用一个抽象概念“资源容器”(Resource Container,简称 Container)表示, Container 是一个动态资源分配单位,它将内存、 CPU、磁盘、网络等资源封装在一起,从而限定每个任务使用的资源量。
    • 此外,该调度器是一个可插拔的组件,用户可根据自己的需要设计新的调度器, YARN 提供了多种直接可用的调度器,比如 FairScheduler 和 Capacity Scheduler 等。
  • 应用程序管理器(Applications Manager):负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动 ApplicationMaster、监控 ApplicationMaster 运行状态并在失败时重新启动它等。

b. NodeManager

YARN 中的从角色,一台机器上一个,负责管理本机器上的计算资源。

NM(NodeManager) 是每个节点上的资源和任务管理器,一方面,它会定时地向 RM 汇报本节点上的资源使用情况和各个 Container 的运行状态;另一方面,它接收并处理来自 AM 的 Container 启动/停止等各种请求。

c. ApplicationMaster

ApplicationMaster 实际上是特定计算框架的一个实例,每种计算框架都有自己独特的 ApplicationMaster,负责与 ResourceManager 协商资源,并和 NodeManager 协同来执行和监控 Container。

MapReduce 是运行在 YARN 上的其中一种计算框架,而 MRAppMaster 则是 MapReduce 的 ApplicationMaster 实现,它使得 MapReduce 计算框架可以运行于 YARN 之上。

用户提交的每个应用程序均包含一个 AM,它是应用程序内的“老大”,负责程序内部各阶段的资源申请、监督程序的执行情况。其主要职责包括:

  • 与 RM 调度器协商以获取资源(用 Container 表示)
  • 将得到的任务进一步分配给内部的任务
  • 与 NM 通信以启动/停止任务
  • 监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务

当前 YARN 自带了两个 AM 实现:

  1. 用于演示 AM 编写方法的示例程序 — distributedshell,它可以申请一定数目的 Container 以并行运行一个 Shell 命令或 Shell 脚本 ;
  2. 运行 MapReduce 应用程序的 AM — MRAppMaster。

此外,一些其他的计算框架对应的 AM 正在开发中,比如 Spark、Flink 等。

d. Container

Container 是 YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,从而限定每个任务使用的资源量。当 AM 向 RM 申请资源时, RM 为 AM 返回的资源便是用 Container 表示的。 YARN 会为每个任务分配一个 Container,且该任务只能使用该 Container 中描述的资源。

需要注意的是, Container 不同于 MRv1 中的 slot,它是一个动态资源划分单位,是根据应用程序的需求动态生成的。当下, YARN 仅支持 CPU 和内存两种资源,且使用了轻量级资源隔离机制 Cgroups 进行资源隔离。

3.3 通信协议

RPC 协议是连接各个组件的“大动脉”,了解不同组件之间的 RPC 协议有助于更深入地理解学习 YARN 框架。在 YARN 中,任何两个需相互通信的组件之间仅有一个 RPC 协议,而对于任何一个 RPC 协议,通信双方有一端是 Client,另一端为 Server,且 Client 总是主动连接 Server 的,因此,YARN 实际上采用的是拉式(pull-based)通信模型

如上图所示,箭头指向的组件是 RPC Server,而箭头尾部的组件是 RPC Client, YARN 主要由以下几个 RPC 协议组成:

通信双方 协议 描述
JobClient => RM ApplicationClientProtocol JobClient 通过该 RPC 协议提交应用程序、查询应用程序状态等。
Admin => RM ResourceManagerAdministrationProtocol Admin 通过该 RPC 协议更新系统配置文件,比如节点黑白名单、用户队列权限等。
AM => RM ApplicationMasterProtocol AM 通过该 RPC 协议向 RM 注册和撤销自己,并为各个任务申请资源。
AM => NM ContainerManagementProtocol AM 通过该 RPC 要求 NM 启动或者停止 Container,获取各个 Container 的使用状态等信息。
NM => RM ResourceTracker NM 通过该 RPC 协议向 RM 注册,并定时发送心跳信息汇报当前节点的资源使用情况和 Container 运行情况。

为了提高 Hadoop 的向后兼容性和不同版本之间的兼容性, YARN 中的序列化框架采用了 Google 开源的 Protocol Buffers。

3.4 工作流程

运行在 Hadoop YARN 上的应用程序主要分为两类 :短应用程序和长应用程序。

  • 短应用程序:指一定时间内(可能是秒级、分钟级或小时级,尽管天级别或者更长时间的也存在,但非常少)可运行完成并正常退出的应用程序,比如 MapReduce 作业、 Spark 作业等;
  • 长应用程序:指不出意外,永不终止运行的应用程序,通常是一些服务,比如 Storm Service(主要包括 Nimbus 和 Supervisor 两类服务), Flink(包括 JobManager 和 TaskManager 两类服务) 等,而它们本身作为一个框架提供了编程接口供用户使用。

【工作流程1】

尽管这两类应用程序作用不同,一类直接运行数据处理程序,一类用于部署服务(服务之上再运行数据处理程序),但运行在 YARN 上的流程是相同的。当用户向 YARN 中提交一个应用程序后, YARN 将分两个阶段运行该应用程序 :第 1 个阶段是启动 ApplicationMaster ;第 2 个阶段是由 ApplicationMaster 创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成。

如上图所示, YARN 的工作流程分为以下几个步骤:

  1. 用户向 YARN 中提交应用程序,其中包括 ApplicationMaster 程序、启动 ApplicationMaster 的命令、用户程序等;
  2. ResourceManager 为该应用程序分配第一个 Container,并与对应的 NodeManager 通信,要求它在这个 Container 中启动应用程序的 ApplicationMaster;
  3. ApplicationMaster 首先向 ResourceManager 注册,这样用户可以直接通过 ResourceManage 查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤 4~7;
  4. ApplicationMaster 采用轮询的方式通过 RPC 协议向 ResourceManager 申请和领取资源;
  5. 一旦 ApplicationMaster 申请到资源后,便与对应的 NodeManager 通信,要求它启动任务;
  6. NodeManager 为任务设置好运行环境(包括环境变量、 JAR 包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务;
  7. 各个任务通过某个 RPC 协议向 ApplicationMaster 汇报自己的状态和进度,以让 ApplicationMaster 随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过 RPC 向 ApplicationMaster 查询应用程序的当前运行状态;
  8. 应用程序运行完成后,ApplicationMaster 向 ResourceManager 注销并关闭自己。

【工作流程2】

3.5 作业提交全过程

(1)作业提交

  1. Client 调用 job.waitForCompletion() 向整个集群提交 MapReduce 作业;
  2. Client 向 RM 申请一个作业 id;
  3. RM 给 Client 返回该 job 资源的提交路径和作业 id;
  4. Client 提交 jar 包、切片信息和配置文件到指定的资源提交路径;
  5. Client 提交完资源后,向 RM 申请运行 MrAppMaster。

(2)作业初始化

  1. 当 RM 收到 Client 的请求后,将该 Job 添加到容量调度器中;
  2. 某一个空闲的 NM 领取到该 Job;
  3. 该 NM 创建 Container,并产生 MRAppmaster;
  4. 下载 Client 提交的资源到本地。

(3)任务分配

  1. MrAppMaster 向 RM 申请运行多个 MapTask 任务资源;
  2. RM 将运行 MapTask 任务分配给另外两个 NodeManager,另两个 NodeManager 分别领取任务并创建容器。

(4)任务运行

  1. MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager 分别启动 MapTask,MapTask 对数据分区排序;
  2. MrAppMaster 等待所有 MapTask 运行完毕后,向 RM 申请容器,运行 ReduceTask;
  3. ReduceTask 向 MapTask 获取相应分区的数据;
  4. 程序运行完毕后,MR 会向 RM 申请注销自己。

(5)进度和状态更新

YARN 中的任务将其进度和状态(包括 Counter)返回给应用管理器, 客户端每秒(通过 mapreduce.client.progressmonitor.pollinterval 设置)向应用管理器请求进度更新, 展示给用户。

(6)作业完成

除了向应用管理器请求作业进度外,客户端每 5 秒都会通过调用 waitForCompletion() 来检查作业是否完成。时间间隔可以通过 mapreduce.client.completion.pollinterval 来设置。作业完成之后,应用管理器和 Container 会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。


YARN 正是一个资源管理系统,它的出现弱化了计算框架之争,引入 YARN 这一层后,各种计算框架可各自发挥自己的优势,并由 YARN 进行统一管理,进而运行在一个大集群上。截至本书出版时,各种开源系统都在开发 YARN 版本,包括 MapReduce、 Spark、 Storm、 Flink 等。

4. YARN WebUI

与 HDFS 一样,YARN 也提供了一个 WebUI 服务,可以使用 YARN Web 用户界面监视群集、队列、应用程序、服务、流活动和节点信息。还可以查看集群详细配置的信息,检查各种应用程序和服务的日志。

浏览器输入 http://node2:8088 访问 YARN WebUI 服务,页面打开后,以列表形式展示已经运行完成的各种应用程序,如 MapReduce 应用、Spark 应用、Flink 应用等,与点击页面左侧 Application 栏目红线框 Applications 链接显示的内容一致。

当点击任意一个应用程序时,会打开一个新页面,并展示这个应用程序的运行信息。以 MR 应用为例,如果应用程序正在运行/已经运行完成,打开的页面如下图所示。

4.1 JobHistoryServer

YARN 中提供了一个叫做 JobHistoryServer 的守护进程,它属于 YARN 集群的一项系统服务,仅存储已经运行完成的 MapReduce 应用程序的作业历史信息,并不会存储其他类型(如 Spark、Flink 等)应用程序的作业历史信息。

当启用 JobHistoryServer 服务时,仍需要开启「日志聚合功能」,否则每个 Container 的运行日志是存储在 NodeManager 节点本地,查看日志时需要访问各个 NodeManager 节点,不利于统一管理和分析。

当开启日志聚合功能后 AM 会自动收集每个 Container 的日志,并在应用程序完成后将这些日志移动到文件系统(如 HDFS),然后通过 JHS 的 WebUI 服务来提供用户使用和应用恢复。

a. 启动步骤

(1)启用 JHS 服务

在 mapred-site.xml 文件中配置指定 JobHistoryServer 服务地址和端口号,具体操作如下。

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<property>
  <name>yarn.nodemanager.remote-app-log-dir</name>
  <value>/app-logs</value>
</property>
<property>
  <name>yarn.log.server.url</name>
  <value>http://node1:19888/jobhistory/logs</value>
</property>

同步 mapred-site.xml 文件到集群其他机器。

(2)启用日志聚合

首先配置运行在 YARN 上应用的日志聚集功能,当应用运行完成,将日志相关信息上传至 HDFS 文件系统,编辑文件 yarn-site.xml 添加属性配置。

<!-- 开启yarn日志聚集功能,收集每个容器的日志集中存储在一个地方 -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>

<!-- 聚合日志后在hdfs的存放地址 -->
<property>
  <name>yarn.nodemanager.remote-app-log-dir</name>
  <value>/app-logs</value>
</property>

<!-- 日志保留时间设置为一天 -->
<property>
  <name>yarn.log-aggregation.retain-seconds</name>
  <value>86400</value>
</property>

<!-- web访问地址 -->
<property>
  <name>yarn.log.server.url</name>
  <value>http://node1:19888/jobhistory/logs</value>
</property>

同步 yarn-site.xml 文件到集群其他机器。

(3)启动 JHS 服务

在上述配置中指定的 JHS 服务位于 node1 节点上,在 node1 中启动 JobHistoryServer 服务。

$ mr-jobhistory-daemon.sh start historyserver
$ mapred --daemon start historyserver

浏览器查看:http://node1:19888/jobhistory

b. 管理 MR

当提交运行 MapReduce 程序在 YARN 上运行完成以后,将应用运行日志数据上传到 HDFS 上,此时 JobHistoryServer 服务可以从 HDFS 上读取运行信息,在 WebUI 进行展示,具体流程如下。

(1)使用 yarn jar 提交运行官方自带词频统计 WordCount 程序到 YARN 上运行,命令如下。

$ yarn jar \
/export/server/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar \
wordcount \
/data/input.txt /data/output

(2)MR运行历史信息

MR 应用程序在运行时,是通过 AM(MRAppMaster)将日志写到 HDFS 中,会生成 .jhist.summary_conf.xml 文件。其中 .jhist 文件是 MR 程序的计数信息,.summary 文件是作业的摘要信息,_conf.xml 文件是 MR 程序的配置信息。

yarn.app.mapreduce.am.staging-dir=/tmp/hadoop-yarn/staging(默认)
mapreduce.jobhistory.intermediate-done-dir=/mr-history/intermediate(配置)
mapreduce.jobhistory.intermediate-done-dir=/mr-history/intermediate(配置)
mapreduce.jobhistory.intermediate-done-dir=/mr-history/done(配置)
mapreduce.jobhistory.cleaner.enable=true(默认)
mapreduce.jobhistory.cleaner.interval-ms=86400000(1天)
mapreduce.jobhistory.max-age-ms=86400000(1天)
  • MR 应用程序启动时的资源信息:MR 应用程序启动时,会把作业信息存储到 ${yarn.app.mapreduce.am.staging-dir}/${user}/.staging/${job_id} 目录下;
  • MR 应用程序运行完成时生成的信息:MR 应用程序运行完成后,作业信息会被临时移动到 ${mapreduce.jobhistory.intermediate-done-dir}/${user} 目录下。
  • MR 应用程序最终的作业信息:等待 ${mapreduce.jobhistory.move.interval-ms} 配置项的值(默认180000ms=3min)后,会把 ${mapreduce.jobhistory.intermediate-done-dir}/${user} 下的作业数据移动到 ${mapreduce.jobhistory.done-dir}/${year}/${month}/${day}/${serialPart} 目录下。此时 .summary 文件会被删除,因为 .jhist 文件提供了更详细的作业历史信息。
  • JHS 服务中的作业历史信息不是永久存储的,在默认情况下,作业历史清理程序默认按照 86400000ms(1天)的频率去检查要删除的文件,只有在文件早于 mapreduce.jobhistory.max-age-ms(1天)时才进行删除。JHS 的历史文件的移动和删除操作由 HistoryFileManager 类完成。

c. 运行流程

  1. 客户端提交 MR 应用程序到 RM;
  2. /tmp/logs/<user>/logs/application_timestamp_xxxx 中创建应用程序文件夹;
  3. MR 作业在群集上的 YARN 中运行;
  4. MR 作业完成,在提交作业的作业客户上报告作业计数器;
  5. 将计数器信息(.jhist文件)和j ob_conf.xml 文件写入 /user/history/done_intermediate/<user>/job_timestamp_xxxx
  6. 然后将 .jist 文件和 job_conf.xml从/user/history/done_intermediate/<user>/ 移动到 /user/history/done 目录下;
  7. 来自每个 NM 的 Container 日志汇总到 /tmp/logs/<用户ID>/logs/application_timestamp_xxxx

4.2 TimelineServer

由于 Job History Server 仅对 MapReduce 应用程序提供历史信息支持,其他应用程序的历史信息需要分别提供单独的 HistoryServer 才能查询和检索。例如 Spark 的 Application 需要通过 Spark 自己提供的 org.apache.spark.deploy.history.HistoryServer 来解决应用历史信息。

为了解决这个问题,YARN 新增了 Timeline Server 组件,以通用的方式存储和检索应用程序当前和历史信息。

到目前,有 V1、V1.5 和 V2 共三个版本,V1 仅限于写入器/读取器和存储的单个实例,无法很好地扩展到小型群集之外;V2 还处于 alpha 状态,所以在本章以 V1.5 进行讲解。

版本 说明
V1 基于 LevelDB 实现
V1.5 在 V1 的基础上改进了扩展性
V2 1. 使用更具扩展性的分布式写入器体系结构和可扩展的后端存储。2. 将数据的收集(写入)与数据的提供(读取)分开:它使用分布式收集器,每个 YARN 应用程序实质上是一个收集器;读取器是专用于通过 REST API 服务查询的单独实例。3. 使用 HBase 作为主要的后备存储,因为 Apache HBase 可以很好地扩展到较大的大小,同时保持良好的读写响应时间。4. 支持在流级别汇总指标。

官方文档:

http://hadoop.apache.org/docs/r3.1.4/hadoop-yarn/hadoop-yarn-site/TimelineServer.html

http://hadoop.apache.org/docs/r3.1.4/hadoop-yarn/hadoop-yarn-site/TimelineServiceV2.html

YARN Timeline Service V2 服务架构图如下:

a. 启用 Timeline

在 yarn-site.xml 中添加如下属性,启动 Timeline Server 服务功能:

<property>
  <name>yarn.timeline-service.enabled</name>
  <value>true</value>
  <description>开启YARN Timeline服务</description>
</property>
<property>
  <name>yarn.timeline-service.hostname</name>
  <value>node2</value>
  <description>设置YARN Timeline服务地址</description>
</property>
<property>
  <name>yarn.timeline-service.address</name>
  <value>node2:10200</value>
  <description>设置YARN Timeline服务启动RPC服务器的地址,默认端口10200</description>
</property>
<property>
  <name>yarn.timeline-service.webapp.address</name>
  <value>node2:8188</value>
  <description>设置YARN Timeline服务WebUI地址</description>
</property>

<property>
  <name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
  <value>true</value>
  <description>设置RM是否发布信息到Timeline服务器</description>
</property>
<property>
  <name>yarn.timeline-service.generic-application-history.enabled</name>
  <value>true</value>
  <description>设置是否Timelinehistory-servic中获取常规信息,如果为否,则是通过RM获取</description>
</property>

同步 yarn-site.xml 文件到集群其他机器,并重启 YARN 服务。

b. 启动 Timeline

在上述配置中指定的 Timeline 服务位于 node2 节点上,需要在 node2 节点的 shell 客户端中启动,如果在非 node2 节点上启动时会报错。

启动命令:yarn --daemon start timelineserver

浏览器输入:http://node2:8188/applicationhistory