Elastic-Job使用实践

发布时间 2023-05-30 19:29:38作者: 田野与天

Elastic-Job是一个分布式任务调度框架,它基于Quartz和Zookeeper实现,提供了简单易用的任务调度和分布式任务处理能力。Elastic-Job支持任务的动态添加、删除、暂停和恢复,同时还具备故障转移和弹性扩容的能力。下面是Elastic-Job的介绍以及使用Java代码实现的入门示例:

Elastic-Job的介绍:
Elastic-Job是一个开源的分布式任务调度框架,它解决了传统任务调度框架的单点故障和扩展性问题。Elastic-Job基于分布式协调服务Zookeeper,通过将任务进行分片并分配给多个节点执行,实现了任务的高可用和负载均衡。Elastic-Job提供了简单易用的API和丰富的配置选项,可以灵活地管理和调度各种类型的分布式任务。

下面是使用Java代码实现Elastic-Job的入门示例:

  1. 添加依赖:
    在项目的构建文件(例如Maven的pom.xml)中添加Elastic-Job的依赖:
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.4</version>
</dependency>
  1. 创建作业:
    创建一个名为MyJob.java的类,并实现SimpleJob接口中的execute方法。示例代码如下:
public class MyJob implements SimpleJob {
    @Override
    public void execute(ShardingContext context) {
        System.out.println("Hello Elastic-Job!");
    }
}
  1. 配置作业:
    创建一个名为JobConfig.java的类,并添加以下代码:
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener;
import com.dangdang.ddframe.job.lite.api.listener.AbstractOneOffElasticJobListener;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.api.listener.ShardingContexts;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.Collections;

@Configuration
public class JobConfig {

    @Autowired
    private ZookeeperRegistryCenter zookeeperRegistryCenter;

    @Bean
    public SimpleJob myJob() {
        return new MyJob();
    }

    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob myJob, final ElasticJobListener elasticJobListener) {
        return new Spring

JobScheduler(myJob, zookeeperRegistryCenter,
                getLiteJobConfiguration(myJob.getClass(), "0/5 * * * * ?", 2), elasticJobListener);
    }

    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron,
                                                         final int shardingTotalCount) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).build(),
                jobClass.getCanonicalName())
        ).overwrite(true)
                .build();
    }

    @Bean
    public ElasticJobListener elasticJobListener() {
        return new AbstractOneOffElasticJobListener() {
            @Override
            public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
                System.out.println("Before job executed at last started.");
            }

            @Override
            public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
                System.out.println("After job executed at last completed.");
            }
        };
    }

}
  1. 配置Zookeeper:
    创建一个名为ZookeeperConfig.java的类,并添加以下代码:
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZookeeperConfig {

    @Value("${zookeeper.serverList}")
    private String serverList;

    @Value("${zookeeper.namespace}")
    private String namespace;

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter zookeeperRegistryCenter() {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }

}
  1. 配置Spring Boot:
    创建一个名为Application.java的类,并添加以下代码:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import({ZookeeperConfig.class, JobConfig.class})
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}
  1. 配置application.properties:
    application.properties文件中添加以下配置:
zookeeper.serverList=127.0.0.1:2181
zookeeper.namespace=my-job
  1. 运行应用程序:
    运行Application类的main方法,启动应用程序。

以上是使用Java代码实现Elastic-Job的入门示例。您可以根据自己的需求进行配置和扩展,以满足分布式任务调度的要求。请注意,示例中的代码仅供参考,您可以根据实际情况进行修改和调整。如需更多信息和示例代码,建议您查阅Elastic-Job的官方文档和示例项目。