Zookeep Leader选举源码

发布时间 2023-03-29 21:09:53作者: Dazzling!

Leader 选举的核心底层原理可以很简单地概述为:核心参数是 logicClock 逻辑时钟、 epoch、事务次数、myid,核心流程为:先对比 logicClock,再对比 epoch,其次对比事务次数,最后对比myid。

一、投给自己,异步广播

首先明确的一点是只有状态是 LOOKING 的时候才会发生选举。也就是如下:

switch (getPeerState()) {
    case LOOKING:
        // 选举流程
        lookFor Leader ();
        break;
    case OBSERVING:
        // ...
        break;
    case FOLLOWING:
        // ...
        break;
    case LEADING:
        // ...
        break;
}

其次我们只需要关注lookForLeader()这个方法就好了,按照我们之前的推论,这个方法第一步应该给 logicClock 自增 1,然后投自己一票且广播给其他节点,我们看下源码,和我们之前的猜想与推论是否吻合。

AtomicLong logicalclock = new AtomicLong();

public Vote lookForLeader () throws InterruptedException {
	// 上锁
    synchronized (this) {
        // 更新逻辑时钟( logicClock ),自增 1
        logicalclock.incrementAndGet();
        // 先投自己一票。将自己的myid,zxid, epoch 准备好,下面sendNotifications方法通过网络给发送出去
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerepoch ());
    }
    
    // 异步将投票信息发给其他节点
    sendNotifications();
}

这里有两个黑盒方法:updateProposal()sendNotifications(),我虽然写了注释说第一个方法是投自己一票,第二个方法是异步发给其他节点,但是它具体是怎么做的我们还不知道。接下来我们就逐个攻破,先来看第一个方法updateProposal() 先投自己一票:

synchronized void updateProposal(long  Leader , long zxid, long  epoch ) {
    // 我投票给的 Leader 的sid
    proposedLeader  =  Leader ;
    // 我投票给的 Leader 的zxid
    proposedZxid = zxid;
    // 我投票给的 Leader 的 epoch 
    proposed epoch  =  epoch ;
}

很简单,就是赋值给三个成员变量。那我们就可以猜想第二个方法的逻辑了,其实就是把这三个成员变量给异步发出去,相当于 updateProposal() 初始化投票信息,然后借助 sendNotifications() 异步发送给其他节点,那我们看下是不是这样呢?

// 发送队列
LinkedBlockingQueue<ToSend> sendqueue;

private void sendNotifications() {
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        ToSend notmsg = new ToSend(
            ToSend.mType.notification,
            // 我投票给的 Leader 
            proposedLeader ,
            // 我投票给的 Leader 的zxid
            proposedZxid,
            // 逻辑时钟,代表轮次
            logicalclock.get(),
            // 寻主状态
            QuorumPeer.ServerState.LOOKING,
            // 我的sid
            sid,
            // 我投票给的 Leader 的 epoch 
            proposedepoch,
            qv.toString().getBytes(UTF_8));
		
        // 放到队列里
        sendqueue.offer(notmsg);
    }
}

这很清晰明了了吧,一句话概述就是:我是谁(sid),我把我这宝贵的一票投给了proposedLeader ,它的 zxid 是proposedZxid,它的 epoch 是proposedEpoch ,我们这轮的轮次是logicalclock,当然了,我是寻主状态(LOOKING)。

然后把这些参数拼装成一个 ToSend 对象,重点是直接把对象丢到了一个 LinkedBlockingQueue 里面,并没有其他网络发送代码,那是怎么发送的呢?用你的脚想想也应该知道肯定有个消费者去 LinkedBlockingQueue 队列里面获取数据、然后通过网络广播给其他节点,但是网络通信这部分内容我们打算放到下一篇去讲解,目前为止我们就假设已经广播给其他节点了。

最后我们用一张图小结下流程:

leader选举-19.png

好了,第一轮投票到此结束了,我们只是将票投给自己且异步广播给其他节点,接下来我们该看第二轮了,第二轮是干嘛来着?我们回忆一下,是不是该接收其他节点的投票信息,然后逐级对比?OK,直奔主题~

二、接收投票,进行选举

既然是接收投票,那是不是需要有地方存储票据信息,很简单,搞个 class 类就完事了,但是里面包含哪些字段呢?这不是更简单吗?你发送的时候有哪些字段,我接收的时候就有哪些字段不就完了?比如proposedLeader proposedZxid等,具体设计如下:

public static class Notification {
    // 投票给的 Leader ,提议的 Leader 的sid
    long  Leader ;

    //投票给的 Leader 的zxid,提议的 Leader 的zxid
    long zxid;

    // 发送者的 logicClock  逻辑时钟
    long electionepoch ;

    // 发送者的状态
    QuorumPeer.ServerState state;

    // 发送者的sid
    long sid;

    // 投票给的 Leader 的 epoch ,提议的 Leader 的 epoch 
    long peer epoch ;
}

接收完投票后,我们下一步就是拿着投票信息对 epoch 、zxid、myid 等字段对比以进行选举,选举完后我们找个地方存起来,这个存储的地方也就是票箱。那这就有一个问题了,这个票箱该用什么数据结构来存储呢?在设计数据结构之前,我们肯定是要先想想票箱里都需要包含什么内容。包含两大部分:谁的票箱投票信息

接下来我们继续分析第一部分:谁的票箱。这个怎么理解?简单,直接用server.id就完事了,也就是我们常说的 myid 或者叫 sid。

再看第二部分:投票信息。这部分都包含什么呢?这个也很简单,我们之前都剖析过原理。这个相当于:我把票投给谁,那么这里的“谁”就是我所说的投票信息。它至少要包含 Leader 的 sid,但是只给 sid 是不够的,我还需要知道 sid 的 logicClock 、zxid 以及 epoch,这样才更有可信性,要不然你说投给sid=1的,我说投给sid=2的,那该听谁的?所以需要对比 logicClock 、zxid 和 epoch 的嘛!因此到目前为止已经四个核心字段了:logicClocksidzxidepoch,这样看起来足够了,但是别忘了,把状态带过去,要不然别人怎么知道你是 LOOKING 寻主状态还是 LEADING 领导者状态呢?

所以完整的设计字段如下:

public class Vote {
    public Vote(long id, long zxid, long election epoch , long peer epoch ) {
        this.version = 0x0;
        // sid
        this.id = id;
        // zxid
        this.zxid = zxid;
        //  logicClock 
        this.electionepoch  = electionepoch ;
        //  epoch 
        this.peerepoch  = peerepoch ;
        // state
        this.state = ServerState.LOOKING;
    }
}

上面的设计一言以蔽之就是:我是谁,我把票投给了谁。 有了这句话还难选择数据结构吗?这不是典型的key-value嘛,所以我们用 Map 来存储最为合适:

// 票箱。存储收到的投票
Map<Long, Vote> recvset = new HashMap<Long, Vote>();

目前为止,存储投票信息的数据结构设计完成了,就差选举流程了,直接开始吧~

1. 先接收投票信息

while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
    // 获取其他节点的投票信息
	Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
}

这里有人会问:recvqueue是什么?它和发送投票的时候一样,就是一个LinkedBlockingQueue<Notification>,至于什么时候放到这个 queue 里的,以及网络是如何通信的,我们这先不关注,后面会单独拿出来进行分析。我们先假设,拿到了其他节点的票据信息。

那下面是不是该对比 epoch 、zxid 等字段了?所以下面进入正题:投票核心逻辑

2. 投票核心逻辑

第一步肯定是要先看状态,状态是 LOOKING 寻主的才能对比投票信息等流程,所以第一步代码很简单:

// 判断收到选票的sid和选举 Leader 的sid是否在我们所配置的集群myid列表内。
else if (validVoter(n.sid) && validVoter(n.leader )) {
    // 判断接收到投票者的状态
    switch (n.state) {
        // 如果投票者的状态是LOOKING,代表也是寻主状态,需要参与投票进行选主
        case LOOKING:
            break;
        case OBSERVING:
            LOG.debug("Notification from  Observer : {}", n.sid);
            break;
        case FOLLOWING:
        case LEADING:
            break;
        default:
            break;
    }
}

接下来就要瞪大眼睛好好看了,因为接下来就是正式进入选举的源码,我不会只照搬源码来分析,那样的话我们前三篇讲的原理岂不是废了吗?所以需要理论结合源码的方式,我们先简单回顾下之前讲的选举原理,第一步是对照 logicClock,然后有三种情况:接收到的 logicClock 大于自己的 logicClock 、接收到的 logicClock 小于自己的 logicClock 、接收到的 logicClock 等于自己的 logicClock 。

我们先来分析第一种情况:接收到的 logicClock 大于自己的 logicClock 。

  • 如果接收到的 logicClock 大于自己的 logicClock ,说明该服务器的选举轮次落后于其他服务器的选举轮次,那么先把自己的 logicalclock 更新为收到的,然后立即清空自己维护的票箱,接着对比myid、zxid、 epoch ,看看收到的票据和自己的票据哪个更适合作为 Leader,最终再次将自己的投票通过网络广播出去。

接下来我们看 ZooKeeper 源码的实现方式是不是我们所说的那样:

// 如果收到的逻辑时钟 logicClock 大于自己的 logicClock 
if (n.electionepoch  > logicalclock.get()) {
    // 那么就把自己的 logicClock 更新为收到的。
    logicalclock.set(n.electionepoch );
    // 且清空自己的票箱
    recvset.clear();
    // 比较 epoch 、zxid、myid
    if (totalOrderPredicate(n.leader , n.zxid, n.peerepoch , getInitId(), getInitLastLoggedZxid(), getPeerepoch ())) {
        // 把自己的票据更新为 logicClock 大的那个,然后下次发送的时候就发送的是 logicClock 大的票据了
        updateProposal(n.leader, n.zxid, n.peerepoch );
    } else {
        // 如果收到的票据信息小于自己的票据信息,那就不变,下次发送继续选择自己的票据。
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerepoch ());
    }
    // 将更新的票据信息异步发送给其他节点
    sendNotifications();
}

其实没什么好分析的,和我们所说的原理是一致的,唯一一个黑盒就是这段代码:

// 比较 epoch 、zxid、myid
if (totalOrderPredicate(n.leader , n.zxid, n.peerepoch , getInitId(), getInitLastLoggedZxid(), getPeerepoch ())) {}

因为这段代码我们不知道怎么对比的。按照我们之前剖析的原理来讲,应该是这样的:先对比 epoch,优先选择 epoch 大的;如果 epoch 一样,那再对比 zxid,优先选 zxid 大的;如果 zxid 也一样,那就选一个 myid 最大的。 但是不是这样呢?我们看下源码:

protected boolean totalOrderPredicate(long newId, long newZxid, 
                                      long newepoch , long curId, long curZxid, long curepoch ) {
    return ((newepoch  > curepoch )
            || ((newepoch  == curepoch )
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid)
                        && (newId > curId)))));
}

看着很乱,但是其实如果你懂原理的话就很简单了。我们给拆解一下看看。

首先是:(newepoch > curepoch ),这符合我们所说的 “先对比 epoch,优先选择 epoch 大的”

其次是:((newepoch == curepoch ) && ((newZxid > curZxid))),这也符合我们所说的 “如果 epoch 一样,那再对比 zxid,优先选 zxid 大的”

最后是:((newZxid == curZxid) && (newId > curId)),很完美地符合最后一条 “如果 zxid 也一样,那就选一个 myid 最大的”

后面就是根据对比的结果来更新自己即将投票的票据,如果收到的票据没有自己的“优秀”,那就不变,反之则更新为收到的票据信息,然后异步通过网络发送给其他节点。

接着我们再来分析第二种情况:接收到的 logicClock 小于自己的 logicClock 。

  • 如果接收到的 logicClock 小于自己的 logicClock ,那么当前服务器直接忽略该投票,继续处理下一个投票。

那我们看下代码是不是这样实现的呢?

else if (n.electionepoch  < logicalclock.get()) {
    // 说明收到的票据已经过期了,因为它的 logicClock 小 N 轮,所以直接丢掉不管就行了。
    LOG.debug(
        "Notification election  epoch  is smaller than logicalclock. n.election epoch  = 0x{}, logicalclock=0x{}",
        Long.toHexString(n.electionepoch ),
        Long.toHexString(logicalclock.get()));
    break;
} 

简单粗暴,啥也没干,打个 log 完事,和我们之前的分析也吻合。

还差最后一种情况,那就是:接收到的 logicClock 等于自己的 logicClock 。 这就分为两种情况了,我们之前也剖析过,具体如下。

  • 如果接收到的 logicClock 等于自己维护的 logicClock ,那就对比二者的 vote_zxid,也就是对比被推荐服务器上所保存数据的最大 zxid。若收到的 vote_zxid 大于自己 vote_zxid,那就将自己票中的 vote_zxid 和 vote_id (被推荐服务器的 myid)改为收到的票中的 vote_zxid 和 vote_id 并通过网络广播出去。另外将收到的票据和自己更改后的 vote_id 放入自己的票箱。
  • 如果接收到的 logicClock 等于自己维护的 logicClock 且二者的 vote_zxid 也一致,那就比较二者的 vote_id,也就是被推荐服务器的 myid。若接收到的投票的 vote_id 大于自己所选的 vote_id,那就将自己票中的 vote_id 改为接收到的票中的 vote_id 并通过网络广播出去。另外将收到的票据和自己更改后的 vote_id 放入自己的票箱。

我们直接看这部分原理对应的源码实现,看看是不是我们所说的那样:

else if (totalOrderPredicate(n.leader , n.zxid, n.peerepoch , proposedLeader , proposedZxid, proposedepoch )) {
    // 如果 logicClock 一样,那就需要按照 epoch、zxid、myid三者进行对比了,然后更新票箱
    updateProposal(n.leader , n.zxid, n.peerepoch );
    // 异步发送出去
    sendNotifications();
}

很简单,核心方法是totalOrderPredicate(),刚才讲解过,所以此处不在啰嗦。现在还有一个黑盒方法:updateProposal(),也就是更新票据信息的方法,我们不知道它里面做了什么,我一直没说这个,是因为这个太简单了,为啥现在要说呢?因为下面用到了它里面赋值的变量,废话不多说,一起看下:

synchronized void updateProposal(long leader , long zxid, long epoch ) {
    proposedLeader = leader ;
    proposedZxid = zxid;
    proposedepoch = epoch ;
}

好吧,啥也没干,就是将这轮的投票结果赋值给三个变量。这三个变量有啥用?不急,在分析之前先用一个图简单总结下目前为止的流程。

leader选举-5.png

现在我们已经验证过我们之前分析的选举流程以及原理,而且将每轮的投票结果放到了proposedLeader proposedZxidproposedepoch 这三个变量里。

我们是不是丢了一个很重要的步骤,就是我们最初设计的票箱怎么没用上?也就是我们上面设计的 Map,来存储其他节点的投票信息。记录这个结果有啥用呢?当然有用!我们选举是过半原则,那我怎么知道过没过半呢?有了每轮的选举记录我们不就很轻松地知道每个 Server 被选举的次数了嘛

所以第一步,肯定是先存储到票箱里,我们的票箱 Map 结构的 key 是 sid,value 是 Vote 对象,Vote 对象也很好得到,我们一开始接收到的投票信息Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);不就是嘛,所以如下:

// 票箱。将最终投的票放到票箱。最后的过半原则就是基于这个票箱来完成的。
recvset.put(n.sid, new Vote(n.leader , n.zxid, n.electionepoch , n.peerepoch ));

第二步肯定是判断是否过半了,过半的节点就直接被推为 Leader 了,怎么判断是否过半呢?

这里 ZooKeeper 作者先把 Map 转成了 List,然后进行判断对比的,我这里只贴出核心源码,一些细节代码你可以自己去阅读,大致如下:

// 是否过半
if (voteSet.hasAllQuorums()) {
    // 因为过半,符合 Leader 选举原则,因此处理一些选举逻辑
}

紧接着我们就看看这个是否过半的方法是咋做的,我这里省略了一些代码,其实核心就是下面这样:

private int half = votingMembers.size() / 2;

public boolean containsQuorum(Set<Long> ackSet) {
    return (ackSet.size() > half);
}

至于 votingMembersackSet 是怎么赋值的,我希望你感兴趣的话能自己去阅读下,思想大于源码,过于细节的代码只会影响篇幅和阅读体验。以及每轮的投票结果proposedLeader proposedZxidproposedepoch 这三个变量也是计算过半原则用的,感兴趣的朋友可以读一下这块源码。

到这里我们很清楚地知道即将有两个分支要到来:过半的逻辑和没过半的逻辑。 我们先来看没过半的情况:

如果没过半,那就很简单了,啥也不处理,继续下一轮投票即可。

那如果过半了呢?那就代表选票达到了 Leader 选举的要求,那也很简单了。

  1. 看下自己是不是 Leader ,如果自己是 Leader ,那就把自己的状态从 LOOKING 改为 LEADING。
  2. 如果自己不是 Leader ,那就看自己是 Observer 还是 Follower ,如果是 Observer 就改为 OBSERVING 状态,如果是 Follower ,那就改为 FOLLOWING 状态
  3. 清空自己接收投票信息的队列,因为都选出来了嘛,所以接收队列里的票也无效了,因此清空下无效投票。

原理和思路有了,那就切入到代码实现当中,看看是不是这么写的:

// 设置当前节点的状态:
// 判断 Leader 是不是自己,如果是自己,那就直接更新为LEADING
// 如果不是自己,那就根据条件看是FOLLOWING还是OBSERVING
setPeerState(proposedLeader, voteSet);
// 组装这次 Leader 选举最终的投票结果
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedepoch );
// 清空接收票据的队列
leaveInstance(endVote);
// 返回结果
return endVote;

就两个方法:setPeerState(proposedLeader , voteSet);leaveInstance(endVote);,逐个分析就完事了。先看第一个:setPeerState()设置当前节点状态。

private void setPeerState(long proposedLeader , SyncedLearnerTracker voteSet) {
    // 判断被选举出来的 Leader 是不是自己,如果是自己,直接更新状态为LEADING,如果不是自己,则走learningState()方法
    ServerState ss = (proposedLeader  == self.getId()) ? ServerState.LEADING : learningState();
    // 设置状态
    self.setPeerState(ss);
    if (ss == ServerState.LEADING) {
        leadingVoteSet = voteSet;
    }
}

上面这段代码很清晰,只是有一个黑盒方法:learningState(),这个方法肯定是判断 Observer 和 Follower 的,但是要怎么区分这两个呢?其实很简单嘛,看你配置文件配置的是不是 Observer 咯,默认就是 Follower ,启动的时候已经从配置文件里读出来了,所以直接判断就好啦:

private ServerState learningState() {
    //  Follower 
    if (self.getLearnerType() == LearnerType.PARTICIPANT) {
        return ServerState.FOLLOWING;
    } else { //  Observer 
        return ServerState.OBSERVING;
    }
}

很简单,也很清晰,也符合我们的猜想与设计,还差最后一步:清空接收票据的队列,也就是leaveInstance()方法。这个方法更简单粗暴:

private void leaveInstance(Vote v) {
    // 直接Map.clear()
    recvqueue.clear();
}

用一张图总结这两个方法:

leader选举-20.png

到目前为止,我们 ZooKeeper 的 Leader 选举核心源码就已经剖析完成了,其实和我们之前三篇讲解的原理是吻合的,最后我们对全篇做个总结。

三、总结