libevent Source Analysis: Removing the 4096-Byte-per-Read Limit in the Read Callback

Date: 2022-09-16 00:14:03


Background
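While working on a customer requirement, I needed to run a TCP server to which the other party would send base64-encoded images. The images were large, and base64 encoding made them larger still, so a single message came to about 1500KB. I received it with libevent, expecting that once the read callback fired I could pull the whole 1500KB message out of the evbuffer at once. That is not what happened: on the first callback only 2920 bytes were available. Since the length was wrong, I simply returned. Oddly, the next callback had 7016 bytes, exactly 4096 more than the first. I returned again, the callback fired again, and once more there were 4096 bytes more than last time, and so on. The peer sent just one 1500KB message, yet my callback was entered more than 300 times before the whole message had arrived (once all 1500KB was in, the callback stopped firing).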

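This raises two questions:
1. Why does the first callback receive only 2920 bytes?
2. Why does each callback see only 4096 bytes more than the previous one? Can't it get everything at once?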

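For the first question, a Wireshark capture showed that because the 1500KB message was so large, the sender's TCP stack had split it up, and the first chunk to arrive happened to be exactly 2920 bytes. libevent noticed the data and entered the read callback immediately. Moreover, when I queried the number of readable bytes on the socket at that moment, it was 0. (The very first line of my read callback queries the socket's readable byte count, using the code below.) In other words, as soon as those 2920 bytes arrived, libevent read them out of the socket buffer into the evbuffer and called the read callback right away. On the next callback, libevent had read another 4096 bytes from the socket buffer into the evbuffer, and this time the socket reported 8192 readable bytes. This suggests that the socket's default receive buffer is 8KB, at least in my environment.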

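  //libevent's stock value is 4096 (see below); 40960 here appears to be an already-enlarged value
  #define EVBUFFER_MAX_READ 40960
  //Get how many bytes are waiting in the socket's receive buffer; the argument is the connected socket descriptor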
  static int get_n_bytes_readable_on_socket(evutil_socket_t fd)
  {
   unsigned long lng = EVBUFFER_MAX_READ;
   if (ioctlsocket(fd, FIONREAD, &lng) < 0)
    return -1;
   return (int)lng;
  }
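The helper above is the Windows form (ioctlsocket). On Linux and macOS the same query is ioctl(fd, FIONREAD, &n). Below is a minimal POSIX sketch; the function names are my own, not libevent's, and a socketpair stands in for the TCP connection so the example runs without a peer.

```c
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical POSIX counterpart of get_n_bytes_readable_on_socket():
 * returns how many bytes are queued in the kernel receive buffer,
 * or -1 on error. */
static int bytes_readable_on_socket(int fd)
{
    int n = 0;
    if (ioctl(fd, FIONREAD, &n) < 0)
        return -1;
    return n;
}

/* Demo: write `len` bytes into one end of a socketpair and report how
 * many bytes FIONREAD sees on the other end before anything is read. */
static int demo_fionread(size_t len)
{
    int sv[2];
    char buf[4096];
    int n;

    if (len > sizeof(buf))
        return -1;
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
        return -1;
    memset(buf, 'x', len);
    if (write(sv[0], buf, len) != (ssize_t)len) {
        close(sv[0]);
        close(sv[1]);
        return -1;
    }
    n = bytes_readable_on_socket(sv[1]);
    close(sv[0]);
    close(sv[1]);
    return n;
}
```

Calling such a helper at the top of a read callback, as described above, shows what is still waiting in the kernel after libevent has done its read.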

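The sequence of events is as follows:
→ The client sends a 1500KB message
→ The client's TCP stack splits it up for sending
→ The first chunk the server receives is 2920 bytes
→ libevent reads the 2920 bytes and enters the read callback (I don't take the data out; I just return, so it stays in the evbuffer)
→ libevent reads another 4096 bytes from the socket buffer and enters the callback again (more than 1400KB is still outstanding at this point; it is not in our socket buffer, which is only 8KB by default, so it is most likely still on the sender's side, held back by TCP flow control until our receive window opens up)
→ Once libevent has taken 4096 bytes out, the socket buffer has 4KB free, and the TCP stack puts another 4KB of data into it
→ libevent moves another 4KB into the evbuffer and enters the callback, and the TCP stack again puts 4KB into the socket buffer
→ And so on, until libevent has moved all 1500KB into the evbuffer.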
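A quick arithmetic check on those numbers, under the simplifying assumption that the first callback sees 2920 bytes and every later one exactly 4096 more, and taking 1500KB as 1500 × 1024 bytes. The function names are illustrative only.

```c
#include <stddef.h>

/* Number of times the read callback fires before `total` bytes have
 * accumulated in the evbuffer, assuming the first callback sees
 * `first` bytes and every later callback adds exactly `step` bytes. */
static size_t callbacks_needed(size_t total, size_t first, size_t step)
{
    if (total <= first)
        return 1;
    /* 1 for the first callback, plus ceil((total - first) / step) more */
    return 1 + (total - first + step - 1) / step;
}

/* Bytes visible in the evbuffer on the k-th callback (k starts at 1). */
static size_t bytes_on_callback(size_t k, size_t first, size_t step)
{
    return first + (k - 1) * step;
}
```

With these inputs callbacks_needed(1500 * 1024, 2920, 4096) gives 376, in line with the "more than 300" callbacks observed, and bytes_on_callback(2, 2920, 4096) gives the 7016 bytes seen on the second callback.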

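At this point it is clear that two things limit each callback to 4096 additional bytes:
1. The size of the socket receive buffer, only 8KB by default.
2. libevent itself reads at most 4096 bytes at a time. Presumably this limit is in the libevent source, which will have to be modified.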

Changing the socket buffer size is easy: just set it in libevent's accept callback, as follows:

	int nRecvBufLen = 128 * 1024; //set to 128KB
	setsockopt( fd, SOL_SOCKET, SO_RCVBUF, ( const char* )&nRecvBufLen, sizeof( int ) ); 
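The line above is Windows-flavored (note the const char* cast). Below is a POSIX sketch that also reads the value back with getsockopt, because the kernel may not keep the number you asked for: Linux, for example, caps the request at net.core.rmem_max and then doubles it, as described in socket(7). Linux's tcp(7) also says the buffer size must be set before listen() or connect() to take full effect, so on Linux setting it on the listening socket may be safer than in the accept callback. The helper names are hypothetical.

```c
#include <sys/socket.h>
#include <unistd.h>

/* Request a receive buffer of `want` bytes on `fd` and return the size
 * the kernel actually reports (may differ from `want`), or -1 on error. */
static int set_recv_buffer(int fd, int want)
{
    int got = 0;
    socklen_t len = sizeof(got);

    if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &want, sizeof(want)) < 0)
        return -1;
    if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &got, &len) < 0)
        return -1;
    return got;
}

/* Demo on a fresh, unconnected TCP socket. */
static int demo_recv_buffer(int want)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    int got;

    if (fd < 0)
        return -1;
    got = set_recv_buffer(fd, want);
    close(fd);
    return got;
}
```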

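So how do we modify the libevent source?
First we need to know how libevent handles incoming data:
The TCP stack receives data → libevent calls evbuffer_read to read it from the socket buffer → the read callback is invoked. Note that evbuffer_read is called by libevent itself, not by us; all we do is process the business data once our callback is entered.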

Straight to the point: modify evbuffer_read

int
evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
	struct evbuffer_chain **chainp;
	int n;
	int result;

#ifdef USE_IOVEC_IMPL
	int nvecs, i, remaining;
#else
	struct evbuffer_chain *chain;
	unsigned char *p;
#endif

	EVBUFFER_LOCK(buf);

	if (buf->freeze_end) {
		result = -1;
		goto done;
	}
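//3. Here is the key point: n is the number of bytes currently readable on the socket, and EVBUFFER_MAX_READ is 4096. Raise it, but that is not the whole story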
	n = get_n_bytes_readable_on_socket(fd);
	if (n <= 0 || n > EVBUFFER_MAX_READ)
		n = EVBUFFER_MAX_READ;
	if (howmuch < 0 || howmuch > n)  //4. howmuch is a parameter, so it must be made larger too
		howmuch = n;

#ifdef USE_IOVEC_IMPL
	/* Since we can use iovecs, we're willing to use the last
	 * NUM_READ_IOVEC chains. */
	if (evbuffer_expand_fast_(buf, howmuch, NUM_READ_IOVEC) == -1) {
		result = -1;
		goto done;
	} else {
		IOV_TYPE vecs[NUM_READ_IOVEC];
#ifdef EVBUFFER_IOVEC_IS_NATIVE_
		nvecs = evbuffer_read_setup_vecs_(buf, howmuch, vecs,
		    NUM_READ_IOVEC, &chainp, 1);
#else
		/* We aren't using the native struct iovec.  Therefore,
		   we are on win32. */
		struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];
		nvecs = evbuffer_read_setup_vecs_(buf, howmuch, ev_vecs, 2,
		    &chainp, 1);

		for (i=0; i < nvecs; ++i)
			WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]);
#endif

#ifdef _WIN32
		{
			DWORD bytesRead;
			DWORD flags=0;
			if (WSARecv(fd, vecs, nvecs, &bytesRead, &flags, NULL, NULL)) {
				/* The read failed. It might be a close,
				 * or it might be an error. */
				if (WSAGetLastError() == WSAECONNABORTED)
					n = 0;
				else
					n = -1;
			} else
				n = bytesRead;
		}
#else
		n = readv(fd, vecs, nvecs);
#endif
	}

#else /*!USE_IOVEC_IMPL*/
	/* If we don't have FIONREAD, we might waste some space here */
	/* XXX we _will_ waste some space here if there is any space left
	 * over on buf->last. */
	//2. It enlarges the buffer that will hold the data according to howmuch, which further confirms that making howmuch larger is enough
	if ((chain = evbuffer_expand_singlechain(buf, howmuch)) == NULL) {
		result = -1;
		goto done;
	}

	/* We can append new data at this point */
	p = chain->buffer + chain->misalign + chain->off;

#ifndef _WIN32
	n = read(fd, p, howmuch);
#else //1. Indeed, it calls recv to read the data, and the amount read is howmuch, so enlarging howmuch is all it takes
	n = recv(fd, p, howmuch, 0);
#endif
#endif /* USE_IOVEC_IMPL */

	if (n == -1) {
		result = -1;
		goto done;
	}
	if (n == 0) {
		result = 0;
		goto done;
	}

#ifdef USE_IOVEC_IMPL
	remaining = n;
	for (i=0; i < nvecs; ++i) {
		/* can't overflow, since only mutable chains have
		 * huge misaligns. */
		size_t space = (size_t) CHAIN_SPACE_LEN(*chainp);
		/* XXXX This is a kludge that can waste space in perverse
		 * situations. */
		if (space > EVBUFFER_CHAIN_MAX)
			space = EVBUFFER_CHAIN_MAX;
		if ((ev_ssize_t)space < remaining) {
			(*chainp)->off += space;
			remaining -= (int)space;
		} else {
			(*chainp)->off += remaining;
			buf->last_with_datap = chainp;
			break;
		}
		chainp = &(*chainp)->next;
	}
#else
	chain->off += n;
	advance_last_with_data(buf);
#endif
	buf->total_len += n;
	buf->n_add_for_cb += n;

	/* Tell someone about changes in this buffer */
	evbuffer_invoke_callbacks_(buf);
	result = n;
done:
	EVBUFFER_UNLOCK(buf);
	return result;
}
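The clamp at the top of evbuffer_read also accounts for question 1. Below is a standalone model of just those lines, with the cap passed in as a parameter instead of the macro; the function name is hypothetical. If FIONREAD reports 2920 bytes, the read is 2920 bytes; anything above 4096 is cut to 4096; and even with a larger cap, a small howmuch still wins, which is why howmuch has to be raised too.

```c
/* Model of the size clamp at the top of evbuffer_read():
 *   readable - what FIONREAD reported (<= 0 means unknown or error)
 *   max_read - the EVBUFFER_MAX_READ cap
 *   howmuch  - the caller's request (< 0 means "no preference")
 * Returns how many bytes evbuffer_read would try to read. */
static int evbuffer_read_size(int readable, int max_read, int howmuch)
{
    int n = readable;
    if (n <= 0 || n > max_read)
        n = max_read;
    if (howmuch < 0 || howmuch > n)
        howmuch = n;
    return howmuch;
}
```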


The conclusion from the code above:
1. Increase the EVBUFFER_MAX_READ macro.
2. Increase the howmuch argument.

Next question: how do we make howmuch larger?
Looking through the code, evbuffer_read is called from bufferevent_readcb:

static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
	struct bufferevent *bufev = arg;
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	struct evbuffer *input;
	int res = 0;
	short what = BEV_EVENT_READING;
	ev_ssize_t howmuch = -1, readmax=-1;

	bufferevent_incref_and_lock_(bufev);

	if (event == EV_TIMEOUT) {
		/* Note that we only check for event==EV_TIMEOUT. If
		 * event==EV_TIMEOUT|EV_READ, we can safely ignore the
		 * timeout, since a read has occurred */
		what |= BEV_EVENT_TIMEOUT;
		goto error;
	}

	input = bufev->input;

	/*
	 * If we have a high watermark configured then we don't want to
	 * read more data than would make us reach the watermark.
	 */
	if (bufev->wm_read.high != 0) {
		howmuch = bufev->wm_read.high - evbuffer_get_length(input);
		/* we somehow lowered the watermark, stop reading */
		if (howmuch <= 0) {
			bufferevent_wm_suspend_read(bufev);
			goto done;
		}
	}
//3. Only readmax needs to change
	readmax = bufferevent_get_read_max_(bufev_p);
//2. This is where howmuch gets its value
	if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
					       * uglifies this code. XXXX */
		howmuch = readmax;
	if (bufev_p->read_suspended)
		goto done;

	evbuffer_unfreeze(input, 0);
//1. Right here: note the howmuch argument
	res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
	evbuffer_freeze(input, 0);

	if (res == -1) {
		int err = evutil_socket_geterror(fd);
		if (EVUTIL_ERR_RW_RETRIABLE(err))
			goto reschedule;
		if (EVUTIL_ERR_CONNECT_REFUSED(err)) {
			bufev_p->connection_refused = 1;
			goto done;
		}
		/* error case */
		what |= BEV_EVENT_ERROR;
	} else if (res == 0) {
		/* eof case */
		what |= BEV_EVENT_EOF;
	}

	if (res <= 0)
		goto error;

	bufferevent_decrement_read_buckets_(bufev_p, res);

	/* Invoke the user callback - must always be called last */
	bufferevent_trigger_nolock_(bufev, EV_READ, 0);

	goto done;

 reschedule:
	goto done;

 error:
	bufferevent_disable(bufev, EV_READ);
	bufferevent_run_eventcb_(bufev, what, 0);

 done:
	bufferevent_decref_and_unlock_(bufev);
}
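bufferevent_readcb combines two limits into howmuch: the room left under the read high watermark (if one is set) and readmax. Below is a standalone model of those lines, using plain longs instead of ev_ssize_t; the function name is hypothetical. It also shows that a read high watermark smaller than the message would cap reads just like readmax does.

```c
#include <stddef.h>

/* Model of how bufferevent_readcb() picks `howmuch`:
 *   wm_high   - read high watermark (0 = none)
 *   input_len - bytes already sitting in the input evbuffer
 *   readmax   - what bufferevent_get_read_max_() returned
 * Returns the request passed to evbuffer_read(), or 0 where the real
 * code would suspend reading instead. */
static long readcb_howmuch(size_t wm_high, size_t input_len, long readmax)
{
    long howmuch = -1;

    if (wm_high != 0) {
        howmuch = (long)wm_high - (long)input_len;
        if (howmuch <= 0)
            return 0; /* watermark reached: reading is suspended */
    }
    if (howmuch < 0 || howmuch > readmax)
        howmuch = readmax;
    return howmuch;
}
```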


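So all that needs changing is readmax. Its value comes from the bufev_p structure (through bufferevent_get_read_max_), so now we just need to find where bufev_p is initialized.
It turns out bufev_p is initialized in bufferevent_socket_new (look familiar?):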

struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
	struct bufferevent_private *bufev_p;
	struct bufferevent *bufev;

#ifdef _WIN32
	if (base && event_base_get_iocp_(base))
		return bufferevent_async_new_(base, fd, options);
#endif

	if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
		return NULL;
	//1. This is the call that initializes bufev_p
	if (bufferevent_init_common_(bufev_p, base, &bufferevent_ops_socket,
				    options) < 0) {
		mm_free(bufev_p);
		return NULL;
	}
	bufev = &bufev_p->bev;
	evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

	event_assign(&bufev->ev_read, bufev->ev_base, fd,
	    EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
	event_assign(&bufev->ev_write, bufev->ev_base, fd,
	    EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);

	evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

	evbuffer_freeze(bufev->input, 0);
	evbuffer_freeze(bufev->output, 1);

	return bufev;
}


Now look directly at bufferevent_init_common_:

int
bufferevent_init_common_(struct bufferevent_private *bufev_private,
    struct event_base *base,
    const struct bufferevent_ops *ops,
    enum bufferevent_options options)
{
	struct bufferevent *bufev = &bufev_private->bev;

	if (!bufev->input) {
		if ((bufev->input = evbuffer_new()) == NULL)
			return -1;
	}

	if (!bufev->output) {
		if ((bufev->output = evbuffer_new()) == NULL) {
			evbuffer_free(bufev->input);
			return -1;
		}
	}

	bufev_private->refcnt = 1;
	bufev->ev_base = base;

	/* Disable timeouts. */
	evutil_timerclear(&bufev->timeout_read);
	evutil_timerclear(&bufev->timeout_write);

	bufev->be_ops = ops;
//1. Yes, this is the function that initializes the read limit in bufev_p
	bufferevent_ratelim_init_(bufev_private);

	/*
	 * Set to EV_WRITE so that using bufferevent_write is going to
	 * trigger a callback.  Reading needs to be explicitly enabled
	 * because otherwise no data will be available.
	 */
	bufev->enabled = EV_WRITE;

#ifndef EVENT__DISABLE_THREAD_SUPPORT
	if (options & BEV_OPT_THREADSAFE) {
		if (bufferevent_enable_locking_(bufev, NULL) < 0) {
			/* cleanup */
			evbuffer_free(bufev->input);
			evbuffer_free(bufev->output);
			bufev->input = NULL;
			bufev->output = NULL;
			return -1;
		}
	}
#endif
	if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
	    == BEV_OPT_UNLOCK_CALLBACKS) {
		event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
		return -1;
	}
	if (options & BEV_OPT_UNLOCK_CALLBACKS)
		event_deferred_cb_init_(
		    &bufev_private->deferred,
		    event_base_get_npriorities(base) / 2,
		    bufferevent_run_deferred_callbacks_unlocked,
		    bufev_private);
	else
		event_deferred_cb_init_(
		    &bufev_private->deferred,
		    event_base_get_npriorities(base) / 2,
		    bufferevent_run_deferred_callbacks_locked,
		    bufev_private);

	bufev_private->options = options;

	evbuffer_set_parent_(bufev->input, bufev);
	evbuffer_set_parent_(bufev->output, bufev);

	return 0;
}

Now look at bufferevent_ratelim_init_:

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;  //1. There you are: this caps the number of bytes read in a single read
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}

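max_single_read: the name says it all, the maximum number of bytes read in one go. So is it enough to enlarge the MAX_SINGLE_READ_DEFAULT macro?
No need to touch the macro: libevent provides a dedicated function for changing max_single_read,
bufferevent_set_max_single_read. Just call it after bufferevent_socket_new.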

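In summary, to lift the 4096-byte limit:
1. In the accept callback, once you have the socket, set its receive buffer size (if you don't, it stays at the default, which was 8KB in my case).
2. Increase EVBUFFER_MAX_READ in the libevent source (and rebuild libevent afterwards).
3. Call bufferevent_set_max_single_read.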


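Afterword:
Since libevent was thoughtful enough to add bufferevent_set_max_single_read for setting the maximum number of bytes per read, why not have that function adjust EVBUFFER_MAX_READ as well (as a global variable instead of a macro)?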
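The afterword's suggestion could look roughly like the sketch below. Everything here is hypothetical: EVBUFFER_MAX_READ_DEFAULT, evbuffer_max_read_, set_evbuffer_max_read and clamp_read_size are invented names, not libevent API. A plain global is shared by every evbuffer in the process and is not synchronized, so a real patch would probably prefer a per-buffer field (much like max_single_read lives in bufferevent_private) or at least set the value once at startup.

```c
/* Hypothetical replacement for the compile-time EVBUFFER_MAX_READ macro:
 * a process-wide variable with a setter, as the afterword suggests. */
#define EVBUFFER_MAX_READ_DEFAULT 4096

static int evbuffer_max_read_ = EVBUFFER_MAX_READ_DEFAULT;

/* Set the cap; 0 or a negative value restores the default.
 * Returns the cap now in effect. Not thread-safe. */
static int set_evbuffer_max_read(int size)
{
    evbuffer_max_read_ = (size > 0) ? size : EVBUFFER_MAX_READ_DEFAULT;
    return evbuffer_max_read_;
}

/* The clamp from evbuffer_read(), reading the variable instead of the macro. */
static int clamp_read_size(int readable, int howmuch)
{
    int n = readable;
    if (n <= 0 || n > evbuffer_max_read_)
        n = evbuffer_max_read_;
    if (howmuch < 0 || howmuch > n)
        howmuch = n;
    return howmuch;
}
```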