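Problem description
When consuming a RabbitMQ queue, each invocation of the callback has to perform some fairly time-consuming work; once the work is done, it sends an ack to the RabbitMQ server to dequeue the message.
The problem occurs when sending the ack: the program reports that the connection has been closed, or raises a socket error.
Source example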
#!/usr/bin/env python3
# coding: utf-8
# Written against the pika 0.x API (callback-first basic_consume).
import time

import pika

USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'


def callback(ch, method, properties, body):
    print(body)
    time.sleep(600)  # simulate a long-running task
    ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
    ch.basic_ack(delivery_tag=method.delivery_tag)


def test_main():
    s_conn = pika.BlockingConnection(
        pika.ConnectionParameters(
            '127.0.0.1',
            credentials=pika.PlainCredentials(USER, PWD)))
    chan = s_conn.channel()
    chan.queue_declare(queue=TEST_QUEUE)
    chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
    chan.basic_consume(callback, queue=TEST_QUEUE)
    chan.start_consuming()


if __name__ == "__main__":
    test_main()
After running for a while, it fails with:
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None
[CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')
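Troubleshooting
Hypothesis: the pika client did not send heartbeats in time, so the server closed the connection.
At first I changed the heartbeat_interval parameter, as shown below: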
def test_main():
    s_conn = pika.BlockingConnection(
        pika.ConnectionParameters(
            '127.0.0.1',
            heartbeat_interval=10,
            socket_timeout=5,
            credentials=pika.PlainCredentials(USER, PWD)))
    # ....
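After this change it still failed. Thinking it over, the single thread was probably blocked the whole time, so pika had no chance to send heartbeats.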
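The blocking hypothesis can be illustrated without a broker. The toy simulation below is only a sketch under stated assumptions: it does not use pika, the function names `heartbeat_times` and `longest_gap` are made up for this illustration, and it assumes a single thread that wants to send a heartbeat every `interval` seconds but cannot do so while a callback is running.

```python
def heartbeat_times(callback_start, callback_seconds, interval, total_seconds):
    """Simulate when a single-threaded client actually sends heartbeats.

    Hypothetical model: a heartbeat falls due every `interval` seconds, but
    while the callback runs the thread is busy, so a heartbeat that falls due
    in that window is delayed until the callback returns.
    """
    callback_end = callback_start + callback_seconds
    sent = []
    t = interval
    while t <= total_seconds:
        if callback_start <= t < callback_end:
            t = callback_end  # first chance to send after the callback
            continue
        sent.append(t)
        t += interval
    return sent


def longest_gap(times):
    """Longest silence (seconds) between consecutive sends, starting at t=0."""
    return max((b - a for a, b in zip([0] + times, times)), default=0)


if __name__ == "__main__":
    # The article's numbers: a 600 s callback and a 10 s heartbeat interval.
    times = heartbeat_times(callback_start=5, callback_seconds=600,
                            interval=10, total_seconds=630)
    print(times, longest_gap(times))
```

With these inputs the first heartbeat only goes out at 605 seconds, so the modelled client is silent for 605 seconds, far longer than a 10-second window.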
So I added a heartbeat thread, as shown below:
#!/usr/bin/env python3
# coding: utf-8
import logging
import threading
import time

import pika

USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'


class Heartbeat(threading.Thread):
    def __init__(self, connection):
        super(Heartbeat, self).__init__()
        self.lock = threading.Lock()
        self.connection = connection
        self.quitflag = False
        self.stopflag = True
        self.daemon = True

    def run(self):
        while not self.quitflag:
            time.sleep(10)
            with self.lock:
                if self.stopflag:
                    continue
                try:
                    # Let pika service the connection (including heartbeats)
                    self.connection.process_data_events()
                except Exception as ex:
                    logging.warning("Error format: %s", ex)
                    return

    def startHeartbeat(self):
        with self.lock:
            if self.quitflag:
                return
            self.stopflag = False


def callback(ch, method, properties, body):
    logging.info("recv_body:%s", body)
    time.sleep(600)  # simulate a long-running task
    ch.basic_ack(delivery_tag=method.delivery_tag)


def test_main():
    s_conn = pika.BlockingConnection(
        pika.ConnectionParameters(
            '127.0.0.1',
            heartbeat_interval=10,
            socket_timeout=5,
            credentials=pika.PlainCredentials(USER, PWD)))
    chan = s_conn.channel()
    chan.queue_declare(queue=TEST_QUEUE)
    chan.basic_consume(callback, queue=TEST_QUEUE)
    heartbeat = Heartbeat(s_conn)
    heartbeat.start()  # start the heartbeat thread
    heartbeat.startHeartbeat()
    chan.start_consuming()


if __name__ == "__main__":
    test_main()
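I tried running it and it still failed, so I had to calm down and consider whether my reasoning was wrong.
I went to the API docs and found the description of heartbeat_interval: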
:param int heartbeat_interval: How often to send heartbeats.
Min between this value and server's proposal
will be used. Use 0 to deactivate heartbeats
and None to accept server's proposal.
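By that description, the heartbeat value still was not set correctly. The program above intends to send a heartbeat every 10 seconds, but in practice the interval between heartbeats will be slightly longer than 10 seconds. So I had misunderstood what heartbeat_interval does: it is the length of time after which the server closes the connection if it has received neither a heartbeat nor any other data, not the interval at which pika sends heartbeats. Once I raised heartbeat_interval above the actual interval between heartbeats or messages, for example to 60 seconds in the program above, it ran normally.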
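This reading of heartbeat_interval can be sketched as a small check. It is a simplified model under assumptions, not pika's or RabbitMQ's actual logic: `first_timeout` is a hypothetical helper, the server is assumed to drop the connection as soon as the silence exceeds the timeout (a real broker may allow some grace), and the 0.5 seconds of overhead per loop iteration is an invented figure standing in for "slightly longer than 10 seconds".

```python
def first_timeout(send_times, heartbeat_timeout):
    """Return when the modelled server gives up, or None if it never does.

    `send_times` are the moments (seconds since connect) at which the client
    sends a heartbeat or any other data. Assumption: the server drops the
    connection once it has heard nothing for more than `heartbeat_timeout`.
    """
    last_seen = 0.0
    for sent_at in send_times:
        if sent_at - last_seen > heartbeat_timeout:
            return last_seen + heartbeat_timeout
        last_seen = sent_at
    return None


# Heartbeat thread: sleep(10) plus an assumed 0.5 s of overhead per loop.
send_times = [10.5 * n for n in range(1, 7)]

if __name__ == "__main__":
    print(first_timeout(send_times, heartbeat_timeout=10))  # timeout too tight
    print(first_timeout(send_times, heartbeat_timeout=60))  # comfortable margin
```

In this model a 10-second timeout is already exceeded before the first heartbeat arrives, while a 60-second timeout is never reached and the function returns None.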
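If heartbeat_interval is not specified, it defaults to None, which means the heartbeat check follows the RabbitMQ server's configuration.
Setting heartbeat_interval=0 disables heartbeat checking, so the server will not proactively close the connection.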
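The three cases (a number, None, and 0) can be summarised in a few lines. This sketch only mirrors the wording of the docstring quoted above; it is not taken from pika's source, `effective_heartbeat` is a hypothetical name, and it assumes the server proposes a positive value.

```python
def effective_heartbeat(client_value, server_proposal):
    """Model of the docstring's rule for choosing the heartbeat value.

    Assumptions: `server_proposal` is a positive number of seconds, and the
    rule is exactly what the docstring states (None, 0, or the minimum).
    """
    if client_value is None:
        return server_proposal  # accept the server's proposal
    if client_value == 0:
        return 0  # heartbeats deactivated
    return min(client_value, server_proposal)


if __name__ == "__main__":
    for value in (None, 0, 10, 600):
        print(value, effective_heartbeat(value, server_proposal=60))
```

Under this model, with a server proposal of 60, a client value of 10 yields 10, while a client value of 600 is capped at 60.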
Original link: https://blog.csdn.net/moxiaomomo/article/details/77414831