[OSPP开源之夏-2023] 结合NWR实现Flexible Raft

发布时间 2023-07-01 14:56:11作者: AKAI_CHI

Summary

我们希望在原始RAFT算法的基础上,让Leader选举和日志复制除了能需要获得多数派确认模型的支持之外,还可以接入NWRQuorum模型,用于动态调整一致性强度。

Motivation

在原始的RAFT算法中,Leader选举和日志复制都需要获得多数派成员的支持。而NWR模型则可以在动态调整一致性强度的场景中使用,它需要满足W+R>N,以保证强一致性。
JRaft将RAFT和NWR结合起来,使得用户可以根据不同的业务需求来动态调整Quorum的数量。例如,在一个写多读少的场景中,用户可以将多数派的数量从3调整为2,以降低达成共识的条件,从而提高写请求的效率。同时,为了保证RAFT的正确性,写Quorum的调整需要付出代价,即读Quorum的数量也需要相应调整。
JRaft支持成员变更,因此用户可以配置(0,1]范围内的小数来计算W和R的具体值。通过使用JRaft,用户可以根据自己的业务需求来灵活地调整一致性强度,使得分布式系统在不同场景下都可以获得最佳的性能和正确性。

Key design

下图为NWR模型Quroum的设计思路:

  • 抽离出抽象父类Quorum作为MajorityQuorum(多数派确认模型,原Ballot类)和NWRQuorum(NWR模型)的模板。
  • 用户首先需要在NodeOptions类中决定是否开启NWR模式,默认为多数派模型,用户手动设置读写Factor因子后,则视为开启了NWR模式。
  • NodeImpl#init方法进行逻辑处理,通过判断是否开启了NWR模式,构造对应的Quorum选票实例,例如MajorityQuorum与NWRQuorum。
  • 在构建好选票实例之后,调用对应的方法可以进行选票的初始化(init)、投票(grant)等操作。

该项目涉及代码变更的地方可以划分为如下四个模块:

  • Leader选举模块: 一个节点想要成为leader,会经过以下几个阶段:预投票、正式投票、当选leader。所以对于preVote、electSelf、becomeLeader等等与多数派模型相关的方法都会涉及NWR模型的有关代码变更。
  • 日志复制模块: 当leader收到客户端的事务请求或者follower与leader数据存在差距时,会调用 Replicator#sendEntries 去发送日志复制消息(事务消息);而心跳消息和探测消息,则是由 Replicator#sendEmptyEntries 发送的。日志复制中,NodeImpl#executeApplyingTasks 和 NodeImpl#unsafeApplyConfiguration 方法会涉及到多数派确认。在执行这些方法的时候,都会使用 BallotBox#appendPendingTask 方法来构造一个待投票的Ballot(现在叫MajorityQuorum/NWRQuorum)并放置到投票箱中。
  • 一致性读模块: 对于一致性读模块,在raft共识算法中,读取R个节点其实体现在R个节点的心跳响应。通过R个节点的心跳,能保证这个节点一定是leader,一定拥有最新的数据,我们并不是真正需要从R个节点里面读取数据。NodeImpl#ReadIndexHeartbeatResponseClosure 这样的方法,可以看到执行了心跳消息的多数派确认模型的逻辑,ReadIndexHeartbeatResponseClosure构造器里面传入了quorum的值,这里我们需要对应修改为NWR模型的逻辑。
  • 成员变更模块: 对于JRaft成员变更来讲,核心逻辑是采用单成员变更的方式,即使需要同时变更多个成员时,也是会先整理出新add与新remove的成员,再逐个进行单成员变更。其核心方法 addPeer、removePeer、changePeers、resetPeers 等等都会涉及NWR模型的适配。

Detailed Design

NodeOptions

NodeOptions类中,我们新增了如下三个参数:readQuorumFactor、writeQuorumFactor与enableNWRMode,分别表示读因子、写因子以及是否开启NWR模型(true),默认不开启,表示多数派确认模型(false)。

    /**
     * Read Quorum's factor
     */
    private double                          readQuorumFactor;
    /**
     * Write Quorum's factor
     */
    private double                          writeQuorumFactor;
    /**
     * Enable NWRMode or Not
     */
    private boolean                         enableNWRMode          = false;

对于readQuorumFactor和writeQuorumFactor两个属性,在NodeOptions类里提供了setter和getter方法便于用户自定义配置。对于enableNWRMode属性,提供了isEnableNWRModeI()来判断是否开启NWR模型,而enableNWRMode()方法表示开启NWR模式。

    public double getReadQuorumFactor() {
        return readQuorumFactor;
    }
    public void setReadQuorumFactor(double readQuorumFactor) {
        this.readQuorumFactor = readQuorumFactor;
        enableNWRMode();
    }
    public double getWriteQuorumFactor() {
        return writeQuorumFactor;
    }
    public void setWriteQuorumFactor(double writeQuorumFactor) {
        this.writeQuorumFactor = writeQuorumFactor;
        enableNWRMode();
    }
    public boolean isEnableNWRMode() {
        return enableNWRMode;
    }
    private void enableNWRMode() {
        this.enableNWRMode = true;
    }

Node Init

在NodeImpl#init时,我们首先会对NodeOptions内部的readFactor和writeFactor进行校验并且进行参数同步,如果用户只设置了readFactor和writeFactor两个参数的其中之一,那么我们需要同步这两个参数的值。
在init方法初始化node时,会首先对NWR模式下的factor进行校验与同步。

if(options.isEnableNWRMode() && !checkAndResetFactor(options.getWriteQuorumFactor(),
         options.getReadQuorumFactor())){
     return false;
}

校验与同步方法在checkAndResetFactor里:

    private boolean checkAndResetFactor(Double writeFactor, Double readFactor){
        if (Objects.nonNull(readFactor) && Objects.nonNull(writeFactor)) {
            if (readFactor + writeFactor != 1) {
                LOG.error("The sum of readFactor and writeFactor should be 1");
                return false;
            }
            return true;
        }
        if (Objects.nonNull(readFactor)) {
            if (readFactor > 0 && readFactor <= 1) {
                options.setWriteQuorumFactor(1 - readFactor);
                return true;
            }
            LOG.error("Fail to set quorum_nwr read_factor because {} is not between (0,1]", readFactor);
        }
        if (Objects.nonNull(writeFactor)) {
            if (writeFactor > 0 && writeFactor <= 1) {
                options.setReadQuorumFactor(1 - writeFactor);
                return true;
            }
            LOG.error("Fail to set quorum_nwr write_factor because {} is not between (0,1]", writeFactor);
        }
        return false;
    }

在之前node初始化时,生成Ballot对象是通过关键字直接new出来的,如下所示:

private final Ballot voteCtx = new Ballot(); 
private final Ballot prevVoteCtx = new Ballot();

添加NWR模型后,我们需要判断,到底是生成MajorityQuorum还是NWRQuorum。所以在对节点进行初始化时(NodeImpl#init),会根据NodeOptions判断是否开启NWR模型,进而构造对应实例。

prevVoteCtx = options.isEnableNWRMode() ? new NWRQuorum(opts.getReadQuorumFactor(), opts.getWriteQuorumFactor())
    : new MajorityQuorum();
voteCtx = options.isEnableNWRMode() ? new NWRQuorum(opts.getReadQuorumFactor(), opts.getWriteQuorumFactor())
    : new MajorityQuorum();

Quorum Detail

Quoroum

Quorum作为NWRQuorum与MajorityQuorum的抽象父类,持有peers、oldPeers、quorum、oldQuorum几个公共属性。

protected final List<Quorum.UnfoundPeerId> peers = new ArrayList<>()
protected int quorum;
protected final List<Quorum.UnfoundPeerId> oldPeers = new ArrayList<>();
protected int oldQuorum;

Quorum提供了grant和init两个抽象方法,子类实现该抽象方法的具体业务逻辑。

public abstract boolean init(final Configuration conf, final Configuration oldConf);
public abstract void grant(final PeerId peerId);

Quorum还定义了findPeer、isGranted、grant这三个包含方法体的父类方法。

public PosHint grant(final PeerId peerId, final PosHint hint){
    //此处省略方法体
}
public boolean isGranted() {
    //此处省略方法体
}
private UnfoundPeerId findPeer(final PeerId peerId, final List<UnfoundPeerId> peers, final int posHint){
    //此处省略方法体
}

NWRQuorum

NWRQuorum作为NWR模型选票实现类,持有readFactor、writeFactor、oldReadFactor、oldWriteFactor、quorumType几个属性,他们代表读写因子与QuoroumType类型(读quorum、写quorum)。

    protected Double readFactor; ---读因子
    protected Double writeFactor; ---写因子

另外,我们提供了一个NWRQuorum的构造器用于构造NWRQuorum实例,需要传入writeFactor, readFactor, quorumType三个参数。

    public NWRQuorum(Double writeFactor, Double readFactor) {
        this.writeFactor = writeFactor;
        this.readFactor = readFactor;
    }

我们也实现了抽象父类的init与grant方法,

public boolean init(Configuration conf, Configuration oldConf) ---初始化选票
public void grant(final PeerId peerId) ---节点投票

对于NWRQuorum的init()方法来讲,他对于quorum的计算与以往有所不同,代码如下:

    @Override
    public boolean init(Configuration conf, Configuration oldConf) {
        peers.clear();
        oldPeers.clear();
        quorum = oldQuorum = 0;
        int index = 0;

        if (conf != null) {
            for (final PeerId peer : conf) {
                peers.add(new UnfoundPeerId(peer, index++, false));
            }
        }
        quorum = new Double(Math.ceil(writeFactor * peers.size())).intValue();

        if (oldConf == null) {
            return true;
        }
        index = 0;
        for (final PeerId peer : oldConf) {
            oldPeers.add(new UnfoundPeerId(peer, index++, false));
        }
        oldQuorum = new Double(Math.ceil(writeFactor * oldPeers.size())).intValue();
        return true;
    }

MajorityQuorum

MajorityQuorum实现了Quorum抽象父类的两个方法,init方法初始化选票需要参数,grant方法用于投票。

public boolean init(final Configuration conf, final Configuration oldConf) ---初始化选票 
public void grant(final PeerId peerId) ---节点投票

Module Detail

Leader-election Module

一个节点成为leader会经过以下几个阶段:预投票、正式投票、当选leader。
首先我们来看预投票NodeImpl#preVote()方法,大概经历以下几个过程:

  1. 校验是否可以开启预投票,安装快照或者集群配置不包含本节点都不可以开启预投票。
  2. 初始化预投票-投票箱。
  3. 遍历,给除了本节点之外的所有其他节点发起RequestVoteRequest--RPC请求。
  4. 给自己投票,并判断是否已经达到多数派。

其中预投票有以下几处核心代码涉及投票:

  • 在NodeImpl#preVote()中,调用Quorum#init()方法初始化预投票-投票箱
prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
  • 在NodeImpl#preVote()中,本节点自己投票,在判断投票箱达到多数派后开启正式选举,调用electSelf()
            prevVoteCtx.grant(this.serverId);
            if (prevVoteCtx.isGranted()) {
                doUnlock = false;
                electSelf();
            }
  • 在NodeImpl#handlePreVoteResponse中,该方法用来处理预投票响应:首先根据响应判断对方节点是否为本节点投票,在判断为true后,Quorum(即prevVoteCtx)调用grant()对该节点进行授权投票。最后通过isGranted()判断是否大多数节点已经确认,如果符合条件,则开启正式选举模式,调用electSelf()方法。
            // check granted quorum?
            if (response.getGranted()) {
                prevVoteCtx.grant(peerId);
                if (prevVoteCtx.isGranted()) {
                    doUnlock = false;
                    electSelf();
                }
            }
  • 接下来多数派确认后,执行NodeImpl#electSelf()方法,它做了以下几件事:
  1. 检验当前节点是否存在集群配置里面,不存在不进行选举。
  2. 关闭预选举定时器。
  3. 清空leader,增加任期,修改状态为candidate,votedId设置为当前本节点。
  4. 启动投票定时器voteTimer,因为可能投票失败需要循环发起投票,voteTimer里面会根据当前的CANDIDATE状态调用electSelf进行选举。
  5. 初始化投票箱。
  6. 遍历所有节点,向其他集群节点发送RequestVoteRequest--RPC请求,请求被RequestVoteRequestProcessor处理器处理的。
  7. 如果多数派确认,则调用NodeImpl#becomeLeader晋升为leader。
            voteCtx.grant(this.serverId);
            if (voteCtx.isGranted()) {
                becomeLeader();
            }
  • 在NodeImpl#handleRequestVoteResponse中,该方法用来处理投票请求的响应。只要收到投票的反馈,就会在投票箱中对多数派进行确认,如果已经达成多数派确认的共识,那么本节点就调用NodeImpl#becomeLeader方法成为leader。投票请求处理器NodeImpl#handleRequestVoteResponse方法对选票处理的核心逻辑如下:
            // check granted quorum?
            if (response.getGranted()) {
                voteCtx.grant(peerId);
                if (voteCtx.isGranted()) {
                    becomeLeader();
                }
            }
  • 在多数派确认后,会调用NodeImpl#becomeLeader方法正式被选举为leader:
  1. 首先会停止选举定时器。
  2. 设置当前的状态为leader。
  3. 设值任期。
  4. 遍历所有的节点将节点加入到复制集群中。
  5. 最后将stepDownTimer打开,定时对leader进行校验是不是又半数以上的节点响应当前的leader。

Log-replication Module

当leader收到客户端的事务请求或者follower与leader数据存在差距时,会调用Replicator#sendEntries去复制日志,日志复制消息属于事务消息;而心跳消息和探测消息,则是由Replicator#sendEmptyEntries发送的。
日志复制的流程如下:

  1. leader将日志项追加到本地日志
  2. leader将日志广播给follower
  3. follower追加到本地日志
  4. follower返回执行结果
  5. leader收到多数派响应后提交日志
  6. 返回执行结果给客户端
    在JRaft中,我阅读了日志复制模块的源码部分,然后总结出下图来直观的反应整个日志复制从leader到follower再回到leader的整个过程,详细的方法调用链路过程如下所示:

在日志复制中,以下方法会涉及到多数派确认:NodeImpl#executeApplyingTasks 和NodeImpl#unsafeApplyConfiguration,也就是执行应用任务和应用配置变更所使用到的日志复制。在执行这些方法的时候,都会使用BallotBox#appendPendingTask方法来构造一个待投票的Quorum并放置到投票箱中。

场景一:应用任务
我们首先分析一下NodeImpl#executeApplyingTasks方法:

  1. 检查当前节点是否是 Leader 节点。如果节点不是 Leader 节点,则将所有任务的状态设置为错误并执行相应的回调方法;如果节点正在进行领导权转移,则将所有任务的状态设置为繁忙并执行相应的回调方法。
  2. 遍历任务列表,对于每个任务执行以下操作:a. 检查任务的 expectedTerm 是否与当前任期相同,如果不同则将任务的状态设置为错误并执行相应的回调方法。b. 将任务添加到 BallotBox 中。c. 将任务的日志条目信息添加到一个列表中,并将任务重置为默认状态。
  3. 将任务列表中的所有日志条目追加到当前节点的日志中,并将追加操作封装为 LeaderStableClosure 回调方法。
  4. 检查并更新配置信息,如果需要更新则执行相应的更新操作。

注意:在executeApplyingTasks方法中,根据当前节点配置,生成了一个待投票Quorum,并放置到投票箱BallotBox的pendingMetaQueue中。所以我们需要在这里构造待投票Quorum时,修改quorum为NWR模式,而不是之前的多数派。

    private void executeApplyingTasks(final List<LogEntryAndClosure> tasks)  {
        // 省略部分代码...
       if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
          this.conf.isStable() ? null : this.conf.getOldConf(), task.done,options.isEnableNWRMode() ?
          QuorumFactory.createNWRQuorumConfiguration(options.getWriteQuorumFactor(), options.getReadQuorumFactor()):
          QuorumFactory.createMajorityQuorumConfiguration())) {
          ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
          task.reset();
          continue;
          }
        // 省略部分代码...
        this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
        // 省略部分代码...
    }

场景二:
接下来看看NodeImpl#unsafeApplyConfiguration是如何构建选票Ballot的:
这段代码主要用于将新的配置信息封装成一个日志条目,并追加到当前节点的日志中,从而实现配置变更的操作。其实逻辑和增加普通日志类似,主要需要注意的还是ballotBox.appendPendingTask方法,也就是生成一个待投票Quorum的逻辑。

    private void unsafeApplyConfiguration(final Configuration newConf, final Configuration oldConf,
                                          final boolean leaderStart) {
        // 省略部分代码...
        if (!this.ballotBox.appendPendingTask(newConf, oldConf, configurationChangeDone,options.isEnableNWRMode() ?
            QuorumFactory.createNWRQuorumConfiguration(options.getWriteQuorumFactor(), options.getReadQuorumFactor()):
                QuorumFactory.createMajorityQuorumConfiguration())) {
            ThreadPoolsFactory.runClosureInThread(this.groupId, configurationChangeDone, new Status(
                RaftError.EINTERNAL, "Fail to append task."));
            return;
        }
        // 省略部分代码...
        this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
        checkAndSetConfiguration(false);
    }
QuorumFactory

在上面的executeApplyingTasks与unsafeApplyConfiguration方法中使用到了QuorumFactory这个工厂类的方法。为了更方便配置一个Quorum的属性,可以将factor因子和NWR开关整合到QuorumConfiguration类中,以便于快速构建一个QuorumConfiguration。实现代码如下:

public final class QuorumFactory {
    public static QuorumConfiguration createNWRQuorumConfiguration(Double writeFactor,Double readFactor) {
        boolean isEnableNWR = true;
        QuorumConfiguration quorumConfiguration = new QuorumConfiguration();
        quorumConfiguration.setReadFactor(readFactor);
        quorumConfiguration.setWriteFactor(writeFactor);
        quorumConfiguration.setEnableNWR(isEnableNWR);
        return quorumConfiguration;
    }

    public static QuorumConfiguration createMajorityQuorumConfiguration(){
        boolean isEnableNWR = false;
        QuorumConfiguration quorumConfiguration = new QuorumConfiguration();
        quorumConfiguration.setEnableNWR(isEnableNWR);
        return quorumConfiguration;
    }
}

对于BallotBox#CommitAt来说,在进行确认时,只需要从pendingMetaQueue获取Quorum再进行grant授权投票即可,之后再判断是否已经达到(多数派/NWR)确认。

    public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
            // 省略部分代码...
            Quorum.PosHint hint = new Quorum.PosHint();
            for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
                final Quorum quorum = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
                hint = quorum.grant(peer, hint);
                if (quorum.isGranted()) {
                    lastCommittedIndex = logIndex;
                }
            }
            // 省略部分代码...
        this.waiter.onCommitted(lastCommittedIndex);
        return true;
    }

Consistent-reading Module

对于ReadIndexHeartbeatResponseClosure类来讲,他的run方法执行了心跳消息多数派确认逻辑。它的构造器里面传入的quorum值需要进行NWR模型适配并且failPeersThreshold属性也需要重新适配计算逻辑。
原有获取ReadQuorum数值的多数派确认逻辑是:

    private int getQuorum() {
        final Configuration c = this.conf.getConf();
        if (c.isEmpty()) {
            return 0;
        }
        return c.getPeers().size() / 2 + 1;
    }

如今我们需要修改该方法,额外对NWR模型进行判断:

    private int getQuorum(QuorumConfiguration quorumConfiguration) {
        final Configuration c = this.conf.getConf();
        if (c.isEmpty()) {
            return 0;
        }
        int size = c.getPeers().size();
        if(!options.isEnableNWRMode()){
            return size / 2 + 1;
        }
        return size - new Double(Math.ceil(c.getPeers().size() * options.getWriteQuorumFactor())).intValue() + 1;
    }

failPeersThreshold原有计算逻辑:

this.failPeersThreshold = peersCount % 2 == 0 ? (quorum - 1) : quorum;

修改后:

this.failPeersThreshold = options.isEnableNWRMode() ? peersCount - quorum + 1 :
     (peersCount % 2 == 0 ? (quorum - 1) : quorum);