powerjob 任务与工作流配置示例

发布时间 2023-10-12 17:18:50作者: tomoncle

powerjob 任务与工作流配置示例

官方文档:https://www.yuque.com/powerjob/guidence/ysug77

参数描述

该截图来自项目官方文档:

image

官方处理器(内置的任务处理器)

官方文档:https://www.yuque.com/powerjob/guidence/official_processor
官方源码:https://github.com/PowerJob/PowerJob/tree/v4.3.6/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl

功能验证处理器

  • 类名:tech.powerjob.official.processors.impl.VerificationProcessor
  • 参数:json 或 空.

配置处理器

超简易的配置中心,用于配置的下发,需要配合秒级 + 广播任务使用!
超低成本下的解决方案,强配置 or 高SLA 场景,请使用标准的配置管理中间件。

  • 类名:tech.powerjob.official.processors.impl.ConfigProcessor
  • 参数:json

Shell 处理器

  • 类名:tech.powerjob.official.processors.impl.script.ShellProcessor
  • 参数:填写需要处理的 Shell 脚本(直接复制文件内容)或脚本下载链接(http://xxx.sh

PowerShell 处理器

  • 类名:tech.powerjob.official.processors.impl.script.PowerShellProcessor
  • 参数:填写需要处理的 PowerShell 脚本(直接复制文件内容)或脚本下载链接(http://xxx.ps1

CMDProcessor 处理器

  • 类名:tech.powerjob.official.processors.impl.script.CMDProcessor
  • 参数:填写需要处理的 BAT 脚本(直接复制文件内容)或脚本下载链接(http://xxx.bat

Python 处理器

注意:Python 处理器会使用运行worker机器的 python 命令执行,因此 python 版本需要与本机 python 环境保持一致!

  • 类名: tech.powerjob.official.processors.impl.script.PythonProcessor
  • 参数: 填写需要处理的 Python 脚本(直接复制文件内容)或脚本下载链接(http://xxx.py

HTTP 处理器

  • 类名: tech.powerjob.official.processors.impl.HttpProcessor
  • 参数(JSON): {"method":"GET","url":"http://www.baidu.com"}

文件清理处理器

注意:文件删除是高危操作,请慎用该处理器。默认情况下该处理器不可用,需要传入JVM参数
-Dpowerjob.official-processor.file-cleanup.enable=true 开启

  • 类名: tech.powerjob.official.processors.impl.FileCleanupProcessor
  • 参数(JSONArray): [{"dirPath":"/tmp/test"}, {"dirPath":"/var/log/nginx", "retentionTime":24}]
整体参数为 array,array 中的每个元素为 JSON,描述需要清理的资源,每个节点的参数如下:
● dirPath      :待删除文件的文件夹目录(会递归查找该目录下所有符合要求的文件)
● filePattern  :待删除文件名称的 Java 版正则表达式
● retentionTime:待删除文件的保留时间,单位为小时(当前时间 - 待删除文件上次编辑时间 > retentionTime 的文件才会被删除),用于保留某些滚动日志,0 代表忽略该规则

由于 JSON 内传递正则表达式需要转义,强烈建议先用 Java 代码生成配置(JSONObject#put, JSONArray#add),再调用 toJSONString 方法生成参数。

SQL 处理器

官方文档:https://www.yuque.com/powerjob/guidence/official_processor#9LrIO

目前内置了两款 SQL 处理器,均支持自定义 SQL 的校验、解析逻辑,主要区别在于数据源连接的获取方式不同。

SpringDatasourceSqlProcessor

默认情况下在初始化的时候需要至少注入一个数据源,所以必须提前手动初始化并注册到 Spring IOC 容器中,以 SpringBean 的方式进行加载。

允许使用SpringDatasourceSqlProcessor#registerDataSource方法注册多个数据源

建议:最好将该 SQL Processor 用的数据库连接池和其他业务模块用的数据库连接池隔离开,不要共用一个连接池!

初始化 SpringDatasourceSqlProcessor 示例代码:

@Configuration
public class SqlProcessorConfiguration {
 
 
    @Bean
    @DependsOn({"initPowerJob"})
    public DataSource sqlProcessorDataSource() {
        String path = System.getProperty("user.home") + "/test/h2/" + CommonUtils.genUUID() + "/";
        String jdbcUrl = String.format("jdbc:h2:file:%spowerjob_sql_processor_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", path);
        HikariConfig config = new HikariConfig();
        config.setDriverClassName(Driver.class.getName());
        config.setJdbcUrl(jdbcUrl);
        config.setAutoCommit(true);
        // 池中最小空闲连接数量
        config.setMinimumIdle(1);
        // 池中最大连接数量
        config.setMaximumPoolSize(10);
        return new HikariDataSource(config);
    }
 
 
    @Bean
    public SpringDatasourceSqlProcessor simpleSpringSqlProcessor(@Qualifier("sqlProcessorDataSource") DataSource dataSource) {
        SpringDatasourceSqlProcessor springDatasourceSqlProcessor = new SpringDatasourceSqlProcessor(dataSource);
        // do nothing
        springDatasourceSqlProcessor.registerSqlValidator("fakeSqlValidator", sql -> true);
        // 排除掉包含 drop 的 SQL
        springDatasourceSqlProcessor.registerSqlValidator("interceptDropValidator", sql -> sql.matches("^(?i)((?!drop).)*$"));
        // do nothing
        springDatasourceSqlProcessor.setSqlParser((sql, taskContext) -> sql);
        return springDatasourceSqlProcessor;
    }
 
}
  • 类名:tech.powerjob.official.processors.impl.sql.SpringDatasourceSqlProcessor
  • 参数:{"dataSourceName":"default","sql":"select 'x' from t_example","timeout":10,"showResult":true}

DynamicDatasourceSqlProcessor

支持通过参数动态指定数据源连接,在指定的数据库执行 SQL。
默认情况下该处理器不可用,需要传入JVM参数 -Dpowerjob.official-processor.dynamic-datasource.enable=true 开启。

  • 类名:tech.powerjob.official.processors.impl.sql.DynamicDatasourceSqlProcessor
  • 参数:{"sql":"select 'x' from t_example","timeout":10,"showResult":true,"jdbcUrl":"jdbc:mysql://myhost1:3306/db_name?user=root&password=mypass",}

任务类别

crontab

周期性任务:使用cron表达式0 * * * * ? *触发,右侧支持验证表达式语法

HTTP请求:
image

Shell处理:
image

Python脚本:
image

固定频率

根据当前任务的触发时间,间隔30秒,触发下一次任务

image

固定延迟

根据任务当前任务的结束时间,往后延迟10秒触发下次任务

image

每日固定间隔

指定每天(或者周几)的哪个时间段执行,任务间隔(秒为单位)

image

API类型

我理解的是根据API(或者在页面上点击按钮)触发,还没详细测试这个功能.

image