wrk 及扩展支持 tcp 字节流协议压测

时间:2023-03-09 18:30:33
wrk 及扩展支持 tcp 字节流协议压测

wrk 及扩展支持 tcp 字节流协议压测

高性能、方便使用的 HTTP(s) 的流量压测工具,结合了多个开源项目开发而成:

  1. redis 的 ae 事件框架
  2. luajit
  3. openssl
  4. http-parser

减少造*、复用他人的成功项目,赞;我们定制化也走这条路线,代码见此

要支持 tcp 字节流协议压测,只需要增加一个函数 stream_response实现见此

-- data 的结果为 {"error_code":0,"error_msg":"","data":{}}
-- stream_response 表示使用tcp字节流协议压测,对返回 error_code 进行校验,为0表示状态正常。 function stream_response(data)
local t = json.decode(data)
return t["error_code"] == 0
end

lua 脚本

wrk 的第一大特色就是支持 lua 脚本,直接对 c 修改进行压测成本比较高:c 的业务开发速度较慢及每次都需要编译。

脚本则克服了开发时间过长的缺点,使用 luajit 速度可以保证在开发和运行的速度中得到一个平衡。

具体的脚本的变量和函数的逻辑见 官方文档,一定要熟读这个文档,精华部分,比其它个人的表述准确非常多。

一些官方文档之外的补充

脚本文件

分为两个文件组成

  1. wrk.lua 内置脚本,提供了基本的 API 和一些变量
  2. 命令行 -s <foo.lua>, foo.lua 用户自己使用的脚本文件,为可选项

线程

每个线程都包含一个自己的lua状态机,所以在脚本文件中定义的变量(如自增的请求计数),不同线程的中的结果是不相同的。

线程的结构是一个用户数据(userdata), 在 c 中的定义为 struct thread。关联的 addr 也同样为用户数据,在 c 中的定义为 struct addrinfo,支持取和存的操作(可以存在 = 的左右)

thread:set(key, value),value 不能够为表,在使用 get 操作后会发生 panic,应该是 script_copy_value() 函数中出现栈顶设置错误的问题

加速

如果构造的 request 内容比较耗时的话,优先放在 init() 使用提前生成并且混缓存起来,后面的 request() 直接从缓存的结果中获取。

高性能 && 请求收发逻辑

基于 redis 的 ae 事件框架是。和 ab 不同的是可以充分利用多核资源,减小线程间的切换,以此获得高性能。一般而言,ab 在请求量不是很大的情况下是ok的,但是在请求量到达上w req/s 后,自身就会成为瓶颈。

在每个线程中创建 connections / threads 个连接,并且将这些建立连接的 fd 添加至事件循环中,然后 fd 就绪后,将 readblewriteable 函数添加再添加至事件循环中;

writeable

对应着请求发送的逻辑,调用lua接口 reqeust() 获取发送内容就在其中;

在发送完成后会将自身从事件循环中删除,发送(write)可能调用多次,但是一定会等到将缓冲区中的内容全部发送完成,除非发送失败产生错误。

延时发送 delay

delay() 为 lua 的一个可选接口,发挥延迟发送的间隔,单位为毫秒(ms).

当lua脚本中出现了该函数时,writeable 就会从事件循环的文件事件删除自身,并且将 writeable 作为定时任务添加至事件循环中,从而达到延时发送的效果。

readable

对应响应接收的逻辑,对应返回的内容校验,在官方版本中,为 http 请求的解析。对解析的结果进行统计,当判断响应结束时,删除该连接在事件循环中的事件,并且重新进行最初的动作。

我们可以接管这个解析结果的过程,丰富化使用场景。

收发事件简单的时序图

wrk 及扩展支持 tcp 字节流协议压测

统计

wrk 对两个维度对压测的结果做了统计,结果如下

  Thread Stats   Avg      Stdev     Max   +/- Stdev
Latency 19.85ms 3.71ms 49.11ms 81.44%
Req/Sec 98.50 16.32 121.00 88.33%

延迟 Latency && 请求速度 Req/Sec

统计每个请求的延迟情况

  • Avg, 平均延迟
  • Stdev, 样本标准差
  • Max, 最大延迟
  • +/- Stdev, 正负一倍标准差概率

实现

wrk 的实现为通用结构,适用延迟和请求速度的统计

typedef struct {
uint64_t count; // 样本数量
uint64_t limit; // 最大样本变量限制
uint64_t min; // 最小样本变量
uint64_t max; // 最大样本变量
uint64_t data[]; // 索引为样本变量,值为出现的次数
} stats;

limit 防止 data[] 容量不够,也起到一个剔除不满足要求的情况,如延迟超过 limit 后直接归为 timeout 中。

️ 注意 data[] 数据每个元素的值为出现的次数,而不是样本变量。

样本变量统计

__sync_* 为编译器的同步函数,wrk 将统计的变量作为一个全局存在,故多个线程内就需要一些同步操作保证正确性。

理论上可以将这些统计变量放在线程内,在所以线程结束后,汇集处理,这里就不要这些同步元语了。不过目前还算简单,这样做问题也不大。

n 为样本变量,stats->data[n] 为该样本变量出现的次数。min 和 max 为之后的统计过程加速。

int stats_record(stats *stats, uint64_t n) {
if (n >= stats->limit) return 0;
__sync_fetch_and_add(&stats->data[n], 1);
__sync_fetch_and_add(&stats->count, 1);
uint64_t min = stats->min;
uint64_t max = stats->max;
while (n < min) min = __sync_val_compare_and_swap(&stats->min, min, n);
while (n > max) max = __sync_val_compare_and_swap(&stats->max, max, n);
return 1;
}

样本标准差

数学公式为

\[\delta = \sqrt{ \frac{\Sigma(x_i-\bar{x})^2}{n-1}}
\]

wrk 实现如下,L6 处 * stats->data[i] 表示有多个样本变量为 i

 1 long double stats_stdev(stats *stats, long double mean) {
2 long double sum = 0.0;
3 if (stats->count < 2) return 0.0;
4 for (uint64_t i = stats->min; i <= stats->max; i++) {
5 if (stats->data[i]) {
6 sum += powl(i - mean, 2) * stats->data[i];
7 }
8 }
9 return sqrtl(sum / (stats->count - 1));
10 }

扩展支持 tcp 压测

由于 wrk 支持 http(s) 的压测,但实际的场景中有很多不是 http 的协议,可以就是很简单的 json 文本协议。

所以这里对 wrk 做一个简单的扩展,支持普通的4层流量压测,功能上支持 json 和 md5。json库使用 yyjson,md5 使用 nginx/md5,充分利用前人的成功经验。

提供的库功能

--  scripts/test.lua
local data = '{"host":"129.168.10.10","os":"linux","open_ports":[22,80,3306]}'
local data_tbl = json.decode(data) local function print_tables(t, indent)
local tab_indent = ""
for i = 1, indent do tab_indent = tab_indent .. "\t" end for k, v in pairs(t) do
if type(v) == "table" then
print_tables(v, indent + 1)
else
print(string.format("%s %s\t%s", tab_indent, tostring(k), tostring(v)))
end
end
end print_tables(data_tbl, 0) -- $ ./wrk -t 1 -c 1 -d3s -L -s scripts/test.lua https://www.baidu.com
-- host 129.168.10.10
-- os linux
-- 1 22
-- 2 80
-- 3 3306

json

  • json.encode, json转字符串
  • json.decode, 字符串转json
  • json.encode_empty_table_as_object, 空table时作为 object 使用(参考 openresty)

md5

  • md5.sum, 16字节md5sum
  • md5.sumhexa, 32字节16进制格式的sum

支持 tcp 文本的压测

主要修改提交为 点此查看

将wrk扩展为支持普通的tcp流量,主要是在 readable 中接管返回数据的解析过程。也就是 L23-L13,这个地方改动后应该形成一个分支,如果定义了支持 tcp,就走tcp的解析逻辑。

 1 static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
2 connection *c = data;
3 size_t n;
4
5 do {
6 switch (sock.read(c, &n)) {
7 case OK: break;
8 case ERROR: goto error;
9 case RETRY: return;
10 }
11
12 if (http_parser_execute(&c->parser, &parser_settings, c->buf, n) != n) goto error;
13 if (n == 0 && !http_body_is_final(&c->parser)) goto error;
14
15 c->thread->bytes += n;
16 } while (n == RECVBUF && sock.readable(c) > 0);
17
18 return;
19
20 error:
21 c->thread->errors.read++;
22 reconnect_socket(c->thread, c);
23 }

如何决定为tcp文本协议解析

想来想去,为不破坏原来的接口定义并且尽量减少改动,通过以lua脚本中是否定义 stream_response() 来决定是否支持tcp的文本协议:

bool script_want_stream_response(lua_State *L) {
return script_is_function(L, "stream_response");
}

使用函数指针 response_complete 来代替分支逻辑,减少干扰。 在程序的起始阶段,根据是否支持为tcp文本协议将具体的处理函数赋值给函数指针。

函数指针的定义为

// 旧的 response_complete -> message_complete

typedef bool (*response_complete_func)(connection *c, size_t n);
static response_complete_func response_complete;

修改后的结果为, socket_readable L12-L3 被替换为一个函数指针执行。

 1 static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
...
12 if (!response_complete(c, n))
13 goto error;
...
23 }

导出tcp文本协议解析

除了代码位置调整及函数名修改,旧的关于 http 流量的解析逻辑不变。

stream_response_completeresponse_complete 的具体实现,script_stream_response 将响应内容导出至lua的 stream_response 处理。

当响应的内容长度为0时,直接重连。

 1 bool stream_response_complete(connection *c, size_t n) {
2 uint64_t now = time_us();
3 thread *thread = c->thread;
4
5 thread->complete++;
6 thread->requests++;
7
8 if (!script_stream_response(thread->L, c->buf, n))
9 thread->errors.status++;
10
11 if (!stats_record(statistics.latency, now - c->start))
12 thread->errors.timeout++;
13
14 c->delayed = cfg.delay;
15 aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
16
17 if (n == 0)
18 reconnect_socket(thread, c);
19
20 return true;
21 }
22
23 bool script_stream_response(lua_State *L, const char *data, size_t n){
24 lua_getglobal(L, "stream_response");
25 lua_pushlstring(L, data, n);
26 lua_call(L, 1, 1);
27 bool ok = lua_toboolean(L, -1);
28 lua_pop(L, 1);
29 return ok;
30 }

其它修改

命令行中的url参数不带 http 的scheme,则自动补全为 http 避免相关逻辑导致退出;因为对于一个 tcp 的流量压测,加一个 http 的scheme看起来怪怪的。

TODO

  1. 修复 script_copy_value() 不能够 copy 复合表的问题
  2. 增加类似端口敲门的功能:在每个tcp连接建立后,先发送一段字节流进行验证请求是否合法
  3. 支持 unix domain socket 的字节流压测

参考

  1. 官方脚本文档,对相关脚本的描述,一定要熟悉
  2. 云风的lua5.3中文文档,luajit 使用的是 lua5.1 的语法,但是云风的这个文档足够了
  3. nginx/md5, nginx 的md5模块
  4. yyjson,json解析器
  5. 导出jsoncpp给lua使用,一个正确递归处理复合表的方法
  6. zxhio/wrk, 魔改后的wrk