Springboot+ElasticJob-Lite实现集群任务调度

发布时间 2023-04-05 19:59:07作者: 我有八千部下

前言

ElasticJob-Lite是集群环境下应用(比如SpringCloud微服务)任务调度的解决方案。

集群部署的时候,一个定时任务会有多个进程执行,如果不进行任何处理,会导致任务触发的时候每个进程重复执行一次。

解决办法有两种:一种是加锁,保证同时只有一个进程执行任务,比如用分布式锁,或者用任务调度框架Quartz,但是这种方案有个缺陷,当任务负载比较高的时候,单个进程处理压力比较大;另一种方式是分片,将任务分片到参与的多个进程中,每次执行多个进程一起分摊,解决了单进程负载过高问题,还能提高扩展性,ElasticJob-Lite实现的是这种。

关于ElasticJob-Lite其他丰富功能参考 概念 & 功能

我们假设有一个需求,需要周期性的的对N个用户进行相业务处理,来看看如果使用ElasticJob-Lite,完整例子见 GitHub - fruitbasket-litchi-elasticjob-lite

代码和配置

参考 ShardingSphere - ElasticJob-Lite Spring Boot Starter 配置

引入POM依赖,关键是elasticjob-lite-spring-boot-starter

<properties>
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spring-boot-dependencies.version>2.3.12.RELEASE</spring-boot-dependencies.version>
    <elasticjob-lite-spring-boot-starter.version>3.0.0</elasticjob-lite-spring-boot-starter.version>
    <lombok.version>1.18.22</lombok.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot-dependencies.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.apache.shardingsphere.elasticjob</groupId>
        <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
        <version>${elasticjob-lite-spring-boot-starter.version}</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

先创建个模拟的数据源,用户ID对分片总数取模结果跟分片值进行等值比较,返回跟分片值对应的部分用户ID。

import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * 假装这是MySQL或者Redis等数据源
 */
@Component
public class MockDataSource {

    private static final int[] USER_IDS = IntStream.rangeClosed(1, 50).toArray();

    /**
     * 对用户Id取模,过滤得到跟分片ID匹配的结果
     *
     * @param shardingTotalCount 分片总数
     * @param shardingItem       分片值
     * @return 用户Id列表
     */
    public List<Integer> getBy(int shardingTotalCount, int shardingItem) {
        return Arrays.stream(USER_IDS)
                .filter(userId -> userId % shardingTotalCount == shardingItem).boxed().collect(Collectors.toList());
    }
}

创建一个处理处理调度任务的类UserDataflowJob,实现DataflowJob接口中两个方法fetchData()processData()

fetchData()根据分片值(shardingItem)来获取任务相关用户ID,在processData()中执行真正的业务逻辑。

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
public class UserDataflowJob implements DataflowJob<Integer> {

    @Autowired
    private MockDataSource dataSource;

    @Override
    public List<Integer> fetchData(final ShardingContext shardingContext) {
        return dataSource.getBy(shardingContext.getShardingTotalCount(), shardingContext.getShardingItem());
    }

    @Override
    public void processData(final ShardingContext shardingContext, final List<Integer> data) {
        log.info("分片总数={},当前分片值={},分片匹配的用户ID={}"
                , shardingContext.getShardingTotalCount()
                , shardingContext.getShardingItem()
                , data);
        // 执行业务逻辑
    }
}

编辑配置文件(application.yml)。指定N个注册中心Zookeeper的客户端访问地址(serverLists);指定命名空间(elasticjob-lite),程序启动后会在Zookeeper根节点(/)下创建命名空间指定的节点,所有elasticjob相关的数据都在这个命名空间节点下。

jobs分别指定N个调度任务。比如任务名称(dataflowJob)、任务处理类的全限定名(elasticJobClass)、执行周期表达式(cron)、分片总数(shardingTotalCount)、是否覆盖注册中心的任务配置(overwrite)等等。详细配置信息参考 ShardingSphere - ElasticJob-Lite 配置手册

这里指定分片总数为5,那分片值就在0~4之间。

elasticjob:
  regCenter:
    serverLists: zookeeperhost1:2181,zookeeperhost2:2181,zookeeperhost3:2181,zookeeperhost4:2181
    namespace: elasticjob-lite
  jobs:
    userDataflowJob:
      elasticJobClass: cn.fruitbasket.litchi.elasticjob.lite.UserDataflowJob
      cron: 0/5 * * * * ?
      shardingTotalCount: 5
      overwrite: true

测试

当只有一个参与任务调度的进程时,我我们可以看到所有分片都分给了这个进程。分片总数为5,每次触发分配对应数量的线程并行执行,每个线程负责一片。

image-20211202175244406

当我们启动两个参与任务调度进程,可以通过日志看到在下次触发时,分片到了两个进程中。

image-20211202175702928

image-20211202175726336

参考

ShardingSphere - ElasticJob-Lite

ShardingSphere - ElasticJob-Lite Spring Boot Starter 配置

ShardingSphere - ElasticJob-Lite 配置手册