How to acknowledge the message and produce a new message at the same time?
When I start my consumer from the command line, the messages stay in the original queue, but new messages keep being created in the new queue in an infinite loop, because the consumer keeps consuming messages that are never acknowledged.
This happens even though the consumer's execute() function returns true, which should acknowledge the message according to the documentation.
I am producing messages from a callback inside a consumer. This producer is injected using the standard Symfony DI.
If I remove the call that publishes the new message, the messages are acknowledged just fine.
services.yml
services:
    my_importlog_repository:
        class: Doctrine\ORM\EntityRepository
        factory_service: doctrine.orm.default_entity_manager
        factory_method: getRepository
        arguments: [AppBundle\Entity\MyImportlogEntity]
    my_distributor:
        class: AppBundle\DistributorImport\MyDistributor
        arguments: [@my_importlog_repository, @logger, @old_sound_rabbit_mq.my_download_producer, %my_config%]
    my_download:
        class: AppBundle\Consumer\MyDownloadConsumer
        arguments: [@logger, @old_sound_rabbit_mq.my_extract_producer, @my_distributor, %my_config%]
    my_extract:
        class: AppBundle\Consumer\MyExtractConsumer
        arguments: [@logger, @old_sound_rabbit_mq.my_convert_producer, @my_distributor, %my_config%]
config.yml
# rabbitmq
old_sound_rabbit_mq:
    connections:
        default:
            host: '192.168.99.100'
            port: 5672
            user: 'guest'
            password: 'guest'
            vhost: '/'
            lazy: false
            connection_timeout: 60
            read_write_timeout: 60
            # requires php-amqplib v2.4.1+ and PHP5.4+
            keepalive: false
            # requires php-amqplib v2.4.1+
            heartbeat: 30
    producers:
        # my producers
        my_download:
            connection: default
            exchange_options: {name: 'distributor_import', type: direct}
            queue_options: {name: 'my_download'}
        my_extract:
            connection: default
            exchange_options: {name: 'distributor_import', type: direct}
            queue_options: {name: 'my_extract'}
        my_convert:
            connection: default
            exchange_options: {name: 'distributor_import', type: direct}
            queue_options: {name: 'my_convert'}
    consumers:
        # my consumers
        my_download:
            connection: default
            exchange_options: {name: 'distributor_import', type: direct}
            queue_options: {name: 'my_download'}
            callback: my_download
            qos_options: {prefetch_size: 0, prefetch_count: 1, global: false}
            idle_timeout: 60
        my_extract:
            connection: default
            exchange_options: {name: 'distributor_import', type: direct}
            queue_options: {name: 'my_extract'}
            callback: my_extract
            qos_options: {prefetch_size: 0, prefetch_count: 1, global: false}
            idle_timeout: 60
MyDownloadConsumer.php
<?php

namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;

class MyDownloadConsumer implements ConsumerInterface
{
    private $logger;
    private $producer;
    private $distributor;
    private $config;

    public function __construct(
        \Symfony\Component\HttpKernel\Log\LoggerInterface $logger,
        \OldSound\RabbitMqBundle\RabbitMq\Producer $producer,
        \AppBundle\DistributorImport\MyDistributor $distributor,
        array $config
    ) {
        $this->logger = $logger;
        $this->producer = $producer;
        $this->distributor = $distributor;
        $this->config = $config;
    }

    public function execute(\PhpAmqpLib\Message\AMQPMessage $message)
    {
        $data = unserialize($message->body);
        $this->producer->publish(serialize($data));

        return true;
    }
}
If I remove

    $data = unserialize($message->body);
    $this->producer->publish(serialize($data));

it works as it should.
1 solution
#1
I was able to publish a message from inside my consumer's execute() method while also acknowledging the message currently being consumed, using the following code:
    $message->delivery_info['channel']
        ->basic_publish(
            new AMQPMessage(serialize($data)),
            'name_of_my_exchange',
            'key.of.my.routing'
        );
This publishes directly on the channel of the message that is being consumed, rather than through the injected producer.
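For context, here is a minimal sketch of how that call might sit inside the consumer from the question. The exchange name comes from the config.yml above; the routing key `my_extract` is my own assumption and only works if the target queue is actually bound to the exchange with that key. The constructor and injected services are left out to keep it short, so treat this as an illustration rather than a drop-in replacement.

```php
<?php

namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class MyDownloadConsumer implements ConsumerInterface
{
    // Exchange name taken from config.yml in the question.
    const EXCHANGE = 'distributor_import';

    // Hypothetical routing key: assumes the my_extract queue is bound with it.
    const NEXT_ROUTING_KEY = 'my_extract';

    public function execute(AMQPMessage $message)
    {
        $data = unserialize($message->body);

        // ... do the actual download work with $data here ...

        // Publish the follow-up message on the channel that delivered this
        // message, instead of going through the injected producer service.
        $channel = $message->delivery_info['channel'];
        $channel->basic_publish(
            new AMQPMessage(serialize($data)),
            self::EXCHANGE,
            self::NEXT_ROUTING_KEY
        );

        // Returning true tells the bundle to acknowledge the consumed message.
        return true;
    }
}
```

The consumer is still started the same way from the command line; only the way the next message is published changes.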