消费者不承认来自RabbitMq的消息

时间:2022-06-27 09:49:27

I have create a simple publisher and a consumer which subscribes on the queue using basic.consume.

我创建了一个简单的发布者和使用basic.consume在队列中订阅的使用者。

My consumer acknowledges the messages when the job runs without an exception. Whenever I run into an exception I don´t ack the message and return early. Only the acknowledged messages disappear from the queue, so that´s working correctly.
Now I want the consumer to pick up the failed messages again, but the only way to reconsume those messages is by restarting the consumer.

我的消费者在作业无异常运行时确认消息。每当我遇到异常时,我都不会收到消息并提前返回。只有已确认的消息才会从队列中消失,因此工作正常。现在我希望消费者再次获取失败的消息,但重建这些消息的唯一方法是重新启动消费者。

How do I need to approach this use case?

我该如何处理这个用例?

Setup code

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);

$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');

Consumer code

$queue->consume(array($this, 'callback'));

public function callback(AMQPEnvelope $msg)
{
    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return;
    }
    return $queue->ack($msg->getDeliveryTag());
}

Producer code

$exchange->publish('message');

2 个解决方案

#1


18  

If message was not acknowledged and application fails, it will be redelivered automatically and redelivered property on envelope will be set to true (unless you consume them with no-ack = true flag).

如果消息未被确认且应用程序失败,它将自动重新传递,并且信封上的redelivered属性将设置为true(除非您使用no-ack = true标志消耗它们)。

UPD:

You have to nack message with redelivery flag in your catch block

你必须在catch块中使用redelivery标志来解密消息

    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
    }

Beware infinitely nacked messages while redelivery count doesn't implemented in RabbitMQ and in AMQP protocol at all.

谨防无限次消息,而重新传递计数在RabbitMQ和AMQP协议中根本没有实现。

If you doesn't want to mess with such messages and simply want to add some delay you may want to add some sleep() or usleep() before nack method call, but it is not a good idea at all.

如果你不想搞这些消息而只是想添加一些延迟,你可能想在nack方法调用之前添加一些sleep()或usleep(),但这根本不是一个好主意。

There are multiple techniques to deal with cycle redeliver problem:

有多种技术可以处理循环重新传递问题:

1. Rely on Dead Letter Exchanges

1.依靠死信交换

  • pros: reliable, standard, clear
  • 优点:可靠,标准,清晰

  • cons: require additional logic
  • 缺点:需要额外的逻辑

2. Use per message or per queue TTL

2.使用每条消息或每个队列TTL

  • pros: easy to implement, also standard, clear
  • 优点:易于实施,也标准,清晰

  • cons: with long queues you may loose some message
  • 缺点:排长队你可能会失去一些信息

Examples (note, that for queue ttl we pass only number and for message ttl - anything that will be numeric string):

示例(注意,对于队列ttl,我们只传递数字和传递消息ttl - 任何将是数字字符串的东西):

2.1 Per message ttl:

2.1每条消息ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'expiration' => '1000'
    )
);

2.2. Per queue ttl:

2.2。每队列ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish('message at ' . microtime(true));

3. Hold redelivers count or left redelivers number (aka hop limit or ttl in IP stack) in message body or headers

3.在邮件正文或标题中保留redelivers计数或保留redelivers编号(即IP堆栈中的跳跃限制或ttl)

  • pros: give you extra control on messages lifetime on application level
  • 专业人士:在应用程序级别为您提供额外的消息生存期控制

  • cons: significant overhead while you have to modify message and publish it again, application specific, not clear
  • 缺点:当您必须修改消息并再次发布消息时,显着的开销,特定于应用程序,不清楚

Code:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'headers' => array(
            'ttl' => 100
        )
    )
);

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
        $headers = $msg->getHeaders();
        echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
        echo $msg->getDeliveryTag(), ' ';
        echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
        echo $msg->getBody(), PHP_EOL;

        try {
            //Do some business logic
            throw new Exception('business logic failed');
        } catch (Exception $ex) {
            //Log exception
            if (isset($headers['ttl'])) {
                // with ttl logic

                if ($headers['ttl'] > 0) {
                    $headers['ttl']--;

                    $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
                }

                return $queue->ack($msg->getDeliveryTag());
            } else {
                // without ttl logic
                return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
            }

        }

        return $queue->ack($msg->getDeliveryTag());
    }
);

There are may be some other ways to better control message redelivers flow.

可能有一些其他方法可以更好地控制消息redelivers流。

Conclusion: there are no silver bullet solution. You have to decide what solution fit your need the best or find out something other, but don't forget to share it here ;)

结论:没有银弹解决方案。您必须确定最适合您需求的解决方案或找到其他解决方案,但不要忘记在此处分享;)

#2


0  

If you do not want to restart the consumer, then basic.recover AMQP command may be what you want. According to AMQP protocol:

如果您不想重新启动使用者,那么basic.recover AMQP命令可能就是您想要的。根据AMQP协议:

basic.recover(bit requeue)

Redeliver unacknowledged messages.

This method asks the server to redeliver all unacknowledged messages on a specified channel. 
Zero or more messages may be redelivered. This method replaces the asynchronous Recover. 

#1


18  

If message was not acknowledged and application fails, it will be redelivered automatically and redelivered property on envelope will be set to true (unless you consume them with no-ack = true flag).

如果消息未被确认且应用程序失败,它将自动重新传递,并且信封上的redelivered属性将设置为true(除非您使用no-ack = true标志消耗它们)。

UPD:

You have to nack message with redelivery flag in your catch block

你必须在catch块中使用redelivery标志来解密消息

    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
    }

Beware infinitely nacked messages while redelivery count doesn't implemented in RabbitMQ and in AMQP protocol at all.

谨防无限次消息,而重新传递计数在RabbitMQ和AMQP协议中根本没有实现。

If you doesn't want to mess with such messages and simply want to add some delay you may want to add some sleep() or usleep() before nack method call, but it is not a good idea at all.

如果你不想搞这些消息而只是想添加一些延迟,你可能想在nack方法调用之前添加一些sleep()或usleep(),但这根本不是一个好主意。

There are multiple techniques to deal with cycle redeliver problem:

有多种技术可以处理循环重新传递问题:

1. Rely on Dead Letter Exchanges

1.依靠死信交换

  • pros: reliable, standard, clear
  • 优点:可靠,标准,清晰

  • cons: require additional logic
  • 缺点:需要额外的逻辑

2. Use per message or per queue TTL

2.使用每条消息或每个队列TTL

  • pros: easy to implement, also standard, clear
  • 优点:易于实施,也标准,清晰

  • cons: with long queues you may loose some message
  • 缺点:排长队你可能会失去一些信息

Examples (note, that for queue ttl we pass only number and for message ttl - anything that will be numeric string):

示例(注意,对于队列ttl,我们只传递数字和传递消息ttl - 任何将是数字字符串的东西):

2.1 Per message ttl:

2.1每条消息ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'expiration' => '1000'
    )
);

2.2. Per queue ttl:

2.2。每队列ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish('message at ' . microtime(true));

3. Hold redelivers count or left redelivers number (aka hop limit or ttl in IP stack) in message body or headers

3.在邮件正文或标题中保留redelivers计数或保留redelivers编号(即IP堆栈中的跳跃限制或ttl)

  • pros: give you extra control on messages lifetime on application level
  • 专业人士:在应用程序级别为您提供额外的消息生存期控制

  • cons: significant overhead while you have to modify message and publish it again, application specific, not clear
  • 缺点:当您必须修改消息并再次发布消息时,显着的开销,特定于应用程序,不清楚

Code:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'headers' => array(
            'ttl' => 100
        )
    )
);

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
        $headers = $msg->getHeaders();
        echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
        echo $msg->getDeliveryTag(), ' ';
        echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
        echo $msg->getBody(), PHP_EOL;

        try {
            //Do some business logic
            throw new Exception('business logic failed');
        } catch (Exception $ex) {
            //Log exception
            if (isset($headers['ttl'])) {
                // with ttl logic

                if ($headers['ttl'] > 0) {
                    $headers['ttl']--;

                    $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
                }

                return $queue->ack($msg->getDeliveryTag());
            } else {
                // without ttl logic
                return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
            }

        }

        return $queue->ack($msg->getDeliveryTag());
    }
);

There are may be some other ways to better control message redelivers flow.

可能有一些其他方法可以更好地控制消息redelivers流。

Conclusion: there are no silver bullet solution. You have to decide what solution fit your need the best or find out something other, but don't forget to share it here ;)

结论:没有银弹解决方案。您必须确定最适合您需求的解决方案或找到其他解决方案,但不要忘记在此处分享;)

#2


0  

If you do not want to restart the consumer, then basic.recover AMQP command may be what you want. According to AMQP protocol:

如果您不想重新启动使用者,那么basic.recover AMQP命令可能就是您想要的。根据AMQP协议:

basic.recover(bit requeue)

Redeliver unacknowledged messages.

This method asks the server to redeliver all unacknowledged messages on a specified channel. 
Zero or more messages may be redelivered. This method replaces the asynchronous Recover.