06-HDFS(4)

发布时间 2023-06-29 00:32:38作者: tree6x7

1. HDFS 数据迁移解决方案

数据迁移指的是一种大规模量级的数据转移,转移的过程中往往会跨机房、跨集群 ,数据迁移规模的不同会导致整个数据迁移的周期也不尽相同 。

在 HDFS 中,同样有许多需要数据迁移的场景,比如冷热数据集群之间的数据转化, 或者 HDFS 数据的双机房备份等等。因为涉及跨机房 、跨集群,所以数据迁移不会是一个简单的操作。

1.1 数据迁移使用场景

(1)冷热集群数据同步、分类存储

(2)集群数据整体搬迁

当公司的业务迅速的发展,导致当前的服务器数量资源出现临时紧张的时候,为了更高效的利用资源,会将原 A 机房数据整体迁移到 B 机房,原因可能是 B 机房机器多,而且 B 机房本身开销较 A 机房成本低些等。

(3)数据的准实时同步

数据准实时同步的目的在于数据的双备份可用,比如某天 A 集群突然宣告不允许再使用了,此时可以将线上使用集群直接切向 B 的同步集群,因为 B 集群实时同步 A 集群数据,拥有完全一致的真实数据和元数据信息,所以对于业务方使用而言是不会受到任何影响的。

1.2 数据迁移要素考量

(1)Bandwidth 带宽

带宽用的多了,会影响到线上业务的任务运行,带宽用的少了又会导致数据同步过慢的问题。

(2)Performance 性能

是采用简单的单机程序?还是多线程的性能更佳的分布式程序?

(3)Data-Increment 增量同步

当 TB、PB 级别的数据需要同步的时候,如果每次以全量的方式去同步数据,结果一定是非常糟糕。如果仅针对变化的增量数据进行同步将会是不错的选择。可以配合 HDFS 快照等技术实现增量数据同步。

(4)Syncable 数据迁移的同步性

数据迁移的过程中需要保证周期内数据是一定能够同步完的,不能差距太大。比如 A 集群 7 天内的增量数据,我只要花半天就可以完全同步到 B 集群,然后我又可以等到下周再次进行同步。最可怕的事情在于 A 集群的 7 天内的数据,我的程序花了 7 天还同步不完,然后下一个周期又来了,这样就无法做到准实时的一致性。其实7天还是一个比较大的时间,最好是能达到按天同步。

1.3 拷贝工具 DistCp

DistCp 是 Apache Hadoop 中的一种流行工具,在 hadoop-tools 工程下,作为独立子工程存在。其定位就是用于数据迁移的,定期在集群之间集群内部备份数据(在备份过程中,每次运行 DistCp 都称为一个备份周期)。尽管性能相对较慢,但它的普及程度已经越来越高。

DistCp 底层使用 MapReduce 在群集之间或并行在同一群集内复制文件。执行复制的 MapReduce 只有 map 阶段。它涉及两个步骤:

  • 构建要复制的文件列表(称为复制列表);
  • 运行 MapReduce 作业以复制文件,并以复制列表为输入。

DistCp 优势特性:

(1)带宽限流

DistCp 可以通过命令参数 bandwidth 来为程序进行带宽限流。

(2)增量数据同步

在 DistCp 中可以通过 update 、append、diff 3 个参数实现增量同步。

参数 说明 作用
update 只拷贝不存在的文件或者目录 解决了新增文件、目录的同步
append 追加写目标路径下己存在的文件 解决已存在文件的增量更新同步
diff 通过快照的 diff 对比信息来同步源端路径与目标路径 解决删除或重命名类型文件的同步

(3)分布式特性

DistCp 底层使用 MapReduce 执行数据同步,MapReduce 本身是一类分布式程序。

distcp 命令:

$ hadoop distcp
usage: distcp OPTIONS [source_path...] <target_path>
             
 -append                # 拷贝文件时支持对现有文件进行追加写操作
 -async                 # 异步执行distcp拷贝任务
 -bandwidth <arg>       # 对每个Map任务的带宽限速
 -delete                # 删除相对于源端,目标端多出来的文件
 -diff <arg>            # 通过快照diff信息进行数据的同步                  
 -overwrite             # 以覆盖的方式进行拷贝,如果目标端文件已经存在,则直接覆盖
 -p <arg>               # 拷贝数据时,扩展属性信息的保留,包括权限信息、块大小信息等等
 -skipcrccheck          # 拷贝数据时是否跳过cheacksum的校验
 -update                # 拷贝数据时,只拷贝相对于源端,目标端不存在的文件数据

注!source_path 、target_path 需要带上地址前缀以区分不同的集群。

# 从 nn1 集群拷贝 /foo/a 路径下的数据到 nn2 集群的 /bar/foo 路径下
hadoop distcp hdfs://nn1:8020/foo/a  hdfs://nn2:8020/bar/foo

2. NameNode 安全模式

2.1 安全模式·现象

HDFS 集群在停机状态下,使用 hdfs –daemon 命令逐个进程启动集群,观察现象。

首先启动 NameNode:hdfs --daemon start namenode,然后依次执行浏览文件系统和创建文件夹操作,现象如下,发现集群可以查看目录结构但是无法做增删改操作。

打开 HDFS 集群 web 页面可以发现如下提示:已经汇报的数据块的比例没有达到阈值,而阈值为总数量块的 0.999。

接下来,启动第一台机器上的 DataNode 进程:hdfs --daemon start datanode,继续查看页面提示信息。

倒计时结束后:

2.2 安全模式·说明

Hadoop 中的「安全模式」是 NameNode 的维护状态,在此状态下 NameNode 不允许对文件系统进行任何更改,可以接受读数据请求。

在 NameNode 启动过程中,首先会从 fsimage 和 edits 日志文件加载文件系统状态。然后,等待 DataNodes 汇报可用的 block 信息。在此期间,NameNode 保持在安全模式。随着 DataNode 的 block 汇报持续进行,当整个系统达到安全标准时,HDFS 自动离开安全模式。在 NameNode 的 Web 主页上会显示安全模式是打开还是关闭。

如果 HDFS 处于安全模式下,不允许 HDFS 客户端进行任何修改文件的操作,包括上传/删除/重命名文件、创建文件夹、修改副本数等操作。

2.3 自动/手动进入离开

自动·进入/离开

HDFS 集群启动时,当 NameNode 启动成功之后,此时集群就会自动进入安全模式。

自动离开条件是由多条件控制的,相关的配置属性参数都在 hdfs-default.xml 中定义,如果需要覆盖任何值,请在 hdfs-site.xml 文件中重新覆盖定义。

配置 说明
dfs.replication 数据块应该被复制的份数;
dfs.replication.max 规定最大副本数量,默认 512;
dfs.namenode.replication.min 规定最小副本数量,默认 1;
dfs.namenode.safemode.threshold-pct 已汇报可用数据块数量占整体块数量的百分比阈值(默认 0.999f)。小于或等于 0 则表示退出安全模式之前,不要等待特定百分比的块。大于 1 的值将使安全模式永久生效。
dfs.namenode.safemode.min.datanodes 指在退出安全模式之前必须存活的 DataNode 数量,默认 0;
dfs.namenode.safemode.extension 达到阈值条件后持续扩展的时间,默认 30000ms。倒计时结束如果依然满足阈值条件,自动离开安全模式。
  1. 当小于这个比例, 那就将系统切换成安全模式,对数据块进行复制;
  2. 当大于该比例时,就离开安全模式,说明系统有足够的数据块副本数,可以对外提供服务;
  3. 小于等于 0 意味不进入安全模式,大于 1 意味一直处于安全模式。

手动·进入/离开

(1)手动获取安全模式状态信息

hdfs dfsadmin -safemode get

(2)手动进入命令(手动进入安全模式对于集群维护或者升级的时候非常有用,因为这时候 HDFS 上的数据是只读的~

hdfs dfsadmin -safemode enter

(3)手动离开命令

hdfs dfsadmin -safemode leave

3. HDFS 优化方案

3.1 短路本地读取

a. 背景

短路本地读取 Short Circuit Local Reads

在 HDFS 中,不管是 Local Reads(DFSClient 和 DataNode 在同一个节点)还是 Remote Reads(DFSClient 和 DataNode 不在同一个节点),底层处理方式都是一样的,都是先由 DataNode 读取数据,然后再通过 RPC(基于 TCP)把数据传给 DFSClient。这样处理是比较简单的,但是性能会受到一些影响,因为需要 DataNode 在中间做一次中转。

尤其 Local Reads 的时候,既然 DFSClient 和数据是在一个机器上面,那么很自然的想法,就是让 DFSClient 绕开 DataNode 自己去读取数据。

所谓的“短路”读取绕过了 DataNode,从而允许客户端直接读取文件。显然,这仅在客户端与数据位于同一机器的情况下才可行。短路读取为许多应用提供了显着的性能提升。

b. 老版本设计

HDFS-2246 这个 JIRA 中,工程师们的想法是既然读取数据 DFSClient 和数据在同一台机器上,那么 DataNode 就把数据在文件系统中的路径,从什么地方开始读(offset)和需要读取多少(length)等信息告诉 DFSClient,然后 DFSClient 去打开文件自己读取。

想法很好,问题在于配置复杂以及安全问题。

首先是配置问题,因为是让 DFSClient 自己打开文件读取数据,那么就需要配置一个白名单,定义哪些用户拥有访问 DataNode 的数据目录权限。如果有新用户加入,那么就得修改白名单。需要注意的是,这里是允许客户端访问 DataNode 的数据目录,也就意味着,任何用户拥有了这个权限,就可以访问目录下其他数据,从而导致了安全漏洞。因此,这个实现已经不建议使用了。

c. 安全性改进

在 HDFS-347 中,提出了一种新的解决方案,让短路本地读取数据更加安全。

在 Linux 中,有个技术叫做 Unix Domain Socket。Unix Domain Socket 是一种进程间的通讯方式,它使得同一个机器上的两个进程能以 Socket 的方式通讯。它带来的另一大好处是,利用它两个进程除了可以传递普通数据外,还可以在进程间传递「文件描述符」

假设机器上的两个用户 A 和 B,A 拥有访问某个文件的权限而 B 没有,而 B 又需要访问这个文件。借助 Unix Domain Socket,可以让 A 打开文件得到一个文件描述符,然后把文件描述符传递给 B,B 就能读取文件里面的内容了(即使它没有相应的权限)

在 HDFS 的场景里面,A 就是 DataNode,B 就是 DFSClient,需要读取的文件就是 DataNode 数据目录中的某个文件。

这个方案在安全上就比上一个方案上好一些,至少它只允许 DFSClient 读取它需要的文件。

d. 配置&测试

(1)libhadoop.so

因为 Java 不能直接操作 Unix Domain Socket,所以需要安装 Hadoop 的 native 包 libhadoop.so。在编译 Hadoop 源码的时候可以通过编译 native 模块获取。可以用如下命令来检查 native 包是否安装好。

hadoop checknative

(2)hdfs-site.xml

<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>
<property>
  <name>dfs.domain.socket.path</name>
  <value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>
  • dfs.client.read.shortcircuit 是打开短路本地读取功能的开关
  • dfs.domain.socket.path 是 DataNode 和 DFSClient 之间沟通的 Socket 的本地路径
    • 要确保 Socket 本地路径提前创建好,这里创建的是文件夹 hadoop-hdfs;
    • 上述配置中的 dn_socket 是 DataNode 自己创建的,不是文件夹。

如何确认配置生效?

(1)查看 DataNode 的日志

在 DataNode 的启动日志中,看到如下相关的日志表明 Unix Domain Socket 被启用了。

2023-04-15 08:18:59,867 INFO  datanode.DataNode (DataNode.java:initDataXceiver(579)) - Listening on UNIX domain socket: /var/lib/hadoop-hdfs/dn_socket

(2)读取一个文件到本地

使用命令找出文件的数据块位置信息,到对应的机器上进行本地下载操作。下载完毕,打开 DataNode 的日志,下面信息就表明读取的时候用到了 Short Circuit Local Reads。

INFO DataNode.clienttrace,(DataXceiver.java:requestShortCircuitFds(334)) - src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS, blockid: 1073741962, srvID: 4ff4d539-1bca-480d-91e3-e5dc8c6bc4a8, success: true

(3)ReadStatistics API

通过 HdfsDataInputStream#getReadStatistics API 来获取读取数据的统计信息。

public class FileSystemCat {
  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    OutputStream out = new FileOutputStream("/tmp/out");
    FSDataInputStream in = null;
    try {
      in = fs.open(new Path(uri));
      IOUtils.copy(in, out);
      if (in instanceof HdfsDataInputStream) {
        HdfsDataInputStream hdfsIn = (HdfsDataInputStream) in;
        DFSInputStream.ReadStatistics stat = hdfsIn.getReadStatistics();
        System.out.println("Total Bytes Read Bytes: " + stat.getTotalBytesRead());
        System.out.println("Short Circuit Read Bytes: " + stat.getTotalShortCircuitBytesRead());
        System.out.println("Local Read Bytes:" + stat.getTotalLocalBytesRead());
      }
    } finally {
      IOUtils.closeQuietly(in
      IOUtils.closeQuietly(out);
    }
  }
}

3.2 Block Balancer

HDFS 数据可能并不总是在 DataNode 之间均匀分布。一个常见的原因是向现有群集中添加了新的 DataNode。

HDFS 提供了一个 Balancer 程序,分析 Block 放置信息并且在整个 DataNode 节点之间平衡数据,直到被视为平衡为止。

所谓的“平衡”指的是每个 DataNode 的利用率(节点上已用空间与节点总容量之比)与集群的利用率(集群上已用空间与集群总容量的比)相差不超过给定阈值百分比。 平衡器无法在单个 DataNode 上的各个卷之间进行平衡。

(1)命令行配置和运行

$ hdfs balancer --help
Usage: hdfs balancer
  [-policy <policy>]
    # the balancing policy: datanode or blockpool
    # 平衡策略,默认为 DataNode,如果 DataNode 平衡,则集群已平衡。
  [-threshold <threshold>]
    # Percentage of disk capacity
    # 集群平衡的条件,DataNode 间磁盘使用率相差阈值,区间选择:0~100
  [-exclude [-f <hosts-file> | <comma-separated list of hosts>]]
    # Excludes the specified datanodes.
    # 默认为空,指定该部分IP不参与balance,-f:指定输入为文件
  [-include [-f <hosts-file> | <comma-separated list of hosts>]]
    # Includes only the specified datanodes.
    # 默认为空,只允许该部分IP参与balance,-f:指定输入为文件
  [-source [-f <hosts-file> | <comma-separated list of hosts>]]
    # Pick only the specified datanodes as source nodes.
  [-blockpools <comma-separated list of blockpool ids>]
    # The balancer will only run on blockpools included in this list.
  [-idleiterations <idleiterations>]
    # Number of consecutive idle iterations (-1 for Infinite) before exit.
  [-runDuringUpgrade]
    # Whether to run the balancer during an ongoing HDFS upgrade.This is usually not desired since it will not affect used space on over-utilized machines.

(2)设置平衡数据传输带宽

hdfs dfsadmin -setBalancerBandwidth <newbandwidth>
# hdfs dfsadmin -setBalancerBandwidth 104857600(100M)

newbandwidth 是每个 DataNode 在平衡操作期间可以使用的最大网络带宽量,以每秒字节数为单位。

(3)默认运行 Balancer

hdfs balancer

此时将会以默认参数进行数据块的平衡操作。

(4)修改阈值运行 Balancer

hdfs balancer -threshold 5

Balancer 将以阈值 5% 运行(默认值 10%),这意味着程序将确保每个 DataNode 上的磁盘使用量与群集中的总体使用量相差不超过 5%。例如,如果集群中所有 DataNode 的总体使用率是集群磁盘总存储容量的 40%,则程序将确保每个 DataNode 的磁盘使用率在该 DataNode 磁盘存储容量的 35%~45% 之间。

3.3 Disk Balancer

a. 概述

相比较于个人 PC,服务器一般可以通过挂载多块磁盘来扩大单机的存储能力。

在 Hadoop HDFS 中,DataNode 负责最终数据块的存储,在所在机器上的磁盘之间分配数据块。当写入新 block 时,DataNodes 将根据选择策略(循环策略可用空间策略)来选择 block 的磁盘(卷)。

  • 循环策略:它将新 block 均匀分布在可用磁盘上,默认此策略。
  • 可用空间策略:此策略将数据写入具有更多可用空间(按百分比)的磁盘。

但是,在长期运行的群集中采用「循环策略」时,DataNode 有时会不均匀地填充其存储目录(磁盘/卷),从而导致某些磁盘已满而其他磁盘却很少使用的情况。发生这种情况的原因可能是由于大量的写入和删除操作,也可能是由于更换了磁盘。另外,如果我们使用「基于可用空间的选择策略」,则每个新写入将进入新添加的空磁盘,从而使该期间的其他磁盘处于空闲状态。这将在新磁盘上创建瓶颈。

因此,需要一种 Intra DataNode Balancing(DataNode 内数据块的均匀分布)来解决 Intra-DataNode 偏斜(磁盘上块的不均匀分布),这种偏斜是由于磁盘更换或随机写入和删除而发生的。

Hadoop 3.0 中引入了一个名为 Disk Balancer 的工具,该工具专注于在 DataNode 内分发数据。


HDFS Disk Balancer 是 Hadoop 3 中引入的命令行工具,用于平衡 DataNode 中的数据在磁盘之间分布不均匀问题。 这里要特别注意,HDFS Disk Balancer 与 HDFS Balancer 是不同的:

  • HDFS Disk Balancer 针对给定的 DataNode 进行操作,将块从一个磁盘移动到另一个磁盘,是 DataNode 节点内部数据在不同磁盘间平衡;
  • HDFS Balancer 平衡了 DataNode 节点之间的分布。

b. 数据传播报告

HDFS Disk Balancer 支持两个主要功能,即「报告」「平衡」

为了定义一种方法来衡量集群中哪些计算机遭受数据分布不均的影响,HDFS 磁盘平衡器定义了 HDFS Volume Data Density Metric(卷/磁盘数据密度度量标准)和 Node Data Density Metric(节点数据密度度量标准)。

  • 「卷数据密度」度量标准能够比较数据在给定节点的不同卷上的分布情况
  • 「节点数据密度」度量允许在节点之间进行比较

Volume Data Density Metric 计算过程

假设有一台具有 4 个卷/磁盘的计算机,各个磁盘使用情况:

\ Disk1 Disk2 Disk3 Disk4
capacity 200 GB 300 GB 350 GB 500 GB
dfsUsed 100 GB 76 GB 300 GB 475 GB
dfsUsedRatio 0.5 0.25 0.85 0.95
volumeDataDensity 0.20 0.45 -0.15 -0.24

VolumeDataDensity 的正值表示磁盘未充分利用,而负值表示磁盘相对于当前理想存储目标的利用率过高。

Total capacity = 200 + 300 + 350 + 500 = 1350 GB
Total Used     = 100 + 76 + 300 + 475  = 951 GB

# 因此,每个卷/磁盘上的理想存储如下,也就是每个磁盘应该保持在 70% 理想存储容量
IdealStorage = totalUsed ÷ totalCapacity = 951÷1350 = 0.70 
# 比如 Disk1 的卷数据密度 = 0.70 - 0.50 = 0.20,其他 Disk 以此类推。
VolumeDataDensity = idealStorage - dfsUsedRatio

Node Data Density 计算过程

Node Data Density = 该节点上所有卷/磁盘 Volume Data Density 绝对值的总和

上述例子中的节点数据密度 = |0.20|+|0.45|+|-0.15|+|-0.24| = 1.04

较低的 NodeDataDensity 值表示该机器节点具有较好的扩展性,而较高的值表示节点具有更倾斜的数据分布。一旦有了 VolumeDataDensity 和 NodeDataDensity,就可以找到集群中数据分布倾斜的节点。

c. 磁盘平衡

当指定某个 DataNode 节点进行 disk 数据平衡,就可以先计算或读取当前的 VolumeDataDensity(磁盘数据密度)。有了这些信息,我们可以轻松地确定哪些卷已超量配置,哪些卷已不足。为了将数据从一个卷移动到 DataNode 中的另一个卷,Hadoop 开发实现了基于 RPC 协议的 Disk Balancer。

HDFS Disk Balancer 通过创建「计划」进行操作,该计划是一组语句,描述应在两个磁盘之间移动多少数据,然后在 DataNode 上执行该组语句。一个计划由多个步骤组成。移动计划包括源磁盘、目标磁盘和要移动的字节数。磁盘平衡器不会干扰其他进程,因为它会限制每秒钟复制多少数据。该计划是针对可操作的 DataNode 执行的。

默认情况下,Hadoop 群集上已经启用了 Disk Balancer 功能。通过在 hdfs-site.xml 中调整 dfs.disk.balancer.enabled 参数值,选择在 Hadoop 中是否启用磁盘平衡器。

(1)Plan 计划

hdfs diskbalancer -plan <DataNode>
  -out
    # 用来控制计划文件的输出位置
  -bandwidth
    # 设置用于运行 Disk Balancer 的最大带宽。默认带宽 10MB/s。
  -thresholdPercentage
    # 定义磁盘开始参与数据重新分配或平衡操作的值。默认的 thresholdPercentage 值为 10%。
    # 这意味着仅当磁盘包含的数据比理想存储值多 10% 或更少时,磁盘才用于平衡操作。
  -maxerror
    # 它允许用户在中止移动步骤之前为两个磁盘之间的移动操作指定要忽略的错误数。
  -v
    # 详细模式,指定此选项将强制 plan 命令在 stdout 上显示计划的摘要。
  -fs
    # 此选项指定要使用的 NameNode。如果未指定,则 Disk Balancer 将使用配置中的默认 NameNode。

(2)Execute 执行

# execute 命令针对为其生成计划的 DataNode 执行计划
hdfs diskbalancer -execute <JSON file path>

(3)Query 查询

# query 命令从运行计划的 DataNode 获取 HDFS 磁盘平衡器的当前状态
hdfs diskbalancer -query <DataNode>

(4)Cancel 取消

# cancel命令取消运行计划
hdfs diskbalancer -cancel <JSON file path>
hdfs diskbalancer -cancel planID node <DataNode>

(5)Report 汇报

hdfs diskbalancer -fs hdfs://hadoop102:8020 -report <file://>

3.4 纠删码技术

a. 纠删码概述

3 副本策略弊端

为了提供容错能力,HDFS 会根据 Replication Factor(复制因子)在不同的 DataNode 上复制文件块。默认复制因子为 3(注意这里的 3 指的是 1+2=3,不是额外 3 个),则原始块除外,还将有额外 2 个副本。每个副本使用 100% 的存储开销,因此导致 200% 的存储开销。这些副本也消耗其他资源,例如网络带宽。

在复制因子为 N 时,存在 N-1 个容错能力,但存储效率仅为 1/N。这种复制增加了存储开销,并且似乎很昂贵。因此,HDFS 使用 Erasure Coding(纠删码)代替复制,以提供相同级别的容错能力,并且存储开销不超过 50%。Erasure Coding 文件的复制因子始终为 1,用户无法对其进行更改。

纠删码技术(Erasure coding)

纠删码技术简称 EC,是一种编码容错技术。最早用于通信行业,数据传输中的数据恢复。它通过对数据进行分块,然后计算出校验数据,使得各个部分的数据产生关联性。当一部分数据块丢失时,可以通过剩余的数据块和校验块计算出丢失的数据块。

Hadoop 3.0 之后引入了纠删码技术(Erasure Coding),它可以提高 50% 以上的存储利用率,并且保证数据的可靠性。

存储系统 RAID 使用纠删码。RAID 通过 striping(条带化)实现纠删码,也就是说,将逻辑上连续的数据(例如文件)划分为较小的单位(bit, byte or block),并将连续的单位存储在不同的磁盘上。

对于原始数据集的每个条带,都会根据纠删码算法来计算并存储一定数量的奇偶校验单元,该过程称为“编码”。任何条带化单元中的错误都可以根据剩余数据和奇偶校验单元从计算中恢复,此过程称为“解码”。

b. RS 码

Reed-Solomon(RS)码是存储系统较为常用的一种纠删码,它有两个参数 k 和 m,记为 RS(k, m)。

如下图所示,k 个数据块组成一个向量被乘上一个生成矩阵(Generator Matrix)从而得到一个码字(codeword)向量,该向量由 k 个数据块(d0, d1,d2, d3)和 m 个校验块(c0, c1)构成。

如果一个数据块丢失,可以用 GT 逆矩阵乘以码字向量来恢复出丢失的数据块。RS(k, m) 最多可容忍 m 个块(包括数据块和校验块)丢失。

RS 码通俗解释:比如有 7、8、9 三个原始数据,通过矩阵乘法,计算出来两个校验数据 50、122。这时原始数据加上校验数据,一共五个数据:7、8、9、50、122,可以任意丢两个,然后通过算法进行恢复。

c. EC 架构

为了支持纠删码,HDFS 体系结构进行了一些更改调整。

(1)NameNode 扩展

条带化的 HDFS 文件在逻辑上由 Block Group(块组)组成,每个块组包含一定数量的内部块。这允许在块组级别而不是块级别进行文件管理。

(2)客户端扩展

客户端的读写路径得到了增强,可以并行处理块组中的多个内部块。

(3)DataNode 扩展

DataNode 运行一个附加的 ErasureCodingWorker(ECWorker)任务,以对失败的纠删编码块进行后台恢复。NameNode 检测到失败的 EC 块,然后 NameNode 选择一个 DataNode 进行恢复工作。

(4)纠删编码策略

为了适应异构的工作负载,允许 HDFS 群集中的「文件」和「目录」具有不同的复制和纠删码策略。纠删码策略封装了如何对文件进行编码/解码。默认情况下启用 RS-6-3-1024k 策略, RS 表示编码器算法 Reed-Solomon,6-3 表示数据块和奇偶校验块的数量,1024k 表示条带化单元的大小。

目录上还支持默认的 REPLICATION 方案!它只能在目录上设置,以强制目录采用 3 倍复制方案,而不继承其祖先的纠删码策略。此策略可以使 3x 复制方案目录与纠删码目录交错。REPLICATION 始终处于启用状态。

此外也支持用户通过 XML 文件定义自己的 EC 策略,Hadoop conf 目录中有一个名为 user_ec_policies.xml.template 的示例 EC 策略 XML 文件,用户可以参考该文件。

(5)Intel ISA-L

英特尔 ISA-L 代表英特尔智能存储加速库。 ISA-L 是针对存储应用程序而优化的低级功能的开源集合。它包括针对 Intel AVX 和 AVX2 指令集优化的快速块 Reed-Solomon 类型擦除代码。HDFS 纠删码可以利用 ISA-L 加速编码和解码计算。

d. EC 部署

(1)集群和硬件配置

编码和解码工作会消耗 HDFS 客户端和 DataNode 上的额外 CPU

纠删码文件也分布在整个机架上,以实现机架容错。这意味着在读写条带化文件时,大多数操作都是在机架上进行的。因此,网络带宽也非常重要。

对于机架容错,拥有足够数量的机架也很重要,每个机架所容纳的块数不超过 EC 奇偶校验块的数。

机架数量 = { (数据块 + 奇偶校验块) / 奇偶校验块 } 再取整

比如对于 EC 策略 RS(6,3),这意味着最少 3 个机架( (6+3)/3=3),理想情况下为 9 个或更多,以处理计划内和计划外的停机。对于机架数少于奇偶校验单元数的群集,HDFS 无法维持机架容错能力,但仍将尝试在多个节点之间分布条带化文件以保留节点级容错能力。因此,建议设置具有类似数量的 DataNode 的机架。

(2)纠删码策略设置

纠删码策略由参数 dfs.namenode.ec.system.default.policy 指定,默认是 RS-6-3-1024k,其他策略默认是禁用的。可以通过 hdfs ec [-enablePolicy -policy <policyName>] 命令启用策略集。

(3)启用英特尔 ISA-L

默认 RS 编解码器的 HDFS 本机实现利用 Intel ISA-L 库来改善编码和解码计算。要启用和使用 Intel ISA-L,需要执行 3 个步骤。

  1. 建立 ISA-L 库;
  2. 使用 ISA-L 支持构建 Hadoop;
  3. 使用 -Dbundle.isal 将 isal.lib 目录的内容复制到最终的 tar 文件中。使用 tar 文件部署 Hadoop,确保 ISA-L 在 HDFS 客户端和 DataNode 上可用。

e. EC 命令

HDFS 提供了一个 EC 子命令来执行与纠删码(擦除编码)有关的管理命令:

$ hdfs ec
  [-setPolicy -path <path> [-policy <policy>] [-replicate]]
    # 在指定路径的目录上设置擦除编码策略。
    # path:HDFS 中的目录。这是必填参数。设置策略仅影响新创建的文件,而不影响现有文件!
    # policy:用于此目录下文件的擦除编码策略。默认 RS-6-3-1024k 策略。
    # -replicate 在目录上应用默认的 REPLICATION 方案,强制目录采用 3x 复制方案。
    # -replicate 和 -policy <policy> 是可选参数。不能同时指定它们。
  [-getPolicy -path < path >]
    # 获取指定路径下文件或目录的擦除编码策略的详细信息。
  [-unsetPolicy -path < path >]
    # 取消设置先前对目录上的 setPolicy 的调用所设置的擦除编码策略。
    # 如果该目录从祖先目录继承了擦除编码策略,则 unsetPolicy 是 no-op。
    # 在没有显式策略集的目录上取消策略将不会返回错误。
  [-listPolicies]
    # 列出在 HDFS 中注册的所有(启用,禁用和删除)擦除编码策略。
    # 只有启用的策略才适合与 setPolicy 命令一起使用。
  [-addPolicies -policyFile <文件>]
    # 添加用户定义的擦除编码策略列表。
  [-listCodecs]
    # 获取系统中支持的擦除编码编解码器和编码器的列表。
  [-removePolicy -policy <policyName>]
    # 删除用户定义的擦除编码策略。
  [-enablePolicy -policy <policyName>]
    # 启用擦除编码策略。
  [-disablePolicy -policy <policyName>]
    # 禁用擦除编码策略。

4. HDFS 动态节点管理

已有 HDFS 集群容量已经不能满足存储数据的需求,需要在原有集群基础上动态添加新的 DataNode 节点,就是俗称的「动态扩容」。旧的服务器需要进行退役更换,暂停服务,需要在当下的集群中停止某些机器上 HDFS 的服务,俗称「动态缩容」。

4.1 动态扩容、节点上线

(1)新机器基础环境准备好后,进行 Hadoop 配置;

  1. 修改 NameNode 节点 workers 配置文件,增加新节点主机名,便于后续一键启停;
  2. 从 NameNode 节点复制 Hadoop 安装包到新节点(注意不包括 hadoop.tmp.dir 指定的数据存储目录);
  3. 新机器上配置 Hadoop 环境变量。

(2)手动启动 DataNode 进程;

hdfs --daemon start datanode

(3)Hadoop Web 页面检查「Datanodes」卡页,会发现新增节点已经使用了;

(4)DataNode 负载均衡服务。新加入的节点,没有数据块的存储,使得集群整体来看负载不均衡。因此最后还需要对 HDFS 负载设置均衡。

# 首先设置数据传输带宽
hdfs dfsadmin -setBalancerBandwidth 104857600
# 然后启动 Balancer,等待集群自均衡完成即可
hdfs balancer -threshold 5

4.2 动态缩容、节点下线

(1)添加退役节点

在 NameNode 机器的 hdfs-site.xml 配置文件中需要提前配置 dfs.hosts.exclude 属性,该属性指向的文件就是所谓的黑名单列表,会被 NameNode 排除在集群之外。如果文件内容为空,则意味着不禁止任何机器。

提前配置好的目的是让 NameNode 启动的时候就能加载到该属性,只不过还没有指定任何机器。否则就需要重启 NameNode 才能加载,因此这样的操作我们称之为具有前瞻性的操作。

<property>
  <name>dfs.hosts.exclude</name>
  <value>/export/server/hadoop-3.1.4/etc/hadoop/excludes</value>
</property>

编辑 dfs.hosts.exclude 属性指向的 excludes 文件,向文件写入添加需要退役的主机名称。

注意:如果副本数是 3,服役的节点小于等于 3,是不能退役成功的,需要修改副本数后才能退役。

(2)刷新集群

在 NameNode 所在的机器刷新节点:hdfs dfsadmin -refreshNodes,等待退役节点状态为 decommissioned(所有块已经复制完成)。

(3)手动关闭退役节点的 DataNode 进程

hdfs --daemon stop datanode

(4)DataNode 负载均衡服务

# 如果需要可以对已有的 HDFS 集群进行负载均衡服务
hdfs balancer -threshold 5

4.3 黑白名单机制

(1)白名单

所谓的白名单指的是允许哪些机器加入到当前的 HDFS 集群中,是一种准入机制。

白名单由 dfs.hosts 参数指定,该参数位于 hdfs-site.xml。默认值为空。

dfs.hosts 指向文件,该文件包含允许连接到 NameNode 的主机列表。必须指定文件的完整路径名。如果该值为空,则允许所有主机准入。

(2)黑名单

所谓的黑名单指的是禁止哪些机器加入到当前的 HDFS 集群中,是一种禁入机制。

黑名单由 dfs.hosts.exclude 参数指定,该参数位于 hdfs-site.xml。默认值为空。

dfs.hosts.exclude 指向文件,该文件包含不允许连接到名称节点的主机列表。必须指定文件的完整路径名。如果该值为空,则不禁止任何主机加入。

5. HDFS·HA 高可用机制

5.1 HA 前置知识

a. 单点故障、高可用

单点故障(Single Point Of Failure,SPOF)是指系统中某一点一旦失效,就会让整个系统无法运作,换句话说,单点故障即会整体故障。

高可用性(High Availability, HA)指系统无中断地执行其功能的能力,代表系统的可用性程度。是进行系统设计时的准则之一。高可用性系统意味着系统服务可以更长时间运行,通常通过提高系统的容错能力来实现。

高可用性或者高可靠度的系统不会希望有单点故障造成整体故障的情形。一般可以透过冗余的方式增加多个相同机能的部件,只要这些部件没有同时失效,系统(或至少部分系统)仍可运作,这会让可靠度提高。

b. 高可用如何实现

主备集群

解决单点故障,实现系统服务高可用的核心并不是让故障永不发生,而是让故障的发生对业务的影响降到最小。因为软硬件故障是难以避免的问题。

当下企业中成熟的做法就是给单点故障的位置设置备份,形成主备架构。通俗描述就是当主挂掉,备份顶上,短暂的中断之后继续提供服务。常见的是一主一备架构,当然也可以一主多备。备份越多,容错能力越强,与此同时,冗余也越大,浪费资源。

Active、Standby

  • Active:主角色。活跃的角色,代表正在对外提供服务的角色服务。任意时间有且只有一个 Active 对外提供服务。
  • Standby:备份角色。需要和主角色保持数据、状态同步,并且时刻准备切换成主角色(当主角色挂掉或者出现故障时),对外提供服务,保持服务的可用性。

c. 可用性评判标准

在系统的高可用性里有个衡量其可靠性的标准——X个9,这个X是代表数字3-5。X个9表示在系统1年时间的使用过程中,系统可以正常使用时间与总时间(1年)之比。

  • 3个9:(1-99.9%)*365*24=8.76h 表示该系统在连续运行 1 年时间里最多可能的业务中断时间是 8.76 小时;
  • 4个9:(1-99.99%)*365*24=0.876h 表示该系统在连续运行 1 年时间里最多可能的业务中断时间是 52.6 分钟;
  • 5个9:(1-99.999%)*365*24*60=5.26min 表示该系统在连续运行 1 年时间里最多可能的业务中断时间是 5.26 分钟;

可以看出,9 越多,系统的可靠性越强,能够容忍的业务中断时间越少,但是要付出的成本更高。

d. HA 设计核心问题

脑裂问题

脑裂(split-brain)是指“大脑分裂”,本是医学名词。在 HA 集群中,脑裂指的是当联系主备节点的"心跳线"断开时(即两个节点断开联系),本来为一个整体、动作协调的 HA 系统,就分裂成为两个独立的节点。

由于相互失去了联系,主备节点之间像“裂脑人”一样,使得整个集群处于混乱状态。脑裂的严重后果:

  1. 集群无主:都认为对方是状态好的,自己是备份角色,后果是无服务;
  2. 集群多主:都认为对方是故障的,自己是主角色。相互争抢共享资源,结果会导致系统混乱,数据损坏。此外对于客户端访问也是一头雾水,找谁呢?

避免脑裂问题的核心是:保持任意时刻系统有且只有 1 个主角色提供服务。

数据同步问题

主备切换保证服务持续可用性的前提是主备节点之间的状态、数据是一致的,或者说准一致的。如果说备用的节点和主节点之间的数据差距过大,即使完成了主备切换的动作,那也是没有意义的。

数据同步常见做法是:通过日志重演操作记录。「主角色」正常提供服务,发生的事务性操作通过日志记录,「备用角色」读取日志重演操作。

5.2 NN 单点故障

在 Hadoop 2.0.0 之前,NameNode 是 HDFS 集群中的单点故障(SPOF)。每个群集只有一个 NameNode,如果该计算机或进程不可用,则整个群集在整个 NameNode 重新启动或在另一台计算机上启动之前将不可用。

NameNode 的单点故障从两个方面影响了 HDFS 群集的总可用性:

  • 如果发生意外事件,例如机器崩溃,则在重新启动 NameNode 之前,群集将不可用;
  • 计划内的维护事件,例如 NameNode 计算机上的软件或硬件升级,将导致群集停机时间的延长。

【HDFS 高可用性解决方案】在同一群集中运行两个(从 3.0.0 起,超过两个)冗余 NameNode,形成主备架构。这样可以在机器崩溃的情况下快速故障转移到新的 NameNode,或者出于计划维护的目的由管理员发起的正常故障转移。

5.3 解决方案·QJM

QJM 全称 Quorum Journal Manager,由 cloudera 公司提出,是 Hadoop 官方推荐的 HDFS HA 解决方案之一。

QJM 使用 ZooKeeper 中 ZKFC 来实现主备切换,使用 Journal Node 集群实现 Edits Log 的共享以达到数据同步的目的。

a. 主备切换、脑裂问题解决

ZooKeeper

Apache ZooKeeper 是一款高可用分布式协调服务软件,用于维护少量的协调数据。 zk 的下列特性功能参与了 HDFS 的 HA 解决方案中:

(1)临时 znode

如果一个 znode 节点是临时的,那么该 znode 的生命周期将和创建它的客户端的 session 绑定。客户端断开连接 session 结束,Znode 将会被自动删除。

(2)Path 路径唯一性

zk 中维持了一份类似目录树的数据结构。每个节点称之为 Znode,Znode 具有唯一性,不会重名。也可以理解为排他性。

(3)监听机制

客户端可以针对 Znode 上发生的事件设置监听,当事件发生触发条件,zk 服务会把事件通知给设置监听的客户端。

ZKFailoverController(ZKFC)

ZKFC 是一个新组件,它是一个 ZooKeeper 客户端。运行 NameNode 的每台计算机也都运行 ZKFC,ZKFC 的主要职责:

(1)监视和管理 NameNode 健康状态

ZKFC 通过命令定期 ping 本地负责监视的 NameNode 节点。

(2)维持和 ZooKeeper 集群联系

如果本地 NameNode 运行状况良好,并且 ZKFC 看到当前没有其他节点持有锁 Znode,它将自己尝试获取该锁。如果成功,则表明它“赢得了选举”,并负责运行故障转移以使其本地 NameNode 处于 Active 状态。如果已经有其他节点持有锁,ZKFC 选举失败,则会对该节点注册监听,等待下次继续选举。

Fencing 隔离机制

故障转移过程也就是俗称的主备角色切换的过程,切换过程中最怕的就是“脑裂”的发生。因此需要 Fencing 机制来避免(远程补刀,防止 NN 是假死),将先前的 Active 节点隔离,然后将本地 NameNode 转换为 Active 状态。

Hadoop 公共库中对外提供了两种 fenching 实现,分别是 sshfence 和 shellfence(缺省实现),其中 sshfence 是指通过 ssh 登陆目标节点上,使用命令 fuser 将进程杀死(通过 tcp 端口号定位进程 pid,该方法比 jps 命令更准确),shellfence 是指执行一个用户事先定义的 shell 命令(脚本)完成隔离。

b. 主备数据同步问题解决

Journal Node(JN)集群是轻量级分布式系统,主要用于高速读写数据、存储数据。通常使用 2N+1 台 Journal Node 存储共享 edits log(底层类似于 zk 的分布式一致性算法)。

任何修改操作在 Active NN 上执行时,JN 进程同时也会记录 edits log 到至少半数以上的 JN 中,这时 Standby NN 监测到 JN 里面的同步 log 发生变化了会读取 JN 里面的 edits log,然后重演操作记录同步到自己的目录镜像树里面。

当发生故障 Active NN 挂掉后,Standby NN 会在它成为 Active NN 前,读取所有的 JN 里面的修改日志,这样就能高可靠的保证与挂掉的 NN 的目录镜像树一致,然后无缝的接替它的职责,维护来自客户端请求,从而达到一个高可用的目的。

5.4 HA 环境搭建

(1)集群基础环境准备

  1. 修改主机名:/etc/hostname

  2. 修改 IP:/etc/sysconfig/network-scripts/ifcfg-ens33

  3. 修改主机名和IP的映射关系 /etc/hosts

  4. 关闭防火墙

  5. ssh 免登陆

  6. 安装 JDK,配置环境变量等 /etc/profile

  7. 集群时间同步

  8. 配置主备 NN 之间的互相免密登录(fencing)

(2)HA 集群规划

机器 运行角色
node1 namenode zkfc datanode zookeeper journal node
node2 namenode zkfc datanode zookeeper journal node
node3 datanode zookeeper journal node

(3)安装配置 zk 集群

# 1. 解压
$ tar -zxvf zookeeper-3.4.5.tar.gz -C /export/server
# 2. 修改配置
$ cd /export/server/zookeeper-3.4.5/conf/
$ cp zoo_sample.cfg zoo.cfg
$ vim zoo.cfg
# - 修改:dataDir=/export/data/zkdata
# - 在文件末尾追加
      server.1=node1:2888:3888
      server.2=node2:2888:3888
      server.3=node3:2888:3888
# 3. 创建一个tmp文件夹
$ mkdir /export/data/zkdata
$ echo 1 > /export/data/zkdata/myid
# 4. 将配置好的 zk 拷贝到其他节点
$ scp -r /export/server/zookeeper-3.4.5 node2:/export/server
$ scp -r /export/server/zookeeper-3.4.5 node3:/export/server
# 5. 编辑 node2、node3 对应 /export/data/zkdata/myid 内容
# - node2 -
$ mkdir /export/data/zkdata
$ echo 2 > /export/data/zkdata/myid
# - node3 -
$ mkdir /export/data/zkdata
$ echo 3 > /export/data/zkdata/myid

(4)上传安装包、配置环境变量(node1)

$ tar zxvf hadoop-3.1.4-bin-snappy-CentOS7.tar.gz -C /export/server/
$ vim /etc/profile
export HADOOP_HOME=/export/server/hadoop-3.1.4
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

(5)修改 Hadoop 配置文件

  1. hadoop-env.sh

    $ vim /export/server/hadoop-3.1.4/etc/Hadoop/hadoop-env.sh
    export JAVA_HOME=/export/server/jdk1.8.0_65
    export HDFS_NAMENODE_USER=root
    export HDFS_DATANODE_USER=root
    export HDFS_JOURNALNODE_USER=root
    export HDFS_ZKFC_USER=root
    
  2. core-site.xml

    <configuration>
     <!-- HA集群名称,该值要和hdfs-site.xml中的配置保持一致 -->
     <property>
       <name>fs.defaultFS</name>
       <value>hdfs://mycluster</value>
     </property>
    
     <!-- Hadoop本地磁盘存放数据的公共目录 -->
     <property>
       <name>hadoop.tmp.dir</name>
       <value>/export/data/ha-hadoop</value>
     </property>
    
     <!-- ZooKeeper集群的地址和端口-->
     <property>
       <name>ha.zookeeper.quorum</name>
       <value>node1:2181,node2:2181,node3:2181</value>
     </property>
    </configuration>
    
  3. hdfs-site.xml

    <configuration>
     <!-- 指定hdfs的nameservice为mycluster,需要和core-site.xml中的保持一致 -->
     <property>
       <name>dfs.nameservices</name>
       <value>mycluster</value>
     </property>
     
     <!-- mycluster下面有两个NameNode,分别是nn1,nn2 -->
     <property>
       <name>dfs.ha.namenodes.mycluster</name>
       <value>nn1,nn2</value>
     </property>
    
     <!-- nn1的RPC通信地址 -->
     <property>
       <name>dfs.namenode.rpc-address.mycluster.nn1</name>
       <value>node1:8020</value>
     </property>
    
     <!-- nn1的http通信地址 -->
     <property>
       <name>dfs.namenode.http-address.mycluster.nn1</name>
       <value>node1:9870</value>
     </property>
    
     <!-- nn2的RPC通信地址 -->
     <property>
       <name>dfs.namenode.rpc-address.mycluster.nn2</name>
       <value>node2:8020</value>
     </property>
     
     <!-- nn2的http通信地址 -->
     <property>
       <name>dfs.namenode.http-address.mycluster.nn2</name>
       <value>node2:9870</value>
     </property>
    
     <!-- 指定NameNode的edits元数据在JournalNode上的存放位置 -->
     <property>
       <name>dfs.namenode.shared.edits.dir</name>
       <value>qjournal://node1:8485;node2:8485;node3:8485/mycluster</value>
     </property>
     
     <!-- 指定JournalNode在本地磁盘存放数据的位置 -->
     <property>
       <name>dfs.journalnode.edits.dir</name>
       <value>/export/data/journaldata</value>
     </property>
    
     <!-- 开启NameNode失败自动切换 -->
     <property>
       <name>dfs.ha.automatic-failover.enabled</name>
       <value>true</value>
     </property>
    
     <!-- 指定该集群出故障时,哪个实现类负责执行故障切换 -->
     <property>
       <name>dfs.client.failover.proxy.provider.mycluster</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
     </property>
    
     <!-- 配置隔离机制方法-->
     <property>
       <name>dfs.ha.fencing.methods</name>
       <value>sshfence</value>
     </property>
     
     <!-- 使用sshfence隔离机制时需要ssh免登陆 -->
     <property>
       <name>dfs.ha.fencing.ssh.private-key-files</name>
       <value>/root/.ssh/id_rsa</value>
     </property>
     
     <!-- 配置sshfence隔离机制超时时间 -->
     <property>
       <name>dfs.ha.fencing.ssh.connect-timeout</name>
       <value>30000</value>
     </property>
    </configuration>
    
  4. mapred-site.xml

    <configuration>
      <!-- 指定MR框架为yarn方式 -->
      <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
      </property>
    </configuration>
    
  5. yarn-site.xml

    <configuration>
      <!-- 开启RM高可用 -->
      <property>
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
      </property>
      
      <!-- 指定RM的cluster id -->
      <property>
        <name>yarn.resourcemanager.cluster-id</name>
        <value>yrc</value>
      </property>
      
      <!-- 指定RM的名字 -->
      <property>
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
      </property>
      
      <!-- 分别指定RM的地址 -->
      <property>
        <name>yarn.resourcemanager.hostname.rm1</name>
        <value>node-1</value>
      </property>
      <property>
        <name>yarn.resourcemanager.hostname.rm2</name>
        <value>node-2</value>
      </property>
      
      <!-- 指定zk集群地址 -->
      <property>
        <name>yarn.resourcemanager.zk-address</name>
        <value>node-1:2181,node-2:2181,node-3:2181</value>
      </property>
      
      <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
      </property>
    </configuration>
    

(6)修改workers

node1
node2
node3

(7)配置免密码登陆

# 首先要配置 node1 到 node1、node2、node3 的免密码登陆
# 在 node1 上生产一对钥匙
$ ssh-keygen -t rsa
# 将公钥拷贝到其他节点,包括自己
ssh-coyp-id node1
ssh-coyp-id node2
ssh-coyp-id node3
# 注:两个 namenode 之间要配置 ssh 免密码登陆 (ssh远程补刀时候需要)

(8)启动 zk 集群(分别在 node1、node2、node3 上启动 zk)

$ bin/zkServer.sh start
$ bin/zkServer.sh status

(9)手动启动 JournalNode(分别在在 node1、node2、node3 上执行)

hadoop-daemon.sh start journalnode

(10)格式化 NameNode

# 在 node1 上执行格式化后会根据 core-site.xml 中的 hadoop.tmp.dir 配置的目录下生成个 hdfs 初始化文件
hdfs namenode -format
# 在 node1 启动 namenode 进程
hdfs --daemon start namenode
# 在 node2 上进行元数据同步
hdfs namenode -bootstrapStandby

(11)格式化 ZKFC(在哪台机器上执行,哪台机器就将成为第 1 次的 Active NN)

hdfs zkfc -formatZK

(12)启动 HDFS(在 node1 上执行)

start-dfs.sh

(13)启动 YARN

start-yarn.sh
yarn-daemon.sh start resourcemanager

(14)查看 HDFS 各节点状态信息

hdfs dfsadmin -report

5.5 HA 效果演示

(1)Web 页面查看两个 NameNode 状态

(2)Active 上可以正常操作,Standby 上无法操作。

(3)模拟故障出现

在 node1,手动 kill 杀死 NameNode 进程。此时发现 node2 上的 NameNode 切换成为 Active 状态且 HDFS 服务正常可用。

(4)HA 自动切换失败·错误解决

使用 kill -9 模拟 JVM 崩溃或者重新启动计算机电源或拔出其网络接口以模拟另一种故障后,另一个 NameNode 应在几秒钟内自动变为活动状态。检测故障并触发故障转移所需的时间取决于 ha.zookeeper.session-timeout.ms 的配置,默认值为 5s。

如果测试不成功,检查 zkfc 守护程序以及 NameNode 守护程序的日志,以便进一步诊断问题。

若错误信息提示是未找到 fuser 程序,导致无法进行隔离,可以通过 yum install psmisc -y 来安装,Psmisc 软件包中包含了 fuser 程序(两个 NN 机器上都需要进行安装)。

6. Federation 联邦机制

6.1 当前体系架构

当前的 HDFS 架构有两个主要的层:

(1)命名空间(Namespace)

HDFS 体系结构中的命名空间层由文件、块和目录组成。该层支持与名称空间相关的文件系统操作,例如创建、删除、修改和列出文件和目录。

(2)块存储层(Block Storage)

块存储层包括两个部分:

  1. 块管理:NameNode 执行块管理。块管理通过处理注册和定期心跳来提供 DataNode 群集成员身份。它处理块报告并支持与块相关的操作,如创建、删除、修改或获取块位置。它还维护块的位置、副本位置。为未复制的块管理块复制,并在已复制的块中删除。
  2. 存储: DataNode 通过在本地文件系统上存储块并提供读/写访问权限来管理存储空间。

当下的 HDFS 体系结构仅允许单个 NameNode 维护文件系统名称空间。注意 HA 体系中虽然说允许多个 NameNode,但是他们所维护的是同一套文件系统名称空间。这种体系目前存在着一些弊端和局限性:

  • DataNode 磁盘存储空间不够增加节点,NameNode 内存不够是否可以无限扩容。一种是 DataNode 横向扩展机器增加节点,一种是纵向扩展单机加内存;
  • 由于名称空间和存储层的紧密耦合,NameNode 的替代实现很困难。这限制了其他服务直接使用块存储。唯一的 NameNode 成了唯一入口;
  • 文件系统的操作还限于 NameNode 一次处理的任务数。因此,群集的性能取决于 NameNode 吞吐量。
  • 同样,由于使用单个名称空间,因此使用群集的占用者组织之间没有隔离。

6.2 联邦架构概述

Federation 中文意思为“联邦”,是 NameNode 之间的 Federation,也就是集群中会有多个 NameNode。多个 NameNode 的情况意味着有多个 Namespace。

注意,这区别于 HA 模式下的多 NameNode,HA 中它们是拥有着同一个 Namespace。Federation 体系中多个 NameNode 之间相互独立且不需要互相协调,各自分工,管理自己的区域。

每个 DataNode 要向集群中所有的 NameNode 注册,且周期性地向所有 NameNode 发送心跳和块报告,并执行来自所有 NameNode 的命令。

上图中,有多个 NameNode,分别表示为 NN1、NN2、...、NNn。NS1、NS2 等是由它们各自的 NameNode 管理的名称空间。

每个名称空间都有其自己的块池(NS1 具有 Block Pool1、NS2 具有 Block Pool2,依此类推)。每个 DataNode 存储集群中所有块池的块。

HDFS Federation 体系结构中的块池(Block Pool)是属于单个名称空间的块的集合。每个块池彼此独立地进行管理。在删除 NameNode 或名称空间时,DataNode 中存在的相应块池也将被删除。在升级群集时,每个名称空间卷都作为一个单元进行升级。

好处:

(1)命名空间可伸缩性

使用 Federation,可以水平扩展名称空间。这对大型群集或包含太多小文件的群集有利,因为向群集添加了更多的 NameNode。

(2)性能

由于文件系统操作不受单个 NameNode 吞吐量的限制,因此可以提高文件系统的性能。

(3)隔离

由于有多个名称空间,它可以为使用群集的占用者组织提供隔离。

6.3 联邦配置示例

<configuration>
  <property>
    <name>dfs.nameservices</name>
    <value>ns1,ns2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.ns1</name>
    <value>nn-host1:rpc-port</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.ns1</name>
    <value>nn-host1:http-port</value>
  </property>
  <property>
    <name>dfs.namenode.secondary.http-address.ns1</name>
    <value>snn-host1:http-port</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.ns2</name>
    <value>nn-host2:rpc-port</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.ns2</name>
    <value>nn-host2:http-port</value>
  </property>
  <property>
    <name>dfs.namenode.secondary.http-address.ns2</name>
    <value>snn-host2:http-port</value>
  </property>

  <!-- Other Common Configuration -->
</configuration>

7. HDFS 集群滚动升级

HDFS 滚动升级允许升级单个 HDFS 守护程序。例如,可以独立于 NameNodes 升级 DataNodes,也可以独立于其他 NameNodes 升级单独 NameNode,也可以独立于 DataNodes 和 JournaNodes 升级 NameNodes。

注意,仅从 Hadoop-2.4.0 起才支持滚动升级。

7.1 升级

在 Hadoop v2 中,HDFS 支持 NameNode 高可用(HA)和线路兼容性(Wire Compatibility)。这两个功能使升级 HDFS 变得可行,且不会导致 HDFS 停机。

为了在不停机的情况下升级 HDFS 群集,必须使用 HA 设置群集。

如果新软件版本中启用了任何新功能,则升级后可能无法与旧软件版本一起使用。在这种情况下,应按照步骤进行升级:禁用新功能 - 升级集群 - 启用新功能。

a. 不停机升级

在 HA 群集中,有两个或多个 NameNode(NN)、许多 DataNode(DN)、一些 JournalNode(JN)和一些ZooKeeperNode(ZKN)。JN 相对稳定,在大多数情况下,升级 HDFS 时不需要升级。

滚动升级过程中,仅针对 NNs 和 DNs。JNs 和 ZKNs 都没有,升级 JN 和 ZKN 可能会导致群集停机。

非联邦 HA 集群

假设有两个名称节点 NN1 和 NN2,其中 NN1 和 NN2 分别处于 Active 和 StandBy 状态。以下是升级 HA 群集的步骤:

#### 1. 准备滚动升级
# 以创建用于回滚的 fsimage
$ hdfs dfsadmin -rollingUpgrade prepare
# 断运行下面命令检查回滚 fsimage 是否创建完毕,直到显示 Proceeding with Rolling Upgrade 信息
$ hdfs dfsadmin -rollingUpgrade query 

#### 2. 升级 Active 和 StandBy NN
# 关闭 NN2
hdfs --daemon stop namenode
# 升级启动 NN2
hdfs --daemon start namenode -rollingUpgrade started
# 关闭 NN1(做一次 failover 切换,使得 NN2 成为 Active,NN1 变为 Standby)
hdfs --daemon stop namenode
# 升级启动 NN1
$ hdfs --daemon start namenode -rollingUpgrade started

#### 3. 升级 DN
# 选择整体中的一小部分 DataNode 节点进行升级(比如按照 DataNode 所在的不同机架来筛选)。
# 关闭升级所选的 DN 其中 IPC_PORT 由参数 dfs.datanode.ipc.address 指定,默认 9867。
$ hdfs dfsadmin -shutdownDatanode <DATANODE_HOST:IPC_PORT> upgrade
# 检查下线 DataNode 是否已经停止服务。如果还能得到节点信息,意味着此节点还未真正被关闭。
$ hdfs dfsadmin -getDatanodeInfo <DATANODE_HOST:IPC_PORT>
# 启动 DN 节点
$ hdfs --daemon start datanode
# --- 对选中的所有 DN 节点执行以上步骤,直到升级群集中的所有 DN 节点。---

#### 4. 完成滚动升级
$ hdfs dfsadmin -rollingUpgrade finalize

联邦 HA 集群

联邦集群(Federation)是拥有多 Namespace 的集群。每个 Namespace 对应一对主备 NameNode 节点。上述这套集群就是俗称的「联邦 + HA」集群。

联邦集群的升级过程与非联邦集群的升级过程比较相似,没有什么本质区别,只是需要为不同的 Namespace 多重复执行几遍升级操作而已。

#### 1. 在每个 Namespace 下执行升级准备
$ hdfs dfsadmin -rollingUpgrade prepare

#### 2. 升级每个 Namespace 下的 Active/Standby 节点
# 关闭 NN2
$ hdfs --daemon stop namenode
# 升级启动 NN2
$ hdfs --daemon start namenode -rollingUpgrade started
# 做一次 failover 切换,使得 NN2 成为 Active 节点,NN1 变为 Standby 节点。
$ hdfs --daemon stop namenode
$ hdfs --daemon start namenode -rollingUpgrade started

#### 3. 升级每个 DataNode 节点
# 关闭升级所选的 DN,其中 IPC_PORT 由参数 dfs.datanode.ipc.address 指定,默认 9867。
hdfs dfsadmin -shutdownDatanode <DATANODE_HOST:IPC_PORT> upgrade
# 检查下线 DataNode 是否已经停止服务,如果还能得到节点信息,意味着此节点还未真正被关闭。
$ hdfs dfsadmin -getDatanodeInfo <DATANODE_HOST:IPC_PORT>
# 如果上一步得不到节点信息,说明升级结束了。此时启动 DataNode 节点
hdfs --daemon start datanode

#### 4. 升级过程执行完毕,在每个 Namespace 下执行 finalize 确认命令。
hdfs dfsadmin -rollingUpgrade finalize

b. 停机升级

【非 HA 模式】在升级的过程中,势必会存在服务短暂停止的时间,因为 NameNode 需要重启,而这段时间并没有备用节点可选。整体过程同非联邦 HA 模式的 4 个步骤类似。不过 step2 的过程要略微修改:

# Step1: 滚动升级准备
# Step2: 升级 NN 和 SNN
# a. 关闭 NN
$ hdfs --daemon stop namenode
# b. 升级启动 NN
hdfs --daemon start namenode -rollingUpgrade started
# c. 停止 SNN
$ hdfs --daemon stop secondarynamenode
# d. 升级启动 SNN
$ hdfs --daemon start secondarynamenode -rollingUpgrade started
# Step3: 升级 DN
# Step4: 完成滚动升级
hdfs dfsadmin -rollingUpgrade finalize

7.2 降级&回滚

如果不希望使用升级版本,或者在某些不太可能的情况下,升级失败(由于较新版本中的错误),管理员可以选择将 HDFS 降级到升级前版本,或将 HDFS 回滚到升级前版本和升级前的状态。

请注意,降级可以滚动方式进行,但不能回滚。回滚要求集群停机。

降级(downgrade)和回滚(rollback)区别

共同点:

  • 都会将版本退回到升级前的版本;
  • 在升级的 finalize 动作执行之后,将不允许再执行降级和回滚。

不同点:

  • 「降级」能支持 rollling 的方式,可以滚动降级;而「回滚」需要停止服务一段时间;
  • 「降级」过程只会将软件版本还原成升级前的,会保留用户现有的数据状态;而「回滚」则会将用户数据还原成升级前的状态模式,现有的数据状态不保存。

友情提示:

  • 升级慎重,降级、回滚更要慎重。生产环境中,集群升级之前必须进行科学调研,评估升级后的版本跟现有业务的兼容性。
  • 在测试环境下完整模拟升级流程,并且针对升级前集群状态进行备份,避免意外发生导致集群中断。不要奢求升级失败时,通过回滚、降级等操作挽救集群。

a. 降级

降级会将软件还原回升级前的版本,并保留用户数据。假设时间 T 是滚动升级开始时间,并且升级通过降级终止。然后,在 T 之前或之后创建的文件在 HDFS 中仍然可用。在 T 之前或之后删除的文件在 HDFS 中仍然被删除。

仅当两个版本之间的 NameNode 布局版本和 DataNode 布局版本均未更改时,才可以将较新的版本降级为升级前的版本。

在高可用性集群中,当正在进行从旧软件版本到新软件版本的滚动升级时,可以滚动方式将升级后的计算机降级到旧软件版本。与之前相同,假设 NN1 和 NN2 分别处于活动状态和待机状态。

注意,在降级 NameNode 之前,必须先降级 DataNode,因为协议可以以向后兼容的方式更改,但不能向前兼容,即:旧的 DataNode 可以与新的 NameNode 对话,反之则不行。

以下是在不停机的情况下进行降级的步骤:

(1)降级 DataNode

#### 1. 选择一小部分数据节点(例如,特定机架下的所有数据节点)
# 执行降级操作,其中IPC_PORT由参数dfs.datanode.ipc.address指定,默认9867。
$ hdfs dfsadmin -shutdownDatanode <DATANODE_HOST:IPC_PORT> upgrade
# 执行命令检查节点是否完全停止
$ hdfs dfsadmin -getDatanodeInfo <DATANODE_HOST:IPC_PORT>
#### 2. 重复上述步骤,直到集群中所有升级的数据节点降级。然后重新启动 DataNode。

(2)降级 Active NameNode 和 Standby NameNode

# 关闭和降级 NN2
# 正常启动 NN2 作为待机
# 从 NN1 到 NN2 的故障转移,以便 NN2 变为活动状态,而 NN1 变为待机状态
# 关闭并降级 NN1
# 正常启动 NN1 作为待机

(3)完成降级操作

$ hdfs dfsadmin -rollingUpgrade finalize

HA 集群降级(downgrade)注意事项:

(1)降级与升级在 HA 模式下有一个共同点

在操作 NameNode 时,都是先从 Standby 节点开始操作,等 Standby 节点升/降结束,做一次切换,使另外一个节点得以进行升/降操作。在全程中,始终保持一个 Active 节点对外提供服务。

(2)降级过程 NameNode 与 DataNode 的操作和在升级时操作顺序正好相反

新版本一般在协议、API 是兼容老版本的,如果先降级 NN,那么则会造成 DN 是新版,NN是旧版。新版 DN 中的许多协议将会在旧版 NN 中可能不兼容。所以这里必须要先降级 DN,然后再把服务端 NN 进行降级。看似简单的一次顺序颠倒,背后其实是有更深层的原因的。

(3)联邦集群和非 HA 集群的降级操作与升级操作相对应,进行相应操作命令的替换即可。

b. 回滚

回滚将软件还原到升级前的版本,但也将用户数据还原到升级前的状态。假设时间 T 是滚动升级开始时间,并且升级通过回滚终止。T 之前创建的文件在 HDFS 中仍然可用,但 T 之后创建的文件不可用。T 之前删除的文件在 HDFS 中仍然被删除,但是 T 之后删除的文件被恢复。

始终支持从较新版本回滚到升级前版本。但这不能以滚动方式完成。它需要集群停机。假设 NN1 和 NN2 分别处于活动状态和待机状态。以下是回滚的步骤:

# 1. 停止所有的 NameNode 和 DataNode 节点;
# 2. 在所有的节点机器上恢复升级前的软件版本;
# 3. 在 NN1 节点上执行 -rollingUpgrade rollback 命令来启动 NN1,将 NN1 作为 Active 节点;
# 4. 在 NN2 上执行 -bootstrapStandby 命令并正常启动 NN2,将 NN2 作为 Standby 节点;
# 5. 以 -rollback 参数启动所有的 DataNode。

7.3 相关命令

(1)dfsadmin –rollingUpgrade

hdfs dfsadmin -rollingUpgrade <query|prepare|finalize>
# query       Query the current rolling upgrade status.
# prepare     Prepare a new rolling upgrade.
# finalize    Finalize the current rolling upgrade.

(2)dfsadmin –getDatanodeInfo

hdfs dfsadmin -getDatanodeInfo <DATANODE_HOST:IPC_PORT>

获取有关给定 DataNode 的信息。该命令可以像 Unix ping 命令一样用于检查数据节点是否处于活动状态。

(3)dfsadmin –shutdownDatanode

hdfs dfsadmin -shutdownDatanode <DATANODE_HOST:IPC_PORT> [upgrade]

提交给定 DataNode 的关闭请求。如果指定了可选的升级参数,则建议访问数据节点的客户端等待其重启,然后启用快速启动模式。如果重启不及时,客户端将超时并忽略数据节点。在这种情况下,快速启动模式也将被禁用。

请注意,该命令不会等待数据节点关闭完成。dfsadmin -getDatanodeInfo 命令可用于检查数据节点关闭是否完成。

(4) namenode –rollingUpgrade

hdfs namenode -rollingUpgrade <rollback|started>
# rollback  Restores the namenode back to the pre-upgrade release 
#           but also reverts the user data back to the pre-upgrade state.
# started   Specifies a rolling upgrade already started so that the namenode should
#           allow image directories with different layout versions during startup.