Zookeeper Leader选举网络通信

发布时间 2023-03-30 20:59:03作者: Dazzling!

Leader 选举过程中怎么把票发出去的?发出去后其他节点是怎么收到票的? 这两个问题的答案说简单点那肯定是通过网络传输,那问题又来了:节点之间是怎么建立连接的?

先来分析下 Leader 选举发起投票以及接收投票这部分内容的网络通信原理以及简易架构图,然后再对照着思想进行源码剖析。话不多说,让我们进入正题。

一、核心架构及源码

首先我们在前面几篇就反复提到了 ZooKeeper 的投票机制是异步的,并不是立即将票通过网络传输给接收者了,异步二字大家并不陌生了,那如何实现异步呢?大家可能立即就想到了:先放到队列里,然后开线程异步消费进行发送。 没错, ZooKeeper 也是这么玩的。

接下来一起看看 ZooKeeper 的异步传输原理以及它的核心组件源码。

刚也已经说到了,ZooKeeper 也是先把消息放到队列里,然后开线程异步消费进行发送。 所以我们需要搞一个发送者队列:

fianl BlockingQueue<ByteBuffer> sendQueue;

这样就行了吗?明显不够,我们并不知道这条消息是谁发的,所以我们需要换成 Map 结构,key 来标记 sid,value 是这个队列,如下:

final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;

好了,到目前为止,第一步先把消息放到队列里的数据结构已经设计完成了,接下来我们分析第二步开线程异步消费进行发送。 这个是不是也通俗易懂,直接搞个 Thread 去消费这个队列就完了,怎么消费?太简单不过了,阻塞队列有poll()方法可以弹出数据,这不就是消费嘛。因此,我们需要设计一个线程类:

class SendWorker extends ZooKeeperThread {
    public void run() {
        // 消费
    }
}

看起来也很完美,但是别忘了我们消息是存到 Map 里的,key 是 sid,也就是代表某个 ZooKeeper 节点,那我们这个线程消费 Map 里的哪份数据?因此我们也需要设计一个 Map 类型的数据结构,还是以 sid 为 key,value 变为SendWorker,也就是发送线程。也就是自己线程消费自己数据,数据结构和伪代码如下:

final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;

class SendWorker extends ZooKeeperThread {
    public void run() {
        // 消费
        BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        ByteBuffer b = bq.poll(timeout, unit);
        send(b);
    }
}

我们用一张图总结一下:

Leader选举-network-1.png

现在发送者的数据结构和核心逻辑已经设计完了,那消息发出去后肯定要有接收者呀,接收完消息后存到哪呢?所以我们还需要一个队列,那就是接收者队列,接收者需要知道是谁发来的吗?完全不需要,你发给我了,我拿着消息做处理就行了,所以不用 Map 结构,直接用一个队列来接收即可:

public final BlockingQueue<Message> recvQueue;

问题又来了:我只是设计了一个接收消息队列,也就是接收完消息存储的地方,那怎么接收?老规矩,肯定还是一个线程去异步地收消息,然后放到recvQueue里,我们称这个线程为RecvWorker

class RecvWorker extends  ZooKeeperThread {
    public void run() {
        // 读消息
        msg = din.readFully(msgArray, 0, length);
        // 放到recvQueue
        recvQueue.offer(msg)
    }
}

到目前为止,我们设计完了发送者的存储结构和工作线程以及接收者的存储结构和工作线程,我们画个架构图来总结下,这样更易于理解,轻松易懂。

Leader选举-network-2.png

以上内容就是 ZooKeeper 在 Leader 选举时网络通信组件的架构设计,既然我称它为组件,那自然要把它封装到一个类里面,我们这里有两个线程,一个发送消息的工作线程SendWorker,另一个是接收消息的工作线程RecvWorker,它们其实都是监听队列,然后放入或弹出数据,所以我们将它们放到网络通信组件里,如下:

public class QuorumCnxManager {
    // 发送者Worker
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    // 发送队列
    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
    // 接收队列
    public final BlockingQueue<Message> recvQueue;
    
    class SendWorker extends  ZooKeeperThread {
        // ... 省略
    }
    
    class RecvWorker extends  ZooKeeperThread {
        // ... 省略
    }
}

Leader选举-network-3.png

至此,网络管理组件的大体架构已经设计好了,但是我们还有很多疑问,比如:

  1. 我们知道queueSendMap是发送队列且以 sid 为 key,那么这个队列里的数据是谁放进来的?
  2. 我们知道recvQueue是接收队列,接收的消息来自queueSendMap,那么接收完消息后干嘛了?

接下来,我们就一个一个地攻破这些疑问,彻底掌握 Leader 选举网络通信相关的底层原理以及源码。我们先看第一个疑问:queueSendMap队列里的数据是谁放进来的?

Leader 选举第一步就是投票给自己并异步通过网络发送出去,那这时候它要发给哪些节点?当然是全部节点,每个节点都有自己的 sid,因此这里以接收者的 sid 作为 key,投票给自己的消息体作为 value 存储到queueSendMap当中。

我们在上一篇讲解了 Leader 选举的核心源码,主方法是lookForLeader(),但是我们留了一个方法没讲解:sendNotifications();,我们只说这个是异步将投票信息发给其他节点。接下来我们就看下这个方法到底干了什么:

private void sendNotifications() {
    // for循环,给每一个sid进行发送。
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        ToSend notmsg = new ToSend(/*省略拼凑消息体*/);
        // 放到queue里
        sendqueue.offer(notmsg);
    }
}

我们发现这个方法很简单,就是 for 循环遍历每个节点,然后拼凑消息体,将消息体放到一个全新的队列里sendqueue。这时候我们就该想到:既然我们把消息已经放到了sendqueue里面,那我们肯定有地方从这里取数据,那怎么获取呢?我们sendqueue的数据结构是阻塞队列LinkedBlockingQueue<ToSend>,阻塞队列有poll()方法可以弹出数据,那这就好办了,我们搞个死循环,一直监听数据,有的话就将消息弹出来进行处理,然后以接收者的 sid 为 key 将消息放到queueSendMap不就好了吗?所以太简单啦:

while (!stop) {
    try {
        // 拿到消息
        ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
        // 其实我觉得这个判断就是防止“假死”的情况,如果这么久还没消息就执行下一轮循环重新调用下poll方法。
        if (m == null) {
            continue;
        }
		// 进行处理
        process(m);
    } catch (InterruptedException e) {
        break;
    }
}

void process(ToSend m) {
    // 拼凑消息体
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m. Leader , m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
    // 构造BlockingQueue,然后以sid为key,将queue放到queueSendMap里
    BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(m.sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
    // 将消息体加入到BlockingQueue里,这样queueSendMap的数据就构造完成了。
    // 有的人问:bq.offer()了,不需要重新queueSendMap.put()一下吗? 大哥,你先了解下Java是值传递还是引用传递哈。
	bq.offer(requestBuffer);
}