16-Yarn(3)

发布时间 2023-07-29 21:54:53作者: tree6x7

1. YARN 应用开发流程

YARN 作为通用的资源管理和任务调度平台。理论上,任何类型的程序只要符合规范就可以申请在 YARN 上运行。默认 YARN 实现了 MapReduce 程序的运行支持。

所谓的 YARN 应用开发指的就是如何让自己的程序也可以根据规范在 YARN 申请资源运行。通过学习 YARN 应用开发,也可以更好的理解掌握程序在 YARN 上的提交交互流程。

1.1 概述

组件 说明
ResourceManager 负责整个集群的资源管理和分配,处理客户端和 AM 的请求,为 Container 分配资源(将任务调度到不同的NM上运行),同时通过 NM 的心跳获取所有 Container 的运行状态,并进行必要的失败重试。
NodeManager 集群资源(CPU 和内存)的实际拥有者,接收 RM 和 AM 的请求,启动具体的 Container,并通过心跳向 RM 汇报 Container 的运行情况。
Application 对应 1.X 版本中的 Job,它可以是一个 MapReduce 应用,也可以是一个 Spark 应用或 Flink 应用等。
ApplicationMaster MRAppMaster 是 MapReduce 的 ApplicationMaster 实现,它使得 MapReduce 计算框架可以运行于 YARN 之上。在 YARN 中,MRAppMaster 负责管理 MapReduce 作业的生命周期,包括创建 MapReduce 作业、向 RM 申请资源、与 NM 通信要求其启动 Container、监控作业的运行状态、当任务失败时重新启动任务等。
Container Container 是 YARN 中的一个抽象概念,它是任务运行所需资源、环境变量、启动参数等的一个封装和抽象。一个 Application 中可以分为两类 Container:一类是上面提到的 AM,一类是具体任务的 Container,常见的任务 Container 有 MR 中的 map 任务/reduce 任务、Spark 中的 executor、Flink 的 taskManager。

YARN 的应用开发主要过程如下:

用户在 YARN 上开发应用时,需要实现如下三个模块:

模块 说明
Application Client Application 客户端用于将 Application 提交到 YARN 上,使 Application 运行在 YARN 上,同时监控 Application 的运行状态,控制 Application 的运行。
Application Master AM 实际上是特定计算框架的一个实例,每种计算框架都有自己独特的 AM,负责与 RM 协商资源,并和 NM 协同来执行和监控 Container。AM 负责整个 Application 的运行控制,包括向 YARN 注册 Application、申请资源、启动容器等, Application 的实际工作是在容器中进行。
Application Worker (Task) Application 的实际工作(e.g. 运行 MR 程序)。NodeManager 启动 AM 发送过来的容器,容器内部封装了该 Application Worker 运行所需的资源和启动命令。

实现上述模块,涉及如下 3 个 RPC 协议:

  • ApplicationClientProtocol:Client-RM 之间的协议,主要用于 Application 的提交;
  • ApplicationMasterProtocol:AM-RM 之间的协议,AM 通过该协议向 RM 注册并申请资源;
  • ContainerManagementProtocol:AM-NM 之间的协议,AM 通过该协议控制 NM 启动容器。

【小结】从业务的角度看,YARN 上应用开发需要分两部分进行开发:

  • 接入 YARN 平台,实现 3 个 RPC 协议,通过 YARN 实现对集群资源的访问和利用;
  • 业务功能实现,这个与 YARN 本身没有太大关系;

1.2 Client 开发

客户端的主要作用是「提交(部署)应用」和「监控应用运行」两个部分:

a. 提交应用

提交应用涉及 ApplicationClientProtocol 协议中的两个方法:

// 从RM上获取全局唯一的应用ID和最大可申请的资源量(内存和虚拟CPU核数)
GetNewApplicationResponse getNewApplication(GetNewApplicationRequest request)
// 在获取应用程序ID后,客户端封装应用相关的配置到ApplicationSubmissionContext中,通过submitApplication方法提交到RM上
SubmitApplicationResponse submitApplication(SubmitApplicationRequest request)

具体步骤如下:

  1. Client 通过 RPC 函数 ApplicationClientProtocol#getNewApplication() 从 ResourceManager 中获取唯一的 Application ID;
  2. Client 通过 RPC 函数 ApplicationClientProtocol#submitApplication(所有信息都封装在这个参数里) 将 ApplicationMaster 提交到 ResourceManager 上;
  3. RM 根据 ApplicationSubmissionContext 上封装的内容启动 AM;
  4. 客户端通过 AM 或 RM 获取应用的运行状态,并控制应用的运行过程。

b. 监控应用运行状态

应用监控涉及 ApplicationClientProtocol 协议中的如下几个方法:

// 强制杀死一个应用
KillApplicationResponse forceKillApplication(KillApplicationRequest request)
// 获取应用状态,如进度等
GetApplicationReportResponse getApplicationReport(GetApplicationReportRequest request)
// 获取集群度量
GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest request)
// 获取符合条件的应用的状态(列表)
GetApplicationsResponse getApplications(GetApplicationsRequest request)
// 获取集群中各个节点的状态
GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
// 获取RM中的队列信息
GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)

// 还包括获取用户权限列表和访问权限等方法...

客户端既可以从 RM 上获取应用的信息,也可以通过 AM 获取。通常为了减少 RM 的压力,使用从 AM 获取应用运行状态的方式。客户端与 AM 之间的通信使用应用内部的私有协议,与 YARN 无关。

1.3 AppMaster 开发

AM 的主要功能是按照业务需求,从 RM 处申请资源,并利用这些资源完成业务逻辑。因此,AM 既需要与 RM 通信,又需要与 NM 通信,涉及两个协议,分别是 AM-RM 协议(ApplicationMasterProtocol)和 AM-NM 协议(ContainerManagementProtocol),如下图所示:

a. AM 与 RM 交互

AM-RM 之间使用 ApplicationMasterProtocol 协议进行通信,该协议提供如下几个方法:

// 向 RM 注册 AM
RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request)
// 告知 RM,Application 已经结束
FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request)
// 向 RM 申请/归还资源,维持心跳
AllocateResponse allocate(AllocateRequest request)
  • 客户端向 RM 提交应用后,RM 会根据提交的信息,分配一定的资源来启动 AM,AM 启动后调用 ApplicationMasterProtocol 协议的 registerApplicationMaster 方法主动向 RM 注册;
  • 完成注册后,AM 通过 ApplicationMasterProtocol 协议的 allocate() 方法向 RM 申请运行任务的资源,获取资源后,通过 ContainerManagementProtocol 在 NM 上启动资源容器,完成任务;
  • 应用完成后,AM 通过 ApplicationMasterProtocol 协议的 finishApplicationMaster() 方法向 RM 汇报应用的最终状态,并注销 AM。

具体看一下每一步所传递的信息:

(1)AM 向 RM 注册

  • AM 启动后会主动调用 registerApplicationMaster() 方法向 RM 注册,注册信息中包括该 AM 所在节点和开放的 RPC 服务端口,以及一个应用状态跟踪 Web 接口(将在 RM 的 Web 页面上显示);
  • RM 向 AM返 回一个对象,里面包含了应用最大可申请的单个容器容量、应用访控制列表和一个用于与客户端通信的安全令牌。

(2)AM 向 RM 申请资源

  • AM 通过 allocate() 方法向 RM 申请或释放资源。AM 向 RM 发送的信息被封装在 AllocateRequest 里;RM 接收到 AM 的请求后,扫描其上的资源镜像,按照调度算法分配全部或部分申请的资源给 AM,返回一个 AllocateResponse 对象;
  • 需要注意的是 allocate() 方法还兼顾维持 AM~RM 心跳的作用,因此,即便应用运行过程中有一段时间无需申请任何资源,AM都需要周期性的调用相应该方法,以避免触发 RM 的容错机制。

(3)AM 通知 RM 应用已结束

  • 在应用完成后,AM 通知 RM 应用结束的消息,同时向 RM 提供应用的最终状态(成功/失败等)、一些失败时的诊断信息和应用跟踪地址,RM 收到通知后注销相应的 AM,并将注销结果发送给 AM,AM 收到注销成功的消息后,退出进程;
  • AM 通过调用 ApplicationMasterProtocol#finishApplicationMaster() 方法通知 RM。

b. AM 与 NM 交互

AM 通过 ContainerManagementProtocol 协议与 NM 交互,包括 3 个方面的功能:启动容器、查询容器状态、停止容器,分别对应协议中的三个方法:

// 启动 Container
StartContainersResponse startContainers(StartContainersRequest request)
// 查询 Container 状态
GetContainerStatusesResponse getContainerStatuses(GetContainerStatusesRequest request)
// 停止 Container
StopContainersResponse stopContainers(StopContainersRequest request)

AM-NM 交互过程:

(1)AM 在 NM 上启动容器

  • AM 通过 ContainerManagementProtocol#startContainers() 方法启动一个 NM 上的容器,AM 通过该接口向 NM 提供启动容器的必要配置,包括分配到的资源、安全令牌、启动容器的环境变量和命令等,这些信息都被封装在 StartContainersRequest 中;
  • NM 收到请求后,会启动相应的容器,并返回启动成功的容器列表和失败的容器列表,同时还返回其上相应的辅助服务元数据。

(2)AM 查询 NM 上的容器运行状态

  • 在应用运行期间,AM 需要实时掌握各个 Container 的运行状态,以便及时响应一些异常,如容器运行失败等;
  • AM 通过 ContainerManagementProtocol#getContainerStatuses() 方法获取各个容器的运行状态。

(3)AM 停止 NM 上的容器

  • 当一个容器运行完成后,分配给它的资源需要被回收。
  • AM 通过 ContainerManagementProtocol#stopContainers() 方法停止 NM 上的容器,释放相关资源;然后通过 AM-RM 协议,将释放的资源上报给 RM,RM 完成最终的资源回收。

2. YARN 编程库

YARN 上的应用开发分为平台接入和业务开发两个部分,其中平台接入就是实现上述三个 RPC 协议。直接实现上述协议的开发难度较高,需要处理很多细节和性能问题,如系统并发等。为此,YARN 提供了一套应用程序编程库来简化应用的开发过程,该编程库是基于「事件驱动机制」的,利用了 YARN 内部的服务库、事件库和状态机库,分为三个部分,与上述三个协议一一对应。

2.1 YARN 基础库

在 YARN 基础库中分为服务库、事件库和状态机库,具体说明如下。

a. 服务库

YARN 中普遍采用基于服务的对象管理模型,将一些生命周期较长的对应服务化,YARN 提供一套抽象的接口对服务进行了统一描述,该服务具有如下特点:

  • 具有标准状态,所有服务都具有 4 个状态:NOTINITED、INITED、STARTED、STOPPED;
  • 状态驱动,服务状态变化将触发一些动作,使其转变成另一种状态;
  • 服务嵌套,一个服务可以由其他服务组合嵌套而来;

b. 事件库

YARN 中大量采用了基于事件(Event)驱动的并发模型,该模型由事件、异步调度器和事件处理器三个模块组成。

  • 处理请求被抽象为事件,放入异步调度器的事件队列中;
  • 调度线程从事件队列中取出事件分发给不同的事件处理器;
  • 事件处理器处理事件,产生新的事件放入事件队列,如此循环,直到处理完成(完成事件)。

c. 状态机库

YARN中使用 { 转换前状态、转换后状态、事件、回调函数 } 四元组来表示一个状态变换,一个或多个事件的到来,触发绑定在对象上状态转移函数,使对象的状态发生变化。状态机使得事件处理变得简单可控。

总的来说,YARN 中的服务由一个或多个含有有限状态机的事件处理系统组成,总体框架如下。

2.2 YARN 编程库

a. Client 编程库

YARN 的 Client-RM 编程库位于 org.apache.hadoop.yarn.client.YarnClient(hadoop-yarn-api 项目),该库实现了通用的 ApplicationClientProtocol 协议,提供了重试机制。用户利用该库可以快速开发 YARN 应用的客户端程序,而不需要关心 RPC 等底层接口。针对 MR 程序,YARN 默认实现是 YarnClientImpl 类。

用户开发自己的应用客户端时,只要设置好 ApplicationSubmissionContext 对象,调用 YarnClient 的相关接口,即可实现应用的提交。

b. AM&RM 编程库

AM-RM 编程库主要简化了 AM 向 RM 申请资源过程的开发。YARN 提供了两套 AM-RM 编程库,分别为「阻塞式」和「非阻塞式」模式。

  • AMRMClient 是阻塞式的,实现了A pplicationMasterProtocol 协议,用户调用该类的相应接口,可实现 AM 与 RM 的通信,针对 MR 程序,默认实现;
  • AMRMClientAsync 是 AMRMClient 的非阻塞式封装,所有响应通过回调函数的形式返回给用户,用户实现自己的 AM 时,只需要实现 AMRMClientAsync 的 CallbackHandler 即可。

c. NM 编程库

NM 编程库对 AM 和 RM 与 NM 之间的交互进行了封装,同样有「阻塞式」和「非阻塞式」两种封装(AM 与 NM 和 RM 与 NM 的交互逻辑相似)。同样的,对于异步编程库 NMClientAsync,用户只需要在自己的 AM 上实现相应的回调函数,就可以控制 NM 上 Container 的启动/停止和状态监控了。

总得来说,YARN 是一个资源管理平台,并不涉及业务逻辑,具体的业务逻辑需要用户自己去实现。YARN 的核心作用就是分配资源、保证资源隔离。

3. YARN 应用运行流程

首先回顾,提交 MapReduce 程序到 YARN 集群运行机制,如下图所示:

(1)MapReduce 程序提交到客户端所在的节点,使用 yarn jar 命令提交运行。

(2)Client 客户端向 ResourceManager 申请运行一个 Application。

(3)当 RM 接收到请求后,生成 Application 资源提交路径 hdfs://.../.staging 以及 application_id,并返回给客户端 Client。

(4)Client 客户端提交 Job 运行所需资源到资源提交路径。

(5)当 Client 客户端资源提交完毕,申请运行 MRAppMaster。

(6)当 RM 接收请求以后,将用户的请求初始化成一个 Task,将其放到队列中(Apache Hadoop YARN 默认使用 CapacityScheduler 容器调度器,将 Task 任务放入某个队列 Queue 中,等待后续执行)。

(7)当 NM 中有资源时,RM 向其发送指令,此时 NM 领取到 Task 任务,准备启动 Container 运行 AppMaster。

(8)NM 创建容器 Contanier,容器中包含相关资源(比如 CPU、内存等),在其中运行 MRAppMaster。

(9)MRAppMaster 启动以后,依据资源提交路径,下载 Job 资源到本地(也就是 MRAppMaster 所运行 NodeManager 节点临时目录中)。

(10)MRAppMaster 获取 Job 运行资源信息以后,计算此 MapReduce 任务运行所需要的 MapTask 任务个数,再向 RM 申请资源以创建 MapTask 容器。

(11)RM 接收到 MRAppMaster 请求后,将这些 Task 任务同样放到队列 Queue 中,当 NM 中有资源时,RM 依然向 NM 发送指令,领取到 Task 任务,创建容器 Contanier。

(12)当 NM 中容器 Contanier 创建完成以后,MRAppMaster 将运行 MapTask 任务的程序脚本发送给 Contanier 容器,最后在容器 Contanier 中启动 YarnChild 进程运行 MapTask 任务,处理数据(Container 内 MapTask 执行期间,AM 会通过和 NM 通信来获取容器状态,监督任务执行情况)。

(13)当 MapTask 任务运行完成以后,MRAppMaster 再向 RM 申请资源,...省略...,在 NM 中创建 ReduceTask 任务运行的容器,启动 YarnChild 进程运行 ReduceTask 任务,拉取 MapTask 处理数据,进行聚合操作。

(14)当 MapReduce 应用程序运行完成以后,向 RM 注销自己,释放资源,至此整个应用运行完成。

4. YARN 源码流程

4.1 Client 提交应用到 YARN

a. Job 提交

MapReduce 程序最后执行 job.waitForCompletion() 时,表示应用提交执行等待完成,提交应用时,方法调用链如下:

job.waitForCompletion -> submit -> submitter.submitJobInternal -> submitJob

(1)Job#waitForCompletion

(2)Job#submit

(3)JobSubmmiter#submitJobInternal

b. 创建应用上下文

submitClient 为 ClientProtocol 实例对象,有 2 个实现子类:LOCAL 模式运行和 YARN 集群运行。

(1)submitClient#submitJob

(2)YARNRunner#submitJob

(3)YARNRunner#createApplicationSubmissionContext

构建 MR AppMaster 运行环境,主要包括:

  1. 设置本地资源:Job 配置文件、Job Jar 包及提交运行工作目录等
  2. 设置容器启动上下文:启动 AppMaster 进程 java 命令和运行日志存储等
  3. 应用提交上下文设置:比如设置 ApplicationID 和运行队列 Queue 等

c. 提交应用

在 YARNRunner#submitJob 方法中,应用提交上下文构建完成后,进行应用提交。

(1)ResourceMgrDelegate#submitApplication

(2)YARNClientImpl#submitApplication

rmClient 是一个 ApplicationClientProtocol 类对象,这是一个 RPC 的接口协议,对应的实现类 ApplicationClientProtocolPBClientImpl。

(3)ApplicationClientProtocolPBClientImpl#submitApplication

此时 ApplicationClientProtocolPBServiceImpl 类属于客户端 Client 实现类,所在包为 org.apache.hadoop.yarn.api.impl.pb.client。

RPC Client 端 proxy.submitApplication() 对应的 RPC Server 端的方法数为:ApplicationClientProtocolPBServiceImpl#submitApplication(),这是对称的关系,都实现了 ApplicationClientProtocalPB 接口。

(4)ApplicationClientProtocolPBServiceImpl#submitApplication

此时 ApplicationClientProtocolPBServiceImpl 类属于服务端 Service 实现类,所在包为 org.apache.hadoop.yarn.api.impl.pb.service。

ApplicationClientProtocolPBServiceImpl 类对象由 ClientRMService 构建,属于 RM 端的服务类,专门用于服务 Client,包括 Client 的作业提交、作业查询等服务。

(5)ClientRMService#submitApplication

由 ClientRMService.submitApplication() 直接把作业交给 RMAppManager 类的对象 rmAppManager 进行提交,这也是作业最终上岸了!接下来就是 RM 的事情。


作业提交调用层次】客户端 Client 进行作业提交时,分为 Client 端和 Service 服务端 2 个层次:

4.2 YARN 启动AppMaster

MapReduce 作业提交已经到达 ResourceManager 端,并且交给 RMAppManager 进行继续运转,将此应用当做任务提交到队列 Queue 中,开始执行 MRAppMaster 任务,流程图如下。

从 MRAppMaster 类中 main 查看,启动 MRAppMaster 进程流程步骤。

a. AppMaster 初始化

ResourceManager 在启动 AppMaster 之前,先对 AppMaster 服务进行初始化操作。

(1)AbstractServie#init

(2)MRAppMaster#serviceInit

b. AppMaster 启动

(1)MRAppMaster#start

(2)MRAppMaster#serviceStart

(3)MRAppMaster#startJobs

从分发器 Dispatcher(实例为 AysncDispatcher 异步分发器)中获取事件处理器 EventHandler,处理器的实例对象 GenericEventHandler(通用事件处理器)调用 handle() 方法启动 Job 作业执行。

(4)GenericEventHandler#handle

最后将 Job 作业存放到队列 Queue 中,对于 Apache Hadoop YARN 来说,默认使用 Capacity Scheduler 容量调度器的 default 队列中,等待调度执行。

4.3 调度执行应用进程

任何一个应用提交运行至 YARN 集群,首先为应用启动 AppMaster,当启动完成以后,为每个应用启动应用进程,调度任务 Task 执行,其中不同应用对应的应用进程不一样。

针对 MapReduce 应用提交运行 YARN 上来说,当 MRAppMaster 启动以后,计算整个 Job 的 MapTask 和 ReduceTask 数量,然后向 ResourceManager 申请资源,运行 Task 任务。

无论运行 MapTask 还是 ReduceTask,都是 YarnChild 中执行 Task:

查看 YarnChild 类中 main 方法,核心源码:

Task 类 2 个实现子类:MapTask 和 ReduceTask,查看其中 run 方法,如何执行任务。

a. MapTask 任务执行

查看 MapTask 任务中 run 方法,主要判断是否是 MapReduce New API 编写程序,如果是的话直接调用 runNewMapper 方法,运行 MapTask 任务。

(1)MapTask#runNewMapper

(2)Mapper#run

b. ReduceTask 任务执行

在 MapReduce 计算引擎中,先运行 MapTask 处理每个 Split 分片数据,当完成以后告知 MRAppMaster,接着通知所有 ReduceTask 到 MapTask 输出目录拉取所属自己文件数据。

接下来,查看 ReduceTask 类中 run 方法。

(1)ReduceTask#runNewReducer

(2)Reducer#run

当 ReduceTask 运行完成后,将数据输出到外部存储引擎(如 HDFS),告知 MRAppMaster。MRAppMaster 等到所有 ReduceTask 任务运行完成后,向 ResourceManager 发送信息,要求 ResourceManager 注销自己,释放资源,以便其他应用运行使用。至此,一个 MapReduce 应用程序运行 YARN 集群完成。