
wrk 及扩展支持 tcp 字节流协议压测
高性能、方便使用的 HTTP(s)
的流量压测工具,结合了多个开源项目开发而成:
- redis 的 ae 事件框架
- luajit
- openssl
- http-parser
减少造*、复用他人的成功项目,赞;我们定制化也走这条路线,代码见此。
要支持 tcp 字节流协议压测,只需要增加一个函数 stream_response
,实现见此
-- data 的结果为 {"error_code":0,"error_msg":"","data":{}}
-- stream_response 表示使用tcp字节流协议压测,对返回 error_code 进行校验,为0表示状态正常。
function stream_response(data)
local t = json.decode(data)
return t["error_code"] == 0
end
lua 脚本
wrk 的第一大特色就是支持 lua 脚本,直接对 c 修改进行压测成本比较高:c 的业务开发速度较慢及每次都需要编译。
脚本则克服了开发时间过长的缺点,使用 luajit 速度可以保证在开发和运行的速度中得到一个平衡。
具体的脚本的变量和函数的逻辑见 官方文档,一定要熟读这个文档,精华部分,比其它个人的表述准确非常多。
一些官方文档之外的补充
脚本文件
分为两个文件组成
- wrk.lua 内置脚本,提供了基本的 API 和一些变量
- 命令行 -s <foo.lua>, foo.lua 用户自己使用的脚本文件,为可选项
线程
每个线程都包含一个自己的lua状态机,所以在脚本文件中定义的变量(如自增的请求计数),不同线程的中的结果是不相同的。
线程的结构是一个用户数据(userdata), 在 c 中的定义为 struct thread
。关联的 addr 也同样为用户数据,在 c 中的定义为 struct addrinfo
,支持取和存的操作(可以存在 = 的左右)
thread:set(key, value),value 不能够为表,在使用 get 操作后会发生 panic,应该是 script_copy_value()
函数中出现栈顶设置错误的问题
加速
如果构造的 request 内容比较耗时的话,优先放在 init() 使用提前生成并且混缓存起来,后面的 request() 直接从缓存的结果中获取。
高性能 && 请求收发逻辑
基于 redis 的 ae 事件框架是。和 ab 不同的是可以充分利用多核资源,减小线程间的切换,以此获得高性能。一般而言,ab 在请求量不是很大的情况下是ok的,但是在请求量到达上w req/s 后,自身就会成为瓶颈。
在每个线程中创建 connections / threads
个连接,并且将这些建立连接的 fd 添加至事件循环中,然后 fd 就绪后,将 readble
和 writeable
函数添加再添加至事件循环中;
writeable
对应着请求发送的逻辑,调用lua接口 reqeust()
获取发送内容就在其中;
在发送完成后会将自身从事件循环中删除,发送(write)可能调用多次,但是一定会等到将缓冲区中的内容全部发送完成,除非发送失败产生错误。
延时发送 delay
delay() 为 lua 的一个可选接口,发挥延迟发送的间隔,单位为毫秒(ms).
当lua脚本中出现了该函数时,writeable 就会从事件循环的文件事件删除自身,并且将 writeable 作为定时任务添加至事件循环中,从而达到延时发送的效果。
readable
对应响应接收的逻辑,对应返回的内容校验,在官方版本中,为 http 请求的解析。对解析的结果进行统计,当判断响应结束时,删除该连接在事件循环中的事件,并且重新进行最初的动作。
我们可以接管这个解析结果的过程,丰富化使用场景。
收发事件简单的时序图
统计
wrk 对两个维度对压测的结果做了统计,结果如下
Thread Stats Avg Stdev Max +/- Stdev
Latency 19.85ms 3.71ms 49.11ms 81.44%
Req/Sec 98.50 16.32 121.00 88.33%
延迟 Latency && 请求速度 Req/Sec
统计每个请求的延迟情况
- Avg, 平均延迟
- Stdev, 样本标准差
- Max, 最大延迟
- +/- Stdev, 正负一倍标准差概率
实现
wrk 的实现为通用结构,适用延迟和请求速度的统计
typedef struct {
uint64_t count; // 样本数量
uint64_t limit; // 最大样本变量限制
uint64_t min; // 最小样本变量
uint64_t max; // 最大样本变量
uint64_t data[]; // 索引为样本变量,值为出现的次数
} stats;
limit
防止 data[]
容量不够,也起到一个剔除不满足要求的情况,如延迟超过 limit 后直接归为 timeout 中。
️ 注意 data[]
数据每个元素的值为出现的次数,而不是样本变量。
样本变量统计
__sync_*
为编译器的同步函数,wrk 将统计的变量作为一个全局存在,故多个线程内就需要一些同步操作保证正确性。
理论上可以将这些统计变量放在线程内,在所以线程结束后,汇集处理,这里就不要这些同步元语了。不过目前还算简单,这样做问题也不大。
n 为样本变量,stats->data[n]
为该样本变量出现的次数。min 和 max 为之后的统计过程加速。
int stats_record(stats *stats, uint64_t n) {
if (n >= stats->limit) return 0;
__sync_fetch_and_add(&stats->data[n], 1);
__sync_fetch_and_add(&stats->count, 1);
uint64_t min = stats->min;
uint64_t max = stats->max;
while (n < min) min = __sync_val_compare_and_swap(&stats->min, min, n);
while (n > max) max = __sync_val_compare_and_swap(&stats->max, max, n);
return 1;
}
样本标准差
数学公式为
\]
wrk 实现如下,L6 处 * stats->data[i]
表示有多个样本变量为 i
1 long double stats_stdev(stats *stats, long double mean) {
2 long double sum = 0.0;
3 if (stats->count < 2) return 0.0;
4 for (uint64_t i = stats->min; i <= stats->max; i++) {
5 if (stats->data[i]) {
6 sum += powl(i - mean, 2) * stats->data[i];
7 }
8 }
9 return sqrtl(sum / (stats->count - 1));
10 }
扩展支持 tcp 压测
由于 wrk 支持 http(s) 的压测,但实际的场景中有很多不是 http 的协议,可以就是很简单的 json 文本协议。
所以这里对 wrk 做一个简单的扩展,支持普通的4层流量压测,功能上支持 json 和 md5。json库使用 yyjson,md5 使用 nginx/md5,充分利用前人的成功经验。
提供的库功能
-- scripts/test.lua
local data = '{"host":"129.168.10.10","os":"linux","open_ports":[22,80,3306]}'
local data_tbl = json.decode(data)
local function print_tables(t, indent)
local tab_indent = ""
for i = 1, indent do tab_indent = tab_indent .. "\t" end
for k, v in pairs(t) do
if type(v) == "table" then
print_tables(v, indent + 1)
else
print(string.format("%s %s\t%s", tab_indent, tostring(k), tostring(v)))
end
end
end
print_tables(data_tbl, 0)
-- $ ./wrk -t 1 -c 1 -d3s -L -s scripts/test.lua https://www.baidu.com
-- host 129.168.10.10
-- os linux
-- 1 22
-- 2 80
-- 3 3306
json
- json.encode, json转字符串
- json.decode, 字符串转json
- json.encode_empty_table_as_object, 空table时作为 object 使用(参考 openresty)
md5
- md5.sum, 16字节md5sum
- md5.sumhexa, 32字节16进制格式的sum
支持 tcp 文本的压测
主要修改提交为 点此查看。
将wrk扩展为支持普通的tcp流量,主要是在 readable 中接管返回数据的解析过程。也就是 L23-L13
,这个地方改动后应该形成一个分支,如果定义了支持 tcp,就走tcp的解析逻辑。
1 static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
2 connection *c = data;
3 size_t n;
4
5 do {
6 switch (sock.read(c, &n)) {
7 case OK: break;
8 case ERROR: goto error;
9 case RETRY: return;
10 }
11
12 if (http_parser_execute(&c->parser, &parser_settings, c->buf, n) != n) goto error;
13 if (n == 0 && !http_body_is_final(&c->parser)) goto error;
14
15 c->thread->bytes += n;
16 } while (n == RECVBUF && sock.readable(c) > 0);
17
18 return;
19
20 error:
21 c->thread->errors.read++;
22 reconnect_socket(c->thread, c);
23 }
如何决定为tcp文本协议解析
想来想去,为不破坏原来的接口定义并且尽量减少改动,通过以lua脚本中是否定义 stream_response()
来决定是否支持tcp的文本协议:
bool script_want_stream_response(lua_State *L) {
return script_is_function(L, "stream_response");
}
使用函数指针 response_complete
来代替分支逻辑,减少干扰。 在程序的起始阶段,根据是否支持为tcp文本协议将具体的处理函数赋值给函数指针。
函数指针的定义为
// 旧的 response_complete -> message_complete
typedef bool (*response_complete_func)(connection *c, size_t n);
static response_complete_func response_complete;
修改后的结果为, socket_readable L12-L3 被替换为一个函数指针执行。
1 static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
...
12 if (!response_complete(c, n))
13 goto error;
...
23 }
导出tcp文本协议解析
除了代码位置调整及函数名修改,旧的关于 http 流量的解析逻辑不变。
stream_response_complete
为 response_complete
的具体实现,script_stream_response
将响应内容导出至lua的 stream_response
处理。
当响应的内容长度为0时,直接重连。
1 bool stream_response_complete(connection *c, size_t n) {
2 uint64_t now = time_us();
3 thread *thread = c->thread;
4
5 thread->complete++;
6 thread->requests++;
7
8 if (!script_stream_response(thread->L, c->buf, n))
9 thread->errors.status++;
10
11 if (!stats_record(statistics.latency, now - c->start))
12 thread->errors.timeout++;
13
14 c->delayed = cfg.delay;
15 aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
16
17 if (n == 0)
18 reconnect_socket(thread, c);
19
20 return true;
21 }
22
23 bool script_stream_response(lua_State *L, const char *data, size_t n){
24 lua_getglobal(L, "stream_response");
25 lua_pushlstring(L, data, n);
26 lua_call(L, 1, 1);
27 bool ok = lua_toboolean(L, -1);
28 lua_pop(L, 1);
29 return ok;
30 }
其它修改
命令行中的url参数不带 http 的scheme,则自动补全为 http 避免相关逻辑导致退出;因为对于一个 tcp 的流量压测,加一个 http 的scheme看起来怪怪的。
TODO
- 修复
script_copy_value()
不能够 copy 复合表的问题 - 增加类似端口敲门的功能:在每个tcp连接建立后,先发送一段字节流进行验证请求是否合法
- 支持 unix domain socket 的字节流压测
参考
- 官方脚本文档,对相关脚本的描述,一定要熟悉
- 云风的lua5.3中文文档,luajit 使用的是 lua5.1 的语法,但是云风的这个文档足够了
- nginx/md5, nginx 的md5模块
- yyjson,json解析器
- 导出jsoncpp给lua使用,一个正确递归处理复合表的方法
- zxhio/wrk, 魔改后的wrk