ranch 源码分析(三)

时间:2021-02-10 16:49:57

接上 ranch 源码分析(二)

上次讲到了ranch_conns_sup和ranch_acceptors_sup这2个ranch的核心模块,我们接着分析

首先查看ranch_conns_sup.erl

-module(ranch_conns_sup).

%% API.
-export([start_link/6]).
-export([start_protocol/2]).
-export([active_connections/1]).

%...... 省略若干行

%% API.

-spec start_link(ranch:ref(), conn_type(), shutdown(), module(),
timeout(),
module()) -> {ok, pid()}.
start_link(Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol)
->
proc_lib:start_link(
?MODULE, init,
[self(), Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol]).

%...... 省略若干行

%% Supervisor internals.

-spec init(pid(), ranch:ref(), conn_type(), shutdown(),
module(), timeout(), module()) -> no_return().
init(Parent, Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol)
->
process_flag(trap_exit,
true),
ok
= ranch_server:set_connections_sup(Ref, self()),
MaxConns
= ranch_server:get_max_connections(Ref),
Opts
= ranch_server:get_protocol_options(Ref),
ok
= proc_lib:init_ack(Parent, {ok, self()}),
loop(#state{parent
=Parent, ref=Ref, conn_type=ConnType,
shutdown
=Shutdown, transport=Transport, protocol=Protocol,
opts
=Opts, ack_timeout=AckTimeout, max_conns=MaxConns}, 0, 0, []).

loop(State
=#state{parent=Parent, ref=Ref, conn_type=ConnType,
transport
=Transport, protocol=Protocol, opts=Opts,
max_conns
=MaxConns}, CurConns, NbChildren, Sleepers) ->
receive
{
?MODULE, start_protocol, To, Socket} ->
try Protocol:start_link(Ref, Socket, Transport, Opts) of
{ok, Pid}
->
shoot(State, CurConns, NbChildren, Sleepers, To, Socket, Pid, Pid);
{ok, SupPid, ProtocolPid}
when ConnType =:= supervisor ->
shoot(State, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid);


%...... 省略若干行

shoot(State
=#state{ref=Ref, transport=Transport, ack_timeout=AckTimeout, max_conns=MaxConns},
CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid)
->
case Transport:controlling_process(Socket, ProtocolPid) of
ok
->
ProtocolPid
! {shoot, Ref, Transport, Socket, AckTimeout},
put(SupPid,
true),
CurConns2
= CurConns + 1,
if CurConns2 < MaxConns ->
To
! self(),
loop(State, CurConns2, NbChildren
+ 1, Sleepers);
true ->
loop(State, CurConns2, NbChildren
+ 1, [To|Sleepers])
end;
{error, _}
->
Transport:close(Socket),
%% Only kill the supervised pid, because the connection's pid,
%% when different, is supposed to be sitting under it and linked.
exit(SupPid, kill),
loop(State, CurConns, NbChildren, Sleepers)
end.

%...... 省略若干行

可以看到ranch_conns_sup不是一个典型的gen_tcp模块,

start_link/6启动init后直接使用了loop来循环,

init/7函数主要是获取了一些参数,

loop后等待消息(我们主要看start_protocol消息),loop函数根据start_protocol消息启动Protocol:start_link/4(用户编写的应用模块,这里表示echo_protocol),后面注意谁发这个消息给它

启动正常后,记录下ProtocolPid

 

继续查看ranch_acceptors_sup.erl

-module(ranch_acceptors_sup).
-behaviour(supervisor).

-export([start_link/4]).
-export([init/1]).

-spec start_link(ranch:ref(), non_neg_integer(), module(), any())
-> {ok, pid()}.
start_link(Ref, NbAcceptors, Transport, TransOpts)
->
supervisor:start_link(
?MODULE, [Ref, NbAcceptors, Transport, TransOpts]).

init([Ref, NbAcceptors, Transport, TransOpts])
->
ConnsSup
= ranch_server:get_connections_sup(Ref),
LSocket
= case proplists:get_value(socket, TransOpts) of
undefined
->
TransOpts2
= proplists:delete(ack_timeout,
proplists:delete(connection_type,
proplists:delete(max_connections,
proplists:delete(shutdown,
proplists:delete(socket, TransOpts))))),
case Transport:listen(TransOpts2) of
{ok, Socket}
-> Socket;
{error, Reason}
-> listen_error(Ref, Transport, TransOpts2, Reason)
end;
Socket
->
Socket
end,
{ok, Addr}
= Transport:sockname(LSocket),
ranch_server:set_addr(Ref, Addr),
Procs
= [
{{acceptor, self(), N}, {ranch_acceptor, start_link, [
LSocket, Transport, ConnsSup
]}, permanent, brutal_kill, worker, []}
|| N <- lists:seq(1
, NbAcceptors)],
{ok, {{one_for_one,
1, 5}, Procs}}.

-spec listen_error(any(), module(), any(), atom()) -> no_return().
listen_error(Ref, Transport, TransOpts2, Reason)
->
error_logger:error_msg(
"Failed to start Ranch listener ~p in ~p:listen(~p) for reason ~p (~s)~n",
[Ref, Transport, TransOpts2, Reason, inet:format_error(Reason)]),
exit({listen_error, Ref, Reason}).

 

这里进行最主要的操作有

打开端口Transport:listen/1开始监听端口

启动NbAcceptors个ranch_accetor:start_link进程等待连接,

 

接下来就是查看ranch_acceptor.erl

 

-module(ranch_acceptor).

-export([start_link/3]).
-export([loop/3]).

-spec start_link(inet:socket(), module(), pid())
-> {ok, pid()}.
start_link(LSocket, Transport, ConnsSup)
->
Pid
= spawn_link(?MODULE, loop, [LSocket, Transport, ConnsSup]),
{ok, Pid}.

-spec loop(inet:socket(), module(), pid()) -> no_return().
loop(LSocket, Transport, ConnsSup)
->
_
= case Transport:accept(LSocket, infinity) of
{ok, CSocket}
->
case Transport:controlling_process(CSocket, ConnsSup) of
ok
->
%% This call will not return until process has been started
%% AND we are below the maximum number of connections.
ranch_conns_sup:start_protocol(ConnsSup, CSocket);
{error, _}
->
Transport:close(CSocket)
end;
%% Reduce the accept rate if we run out of file descriptors.
%% We can't accept anymore anyway, so we might as well wait
%% a little for the situation to resolve itself.
{error, emfile} ->
receive after 100 -> ok end;
%% We want to crash if the listening socket got closed.
{error, Reason} when Reason =/= closed ->
ok
end,
flush(),
?MODULE:loop(LSocket, Transport, ConnsSup).

flush()
->
receive Msg ->
error_logger:error_msg(
"Ranch acceptor received unexpected message: ~p~n",
[Msg]),
flush()
after 0 ->
ok
end.

 

当客户端端连接,accept返回ok,ranch_conns_sup:start_protocol/2发送{?MODULE, start_protocol, self(), Socket}给ConnsSup,

 

-module(ranch_conns_sup).

%% API.
-export([start_link/6]).
-export([start_protocol/2]).
-export([active_connections/1]).

%%.......省略若干行

-spec start_protocol(pid(), inet:socket()) -> ok.
start_protocol(SupPid, Socket)
->
SupPid
! {?MODULE, start_protocol, self(), Socket},
receive SupPid -> ok end.


%%.......省略若干行

 

启动Protocol:start_link/4的详细过程,参考上面的ranch_conns_sup分析

好,这个时候基本ranch的大致结构就分析出来了,还有一些其他的细节留以后慢慢添加。