用Erlang开发服务程序#
为什么用erlang#
爱立信开源和维护,跨平台。广泛用于电信、通信系统。 在金融、即时消息和游戏行业也有大量应用。二十多年不断有更新。 通信系统对可靠性要求较高,质量非常有保证。
其实理念非常超前。语言天然带并发,适合处理大量消息的自由流转。 天然支持多cpu上的大量任务(erlang进程)处理。天然带分布式支持、数据库等等。
天然适合在我们通信行业做底层服务。自带几种服务框架、debug工具,对外依赖少。 少数人力即可做出较健壮的程序。非常适合小而精的团队。
我们非常依赖的rabbitmq和emqx是用erlang做的。whats app前期用的erlang。阿里rds也有用。值得一看。
更可以把erlang看做一套思想和一套框架。和已学的技术完全不冲突。
函数式编程。代码较为简洁严谨且优美。开阔眼界。
为什么不用erlang#
不适合复杂多变的业务逻辑和大量的数学计算。
函数式编程和常用语言有一定区别,有的人不适应。
erlang程序的创建#
erlang程序的创建和发布还是有些繁琐。 运行erlang程序需要虚拟机,虚拟机之上的各种库。虚拟机本身还有版本区分。 erlang程序一般以一个约定目录树形式发布。 可参考自己机器上的erlang安装目录,结构基本是一样的。
看一下erlang的Application的概念 https://erlang.org/doc/design_principles/applications.html 大致就是得写好start和stop两个回调函数。 另外做一系列配置,比如依赖的第三方代码,和各种release配置和编译配置等等。
rebar3#
手写配置很繁琐,rebar3是较流行的erlang编译、发布工具。可减缓痛苦。 可以先用上rebar3 https://rebar3.org/ ,跑通流程。具体的细节后续再学。
rebar3可以生成几种程序。
app 带监控树的OTP程序,单个程序。
lib 不带监控树的OTP库程序,用于组合起多个模组。
release 可包含多个独立的OTP程序,组成一个大型程序。
escript 单个程序一种形式。作为脚本运行。
plugin 插件
我们起一个release rebar3 new release tcplab
进入生成的目录 看其中的rebar.config
erl_opts 指定编译选项。
deps 指定依赖的库或源码
relx release的详细配置。
profiles 区分不同目的的配置。比如生产和测试用不同的配置。
这时可以在tcplab目录下编译项目
rebar3 compile
tcplab/_build/default/lib/tcplab/ebin 下会生成编译好的beam文件和一个tcplab.app
tcplab.app是由apps/tcplab/src/tcplab.app.src生成的。
在tcplab.app.src需要填好tcplab依赖的库。
输入rebar3 shell可在shell中运行测试当前项目。
rebar3 shell
===> Verifying dependencies...
===> Analyzing applications...
===> Compiling tcplab
===> Booted tcplab
===> Booted sasl
Eshell V12.0 (abort with ^G)
运行application:which_applications().可以看到包含了一些其他的库crypto、ssl等。 这些是rebar3起的,如果我们自己手动运行,是不会有的。
1> application:which_applications().
[{sasl,"SASL CXC 138 11","4.1"},
{tcplab,"An OTP application","0.1.0"},
{inets,"INETS CXC 138 49","7.4"},
{ssl,"Erlang/OTP SSL application","10.4"},
{public_key,"Public key infrastructure","1.11"},
{asn1,"The Erlang ASN1 compiler version 5.0.16","5.0.16"},
{crypto,"CRYPTO","5.0"},
{stdlib,"ERTS CXC 138 10","3.15"},
{kernel,"ERTS CXC 138 10","8.0"}]
用rebar3 release
发布
成功后_build/default下生成rel/tcplab目录。
执行bin目录下的tcplab,可看到需要指定command。
用tcplab console启动,进入shell环境。
用application:which_applications().可看到实际运行信息。
除了console方式还有各种启动方式和命令。比如foreground、reboot、daemon、status等等。
rebar.config里relx默认的mode是dev, http://rebar3.org/docs/deployment/releases/
dev意味着include_erts会设成false,即erlang运行时库erts不会包含在release里。实际用的是系统的erlang运行时。这样dev版本是很小的。
用rebar3 as prod release
可指定带上erts。
配置#
我们就用默认配置。加上依赖的第三方库即可。
在rebar.config的deps加上
{cowboy, {git, "https://github.com/ninenines/cowboy.git", {tag, "2.9.0"}}}, {jsone, {git, "https://github.com/sile/jsone.git", {tag, "1.6.1"}}}
再运行rebar3 deps
可自动下载并显示依赖的库。
用rebar3 shell
即可运行并调试。
需求#
开发一个tcp调试服务器tcplab。
需要起一个websocket服务。网页上的websocket客户端接入服务后,在随机端口启动一个独立的tcp服务,并显示其ip端口。
其他任意对端(默认为我们的模块)可连上这个tcp服务器。
接入某个tcp服务的所有客户端与相应的websocket客户端形成群组,可群发消息。
网页端可显示群组消息、客户端列表,可主动断开某个客户端。
其他配置,如超时、服务器/客户端的数量限制等等。
websocket服务用cowboy Cowboy是erlang的一个著名的http服务程序库。支持http/http2/websocket等。 https://github.com/ninenines/cowboy https://ninenines.eu/
简单的架构图
tcplab
|
|
|
cowboy ws server
| |----------------------群组消息-----|
| | |
|----------->ws_client_1----创建--->tcp server |
| | |
| |---------->tcp_client_1
| | |
|----------->ws_client_2--- |---------->tcp_client_2
| | |
| |---------->tcp_client_3
| | |
|----------->... |---------->...
| |
实现#
rebar3 new release tcplab
创建项目后src下会生成tcplab_sup.erl和tcplab_app.erl。
tcplab_sup.erl我改名成了tcplab_super.erl。
tcplab_app#
tcplab_app.erl实现Application。 在start里启动tcplab_super这个supervisor即可。
tcplab_app.erl
-module(tcplab_app).
-behaviour(application).
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
io:format("tcplab_app start ~p ~p ~p~n", [_StartType, _StartArgs, self()]),
{ok, Started} = application:ensure_all_started(cowboy), % 不加这个会出现noproc。查原因据说是cowboy未初始化完成。
io:format("Started application ~p~n", [Started]),
% 启动super
tcplab_super:start_link().
stop(_State) ->
ok.
application:ensure_all_started(cowboy)这个有点坑。据查是要等待cowboy初始化好。否则会出现noproc。 暂不深究。
tcplab_super#
tcplab_super是一个supervisor。需要完成init回调,填写需要启动的子进程。
tcplab_super.erl
-module(tcplab_super).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
-import(http_app, [start/0]).
-import(tcp_server, [start/1]).
start_link() ->
io:format("tcplab_super start_link ~p~n", [self()]),
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
io:format("tcplab_super init ~p~n", [self()]),
SupFlags = #{strategy => one_for_one,
intensity => 1,
period => 1},
Manager = #{id => manager,
start => {manager, start, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => [manager]},
WS = #{id => ws6666666,
start => {ws_server, start, [9999]},
restart => permanent,
shutdown => 2000,
type => worker,
modules => [ws_server]},
{ok, {SupFlags, [Manager, WS]}}.
起了一个ws_server:start和一个manager:start。
ws_server#
用cowboy起websocket服务。端口为tcplab_super启动时传的9999。
创建ets存储tcp_server_data。存每个websocket客户端进程id、对应的tcp服务id,和所有的tcp客户端。只需这么一个表就可以实现后续的各种查找。
ws_server.erl
-module(ws_server).
-author("raide").
%% API
-export([start/1]).
start(PORT) ->
io:format("ws_server start at ~p ~p~n", [PORT, self()]),
ets:new(tcp_server_data, [set, public, named_table]),
% 设置路由和handler
Dispatch = cowboy_router:compile([
{'_', [{"/ws", ws_handler, []}]}
]),
% 启动服务
{ok, Pid} = cowboy:start_clear(tcplab_ws, [{port, PORT}], #{
env => #{dispatch => Dispatch}
}),
{ok, Pid}.
ws_handler#
cowboy也是遵循otp思想。用各种回调实现功能。请看一下cowboy的文档和例子。
init回调 新客户端接入会走init。可以获取对端的ip端口等信息。 注意ws_server里启动的方式,ws和http并没有区别。 区别在于init的返回。默认返回ok的话是http服务,如果返回cowboy_websocket,会转化为websocket服务。 init里可以返回一些wb的配置,比如idle时间。
websocket_init回调 转化为websocket服务后,会走websocket_init。此时的pid即是ws客户端的pid,可以在这里做业务初始化。 在这里起此ws客户端对应的tcp_server。
websocket_handle回调 所有ws对端发来的消息会走websocket_handle回调。 在这里解数据进行处理。
websocket_info回调 其他erlang消息会走这里。 比如要发数据给ws对端,可以在这里实现。 websocket_开头的几个回调都可以根据返回值给对端发数据。 例如返回{[{text, Msg}], State};就会发Msg。返回{[], State}.就不发。
terminate回调 ws客户端断开时会走terminate。需要根据不同情况做善后处理。
ws_handler.erl
-module(ws_handler).
-author("raide").
%% API
-export([init/2, websocket_handle/2, websocket_info/2, websocket_init/1, terminate/3]).
-import(tcp_server, [start/1]).
init(Req, State) ->
% 新的ws客户端接入
io:format("ws_handler init ~p~n", [self()]),
%io:format("Req ~p~n", [Req]),
io:format("State ~p~n", [State]),
% 获取client信息
#{peer := Peer, pid := ClientPid} = Req,
io:format("Peer ~p~n", [Peer]),
% 设置超时时间
Opts = #{idle_timeout => 1000000},
NewState = {Peer, ClientPid},
{cowboy_websocket, Req, NewState, Opts}.
websocket_init(State) ->
% init进程会很快销毁。如果要起进程,可在此起。
io:format("ws_handler websocket_init ~p ~p~n", [State, self()]),
{Peer, ClientPid} = State,
% 向ClientPid发消息会走websocket_info。
% 注意self()与收消息的进程不是同一个。
% https://ninenines.eu/docs/en/cowboy/2.9/manual/cowboy_websocket/
% websocket_info返回{[{text, Msg}], State}即可向ws客户端发数据。
% 新的ws客户端接入后起一个tcp_server
% 连上tcp_server的所有tcp_client加ws_client,形成消息群组。
TcpPort = rand:uniform(10000) + 50000,
%TcpServerPid = spawn(tcp_server, start, [TcpPort]), 这样会出noproc
case tcp_server:start(TcpPort, self()) of
{ok, _TcpServerPid} ->
% 可能有微小的时序瑕疵。
% 暂时以Port作为唯一标识
ets:insert(tcp_server_data, {TcpPort, {self(), dict:new()}}),
NewState = {Peer, TcpPort, ClientPid},
{[], NewState};
{error} -> % listen失败。比如端口被占用。
{[{text, "busy"}], State}
end.
% ws客户端消息
websocket_handle({text, Msg}, State) ->
io:format("websocket_handle 1 Msg ~p~n", [Msg]),
io:format("websocket_handle 1 State ~p~n", [State]),
{_, ID, _} = State, % {Peer, ID, ClientPid}
try jsone:decode(Msg) of
JsonData ->
case maps:is_key(<<"msg_type">>, JsonData) of
true ->
#{<<"msg_type">> := MsgType} = JsonData,
case MsgType of
4 -> % 4为约定的命令id。踢掉tcp client。
#{<<"ip">> := Ip, <<"port">> := Port} = JsonData,
io:format("websocket_handle 1 Ip ~p ~p~n", [Ip, binary_to_list(Ip)]),
% 通知管理
manager ! {tcplab_kick_tcp_client, ID, {binary_to_list(Ip), Port}},
{[], State};
5 -> % 5为约定的命令id。标识普通群组消息。
#{<<"data">> := Data} = JsonData,
% 通知管理
manager ! {tcplab_group_msg, ws_client, self(), ID, Data},
{[], State}
end;
false ->
io:format("websocket_handle msg_type not found~n")
end
catch
Error:Reason ->
io:format("websocket_handle jsone:decode error ~p ~p~n", [Error, Reason])
end;
websocket_handle(_Data, State) ->
io:format("websocket_handle 2~n"),
io:format("websocket_handle 2 _Data ~p~n", [_Data]),
io:format("websocket_handle 2 State ~p~n", [State]),
{[], State}.
% erlang消息
websocket_info({tcplab_new_tcp_server, Peer}, State) -> % 新的tcp server
{Ip, Port} = Peer,
Msg = jsone:encode(#{<<"msg_type">> => 0, <<"error">> => 0, <<"ip">> => iolist_to_binary(Ip), <<"port">> => Port}),
%io:format("~p~n", [MSG]),
{[{text, Msg}], State};
websocket_info({tcplab_new_tcp_client, Peer}, State) -> % 新的tcp client
{Ip, Port} = Peer,
Msg = jsone:encode(#{<<"msg_type">> => 1, <<"error">> => 0, <<"ip">> => iolist_to_binary(Ip), <<"port">> => Port}),
%io:format("~p~n", [MSG]),
{[{text, Msg}], State};
websocket_info({tcplab_group_msg, _FromPid, Peer, Data}, State) -> % 群组消息
io:format("websocket_info group_msg~n"),
{Ip, Port} = Peer,
try jsone:try_encode(#{<<"msg_type">> => 3, <<"error">> => 0, <<"ip">> => iolist_to_binary(Ip), <<"port">> => Port, <<"data">> => Data}) of
Msg ->
{[{text, Msg}], State}
catch
Error:Reason ->
io:format("websocket_info jsone:try_encode error ~p ~p~n", [Error, Reason])
end;
websocket_info({tcplab_tcp_client_closed, Peer}, State) -> % client断开
{Ip, Port} = Peer,
Msg = jsone:encode(#{<<"msg_type">> => 2, <<"error">> => 0, <<"ip">> => iolist_to_binary(Ip), <<"port">> => Port}),
%io:format("~p~n", [MSG]),
{[{text, Msg}], State};
websocket_info(_Info, State) -> % 其他
io:format("websocket_info unknonw ~p~p~n", [_Info, self()]),
{[], State}.
clean_ws_client(State) ->
io:format("clean_ws_client State ~p~n", [State]),
{_, ID, _} = State, % {Peer, ID, ClientPid}
[{_, {_, Clients}} | _] = ets:lookup(tcp_server_data, ID),
% 关闭所有tcp client
ClientList = dict:to_list(Clients),
[TcpClientPid ! {tcplab_close_socket} || {_, TcpClientPid} <- ClientList],
ets:delete(tcp_server_data, ID),
ok.
% client断开后会走terminate回调。进行清理。
terminate(timeout, _PartialReq, State) ->
io:format("ws_handler terminate client timeout~n"),
clean_ws_client(State);
terminate({remote, _, _}, _PartialReq, State) ->
io:format("ws_handler terminate client remote close~n"),
clean_ws_client(State);
terminate(Reason, PartialReq, State) ->
io:format("ws_handler terminate~n"),
io:format("ws_handler terminate Reason ~p~n", [Reason]),
io:format("ws_handler terminate PartialReq ~p~n", [PartialReq]),
io:format("ws_handler terminate State ~p~n", [State]),
clean_ws_client(State).
tcp_server#
tcp_server实现一个tcp服务。
在websocket_init会用tcp_server:start启动tcp服务器。
tcp服务器开始accept,每次accept成功会进client_loop。同时新起一个进程继续accept。
即每个客户端都在各自的进程进行处理。
client_loop处理对端tcp数据和tcplab各种功能消息。
tcp_server.erl
-module(tcp_server).
-author("raide").
%% API
-export([start/2]).
client_loop(Socket, ID, Peer) ->
io:format("tcp_server client_loop ~p ~p~n", [Peer, self()]),
% 收各种消息
receive
{tcp_closed, Socket} -> % socket关闭
io:format("tcp_server recv tcp_closed ~n"),
manager ! {tcplab_tcp_client_closed, ID, self(), Peer};
{tcp, Socket, Data} -> % 正常的tcp数据
io:format("tcp_server recv ~p~n", [Data]),
% echo
%gen_tcp:send(Socket, Data),
% active once模式。每次收到消息后要主动设置一下,才会收到后续消息。
%inet:setopts(Socket, [{active, once}]),
% 通知管理
manager ! {tcplab_group_msg, tcp_client, self(), ID, Peer, Data},
% 继续loop
client_loop(Socket, ID, Peer);
{tcplab_group_msg, _FromClientPid, Data} -> % 群组消息
io:format("tcp_server group_msg ~p~n", [Data]),
gen_tcp:send(Socket, Data),
client_loop(Socket, ID, Peer);
tcplab_close_socket ->
io:format("tcp_server tcplab_close_socket~n"),
gen_tcp:close(Socket),
manager ! {tcplab_tcp_client_closed, ID, self(), Peer}
end.
% ID作为tcp server的唯一标识
start_accept(ListenSocket, ID) ->
io:format("tcp_server start_accept ~p ~p~n", [self(), ListenSocket]),
% 开始accept。一旦成功马上另起一个accept进程。自己继续处理连上来的socket。
case gen_tcp:accept(ListenSocket) of
{ok, Socket} ->
case inet:peername(Socket) of
{ok, Peer} ->
io:format("tcp_server new client ~p~n", [Peer]),
{{Ip0, Ip1, Ip2, Ip3}, Port} = Peer,
Ip = integer_to_list(Ip0) ++ "." ++ integer_to_list(Ip1) ++ "." ++ integer_to_list(Ip2) ++ "." ++ integer_to_list(Ip3),
IpPort = {Ip, Port},
spawn(fun() -> start_accept(ListenSocket, ID) end),
io:format("tcp_server_data -> ~p~n", [ets:tab2list(tcp_server_data)]),
% 添加client到ets
[{_, {WsClientPid, Clients}} | _] = ets:lookup(tcp_server_data, ID),
%io:format("lookup ~p~n", [Clients]),
NewClients = dict:store(IpPort, self(), Clients),
%io:format("new GG ~p~n", [NewClients]),
ets:insert(tcp_server_data, {ID, {WsClientPid, NewClients}}),
WsClientPid ! {tcplab_new_tcp_client, IpPort},
% 起loop后继续accept
client_loop(Socket, ID, IpPort);
%start_accept(Listen);
{error, POSIXErrorCode} ->
io:format("tcp_server peername() failed ~p~n", [POSIXErrorCode]),
{error}
end;
{error, Reason} ->
io:format("gen_tcp:accept failed ~p~n", [Reason]),
{error}
end.
start(Port, PPid) -> % 启动tcp服务器的入口
io:format("tcp_server start at port ~p ~p~n", [Port, self()]),
% 标准的socket流程。先listen。第二个参数是各种选项。binary表示数据包是以二进制数据展示。
% {packet, N}是erlang处理tcp包分包的一种简单机制,一包前N个字节规定包的总长度,从而定下边界。
% {active, once}。socket是active还是passive。可选true,false,once,N。类似预取消息数。
case gen_tcp:listen(Port, [binary, {packet, 0}, {reuseaddr, false}, {active, true}]) of
{ok, ListenSocket} ->
io:format("tcp_server start ~p~n", [ListenSocket]),
% listen成功后开始start_accept
% 暂用Port作为tcp server的唯一标识
Pid = spawn(fun() -> start_accept(ListenSocket, Port) end),
PPid ! {tcplab_new_tcp_server, {<<"0.0.0.0">>, Port}},
{ok, Pid};
{error, Reason} ->
io:format("gen_tcp:listen failed ~p~n", [Reason]),
{error}
end.
manager#
完全在tcp_server和ws_handler里处理业务的话可能会越来越混乱。 可以做一个manager看情况管理一下消息和时序,让逻辑变得清晰。
比如tcp客户端发了一个消息,需要群发给群组成员。 这时可以把群发任务交给manager处理,而不是原地阻塞处理。
还有一些接入断开之类的消息也可以交给manager统一处理,更新数据。
-module(manager).
-author("raide").
%% API
-export([start/0]).
-import(tcp_server, [start/1]).
loop() ->
io:format("manager loop~n"),
receive
{tcplab_group_msg, tcp_client, FromPid, ID, Peer, Data} ->
% tcp client发了数据。这里要给所属tcp服务器的每个client发消息。
% 也要通知对应的ws client
% 获取数据
[{_, {WsClientPid, Clients}} | _] = ets:lookup(tcp_server_data, ID),
% 发给ws client
WsClientPid ! {tcplab_group_msg, FromPid, Peer, Data},
% 发给所有tcp client
ClientList = dict:to_list(Clients),
[TcpClientPid ! {tcplab_group_msg, FromPid, Data} || {_, TcpClientPid} <- ClientList],
loop();
{tcplab_group_msg, ws_client, FromWsClientPid, ID, Data} ->
% tcp client发了数据。这里要给所属tcp服务器的每个client发消息。
% 也要通知对应的ws client
[{_, {_WsClientPid, Clients}} | _] = ets:lookup(tcp_server_data, ID),
% 发给所有tcp client
ClientList = dict:to_list(Clients),
io:format("manager loop ClientList-> ~p~n", [ClientList]),
[TcpClientPid ! {tcplab_group_msg, FromWsClientPid, Data} || {_, TcpClientPid} <- ClientList],
loop();
{tcplab_tcp_client_closed, ID, _ClientPid, Peer} ->
% client tcp_closed
% 删除client
[{_, {WsClientPid, Clients}} | _] = ets:lookup(tcp_server_data, ID),
%io:format("lookup ~p~n", [Clients]),
NewClients = dict:erase(Peer, Clients),
%io:format("new GG ~p~n", [NewClients]),
ets:insert(tcp_server_data, {ID, {WsClientPid, NewClients}}),
WsClientPid ! {tcplab_tcp_client_closed, Peer},
loop();
{tcplab_kick_tcp_client, ID, Peer} ->
% 主动关闭一个tcp client
io:format("tcp_server_data -> ~p~n", [ets:tab2list(tcp_server_data)]),
[{_, {_, Clients}} | _] = ets:lookup(tcp_server_data, ID),
case dict:find(Peer, Clients) of
{ok, ClientPid} ->
ClientPid ! tcplab_close_socket;
error ->
io:format("tcplab_kick_tcp_client Peer not found ~p~n", [Peer])
end,
loop();
Any ->
io:format("manager loop Any-> ~p~n", [Any]),
loop()
end.
manager_loop() ->
io:format("manager_loop start ~p~n", [self()]),
register(manager, self()),
loop().
start() ->
io:format("manager start ~p~n", [self()]),
{ok, spawn(fun() -> manager_loop() end)}.