Background
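While working on a customer requirement, I needed to run a TCP server. The client base64-encoded an image and sent it to me. The image was large, and base64 encoding inflates it further, so the whole message came to about 1500 KB. I received it with libevent and expected that, once the read callback fired, I could take all 1500 KB out of the evbuffer in one go. That is not what happened. On the first callback only 2920 bytes were available. I checked the length, saw the message was incomplete, and simply returned. Strangely, the next callback saw 7016 bytes, 4096 more than the previous one. Returning again triggered another callback with yet another 4096 bytes, and so on. The client sent a single 1500 KB message, but my read callback fired more than 300 times before the whole message had arrived (once all 1500 KB had been received, the callback stopped firing).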
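This raises two questions:
1. Why does the first callback see only 2920 bytes?
2. Why does each callback see only 4096 more bytes than the previous one? Can't everything be read at once?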
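For the first question, a Wireshark capture showed that, because the 1500 KB message is so large, the sender's TCP stack had already split it into many segments, and the first data to arrive happened to be exactly 2920 bytes. As soon as libevent detected readable data, it fired the read callback. Moreover, when I queried how many bytes were readable on the socket at that moment, the answer was 0. In other words, as soon as those 2920 bytes arrived, libevent read them out of the socket receive buffer into the evbuffer and immediately invoked the read callback (the very first line of my read callback queries the socket's readable byte count, using the code below). On the next callback, libevent had read another 4096 bytes from the socket buffer into the evbuffer, and the socket's readable byte count was now 8192. This suggests that the default socket receive buffer size is 8 KB.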
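#define EVBUFFER_MAX_READ 40960  // enlarged value; libevent's default is 4096

// Return how many bytes are currently waiting in the socket receive buffer.
// fd is the connected socket descriptor. ioctlsocket() is the Winsock (Windows) call.
static int get_n_bytes_readable_on_socket(evutil_socket_t fd)
{
    unsigned long lng = EVBUFFER_MAX_READ;
    if (ioctlsocket(fd, FIONREAD, &lng) < 0)
        return -1;
    return (int)lng;
}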
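The helper above is the Windows form. On POSIX systems the same query is made with ioctl(FIONREAD). The following is a minimal portable sketch, not code from libevent; the names bytes_readable_on_socket and sock_t are hypothetical, and the function returns -1 when the query fails.

```c
#ifdef _WIN32
#include <winsock2.h>
typedef SOCKET sock_t;          /* hypothetical alias for the socket handle type */
#else
#include <sys/ioctl.h>
#include <sys/socket.h>
typedef int sock_t;
#endif

/* Hypothetical helper: number of bytes waiting in the socket's receive
 * buffer, or -1 if the FIONREAD query fails. */
static int bytes_readable_on_socket(sock_t fd)
{
#ifdef _WIN32
    u_long n = 0;
    if (ioctlsocket(fd, FIONREAD, &n) != 0)
        return -1;
    return (int)n;
#else
    int n = 0;
    if (ioctl(fd, FIONREAD, &n) < 0)
        return -1;
    return n;
#endif
}
```

Inside a bufferevent read callback, the descriptor can be obtained with bufferevent_getfd(bev).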
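The sequence of events is:
→ The client sends a 1500 KB message.
→ The client's TCP stack splits it into segments.
→ The first data to reach the server is 2920 bytes.
→ libevent reads the 2920 bytes and fires the read callback (I deliberately do not take the data out; I just return, so the 2920 bytes stay in the evbuffer).
→ libevent reads another 4096 bytes from the socket buffer and fires the callback again (the remaining 1400-odd KB is not in the socket buffer, which holds only 8 KB by default; most likely it is still on the sender's side, because TCP flow control stops the sender while the receiver's buffer is full).
→ Once libevent has taken 4096 bytes out, the socket buffer has 4 KB free, and the TCP stack puts another 4 KB of data into it.
→ libevent again moves 4 KB into the evbuffer and fires the callback, and the TCP stack again puts 4 KB into the socket buffer.
→ And so on, until libevent has moved all 1500 KB into the evbuffer.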
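The "300-odd callbacks" figure can be checked with a small model. It assumes, as observed above, that the first callback sees 2920 bytes and every later callback adds at most a fixed number of bytes (4096 here); it ignores everything else TCP might do, so it is an estimate rather than a measurement. The function name callbacks_needed is hypothetical.

```c
#include <stddef.h>

/* Hypothetical model: how many read callbacks fire before `total` bytes
 * have been moved into the evbuffer, if the first read returns `first`
 * bytes and every later read returns at most `per_read` bytes. */
static size_t callbacks_needed(size_t total, size_t first, size_t per_read)
{
    if (total == 0)
        return 0;
    if (total <= first)
        return 1;
    size_t rest = total - first;
    /* Ceiling division: a partial last chunk still costs one callback. */
    return 1 + (rest + per_read - 1) / per_read;
}
```

With the observed numbers, callbacks_needed(1500 * 1024, 2920, 4096) returns 376, consistent with the "more than 300" callbacks reported; raising the per-read cap to 40960 brings it down to 39.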
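At this point it is clear that two things limit each callback to 4096 additional bytes:
1. The socket receive buffer size, which is only 8 KB by default.
2. libevent itself reads at most 4096 bytes at a time. Presumably this limit is hard-coded in the libevent source, which would then have to be modified.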
Changing the socket buffer size is simple: just set it in libevent's accept callback, as follows:
int nRecvBufLen = 128 * 1024;  // set the receive buffer to 128 KB
setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const char *)&nRecvBufLen, sizeof(int));
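To confirm that the new size actually took effect, it can be read back with getsockopt. Below is a minimal POSIX sketch with a hypothetical helper name (on Windows, pass the option pointer as const char * as in the snippet above). Two caveats are operating-system behaviour, not libevent behaviour: Linux doubles the requested value and reports the doubled value from getsockopt (subject to the net.core.rmem_max limit), and the Linux tcp(7) man page states that the buffer size must be set before listen() or connect() to take effect on a connection, so on Linux it is safer to set it on the listening socket, from which accepted sockets inherit it.

```c
#include <sys/socket.h>

/* Hypothetical helper: request a receive buffer of `bytes` on `fd` and
 * return the size the kernel reports back, or -1 on error. */
static int set_recv_buffer(int fd, int bytes)
{
    if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &bytes, sizeof bytes) < 0)
        return -1;

    int actual = 0;
    socklen_t len = sizeof actual;
    if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &actual, &len) < 0)
        return -1;
    return actual; /* on Linux this is typically double the requested value */
}
```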
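So how do we modify the libevent source?
First we need to understand what libevent does when data arrives:
The TCP stack receives data → libevent calls evbuffer_read to read it from the socket buffer → the read callback is invoked. Note that evbuffer_read is called by libevent internally, not by us; we only handle the business data once our callback runs.
Straight to the point: modify evbuffer_read.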
int
evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
    struct evbuffer_chain **chainp;
    int n;
    int result;

#ifdef USE_IOVEC_IMPL
    int nvecs, i, remaining;
#else
    struct evbuffer_chain *chain;
    unsigned char *p;
#endif

    EVBUFFER_LOCK(buf);

    if (buf->freeze_end) {
        result = -1;
        goto done;
    }

    // (3) This is the key part: n is the number of bytes currently in the socket
    //     receive buffer. EVBUFFER_MAX_READ is 4096; raise it, but that alone is not enough.
    n = get_n_bytes_readable_on_socket(fd);
    if (n <= 0 || n > EVBUFFER_MAX_READ)
        n = EVBUFFER_MAX_READ;
    if (howmuch < 0 || howmuch > n)  // (4) howmuch is a parameter; it must be raised as well
        howmuch = n;

#ifdef USE_IOVEC_IMPL
    /* Since we can use iovecs, we're willing to use the last
     * NUM_READ_IOVEC chains. */
    if (evbuffer_expand_fast_(buf, howmuch, NUM_READ_IOVEC) == -1) {
        result = -1;
        goto done;
    } else {
        IOV_TYPE vecs[NUM_READ_IOVEC];
#ifdef EVBUFFER_IOVEC_IS_NATIVE_
        nvecs = evbuffer_read_setup_vecs_(buf, howmuch, vecs,
            NUM_READ_IOVEC, &chainp, 1);
#else
        /* We aren't using the native struct iovec.  Therefore,
           we are on win32. */
        struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];
        nvecs = evbuffer_read_setup_vecs_(buf, howmuch, ev_vecs, 2,
            &chainp, 1);

        for (i=0; i < nvecs; ++i)
            WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]);
#endif

#ifdef _WIN32
        {
            DWORD bytesRead;
            DWORD flags=0;
            if (WSARecv(fd, vecs, nvecs, &bytesRead, &flags, NULL, NULL)) {
                /* The read failed. It might be a close,
                 * or it might be an error. */
                if (WSAGetLastError() == WSAECONNABORTED)
                    n = 0;
                else
                    n = -1;
            } else
                n = bytesRead;
        }
#else
        n = readv(fd, vecs, nvecs);
#endif
    }

#else /*!USE_IOVEC_IMPL*/
    /* If we don't have FIONREAD, we might waste some space here */
    /* XXX we _will_ waste some space here if there is any space left
     * over on buf->last. */
    // (2) The destination buffer is expanded according to howmuch, which further
    //     confirms that howmuch is what needs to be raised.
    if ((chain = evbuffer_expand_singlechain(buf, howmuch)) == NULL) {
        result = -1;
        goto done;
    }

    /* We can append new data at this point */
    p = chain->buffer + chain->misalign + chain->off;

#ifndef _WIN32
    n = read(fd, p, howmuch);
#else
    // (1) Right: on this path it calls recv() with a length of howmuch, so howmuch is
    //     what must grow. (With USE_IOVEC_IMPL, the WSARecv()/readv() branch above is
    //     sized by howmuch in the same way.)
    n = recv(fd, p, howmuch, 0);
#endif
#endif /* USE_IOVEC_IMPL */

    if (n == -1) {
        result = -1;
        goto done;
    }
    if (n == 0) {
        result = 0;
        goto done;
    }

#ifdef USE_IOVEC_IMPL
    remaining = n;
    for (i=0; i < nvecs; ++i) {
        /* can't overflow, since only mutable chains have
         * huge misaligns. */
        size_t space = (size_t) CHAIN_SPACE_LEN(*chainp);
        /* XXXX This is a kludge that can waste space in perverse
         * situations. */
        if (space > EVBUFFER_CHAIN_MAX)
            space = EVBUFFER_CHAIN_MAX;
        if ((ev_ssize_t)space < remaining) {
            (*chainp)->off += space;
            remaining -= (int)space;
        } else {
            (*chainp)->off += remaining;
            buf->last_with_datap = chainp;
            break;
        }
        chainp = &(*chainp)->next;
    }
#else
    chain->off += n;
    advance_last_with_data(buf);
#endif
    buf->total_len += n;
    buf->n_add_for_cb += n;

    /* Tell someone about changes in this buffer */
    evbuffer_invoke_callbacks_(buf);
    result = n;
done:
    EVBUFFER_UNLOCK(buf);
    return result;
}
From this analysis of the code, the conclusion is:
1. Increase the EVBUFFER_MAX_READ macro.
2. Increase the howmuch argument.
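The two limits in evbuffer_read interact: the readable count from FIONREAD is capped at EVBUFFER_MAX_READ, and howmuch is then capped at that result. This standalone restatement of those lines (hypothetical function name, no libevent needed) makes it easy to see why raising only one of the two values is not enough.

```c
/* Mirrors the size clamping at the top of evbuffer_read():
 *   readable - what FIONREAD reported (<= 0 means "unknown")
 *   howmuch  - the caller's request (< 0 means "no limit")
 *   max_read - the value of EVBUFFER_MAX_READ
 * Returns the number of bytes evbuffer_read will try to read. */
static int effective_read_size(int readable, int howmuch, int max_read)
{
    int n = readable;
    if (n <= 0 || n > max_read)
        n = max_read;          /* cap what the socket says is available */
    if (howmuch < 0 || howmuch > n)
        howmuch = n;           /* then cap the caller's request */
    return howmuch;
}
```

For example, effective_read_size(131072, 4096, 40960) still returns 4096: if the caller passes a howmuch of 4096, enlarging EVBUFFER_MAX_READ alone changes nothing.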
The next question: how do we make howmuch larger?
Looking through the code, evbuffer_read is called from bufferevent_readcb:
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    struct evbuffer *input;
    int res = 0;
    short what = BEV_EVENT_READING;
    ev_ssize_t howmuch = -1, readmax=-1;

    bufferevent_incref_and_lock_(bufev);

    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_READ, we can safely ignore the
         * timeout, since a read has occurred */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }

    input = bufev->input;

    /*
     * If we have a high watermark configured then we don't want to
     * read more data than would make us reach the watermark.
     */
    if (bufev->wm_read.high != 0) {
        howmuch = bufev->wm_read.high - evbuffer_get_length(input);
        /* we somehow lowered the watermark, stop reading */
        if (howmuch <= 0) {
            bufferevent_wm_suspend_read(bufev);
            goto done;
        }
    }
    // (3) so all we need to change is readmax
    readmax = bufferevent_get_read_max_(bufev_p);
    // (2) this is where howmuch gets its value
    if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
                                           * uglifies this code. XXXX */
        howmuch = readmax;
    if (bufev_p->read_suspended)
        goto done;

    evbuffer_unfreeze(input, 0);
    // (1) here it is: note the howmuch argument
    res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
    evbuffer_freeze(input, 0);

    if (res == -1) {
        int err = evutil_socket_geterror(fd);
        if (EVUTIL_ERR_RW_RETRIABLE(err))
            goto reschedule;
        if (EVUTIL_ERR_CONNECT_REFUSED(err)) {
            bufev_p->connection_refused = 1;
            goto done;
        }
        /* error case */
        what |= BEV_EVENT_ERROR;
    } else if (res == 0) {
        /* eof case */
        what |= BEV_EVENT_EOF;
    }

    if (res <= 0)
        goto error;

    bufferevent_decrement_read_buckets_(bufev_p, res);

    /* Invoke the user callback - must always be called last */
    bufferevent_trigger_nolock_(bufev, EV_READ, 0);

    goto done;

 reschedule:
    goto done;

 error:
    bufferevent_disable(bufev, EV_READ);
    bufferevent_run_eventcb_(bufev, what, 0);

 done:
    bufferevent_decref_and_unlock_(bufev);
}
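How bufferevent_readcb arrives at howmuch can also be restated on its own. The sketch below (hypothetical name, no libevent needed) mirrors the lines above: a read high watermark, if set, limits howmuch first, and readmax then caps it.

```c
#include <stddef.h>

/* Mirrors how bufferevent_readcb() derives howmuch:
 *   wm_high   - read high watermark (0 means none)
 *   input_len - bytes already sitting in the input evbuffer
 *   readmax   - result of bufferevent_get_read_max_()
 * Returns the request passed to evbuffer_read, or 0 when reading is
 * suspended because the watermark has already been reached. */
static long readcb_howmuch(size_t wm_high, size_t input_len, long readmax)
{
    long howmuch = -1;                 /* -1 means "unlimited" */
    if (wm_high != 0) {
        howmuch = (long)wm_high - (long)input_len;
        if (howmuch <= 0)
            return 0;                  /* libevent suspends reading here */
    }
    if (howmuch < 0 || howmuch > readmax)
        howmuch = readmax;
    return howmuch;
}
```

One consequence visible here: a read high watermark set with bufferevent_setwatermark(bev, EV_READ, low, high) also lowers howmuch, so raising readmax only helps if that watermark is 0 or large enough.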
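So the only thing left to change is readmax. readmax is computed by bufferevent_get_read_max_() from bufev_p, so the next step is to find where bufev_p is initialized.
It turns out bufev_p is initialized in bufferevent_socket_new (look familiar?):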
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
    struct bufferevent_private *bufev_p;
    struct bufferevent *bufev;

#ifdef _WIN32
    if (base && event_base_get_iocp_(base))
        return bufferevent_async_new_(base, fd, options);
#endif

    if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
        return NULL;

    // (1) this is what initializes bufev_p
    if (bufferevent_init_common_(bufev_p, base, &bufferevent_ops_socket,
                    options) < 0) {
        mm_free(bufev_p);
        return NULL;
    }
    bufev = &bufev_p->bev;
    evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);

    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

    evbuffer_freeze(bufev->input, 0);
    evbuffer_freeze(bufev->output, 1);

    return bufev;
}
Now look at bufferevent_init_common_:
int
bufferevent_init_common_(struct bufferevent_private *bufev_private,
    struct event_base *base,
    const struct bufferevent_ops *ops,
    enum bufferevent_options options)
{
    struct bufferevent *bufev = &bufev_private->bev;

    if (!bufev->input) {
        if ((bufev->input = evbuffer_new()) == NULL)
            return -1;
    }

    if (!bufev->output) {
        if ((bufev->output = evbuffer_new()) == NULL) {
            evbuffer_free(bufev->input);
            return -1;
        }
    }

    bufev_private->refcnt = 1;
    bufev->ev_base = base;

    /* Disable timeouts. */
    evutil_timerclear(&bufev->timeout_read);
    evutil_timerclear(&bufev->timeout_write);

    bufev->be_ops = ops;

    // (1) yes, this is the function that initializes the read/write limits in bufev_p
    bufferevent_ratelim_init_(bufev_private);

    /*
     * Set to EV_WRITE so that using bufferevent_write is going to
     * trigger a callback.  Reading needs to be explicitly enabled
     * because otherwise no data will be available.
     */
    bufev->enabled = EV_WRITE;

#ifndef EVENT__DISABLE_THREAD_SUPPORT
    if (options & BEV_OPT_THREADSAFE) {
        if (bufferevent_enable_locking_(bufev, NULL) < 0) {
            /* cleanup */
            evbuffer_free(bufev->input);
            evbuffer_free(bufev->output);
            bufev->input = NULL;
            bufev->output = NULL;
            return -1;
        }
    }
#endif
    if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
        == BEV_OPT_UNLOCK_CALLBACKS) {
        event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
        return -1;
    }
    if (options & BEV_OPT_UNLOCK_CALLBACKS)
        event_deferred_cb_init_(
            &bufev_private->deferred,
            event_base_get_npriorities(base) / 2,
            bufferevent_run_deferred_callbacks_unlocked,
            bufev_private);
    else
        event_deferred_cb_init_(
            &bufev_private->deferred,
            event_base_get_npriorities(base) / 2,
            bufferevent_run_deferred_callbacks_locked,
            bufev_private);

    bufev_private->options = options;

    evbuffer_set_parent_(bufev->input, bufev);
    evbuffer_set_parent_(bufev->output, bufev);

    return 0;
}
Now look at bufferevent_ratelim_init_:
int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
    bev->rate_limiting = NULL;
    bev->max_single_read = MAX_SINGLE_READ_DEFAULT;    // (1) this is it: the cap on bytes per single read
    bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

    return 0;
}
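Here we find max_single_read, which, as the name says, is the maximum number of bytes read in a single read. So is all we need to do to raise the value of the MAX_SINGLE_READ_DEFAULT macro?
No need: libevent provides a dedicated function for changing max_single_read:
bufferevent_set_max_single_read, declared as int bufferevent_set_max_single_read(struct bufferevent *bev, size_t size). Just call it after bufferevent_socket_new.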
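To summarize, lifting the 4096-byte limit takes three steps:
1. In the accept callback, after obtaining the socket, set the socket receive buffer size (otherwise it stays at the 8 KB default).
2. Increase EVBUFFER_MAX_READ in the libevent source (and rebuild libevent).
3. Call bufferevent_set_max_single_read.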
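Postscript:
Since libevent already thoughtfully provides bufferevent_set_max_single_read to set the maximum number of bytes read per read, why not have that function adjust the EVBUFFER_MAX_READ limit as well (using a global variable instead of a macro)?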
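One way the postscript's idea could look, as a hypothetical patch sketch rather than anything libevent ships: replace the macro with a file-scope variable plus a setter, and cap the FIONREAD result with the variable. The names evbuffer_max_read, evbuffer_set_max_read_global and capped_readable are invented for illustration.

```c
#include <limits.h>
#include <stddef.h>

#define EVBUFFER_MAX_READ_DEFAULT 4096

/* Hypothetical replacement for the EVBUFFER_MAX_READ macro. */
static int evbuffer_max_read = EVBUFFER_MAX_READ_DEFAULT;

/* Hypothetical setter; 0 restores the default. Returns -1 if `size`
 * does not fit in the int that evbuffer_read() works with. */
static int evbuffer_set_max_read_global(size_t size)
{
    if (size > INT_MAX)
        return -1;
    evbuffer_max_read = (size == 0) ? EVBUFFER_MAX_READ_DEFAULT : (int)size;
    return 0;
}

/* The first cap in evbuffer_read(), now reading the variable. */
static int capped_readable(int readable)
{
    if (readable <= 0 || readable > evbuffer_max_read)
        return evbuffer_max_read;
    return readable;
}
```

A process-wide variable affects every evbuffer and is a data race if changed while other threads are reading, so it would need to be set once at startup; a per-evbuffer field would be finer grained but requires changing struct evbuffer.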