如何在 PyQt 中实现异步数据库请求

发布时间 2023-12-07 18:09:56作者: 之一Yo

需求

开发软件的时候不可避免要和数据库发生交互,但是有些 SQL 请求非常耗时,如果在主线程中发送请求,可能会造成界面卡顿。这篇博客将会介绍一种让数据库请求变得和前端的 ajax 请求一样简单,且不会阻塞界面的异步请求方法。

实现过程

在实现异步请求之前,需要先明确一下函数签名:

def sqlRequest(
    service: str, 
    method: str, 
    slot, 
    params: dict = None
)

各个参数的解释如下:

  • service: 业务名
  • method: 接口名
  • slot: 拿到数据后调用的回调函数
  • params: 请求参数

总体流程如下图所示,包括子界面发送请求、数据库线程处理请求、主界面调用回调函数来消费响应结果三个步骤。

image

信号总线

在 Qt 中,子线程无法直接更新主界面,发送信号通知主线程,然后在主线程中更新界面。在之前的博客《如何在 pyqt 中实现全局事件总线》介绍了信号总线的使用,通过引入信号总线,可实现任意层级的组件之间的通信。

本文的信号总线只含有两个信号,一个用来请求数据,一个用来消费数据:

class SignalBus(QObject):
    """ Signal bus """
    fetchDataSig = Signal(SqlRequest)    # 请求数据信号
    dataFetched = Signal(SqlResponse)    # 响应数据信号

    
signalBus = SignalBus()
    
    
class SqlRequest:
    """ Sql request """

    def __init__(self, service: str, method: str, slot=None, params: dict = None):
        self.service = service
        self.method = method
        self.slot = slot
        self.params = params or {}


class SqlResponse:
    """ Sql response """

    def __init__(self, data, slot):
        self.slot = slot
        self.data = data

发送请求

子界面中通过调用 sqlRequest() 函数来发起异步 SQL 请求,该函数只是将参数封装为 SqlRequest 对象,然后通过 signalBusfetchDataSig 信号发送给数据库子线程:

def sqlRequest(service: str, method: str, slot=None, params: dict = None):
    """ query sql from database """
    request = SqlRequest(service, method, slot, params)
    signalBus.fetchDataSig.emit(request)

比如下图中商品类型下拉框的数据就来自于数据库:

image

在组件 LicenseCard 中使用下述代码就能完成数据的请求和消费(组件库参见 https://qfluentwidgets.com/zh/ ):

from qfluentwidgets import HeaderCardWidget, ComboBox

class LicenseCard(HeaderCardWidget):
    
    def __init__(self, parent=None):
        super().__init__("许可证", parent)
        self.goodsComboBox = ComboBox(self)
        
        # 请求商品信息
        sqlRequest("goodsService", "listAll", lambda i: self.onGoodsFetched(i))

    def onGoodsFetched(self, goods: List[Goods]):
        """ 将商品信息添加到下拉框中 """
        for good in goods:
            self.goodsComboBox.addItem(good.name, userData=good)

处理请求

子线程 DatabaseThread 中维护着一个请求队列 tasks,每当收到信号总线的 fetchDataSig 信号时,就会使用反射机制将请求中携带的 servicemethod 字符串转换为数据库业务类的方法指针,并将这个指针添加到队列中等待调用。调用方法返回的数据会被封装为 SqlResponse 对象,接着通过信号总线发送给主界面。

class DatabaseThread(QThread):
    """ Database thread """

    def __init__(self, db: QSqlDatabase = None, parent=None):
        super().__init__(parent=parent)
        self.database = Database(db, self)
        self.tasks = deque()

        # 处理请求信号
        signalBus.fetchDataSig.connect(self.onFetchData)

    def run(self):
        """ 处理请求 """
        while self.tasks:
            task, request = self.tasks.popleft()
            result = task(**request.params)
            signalBus.dataFetched.emit(SqlResponse(result, request.slot))

    def onFetchData(self, request: SqlRequest):
        """ 将请求添加到队列中 """
        service = getattr(self.database, request.service)
        task = getattr(service, request.method)
        self.tasks.append((task, request))

        if not self.isRunning():
            self.start()
                

class Database(QObject):
    """ Database """

    def __init__(self, db: QSqlDatabase = None, parent=None):
        super().__init__(parent=parent)
        self.orderService = OrderService(db)
        self.userService = UserService(db)
        self.goodsService = GoodsService(db)

处理响应结果

主界面中只需将信号总线的 dataFetched 信号连接槽函数,然后在槽函数中对取出 response 对象中的数据,并调用回调函数来消费数据即可:

from qfluentwidgets import MSFluentWindow

class MainWindow(MSFluentWindow):
    """ 主界面 """
    
    def __init__(self):
        super().__init__()
        
        # 处理响应结果
        signalBus.dataFetched.connect(self.onDataFetched)

    def onDataFetched(self, response: SqlResponse):
        if response.slot:
            response.slot(response.data)

总结

在这篇博客中我们使用子线程和信号总线完成了异步数据库请求操作,界面所使用的组件全部来自于 https://qfluentwidgets.com/zh/ ,以上~~