c-ares是异步DNS请求库,libcurl,libevent,wireshark都使用了c-ares,gevent1.0版本前使用的是libevent,
所以它的DNS请求也是使用c-ares,1.0版本后使用cython封装了c-ares。
c-ares官方文档,http://c-ares.haxx.se/docs.html。
gevent中DNS默认使用的是线程池版本的,可通过设置GEVENT_RESOLVER=ares环境变量使用c-ares异步库。
如何证明的确是异步呢,试着跑一遍你就知道了?
#coding=utf8import socket
import gevent
from gevent import get_hub
from gevent.resolver_ares import Resolver
r = get_hub().resolver = Resolver(servers=['8.8.8.8'])
def f(w):
print w,r.gethostbyname(w)
for w in ['www.google.com','www.baidu.com','www.apple.com']:
gevent.spawn(f,w)
gevent.sleep(6)
cares.ares_library_init(cares.ARES_LIB_INIT_ALL)
初始化ares库,其实只对windows平台做了处理,主要是为了加载iphlpapi.dll,在非windows平台可不调用。
如果调用一定要在c-ares任何函数之前调用。
cares.ares_library_cleanup()
相对于cares.ares_library_init,在windows平台将释放iphlpapi.dll,非windows平台可不调用。
gevent中并没有调用该函数,作者在__dealloc__中也用?号表明了这一点,我不太理解,可能有更好的理由吧。
cares.ares_destroy(self.channel)
销毁channel,释放内存,关闭打开的socket,这是在__dealloc__中调用
cares.ares_init_options(&channel, &options, optmask)
这是ares中最核心的函数,用于初始化channel,options,optmask主要是通过channel的__init__构造
cdef public class channel [object PyGeventAresChannelObject, type PyGeventAresChannel_Type]: def __init__(self, object loop, flags=None, timeout=None, tries=None, ndots=None, udp_port=None, tcp_port=None, servers=None):
参数说明:
flags用于控制一查询行为,如ARES_FLAG_USEVC,将只发送TCP请求(我们知道DNS既有TCP也有UDP)
ARES_FLAG_PRIMARY :只向第一个服务器发送请求,还有其它选项参考ares_init_options函数文档
timeout:指明第一次请求的超时时间,单位为秒,c-ares单位为毫秒,gevent会转换,第一次之后的超时c-area有它自己的算法
tries:请求尝试次数,默认4次
ndots:最少'.'的数量,默认是1,如果大于1,就直接查找域名,不然会和本地域名合并(init_by_environment设置本地域名)
udp_port,tcp_port:使用的udp,tcp端口号,默认53
servers:发送dns请求的服务器,见下面ares_set_servers
ndots多说一句,比如ping bifeng(这是我一同事的主机),检测发现没有'.'(也就是小于ndots),所以会把本地域给加上去该操作在ares_search.c中ares_search函数中
cares.ares_set_servers(self.channel, cares.ares_addr_node* c_servers)
设置dns请求服务器,设置完成需要free掉c_servers的内存空间,因为ares_set_servers中重新malloc内存空间了。
在set_servers中,通过finally free内存空间
c_servers = <cares.ares_addr_node*>malloc(sizeof(cares.ares_addr_node) * length) if not c_servers: raise MemoryError try: index = 0 for server in servers: ... c_servers[length - 1].next = NULL index = cares.ares_set_servers(self.channel, c_servers) if index: raise ValueError(strerror(index)) finally: free(c_servers)
你可能很好奇,c-ares是如何和gevent(libev)的socket关联起来的,因为DNS的本质也是
socket请求,所以底层也是需要使用操作系统提供的epoll等机制,而c-ares提供了socket状态变化的接口,
这就可以让c-ares运行在libev上面,所有的魔法其实都是ares_options.sock_state_cb向外提供的。
#ares.hstruct ares_options { int flags; int timeout; /* in seconds or milliseconds, depending on options */ int tries; .... ares_sock_state_cb sock_state_cb; void *sock_state_cb_data;};ARES_OPT_SOCK_STATE_CB void (*sock_state_cb)(void *data, int s, int read, int write)当dns socket状态改变时将回调sock_state_cb,而在channel的__init__中将sock_state_cb设置为gevent_sock_state_callback
def __init__(...) options.sock_state_cb = <void*>gevent_sock_state_callback options.sock_state_cb_data = <void*>selfcdef void gevent_sock_state_callback(void *data, int s, int read, int write): if not data: return cdef channel ch = <channel>data ch._sock_state_callback(s, read, write)gevent_sock_state_callback只做了一件事就是调用channel的_sock_state_callback,并设置是读是写
cdef _sock_state_callback(self, int socket, int read, int write): if not self.channel: return cdef object watcher = self._watchers.get(socket) cdef int events = 0 if read: events |= EV_READ if write: events |= EV_WRITE if watcher is None: if not events: return watcher = self.loop.io(socket, events) #socket第一次,启动io watcher self._watchers[socket] = watcher elif events: #已有watcher,判断事件是否变化了 if watcher.events == events: return watcher.stop() watcher.events = events #设置新状态 else: watcher.stop() self._watchers.pop(socket, None) if not self._watchers: self._timer.stop() return #没有事件了,也就是都处理完了,将回调我们的最终回调函数(如调用gethostbyname时设置的回调) watcher.start(self._process_fd, watcher, pass_events=True) #watcher设置回调 self._timer.again(self._on_timer) #让c-ares每秒处理一下超时和broken_connections前面io wather的回调self._process_fd主要就是调用cares.ares_process_fd对指定的文件描述符继续处理,
cares.ARES_SOCKET_BAD代表该事件不做处理,其实也就是该事件已经处理完了。
def _process_fd(self, int events, object watcher): if not self.channel: return cdef int read_fd = watcher.fd #只处理的文件描述符 cdef int write_fd = read_fd if not (events & EV_READ): #没有可读事件,将读fd设为"不处理" read_fd = cares.ARES_SOCKET_BAD if not (events & EV_WRITE): #没有可写事件,将写fd设为"不处理" write_fd = cares.ARES_SOCKET_BAD cares.ares_process_fd(self.channel, read_fd, write_fd)
其实到上面c-ares流程已经差不多了,最后会回调设置的最终回调,我们来看一下gethostbyname的操作
定义于resolver_ares.py的gethostbyname函数,调用的是gethostbyname_ex
def gethostbyname_ex(self, hostname, family=AF_INET): while True: ares = self.ares try: waiter = Waiter(self.hub) #使用Waiter ares.gethostbyname(waiter, hostname, family) #调用ares.gethostbyname,设置回调为waiter result = waiter.get() #我们知道,waiter没有结果时会切换到hub,完美的和gevent结合起来 if not result[-1]: raise gaierror(-5, 'No address associated with hostname') return result except gaierror: if ares is self.ares: raise
Waiter定义了__call__方法,所以可以直接作为回调函数
ares.gethostbyname主要就是调用了cares.ares_gethostbyname(self.channel, name, family, <void*>gevent_ares_host_callback, <void*>arg)
当DNS请求成功或失败都会回调gevent_ares_host_callback
而gevent_ares_host_callback会回调上面的waiter,并把结果传给waiter,这边可以自己看下代码,比较简单。
waiter.__call__会switch到之前切换的greenlet,即前面的waiter.get()处,此时将返回result,gethostbyname成功执行。
这里还有一个问题,c-ares什么时候认为socket的状态改变了?
#define SOCK_STATE_CALLBACK(c, s, r, w) \ do { \ if ((c)->sock_state_cb) \ (c)->sock_state_cb((c)->sock_state_cb_data, (s), (r), (w)); \ } WHILE_FALSE在c-ares中状态改变回调是通过SOCK_STATE_CALLBACK宏实现的,我们可以搜索一下这个宏你就明白了。
我们可以看一下open_tcp_socket,这是在刚开始发送tcp时调用的。
static int open_tcp_socket(ares_channel channel, struct server_state *server){ ...... /* Acquire a socket. */ s = socket(server->addr.family, SOCK_STREAM, 0); //创建socket /* Configure it. */ configure_socket(s, server->addr.family, channel); //配置 #ifdef TCP_NODELAY /* * Disable the Nagle algorithm (only relevant for TCP sockets, and thus not * in configure_socket). In general, in DNS lookups we're pretty much * interested in firing off a single request and then waiting for a reply, * so batching isn't very interesting. */ opt = 1; if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) == -1) //判断是否使用TCP_NODELAY { sclose(s); return -1; }#endif /* Connect to the server. */ if (connect(s, sa, salen) == -1) // 连接DNS服务器 { int err = SOCKERRNO; if (err != EINPROGRESS && err != EWOULDBLOCK) { sclose(s); return -1; } } SOCK_STATE_CALLBACK(channel, s, 1, 0); // 连接后,状态肯定改变,肯定有读事件,所有read_fd设为1,自然地调用了状态改变函数 server->tcp_buffer_pos = 0; server->tcp_socket = s; server->tcp_connection_generation = ++channel->tcp_connection_generation; return 0;}也就是说c-ares值关注刚开始的状态变化,也就是连接后“读”事件,中间的状态改变就全部交给gevent了。
当然当查询结束,或area channel被destory或cancel时,你还需要告诉gevent已经没有关注事件了,这个是在
ares__close_sockets函数中实现的。
c-ares真的很美,仅仅通过提供几个接口,就可以让自己和其它的框架完美结合,very nice!!!
我之前就是很好奇c-ares的运行方式,内部DNS细节可能并不关注,关注的就是结合问题,花了不少时间研究,
主要是我在网上找不到c-ares的example,这让我郁闷了半天,这么使用广泛的库怎么没有人研究呢?