Elastic-job

发布时间 2023-07-20 11:19:19作者: lwx_R

1.介绍

Elastic-Job 是当当网基于 Quartz 二次开发的分布式调度解决方案,由两个相互独立的子项目 Elastic-Job-Lite 和 Elastic-Job-Cloud 组成。
Elastic-Job 的核心设计理念是去中心化的分布式定时调度,思路来源于 Quartz 基于数据库的高可用方案。但数据库不具备分布式协调能力,因此 Elastic-Job 在高可用方案的基础上增加了弹性扩容和数据分片,以便更大限度地利用分布式服务器的资源。

2.使用

需要先准备 ZooKeeper 环境(Elastic-Job 使用 ZooKeeper 作为注册中心)。

  • pom.xml
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>
  • application.yml
# Settings injected into RegistryCenterConfig#createRegistryCenter via @Value.
elasticjob:
  # ZooKeeper connection string (host:port) handed to ZookeeperConfiguration.
  zookeeper-url: localhost:2181
  # Second ZookeeperConfiguration constructor argument; presumably the
  # ZooKeeper namespace shared by this group of jobs - confirm.
  group-name: shop-job-group
# Cron expressions for the individual jobs (Quartz-style, six fields).
jobCron:
  # Read by InitSeckillProductJob; fires at second 0 of every minute.
  initSeckillProduct: 0 0/1 * * * ?
  # Same schedule; not referenced by the code shown in this article.
  userCache: 0 0/1 * * * ?
  • ElasticJobUtil工具类
/**
 * Static helpers that assemble {@code LiteJobConfiguration} objects for
 * Elastic-Job-Lite jobs.
 */
public class ElasticJobUtil {

    /**
     * Builds the root configuration for a job.
     *
     * <p>The job name is the simple name of {@code jobClass}; the canonical
     * class name is passed to the job type configuration.
     *
     * <p>NOTE(review): {@code jobClass} is bounded by {@code SimpleJob} even
     * when {@code dataflowType} is {@code true}; confirm whether a wider bound
     * is intended for dataflow jobs.
     *
     * @param jobClass               job implementation class
     * @param cron                   cron expression that triggers the job
     * @param shardingTotalCount     total number of shards
     * @param shardingItemParameters sharding item parameters such as
     *                               {@code "0=a,1=b"}; skipped when
     *                               {@code StringUtils.isEmpty} reports it empty
     * @param dataflowType           {@code true} to create a dataflow job
     *                               configuration, {@code false} for a simple one
     * @return the Lite job root configuration, built with {@code overwrite(true)}
     */
    public static LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                               final String cron,
                                                               final int shardingTotalCount,
                                                               final String shardingItemParameters,
                                                               boolean dataflowType) {
        // Core settings: job name, schedule and shard count.
        JobCoreConfiguration.Builder coreBuilder =
                JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount);
        if (!StringUtils.isEmpty(shardingItemParameters)) {
            coreBuilder.shardingItemParameters(shardingItemParameters);
        }
        JobCoreConfiguration coreConfig = coreBuilder.build();
        String jobClassName = jobClass.getCanonicalName();

        // Job type settings: dataflow or simple. The trailing 'true' is passed
        // through unchanged (presumably the streaming-process flag - confirm).
        JobTypeConfiguration typeConfig = dataflowType
                ? new DataflowJobConfiguration(coreConfig, jobClassName, true)
                : new SimpleJobConfiguration(coreConfig, jobClassName);

        // Lite root configuration wrapping the type configuration.
        return LiteJobConfiguration.newBuilder(typeConfig).overwrite(true).build();
    }

    /**
     * Builds a simple-job configuration with one shard and no sharding
     * item parameters.
     *
     * @param jobClass job implementation class
     * @param cron     cron expression that triggers the job
     * @return the Lite job root configuration
     */
    public static LiteJobConfiguration createDefaultSimpleJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron) {
        return createJobConfiguration(jobClass, cron, 1, null, false);
    }

    /**
     * Builds a dataflow-job configuration with one shard and no sharding
     * item parameters.
     *
     * @param jobClass job implementation class
     * @param cron     cron expression that triggers the job
     * @return the Lite job root configuration
     */
    public static LiteJobConfiguration createDefaultDataFlowJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron) {
        return createJobConfiguration(jobClass, cron, 1, null, true);
    }
}
  • 配置类
/**
 * Spring configuration that registers the schedulers for business jobs.
 */
@Configuration
public class BusinessJobConfig {

    /** Number of shards the seckill-product job is split into. */
    private static final int SECKILL_SHARD_COUNT = 3;

    /** Sharding item parameters, one {@code item=value} pair per shard. */
    private static final String SECKILL_SHARD_PARAMETERS = "0=10,1=12,2=14";

    /**
     * Creates the scheduler for {@code InitSeckillProductJob}; {@code init}
     * is declared as the bean's init method.
     *
     * <p>The method name must not be the same as the job class name
     * (presumably to avoid a bean-name clash with the job component - confirm).
     *
     * @param registryCenter    registry center the scheduler is created with
     * @param seckillProductJob job bean to schedule; also supplies the cron expression
     * @return the scheduler bean for the job
     */
    @Bean(initMethod = "init")
    public SpringJobScheduler initSPJob(CoordinatorRegistryCenter registryCenter, InitSeckillProductJob seckillProductJob) {
        // Simple (non-dataflow) job, sharded three ways.
        LiteJobConfiguration liteConfig = ElasticJobUtil.createJobConfiguration(
                seckillProductJob.getClass(),
                seckillProductJob.getCron(),
                SECKILL_SHARD_COUNT,
                SECKILL_SHARD_PARAMETERS,
                false);
        return new SpringJobScheduler(seckillProductJob, registryCenter, liteConfig);
    }
}
  • zookeeper配置类
/**
 * Spring configuration that exposes the ZooKeeper-backed registry center
 * used by the job schedulers.
 */
@Configuration
public class RegistryCenterConfig {

    /**
     * ZooKeeper session timeout in milliseconds.
     *
     * <p>The previous value of 100 ms is below the minimum a ZooKeeper server
     * accepts by default (2 x tickTime, i.e. 4000 ms with the default
     * tickTime), so it was never the timeout actually in effect and risked
     * constant session expiry wherever it was honoured. 60 s matches the
     * Curator client default.
     */
    private static final int SESSION_TIMEOUT_MILLISECONDS = 60_000;

    /**
     * Creates the registry center; {@code init} is declared as the bean's
     * init method.
     *
     * @param zookeeperUrl ZooKeeper connection string, from {@code elasticjob.zookeeper-url}
     * @param groupName    value of {@code elasticjob.group-name}, passed as the
     *                     second {@code ZookeeperConfiguration} constructor argument
     * @return the registry center bean
     */
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter createRegistryCenter(@Value("${elasticjob.zookeeper-url}") String zookeeperUrl, @Value("${elasticjob.group-name}") String groupName) {
        // ZooKeeper connection settings.
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperUrl, groupName);
        zookeeperConfiguration.setSessionTimeoutMilliseconds(SESSION_TIMEOUT_MILLISECONDS);
        // Registry center built on top of the ZooKeeper settings.
        return new ZookeeperRegistryCenter(zookeeperConfiguration);
    }
}
  • job任务类
/**
 * Simple job whose cron expression is injected from
 * {@code jobCron.initSeckillProduct}.
 */
@Component
@Setter
@Getter
public class InitSeckillProductJob implements SimpleJob {

    /** Cron expression for this job, read from configuration. */
    @Value("${jobCron.initSeckillProduct}")
    private String cron;

    /**
     * Runs one shard of the job.
     *
     * <p>Intended as a scheduled task that loads products from the database
     * into the cache; the body shown here only prints the executing thread
     * and the sharding parameter of the current shard.
     *
     * @param shardingContext context of the shard being executed
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        final Thread worker = Thread.currentThread();
        final String shardingParameter = shardingContext.getShardingParameter();
        System.out.println("线程名" + worker + " 分片参数 " + shardingParameter);
    }
}