分布式任务elasticjob整合springboot本地多服务启动(附源码)

发布时间 2023-06-27 10:04:07作者: 王权-18K

Elastic-Job是一个分布式调度的解决方案,由当当网开源,它由两个相互独立的子项目Elastic-Job-Lite和ElasticJob-Cloud组成,使用Elastic-Job可以快速实现分布式任务调度。

Elastic-Job的github地址:https://github.com/elasticjob

功能列表: 分布式调度协调 在分布式环境中,任务能够按指定的调度策略执行,并且能够避免同一任务多实例重复执行。 丰富的调度策略: 基于成熟的定时任务作业框架Quartz cron表达式执行定时任务。 弹性扩容缩容 北京市昌平区建材城西路金燕龙办公楼一层 电话:400-618-9090 当集群中增加某一个实例,它应当也能够被选举并执行任务;当集群减少一个实例时,它所执行的任务能被 转移到别的实例来执行。 失效转移 某实例在任务执行失败后,会被转移到其他实例执行。 错过执行作业重触发 若因某种原因导致作业错过执行,自动记录错过执行的作业,并在上次作业完成后自动触发。 支持并行调度 支持任务分片,任务分片是指将一个任务分为多个小任务项在多个实例同时执行。 作业分片一致性 当任务被分片后,保证同一分片在分布式环境中仅一个执行实例。 支持作业生命周期操作 可以动态对任务进行开启及停止操作。 丰富的作业类型 支持Simple、DataFlow、Script三种作业类型,后续会有详细介绍。 Spring整合以及命名空间支持 对Spring支持良好的整合方式,支持spring自定义命名空间,支持占位符。 运维平台 提供运维界面,可以管理作业和注册中心。

本次是用elasticjob本地简单测试多服务器启动定时任务,这里需要安装Zookeeper,这里就不多聊了

1、编写Zookeeper配置类

package elastic.job.demo.autoconfig;

import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName : ZookeeperAutoConfig
 * @Description :自动配置类
 */
@Configuration
@ConditionalOnProperty("elasticjob.zookeeper.server-list")
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperAutoConfig {

    private final ZookeeperProperties zookeeperProperties;

    public ZookeeperAutoConfig(ZookeeperProperties zookeeperProperties) {
        this.zookeeperProperties = zookeeperProperties;
    }

    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter zkCenter(){
        String serverList = zookeeperProperties.getServerlist();
        String namespace = zookeeperProperties.getNamespace();

        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace);
        ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        return zookeeperRegistryCenter;
    }
}
package elastic.job.demo.autoconfig;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * @ClassName : ZookeeperProperties
 * @Description :属性配置类
 */
@Getter
@Setter
@ConfigurationProperties(prefix = "elasticjob.zookeeper")
public class ZookeeperProperties {
    //zookeeper地址列表
    private String serverlist;
    //zookeeper命名空间
    private String namespace;

}

2、写完配置类后,我们这里写一个注解,用注解的方式进行实现elasticjob的任务,方便后续的开发和简洁性

package elastic.job.demo.autoconfig;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)//表示使用在哪:这里是类,
@Retention(RetentionPolicy.RUNTIME)//表示运行时进行启动
public @interface ElasticSimpleJob {
    String jobName() default "";
    String cron() default "";
    int shardingTotalCount() default 1;
    boolean overwrite() default  false;
}

3、用springboot的自动装载功能,进行注解的扫描和自动注册

package elastic.job.demo.autoconfig;

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobRootConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import elastic.job.demo.job.MySimpleJob;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Map;

/**
 * @ClassName : SimpleJobAutoConfig
 * @Description :
 */
@Configuration
//@Service
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {

    @Autowired
    private CoordinatorRegistryCenter coordinatorRegistryCenter;

    @Autowired
    private ApplicationContext applicationContext;

    //自动注册
    @PostConstruct
    public void initSimpleJob(){
        //获取spring的上下文
        Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
        for(Map.Entry<String,Object> entry: beansWithAnnotation.entrySet()){
            Object instance = entry.getValue();
            Class<?>[] interfaces = instance.getClass().getInterfaces();
            for (Class<?> superInterface : interfaces) {
                if(superInterface == SimpleJob.class){
                    ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
                    String jobName = annotation.jobName();
                    String cron = annotation.cron();
                    int shardingTotalCount = annotation.shardingTotalCount();
                    boolean overwrite =annotation.overwrite();
                    //注册定时任务
                    //job 核心配置
                    JobCoreConfiguration buildJcc = JobCoreConfiguration
                            .newBuilder(jobName, cron, shardingTotalCount)
                            .build();
                    //job类型配置
                    SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(
                            buildJcc, instance.getClass().getCanonicalName()
                    );
                    // job配置(LiteJobConfiguration)
                    LiteJobConfiguration buildLiteJobConfiguration = LiteJobConfiguration
                            .newBuilder(simpleJobConfiguration)
                            .overwrite(overwrite)
                            .build();

                    //启动
                    new SpringJobScheduler((ElasticJob) instance,coordinatorRegistryCenter,buildLiteJobConfiguration).init();

                }
            }
        }
    }
}

在springboot添加spring.factories用于扫描并自动注册

 4、我们这里编写2个任务 MySimpleJob 和 MySimpleJob2,

MySimpleJob,5秒一次,总分片数为4,MySimpleJob2,5秒一次,总分片数为1

package elastic.job.demo.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import elastic.job.demo.autoconfig.ElasticSimpleJob;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @ClassName : MySimpleJob
 * @Description : 我的定时任务
 */
@ElasticSimpleJob(jobName = "mySimpleJob",
        cron = "0/5 * * * * ?",
        shardingTotalCount = 4,
        overwrite = true)
@Component
public class MySimpleJob implements SimpleJob {

    @Value("${server.port}")
    private int port;

    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("端口:" + port  + ",我是任务1分片项:"+shardingContext.getShardingItem()+",总分片数是:"+
                shardingContext.getShardingTotalCount());
    }
}
package elastic.job.demo.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import elastic.job.demo.autoconfig.ElasticSimpleJob;
import org.springframework.stereotype.Component;

/**
 * @author zhoucc
 * @date 2023-06-26 16:38
 */
@ElasticSimpleJob(jobName = "mySimpleJob2",
        cron = "0/5 * * * * ?",
        shardingTotalCount = 1,
        overwrite = true)
@Component
public class MySimpleJob2 implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("我是任务2分片项:"+shardingContext.getShardingItem()+",总分片数是:"+
                shardingContext.getShardingTotalCount());
    }
}

5、启动效果展示,我这里分别用8081、8082、8083,这3个端口的服务进行演示

(1)启动zookeeper,这里不多讲了,我这里的端口是2181

 (2)启动第一个服务8081端口

可以看到我这里4个分片的任务都在8081上进行执行了。

(3)启动第二个服务8082

 

 可以看到8082只运行了0和1.8081运行2和3.

(4)启动第3个服务8083

 

 

  可以看到8081只运行了2,8082运行0和3,8083运行4

(5)当我们停掉8083后

 

 又回到了 8081只运行了2和3,8083运行0和1

 (6)最后我们看一下zookeeper的页面

0,1,2,3这四个分片,2个服务,这里可以观察具体那个分片在哪个服务器上面进行执行。

最后总结一下,个人的见解:elasticjob首先定义分片策略,将服务器注册到zookpeeper上,进行分片分发,若某个服务终止,那将进行重新的分发。

附上源码包:

 链接:https://pan.baidu.com/s/1ChB9u18jgu_2IvWXFASjKA

提取码:1234

如果对你有帮助,麻烦给个小赞