第十四章 套接字编程
Table of Contents
第十四章 套接字编程
14.1 使用TCP
14.1.1 从服务器上获取数据
-module(socket_examples). -export([nano_get_url/0]). -import(lists, [reverse/1]). nano_get_url() -> nano_get_url("www.google.com"). nano_get_url(Host) -> %% 链接到主机的80端口, 以二进制模式打开套接字, 原始方式发送TCP数据 {ok, Socket} = gen_tcp:connect(Host, 80, [binary, {packet, 0}]), %% 发送GET消息到套接字, 使用reverse_data接收数据 ok = gen_tcp:send(Socket, "GET / HTTP/1.0\r\n\r\n"), receive_data(Socket, []). receive_data(Socket, SoFar) -> %% 回应消息一帧一帧的返回, 因此这里使用receive方式接收 receive {tcp, Socket, Bin} -> %% 将接收到的数据添加到列表SoFar中 receive_data(Socket, [Bin|SoFar]); {tcp_closed, Socket} -> %% 因为每接收一帧数据都是放在SoFar的头部, 因此接收完成后需要翻转列表得到正常顺序的数据 list_to_binary(reverse(SoFar)) end.
运行结果:
1> socket_examples:nano_get_url(). <<"HTTP/1.0 200 OK\r\nDate: Mon, 04 Nov 2013 02:32:00 GMT\r\nExpires: -1\r\nCache-Control: private, max-age=0\r\nContent-Type: "...>>
14.1.2 一个简单的TCP服务器
服务端:
start_nano_server() -> %% 监听来自端口2345的链接, 设置包规则为带有4字节长的包头 {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]), %% 只处理正常打开的套接字 {ok, Socket} = gen_tcp:accept(Listen), %% 只处理一个链接 gen_tcp:close(Listen), %% 链接处理 loop(Socket). loop(Socket) -> receive {tcp, Socket, Bin} -> %% 输出二进制数据 io:format("Server received binary = ~p~n", [Bin]), %% 格式转换 Str = binary_to_term(Bin), io:format("Server (unpacked) ~p~n", [Str]), %% 对字符串求值 Reply = string2value(Str), io:format("Server replying = ~p~n", [Reply]), %% 对结果编码后发给套接字 gen_tcp:send(Socket, term_to_binary(Reply)), loop(Socket); {tcp_closed, Socket} -> io:format("Server socket closed~n") end.
客户端:
nano_client_eval(Str) -> %% 链接指定主机的2345端口, 发送数据时包头设置为4字节长 {ok, Socket} = gen_tcp:connect("localhost", 2345, [binary, {packet, 4}]), %% 调用term_to_binary进行数据转换后向服务端发送数据 ok = gen_tcp:send(Socket, term_to_binary(Str)), receive %% 接收返回并输出 {tcp, Socket, Bin} -> io:format("Client received binary = ~p~n", [Bin]), Val = binary_to_term(Bin), io:format("Client result = ~p~n", [Val]), gen_tcp:close(Socket) end.
运行结果:
# 首先启动服务端 1> socket_examples:start_nano_server(). # 然后打开另一个erl窗口启动客户端 # 随后客户端将服务端的计算结果接收后打印输出 1> socket_examples:nano_client_eval("list_to_tuple([2+3*4, 10+20])"). Client received binary = <<131,104,2,97,14,97,30>> Client result = {14,30} ok # 切换到服务端的erl窗口可以看到如下输出 Server received binary = <<131,107,0,29,108,105,115,116,95,116,111,95,116,117, 112,108,101,40,91,50,43,51,42,52,44,32,49,48,43,50, 48,93,41>> Server (unpacked) "list_to_tuple([2+3*4, 10+20])" Server replying = {14,30} Server socket closed ok
而这里服务端对客户端提交的字符串表达式进行计算的实现在string2value函数中
string2value(Str) -> %% 按字符分解字符串 {ok, Tokens, _} = erl_scan:string(Str ++ "."), %% 生成解析表达式 {ok, Exprs} = erl_parse:parse_exprs(Tokens), Bindings = erl_eval:new_bindings(), %% 运行表达式 {value, Value, _} = erl_eval:exprs(Exprs, Bindings), Value.
14.1.3 改进服务器
- 顺序型服务器
一次只接收一个连接%% 接收连接后处理请求然后再次调用seq_loop等待下一个连接 start_seq_server() -> {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]), seq_loop(Listen). seq_loop(Listen) -> {ok, Socket} = gen_tcp:accept(Listen), loop(Socket), seq_loop(Listen).
- 并行服务器
一次可以接收多个并行连接%% 接收连接后启动新的进程来处理套接字 start_parallel_server() -> {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]), spawn(fun() ->par_connect(Listen) end). par_connect(Listen) -> {ok, Socket} = gen_tcp:accept(Listen), spawn(fun() ->par_connect(Listen) end), loop(Socket).
14.2 控制逻辑
14.2.1 主动型消息接收(非阻塞)
建立主动套接字后, 一个独立的客户机可能向服务端无限制的发送成千上万条消息, 如果超过了服务器的处理速度, 则可能导致系统崩溃。因为其不会阻塞客户端, 因此被称为异步服务器, 实现形式如下:
%% 设置active为true即为异步方式 {ok, Listen} = gen_tcp:listen(Port, [..., {active, true}, ...]), {ok, Socket} = gen_tcp:accept(Listen), loop(Socket). loop(Socket) -> receive {tcp, Socket, Data} -> %% 数据处理 {tcp_closed, Socket} -> ... end.
14.2.2 被动型消息接收(阻塞)
建立被动套接字后, 只有服务端调用gen_tcp:recv(Socket, N)时才会接收来自套接字的数据, 且只接收N字节的数据, 因此不会因为客户端的大量请求而导致崩溃, 实现形式如下:
%% 设置active为false即为阻塞方式 {ok, Listen} = gen_tcp:listen(Port, [..., {active, false}, ...]), {ok, Socket} = gen_tcp:accept(Listen), loop(Socket). loop(Socket) -> case gen_tcp:recv(Socket, N) of {ok, B} -> %% 数据处理 loop(Socket); {error, closed} -> ... end.
14.2.3 混合型模式(半阻塞)
半阻塞模式的套接字是主动的但仅针对一个消息, 需要显式的调用inet:setopts重新激活以便接收下一个消息, 在此之前系统将处于阻塞状态, 实现形式如下:
%% 设置active为once即为异步方式 {ok, Listen} = gen_tcp:listen(Port, [..., {active, once}, ...]), {ok, Socket} = gen_tcp:accept(Listen), loop(Socket). loop(Socket) -> receive {tcp, Socket, Data} -> %% 数据处理 inet:setopts(Socket, [{active, once}]), loop(Socket); {tcp_closed, Socket} -> ... end.
14.3 连接从何而来
使用函数inet:peername(Socket)可以获取客户端信息。
inet:peername(Socket) -> {ok, {IP_Address, Port} | {error, Why}}
14.4 套接字的出错处理
测试代码
%% 服务端接收数据后调用atom_to_list处理数据 error_test_server() -> {ok, Listen} = gen_tcp:listen(4321, [binary, {packet, 2}]), {ok, Socket} = gen_tcp:accept(Listen), error_test_server_loop(Socket). error_test_server_loop(Socket) -> receive {tcp, Socket, Data} -> io:format("received:~p~n", [Data]), atom_to_list(Data), error_test_server_loop(Socket) end. %% 客户端连接后发生二进制数据使atom_to_list发生异常 error_test() -> spawn(fun() ->error_test_server() end), sleep(2000), {ok, Socket} = gen_tcp:connect("localhost", 4321, [binary, {packet, 2}]), io:format("connected to:~p~n", [Socket]), gen_tcp:send(Socket, <<"123">>), receive Any -> io:format("Any=~p~n", [Any]) end.
运行结果:
# 服务端异常结果 1> socket_examples:error_test_server(). received:<<"123">> exception error: bad argument in function atom_to_list/1 called as atom_to_list(<<"123">>) in call from socket_examples:error_test_server_loop/1 (socket_examples.erl, line 120) # 客户端异常结果 1> socket_examples:error_test(). =ERROR REPORT==== 5-Nov-2013::10:19:27 === Error in process <0.50.0> with exit value: {{badmatch,{error,eaddrinuse}},[{socket_examples,error_test_server,0,[{file,"socket_examples.erl"},{line,113}]}]} connected to:#Port<0.2291> Any={tcp_closed,#Port<0.2291>} ok
14.5 UDP
14.5.1 最简单的UDP服务器和客户机
UDP服务器的形式
server(Port) -> {ok, Socket} = gen_udp:open(Port, [binary]), loop(Socket). loop(Socket) -> receive {udp, Socket, Host, Port, Bin} -> BinReply = ... , gen_udp:send(Socket, Host, Port, BinReply), loop(Socket) end.
UDP客户机的形式
client(Request) -> {ok, Socket} = gen_udp:open(0, [binary]), ok = gen_udp:send(Socket, "localhost", 4000, Request), Value = receive {udp, Socket, _, _, Bin} ->{ok, Bin} %% 因为UDP协议传输的不可靠性, 有可能没有得到服务端的回应, 因此这里要设置超时时间 after 2000 ->error end, gen_udp:close(Socket), Value.
14.5.2 一个计算阶乘的UDP服务器
服务端实现:
start_server() -> spawn(fun() ->server(40000) end). server(Port) -> {ok, Socket} = gen_udp:open(Port, [binary]), io:format("server opened socket:~p~n", [Socket]), loop(Socket). loop(Socket) -> receive {udp, Socket, Host, Port, Bin} = Msg -> io:format("server received:~p~n", [Msg]), N = binary_to_term(Bin), Fac = fac(N), gen_udp:send(Socket, Host, Port, term_to_binary(Fac)), loop(Socket) end. fac(0) ->1; fac(N) ->N * fac(N-1).
客户端实现:
client(N) -> {ok, Socket} = gen_udp:open(0, [binary]), io:format("client opened socket=~p~n", [Socket]), ok = gen_udp:send(Socket, "localhost", 40000, term_to_binary(N)), Value = receive {udp, Socket, _, _, Bin} = Msg -> io:format("client received:~p~n", [Msg]), binary_to_term(Bin) after 2000 ->0 end, gen_udp:close(Socket), Value.
运行结果:
1> udp_test:start_server(). server opened socket:#Port<0.2308> <0.68.0> 2> udp_test:client(40). client opened socket=#Port<0.2309> server received:{udp,#Port<0.2308>,{127,0,0,1},54449,<<131,97,40>>} client received:{udp,#Port<0.2309>, {127,0,0,1}, 40000, <<131,110,20,0,0,0,0,0,64,37,5,255,100,222,15,8,126,242, 199,132,27,232,234,142>>} 815915283247897734345611269596115894272000000000
14.5.3 关于UDP协议的其他注意事项
因为UDP数据报可能被传输两次, 因此为了避免这个问题, 可以使用make_ref函数为请求创建唯一标示。
客户端实现:
client(Request) -> {ok, Socket} = gen_udp:open(0, [binary]), Ref = make_ref(), B1 = term_to_binary(Ref, Request), ok = gen_udp:send(Socket, "localhost", 40000, B1), wait_for_ref(Socket, Ref). wait_for_ref(Socket, Ref) -> receive {udp, Socket, _, _, Bin} -> case binary_to_term(Bin) of %% 在client(Request)函数中已经为请求添加了唯一标示, 因此这里要从 {Ref, Val} 这种格式的数据中提取出真正的请求 {Ref, Val} ->Val; {_SomeOtherRef, _} -> %% 对于其他数据则不用处理 wait_for_ref(Socket, Ref) end; after 1000 -> ... end.
14.6 向多台机器广播消息
-module(broadcast). -compile(export_all). send(IoList) -> %% 获取网卡en0的IP信息 case inet:ifget("en0", [broadaddr]) of {ok, [{broadaddr, Ip}]} -> %% 打开5010端口 {ok, S} = gen_udp:open(5010, [{broadcast, true}]), %% 向本地网络的6000端口广播数据 gen_udp:send(S, Ip, 6000, IoList), gen_udp:close(S); _ -> io:format("Bad interface name, or broadcastng not supported\n") end. listen() -> %% 监听6000端口的广播 {ok, _} = gen_udp:open(6000), loop(). loop() -> receive Any -> %% 打印任何收到的信息 io:format("received:~p~n", [Any]), loop() end.
在单台机器上测试:
# 一个shell打开监听 1> broadcast:listen(). # 一个shell发送广播 1> broadcast:send(["test"]). # 可以看到监听端的输出 1> broadcast:listen(). received:{udp,#Port<0.2337>,{10,0,1,224},5010,"test"}