dbporxy-mysql 协议流转图

时间:2021-05-07 14:36:15

dbproxy 支持 in 查询, 当in 中的字段 属于不同的分表时, QPS约为 5000左右, 如果为 等值查询,  qps的30000左右

主要原因是 对于in操作,会产生多个不同分表的sql, 这些sql的发送完全是顺序的, 发送第二个sql, 要完全等到 第一个SQL结果的返回,

src/network-mysqld.c

void network_mysqld_con_handle(int event_fd, short events, void *user_data) {
switch (con->state) {
case CON_STATE_INIT:
//执行NETWORK_MYSQLD_PLUGIN_PROTO(proxy_init)
plugin_call(srv, con, con->state);
break; case CON_STATE_CONNECT_SERVER:
//执行NETWORK_MYSQLD_PLUGIN_PROTO(proxy_connect_server)
//准备发给client的随机挑战数
plugin_call(srv, con, con->state);
break; case CON_STATE_SEND_HANDSHAKE:
//向client发送挑战随机数
network_mysqld_write(srv, con->client); //此处没有对应的函数指针, 只是设置下state 为 CON_STATE_READ_AUTH;
plugin_call(srv, con, con->state);
break; case CON_STATE_READ_AUTH:
//读取client返回数据
network_mysqld_read(srv, recv_sock); //NETWORK_MYSQLD_PLUGIN_PROTO(proxy_read_auth) 设置状态为CON_STATE_SEND_AUTH_RESULT
plugin_call(srv, con, con->state); case CON_STATE_SEND_AUTH_RESULT:
//向client写验证返回数据
network_mysqld_write(srv, con->client); //没有对应的函数指针, 设置状态为 CON_STATE_READ_QUERY, 在这里开始 读取client 发送来的sql
plugin_call(srv, con, con->state);
break; case CON_STATE_READ_QUERY:
network_socket *recv_sock = con->client;
network_mysqld_read(srv, recv_sock); //执行NETWORK_MYSQLD_PLUGIN_PROTO(proxy_read_query), 读取用户sql, 设置状态为CON_STATE_SEND_QUERY
plugin_call(srv, con, con->state);
break; case CON_STATE_SEND_QUERY:
//发送到server
network_mysqld_write(srv, con->server); //设置状态 CON_STATE_READ_QUERY_RESULT
break; case CON_STATE_READ_QUERY_RESULT:
//读取server返回数据
network_mysqld_read(srv, recv_sock); //执行NETWORK_MYSQLD_PLUGIN_PROTO(proxy_read_query_result), 设置状态为CON_STATE_SEND_QUERY_RESULT
plugin_call(srv, con, con->state);
break; case CON_STATE_SEND_QUERY_RESULT:
//先发送给client
network_mysqld_write(srv, con->client); //执行NETWORK_MYSQLD_PLUGIN_PROTO(proxy_send_query_result)
////如果st->injected.queries队列中无数据, 设置状态为CON_STATE_READ_QUERY, 读取client 的新sql
//否则, 继续向server发送队列中的sql
plugin_call(srv, con, con->state)
break;
}
}

plugins/proxy/proxy-plugin.c

//初始化
NETWORK_MYSQLD_PLUGIN_PROTO(proxy_init) {
network_mysqld_con_lua_t *st = con->plugin_con_state;
st = network_mysqld_con_lua_new();
con->plugin_con_state = st;
con->state = CON_STATE_CONNECT_SERVER;
return NETWORK_SOCKET_SUCCESS;
} //连接backend, 就是准备些随机挑战数, 用于连接real mysql 用
NETWORK_MYSQLD_PLUGIN_PROTO(proxy_connect_server) {
con->state = CON_STATE_SEND_HANDSHAKE;
} //读取client发来的验证信息
NETWORK_MYSQLD_PLUGIN_PROTO(proxy_read_auth) {
con->state = CON_STATE_SEND_AUTH_RESULT;
} NETWORK_MYSQLD_PLUGIN_PROTO(proxy_read_query) {
network_injection_queue_reset(st->injected.queries);
sqls = sql_parse(con, tokens);
for (i = ; i < sqls->len; ++i) {
inj = injection_new(id, sqls->pdata[i]);
inj->resultset_is_needed = TRUE;
g_queue_push_tail(st->injected.queries, inj);
} inj = g_queue_peek_head(st->injected.queries);
network_mysqld_queue_reset(send_sock);
network_mysqld_queue_append(send_sock, send_sock->send_queue, S(inj->query)); con->state = CON_STATE_SEND_QUERY;
} NETWORK_MYSQLD_PLUGIN_PROTO(proxy_read_query_result) {
if ( != st->injected.queries->length) {
//在读取server返回数据时, 弹出st->injected.queries的头元素
inj = g_queue_pop_head(st->injected.queries); con->state = CON_STATE_READ_QUERY;
}
} //如果st->injected.queries队列中无数据, 设置状态为CON_STATE_READ_QUERY, 读取client 的新sql
//否则, 继续向server发送队列中的sql
NETWORK_MYSQLD_PLUGIN_PROTO(proxy_send_query_result) {
if (st->injected.queries->length == ) {
con->state = CON_STATE_READ_QUERY;
return NETWORK_SOCKET_SUCCESS;
}
//发送给client后, 将st->injected.queries的头元素 放到send_queue中
inj = g_queue_peek_head(st->injected.queries);
network_mysqld_queue_reset(send_sock);
network_mysqld_queue_append(send_sock, send_sock->send_queue, S(inj->query));
con->state = CON_STATE_SEND_QUERY;
return NETWORK_SOCKET_SUCCESS;
}
network_mysqld_con_lua_t *network_mysqld_con_lua_new() {
network_mysqld_con_lua_t *st;
st = g_new0(network_mysqld_con_lua_t, );
st->injected.queries = network_injection_queue_new();
return st;
}