Python进程间通信和网络基础
Python支持多种进程间通讯的方式, 有单机通信的signal和mmap等, 也有可以通过网络的socket方式, 这里先介绍select等的有关知识, socket相关的后面再看.
select
Python中支持多种select()和poll()的实现方式.
- devpoll() : Solaris and derivatives
- epoll() : Linux 2.5+
- kqueue() : available on most BSD.
在Windows上, select only works for sockets; on other operating systems, it also works for other file types (in particular, on Unix, it works on pipes).
切忌 : 不能使用在普通文件上来看是否有可读可写事件.
Not used on regular files to determine whether a file has grown since it was last read.
在Python中还有一个更高级的module: selectors, 除非需要精确控制os原语级别否则建议使用selectors.
本质上select还是实现的同步操作, 只是做了非阻塞的角色. 通过对多路IO的转接, 变相提高了IO的效率, 还不是完善的异步IO. 还是在内核IO的输入输出信号上做文章.
select是直接映射到Unix上面的select系统调用的, 所以就是在c层做了一个参数的转换然后直接进入Unix的selec和epoll的调用, 最终将系统调用的结果包装成Python对象再返回回来.
需要认真介绍的部分有:
- select.select()
- select.poll()
- epoll.poll()
其中:
select.select(rlist, wlist, xlist[, timeout])
This is a straightforward interface to the Unix select() system call. The first three arguments are sequences of ‘waitable objects’: either integers representing file descriptors or objects with a parameterless method named fileno() returning such an integer:
rlist: wait until ready for reading
wlist: wait until ready for writing
xlist: wait for an “exceptional condition” (see the manual page for what your system considers such a condition)
和Unix的系统调用一样, select()前三个参数是三个list, 包含一个fileNo的三种状态, 可读, 可写, 异常. 本质上类似一个大的bitmap
fileNo1 | fileNo2 | .... | |
---|---|---|---|
read list | 0 | 1 | ... |
write list | 0 | 0 | ... |
except list | 1 | 0 | ... |
select函数都是靠自己调用来得到结果的, 所以一般都是将select函数放在一个循环中, 还是一个轮询系统的做法.
如果内核准备好资源则会改写select中相应fd list中的值.
对于第四个参数, timeout表示了select在资源没有准备好之前需要等待的时间.
- 如果timeout == null, 则select在资源准备好之前不会返回, 一直等待. 但是在有中断信号来的时候会返回-1(系统调用返回-1), 并且置OSError
- 如果timeout == 0, 那么就会立即返回. 不会阻塞select函数. 就一直做轮询.
- 如果timeout != 0, 如果在超时时间到来前有资源准备好, 那么则返回, 或者等到超时, select也返回, 如果这个时候没有可用资源, 那么select返回0. 同时也会接受中断信号.
Empty sequences are allowed, but acceptance of three empty sequences is platform-dependent. (It is known to work on Unix but not on Windows.) The optional timeout argument specifies a time-out as a floating point number in seconds. When the timeout argument is omitted the function blocks until at least one file descriptor is ready. A time-out value of zero specifies a poll and never blocks.
The return value is a triple of lists of objects that are ready: subsets of the first three arguments. When the time-out is reached without a file descriptor becoming ready, three empty lists are returned.
在底层转化到Python对象时, 将系统调用的返回值替换成了三个lists, 分别对应读写异常三个list, 并且返回的是有效的fd lsit子集.
Among the acceptable object types in the sequences are Python file objects (e.g. sys.stdin, or objects returned by open() or os.popen()), socket objects returned by socket.socket(). You may also define a wrapper class yourself, as long as it has an appropriate fileno() method (that really returns a file descriptor, not just a random integer).
Note File objects on Windows are not acceptable, but sockets are. On Windows, the underlying select() function is provided by the WinSock library, and does not handle file descriptors that don’t originate from WinSock.
注意: 在linux下, python的所有文件描述符都可以传递进来, 但是在win下只能允许socket.
Changed in version 3.5: The function is now retried with a recomputed timeout when interrupted by a signal, except if the signal handler raises an exception (see PEP 475 for the rationale), instead of raising InterruptedError.
select.poll()
现在在任何操作系统上都不再提供, 本身也是select和epoll之间的过渡产物.
epoll.poll()
epoll和poll基本的思路上是一样的, 就是通过一次传入一个包含fd及对应感兴趣事件的结构体列表, 一次拷贝至内核中, 当有相应的事件时回吐出对应的结构体即可. 相比select做到的优势是每次调用不需要将所有的fd list拷贝至内核, 另一方面的优势是select必须遍历一个list所有的fd才知道哪些有效, 而epoll只吐出对应感兴趣且准备好的fd_event结构体列表, 可以减小遍历次数.
在linux中epoll只有三个API:
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
逐一详解:
int epoll_create(int size);
创建一个epoll句柄, 用来监听其他的fd, size告诉内核监听fd的数量. 和select的监听数量+1不同. 一旦创建好这个句柄, 它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。-
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
- int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
python中对于这个接口的调用也是直接映射到底层的系统应用的, 返回一个(fd, events)构成的tuple列表.
selectors
Python中的高级IO Multiplexing模块.
类继承的关系如下:
BaseSelector
+-- SelectSelector
+-- PollSelector
+-- EpollSelector
+-- DevpollSelector
+-- KqueueSelector
selectors包装了select模块的内容
class selectors.SelectorKey
A SelectorKey is a namedtuple used to associate a file object to its underlying file descriptor, selected event mask and attached data. It is returned by several BaseSelector methods.
使用一个namedtuple保存每一个fd及对应的event信息.
fileobj
File object registered.
fd
Underlying file descriptor.
events
Events that must be waited for on this file object.
data
Optional opaque data associated to this file object: for example, this could be used to store a per-client session ID.
基类定义了整个模型的对外接口, 主要是三个调用register,unregister,select方法.
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
data = conn.recv(1000) # Should be ready
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
在第一次注册的时候是注册accept的, 但是当conn连接以后还需要监听conn的可读事件, 所以需要再次注册一个fd.
这里把data作为callback来用的.