RabbitMQ消费者连接在空闲90秒后死亡

时间:2022-10-11 15:30:49

I have a RabbitMQ task queue and a Pika consumer to consume these tasks (with acks). The problem is that the connection dies after 90 seconds Idle but my tasks will often take longer than that. That means that while tasks are still being computed they are returned to the task queue and never acked.

我有一个RabbitMQ任务队列和一个Pika消费者来使用这些任务(使用acks)。问题是连接在90秒空闲后死亡但我的任务通常需要更长的时间。这意味着,当仍在计算任务时,它们将返回到任务队列并且永远不会被激活。

Using RabbitMQ 3.5.3 and Pika 0.9.14 with the channel.basic_consume() method. The connection has a heartbeat_interval of 30 seconds.

使用带有channel.basic_consume()方法的RabbitMQ 3.5.3和Pika 0.9.14。该连接的heartbeat_interval为30秒。

Consume code:

import pika
from time import sleep

RABBITMQ_URL = "amqp://user:pass@my-host.com/my_virtual_host?heartbeat_interval=30"
QUEUE_NAME = "my_queue"


def callback(ch, method, properties, body):
    print body
    sleep(91)  # if sleep value < 90 this code works (even 89)
    ch.basic_ack(delivery_tag=method.delivery_tag)


parameters = pika.URLParameters(RABBITMQ_URL)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=QUEUE_NAME)
channel.start_consuming()

Traceback:

Traceback (most recent call last):
  File "main.py", line 19, in <module>
    channel.basic_consume(callback, queue=QUEUE_NAME)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 221, in basic_consume
    {'consumer_tag': consumer_tag})])
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1143, in _rpc
    self.connection.process_data_events()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 240, in process_data_events
    if self._handle_read():
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 347, in _handle_read
    if self._read_poller.ready():
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 43, in inner
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 89, in ready
    self.poll_timeout)
select.error: (9, 'Bad file descriptor')

1 个解决方案

#1


2  

The problem here is that because you are sleeping for so long, pika cannot respond to heartbeat requests from RabbitMQ, and when this happens, RabbitMQ will close the connection.

这里的问题是因为你睡了这么久,pika无法响应来自RabbitMQ的心跳请求,当发生这种情况时,RabbitMQ将关闭连接。

The only way around this is to either disable heartbeats or sleep in smaller intervals and run process_data_events() continuously so that pika can handle the heartbeats.

解决这个问题的唯一方法是禁用心跳或以较小的间隔睡眠,并连续运行process_data_events(),以便鼠兔可以处理心跳。

e.g. something like this

例如这样的事情

def amqp_sleep(connection, time_to_sleep=20):
    remaining = time_to_sleep
    while remaining > 0:
        connection.process_data_events()
        time.sleep(5)
        remaining -= 5

Personally though I would go for a library that automatically handles the heartbeats in the background so you don't have to deal with them, e.g. rabbitpy or my own amqp-storm.

就个人而言,我会去一个自动处理后台心跳的图书馆,所以你不必处理它们,例如兔子或我自己的amqp风暴。

#1


2  

The problem here is that because you are sleeping for so long, pika cannot respond to heartbeat requests from RabbitMQ, and when this happens, RabbitMQ will close the connection.

这里的问题是因为你睡了这么久,pika无法响应来自RabbitMQ的心跳请求,当发生这种情况时,RabbitMQ将关闭连接。

The only way around this is to either disable heartbeats or sleep in smaller intervals and run process_data_events() continuously so that pika can handle the heartbeats.

解决这个问题的唯一方法是禁用心跳或以较小的间隔睡眠,并连续运行process_data_events(),以便鼠兔可以处理心跳。

e.g. something like this

例如这样的事情

def amqp_sleep(connection, time_to_sleep=20):
    remaining = time_to_sleep
    while remaining > 0:
        connection.process_data_events()
        time.sleep(5)
        remaining -= 5

Personally though I would go for a library that automatically handles the heartbeats in the background so you don't have to deal with them, e.g. rabbitpy or my own amqp-storm.

就个人而言,我会去一个自动处理后台心跳的图书馆,所以你不必处理它们,例如兔子或我自己的amqp风暴。