tornado服务端+tornado.ioloop.PeriodicCallback定时任务踩坑记录及解决方案

发布时间 2023-05-02 18:58:43作者: Jimmy996
背景:用tornado部署一个AI模型的服务端,由于AI模型较慢,收到请求肯定没办法同步返回结果,所以最后定的方案是批处理并异步回调。
异步回调下,我这边的处理方式是:实时接收所有请求并多线程落库(使用数据库连接池),再启动一个定时任务取出库中(未处理过的)数据进行批处理。
因为web框架用的是tornado,所以最开始的实现方式就是使用tornado的定时任务PeriodicCallback,代码如下(该方案有问题:PeriodicCallback任务启动时,调用AI模型处理数据会阻塞ioloop,导致服务端无法接收新的请求):
import tornado.ioloop
import tornado.web
from concurrent.futures import ThreadPoolExecutor
import requests
from pymysqlpool import ConnectionPool
import multiprocessing
import os
import re
import whisper

class Database: """ MySQL 连接池 """ _instance = None def __init__(self, host, port, username, password, db_name, pool_size=5): self.host = host self.port = port self.username = username self.password = password self.db_name = db_name self.pool = ConnectionPool( pool_name="mypool", host=self.host, port=self.port, user=self.username, password=self.password, db=self.db_name, max_pool_size=pool_size, ) def execute_fetch(self, query, args=None): results = [] with self.pool.cursor() as cursor: cursor.execute(query, args) for request in cursor: results.append(request) return results def execute_insert_or_update(self, query, args=None): with self.pool.cursor() as cursor: result = cursor.execute(query, args) return result class TranscribeHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(max_workers=20) def initialize(self, database): self.db = database async def post(self): body = tornado.escape.json_decode(self.request.body) aaa= body.get("aaa") bbb = body.get("bbb") ccc= body.get("ccc") try: with ThreadPoolExecutor(max_workers=10) as executor: insert_request_future = tornado.ioloop.IOLoop.current().run_in_executor(executor, self.insert_request, aaa, bbb, ccc) insert_results_future = tornado.ioloop.IOLoop.current().run_in_executor(executor, self.insert_results, aaa) await insert_request_future await insert_results_future self.write({"error_code": 0, "message": "ok"}) except Exception as e: self.write({"error_code": 1, "message": str(e)}) raise e def insert_request(self, aaa, bbb, ccc): # 在每个线程中创建一个数据库连接,执行操作,然后关闭连接 db = self.db db.execute_insert_or_update( "INSERT INTO requests (aaa, bbb, ccc, status) VALUES (%s, %s, %s, %s)", (aaa, bbb, ccc, 'unprocessed') ) def insert_results(self, aaa): db = self.db db.execute_insert_or_update( "INSERT INTO results (aaa) VALUES (%s)", (aaa,)) def batch_process(model, db, batch_size):
#此处为具体批处理代码,略过


if __name__ == "__main__": # Initialize the Whisper model and MySQL connection print("loading model```") model = whisper.load_model("large") model.to('cuda:2') print("loading model done```") db_pool = Database(host='xxxxxx', port=3306, username='xxxx', password='xxx', db_name="xxx", pool_size=20) a # Create a Torndo application and HTTP server app = tornado.web.Application([ (r"/transcribe", TranscribeHandler, dict(database=db_pool)), ]) server = tornado.httpserver.HTTPServer(app) # Schedule a periodic task to process requests in batches scheduler = tornado.ioloop.PeriodicCallback( lambda: batch_process(model, db_pool, batch_size=10), 100000) scheduler.start() print("start batch_process job```") # Start the server server.listen(8000) tornado.ioloop.IOLoop.current().start()

 

询问chatGPT:

 我在执行上述代码tornado.ioloop.PeriodicCallback对象在定期调用batch_process函数时,Tornado就不能接收传入的HTTP请求了,应该怎么解决PeriodicCallback阻塞I/O循环并导致请求超时或无法响应的问题?

答:

方案一:在Tornado中,当处理长时间的计算任务时,应该使用协程来避免I/O阻塞。可以使用Tornado的gen.coroutine装饰器将batch_process函数转化为协程。并使用await关键字等待函数执行完毕。另外,还可以将batch_process函数放在一个线程池中执行,以避免阻塞I/O循环。

方案二:可以使用asyncio库来异步调用batch_process函数,避免阻塞I/O循环。

方案三:您可以将 batch_process 函数转换为协程,然后使用 asyncio.gather 并发运行 batch_process 函数和 Tornado 的事件循环,以避免 Tornado 无法处理新的 HTTP 请求的问题。

方案四:您可以将batch_process函数改成一个协程,并用asyncio.create_task来调用它,这样就可以将它异步地运行。同时,您可以将periodic_task函数变成一个协程,然后在启动服务器前运行asyncio.gather将HTTP服务器和periodic_task都注册到事件循环中,这样两者就可以并发运行了。
方案五:在 Tornado 中,PeriodicCallback 是一个周期性执行的回调函数。在这里,你的周期性任务是在 I/O 循环中执行,这可能会导致阻塞,从而导致无法响应 HTTP 请求。为了解决这个问题,可以将周期性任务设置为使用 asyncio 的事件循环,并使用 asyncio.create_task() 运行批量处理任务。这样,Tornado 的 I/O 循环将不会被阻塞。
...
问了N次,各种解决方案都试了,主要是batch_process改成协程,但无论如何还是会阻塞ioloop...遂放弃使用PeriodicCallback!
 
想起之前另一个项目用到过的apscheduler库来执行定时任务,来问问chatGPT怎么修改。
from apscheduler.schedulers.blocking import BlockingScheduler


if __name__ == "__main__":
# Initialize the Whisper model and MySQL connection
    print("loading model```")
    model = whisper.load_model("large")
    model.to('cuda:2')
    print("loading model done```")
    db_pool = Database(host='xxx',
                       port=3306,
                       username='xxx',
                       password='xxx',
                       db_name="xxx",
                       pool_size=20)

    # Create a Tornado application and HTTP server
    app = tornado.web.Application([
        (r"/transcribe", TranscribeHandler, dict(database=db_pool)),
    ])
    server = tornado.httpserver.HTTPServer(app)

    # Schedule a periodic task to process requests in batches
    print("start batch_process job```")
    # Start the server
    server.listen(8000)
    tornado.ioloop.IOLoop.current().start()
    bscheduler = BlockingScheduler()
    # bscheduler.add_job(push_seeds, 'cron', kwargs={'spider_name': 'YqSpider'}, minute='0', hour='*/2', name='tasks')
    bscheduler.add_job(batch_process(model, db_pool, batch_size=10), 'cron', minute='*', hour='*', name='task1')
以上代码有问题吗?

有问题。在bscheduler.add_job函数中,第一个参数应该是一个函数名,而不是函数的返回值。因此,您需要将batch_process函数定义为一个单独的函数,然后将其传递给add_job函数。此外,如果您想让定期任务在后台运行并不阻塞HTTP请求处理,您需要在server.listen之前启动定期任务。

以下是一个示例代码,可以帮助您实现这一点:

import asyncio
import tornado.httpserver
import tornado.ioloop
import tornado.web
from database import Database
import whisper
from transcribe_handler import TranscribeHandler
from apscheduler.schedulers.background import BackgroundScheduler

def batch_process(model, db, batch_size):
    max_fail_count = 3  # 最大重试次数
    frequests = db.execute_fetch(
        "SELECT request_id, url, footage_type FROM requests WHERE status='unprocessed' ORDER BY id ASC LIMIT %s",
        batch_size)
    print("start batch_process, unprocessed:", len(frequests))

if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
    # Initialize the Whisper model and MySQL connection
    print("loading model```")
    model = whisper.load_model("large")
    model.to('cuda:2')
    print("loading model done```")
    db_pool = Database(host='xxx',
                       port=3306,
                       username='xxx',
                       password='xxx',
                       db_name="xxx",
                       pool_size=20)

    # Create a Tornado application and HTTP server
    app = tornado.web.Application([
        (r"/transcribe", TranscribeHandler, dict(database=db_pool)),
    ])
    server = tornado.httpserver.HTTPServer(app)

    # Start the server
    server.listen(8000)

    # Schedule a periodic task to process requests in batches
    bscheduler = BackgroundScheduler()
    bscheduler.add_job(batch_process, 'cron', args=[model, db_pool, 10], minute='*', hour='*', name='task1')
    bscheduler.start()

    # Start the IOLoop
    tornado.ioloop.IOLoop.current().start()

测试了一下,搞定!