Elasticjob 3.x 最新版本源码解读

发布时间 2023-06-06 15:43:05作者: ityml

ElasticJob - 分布式作业调度

官方网站: https://shardingsphere.apache.org/elasticjob/

Stargazers over time

ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。
它通过弹性调度、资源管控、以及作业治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的作业生态。
它的各个产品使用统一的作业 API,开发者仅需一次开发,即可随意部署。

ElasticJob 已于 2020 年 5 月 28 日成为 Apache ShardingSphere 的子项目。
GitHub Workflow
codecov

简介

使用 ElasticJob 能够让开发工程师不再担心任务的线性吞吐量提升等非功能需求,使他们能够更加专注于面向业务编码设计;
同时,它也能够解放运维工程师,使他们不必再担心任务的可用性和相关管理需求,只通过轻松的增加服务节点即可达到自动化运维的目的。

ElasticJob-Lite

定位为轻量级无中心化解决方案,使用 jar 的形式提供分布式任务的协调服务。

功能列表

  • 弹性调度
    • 支持任务在分布式场景下的分片和高可用
    • 能够水平扩展任务的吞吐量和执行效率
    • 任务处理能力随资源配备弹性伸缩
  • 资源分配
    • 在适合的时间将适合的资源分配给任务并使其生效
    • 相同任务聚合至相同的执行器统一处理
    • 动态调配追加资源至新分配的任务
  • 作业治理
    • 失效转移
    • 错过作业重新执行
    • 自诊断修复
  • 作业依赖(TODO)
    • 基于有向无环图(DAG)的作业间依赖
    • 基于有向无环图(DAG)的作业分片间依赖
  • 作业开放生态
    • 可扩展的作业类型统一接口
    • 丰富的作业类型库,如数据流、脚本、HTTP、文件、大数据等
    • 易于对接业务作业,能够与 Spring 依赖注入无缝整合
  • 可视化管控端
    • 作业管控端
    • 作业执行历史数据追踪
    • 注册中心管理

架构图

image.png

基础知识

ElasticJob的作业分类基于class和type两种类型。

  • 基于class的作业需要开发者自行通过实现接口的方式织入业务逻辑;
    • 基于class的作业接口的方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount()、getShardingItem()等方法分别获取分片总数和运行在本作业服务器的分片序列号等。
  • 基于type的作业则无需编码,只需要提供相应配置即可。
  • ElasticJob目前提供 SimpleDataflow这两种基于 class的作业类型,并提供 ScriptHTTP这两种基于 type的作业类型,用户可通过实现 SPI接口自行扩展作业类型。

SimpleJob

SimpleJob: 简单实现,未经任何封装的类型。需实现SimpleJob接口。

ejob-standalone MySimpleJob.java

public class MyElasticJob implements SimpleJob {
	public void execute(ShardingContext context) {
	System.out.println(String.format("Item: %s | Time: %s | Thread: %s ",
	context.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()),Thread.currentThread().getId()));
    }
}

定义了作业,还需要对作业进行配置,比如作业的名称、分片数、cron时间表达式以及是否需要失效转移等,主要通过JobConfiguration类来完成这些配置,它提供了构建者风格的方法,比如下面的作业配置,作业名称为MySimpleJob、作业分片数为3,并且在每一分钟的第30秒执行任务,调用overwrite方法用来设置在作业启动时是否将本地配置覆盖到注册中心(默认不覆盖,所以本地修改了cron时间表达式会不起作用),如果需要覆盖(方法传入true),则每次启动时都将使用本地配置(即以本地的作业配置为主,不然本地修改作业配置不会起作用)。调用failover方法用于设置是否开启失效转移(仅适用于开启了 monitorExecution,默认开启 monitorExecution,但默认不开启失效转移),ElasticJob不会在本次执行过程中进行重新分片(给作业节点分配作业分片),而是等待下次调度之前才开启重新分片流程。当作业执行过程中服务器宕机,失效转移允许将该次未完成的任务在另一作业节点上补偿执行。

开启失效转移功能,ElasticJob会监控作业每一分片的执行状态,并将其写入注册中心,供其他节点感知。在一次运行耗时较长且间隔较长的作业场景,失效转移是提升作业运行实时性的有效手段;对于间隔较短的作业,会产生大量与注册中心的网络通信,对集群的性能产生影响。而且间隔较短的作业并未见得关注单次作业的实时性,可以通过下次作业执行的重分片使所有的分片正确执行,因此不建议短间隔作业开启失效转移。 另外需要注意的是,作业本身的幂等性,是保证失效转移正确性的前提.

    private static JobConfiguration createJobConfiguration() {
        return JobConfiguration.newBuilder("MySimpleJob", 3)
                .cron("30 * * * * ?")
                .overwrite(true)
                .failover(true)
                .build();
    }

DataFlowJob

DataFlowJob:Dataflow类型用于处理数据流,必须实现fetchData()和processData()的方法,一个用来获取数据,一个用来处理获取到的数据。其他的是一样的,跟Simple没有区别

ejob-standalone MyDataFlowJob.java

public class MyDataFlowJob implements DataflowJob<String> {
@Override
public List<String> fetchData(ShardingContext shardingContext) {
// 获取到了数据
return Arrays.asList("qingshan","jack","seven");
}

@Override
public void processData(ShardingContext shardingContext, List<String> data) {
data.forEach(x-> System.out.println("开始处理数据:"+x));
}
}

ScriptJob

Script:Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。D盘下新建1.bat,内容:

@echo ------【脚本任务】Sharding Context: %*

ejob-standalone script.ScriptJobTest

只要指定脚本的内容或者位置

HTTP

暂时不考虑

代码结构 elasticjob-lite-core

api 暴露给用户调用的类

  • bootstrap 作业启动类
    • OneOffJobBootstrap 一次性调度
    • ScheduleJobBootstrap 定时调度
  • listener 作业监听器
    • AbstractDistributeOnceElasticJobListener 分布式作业监听器
  • register // TODO 没看懂
    • JobInstanceRegistry

internal 内部核心功能,按功能分包

  • config 配置
  • election 选举
  • instance 实例 | JobInstance
  • server 服务器 | ip
  • sharding 分片
  • failover 故障转移
  • reconcile 自诊断修复(重新分片)
  • setup 启动设置
  • schedule 调度器 | 作业调度器,作业类型,作业生命周期
  • storage 存储 | 默认zk
  • snapshot dump 任务快照
  • trigger 手动触发
  • listener 配置变更监听器
  • guarantee 分布式作业监听器幂等保障

源码

初始化

测试代码

ScheduleJobBootstrap 是E-job 的入口执行类,CoordinatorRegistryCenter注册中心zk的相关信息,createJobConfiguration job config 参数的设置。

  public static class JobTest {
        //mvn clean install -DskipTests -Drat.skip=true
        public static void main(String[] args) {
            new ScheduleJobBootstrap(coordinatorRegistryCenter(), new TestJob(), createJobConfiguration());
        }

        private static CoordinatorRegistryCenter coordinatorRegistryCenter() {
            ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181", "elastic-job-dev");
            zc.setConnectionTimeoutMilliseconds(40000);
            zc.setMaxRetries(5);
            CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(zc);
            registryCenter.init();
            return registryCenter;
        }

        private static JobConfiguration createJobConfiguration() {
            String jobs = "0=zgc,1=gzq,2=wjm";
            return JobConfiguration.newBuilder("TestJob1", 3)
                    .cron("0/5 * * * * ?")
                    .shardingItemParameters(jobs)
                    .overwrite(true)
                    .failover(true)
                    .build();
        }
    }

Jobschedule 类说明

Jobschedule完成了很多和quartz相关的内容,并且也会将服务信息注册到zk上,并且做好相关的job信息的cache。在整个任务的调度中,jobSchedule完成了elastic job主干流程。其中主要有5种类型的方法:生成SetUpFacade、SchedulerFacade、LiteJobFacade,生成ElasticJobExecutor,调用setUpFacade.setUpJobConfiguration初始化配置,调用setGuaranteeServiceForElasticJobListeners为listener设置GuaranteeService,createJobScheduleController完成收尾,接下来

setUpJobConfiguration 设置zk的节点信息。会通过节点是否存在或者本地配置是否可以覆盖注册中心来写入zk的config 节点。

validateJobProperties是否配置异常处理器,若配置则加载异常处理器,加载异常处理器根据JobConfig 类的 jobErrorHandlerType字段来判断异常处理策略

ElasticJobExecutor 执行器的初始化设置 JobItemExecutorFactory 获取执行器

createJobScheduleController初始化调度器,底层为Quartz 分别为Schedule JobDetail triggerIdentity

 /**
     * 核心执行方法  SetUpFacade SchedulerFacade LiteJobFacade ElasticJobExecutor
     */
    public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
        Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");


        this.regCenter = regCenter;
        // 获取需要执行的class 实现了job 的类(SimpleJob)
        String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
        // 设置配置属性信息 zk config node 节点  config 信息
        this.jobConfig = setUpJobConfiguration(regCenter, jobClassName, jobConfig);
        Collection<ElasticJobListener> jobListeners = getElasticJobListeners(this.jobConfig);


        // 对zk 进行操作 路径  配置
        setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(), jobListeners);
        schedulerFacade = new SchedulerFacade(regCenter, this.jobConfig.getJobName());
        jobFacade = new LiteJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));

				// 是否配置异常处理器,若配置则加载异常处理器
        validateJobProperties();

        // 定义执行器
        jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);

        // 设置监听
        setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);

        // 核心代码 创建 schedule
        jobScheduleController = createJobScheduleController();
    }


生成SetUpFacadeSchedulerFacadeLiteJobFacade,这三个Facade都是对regCenter的封装,可以向zookeeper进行增删改查

 public SetUpFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection<ElasticJobListener> elasticJobListeners) {
        leaderService = new LeaderService(regCenter, jobName);
        serverService = new ServerService(regCenter, jobName);
        instanceService = new InstanceService(regCenter, jobName);
        reconcileService = new ReconcileService(regCenter, jobName);
        listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
        this.jobName = jobName;
        this.regCenter = regCenter;
    }

public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName) {
        this.jobName = jobName;
        leaderService = new LeaderService(regCenter, jobName);
        shardingService = new ShardingService(regCenter, jobName);
        executionService = new ExecutionService(regCenter, jobName);
        reconcileService = new ReconcileService(regCenter, jobName);
    }
 public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners, final TracingConfiguration tracingConfig) {
        configService = new ConfigurationService(regCenter, jobName);
        shardingService = new ShardingService(regCenter, jobName);
        executionContextService = new ExecutionContextService(regCenter, jobName);
        executionService = new ExecutionService(regCenter, jobName);
        failoverService = new FailoverService(regCenter, jobName);
        this.elasticJobListeners = elasticJobListeners;
        this.jobEventBus = null == tracingConfig ? new JobEventBus() : new JobEventBus(tracingConfig);
    }

生成ElasticJobExecutor,对elasticJobjobConfigjobFacade进行赋值,通过JobItemExecutorFactory.getExecutor获取到jobItemExecutorJobExecutorServiceHandlerFactory.getHandler(jobConfig.getJobExecutorServiceHandlerType()).createExecutorService获取到executorService,JobErrorHandlerFactory.getHandler获取到jobErrorHandler,对itemErrorMessages进行赋值。

  public ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade) {
        this(elasticJob, jobConfig, jobFacade, JobItemExecutorFactory.getExecutor(elasticJob.getClass()));
    }

 private ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final JobItemExecutor jobItemExecutor) {
        this.elasticJob = elasticJob;
        this.jobFacade = jobFacade;
        this.jobItemExecutor = jobItemExecutor;
        executorContext = new ExecutorContext(jobFacade.loadJobConfiguration(true));
        itemErrorMessages = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
    }

setUpFacade.setUpJobConfiguration会判断是否config路径还没有数据并且需要覆盖配置,如果是就将config写入zookeeper的/jobName/config的路径中,并且写入jobClassName到/jobName的路径中。否则调用load从本地缓存或者zookeeper的config路径读取配置。

private JobConfiguration setUpJobConfiguration(final CoordinatorRegistryCenter regCenter, final String jobClassName, final JobConfiguration jobConfig) {
        ConfigurationService configService = new ConfigurationService(regCenter, jobConfig.getJobName());
        return configService.setUpJobConfiguration(jobClassName, jobConfig);
    }

 public JobConfiguration setUpJobConfiguration(final String jobClassName, final JobConfiguration jobConfig) {
        checkConflictJob(jobClassName, jobConfig);
        // 节点是否存在 || 本地配置是否可以覆盖注册中心。
        if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || jobConfig.isOverwrite()) {
            // config  node 节点配置
            jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, YamlEngine.marshal(JobConfigurationPOJO.fromJobConfiguration(jobConfig)));
            jobNodeStorage.replaceJobRootNode(jobClassName);
            return jobConfig;
        }
        return load(false);
    }

  public JobConfiguration load(final boolean fromCache) {
        String result;
        if (fromCache) {
            result = jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT);
            if (null == result) {
                result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
            }
        } else {
            result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
        }
        if (result != null) {
            return YamlEngine.unmarshal(result, JobConfigurationPOJO.class).toJobConfiguration();
        } else {
            throw new JobConfigurationException("JobConfiguration was not found. It maybe has been removed or has not been configured correctly.");
        }
    }

createJobScheduleController是重头戏,首先会创建调用createSchedulercreateJobDetailgetJobConfig().getJobName()获取入参生成JobScheduleController,然后调用 JobRegistry.getInstance().registerJob向本地的map注册JobScheduleController,然后调用registerStartUpInfo注册启动信息。

 private JobScheduleController createJobScheduleController() {
        // 底层为Quartz
        JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
        JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
        // 注册启动信息
        registerStartUpInfo();
        return result;
    }

先来看JobScheduleController的生成,入参是quartz的SchedulerJobDetailtriggerIdentity
Scheduler通过StdSchedulerFactory并且注入一些quartz的配置参数,然后调用其getScheduler获取,最后注册listener到Scheduler,来处理错过调度到的任务。
JobDetail通过注入jobExecutor来生成,使得任务执行时会调用jobExecutor的execute方法。
triggerIdentity即是jobName。

 private Scheduler createScheduler() {
        Scheduler result;
        try {
            StdSchedulerFactory factory = new StdSchedulerFactory();
            factory.initialize(getQuartzProps());
            result = factory.getScheduler();
            result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
        return result;
    }
    
    private Properties getQuartzProps() {
        Properties result = new Properties();
        result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());
        result.put("org.quartz.threadPool.threadCount", "1");
        result.put("org.quartz.scheduler.instanceName", getJobConfig().getJobName());
        result.put("org.quartz.jobStore.misfireThreshold", "1");
        result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
        result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
        return result;
    }
    
    private JobDetail createJobDetail() {
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(getJobConfig().getJobName()).build();
        result.getJobDataMap().put(JOB_EXECUTOR_DATA_MAP_KEY, jobExecutor);
        return result;
    }

接下来是registerStartUpInfo,分别会调用JobRegistryregisterRegistryCenteraddJobInstancesetCurrentShardingTotalCount,最后调用setUpFacade.registerStartUpInfo。

    private void registerStartUpInfo() {
        JobRegistry.getInstance().registerRegistryCenter(jobConfig.getJobName(), regCenter);
        JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance());
        JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfig.getJobName(), jobConfig.getShardingTotalCount());
        setUpFacade.registerStartUpInfo(!jobConfig.isDisabled());
    }

JobRegistryregisterRegistryCenteraddJobInstancesetCurrentShardingTotalCount三个是将regCenterjobInstancecurrentShardingTotalCount进行缓存,并且初始化对/jobName路径下数据的缓存。

    public void registerRegistryCenter(final String jobName, final CoordinatorRegistryCenter regCenter) {
        regCenterMap.put(jobName, regCenter);
        regCenter.addCacheData("/" + jobName);
    }

    public void addJobInstance(final String jobName, final JobInstance jobInstance) {
        jobInstanceMap.put(jobName, jobInstance);
    }

    public void setCurrentShardingTotalCount(final String jobName, final int currentShardingTotalCount) {
        currentShardingTotalCountMap.put(jobName, currentShardingTotalCount);
    }

setUpFacade.registerStartUpInfo是最终的初始化方法,里面的方法很重要,对每一个都分析一下。

    public void registerStartUpInfo(final boolean enabled) {
        // 所有监听的配置初始化
        listenerManager.startAllListeners();
        //开始选举
        leaderService.electLeader();
        serverService.persistOnline(enabled);
        instanceService.persistOnline();
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }

listenerManager.startAllListeners会启动所有对zookeeper路径数据变化的listener。

 public void startAllListeners() {
        electionListenerManager.start();
        shardingListenerManager.start();
        failoverListenerManager.start();
        monitorExecutionListenerManager.start();
        shutdownListenerManager.start();
        triggerListenerManager.start();
        rescheduleListenerManager.start();
        guaranteeListenerManager.start();

        //regCenterConnectionStateListener监听的是对zookeeper的连接状态,若连接出现挂起或者丢失状态,则暂停任务,
        // 如果重新连接上,则重新写入server、instance本身节点到zookeeper,清除本身节点的运行节点,然后重新调度定时任务
        jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
    }

electionListenerManager监听以下事件:

  1. 当leader节点被删除或者leader节点不存在时,触发leaderService.electLeader选举。
  2. 如果本身节点被置为不可用并且本身节点是leader,则移除本身节点的leader节点,以此来触发leaderService.electLeader选举。
    class LeaderElectionJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
                leaderService.electLeader();
            }
        }
        
        private boolean isActiveElection(final String path, final String data) {
            return !leaderService.hasLeader() && isLocalServerEnabled(path, data);
        }
        
        private boolean isPassiveElection(final String path, final Type eventType) {
            JobInstance jobInstance = JobRegistry.getInstance().getJobInstance(jobName);
            return !Objects.isNull(jobInstance) && isLeaderCrashed(path, eventType) && serverService.isAvailableServer(jobInstance.getIp());
        }
        
        private boolean isLeaderCrashed(final String path, final Type eventType) {
            return leaderNode.isLeaderInstancePath(path) && Type.NODE_DELETED == eventType;
        }
        
        private boolean isLocalServerEnabled(final String path, final String data) {
            return serverNode.isLocalServerPath(path) && !ServerStatus.DISABLED.name().equals(data);
        }
    }

    class LeaderAbdicationJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
                leaderService.removeLeader();
            }
        }
        
        private boolean isLocalServerDisabled(final String path, final String data) {
            return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
        }
    }

shardingListenerManager监听以下事件:

  1. 配置节点中的shardingTotalCount发生改变,调用shardingService.setReshardingFlag,强制任务执行时重新分配分片。
  2. server或者instance节点发生改变,调用shardingService.setReshardingFlag,强制任务执行时重新分配分片。
    class ShardingTotalCountChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                int newShardingTotalCount = YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().getShardingTotalCount();
                if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                    shardingService.setReshardingFlag();
                    JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
                }
            }
        }
    }
    
    class ListenServersChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
                shardingService.setReshardingFlag();
            }
        }
        
        private boolean isInstanceChange(final Type eventType, final String path) {
            return instanceNode.isInstancePath(path) && Type.NODE_CHANGED != eventType;
        }
        
        private boolean isServerChange(final String path) {
            return serverNode.isServerPath(path);
        }
    }

failoverListenerManager监听以下事件:

  1. instance节点发生下线,将其拥有的失效转移任务再次触发失效转移到其他节点执行。
  2. 配置节点中的isFailover发生改变,若不需要失效转移,则直接清除现有的失效转移任务。
class JobCrashedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (!JobRegistry.getInstance().isShutdown(jobName) && isFailoverEnabled() && Type.NODE_DELETED == eventType && instanceNode.isInstancePath(path)) {
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                    return;
                }
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                if (!failoverItems.isEmpty()) {
                    for (int each : failoverItems) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }
    
    class FailoverSettingsChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (configNode.isConfigPath(path) && Type.NODE_CHANGED == eventType && !YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().isFailover()) {
                failoverService.removeFailoverInfo();
            }
        }
    }

monitorExecutionListenerManager监听以下事件:

  1. 配置节点中的isMonitorExecution发生改变,若不需要提醒执行信息,则直接清除现有的显示执行的节点。
    class MonitorExecutionSettingsChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (configNode.isConfigPath(path) && Type.NODE_CHANGED == eventType && !YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().isMonitorExecution()) {
                executionService.clearAllRunningInfo();
            }
        }
    }

shutdownListenerManager监听以下事件:

  1. 本身的instance节点被删除,需要调用schedulerFacade.shutdownInstance关闭掉本身节点的定时任务。
    class InstanceShutdownStatusJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().getJobScheduleController(jobName).isPaused()
                    && isRemoveInstance(path, eventType) && !isReconnectedRegistryCenter()) {
                schedulerFacade.shutdownInstance();
            }
        }
        
        private boolean isRemoveInstance(final String path, final Type eventType) {
            return instanceNode.isLocalInstancePath(path) && Type.NODE_DELETED == eventType;
        }
        
        private boolean isReconnectedRegistryCenter() {
            return instanceService.isLocalJobInstanceExisted();
        }
    }

triggerListenerManager监听以下事件:

  1. 出现指定本身节点的trigger节点,将清除trigger节点,然后调用triggerJob立刻执行一次任务。
    class JobTriggerStatusJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (!InstanceOperation.TRIGGER.name().equals(data) || !instanceNode.isLocalInstancePath(path) || Type.NODE_CHANGED != eventType) {
                return;
            }
            instanceService.clearTriggerFlag();
            if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().isJobRunning(jobName)) {
                // TODO At present, it cannot be triggered when the job is running, and it will be changed to a stacked trigger in the future.
                JobRegistry.getInstance().getJobScheduleController(jobName).triggerJob();
            }
        }
    }

rescheduleListenerManager监听以下事件:

  1. 配置节点中的cron发生改变,则调用rescheduleJob重新调度定时任务。
class CronSettingAndJobEventChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (configNode.isConfigPath(path) && Type.NODE_CHANGED == eventType && !JobRegistry.getInstance().isShutdown(jobName)) {
                JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob(YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().getCron());
            }
        }
    }

guaranteeListenerManager监听以下事件:

  1. start节点发生改变,将调用listener的notifyWaitingTaskStart通知任务启动。
  2. complete发生改变,将调用listener的notifyWaitingTaskComplete通知任务完成。
   class StartedNodeRemovedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (Type.NODE_DELETED == eventType && guaranteeNode.isStartedRootNode(path)) {
                for (ElasticJobListener each : elasticJobListeners) {
                    if (each instanceof AbstractDistributeOnceElasticJobListener) {
                        ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart();
                    }
                }
            }
        }
    }
    
    class CompletedNodeRemovedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (Type.NODE_DELETED == eventType && guaranteeNode.isCompletedRootNode(path)) {
                for (ElasticJobListener each : elasticJobListeners) {
                    if (each instanceof AbstractDistributeOnceElasticJobListener) {
                        ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete();
                    }
                }
            }
        }
    }

regCenterConnectionStateListener监听的是对zookeeper的连接状态,若连接出现挂起或者丢失状态,则暂停任务,如果重新连接上,则重新写入server、instance本身节点到zookeeper,清除本身节点的运行节点,然后重新调度定时任务。

  @Override
    public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
        if (JobRegistry.getInstance().isShutdown(jobName)) {
            return;
        }
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) {
            jobScheduleController.pauseJob();
        } else if (ConnectionState.RECONNECTED == newState) {
            serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()));
            instanceService.persistOnline();
            executionService.clearRunningInfo(shardingService.getLocalShardingItems());
            jobScheduleController.resumeJob();
        }
    }

完成所有listener的初始化后,将调用leaderService.electLeader开始选举,将使用zookeeper分布式锁的机制,所有节点都往同一路径下创建顺序节点,只有获取到最小序号的节点会执行LeaderElectionExecutionCallback的execute方法,将本身节点的信息写入leader节点,宣布选举成功。因此后续其他节点再进入LeaderElectionExecutionCallback的execute方法时,发现leader节点拥有数据,自己只能成为follower结束选举流程。

  public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
            latch.start();
            latch.await();
            callback.execute();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            handleException(ex);
        }
    }

    public void electLeader() {
        log.debug("Elect a new leader now.");
        jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
        log.debug("Leader election completed.");
    }

   @RequiredArgsConstructor
    class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
        
        @Override
        public void execute() {
            if (!hasLeader()) {
                jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            }
        }
    }

listener基本上都是围绕server、instance节点来进行处理

接下来serverService.persistOnlineinstanceService.persistOnline就是创建本身节点的server、instance节点。

    public void persistOnline(final boolean enabled) {
        if (!JobRegistry.getInstance().isShutdown(jobName)) {
            jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? ServerStatus.ENABLED.name() : ServerStatus.DISABLED.name());
        }
    }

  public void persistOnline() {
        jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
    }

因为是初始化,还是需要调用shardingService.setReshardingFlag,强制定时任务执行前进行分片。

   public void setReshardingFlag() {
        jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
    }

最后是reconcileService.startAsync,异步线程会定时检测leader状态触发重新选举和触发重新分片,避免完全依赖listener处理核心逻辑,使得系统拥有一个自检测修复的功能。

    @Override
    protected void runOneIteration() {
        int reconcileIntervalMinutes = configService.load(true).getReconcileIntervalMinutes();
        if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
            lastReconcileTime = System.currentTimeMillis();
            if (leaderService.isLeaderUntilBlock() && !shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers()) {
                log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
                shardingService.setReshardingFlag();
            }
        }
    }
    
    @Override
    protected Scheduler scheduler() {
        return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
    }

job的执行

来到job的执行流程解析,在定时任务执行时,会调用到ElasticJobExecutor的execute方法,下面会一步步分析。

/**
     * Execute job.
     */
    public void execute() {
        // 加载 config 配置
        JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
        // 配置的重新加载 根据JobExecutorServiceHandlerType 参数是否改变
        executorContext.reloadIfNecessary(jobConfig);

        // 初始化JobErrorHandler ,异常的处理器
        JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
        try {
            // 检查当前任务是否有漏跑的情况 是否执行漏跑
            jobFacade.checkJobExecutionEnvironment();
        } catch (final JobExecutionEnvironmentException cause) {
            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
        }
        // 获取 分片信息。
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();

        // 事件追踪的时间触发
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
        if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(),
                    shardingContexts.getShardingItemParameters().keySet()));
            return;
        }
        try {
            jobFacade.beforeJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            //CHECKSTYLE:ON
            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
        }
        execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER);
        while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
            jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
            execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE);
        }
        jobFacade.failoverIfNecessary();
        try {
            jobFacade.afterJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            //CHECKSTYLE:ON
            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
        }
    }

jobFacade.checkJobExecutionEnvironment最终会调用到ConfigurationServicecheckMaxTimeDiffSecondsTolerable方法,原理是通过在zookeeper写入一个节点,然后获取其写入时间,然后与本地时间对比获取时间差,若时间差大于最大容忍的时间差则抛出错误,不执行任务。

   /**
     * Check max time different seconds tolerable between job server and registry center.
     * 
     * @throws JobExecutionEnvironmentException throe JobExecutionEnvironmentException if exceed max time different seconds
     */
    public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
        // maxTimeDiffSeconds 最大容忍时间差 出现漏跑的情况会抛异常 Quartz  SchedulerFactoryBean
        int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
        if (0 > maxTimeDiffSeconds) {
            return;
        }
        long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
        if (timeDiff > maxTimeDiffSeconds * 1000L) {
            throw new JobExecutionEnvironmentException(
                    "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
        }
    }

jobFacade.getShardingContexts是获取ShardingContexts的方法,主要是获取到具体的分片items,然后将jobConfig和分片items结合生成ShardingContexts返回。如果是支持failover则返回failover的分片,否则需要调用shardingService.shardingIfNecessary检测是否需要重新分片,然后再获取到分到本节点的分片返回。

  @Override
    public ShardingContexts getShardingContexts() {
        // 故障转移设置
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {
            // 当前节点失效 立刻转移到另一个节点
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
            if (!failoverShardingItems.isEmpty()) {
                return executionContextService.getJobShardingContext(failoverShardingItems);
            }
        }
        // 是否重新分片
        shardingService.shardingIfNecessary();

        // 获取ShardingItems 分片信息
        List<Integer> shardingItems = shardingService.getLocalShardingItems();
        // 故障转移删除全部分片信息
        if (isFailover) {
            shardingItems.removeAll(failoverService.getLocalTakeOffItems());
        }
        shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
        return executionContextService.getJobShardingContext(shardingItems);
    }

    /**
     * Get job sharding context.
     * 
     * @param shardingItems sharding items
     * @return job sharding context
     */
    public ShardingContexts getJobShardingContext(final List<Integer> shardingItems) {
        JobConfiguration jobConfig = configService.load(false);
        removeRunningIfMonitorExecution(jobConfig.isMonitorExecution(), shardingItems);
        if (shardingItems.isEmpty()) {
            return new ShardingContexts(buildTaskId(jobConfig, shardingItems), jobConfig.getJobName(), jobConfig.getShardingTotalCount(), 
                    jobConfig.getJobParameter(), Collections.emptyMap());
        }
        Map<Integer, String> shardingItemParameterMap = new ShardingItemParameters(jobConfig.getShardingItemParameters()).getMap();
        return new ShardingContexts(buildTaskId(jobConfig, shardingItems), jobConfig.getJobName(), jobConfig.getShardingTotalCount(), 
                jobConfig.getJobParameter(), getAssignedShardingItemParameterMap(shardingItems, shardingItemParameterMap));
    }

shardingService.shardingIfNecessary是重新分片的方法,上文已经说到,如果需要分片,则会在zookeeper加上shard目录加上necessary节点。如果检测到需要重新分片,follower会等待重新分片完成,leader则执行重新分片逻辑,需要先等到正在执行的分片item完成,然后标记重新分片为进行中,调用resetShardingInfo将shard节点调整为正确数量(扩缩容),然后调用jobShardingStrategy的sharding方法,获取到分片结果,最终调用jobNodeStorage.executeInTransaction将分片结果写入到zookeeper。

  /**
     * Sharding if necessary.
     * 是重新分片的方法,如果需要分片,则会在zookeeper加上shard目录加上necessary节点。
     * 如果检测到需要重新分片,follower会等待重新分片完成,leader则执行重新分片逻辑,需要先等到正在执行的分片item完成,
     * 然后标记重新分片为进行中,调用resetShardingInfo将shard节点调整为正确数量(扩缩容),
     * 然后调用jobShardingStrategy的sharding方法,获取到分片结果,最终调用jobNodeStorage.executeInTransaction将分片结果写入到zookeeper
     */
    public void shardingIfNecessary() {
        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
        // 需要分片的判断
        if (!isNeedSharding() || availableJobInstances.isEmpty()) {
            return;
        }
        // 主节点执行
        if (!leaderService.isLeaderUntilBlock()) {
            blockUntilShardingCompleted();
            return;
        }
        // 分片逻辑 等待所有在运行的作业执行完毕
        // 如果存在正在执行作业中的分片,等待所有作业执行完毕
        waitingOtherShardingItemCompleted();
        // 获取配置
        JobConfiguration jobConfig = configService.load(false);
        int shardingTotalCount = jobConfig.getShardingTotalCount();
        log.debug("Job '{}' sharding begin.", jobName);
        // 分片状态设置为执行中
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");

        // 重新设置分片信息 重新调整节点为正确的数量 自动扩缩容
        resetShardingInfo(shardingTotalCount);

        // 根据分片策略获取分片信息
        /**
         * 分片策略:AverageAllocationJobShardingStrategy default 平均分配
         *         OdevitySortByNameJobShardingStrategy         根据作业名的哈希值奇偶数决定IP升降序
         *         RoundRobinByNameJobShardingStrategy          根据作业名的哈希值对服务器列表进行轮转分片
         */
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());

        // 信息写入 zk
        /**
         * 1.Check  "/"
         * 2.Create "/Myjob/Sharding/0/instances"
         *          "/Myjob/Sharding/2/instances"
         *          "/Myjob/Sharding/1/instances"
         * 3.Delete "/Myjob/leader/sharding/necessary"
         * 4.Delete "/Myjob/leader/sharding/processing"
         */
        jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
        log.debug("Job '{}' sharding complete.", jobName);
    }

继续回到主流程,拿到ShardingContexts后就调用execute方法执行,忽略一些前后钩子方法的执行,其实就是根据分片号执行任务,如果是只有1个分片,则直接在原线程执行。否则将丢进线程池,然后使用CountDownLatch等待所有任务执行完成返回。

    private void execute(final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
        if (shardingContexts.getShardingItemParameters().isEmpty()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
            return;
        }
        jobFacade.registerJobBegin(shardingContexts);
        String taskId = shardingContexts.getTaskId();
        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
        try {
            process(shardingContexts, executionSource);
        } finally {
            // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
            jobFacade.registerJobCompleted(shardingContexts);
            if (itemErrorMessages.isEmpty()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
            } else {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
            }
        }
    }

private void process(final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
        Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
        if (1 == items.size()) {
            int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
            JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
            process(shardingContexts, item, jobExecutionEvent);
            return;
        }
        CountDownLatch latch = new CountDownLatch(items.size());
        for (int each : items) {
            JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
            if (executorService.isShutdown()) {
                return;
            }
            executorService.submit(() -> {
                try {
                    process(shardingContexts, each, jobExecutionEvent);
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

   private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
        jobFacade.postJobExecutionEvent(startEvent);
        log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item);
        JobExecutionEvent completeEvent;
        try {
            jobItemExecutor.process(elasticJob, jobConfig, jobFacade, shardingContexts.createShardingContext(item));
            completeEvent = startEvent.executionSuccess();
            log.trace("Job '{}' executed, item is: '{}'.", jobConfig.getJobName(), item);
            jobFacade.postJobExecutionEvent(completeEvent);
            // CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            // CHECKSTYLE:ON
            completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause));
            jobFacade.postJobExecutionEvent(completeEvent);
            itemErrorMessages.put(item, ExceptionUtils.transform(cause));
            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
        }
    }

下面来分析一下elasticjob三种job形态。
SimpleJobExecutor直接调用用户定义的elasticJob的execute执行即可。

    @Override
    public void process(final SimpleJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
        elasticJob.execute(shardingContext);
    }

ScriptJobExecutor将分片参数转化成命令行,调用底层的命令行执行方法即可。

    @Override
    public void process(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
        CommandLine commandLine = CommandLine.parse(getScriptCommandLine(jobConfig.getProps()));
        commandLine.addArgument(GsonFactory.getGson().toJson(shardingContext), false);
        try {
            new DefaultExecutor().execute(commandLine);
        } catch (final IOException ex) {
            throw new JobSystemException("Execute script failure.", ex);
        }
    }
    
    private String getScriptCommandLine(final Properties props) {
        String result = props.getProperty(ScriptJobProperties.SCRIPT_KEY);
        if (Strings.isNullOrEmpty(result)) {
            throw new JobConfigurationException("Cannot find script command line, job is not executed.");
        }
        return result;
    }

DataflowJobExecutor会根据配置选择对应的流式处理模式,如果是oneOffExecute,则调用一次fetchData拉取数据,再调用一次processData处理数据就结束。如果是streamingExecute,则会不断进行拉取处理的循环,直到拉取的数据为空。

 @Override
    public void process(final DataflowJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
        if (Boolean.parseBoolean(jobConfig.getProps().getOrDefault(DataflowJobProperties.STREAM_PROCESS_KEY, false).toString())) {
            streamingExecute(elasticJob, jobConfig, jobFacade, shardingContext);
        } else {
            oneOffExecute(elasticJob, shardingContext);
        }
    }
    
    private void streamingExecute(final DataflowJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
        List<Object> data = fetchData(elasticJob, shardingContext);
        while (null != data && !data.isEmpty()) {
            processData(elasticJob, shardingContext, data);
            if (!isEligibleForJobRunning(jobConfig, jobFacade)) {
                break;
            }
            data = fetchData(elasticJob, shardingContext);
        }
    }
    
    private boolean isEligibleForJobRunning(final JobConfiguration jobConfig, final JobFacade jobFacade) {
        return !jobFacade.isNeedSharding() && Boolean.parseBoolean(jobConfig.getProps().getOrDefault(DataflowJobProperties.STREAM_PROCESS_KEY, false).toString());
    }
    
    private void oneOffExecute(final DataflowJob elasticJob, final ShardingContext shardingContext) {
        List<Object> data = fetchData(elasticJob, shardingContext);
        if (null != data && !data.isEmpty()) {
            processData(elasticJob, shardingContext, data);
        }
    }
    
    @SuppressWarnings("unchecked")
    private List<Object> fetchData(final DataflowJob elasticJob, final ShardingContext shardingContext) {
        return elasticJob.fetchData(shardingContext);
    }
    
    @SuppressWarnings("unchecked")
    private void processData(final DataflowJob elasticJob, final ShardingContext shardingContext, final List<Object> data) {
        elasticJob.processData(shardingContext, data);
    }
    

继续主流程的错过调度的分片item处理,上文分析到本地的quartz是注册了错过调度listener,如果检测到错过调度,则向zookeeper注册错过调度节点。此时则是取出错过调度节点,然后执行。

       execute(shardingContexts, ExecutionSource.NORMAL_TRIGGER);
        while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
            jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
            execute(shardingContexts, ExecutionSource.MISFIRE);
        }

最后是jobFacade.failoverIfNecessary,也就是再一次取出failover的节点进行执行。

  @Override
    public void failoverIfNecessary() {
        if (configService.load(true).isFailover()) {
            failoverService.failoverIfNecessary();
        }
    }

   public void failoverIfNecessary() {
        if (needFailover()) {
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }

    private boolean needFailover() {
        return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
                && !JobRegistry.getInstance().isJobRunning(jobName);
    }

       @Override
        public void execute() {
            if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
                return;
            }
            int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
            log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
            jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
            // TODO Instead of using triggerJob, use executor for unified scheduling
            JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
            if (null != jobScheduleController) {
                jobScheduleController.triggerJob();
            }
        }

分片策略

JobShardingStrategyFactory为获取分片策略工厂。实现 JobShardingStrategy 接口即可。实现采用SPI

方式加载。

/**
     * Get job sharding strategy.
     * 
     * @param type job sharding strategy type
     * @return job sharding strategy
     */
    public static JobShardingStrategy getStrategy(final String type) {
        if (Strings.isNullOrEmpty(type)) {
            return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, DEFAULT_STRATEGY).get();
        }
        return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, type)
                .orElseThrow(() -> new JobConfigurationException("Cannot find sharding strategy using type '%s'.", type));
    }

  1. AverageAllocationJobShardingStrategy

    基于平均分配算法的分片策略,默认的分片策略

    AverageAllocationJobShardingStrategy 策略类实现了 sharding 方法 采用的算法是基于平均分片的策略,节点和分片数可以整除时则平均分配,不能整除则按照节点序号从小到大分配。申请shardingItems容器时多申请一个元素位置就是为了放置不能整除的分片信息。

    具体实现:
    a. 当作业数能被服务器整除情况下,依次平均分配作业片:
    如果有3台服务器, 分成9片, 则每台服务器分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
    b. 如果分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器:
    如果有3台服务器, 分成8片, 则每台服务器分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5].
    如果有3台服务器, 分成10片, 则每台服务器分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].

    该策略缺点:平均分片策略,当分片数小于作业服务器数时,作业会被永远分配序号小的服务器,而导致IP地址靠后的服务器空闲。另外当有多个作业任务的时候,序号小的服务器分配的任务会相对多一些。

    @Override
        public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
            if (jobInstances.isEmpty()) {
                return Collections.emptyMap();
            }
            Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);
            addAliquant(jobInstances, shardingTotalCount, result);
            return result;
        }
        
        private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
            Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
            //5 2
            int itemCountPerSharding = shardingTotalCount / shardingUnits.size(); //2
            int count = 0;
            for (JobInstance each : shardingUnits) {
                // itemCountPerSharding+1 多申请一个位置
                List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
                for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                    shardingItems.add(i);
                }
                result.put(each, shardingItems);
                count++;
            }
            return result;
        }
    
        /**
         * 分片算法
         * @param shardingUnits
         * @param shardingTotalCount
         * @param shardingResults
         */
        private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
            // 取模获取剩下未分配的分片信息
            int aliquant = shardingTotalCount % shardingUnits.size();
            int count = 0;
            for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
                if (count < aliquant) {
                    // 5 / 2 * 2 + 0 =4   [0,1,4],[2,3]
                    entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
                }
                count++;
            }
        }
        
        @Override
        public String getType() {
            return "AVG_ALLOCATION";
        }
    
  2. OdevitySortByNameJobShardingStrategy

    根据作业名的哈希值的奇偶数决定IP升序或降序的算法

    OdevitySortByNameJobShardingStrategy首先 作业名的哈希值为奇数则IP升序. 作业名的哈希值为偶数则IP降序.然后再调用AverageAllocationJobShardingStrategy的平均分片算法进行分片。

    private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
        
        @Override
        public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
            long jobNameHash = jobName.hashCode();
            if (0 == jobNameHash % 2) {
                Collections.reverse(jobInstances);
            }
            return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);
        }
        
        @Override
        public String getType() {
            return "ODEVITY";
        }
    
  3. RoundRobinByNameJobShardingStrategy

    根据作业名字哈希码的绝对值与作业服务器的数量进行轮训

    根据作业的名称的哈希值对实例个数取模来决定,轮转的起始位置,相比平均分片算法,可以一定程度避免多个作业的分片集中在序号小的服务实例上执行。

      private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
        
        @Override
        public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
    
            return averageAllocationJobShardingStrategy.sharding(rotateServerList(jobInstances, jobName), jobName, shardingTotalCount);
        }
    
    
        private List<JobInstance> rotateServerList(final List<JobInstance> shardingUnits, final String jobName) {
    
            int shardingUnitsSize = shardingUnits.size();
            // 取模 0,1 ,2 是对服务器进行排序,然后平均分
            int offset = Math.abs(jobName.hashCode()) % shardingUnitsSize;
            if (0 == offset) {
                return shardingUnits;
            }
            List<JobInstance> result = new ArrayList<>(shardingUnitsSize);
            for (int i = 0; i < shardingUnitsSize; i++) {
                int index = (i + offset) % shardingUnitsSize;
                result.add(shardingUnits.get(index));
            }
            return result;
        }
        
        @Override
        public String getType() {
            return "ROUND_ROBIN";
        }
    

监听器

ElasticJob‐Lite提供作业监听器,用于在任务执行前后触发监听器的相关方法。作业监听器分为每台作业节点均执行的常规监听器和分布式场景中仅单一节点执行的分布式监听器(分布式监听器目前有Bug)。而实现自己的常规监听器和分布式监听器,需要通过SPI加入,才能被ElasticJob‐Lite感知到。

ElasticJobListenerFactory作业监听工厂类

   static {
        ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
    }
    
    /**
     * Create a job listener instance.
     *
     * @param type job listener type
     * @return optional job listener instance
     */
    public static Optional<ElasticJobListener> createListener(final String type) {
        return ElasticJobServiceLoader.newTypedServiceInstance(ElasticJobListener.class, type, new Properties());
    }

作业监听器是在作业监听器工厂中进行注册的,作业监听器工厂类在完成类初始化后,就已经通过ElasticJobServiceLoader类注册了所有的作业监听器。ElasticJobServiceLoader类的相关代码如下:

 /**
     * Register typeSPI service.
     *
     * @param typedService typed service
     * @param <T> class of service
     */
    public static <T extends TypedSPI> void registerTypedService(final Class<T> typedService) {
        if (TYPED_SERVICES.containsKey(typedService)) {
            return;
        }
        // 使用ServiceLoader类加载服务(作业监听器),然后存储于ConcurrentMap中
        ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each));
    }
    
    private static <T extends TypedSPI> void registerTypedServiceClass(final Class<T> typedService, final TypedSPI instance) {
        //putIfAbsent和computeIfAbsent都是在key不存在的时候才会建立key和value的映射关系;
        // putIfAbset不论传入的value是否为空,都会建立映射(并不适合所有子类,例如HashTable),
        // 而computeIfAbsent方法,当存入value为空时,不做任何操作putIfAbsent返回的是旧value,当key不存在时返回null;
        // computeIfAbsent返回的都是新的value,即使computeIfAbsent在传入的value为null时,不会新建映射关系,但返回的也是null;
        TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);
        TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());
    }

ServiceLoader类就是Java提供的SPI,SPI(Service Provider Interface)是JDK内置的一种服务提供发现机制,可以用来启用框架扩展和替换组件,主要是被框架的开发人员使用,不同厂商可以针对同一接口做出不同的实现,比如java.sql.Driver接口,MySQL和PostgreSQL都提供了对应的实现给用户使用,而Java的SPI机制可以为某个接口寻找服务实现。Java中SPI机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是解耦。

ServiceLoader类正常工作的唯一要求是服务提供类必须具有无参构造函数,以便它们可以在加载期间实例化。通过在资源目录的META-INF/services中放置服务提供者配置文件来标识服务提供者,文件名是服务类型的完全限定名(比如ElasticJobListener类的完全限定名),该文件包含具体的服务提供者类的完全限定名列表(ElasticJobListener实现类的完全限定名列表),每行一个,每个名称周围的空格和制表符以及空行都将被忽略,该文件必须以UTF-8编码。
集成方案

常规监听器

若作业处理作业节点的文件,处理完成后删除文件,可考虑使用每个节点均执行清理任务。此类型任务实现简单,且无需考虑全局分布式任务是否完成,应尽量使用此类型监听器。

分布式监听器

若作业处理数据库中的数据,处理完成后只需一个节点执行数据清理任务即可。此类型任务处理复杂,需要同步分布式环境下作业的状态,提供了超时设置来避免作业不同步导致的死锁,应谨慎使用。

ZK存储路径信息

image-20230605210346894

Spring集成

esjob和spring进行整合入口通过XML文件进行解析和创建esjob实例

通过继承AbstractBeanDefinitionParser类进行XML解析

image-20210914132951812

ZookeeperRegistryCenter

首先看com.dangdang.ddframe.job.lite.spring.reg.parser.ZookeeperBeanDefinitionParser这个类,这个类解析XML转化成com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration配置对象,并执行com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter的init方法进行初始化。

这个init方法非常 清晰,通过ZookeeperConfiguration配置对象创建一个Curator Zookeeper客户端

 @Override
    public void init() {
        log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(zkConfig.getServerLists())
                .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
                .namespace(zkConfig.getNamespace());
        if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
            builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
        }
        if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
            builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
        }
        if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
            builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8))
                    .aclProvider(new ACLProvider() {
                    
                        @Override
                        public List<ACL> getDefaultAcl() {
                            return ZooDefs.Ids.CREATOR_ALL_ACL;
                        }
                    
                        @Override
                        public List<ACL> getAclForPath(final String path) {
                            return ZooDefs.Ids.CREATOR_ALL_ACL;
                        }
                    });
        }
        client = builder.build();
        client.start();
        try {
            if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
                client.close();
                throw new KeeperException.OperationTimeoutException();
            }
            //CHECKSTYLE:OFF
        } catch (final Exception ex) {
            //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
    }

JobBeanDefinitionParser

对于每个作业的解析都会在org.apache.shardingsphere.elasticjob.lite.spring.namespace.job.parser中执行parseInternal方法进行初始化com.dangdang.ddframe.job.lite.spring.api.ScheduleJobBootstrap执行器。

执行parseInternal方法时会将注册中心对象传入,并且解析XML以创建com.dangdang.ddframe.job.lite.config.LiteJobConfiguration该类。并且此时会将XML写的的监听器进行创建。

Spring boot 集成

非场景启动器

<!-- elastic job-->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>3.0.1</version>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>3.0.1</version>
</dependency>

application.yml配置elasticjob

server:
  port: 9999

# 分布式任务调度自定义配置参数
elasticjob:
  # 注册中心
  reg-center:
    server-list: 192.168.198.155:2182,192.168.198.155:2183,192.168.198.155:2184 #zookeeper地址
    namespace: elastic-job-springboot
  # 作业配置
  jobs:
    list:
      - clazz: com.harvey.demo.job.SpringBootJob
        cron: 0/20 * * * * ?
        shardingTotalCount: 2
        shardingItemParam: 0=text,1=image
        jobParam: name=test
        jobName: SpringBootJob # 本实例中没有特殊含义
      - clazz: com.harvey.demo.job.SpringRunnerJob
        cron: 0/20 * * * * ?
        shardingTotalCount: 3
        shardingItemParam: 0=text,1=image,2=video
        jobParam: name=test
        jobName: SpringRunnerJobs # 本实例中没有特殊含义

注册中心

@Configuration
public class ElasticJobRegistryCenterConfig {

    @Value("${elasticjob.reg-center.server-list}")
    private String zookeeperAddressList;

    @Value("${elasticjob.reg-center.namespace}")
    private String namespace;

    /****
     * 注册中心
     * @return
     */
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter zkCenter() {
        // 1 zk的配置
        // 参数一:zk的地址,如果是集群,每个地址用逗号隔开
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperAddressList, namespace);
        // 创建协调注册中心
        // 2 CoordinatorRegistryCenter接口,elastic-job提供了一个实现ZookeeperRegistryCenter
        CoordinatorRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        // 3 注册中心初始化
        //zookeeperRegistryCenter.init();
        return zookeeperRegistryCenter;
    }
}

job配置启动

@Component
@EnableConfigurationProperties
@ConfigurationProperties(prefix = "elasticjob.jobs")
public class ElasticJobConfig implements InitializingBean {

    /**
     * 任务列表job
     */
    private List<JobScheduler> list;
    public void setList(List<JobScheduler> list) {
        this.list = list;
    }

    @Autowired
    private CoordinatorRegistryCenter zkCenter;

    @Autowired
    private SpringContextUtil springContextUtil;

    @Override
    public void afterPropertiesSet() throws Exception {
        //这里可以替换为从数据库查询,将JobScheduler的信息配置到表里
        Map<String, JobScheduler> jobSchedulerMap = new HashMap(list.size());
        for (JobScheduler scheduler : list) {
            jobSchedulerMap.put(scheduler.getClazz(), scheduler);
        }
        //收集所有实现SimpleJob接口的job实例
        Map<String, SimpleJob> simpleJobMap = springContextUtil.getApplicationContext().getBeansOfType(SimpleJob.class);
        Collection<SimpleJob> simpleJobs = simpleJobMap.values();
        for (SimpleJob simpleJob : simpleJobs) {
            //根据job实例的全类名获取JobScheduler信息
            JobScheduler jobScheduler = jobSchedulerMap.get(simpleJob.getClass().getName());
            if (jobScheduler == null) {
                throw new RuntimeException("请配置[" + simpleJob.getClass().getName() + "]的JobScheduler");
            }
            System.out.println("job scheduler init :" + jobScheduler.getClazz());
            SpringJobScheduler springJobScheduler = new SpringJobScheduler(simpleJob, zkCenter,
                    liteJobConfiguration(simpleJob.getClass(), jobScheduler.getCron(), jobScheduler.getShardingTotalCount(),
                            jobScheduler.getShardingItemParam(), jobScheduler.getJobParam(), jobScheduler.getJobName()));
            //启动任务
            springJobScheduler.init();
        }
    }

    private LiteJobConfiguration liteJobConfiguration(final Class<? extends ElasticJob> jobClass,
                                                      final String cron,
                                                      final int shardingTotalCount,
                                                      final String shardingItemParam,
                                                      final String jobParam,
                                                      final String jobName
    ) {
        // 1 job核心配置
        JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(
                // job的名字
                jobName,
                // cron表达式
                cron,
                // 分片总数
                shardingTotalCount
        );
        // 分片参数不是空,设置分片参数
        if (StringUtils.isNotBlank(shardingItemParam)) {
            builder.shardingItemParameters(shardingItemParam);
        }
        // 任务参数不是空,设置任务参数
        if(StringUtils.isNotBlank(jobParam)){
            builder.jobParameter(jobParam);
        }
        JobCoreConfiguration jobCoreConfiguration = builder.build();
        // 2 job类型配置
        // 参数1: job核心配置JobCoreConfiguration
        // 参数2 job的全类名
        JobTypeConfiguration jobTypeConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
        // 3 job根配置(LiteJobConfiguration)
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(jobTypeConfiguration)
                .overwrite(true)
                .build();
        return liteJobConfiguration;
    }
}

启动打印日志

img

采用场景启动器starter

<!-- 引入版本的时候要充分考虑自己的Zookeeper Server版本, 建议与其保持一致 -->
<!-- 例如 版本使用的是 zk 3.6.x, Zookeeper Server 也是3.6.x, 如果使用3.4.x则会报错 -->
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>3.0.1</version>
</dependency>

application.yml中配置注册中心和作业调度

server:
  port: 9999

#elasticjob配置
elasticjob:
  # 注册中心配置
  reg-center:
    # 连接 ZooKeeper 服务器的列表, 包括 IP 地址和端口号,多个地址用逗号分隔
    server-lists: 127.0.0.1:2188
    # ZooKeeper 的命名空间
    namespace: elastic-job-spring
    # 等待重试的间隔时间的初始毫秒数
    base-sleep-time-milliseconds: 1000
    # 等待重试的间隔时间的最大毫秒数
    maxSleepTimeMilliseconds: 3000
    # 最大重试次数
    maxRetries: 3
    # 会话超时毫秒数
    sessionTimeoutMilliseconds: 60000
    # 连接超时毫秒数
    connectionTimeoutMilliseconds: 15000
  # 作业配置, 更多的配置参考官网
  jobs:
    springJob: # job名
      elasticJobClass: com.harvey.demo.job.SpringBootJob
      cron: 0/5 * * * * ?
      shardingTotalCount: 2
      shardingItemParameters: 0=Beijing,1=Shanghai
    springTestJob:
      elasticJobClass: com.harvey.demo.job.SpringRunnerJob
      cron: 0/10 * * * * ?
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou