Python 简单的动态任务调度系统:APScheduler

发布时间 2023-05-23 15:18:31作者: vx_guanchaoguo0

直接上代码

from apscheduler.triggers.interval import IntervalTrigger
from flask import Flask, render_template, request, redirect, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime

# Create the Flask application
app = Flask(__name__)

# Create the scheduler
scheduler = BackgroundScheduler()


# Job body executed by the scheduler
def print_time():
    """Print the current time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    now = datetime.now()
    stamp = now.strftime("%Y-%m-%d %H:%M:%S")
    print(stamp)


# Add the default job: call print_time on a one-minute interval
scheduler.add_job(print_time, 'interval', minutes=1)

# Start the scheduler (module-level statement, so it runs when this file is imported)
scheduler.start()


# View function: JSON endpoint for adding, removing and rescheduling jobs
@app.route('/', methods=['GET', 'POST'])
def index():
    """Manage scheduler jobs through a small JSON API.

    GET returns ``{"jobs": [...]}`` with the ids of the currently scheduled
    jobs. POST expects a JSON object with:

    * ``action``   -- ``"add"``, ``"remove"`` or ``"modify"``
    * ``job_name`` -- non-empty string used as the job id
    * ``interval`` -- positive number of seconds (required for ``add`` and
      ``modify``, ignored for ``remove``)

    Returns:
        On success ``{"action": <action>}`` with status 200. On failure a
        JSON body ``{"error": <message>}`` with status 400 (malformed
        request), 404 (no job with that id) or 409 (job id already in use).
    """
    # Imported here so the view is self-contained and does not rely on the
    # module's import block being edited.
    from apscheduler.jobstores.base import ConflictingIdError, JobLookupError

    if request.method != 'POST':
        # Every branch must return a response; returning None makes Flask
        # answer with a 500.
        return jsonify({"jobs": [job.id for job in scheduler.get_jobs()]})

    # silent=True yields None instead of raising on a missing/invalid body,
    # so all malformed input is reported through the same JSON error shape.
    form = request.get_json(silent=True)
    if not isinstance(form, dict):
        return jsonify({"error": "request body must be a JSON object"}), 400

    action = form.get('action')
    if action not in ('add', 'remove', 'modify'):
        return jsonify({"error": "action must be one of: add, remove, modify"}), 400

    job_name = form.get('job_name')
    if not isinstance(job_name, str) or not job_name:
        return jsonify({"error": "job_name must be a non-empty string"}), 400

    interval = None
    if action != 'remove':
        try:
            interval = int(form['interval'])
        except (KeyError, TypeError, ValueError):
            return jsonify({"error": "interval must be an integer number of seconds"}), 400
        if interval <= 0:
            return jsonify({"error": "interval must be greater than zero"}), 400

    try:
        if action == 'add':
            # The user wants to add a new job
            scheduler.add_job(func=print_time, trigger='interval', seconds=interval, id=job_name)
        elif action == 'remove':
            # The user wants to delete a job
            scheduler.remove_job(job_name)
        else:
            # The user wants to change a job's interval. reschedule_job
            # replaces the trigger AND recomputes the next run time, which
            # modify_job(trigger=...) does not do.
            scheduler.reschedule_job(job_name, trigger='interval', seconds=interval)
    except ConflictingIdError:
        return jsonify({"error": f"job '{job_name}' already exists"}), 409
    except JobLookupError:
        return jsonify({"error": f"job '{job_name}' not found"}), 404

    return jsonify({
        "action": action,
    })


# Start the Flask server only when this file is executed as a script
if __name__ == '__main__':
    app.run()

测试任务(使用 PyCharm 的 HTTP 请求文件)

#### add
POST http://127.0.0.1:5000
Content-Type: application/json

{
  "action": "add",
  "job_name": "test_job",
  "interval": 5
}

#### remove
POST http://127.0.0.1:5000
Content-Type: application/json

{
  "action": "remove",
  "job_name": "test_job",
  "interval": 5
}

#### modify
POST http://127.0.0.1:5000
Content-Type: application/json

{
  "action": "modify",
  "job_name": "test_job",
  "interval": 2
}