RabbitMQBundle does not acknowledge messages when producing a message from within a consumer

Date: 2023-01-18 09:49:23

How to acknowledge the message and produce a new message at the same time?

When I start my consumer from the command line, the messages stay in the original queue, while new messages are created in the new queue in an infinite loop, because the consumer keeps consuming the messages that are never acknowledged.

This happens even though the consumer's execute() method returns true, which should acknowledge the message according to the documentation.

I am producing messages from a callback inside a consumer. This producer is injected using the standard Symfony DI.

If I remove the call that publishes the new message, the messages are acknowledged just fine.

services.yml

services:
  my_importlog_repository:
    class: Doctrine\ORM\EntityRepository
    factory_service: doctrine.orm.default_entity_manager
    factory_method: getRepository
    arguments: [AppBundle\Entity\MyImportlogEntity]
  my_distributor:
    class: AppBundle\DistributorImport\MyDistributor
    arguments: ["@my_importlog_repository", "@logger", "@old_sound_rabbit_mq.my_download_producer", "%my_config%"]
  my_download:
    class: AppBundle\Consumer\MyDownloadConsumer
    arguments: ["@logger", "@old_sound_rabbit_mq.my_extract_producer", "@my_distributor", "%my_config%"]
  my_extract:
    class: AppBundle\Consumer\MyExtractConsumer
    arguments: ["@logger", "@old_sound_rabbit_mq.my_convert_producer", "@my_distributor", "%my_config%"]

config.yml

# rabbitmq
old_sound_rabbit_mq:
  connections:
    default:
      host:     '192.168.99.100'
      port:     5672
      user:     'guest'
      password: 'guest'
      vhost:    '/'
      lazy:     false
      connection_timeout: 60
      read_write_timeout: 60

      # requires php-amqplib v2.4.1+ and PHP5.4+
      keepalive: false

      # requires php-amqplib v2.4.1+
      heartbeat: 30
  producers:
    # my producers
    my_download:
      connection:       default
      exchange_options: {name: 'distributor_import', type: direct}
      queue_options:    {name: 'my_download'}
    my_extract:
      connection:       default
      exchange_options: {name: 'distributor_import', type: direct}
      queue_options:    {name: 'my_extract'}
    my_convert:
      connection:       default
      exchange_options: {name: 'distributor_import', type: direct}
      queue_options:    {name: 'my_convert'}
  consumers:
    # my consumers
    my_download:
      connection:       default
      exchange_options: {name: 'distributor_import', type: direct}
      queue_options:    {name: 'my_download'}
      callback:         my_download
      qos_options:      {prefetch_size: 0, prefetch_count: 1, global: false}
      idle_timeout:     60
    my_extract:
      connection:       default
      exchange_options: {name: 'distributor_import', type: direct}
      queue_options:    {name: 'my_extract'}
      callback:         my_extract
      qos_options:      {prefetch_size: 0, prefetch_count: 1, global: false}
      idle_timeout:     60

MyDownloadConsumer.php

<?php

namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;

class MyDownloadConsumer implements ConsumerInterface
{
  private $logger;
  private $producer;
  private $distributor;
  private $config;

  public function __construct(
    \Symfony\Component\HttpKernel\Log\LoggerInterface $logger,
    \OldSound\RabbitMqBundle\RabbitMq\Producer $producer,
    \AppBundle\DistributorImport\MyDistributor $distributor,
    array $config
  )
  {
    $this->logger = $logger;
    $this->producer = $producer;
    $this->distributor = $distributor;
    $this->config = $config;
  }

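  public function execute(\PhpAmqpLib\Message\AMQPMessage $message)
  {
    // Decode the payload of the message being consumed.
    $data = unserialize($message->body);
    // Publish a follow-up message through the injected producer.
    $this->producer->publish(serialize($data));
    // Returning true should acknowledge the consumed message.
    return true;
  }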
}

If I remove these two lines:

$data = unserialize($message->body);
$this->producer->publish(serialize($data));

it works as it should.

1 Solution

#1

I was able to publish a message from inside my consumer's execute() method while also acknowledging the message currently being consumed, using the following code:

$message->delivery_info['channel']
    ->basic_publish(
        new AMQPMessage(serialize($data)),
        'name_of_my_exchange',
        'key.of.my.routing'
    );

Publishing directly on the channel of the message that is being consumed.
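
To make the flow easier to follow, here is a minimal, self-contained sketch of a consumer callback that uses this approach. It is not the bundle's own code: StubMessage and RecordingChannel are hypothetical stand-ins for PhpAmqpLib\Message\AMQPMessage and the real AMQP channel, so the example runs without a broker, and ForwardingConsumer is an illustrative name. In a real project you would presumably implement ConsumerInterface and use the real AMQPMessage class instead.

```php
<?php

// Hypothetical stand-in for PhpAmqpLib\Message\AMQPMessage (sketch only).
class StubMessage
{
    public $body;
    public $delivery_info = array();

    public function __construct($body)
    {
        $this->body = $body;
    }
}

// Hypothetical stand-in for the AMQP channel; it only records publishes.
class RecordingChannel
{
    public $published = array();

    public function basic_publish($message, $exchange = '', $routingKey = '')
    {
        $this->published[] = array($message->body, $exchange, $routingKey);
    }
}

// Consumer callback following the answer: publish on the incoming
// message's channel, then return true so the message is acknowledged.
class ForwardingConsumer
{
    private $exchange;
    private $routingKey;

    public function __construct($exchange, $routingKey)
    {
        $this->exchange = $exchange;
        $this->routingKey = $routingKey;
    }

    public function execute($message)
    {
        $data = unserialize($message->body);

        // Reuse the channel that delivered the message instead of
        // going through the injected producer.
        $message->delivery_info['channel']->basic_publish(
            new StubMessage(serialize($data)),
            $this->exchange,
            $this->routingKey
        );

        // Returning true asks the bundle to acknowledge the message.
        return true;
    }
}

// Usage: simulate a single delivery.
$channel = new RecordingChannel();
$incoming = new StubMessage(serialize(array('file' => 'catalog.zip')));
$incoming->delivery_info['channel'] = $channel;

$consumer = new ForwardingConsumer('name_of_my_exchange', 'key.of.my.routing');
$acknowledged = $consumer->execute($incoming);
```

Note that the exchange name and routing key are passed explicitly here, as in the answer, whereas the original consumer called the producer's publish() without a routing key.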
