ZooKeeper Source Code (04): Leader Election Flow

Published: 2023-11-07 13:29:01 | Author: 用户不存在!

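"ZooKeeper Source Code (03): Cluster Startup Flow" introduced the entry point of leader election. This article analyzes the leader election components and flow in detail.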

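Leader election flow (important)

  1. In its start phase, QuorumPeer calls startLeaderElection() to start the election
  2. While in the LOOKING state, the peer votes for itself
  3. createElectionAlgorithm creates the core election components: QuorumCnxManager (connection management), FastLeaderElection (the election itself), and so on
  4. The QuorumPeer main loop runs a different flow depending on the current state

States and flows:

  • LOOKING - elects a leader with fastLeaderElection.lookForLeader

    1. Increment the election epoch to start a new election round
    2. Initialize the proposal with the peer's own serverId, zxid, and currentEpoch
    3. Send the vote out
    4. Loop, receiving votes from the other servers:
      • LOOKING vote: compare the election epoch, then currentEpoch, zxid, and serverId to decide which server to vote for; once more than half of the nodes agree on the proposal, that server is confirmed as leader
      • FOLLOWING vote: after comparing the election epoch, vote for the current leader
      • LEADING vote: after comparing the election epoch, vote for the current leader
  • LEADING - creates a Leader object and runs the lead logic

    1. zkServer loads its data
    2. Start the quorum listener
    3. Determine the new epoch and zxid from each follower's current epoch
    4. Sync data to the followers
    5. Start zkServer
    6. On every tick, verify that a majority of followers are still in sync
  • FOLLOWING - creates a Follower object and runs the followLeader logic

    1. connectToLeader - connect to the leader server
    2. registerWithLeader - send the current epoch to the leader and wait for the leader to send the new epoch
    3. syncWithLeader - receive the data synced by the leader: txnlog, committedlog, snapshot
    4. Stay connected and process the packets coming from the leader
  • OBSERVING - creates an Observer object and runs the observeLeader logic, which is largely the same as FOLLOWING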

Starting leader election

QuorumPeer's startLeaderElection method is the entry point for starting an election:

public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // vote for itself; the vote carries the zxid and epoch
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // electionType is always 3
    this.electionAlg = createElectionAlgorithm(electionType);
}

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    // TODO: use a factory rather than a switch
    // the strategy pattern could replace this switch statement
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        // shut down oldQcm
        if (oldQcm != null) {
            oldQcm.halt();
        }
        // used to start the ServerSocket listener
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        }
        break;
    default:
        assert false;
    }
    return le;
}

public QuorumCnxManager createCnxnManager() {
    // socket timeout; defaults to tickTime * syncLimit
    // with the zoo_sample.cfg settings this is 2000 * 5
    int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
    return new QuorumCnxManager(
        this,
        this.getMyId(),
        this.getView(), // serverId->quorumServer
        this.authServer,
        this.authLearner,
        timeout,
        this.getQuorumListenOnAllIPs(), // whether to listen on all IPs; defaults to false
        this.quorumCnxnThreadsSize, // defaults to 20
        this.isQuorumSaslAuthEnabled());
}
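
As a minimal sketch of the timeout rule above, the helper below repeats only the fallback expression. CnxnTimeoutSketch and effectiveTimeoutMs are hypothetical names, the -1 passed for an unset quorumCnxnTimeoutMs is an assumption, and 2000 and 5 are the zoo_sample.cfg values mentioned in the comment.

```java
// Hypothetical helper, not ZooKeeper code: it only mirrors the ternary in createCnxnManager.
final class CnxnTimeoutSketch {
    static int effectiveTimeoutMs(int quorumCnxnTimeoutMs, int tickTime, int syncLimit) {
        // An explicit positive setting wins; otherwise fall back to tickTime * syncLimit.
        return quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : tickTime * syncLimit;
    }

    public static void main(String[] args) {
        System.out.println(effectiveTimeoutMs(-1, 2000, 5));   // 10000
        System.out.println(effectiveTimeoutMs(3000, 2000, 5)); // 3000
    }
}
```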

QuorumCnxManager class

Overview:

This class implements a connection manager for leader election using TCP.
It maintains one connection for every pair of servers. The tricky part is to guarantee that there is exactly one connection for every pair of servers that are operating correctly and that can communicate over the network. If two servers try to start a connection concurrently, then the connection manager uses a very simple tie-breaking mechanism to decide which connection to drop based on the IP addresses of the two parties.
For every peer, the manager maintains a queue of messages to send. If the connection to any particular peer drops, then the sender thread puts the message back on the list. As this implementation currently uses a queue implementation to maintain messages to send to another peer, we add the message to the tail of the queue, thus changing the order of messages. Although this is not a problem for the leader election, it could be a problem when consolidating peer communication. This is to be verified, though.
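  1. Maintains the TCP connections between servers during leader election
  2. Guarantees that exactly one connection exists between any two servers; if both sides connect at the same time, the connection initiated by the server with the larger id is the one that is kept (the code shown in this article compares server ids)
  3. Buffers outgoing messages in queues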

Main fields

// Runs the QuorumConnectionReqThread and QuorumConnectionReceiverThread tasks
private ThreadPoolExecutor connectionExecutor;

// Maps sid -> SendWorker / BlockingQueue / ByteBuffer
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;

// Receive queue
public final BlockingQueue<Message> recvQueue;

Main methods

public void initiateConnection(final MultipleAddresses electionAddr, final Long sid);
// Wraps initiateConnection in a QuorumConnectionReqThread and submits it to connectionExecutor for asynchronous execution
public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid);

private boolean startConnection(Socket sock, Long sid) throws IOException;

public void receiveConnection(final Socket sock);
// Wraps receiveConnection in a QuorumConnectionReceiverThread and submits it to connectionExecutor for asynchronous execution
public void receiveConnectionAsync(final Socket sock);

public void toSend(Long sid, ByteBuffer b);

boolean connectOne(long sid, MultipleAddresses electionAddr);
void connectOne(long sid);
public void connectAll();

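The remaining utility methods are not analyzed here.

initiateConnection method

Creates the Socket, performs the SSL handshake and authentication if required, and sends the initial packet. If the local id is the smaller one, the connection is closed, so that only one connection exists between any two servers.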

public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
    Socket sock = null;
    try {
        // create the Socket
        if (self.isSslQuorum()) {
            sock = self.getX509Util().createSSLSocket();
        } else {
            sock = SOCKET_FACTORY.get();
        }
        setSockOpts(sock); // socket options such as the timeout
        // connect to the target peer
        sock.connect(electionAddr.getReachableOrOne(), cnxTO);
        // SSL handshake
        if (sock instanceof SSLSocket) {
            SSLSocket sslSock = (SSLSocket) sock;
            sslSock.startHandshake();
        }
    } catch (X509Exception e) {
        closeSocket(sock);
        return;
    } catch (UnresolvedAddressException | IOException e) {
        closeSocket(sock);
        return;
    }

    try {
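        // send the initial connection packet and run SASL authentication
        // close the connection if selfId is smaller than the peer's id
        // create and start the SendWorker and RecvWorker
        // create the send queue for this sid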
        startConnection(sock, sid);
    } catch (IOException e) {
        closeSocket(sock);
    }
}

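startConnection method

  1. Sends the initial connection packet and runs SASL authentication
  2. Closes the connection if selfId is smaller than the peer's id
  3. Creates and starts the SendWorker and RecvWorker
  4. Creates the send queue for the sid

private boolean startConnection(Socket sock, Long sid) throws IOException {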
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // output stream
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);

        // send the initial packet: protocol version, myid, and election address
        long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
        dout.writeLong(protocolVersion);
        dout.writeLong(self.getMyId());

        // now we send our election address. For the new protocol version, we can send multiple addresses.
        Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
                ? self.getElectionAddress().getAllAddresses()
                : Arrays.asList(self.getElectionAddress().getOne());

        String addr = addressesToSend.stream()
                .map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();

        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        closeSocket(sock);
        return false;
    }

    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        authLearner.authenticate(sock, qps.hostname);
    }

    if (sid > self.getMyId()) { // If lost the challenge, then drop the new connection
        closeSocket(sock);
    } else {
        // create the SendWorker and RecvWorker
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null) {
            vsw.finish();
        }

        senderWorkerMap.put(sid, sw);

        // create the send queue
        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

        sw.start();
        rw.start();

        return true;
    }
    return false;
}
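
To make the byte layout of the initial packet concrete, here is a minimal sketch that writes the same four fields to an in-memory stream and reads them back. HandshakeSketch is a hypothetical class. SKETCH_PROTOCOL_VERSION is a placeholder: the real PROTOCOL_VERSION_V1 and PROTOCOL_VERSION_V2 values are not shown in this article, and the sketch only assumes they are negative so that a receiver can tell a version from a plain server id. UTF-8 is used explicitly here, whereas the original calls getBytes() with the platform charset.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class HandshakeSketch {
    // Placeholder value (assumption): only its negative sign matters in this sketch.
    static final long SKETCH_PROTOCOL_VERSION = -1L;

    // Layout: long version | long myId | int addrLength | addr bytes ("host:port|host:port").
    static byte[] encode(long myId, String addresses) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(bytes);
            byte[] addr = addresses.getBytes(StandardCharsets.UTF_8);
            dout.writeLong(SKETCH_PROTOCOL_VERSION);
            dout.writeLong(myId);
            dout.writeInt(addr.length);
            dout.write(addr);
            dout.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in the same order and renders them as "version/sid/addresses".
    static String decode(byte[] packet) {
        try {
            DataInputStream din = new DataInputStream(new ByteArrayInputStream(packet));
            long version = din.readLong();
            long sid = din.readLong();
            byte[] addr = new byte[din.readInt()];
            din.readFully(addr);
            return version + "/" + sid + "/" + new String(addr, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(3L, "host1:3888|host2:3888")));
    }
}
```

The fixed part of the packet is 20 bytes (two longs and one int); everything after that is the address string.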

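receiveConnection method

When a server receives a connection request and wins the challenge (its own id is larger than the peer's id), it closes that connection and connects to the peer itself.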

public void receiveConnection(final Socket sock) {
    DataInputStream din = null;
    try {
        // input stream
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        handleConnection(sock, din);
    } catch (IOException e) {
        closeSocket(sock);
    }
}

private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    Long sid = null, protocolVersion = null;
    MultipleAddresses electionAddr = null;

    try {
        protocolVersion = din.readLong();
        if (protocolVersion >= 0) { // this is a server id and not a protocol version
            sid = protocolVersion;
        } else {
            try {
                InitialMessage init = InitialMessage.parse(protocolVersion, din);
                sid = init.sid;
                if (!init.electionAddr.isEmpty()) {
                    electionAddr = new MultipleAddresses(init.electionAddr,
                            Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
                }
            } catch (InitialMessage.InitialMessageException ex) {
                closeSocket(sock);
                return;
            }
        }

        if (sid == QuorumPeer.OBSERVER_ID) {
            // Choose identifier at random. We need a value to identify the connection.
            sid = observerCounter.getAndDecrement();
        }
    } catch (IOException e) {
        closeSocket(sock);
        return;
    }

    // do authenticating learner
    authServer.authenticate(sock, din);
    // If wins the challenge, then close the new connection.
    if (sid < self.getMyId()) { // the peer's id is smaller: close this connection and connect to the peer ourselves
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }

        // close the connection
        closeSocket(sock);

        if (electionAddr != null) {
            connectOne(sid, electionAddr); // connect to the peer
        } else {
            connectOne(sid);
        }
    } else if (sid == self.getMyId()) {
    } else { // create the SendWorker, RecvWorker, and send queue
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null) {
            vsw.finish();
        }

        senderWorkerMap.put(sid, sw);

        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

        sw.start();
        rw.start();
    }
}
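
Read together, startConnection and handleConnection implement the tie-break: whichever side opened the socket, the surviving connection is the one dialed by the larger id. The sketch below condenses the two comparisons into pure functions. The class, enum, and method names are hypothetical, and authentication, observer ids, and worker setup are left out.

```java
// Hypothetical condensation of the id comparisons in startConnection and handleConnection.
final class TieBreakSketch {
    enum Action { KEEP, CLOSE, CLOSE_AND_DIAL_BACK, IGNORE }

    // Side that opened the socket: drop the socket if the remote id is larger.
    static Action onOutgoing(long myId, long remoteSid) {
        return remoteSid > myId ? Action.CLOSE : Action.KEEP;
    }

    // Side that accepted the socket: if the remote id is smaller, close and connect the other way.
    static Action onIncoming(long myId, long remoteSid) {
        if (remoteSid < myId) {
            return Action.CLOSE_AND_DIAL_BACK;
        }
        if (remoteSid == myId) {
            return Action.IGNORE; // the original has an empty branch for this case
        }
        return Action.KEEP;
    }

    public static void main(String[] args) {
        // Server 1 dials server 2: both sides agree that this socket must go.
        System.out.println(onOutgoing(1, 2)); // CLOSE
        System.out.println(onIncoming(2, 1)); // CLOSE_AND_DIAL_BACK
        // Server 2 dials server 1: both sides keep it.
        System.out.println(onOutgoing(2, 1)); // KEEP
        System.out.println(onIncoming(1, 2)); // KEEP
    }
}
```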

toSend method

Sends a message.

public void toSend(Long sid, ByteBuffer b) {
    // a message addressed to ourselves goes straight to recvQueue
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
        // enqueue the message on the send queue for this sid
        BlockingQueue<ByteBuffer> bq =
            queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
        addToSendQueue(bq, b);
        // make sure a connection to the peer exists
        connectOne(sid);
    }
}
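
The routing decision in toSend can be sketched without any networking. SendRoutingSketch below is hypothetical: it uses an ArrayBlockingQueue of capacity 1 with a drop-oldest loop as a stand-in for CircularBlockingQueue and SEND_CAPACITY (both the capacity and the drop-oldest behaviour are assumptions, since neither is shown in this article), and it leaves out the connectOne call.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

final class SendRoutingSketch {
    final long mySid;
    final BlockingQueue<ByteBuffer> recvQueue = new LinkedBlockingQueue<>();
    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();

    SendRoutingSketch(long mySid) {
        this.mySid = mySid;
    }

    void toSend(long sid, ByteBuffer b) {
        if (mySid == sid) {
            // Loopback: a message to ourselves never touches a socket.
            b.position(0);
            recvQueue.add(b.duplicate());
        } else {
            // One bounded queue per destination, created lazily on first use.
            BlockingQueue<ByteBuffer> bq =
                queueSendMap.computeIfAbsent(sid, id -> new ArrayBlockingQueue<>(1));
            // Assumed policy: when the queue is full, discard the oldest entry and keep the newest.
            while (!bq.offer(b)) {
                bq.poll();
            }
        }
    }

    public static void main(String[] args) {
        SendRoutingSketch sketch = new SendRoutingSketch(1L);
        sketch.toSend(1L, ByteBuffer.allocate(4));
        sketch.toSend(2L, ByteBuffer.allocate(4));
        sketch.toSend(2L, ByteBuffer.allocate(8));
        System.out.println(sketch.recvQueue.size() + " local, "
            + sketch.queueSendMap.get(2L).size() + " queued for sid 2");
    }
}
```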

connectOne method

synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
    // a connection already exists
    if (senderWorkerMap.get(sid) != null) {
        if (self.isMultiAddressEnabled() && electionAddr.size() > 1 &&
            self.isMultiAddressReachabilityCheckEnabled()) {
            // check that the socket is still reachable
            senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
        }
        return true;
    }
    // establish a new connection asynchronously
    return initiateConnectionAsync(electionAddr, sid);
}

synchronized void connectOne(long sid) {
    if (senderWorkerMap.get(sid) != null) {
        if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) {
            senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
        }
        return;
    }
    // resolve the address for sid from lastCommittedView and lastProposedView, then connect
    synchronized (self.QV_LOCK) {
        boolean knownId = false;
        // Resolve hostname for the remote server before attempting to
        // connect in case the underlying ip address has changed.
        self.recreateSocketAddresses(sid);
        Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
        if (lastCommittedView.containsKey(sid)) {
            knownId = true;
            if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
                return;
            }
        }
        if (lastSeenQV != null
            && lastProposedView.containsKey(sid)
            && (!knownId ||
                !lastProposedView.get(sid).electionAddr.equals(lastCommittedView.get(sid).electionAddr))) {
            knownId = true;
            if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
                return;
            }
        }
    }
}

connectAll method

Try to establish a connection with each server if one doesn't exist.

public void connectAll() {
    long sid;
    for (Enumeration<Long> en = queueSendMap.keys(); en.hasMoreElements(); ) {
        sid = en.nextElement();
        connectOne(sid);
    }
}

Listener class

A thread class that starts the ServerSocket listeners; listening begins in its run method:

public void run() {
    if (!shutdown) {
        Set<InetSocketAddress> addresses;

        // collect the addresses to listen on
        if (self.getQuorumListenOnAllIPs()) {
            addresses = self.getElectionAddress().getWildcardAddresses();
        } else {
            addresses = self.getElectionAddress().getAllAddresses();
        }
        // latch used to block until the handlers finish
        CountDownLatch latch = new CountDownLatch(addresses.size());
        // create one ListenerHandler per listen address
        listenerHandlers = addresses.stream().map(address ->
                        new ListenerHandler(address,self.shouldUsePortUnification(),
                                            self.isSslQuorum(), latch))
                .collect(Collectors.toList());

        final ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        try {
            // start the ListenerHandlers
            listenerHandlers.forEach(executor::submit);
        } finally {
            executor.shutdown();
        }

        try {
            // block here; each ListenerHandler counts down when it finishes
            latch.await();
        } catch (InterruptedException ie) {
        } finally {
            // Clean up for shutdown (omitted)
        }
    }
    // omitted
}

ListenerHandler run method:

public void run() {
    try {
        // accept connections
        acceptConnections();
        try {
            close();
        } catch (IOException e) {}
    } catch (Exception e) {
    } finally {
        latch.countDown();
    }
}

private void acceptConnections() {
    int numRetries = 0;
    Socket client = null;

    while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
        try {
            // create the ServerSocket and bind the port
            serverSocket = createNewServerSocket();
            while (!shutdown) {
                try {
                    // accept a client Socket
                    client = serverSocket.accept();
                    setSockOpts(client); // socket options such as the timeout
                    // hand the new connection to receiveConnection
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        receiveConnection(client);
                    }
                    numRetries = 0;
                } catch (SocketTimeoutException e) {}
            }
        } catch (IOException e) {
            // omitted
        }
    }
    // omitted
}

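QuorumConnectionReqThread class

Used to connect to other peers asynchronously; its run method calls initiateConnection to establish the connection.

QuorumConnectionReceiverThread class

Used to accept connections asynchronously; its run method calls receiveConnection to handle the newly established connection.

SendWorker class

Thread to send messages. Instance waits on a queue, and send a message as soon as there is one available. If connection breaks, then opens a new one.

The thread that sends messages:

  • Holds the sid, the socket, and the connection's output stream
  • Takes messages from the send queue and writes them to the output stream

RecvWorker class

Thread to receive messages. Instance waits on a socket read. If the channel breaks, then removes itself from the pool of receivers.

The thread that reads messages: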

public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            // read the message length
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException("Received packet with invalid packet: " + length);
            }
            // read the payload
            final byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            // put it on the receive queue
            addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid));
        }
    } catch (Exception e) {
    } finally {
        sw.finish();
        closeSocket(sock);
    }
}
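
The read loop above is a plain length-prefixed framing protocol: a 4-byte length followed by that many payload bytes. A minimal, socket-free sketch of the same idea follows. FrameSketch is hypothetical, and MAX_FRAME_SIZE is an assumed limit standing in for PACKETMAXSIZE, whose value is not shown in this article.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class FrameSketch {
    static final int MAX_FRAME_SIZE = 512 * 1024; // assumed limit for this sketch

    // Writes each body as: int length, then the bytes.
    static byte[] frame(byte[]... bodies) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(out);
            for (byte[] body : bodies) {
                dout.writeInt(body.length);
                dout.write(body);
            }
            dout.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads frames until the input is exhausted, rejecting empty or oversized lengths.
    static List<byte[]> readAll(byte[] wire) {
        List<byte[]> frames = new ArrayList<>();
        DataInputStream din = new DataInputStream(new ByteArrayInputStream(wire));
        try {
            while (din.available() > 0) {
                int length = din.readInt();
                if (length <= 0 || length > MAX_FRAME_SIZE) {
                    throw new IOException("invalid frame length: " + length);
                }
                byte[] body = new byte[length];
                din.readFully(body, 0, length); // blocks (or fails) until the whole body is read
                frames.add(body);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] wire = frame(new byte[] {1, 2}, new byte[] {3, 4, 5});
        System.out.println(readAll(wire).size() + " frames read");
    }
}
```

Validating the length before allocating the array is what keeps a corrupt or hostile length field from triggering a huge allocation.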

FastLeaderElection class

Documentation:

Implementation of leader election using TCP. It uses an object of the class QuorumCnxManager to manage connections. Otherwise, the algorithm is push-based as with the other UDP implementations. There are a few parameters that can be tuned to change its behavior. First, finalizeWait determines the amount of time to wait until deciding upon a leader. This is part of the leader election algorithm.
  1. Implements leader election over TCP, using a push-based model
  2. Uses a QuorumCnxManager object to manage connections

Constructor

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    sendqueue = new LinkedBlockingQueue<>();
    recvqueue = new LinkedBlockingQueue<>();
    // used to start the WorkerSender and WorkerReceiver
    this.messenger = new Messenger(manager);
}

Main fields

// How long (in ms) to keep polling for changed votes before the leader is finalized
static final int finalizeWait = 200;

// Ballot box: holds the votes of an election round and tallies the result
private SyncedLearnerTracker leadingVoteSet;

// Send queue
LinkedBlockingQueue<ToSend> sendqueue;
// Receive queue
LinkedBlockingQueue<Notification> recvqueue;

// Used to start the WorkerSender and WorkerReceiver
Messenger messenger;

// Proposed leader id
long proposedLeader;
// Proposed zxid
long proposedZxid;
// Proposed epoch
long proposedEpoch;

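The start method starts the election

public void start() {
    this.messenger.start(); // starts the WorkerSender and WorkerReceiver threads
}

Messenger class

WorkerSender thread

  1. Takes ToSend messages from sendqueue
  2. Sends them through QuorumCnxManager's toSend method

WorkerReceiver thread

  1. Polls received messages through QuorumCnxManager's pollRecvQueue
  2. Wraps each one in a Notification object and pushes it onto the recvqueue queue

Main methods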

// Builds the message to send
static ByteBuffer buildMsg(
    int state, long leader, long zxid, long electionEpoch, long epoch, byte[] configData);

// Sends the vote Notification to all nodes
private void sendNotifications();

// Compares currentEpoch, then zxid, then serverId to decide which server gets the vote
protected boolean totalOrderPredicate(
    long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch);

// Given a set of votes, returns a SyncedLearnerTracker used to decide whether there are enough votes to end the election
protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote);

// If a leader has been elected with enough votes, check that the leader itself has voted and acknowledged that it is leading
// This check is needed so that peers do not keep electing a peer that has crashed and is no longer leading
protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch);

// Updates proposedLeader, proposedZxid, and proposedEpoch
// Either settles on the leader or prepares for the next round of voting
synchronized void updateProposal(long leader, long zxid, long epoch);

// Builds a Vote from the current proposedLeader, proposedZxid, and proposedEpoch
public synchronized Vote getVote();

// Gets lastLoggedZxid through zkDb
private long getInitLastLoggedZxid();

// Gets currentEpoch
private long getPeerEpoch();

// Updates the peer state according to the proposedLeader argument
// If this peer becomes the leader, leadingVoteSet is updated with voteSet
private void setPeerState(long proposedLeader, SyncedLearnerTracker voteSet);

// Starts a round of leader election
// Called whenever the state changes to LOOKING; sends vote notifications to the other peers
public Vote lookForLeader() throws InterruptedException;

// Handles a notification from a peer in the FOLLOWING state
private Vote receivedFollowingNotification(
    Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
    SyncedLearnerTracker voteSet, Notification n);

// Handles a notification from a peer in the LEADING state
private Vote receivedLeadingNotification(
    Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
    SyncedLearnerTracker voteSet, Notification n);

buildMsg method

static ByteBuffer buildMsg(int state, long leader, long zxid,
                           long electionEpoch, long epoch, byte[] configData) {
    byte[] requestBytes = new byte[44 + configData.length];
    ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);

    requestBuffer.clear();
    requestBuffer.putInt(state); // current state
    requestBuffer.putLong(leader); // id of the proposed leader
    requestBuffer.putLong(zxid); // zxid
    requestBuffer.putLong(electionEpoch); // election epoch
    requestBuffer.putLong(epoch); // data epoch (currentEpoch)
    requestBuffer.putInt(Notification.CURRENTVERSION); // 0x2
    requestBuffer.putInt(configData.length); // length of the config data
    requestBuffer.put(configData); // quorumVerifier data

    return requestBuffer;
}
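
The constant 44 in buildMsg is the size of the fixed fields: one int, four longs, and two more ints. The sketch below rebuilds the same layout and reads the fields back by absolute offset. NotificationLayoutSketch is a hypothetical class; unlike buildMsg, it flips the buffer before returning it, so that remaining() reports the message size.

```java
import java.nio.ByteBuffer;

final class NotificationLayoutSketch {
    // int state + 4 longs (leader, zxid, electionEpoch, epoch) + int version + int configLength
    static final int FIXED_BYTES = Integer.BYTES + 4 * Long.BYTES + 2 * Integer.BYTES;

    static ByteBuffer build(int state, long leader, long zxid, long electionEpoch,
                            long epoch, int version, byte[] configData) {
        ByteBuffer buf = ByteBuffer.allocate(FIXED_BYTES + configData.length);
        buf.putInt(state);             // offset 0
        buf.putLong(leader);           // offset 4
        buf.putLong(zxid);             // offset 12
        buf.putLong(electionEpoch);    // offset 20
        buf.putLong(epoch);            // offset 28
        buf.putInt(version);           // offset 36
        buf.putInt(configData.length); // offset 40
        buf.put(configData);           // offset 44
        buf.flip();                    // sketch-only: make the buffer readable from position 0
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer msg = build(0, 3L, 0x100000005L, 7L, 1L, 2, new byte[] {9, 9});
        System.out.println("fixed bytes = " + FIXED_BYTES);
        System.out.println("leader = " + msg.getLong(4) + ", zxid = 0x" + Long.toHexString(msg.getLong(12)));
    }
}
```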

totalOrderPredicate method

Compares currentEpoch first, then zxid, then serverId to decide which server gets the vote:

protected boolean totalOrderPredicate(
    long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {

    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    /*
     * Return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid)
                        && (newId > curId)))));
}
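
A few concrete comparisons make the ordering easier to see. The sketch below copies only the boolean expression into a hypothetical VoteOrderSketch class; the weight check against the quorum verifier is deliberately left out, and the ids, zxids, and epochs are made-up sample values.

```java
// Hypothetical standalone copy of the ordering rule; the getWeight(newId) == 0 check is omitted.
final class VoteOrderSketch {
    static boolean beats(long newId, long newZxid, long newEpoch,
                         long curId, long curZxid, long curEpoch) {
        return newEpoch > curEpoch
            || (newEpoch == curEpoch
                && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        // A higher epoch wins even with a lower zxid and a lower id.
        System.out.println(beats(1, 0x100, 2, 3, 0x500, 1)); // true
        // Same epoch: the higher zxid wins regardless of id.
        System.out.println(beats(1, 0x200, 1, 3, 0x100, 1)); // true
        // Same epoch and zxid: the higher id wins.
        System.out.println(beats(3, 0x100, 1, 1, 0x100, 1)); // true
        // An identical vote does not replace the current one.
        System.out.println(beats(1, 0x100, 1, 1, 0x100, 1)); // false
    }
}
```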

getVoteTracker method

Given a set of votes, returns a SyncedLearnerTracker used to decide whether there are enough votes to declare the election over:

protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
        && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    // compare each vote received from the other servers with the given vote; the sids of matching votes go into the ack set
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey()); // the key is the sid
        }
    }

    return voteSet;
}
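
The tally can be illustrated with a much simpler model. The sketch below assumes a plain majority verifier (acks must exceed half of the voting members) and reduces each Vote to the leader id it names; the real tracker compares full Vote objects and may consult two quorum verifiers. MajoritySketch and its parameter names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class MajoritySketch {
    // votedLeaderBySid: sid -> leader id that server voted for (a simplification of Map<Long, Vote>).
    static boolean hasMajority(Map<Long, Long> votedLeaderBySid, long proposedLeader, int votingMembers) {
        int acks = 0;
        for (long votedLeader : votedLeaderBySid.values()) {
            if (votedLeader == proposedLeader) {
                acks++; // this server agrees with the proposal
            }
        }
        return acks > votingMembers / 2; // strict majority
    }

    public static void main(String[] args) {
        Map<Long, Long> votes = new HashMap<>();
        votes.put(1L, 3L);
        votes.put(2L, 3L);
        System.out.println(hasMajority(votes, 3L, 5)); // false: 2 of 5
        votes.put(3L, 3L);
        System.out.println(hasMajority(votes, 3L, 5)); // true: 3 of 5
    }
}
```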

checkLeader method

protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {

    boolean predicate = true;

    if (leader != self.getMyId()) {
        if (votes.get(leader) == null) { // the leader itself must have voted, otherwise this round is invalid
            predicate = false;
        } else if (votes.get(leader).getState() != ServerState.LEADING) {
            // the leader's state must be LEADING, otherwise this round is invalid
            predicate = false;
        }
    } else if (logicalclock.get() != electionEpoch) { // the election epochs must match
        predicate = false;
    }

    return predicate;
}

lookForLeader method

Starts a round of leader election. It is called whenever the state changes to LOOKING and sends vote notifications to the other peers:

public Vote lookForLeader() throws InterruptedException {
    // omitted
    try {
        // sid -> vote for the current election round
        Map<Long, Vote> recvset = new HashMap<>();

        // sid -> vote reported by peers that are already FOLLOWING or LEADING
        Map<Long, Vote> outofelection = new HashMap<>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            logicalclock.incrementAndGet(); // increment the election epoch to start a new round
            // initialize the proposal; every peer starts by voting for itself
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        // notify all nodes
        sendNotifications();
        // ballot box
        SyncedLearnerTracker voteSet = null;

        // normally the loop only exits once a leader has been elected
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            if (n == null) {
                // resend the notifications or reconnect
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);

                // omitted

            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                switch (n.state) {
                case LOOKING:
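                    // omitted
                    // the sender's election epoch is larger than ours
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch); // catch up to the newer epoch
                        recvset.clear(); // clear the received votes
                        // compare the votes; if the sender's vote wins, adopt it as the local proposal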
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        // send out the updated vote
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        // the sender's election epoch is smaller than ours: ignore the notification
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                                   proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    // record the vote
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    // build the ballot box
                    voteSet = getVoteTracker(
                        recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                    // acks > half means a leader has been elected
                    if (voteSet.hasAllQuorums()) {

                        // keep polling for a while in case a better vote arrives
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, 
                                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        // set the peer state
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(
                                proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    break;
                case FOLLOWING:
                    // received a FOLLOWING notification
                    Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
                    if (resultFN == null) {
                        break;
                    } else {
                        return resultFN;
                    }
                case LEADING:
                    // received a LEADING notification
                    Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
                    if (resultLN == null) {
                        break;
                    } else {
                        return resultLN;
                    }
                default:
                    break;
                }
            } else {
                // omitted
            }
        }
        return null;
    } finally {
        // omitted
    }
}
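
When no notification arrives, lookForLeader doubles its poll timeout up to a ceiling (the `notTimeout << 1` line). The sketch below prints the resulting sequence. NotificationBackoffSketch is hypothetical, and the bounds 200 and 60000 used in main are illustrative assumptions, since the values of minNotificationInterval and maxNotificationInterval are not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;

final class NotificationBackoffSketch {
    // Returns the poll timeout used in each of the first `rounds` empty polls.
    static List<Integer> timeouts(int minInterval, int maxInterval, int rounds) {
        List<Integer> result = new ArrayList<>();
        int notTimeout = minInterval;
        for (int i = 0; i < rounds; i++) {
            result.add(notTimeout);
            // Same update rule as in lookForLeader: double, but never exceed the ceiling.
            notTimeout = Math.min(notTimeout << 1, maxInterval);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(timeouts(200, 60000, 11));
    }
}
```

With these assumed bounds the timeout reaches the ceiling on the tenth empty poll and stays there.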

receivedFollowingNotification method

Handles a notification from a peer in the FOLLOWING state.

private Vote receivedFollowingNotification(
    Map<Long, Vote> recvset, Map<Long, Vote> outofelection, SyncedLearnerTracker voteSet, Notification n) {
    // this peer also gives its vote to the current leader,
    // once the quorum check and the checkLeader check have passed
    if (n.electionEpoch == logicalclock.get()) {
        // record the vote and build the ballot box
        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
        voteSet = getVoteTracker(
            recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
        // require acks > half and a passing checkLeader
        if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
            // update this node's state
            setPeerState(n.leader, voteSet);
            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }

    // reached when this node joins late and the ensemble already has a leader
    // the logic is essentially the same as above
    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
    voteSet = getVoteTracker(
        outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
        synchronized (this) {
            logicalclock.set(n.electionEpoch);
            setPeerState(n.leader, voteSet);
        }
        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
        leaveInstance(endVote);
        return endVote;
    }

    return null;
}

receivedLeadingNotification method

Handles a notification from a peer in the LEADING state.

private Vote receivedLeadingNotification(Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
                                         SyncedLearnerTracker voteSet, Notification n) {
    Vote result = receivedFollowingNotification(recvset, outofelection, voteSet, n);
    if (result == null) {
        if (self.getQuorumVerifier().getNeedOracle() && !self.getQuorumVerifier().askOracle()) {
            // omitted
        } else {
            return null;
        }
    } else {
        return result;
    }
}

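QuorumPeer class

Manages the quorum protocol. A server can be in one of the following three states:

  • Leader election - each server elects a leader, initially proposing itself
  • Follower - syncs with the leader and replicates all transactions
  • Leader - processes requests and forwards them to the followers; a majority of followers must be in sync before a request can be committed

The run method's main loop

The main loop in run checks the peer's current state and runs the election, lead, follow, or observe logic accordingly: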

public void run() {
    // omitted

    try {
        // Main loop
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
                if (Boolean.getBoolean("readonlymode.enabled")) {
                    // omitted
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    updateServerState();

                    // Add delay jitter before we switch to LOOKING
                    // state to reduce the load of ObserverMaster
                    if (isRunning()) {
                        Observer.waitForObserverElectionDelay();
                    }
                }
                break;
            case FOLLOWING:
                try {
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    updateServerState();
                }
                break;
            case LEADING:
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
        }
    } finally {
        // omitted
    }
}

LOOKING branch

try {
    reconfigFlagClear();
    if (shuttingDownLE) {
        shuttingDownLE = false;
        startLeaderElection();
    }
    // run the election with FastLeaderElection
    setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
    setPeerState(ServerState.LOOKING); // reset to the LOOKING state
}

FOLLOWING branch

try {
    setFollower(makeFollower(logFactory));
    follower.followLeader(); // start the follower
} catch (Exception e) {
} finally {
    follower.shutdown();
    setFollower(null);
    updateServerState(); // update the server state
}

Creating the Follower object:

protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
    return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}

LEADING branch

try {
    setLeader(makeLeader(logFactory));
    leader.lead(); // start the leader
    setLeader(null);
} catch (Exception e) {
} finally {
    if (leader != null) {
        leader.shutdown("Forcing shutdown");
        setLeader(null);
    }
    updateServerState(); // update the server state
}

Creating the Leader object:

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
    return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}

OBSERVING branch

try {
    setObserver(makeObserver(logFactory));
    observer.observeLeader();
} catch (Exception e) {
} finally {
    observer.shutdown();
    setObserver(null);
    updateServerState();

    // Add delay jitter before we switch to LOOKING
    // state to reduce the load of ObserverMaster
    if (isRunning()) {
        Observer.waitForObserverElectionDelay();
    }
}

Creating the Observer object:

protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
    return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}