1. 摘要
一个项目中,由于Python中某些module对python3和python2的支持不同,必须将一部分代码运行在python2.7环境中,另一部分代码运行在python3.5环境中
- deuces:只支持python2
- tensorflow:只支持python3
那这两部分代码如何进行数据传输呢?
- 用message queue
- 于是选择了轻量级的ZMQ(它对python2和python3都支持的很好)
ZMQ的入门也很简单,查看这个示例就能在几分钟内用起来。但在写稍微复杂一点的通信过程时,就经常遇到下面这个报错:
zmq.error.ZMQError: Operation cannot be accomplished in current state
这个报错也很简单,打了很多log也无法看出具体问题所在。
这就是背景,下面详细讲解如何重现与解决这个问题。
2. ZMQ介绍
从[1]中可知,ZMQ是一个简单好用的传输层,像框架一样的一个 socket library,他使得 Socket 编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ 的明确目标是“成为标准网络协议栈的一部分,之后进入 Linux 内核。
与 RabbitMQ 相比,ZMQ 并不像是一个传统意义上的消息队列服务器,事实上,它也根本不是一个服务器,它更像是一个底层的网络通讯库,在 Socket API 之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的 API 接口。
ZMQ的编程示例可参考示例。
3. 导致错误的由来
下面我们写一个能重新该问题的简单程序:
服务端程序如下,它主要在一个死循环中不断接受信息,并根据信息内容,做出相应的回复/或不回复。
import zmq
# server
context = ()
socket = context.socket()
socket.bind('tcp://127.0.0.1:5555')
while True:
msg = socket.recv()
print('get msg={0}'.format(msg))
if msg == 'test':
socket.send_string('done')
print('reply done')
else:
print('reply nothing')
客户端程序如下,它不断向服务端程序发送消息
import zmq
# client
context = ()
socket = context.socket()
socket.connect('tcp://127.0.0.1:5555')
socket.send('test')
socket.send('test1')
socket.send('test2')
print msg
先运行服务端程序,在运行客户端程序,能看到两个程序都有报错:
zmq.error.ZMQError: Operation cannot be accomplished in current state
这是怎么回事呢?
4. 错误的原因
通过报错打印的trace看到,在客户端程序中,报错相关信息为:
Traceback (most recent call last):
File "zmq_client.py", line 8, in <module>
socket.send('test1')
可见,在报错前,已经成功通过('test')
将信息送出。
在服务端程序中,报错时的打印内容为:
get msg=b'test'
reply nothing
Traceback (most recent call last):
File "zmq_server.py", line 9, in <module>
msg = socket.recv()
可见,在报错前,已经成功收到了’test’信息。
所以,问题并不是trace打印出来的recv()
和send()
使用错误。而是[3]中提到的编程模式问题。
回到程序中,我们可以看到,这里使用的zmq模式为。在这种模式下,我们的程序必须要遵守
recv()
和send()
配对使用的编程模式。
也就是说,在服务程序中,必须要有完整的recv()
和send()
成对出现。同理,在客户端程序中,send()
后,也要有recv()
。
5. 如何修复错误
只要recv()
和send()
配对使用,就能解决这个问题。
修复后的服务端程序:
import zmq
# server
context = ()
socket = context.socket()
socket.bind('tcp://127.0.0.1:5555')
while True:
msg = socket.recv()
print('get msg={0}'.format(msg))
if msg == 'test':
socket.send_string('done')
print('reply done')
else:
socket.send_string('nothing')# fixing for recv-send pair
print('reply nothing')
修复后的客户端程序:
import zmq
# client
context = ()
socket = context.socket()
socket.connect('tcp://127.0.0.1:5555')
socket.send('test')
socket.recv()# fixing for recv-send pair
socket.send('test1')
socket.recv()# fixing for recv-send pair
socket.send('test2')
socket.recv()# fixing for recv-send pair
程序中加了注释的地方,就是改动/增加的代码fixing
参考
- [1] /eric_sunah/article/details/51384116
- [2] /questions/48414559/pyzmq-req-rep-multiple-clients-zmqerror-with-polling
- [3] /questions/49056602/zeromq-operation-throws-exc-operation-cannot-be-accomplished-in-current-state