Hadoop:哪个数据节点是最近的数据节点来检索数据以及节点如何实现容错性

发布时间 2023-08-08 19:23:14作者: LateSpring

Q1 who can decide which Data Node is the closest datanode to retrieve the data?

当客户端要读一个文件的某个数据块时,它就需要向NameNode节点询问这个数据块存储在哪些DataNode节点上,这个过程如下图:
image.png
当然,客户端至少需要向NameNode传输三个参数:文件路径、读取文件的起始位置、读取文件的长度,而NameNode会向该客户端返回它所要读取文件内容所在的位置——LocatedBlock对象,该对象包含对应的数据块所有副本所在的DataNode节点的位置信息,客户端接着就会依次从这些DataNode节点上读取该数据块,直到成功读取为止。这里就设计到了一个优化问题了:客户端应该总是选择从距离它最近的可用DataNode节点上读取需要的数据块,所以此时的关键就是如何来计算客户端与DataNode节点之间的距离。这个计算距离的问题本质上涉及到一种数据结构——NetworkTopology(网络拓扑),这种数据结构被NameNode节点用来形象的表示HDFS集群中所有DataNode节点的物理位置,自然这个优化工作就交由NameNode来处理了
NameNode对文件的读优化的实现很简单,基本原理就是按照客户端与DataNode节点之间的距离进行排序,距客户端越近的DataNode节点越被放在LocatedBlock的前面,该算法的基本思路如下:
1.如果该Block的一个副本存在于客户端,则客户端优先从本地读取该数据块;
2.如果该Block的一个副本与客户端在同一个机架上,且没有一个副本存放在客户端,则客户端优先读取这个同机架上的副本;否则客户端优先读取同机器的副本,失败的情况下然后再优先考虑这个同机架上的副本;
3.如果该Block既没有一个副本存在客户端,又没有一个副本与客户端在同一个机架上,则随机选择一个DataNode节点作为优先节点。
其详细实现如下:

/**  
   * @param reader 客户端
   * @param nodes 某个Block所在的DataNode节点
 */
public void pseudoSortByDistance( Node reader, Node[] nodes ) {
    int tempIndex = 0;
    if (reader != null ) {
      int localRackNode = -1;
      //scan the array to find the local node & local rack node
      for(int i=0; i<nodes.length; i++) {
        if(tempIndex == 0 && reader == nodes[i]) { //local node
          //swap the local node and the node at position 0
          if( i != 0 ) {
            swap(nodes, tempIndex, i);
          }
          tempIndex=1;
          if(localRackNode != -1 ) {
            if(localRackNode == 0) {
              localRackNode = i;
            }
            break;
          }
        } else if(localRackNode == -1 && isOnSameRack(reader, nodes[i])) {
          //local rack
          localRackNode = i;
          if(tempIndex != 0 ) break;
        }
      }
 
      // swap the local rack node and the node at position tempIndex
      if(localRackNode != -1 && localRackNode != tempIndex ) {
        swap(nodes, tempIndex, localRackNode);
        tempIndex++;
      }
    }
    
    // put a random node at position 0 if it is not a local/local-rack node
    if(tempIndex == 0 && nodes.length != 0) {
      swap(nodes, 0, r.nextInt(nodes.length));
    }
  }

从上面的具体实现可以看,这种优化策略存在一个很大的缺陷:完全没有考虑到DataNode上的负载情况,从而致使HDFS的读性能急速下降。这一点也正好反映了HDFS好友很大的优化空间,但遗憾的是,HDFS对这种读Block时的副本选择策略并没有开放给开发者,以至于大大的增加了优化的难度[1]。

Q2 How do the Primary Namenode, Standby Namenode and secondary namenode work together to achieve fault tolorence when the primary namenode is down?

1.Primary Namenode

最初hadoop只有namenode保管datanode的元数据.
namenode包括fsimage(文件系统快照)和edit logs(编辑日志)两部分.
fsimage对文件系统的编辑又是快而频繁的, 所以需要一个内存中存储的edit logs, 以便频繁地修改写入.
namenode启动之初, 会将edit logs合并入fsimage中. 也就是说, 新备份会在namenode启动时产生.但一般来说namenode一开就要很久, 导致edit logs越滚越大, 内存可没有这么大,所以出现了secondary namenode.[7]

2.Standby Namenode

在hadoop2.x以及之后的版本开启高可用模式后可以使用Standby Namenode来进行namenode的恢复

2.1.hadoop1.x

在hadoop1中NameNode存在一个单点故障问题,也就是说如果NameNode所在的机器发生故障,那么整个集群就将不可用(hadoop1中有个SecorndaryNameNode,但是它并不是NameNode的备份,它只是namenode的一个助理,协助namenode工作,对fsimage和edits文件进行合并,并推送给NameNode,防止因edits文件过大,导致NameNode重启变得很慢),这是hadoop1的不可靠实现[2]。

2.2.hadoop2.x

在hadoop2.0引入了HA(High Availability, 高可用)机制。hadoop2.0的HA机制官方介绍了有2种方式,一种是NFS(Network File System)方式,另外一种是QJM(Quorum Journal Manager)方式[3][4]。
hadoop2.0的HA 机制有两个namenode,一个是active namenode,状态是active;另外一个是standby namenode,状态是standby。两者的状态是可以切换的,但不能同时两个都是active状态,最多只有1个是active状态。只有active namenode提供对外的服务,standby namenode是不对外服务的。active namenode和standby namenode之间通过NFS或者JN(journalnode,QJM方式)来同步数据
active namenode会把最近的操作记录写到本地的一个edits文件中(edits file),并传输到NFS或者JN中。standby namenode定期的检查,从NFS或者JN把最近的edit文件读过来,然后把edits文件和fsimage文件合并成一个新的fsimage,合并完成之后会通知active namenode获取这个新fsimage。active namenode获得这个新的fsimage文件之后,替换原来旧的fsimage文件。
这样,保持了active namenode和standby namenode的数据的实时同步,standby namenode可以随时切换成active namenode(譬如active namenode挂了)。而且还有一个原来hadoop1.0的secondarynamenode,checkpointnode,buckcupnode的功能:合并edits文件和fsimage文件,使fsimage文件一直保持更新。所以启动了hadoop2.0的HA机制之后,secondarynamenode,checkpointnode,buckcupnode这些都不需要了[6]。

2.3.NFS方式

NFS作为active namenode和standby namenode之间数据共享的存储。active namenode会把最近的edits文件写到NFS,而standby namenode从NFS中把数据读过来。这个方式的缺点是,如果active namenode或者standby namenode有一个和NFS之间网络有问题,则会造成他们之前数据的同步出问题。
image.png

2.4.QJM(Quorum Journal Manager )方式

QJM的方式可以解决上述NFS容错机制不足的问题。active namenode和standby namenode之间是通过一组journalnode(数量是奇数,可以是3,5,7...,2n+1)来共享数据。active namenode把最近的edits文件写到2n+1个journalnode上,只要有n+1个写入成功就认为这次写入操作成功了,然后standby namenode就可以从journalnode上读取了。可以看到,QJM方式有容错的机制,可以容忍n个journalnode的失败。
image.png

2.5.主备节点的切换

active namenode和standby namenode可以随时切换。当active namenode挂掉后,也可以把standby namenode切换成active状态,成为active namenode。可以人工切换和自动切换。人工切换是通过执行HA管理的命令来改变namenode的状态,从standby到active,或者从active到standby。自动切换则在active namenode挂掉的时候,standby namenode自动切换成active状态,取代原来的active namenode成为新的active namenode,HDFS继续正常工作。
主备节点的自动切换需要配置zookeeper。active namenode和standby namenode把他们的状态实时记录到zookeeper中,zookeeper监视他们的状态变化。当zookeeper发现active namenode挂掉后,会自动把standby namenode切换成active namenode。
image.png

3.Secondary NameNode的辅助恢复机制

然而Secondary NameNode在紧急情况(未开启高可用,namenode挂掉的情况)下也可以辅助NameNode的恢复
NameNode故障后,可以通过如下方法进行处理:将Secondary NameNode中数据拷贝到NameNode存储数据的目录。过程如下:
首先强制清除NameNode进行:shell kill -9 NameNode进程
通过rm -rf删除NameNode存储的数据,存储的位置可以在``hdfs-site.xml进行查看:
rm -rf /opt/module/hadoop3.1.3/data/name`
image.png
可以通过scp命令进行拷贝Secondary NameNode下上述配置的目录到NameNode中

scp -r 2NN上的name目录  NN上的name目录

scp -r cxj@hadoop103:/opt/module/hadoop3.1.3/data/name、* ./name/

Secondary NameNode可以恢复绝大部分数据,跟NameNode主要差异,就是Secondary NameNode中没有NameNode最新的编辑日志,因为编辑日志是按一定规则进行提交合并的,不符合条件的edits文件就存在于NameNode中,所以如果服务器出现问题需要进行NameNode的恢复,那么通过Secondary NameNode不一定可以完全恢复所有的数据[5]。

Reference:
[1]https://blog.csdn.net/xhh198781/article/details/7256142
[2]https://blog.csdn.net/daydayup_668819/article/details/70815335#:~:text=standby%E7%8A%B6%E6%80%81%E7%9A%84NameNode%E6%9C%89%E8%83%BD%E5%8A%9B%E8%AF%BB%E5%8F%96JNs%E4%B8%AD%E7%9A%84%E5%8F%98%E6%9B%B4%E4%BF%A1%E6%81%AF%EF%BC%8C%E5%B9%B6%E4%B8%94%E4%B8%80%E7%9B%B4%E7%9B%91%E6%8E%A7edit,log%E7%9A%84%E5%8F%98%E5%8C%96%EF%BC%8C%E6%8A%8A%E5%8F%98%E5%8C%96%E5%BA%94%E7%94%A8%E4%BA%8E%E8%87%AA%E5%B7%B1%E7%9A%84%E5%91%BD%E5%90%8D%E7%A9%BA%E9%97%B4%E3%80%82%20standby%E5%8F%AF%E4%BB%A5%E7%A1%AE%E4%BF%9D%E5%9C%A8%E9%9B%86%E7%BE%A4%E5%87%BA%E9%94%99%E6%97%B6%EF%BC%8C%E5%91%BD%E5%90%8D%E7%A9%BA%E9%97%B4%E7%8A%B6%E6%80%81%E5%B7%B2%E7%BB%8F%E5%AE%8C%E5%85%A8%E5%90%8C%E6%AD%A5%E4%BA%86%EF%BC%8C%E5%A6%82%E5%9B%BE3%E6%89%80%E7%A4%BA%E3%80%82
[3]https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
[4]https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html
[5]https://blog.csdn.net/qq_43967413/article/details/121788004
[6]https://blog.csdn.net/jarth/article/details/52839864
[7]https://blog.csdn.net/qq_39006282/article/details/105808380#:~:text=standby%20namenode%E6%98%AF%E5%92%8Cnamenode%E5%90%8C%E6%97%B6%E6%9B%B4%E6%96%B0%E7%9A%84%2C%20%E4%BD%86%E5%AE%83%E4%B8%8D%E6%98%AF%E4%B8%BBnamenode%2C%20%E5%8F%AA%E6%98%AFstandby.%20standby%20namenode%E4%BC%9A%E5%92%8Cnamenode%E5%85%B1%E4%BA%ABedit%20logs%2C,%E5%B9%B6%E5%9C%A8namenode%E5%AE%95%E6%9C%BA%E6%97%B6%E7%83%AD%E5%88%87%E6%8D%A2%2C%20%E6%8E%A5%E6%9B%BFnamenode%E7%9A%84%E5%B7%A5%E4%BD%9C.%20datanode%E4%BC%9A%E5%B0%86%E4%BF%AE%E6%94%B9%E4%BF%A1%E6%81%AF%E5%90%8C%E6%97%B6%E5%91%8A%E8%AF%89%E4%B8%A4%E4%B8%AAnamenode%2C%20%E4%BD%86standby%20namenode%E4%B8%8D%E4%BC%9A%E8%BF%9B%E8%A1%8Cedit%20logs%E7%9A%84%E6%93%8D%E4%BD%9C%2C%20%E9%99%A4%E9%9D%9Enamenode%E5%9D%8F%E6%8E%89.