在sqlserver2008中使用自带的消息队列Service Broker

发布时间 2023-09-27 11:13:41作者: Fan丶
  1. 以前有个业务操作本来是用sqlserver的表中触发器来处理的,后来在使用一个存储过程中,涉及到这个表后,发现存储过程执行过程,需要等待涉及的表的触发器操作完成才会返回,导致这个存储过程耗时有点久,这样就出现锁的问题,本来想改造下代码 写到C#中,后来也懒得弄了,就找了找,发现可以用消息队列 Service Broker来异步处理触发器中的逻辑sql操作,然后就各种巴拉巴拉的搜百度,找到了一篇文章
    https://www.51c51.com/baike/xinxi/5/234276.html
    是这
    捣鼓了下,竟然成功了,
  2. 整体看,这个 Service Broker  需要
    • 启动会话。
    • 发送消息。
    • 接收方接收消息,并回复消息到发送方,结束回话。
    • 发送方反馈消息,结束会话。

    四个步骤都不能少

  3. 先定位到消息队列,如图

    需要定义消息类型,约定类型,队列,服务 四个创建,根据前面的文章 就照着抄就行

    1.开启数据库 Broker
    ALTER DATABASE DBName SET NEW_BROKER WITH ROLLBACK IMMEDIATE;
    ALTER DATABASE DBName SET ENABLE_BROKER;
    
    2.创建两个消息类型,定义为XML格式
    CREATE MESSAGE TYPE
           [//Dsend/test/RequestMessage]
           VALIDATION = WELL_FORMED_XML;
    CREATE MESSAGE TYPE
           [//Dsend/test/ReplyMessage]
           VALIDATION = WELL_FORMED_XML;
    
    3.创建两个约定
    CREATE CONTRACT [//Dsend/test/RequestContract]
          ([//Dsend/test/RequestMessage]    
           SENT BY INITIATOR,             ---约定只有发起方才能使用//Dsend/test/RequestMessage消息类型
           [//Dsend/test/ReplyMessage]
           SENT BY TARGET                 ---约定只有答复方才能使用//Dsend/test/ReplyMessage消息类型
          );
    4.创建两个队列,并启用,一个发送 ,一个接受
    CREATE QUEUE RequestQueue WITH STATUS=ON;
    CREATE QUEUE ReplyQueue WITH STATUS=ON;
    
    5.创建两个服务
    CREATE SERVICE
           [//Dsend/test/RequestService]
           ON QUEUE RequestQueue     
    GO
    ---2.创建接答复服务
    CREATE SERVICE
           [//Dsend/test/ReplyService]
           ON QUEUE ReplyQueue
           ([//Dsend/test/RequestContract]
           )
           ;
    GO

    4.上面代码执行后,创建就结束了,下面开始组合操作了,【发送消息】

    --干掉以前的逻辑代码
    --EXEC usp_do_PDA_TaskUpload @TaskId
    
    --替换成发送到队列,我这里比如就发送一个变量@TaskId
    
                    DECLARE @InitDlgHandle UNIQUEIDENTIFIER;
                    DECLARE @RequestMsg NVARCHAR(100);
    
                    BEGIN TRANSACTION;
    
                    BEGIN DIALOG @InitDlgHandle
                         FROM SERVICE
                          [//Dsend/test/RequestService] ---指定的服务是用于答复消息的返回地址
                         TO SERVICE
                          N'//Dsend/test/ReplyService'  ---指定的服务是消息发送到的地址。
                         ON CONTRACT
                          [//Dsend/test/RequestContract]
                         WITH
                             ENCRYPTION = OFF;
    
                    SELECT @InitDlgHandle;
    
                    SELECT @RequestMsg =N'<RequestMsg>'+ CAST(@TaskId AS nvarchar(50))+'</RequestMsg>';
    
                    SEND ON CONVERSATION @InitDlgHandle
                         MESSAGE TYPE 
                         [//Dsend/test/RequestMessage]
                         (@RequestMsg);
    
                    COMMIT TRANSACTION;

    5.我们发送队列后 就需要接受消息了,我们想让他自动接受,而不是手动调用,这就需要我们绑定下发送与接受队列 【接受消息】

    --发送方绑定到一个存储过程
    CREATE QUEUE RequestQueue
    WITH
    STATUS = ON,
    RETENTION = OFF,
    ACTIVATION
    (
        STATUS = ON,
        PROCEDURE_NAME = SayHelloQueueProc_Req,--要处理东西的存储过程
        MAX_QUEUE_READERS = 10,
        EXECUTE AS SELF
    );
    
    --接收方绑定到一个存储过程
    CREATE QUEUE ReplyQueue
    WITH
    STATUS = ON,
    RETENTION = OFF,
    ACTIVATION
    (
        STATUS = ON,
        PROCEDURE_NAME = SayHelloQueueProc, --要处理东西的存储过程
        MAX_QUEUE_READERS = 10,
        EXECUTE AS SELF
    );

     

    6.剩下的就简单了,在2个存储过程中,分别处理不同的逻辑即可,参考如下:
     
    --接收方的存储过程逻辑
    
    ALTER PROCEDURE [dbo].[SayHelloQueueProc]
    AS
    BEGIN
    
      -- 接收句柄.
      DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER;
      -- 接收到的数据.
      DECLARE @RecvReqMsg XML;
      -- 接收到的数据类型名称.
      DECLARE @RecvReqMsgName sysname;
      --任务id
      declare @mTaskId int
      -- 循环处理.
      WHILE (1=1)
      BEGIN
        -- 开始事务处理.
        BEGIN TRANSACTION;
        -- 尝试从 SayHelloReceiveQueue 队列 接收消息.
        WAITFOR
        ( RECEIVE TOP(1)
            @RecvReqDlgHandle = conversation_handle,
            @RecvReqMsg       = message_body,
            @RecvReqMsgName   = message_type_name
          FROM ReplyQueue
        ), TIMEOUT 1000;
    
        -- 判断有没有获取到消息.
        IF (@@ROWCOUNT = 0)
        BEGIN
          -- 如果没有接收到消息
          -- 回滚事务.
          ROLLBACK TRANSACTION;
          -- 跳出循环.
          BREAK;
        END
    
        -- 如果有数据那么进行处理.
        -- 定义准备用于返回的消息.回复给发送方
    
         set @mTaskId= @RecvReqMsg.value('data(/RequestMsg)[1]', 'int')
            --自己的数据库操作代码
            EXEC usp_do_PDA_TaskUpload @mTaskId;
    
           -- 发送反馈消息
           SEND ON CONVERSATION @RecvReqDlgHandle
             MESSAGE TYPE
               [//Dsend/test/ReplyMessage]
                  ('<ReplyMsg>ok</ReplyMsg>');
    
        IF @RecvReqMsgName =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' or  @RecvReqMsgName=N'//Dsend/test/RequestMessage'
        BEGIN
            --一定要有
             END CONVERSATION @RecvReqDlgHandle WITH CLEANUP;
        END
        -- 提交事务.
        COMMIT TRANSACTION;
      END
    END
    
    --发送方的存储过程,这个纯粹是为了结束掉回话,不然有历史数据记录,撑爆数据库
    
    ALTER PROCEDURE [dbo].[SayHelloQueueProc_Req]
    AS
    BEGIN
    
      -- 接收句柄.
      DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER;
      -- 接收到的数据.
      DECLARE @RecvReqMsg NVARCHAR(100);
      -- 接收到的数据类型名称.
      DECLARE @RecvReqMsgName sysname;
      -- 循环处理.
      WHILE (1=1)
      BEGIN
        -- 开始事务处理.
        BEGIN TRANSACTION;
        -- 尝试从 SayHelloReceiveQueue 队列 接收消息.
        WAITFOR
        ( RECEIVE TOP(1)
            @RecvReqDlgHandle = conversation_handle,
            @RecvReqMsg       = message_body,
            @RecvReqMsgName   = message_type_name
          FROM RequestQueue
        ), TIMEOUT 1000;
    
        -- 判断有没有获取到消息.
        IF (@@ROWCOUNT = 0)
        BEGIN
          -- 如果没有接收到消息
          -- 回滚事务.
          ROLLBACK TRANSACTION;
          -- 跳出循环.
          BREAK;
        END
    
        -- 那么进行处理.
        IF @RecvReqMsgName =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' or  @RecvReqMsgName=N'//Dsend/test/ReplyMessage'
        BEGIN
            END CONVERSATION @RecvReqDlgHandle WITH CLEANUP;
        END
        -- 提交事务.
        COMMIT TRANSACTION;
      END
    END

    7.到底就结束了
    其他操作,
    查看下面这个sql ,如果数据一直增长说明没处理好清理任务
    select * from sys.conversation_endpoints order by  security_timestamp desc
    收不到消息,可以看看这个sql,是不是有错误提示
    SELECT CAST(message_body as xml) message,* FROM sys.transmission_queue