PHP RabbitMQ 异步ACK持久化

发布时间 2024-01-10 00:56:26作者: 烈焰螺旋

消息持久化以及手动应答

RabbitMq重启之后queue_declare队列不会丢失

  1. 为了不让队列消失,需要把队列声明为持久化(durable)。为此我们通过queue_declare的第三参数为true:
  2. queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改
$channel->queue_declare('queue', false, true, false, false);

  

消息持久化

注意:消息持久化
将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你可以使用publisher confirms。

$msg = new AMQPMessage($data,
       array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
 );

  

手动应答

  1. 回调函数中设置手动应答
  2. 执行回调配置
# 消息
        $msg = new AMQPMessage('swoft_queue_test'.time(),[
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 消息持久化
        ]);

  

 // 第四参数为自动应答, 设置为false
        $channel->basic_consume('swoft_queue_test','',false,false,false,false,$callback);

  

公平调度
比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。

这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。

 

我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

$channel->basic_qos(null, 1, null);

  

完整代码

发送数据

 /**
     * @RequestMapping(route="index")
     */
    public function index()
    {
        $connect = new AMQPStreamConnection('39.105.106.191',5672,'guest','guest');
        $channel = $connect->channel();
        // 第三参数持久化
        $channel->queue_declare('swoft_queue_test', false,true,false,false);
        # 消息
        $msg = new AMQPMessage('swoft_queue_test'.time(),[
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 消息持久化
        ]);
 
        $channel->confirm_select(); // 发布确认模式
 
        //推送成功
        $channel->set_ack_handler(
            function (AMQPMessage $message) {
                echo "发送成功: " . $message->body . PHP_EOL;
            }
        );
 
        //推送失败
        $channel->set_nack_handler(
            function (AMQPMessage $message) {
                echo "发送失败: " . $message->body . PHP_EOL;
            }
        );
        # 发送
        $channel->basic_publish($msg,'','swoft_queue_test');
        $channel->wait_for_pending_acks();
 
        $channel->close();
        $connect->close();
        return ['code'=>0 ,"msg"=>"发送成功"];
    }

  

接收数据

<?php
 
namespace App\Process;
 
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Swoft\Bean\Annotation\Mapping\Bean;
use Swoft\Process\UserProcess;
use Swoole\Timer;
 
/**
 * @Bean()
 * 简单模式
 */
class RabbitMQProcess extends UserProcess
{
 
    /**
     * @param \Swoft\Process\Process $process
     * @return void
     */
    public function run(\Swoft\Process\Process $process):void
    {
        $connect = new AMQPStreamConnection('39.105.156.191',5672,'guest','guest');
        $channel = $connect->channel();
        $channel->queue_declare('swoft_queue_test', false,false,false,false,false);
        # 回调
        $callback = function ($msg){
            echo $msg->body.PHP_EOL;
            // 手动应答
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };
        // 公平调度
        $channel->basic_qos(null, 1, null);
        // 第四参数为自动应答, 设置为false
        $channel->basic_consume('swoft_queue_test','',false,false,false,false,$callback);
        while(count($channel->callbacks)){
            $channel->wait();
        }
        $channel->close();
        $connect->close();
    }
    
}

  

引用:https://blog.csdn.net/s1095622320/article/details/125171173