elastic-job source code (2): the election mechanism

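Published: 2023-04-26 19:29:23  Author: wjdxw
Election mechanism: each job uses a ZooKeeper-based distributed lock to elect one leader among its running instances. The leader is then responsible for job sharding.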
 
A job runs the election as soon as it is initialized, as the following initialization code shows:
public void registerStartUpInfo(final boolean enabled) {
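    // start all listeners
    listenerManager.startAllListeners();
    // elect the leader; /{namespace}/leader/election/instance stores the elected server
    leaderService.electLeader();
    // /{namespace}/servers/{ip}: persist whether this server is enabled
    serverService.persistOnline(enabled);
    // ephemeral node under /{namespace}/instances holding the running instance's information
    instanceService.persistOnline();
    // start the asynchronous reconcile service if it is not already running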
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}

listenerManager.startAllListeners() starts all listener managers, including the election-related one, ElectionListenerManager.
leaderService.electLeader() performs the election.
 
Step 1: perform the election
public void electLeader() {
    log.debug("Elect a new leader now.");
    this.jobNodeStorage.executeInLeader("leader/election/latch", new LeaderService.LeaderElectionExecutionCallback());
    log.debug("Leader election completed.");
}
public void executeInLeader(final String key, final LeaderExecutionCallback callback) {
    // Curator's LeaderLatch; try-with-resources closes it (releasing leadership) on every path
    try (LeaderLatch latch = new LeaderLatch(this.client, key)) {
        latch.start();        // join the election on the latch node
        latch.await();        // block until this instance holds leadership
        callback.execute();
    } catch (final Exception ex) {
        this.handleException(ex);
    }
}

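A ZooKeeper lock is taken on the {jobName}/leader/election/latch node. Once an instance acquires it, it calls the execute method of the callback object. Closing the latch afterwards releases the lock, so the next waiting instance can run the callback as well.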
 
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
    
    @Override
    public void execute() {
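        // {jobName}/leader/election/instance does not exist yet
        if (!hasLeader()) {
            // create the ephemeral node {jobName}/leader/election/instance whose value is the current instance id,
            // e.g. 10.100.16.75@-@134642: the IP address, the separator @-@, then a generated number
            // being ephemeral, the node is removed once this instance's ZooKeeper session ends
            // (for example after a disconnect long enough for the session to expire)
            jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());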
        }
    }
}
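The lock-then-check pattern of executeInLeader plus LeaderElectionExecutionCallback can be sketched without ZooKeeper. This is a minimal in-process model, assuming a java.util.concurrent ReentrantLock is an acceptable stand-in for the distributed latch and a HashMap for the node tree; the class LatchElectionSketch, the demoJob path and the instance ids are hypothetical, not ElasticJob or Curator API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-process stand-in for ZooKeeper + Curator's LeaderLatch; not the real API.
public class LatchElectionSketch {

    private static final String LEADER_NODE = "demoJob/leader/election/instance";

    private final ReentrantLock latch = new ReentrantLock();   // plays {jobName}/leader/election/latch
    private final Map<String, String> nodes = new HashMap<>(); // plays the ZooKeeper tree; guarded by latch

    // Mirrors executeInLeader + LeaderElectionExecutionCallback.
    // Returns true if this call created the leader node.
    public boolean electLeader(String instanceId) {
        latch.lock();                               // ~ latch.start() + latch.await()
        try {
            if (!nodes.containsKey(LEADER_NODE)) {  // ~ !hasLeader()
                nodes.put(LEADER_NODE, instanceId); // ~ fillEphemeralJobNode(...)
                return true;
            }
            return false;
        } finally {
            latch.unlock();                         // ~ latch.close(): the next waiter proceeds
        }
    }

    public String leader() {
        latch.lock();
        try {
            return nodes.get(LEADER_NODE);
        } finally {
            latch.unlock();
        }
    }

    // Starts `instances` threads at once; returns how many of them created the leader node.
    public static int race(LatchElectionSketch zk, int instances) {
        CountDownLatch go = new CountDownLatch(1);
        AtomicInteger winners = new AtomicInteger();
        Thread[] threads = new Thread[instances];
        for (int i = 0; i < instances; i++) {
            String id = "10.0.0." + i + "@-@" + (1000 + i); // hypothetical instance ids
            threads[i] = new Thread(() -> {
                try {
                    go.await();
                    if (zk.electLeader(id)) {
                        winners.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        go.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        LatchElectionSketch zk = new LatchElectionSketch();
        int winners = race(zk, 5);
        System.out.println(winners + " winner, leader = " + zk.leader());
    }
}
```

Because the existence check and the write both happen while the lock is held, exactly one thread creates the node and the others find it already present. In the real system the lock is distributed (Curator's LeaderLatch creates ephemeral sequential nodes under the latch path), not an in-process ReentrantLock.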

Step 2: ElectionListenerManager starts its listeners
@Override
public void start() {
    addDataListener(new LeaderElectionJobListener());
    addDataListener(new LeaderAbdicationJobListener());
}

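Its start method registers two listeners:
LeaderElectionJobListener: triggers a new election, for example after the leader goes down.
LeaderAbdicationJobListener: makes the current leader step down (delete the leader node) when its own server is disabled.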
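A leader crash reaches LeaderElectionJobListener as a node deletion only because the leader node is ephemeral. The toy model here is a hypothetical, stdlib-only stand-in rather than the ZooKeeper API: each node is tied to a session, and closing that session deletes the node and notifies listeners. The class EphemeralNodeSketch, its methods and the session names are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy model of ZooKeeper ephemeral nodes; not the ZooKeeper or Curator API.
public class EphemeralNodeSketch {

    private final Map<String, String> data = new HashMap<>();   // path -> value
    private final Map<String, String> owner = new HashMap<>();  // path -> owning session id
    private final List<Consumer<String>> deleteListeners = new ArrayList<>();

    public void onDeleted(Consumer<String> listener) {
        deleteListeners.add(listener);
    }

    // Create (or overwrite) a node owned by the given session.
    public void createEphemeral(String session, String path, String value) {
        data.put(path, value);
        owner.put(path, session);
    }

    public boolean exists(String path) {
        return data.containsKey(path);
    }

    public String get(String path) {
        return data.get(path);
    }

    // Ending a session deletes every ephemeral node it owns and notifies listeners.
    public void closeSession(String session) {
        List<String> removed = new ArrayList<>();
        for (Map.Entry<String, String> e : owner.entrySet()) {
            if (e.getValue().equals(session)) {
                removed.add(e.getKey());
            }
        }
        for (String path : removed) {
            owner.remove(path);
            data.remove(path);
            deleteListeners.forEach(listener -> listener.accept(path));
        }
    }

    public static void main(String[] args) {
        EphemeralNodeSketch zk = new EphemeralNodeSketch();
        String leaderPath = "demoJob/leader/election/instance";
        zk.onDeleted(path -> System.out.println("deleted: " + path));
        // instance A (session "A") wins the election
        if (!zk.exists(leaderPath)) {
            zk.createEphemeral("A", leaderPath, "10.0.0.1@-@1001");
        }
        System.out.println("leader: " + zk.get(leaderPath)); // leader: 10.0.0.1@-@1001
        // A's session expires: prints "deleted: demoJob/leader/election/instance"
        zk.closeSession("A");
        System.out.println("leader exists: " + zk.exists(leaderPath)); // leader exists: false
    }
}
```

The deletion callback plays the role of the DataChangedEvent that the election listener receives when the leader's session is gone.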
 
LeaderElectionJobListener.java
@Override
public void onChange(final DataChangedEvent event) {
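    // 1. the job is not shut down: schedulerMap and jobInstanceMap both still hold it
    // 2. either an active election: no leader exists and the local server node {jobName}/servers/{ip} changed to a value other than DISABLED,
    //    or a passive election: {jobName}/leader/election/instance was deleted, the local server is ENABLED and it has online instances under {jobName}/instances
    // in short: the leader instance went down while other instances are still running
    if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(event.getKey(), event.getValue()) || isPassiveElection(event.getKey(), event.getType()))) {
        // elect a new leader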
        leaderService.electLeader();
    }
}
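The helpers isActiveElection and isPassiveElection are not shown above, so this is a simplified predicate under explicit assumptions: the active branch fires when no leader exists and the local server node changes to anything other than DISABLED, and the passive branch fires when the leader node is deleted while the local server is enabled and has online instances. The enum, records and relative paths are hypothetical; the ZooKeeper and JobRegistry lookups are replaced by a LocalState snapshot.

```java
// Hypothetical, simplified model of LeaderElectionJobListener's trigger condition.
// None of these types are ElasticJob classes; ZooKeeper/JobRegistry reads are replaced by LocalState.
public class ElectionTriggerSketch {

    public enum ChangeType { ADDED, UPDATED, DELETED }

    public record Event(String path, ChangeType type, String value) {}

    public record LocalState(boolean jobShutdown, boolean hasLeader,
                             boolean localServerEnabled, boolean localServerHasOnlineInstances) {}

    public static boolean shouldElect(String jobName, String localIp, Event event, LocalState state) {
        if (state.jobShutdown()) {
            return false; // condition 1 fails: the job is shut down
        }
        String localServerPath = jobName + "/servers/" + localIp;
        String leaderPath = jobName + "/leader/election/instance";
        // active election (assumed): no leader, and the local server node now holds something other than DISABLED
        boolean active = !state.hasLeader()
                && event.path().equals(localServerPath)
                && !"DISABLED".equals(event.value());
        // passive election (assumed): the leader's node was deleted and this server can still take over
        boolean passive = event.path().equals(leaderPath)
                && event.type() == ChangeType.DELETED
                && state.localServerEnabled()
                && state.localServerHasOnlineInstances();
        return active || passive;
    }

    public static void main(String[] args) {
        LocalState running = new LocalState(false, false, true, true);
        Event leaderGone = new Event("demoJob/leader/election/instance", ChangeType.DELETED, null);
        System.out.println(shouldElect("demoJob", "10.0.0.2", leaderGone, running)); // true
    }
}
```

Keeping the decision a pure function of the event and a state snapshot makes each branch easy to check in isolation; the real listener reads that state from ZooKeeper at event time.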

LeaderAbdicationJobListener.java

@Override
public void onChange(final DataChangedEvent event) {
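    // this instance is the leader: the instance id stored in {jobName}/leader/election/instance equals the local instance id in JobRegistry
    // and the local server node {jobName}/servers/{ip} was set to DISABLED
    // i.e. the leader instance has been taken offline
    if (leaderService.isLeader() && isLocalServerDisabled(event.getKey(), event.getValue())) {
        // delete {jobName}/leader/election/instance so the remaining instances can elect a new leader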
        leaderService.removeLeader();
    }
}
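The two listeners work as a pair: the disabled leader abdicates by deleting its node, and another instance reacts to that deletion by electing itself. A single-threaded simulation under simplifying assumptions shows the hand-off. Store and Instance are hypothetical types, the election lock is omitted because nothing runs concurrently, and the "online instances" part of the passive check is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded simulation of abdication followed by re-election.
// Store and Instance are illustrative types, not ElasticJob, Curator or ZooKeeper classes.
public class AbdicationSketch {

    static final String LEADER = "demoJob/leader/election/instance";

    // Minimal node store that synchronously notifies every instance of each change.
    static class Store {
        final Map<String, String> nodes = new HashMap<>();
        final List<Instance> watchers = new ArrayList<>();

        void put(String path, String value) {
            nodes.put(path, value);
            watchers.forEach(w -> w.onChange(path, false, value));
        }

        void delete(String path) {
            nodes.remove(path);
            watchers.forEach(w -> w.onChange(path, true, null));
        }
    }

    static class Instance {
        final String id;
        final String ip;
        final Store store;

        Instance(String id, String ip, Store store) {
            this.id = id;
            this.ip = ip;
            this.store = store;
            store.watchers.add(this);
        }

        String serverPath() { return "demoJob/servers/" + ip; }

        boolean isLeader() { return id.equals(store.nodes.get(LEADER)); }

        boolean serverEnabled() { return !"DISABLED".equals(store.nodes.get(serverPath())); }

        // Election callback: create the leader node only if none exists.
        void electLeader() {
            if (!store.nodes.containsKey(LEADER)) {
                store.put(LEADER, id);
            }
        }

        void onChange(String path, boolean deleted, String value) {
            // abdication: I am the leader and my own server node was set to DISABLED
            if (!deleted && path.equals(serverPath()) && "DISABLED".equals(value) && isLeader()) {
                store.delete(LEADER);
            }
            // re-election: the leader node was deleted and my server is still enabled
            if (deleted && path.equals(LEADER) && serverEnabled()) {
                electLeader();
            }
        }
    }

    // A leads, then A's server is disabled; returns the id of the resulting leader.
    public static String scenario() {
        Store store = new Store();
        Instance a = new Instance("10.0.0.1@-@1001", "10.0.0.1", store);
        Instance b = new Instance("10.0.0.2@-@1002", "10.0.0.2", store);
        a.electLeader();                       // A wins the first election
        b.electLeader();                       // B sees an existing leader and does nothing
        store.put(a.serverPath(), "DISABLED"); // an operator disables A's server
        return store.nodes.get(LEADER);
    }

    public static void main(String[] args) {
        System.out.println("leader after abdication: " + scenario()); // leader after abdication: 10.0.0.2@-@1002
    }
}
```

A also receives the deletion event, but its own server node is DISABLED, so it does not re-elect itself; only B does.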