[gevent源码分析] c-ares异步DNS请求

时间:2022-04-17 00:17:49

c-ares是异步DNS请求库,libcurl,libevent,wireshark都使用了c-ares,gevent1.0版本前使用的是libevent,

所以它的DNS请求也是使用c-ares,1.0版本后使用cython封装了c-ares。

c-ares官方文档,http://c-ares.haxx.se/docs.html

gevent中DNS默认使用的是线程池版本的,可通过设置GEVENT_RESOLVER=ares环境变量使用c-ares异步库。


[gevent源码分析] c-ares异步DNS请求


[gevent源码分析] c-ares异步DNS请求


如何证明的确是异步呢,试着跑一遍你就知道了?

#coding=utf8import socket
import gevent
from gevent import get_hub
from gevent.resolver_ares import Resolver
r = get_hub().resolver = Resolver(servers=['8.8.8.8'])

def f(w):
print w,r.gethostbyname(w)
for w in ['www.google.com','www.baidu.com','www.apple.com']:
gevent.spawn(f,w)

gevent.sleep(6)


在开始之前先讲几个c-ares库函数,熟悉之后再看ares.pyx会有种和亲切的感觉。
cares.ares_library_init(cares.ARES_LIB_INIT_ALL)
初始化ares库,其实只对windows平台做了处理,主要是为了加载iphlpapi.dll,在非windows平台可不调用。
如果调用一定要在c-ares任何函数之前调用。

cares.ares_library_cleanup()
相对于cares.ares_library_init,在windows平台将释放iphlpapi.dll,非windows平台可不调用。
gevent中并没有调用该函数,作者在__dealloc__中也用?号表明了这一点,我不太理解,可能有更好的理由吧。

cares.ares_destroy(self.channel)
销毁channel,释放内存,关闭打开的socket,这是在__dealloc__中调用

cares.ares_init_options(&channel, &options, optmask)
这是ares中最核心的函数,用于初始化channel,options,optmask主要是通过channel的__init__构造

cdef public class channel [object PyGeventAresChannelObject, type PyGeventAresChannel_Type]:    def __init__(self, object loop, flags=None, timeout=None, tries=None, ndots=None,                 udp_port=None, tcp_port=None, servers=None):

参数说明:

flags用于控制一查询行为,如ARES_FLAG_USEVC,将只发送TCP请求(我们知道DNS既有TCP也有UDP)
ARES_FLAG_PRIMARY :只向第一个服务器发送请求,还有其它选项参考ares_init_options函数文档
timeout:指明第一次请求的超时时间,单位为秒,c-ares单位为毫秒,gevent会转换,第一次之后的超时c-area有它自己的算法
tries:请求尝试次数,默认4次
ndots:最少'.'的数量,默认是1,如果大于1,就直接查找域名,不然会和本地域名合并(init_by_environment设置本地域名)
udp_port,tcp_port:使用的udp,tcp端口号,默认53
servers:发送dns请求的服务器,见下面ares_set_servers
ndots多说一句,比如ping bifeng(这是我一同事的主机),检测发现没有'.'(也就是小于ndots),所以会把本地域给加上去该操作在ares_search.c中ares_search函数中

[gevent源码分析] c-ares异步DNS请求

cares.ares_set_servers(self.channel, cares.ares_addr_node* c_servers)
设置dns请求服务器,设置完成需要free掉c_servers的内存空间,因为ares_set_servers中重新malloc内存空间了。
在set_servers中,通过finally free内存空间

            c_servers = <cares.ares_addr_node*>malloc(sizeof(cares.ares_addr_node) * length)            if not c_servers:                raise MemoryError            try:                index = 0                for server in servers:                    ...                c_servers[length - 1].next = NULL                index = cares.ares_set_servers(self.channel, c_servers)                if index:                    raise ValueError(strerror(index))            finally:                free(c_servers)

你可能很好奇,c-ares是如何和gevent(libev)的socket关联起来的,因为DNS的本质也是
socket请求,所以底层也是需要使用操作系统提供的epoll等机制,而c-ares提供了socket状态变化的接口,
这就可以让c-ares运行在libev上面,所有的魔法其实都是ares_options.sock_state_cb向外提供的。

#ares.hstruct ares_options {  int flags;  int timeout; /* in seconds or milliseconds, depending on options */  int tries;  ....  ares_sock_state_cb sock_state_cb;  void *sock_state_cb_data;};ARES_OPT_SOCK_STATE_CB void (*sock_state_cb)(void *data, int s, int read, int write)
当dns socket状态改变时将回调sock_state_cb,而在channel的__init__中将sock_state_cb设置为gevent_sock_state_callback

def __init__(...)    options.sock_state_cb = <void*>gevent_sock_state_callback    options.sock_state_cb_data = <void*>selfcdef void gevent_sock_state_callback(void *data, int s, int read, int write):    if not data:        return    cdef channel ch = <channel>data    ch._sock_state_callback(s, read, write)
gevent_sock_state_callback只做了一件事就是调用channel的_sock_state_callback,并设置是读是写
    cdef _sock_state_callback(self, int socket, int read, int write):        if not self.channel:            return        cdef object watcher = self._watchers.get(socket)        cdef int events = 0        if read:            events |= EV_READ        if write:            events |= EV_WRITE        if watcher is None:            if not events:                return            watcher = self.loop.io(socket, events) #socket第一次,启动io watcher            self._watchers[socket] = watcher        elif events: #已有watcher,判断事件是否变化了            if watcher.events == events:                return            watcher.stop()            watcher.events = events #设置新状态        else:            watcher.stop()            self._watchers.pop(socket, None)            if not self._watchers:                self._timer.stop()            return #没有事件了,也就是都处理完了,将回调我们的最终回调函数(如调用gethostbyname时设置的回调)        watcher.start(self._process_fd, watcher, pass_events=True) #watcher设置回调        self._timer.again(self._on_timer) #让c-ares每秒处理一下超时和broken_connections
前面io wather的回调self._process_fd主要就是调用cares.ares_process_fd对指定的文件描述符继续处理,
cares.ARES_SOCKET_BAD代表该事件不做处理,其实也就是该事件已经处理完了。
    def _process_fd(self, int events, object watcher):        if not self.channel:            return        cdef int read_fd = watcher.fd #只处理的文件描述符        cdef int write_fd = read_fd        if not (events & EV_READ): #没有可读事件,将读fd设为"不处理"            read_fd = cares.ARES_SOCKET_BAD        if not (events & EV_WRITE): #没有可写事件,将写fd设为"不处理"            write_fd = cares.ARES_SOCKET_BAD        cares.ares_process_fd(self.channel, read_fd, write_fd)

其实到上面c-ares流程已经差不多了,最后会回调设置的最终回调,我们来看一下gethostbyname的操作
定义于resolver_ares.py的gethostbyname函数,调用的是gethostbyname_ex

   def gethostbyname_ex(self, hostname, family=AF_INET):        while True:            ares = self.ares            try:                waiter = Waiter(self.hub) #使用Waiter                ares.gethostbyname(waiter, hostname, family) #调用ares.gethostbyname,设置回调为waiter                result = waiter.get() #我们知道,waiter没有结果时会切换到hub,完美的和gevent结合起来                if not result[-1]:                    raise gaierror(-5, 'No address associated with hostname')                return result            except gaierror:                if ares is self.ares:                    raise

Waiter定义了__call__方法,所以可以直接作为回调函数
ares.gethostbyname主要就是调用了cares.ares_gethostbyname(self.channel, name, family, <void*>gevent_ares_host_callback, <void*>arg)
当DNS请求成功或失败都会回调gevent_ares_host_callback
而gevent_ares_host_callback会回调上面的waiter,并把结果传给waiter,这边可以自己看下代码,比较简单。

waiter.__call__会switch到之前切换的greenlet,即前面的waiter.get()处,此时将返回result,gethostbyname成功执行。

这里还有一个问题,c-ares什么时候认为socket的状态改变了?

#define SOCK_STATE_CALLBACK(c, s, r, w)                                 \  do {                                                                  \    if ((c)->sock_state_cb)                                             \      (c)->sock_state_cb((c)->sock_state_cb_data, (s), (r), (w));       \  } WHILE_FALSE
在c-ares中状态改变回调是通过SOCK_STATE_CALLBACK宏实现的,我们可以搜索一下这个宏你就明白了。

我们可以看一下open_tcp_socket,这是在刚开始发送tcp时调用的。

static int open_tcp_socket(ares_channel channel, struct server_state *server){  ......  /* Acquire a socket. */  s = socket(server->addr.family, SOCK_STREAM, 0);  //创建socket    /* Configure it. */ configure_socket(s, server->addr.family, channel); //配置 #ifdef TCP_NODELAY  /*   * Disable the Nagle algorithm (only relevant for TCP sockets, and thus not   * in configure_socket). In general, in DNS lookups we're pretty much   * interested in firing off a single request and then waiting for a reply,   * so batching isn't very interesting.   */  opt = 1;  if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY,                 (void *)&opt, sizeof(opt)) == -1) //判断是否使用TCP_NODELAY    {       sclose(s);       return -1;    }#endif  /* Connect to the server. */  if (connect(s, sa, salen) == -1)  // 连接DNS服务器    {      int err = SOCKERRNO;      if (err != EINPROGRESS && err != EWOULDBLOCK)        {          sclose(s);          return -1;        }    }  SOCK_STATE_CALLBACK(channel, s, 1, 0); // 连接后,状态肯定改变,肯定有读事件,所有read_fd设为1,自然地调用了状态改变函数  server->tcp_buffer_pos = 0;  server->tcp_socket = s;  server->tcp_connection_generation = ++channel->tcp_connection_generation;  return 0;}
也就是说c-ares值关注刚开始的状态变化,也就是连接后“读”事件,中间的状态改变就全部交给gevent了。

当然当查询结束,或area channel被destory或cancel时,你还需要告诉gevent已经没有关注事件了,这个是在

ares__close_sockets函数中实现的。


c-ares真的很美,仅仅通过提供几个接口,就可以让自己和其它的框架完美结合,very nice!!!

我之前就是很好奇c-ares的运行方式,内部DNS细节可能并不关注,关注的就是结合问题,花了不少时间研究,

主要是我在网上找不到c-ares的example,这让我郁闷了半天,这么使用广泛的库怎么没有人研究呢?