RabbitMQ之消息确认机制

发布时间 2023-05-25 17:33:40作者: TaylorSWMM

RabbitMQ之消息确认机制

标签(空格分隔): php,rabbitmq

在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

RabbitMQ为我们提供了两种方式:

通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
通过将channel设置成confirm模式来实现;

事物机制

$exChangeName = "exchange_02";
$queueName = 'queue_02';
$options = [
    'host' => '127.0.0.1',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
];
$connect = new \AMQPConnection($options);
try {
    $isConnect = $connect->connect();
    if (!$isConnect) {
        exit("connect rabbitmq error");
    }
} catch (AMQPConnectionException $e) {
    exit("connect rabbitmq error");
}
$chan = new \AMQPChannel($connect);
// 开启事物
$chan->startTransaction();
$ex = new \AMQPExchange($chan);
$ex->setName($exChangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE); // 持久化
$ex->declareExchange();
$bool = $ex->publish("HELLO WORLD", $queueName, AMQP_NOPARAM, ['delivery_mode' => 2]);
$chan->rollbackTransaction(); // 事物回滚
$chan->commitTransaction(); // 事物提交

Confirm

confirm PHP扩展AMQP 我自己试好像不支持异步的 实际业务中应该是用不到


$exChangeName = "exchange_02";
$queueName = 'queue_02';
$options = [
    'host' => '127.0.0.1',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
];
$connect = new \AMQPConnection($options);
try {
    $isConnect = $connect->connect();
    if (!$isConnect) {
        exit("connect rabbitmq error");
    }
} catch (AMQPConnectionException $e) {
    exit("connect rabbitmq error");
}
$chan = new \AMQPChannel($connect);
// 开启
$chan->confirmSelect();
// 设置comfirm ack、nack 回调
$chan->setConfirmCallback(function (int $delivery_tag, bool $multiple) {
    $data = var_export([
        'time' => date('Y-m-d H:i:s'),
        'type' => 'ack',
        'params' => func_get_args()
    ], true);
    file_put_contents("./rabbitmq_log.log", $data . PHP_EOL, FILE_APPEND);
}, function (int $delivery_tag, bool $multiple, bool $requeue) {
    $data = var_export([
        'time' => date('Y-m-d H:i:s'),
        'type' => 'nack',
        'params' => func_get_args()
    ], true);
    file_put_contents("./rabbitmq_log.log", $data . PHP_EOL, FILE_APPEND);
});
$ex = new \AMQPExchange($chan);
$ex->setName($exChangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE); // 持久化
$ex->declareExchange();
$bool = $ex->publish("HELLO WORLD", $queueName, AMQP_NOPARAM, ['delivery_mode' => 2]);
// 等待回应 可以设置时间,这基本用不到,直接阻塞了
$chan->waitForConfirm();