创建gen_server组解决单process瓶颈

时间:2022-07-16 00:30:21

并发和顺序是一个令人纠结的问题。

下面是开发中遇到的一个问题
常规时间,系统表现的很“端庄”,不折腾CPU,不玩弄Mem。可是到高峰时,这个家伙就开始变态了。内存狂飙,直至swap最后无法响应。这个状况,当时折腾了一天多。始终无法找到问题所在。最后通过排查及yufeng的帮助,将问题锁定在某些局部process。

Erlang中默认,所有的Process具有同等的执行机会。
我们的系统中有上万个process处理客户连接,上万个cient process的数据,通过一个data_trans prcess处理。而这个process通过Message将数据发送到其他节点。问题就是这里。

client processes将数据发送给data_trans process后,数据的处理就是一个顺序的过程了,从消息队列中获取一条数据,打包,然后发送到其他Node。显然,在client process增加时,data_trans process的数据处理能力,已经跟不上了。所以导致系统恶化,最终崩溃。

怎么解决呢?
1,可以限定系统的并发连接数,保证服务质量(因为系统某些不足,导致此方法不可行)
2,加大data_trans处理能力,减少瓶颈

至于方法2,也有很多具体的实施方法:将数据打包和数据传输部分进行功能分割;创建多个data_trans组成一个process group
其中创建多个process,对代码改动最少,所以为最终选择。

根据yufeng的建议,实现如下:
使用一个supervisour(simple_one_for_one)管理所有的data_trans进程
data_trans的数目,与scheduler数目一致(8核则数目为8)
每个data_trans name为name_N (N为 1..SchedulerNumber)
调用data_trans时,根据caller,获取当前执行scheduler的X,直接将request跳转到name_X的进程去处理.

好处:
根据scheduler数目创建进程组,减少单个进程处理瓶颈
根据scheduler id直接跳转到进程组中某个进程,减少了中间查询,实现直接映射,效率更高.


简单的示意图(假设系统4核):

引用

caller 1 (scheduler_id 3) -\   /------|- process_1 |\
                                          \/                                 \
caller 2 (scheduler_id 1) --/ \     /--|- process_2 |--
                                             \  /                                 process supervisor(simple_one_for_one)
caller 3 (scheduler_id 4) -\    / \----|- process_3 |--  
                                          \/                                /
caller 4 (scheduler_id 2) _/ \____|- process_4 |/

                                  (直接映射)


把这个东西在提升一下,抽象出一个叫gen_server_cter的behaviour,其组装多个子gen_server process,调用时,根据调用者的当前scheduler id映射到对应子process name。

gen_server_cter接口:
start_link(CterName, CbMod, Args)
启动gen_server组
参数:CterName - cter name
    CbMod - gen_server callback module
    Args - 传递给CbMod的参数

cast(CbMod, Req)
异步调用请求

call(CbMod, Req) ->
同步调用请求

其中CbMod module必须实现一个get_name/1函数,用来实现scheduler id到进程名的映射.
比如(假设CbMod为my_module)
get_name(SchedulerId) ->
    list_to_atom(lists:concat([my_module, SchedulerId])).

用法:
gen_server_cter:start_link(my_module_group, my_module, Args)
gen_server_cter:cast(my_module, Req)
gen_server_cter:call(my_module, Req)

 

 

就是下面这个module完整代码

Erlang代码 创建gen_server组解决单process瓶颈  创建gen_server组解决单process瓶颈创建gen_server组解决单process瓶颈
  1. -module(gen_server_cter).   
  2. -behaviour(supervisor).   
  3.   
  4. -export([start_link/3]).   
  5. -export([cast/2, call/2]).   
  6.   
  7. %% for supervisor   
  8. -export([init/1]).   
  9.   
  10. -export([behaviour_info/1]).   
  11.   
  12. -spec behaviour_info(atom()) -> 'undefined' | [{atom(), byte()}].   
  13. behaviour_info(callbacks) ->                           
  14.     [{get_name,1}];   
  15. behaviour_info(_Other) ->   
  16.     undefined.   
  17.   
  18. %% @doc start the server   
  19. start_link(CterName, CbMod, Args) ->   
  20.     Ret = {ok, _Pid} = supervisor:start_link({local, CterName}, ?MODULE, [{callback, CbMod}, {args, Args}]),   
  21.     %io:format("pid:~p~n", [_Pid]),   
  22.     N = erlang:system_info(schedulers),   
  23.     [{ok, _} =  supervisor:start_child(CterName, [{index, I}]) || I <- lists:seq(1, N)],   
  24.     Ret.    
  25.   
  26. cast(CbMod, Req) ->   
  27.     Handler = select_handler(CbMod),   
  28.     %io:format("handler is:~p~n", [Handler]),   
  29.     gen_server:cast(Handler, Req).   
  30.   
  31. call(CbMod, Req) ->   
  32.     Handler = select_handler(CbMod),   
  33.     gen_server:call(Handler, Req).   
  34.   
  35.   
  36. %%   
  37. %% supervisor callbacks   
  38. %%   
  39. init([{callback, CbMod}, {args, Args} | _]) ->    
  40.     Strategy = {simple_one_for_one, 1010},   
  41.     Mod = {undefined, {CbMod, start_link, Args},   
  42.           permanent, 3000, worker, [CbMod]},   
  43.     {ok, {Strategy, [Mod]}}.   
  44.   
  45. %% internal API   
  46. select_handler(CbMod) ->   
  47.     I = erlang:system_info(scheduler_id),   
  48.     CbMod:get_name(I).  

 

update(2009.11.24):

在callback模块中,需要做一些小改动,需要添加一个export函数:

get_name(N :: integer()) -> atom().

返回此server对应的name

 

还需要修改start_link为:

start_link({index, I}) -> ....