用Erlang开发服务程序#

为什么用erlang#

  • 爱立信开源和维护,跨平台。广泛用于电信、通信系统。 在金融、即时消息和游戏行业也有大量应用。二十多年不断有更新。 通信系统对可靠性要求较高,质量非常有保证。

  • 其实理念非常超前。语言天然带并发,适合处理大量消息的自由流转。 天然支持多cpu上的大量任务(erlang进程)处理。天然带分布式支持、数据库等等。

  • 天然适合在我们通信行业做底层服务。自带几种服务框架、debug工具,对外依赖少。 少数人力即可做出较健壮的程序。非常适合小而精的团队。

  • 我们非常依赖的rabbitmq和emqx是用erlang做的。whats app前期用的erlang。阿里rds也有用。值得一看。

  • 更可以把erlang看做一套思想和一套框架。和已学的技术完全不冲突。

  • 函数式编程。代码较为简洁严谨且优美。开阔眼界。

为什么不用erlang#

  • 不适合复杂多变的业务逻辑和大量的数学计算。

  • 函数式编程和常用语言有一定区别,有的人不适应。


erlang程序的创建#

  • erlang程序的创建和发布还是有些繁琐。 运行erlang程序需要虚拟机,虚拟机之上的各种库。虚拟机本身还有版本区分。 erlang程序一般以一个约定目录树形式发布。 可参考自己机器上的erlang安装目录,结构基本是一样的。

  • 看一下erlang的Application的概念 https://erlang.org/doc/design_principles/applications.html 大致就是得写好start和stop两个回调函数。 另外做一系列配置,比如依赖的第三方代码,和各种release配置和编译配置等等。

rebar3#

  • 手写配置很繁琐,rebar3是较流行的erlang编译、发布工具。可减缓痛苦。 可以先用上rebar3 https://rebar3.org/ ,跑通流程。具体的细节后续再学。

  • rebar3可以生成几种程序。

    • app 带监控树的OTP程序,单个程序。

    • lib 不带监控树的OTP库程序,用于组合起多个模组。

    • release 可包含多个独立的OTP程序,组成一个大型程序。

    • escript 单个程序一种形式。作为脚本运行。

    • plugin 插件

我们起一个release rebar3 new release tcplab

进入生成的目录 看其中的rebar.config

erl_opts 指定编译选项。

deps 指定依赖的库或源码

relx release的详细配置。

profiles 区分不同目的的配置。比如生产和测试用不同的配置。

这时可以在tcplab目录下编译项目 rebar3 compile

tcplab/_build/default/lib/tcplab/ebin 下会生成编译好的beam文件和一个tcplab.app

tcplab.app是由apps/tcplab/src/tcplab.app.src生成的。

在tcplab.app.src需要填好tcplab依赖的库。

输入rebar3 shell可在shell中运行测试当前项目。

rebar3 shell
===> Verifying dependencies...
===> Analyzing applications...
===> Compiling tcplab
===> Booted tcplab
===> Booted sasl
Eshell V12.0  (abort with ^G)

运行application:which_applications().可以看到包含了一些其他的库crypto、ssl等。 这些是rebar3起的,如果我们自己手动运行,是不会有的。

1> application:which_applications().
[{sasl,"SASL  CXC 138 11","4.1"},
 {tcplab,"An OTP application","0.1.0"},
 {inets,"INETS  CXC 138 49","7.4"},
 {ssl,"Erlang/OTP SSL application","10.4"},
 {public_key,"Public key infrastructure","1.11"},
 {asn1,"The Erlang ASN1 compiler version 5.0.16","5.0.16"},
 {crypto,"CRYPTO","5.0"},
 {stdlib,"ERTS  CXC 138 10","3.15"},
 {kernel,"ERTS  CXC 138 10","8.0"}]

rebar3 release发布

成功后_build/default下生成rel/tcplab目录。

执行bin目录下的tcplab,可看到需要指定command。

用tcplab console启动,进入shell环境。

用application:which_applications().可看到实际运行信息。

除了console方式还有各种启动方式和命令。比如foreground、reboot、daemon、status等等。

rebar.config里relx默认的mode是dev, http://rebar3.org/docs/deployment/releases/

dev意味着include_erts会设成false,即erlang运行时库erts不会包含在release里。实际用的是系统的erlang运行时。这样dev版本是很小的。

rebar3 as prod release可指定带上erts。


配置#

我们就用默认配置。加上依赖的第三方库即可。

在rebar.config的deps加上 {cowboy, {git, "https://github.com/ninenines/cowboy.git", {tag, "2.9.0"}}},  {jsone, {git, "https://github.com/sile/jsone.git", {tag, "1.6.1"}}}

再运行rebar3 deps可自动下载并显示依赖的库。

rebar3 shell即可运行并调试。


需求#

  • 开发一个tcp调试服务器tcplab。

    • 需要起一个websocket服务。网页上的websocket客户端接入服务后,在随机端口启动一个独立的tcp服务,并显示其ip端口。

    • 其他任意对端(默认为我们的模块)可连上这个tcp服务器。

    • 接入某个tcp服务的所有客户端与相应的websocket客户端形成群组,可群发消息。

    • 网页端可显示群组消息、客户端列表,可主动断开某个客户端。

    • 其他配置,如超时、服务器/客户端的数量限制等等。

  • websocket服务用cowboy Cowboy是erlang的一个著名的http服务程序库。支持http/http2/websocket等。 https://github.com/ninenines/cowboy https://ninenines.eu/

简单的架构图

           tcplab
              |
              |
              |
        cowboy ws server
              |                   |----------------------群组消息-----|
              |                   |                                   |
              |----------->ws_client_1----创建--->tcp server          |
              |                                       |               |
              |                                       |---------->tcp_client_1
              |                                       |               |
              |----------->ws_client_2---             |---------->tcp_client_2
              |                                       |               |
              |                                       |---------->tcp_client_3
              |                                       |               |
              |----------->...                        |---------->...
              |                                       |




实现#

rebar3 new release tcplab创建项目后src下会生成tcplab_sup.erl和tcplab_app.erl。 tcplab_sup.erl我改名成了tcplab_super.erl。

tcplab_app#

tcplab_app.erl实现Application。 在start里启动tcplab_super这个supervisor即可。

tcplab_app.erl

-module(tcplab_app).

-behaviour(application).

-export([start/2, stop/1]).

start(_StartType, _StartArgs) ->
    io:format("tcplab_app start ~p ~p ~p~n", [_StartType, _StartArgs, self()]),
    {ok, Started} = application:ensure_all_started(cowboy), % 不加这个会出现noproc。查原因据说是cowboy未初始化完成。
    io:format("Started application ~p~n", [Started]),

    % 启动super
    tcplab_super:start_link().

stop(_State) ->
    ok.

application:ensure_all_started(cowboy)这个有点坑。据查是要等待cowboy初始化好。否则会出现noproc。 暂不深究。


tcplab_super#

tcplab_super是一个supervisor。需要完成init回调,填写需要启动的子进程。

tcplab_super.erl

-module(tcplab_super).

-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

-define(SERVER, ?MODULE).

-import(http_app, [start/0]).
-import(tcp_server, [start/1]).

start_link() ->
    io:format("tcplab_super start_link ~p~n", [self()]),
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).

init([]) ->
    io:format("tcplab_super init ~p~n", [self()]),
    SupFlags = #{strategy => one_for_one,
                 intensity => 1,
                 period => 1},

    Manager = #{id => manager,
        start => {manager, start, []},
        restart => permanent,
        shutdown => 2000,
        type => worker,
        modules => [manager]},

    WS = #{id => ws6666666,
        start => {ws_server, start, [9999]},
        restart => permanent,
        shutdown => 2000,
        type => worker,
        modules => [ws_server]},

    {ok, {SupFlags, [Manager, WS]}}.

起了一个ws_server:start和一个manager:start。


ws_server#

用cowboy起websocket服务。端口为tcplab_super启动时传的9999。

创建ets存储tcp_server_data。存每个websocket客户端进程id、对应的tcp服务id,和所有的tcp客户端。只需这么一个表就可以实现后续的各种查找。

ws_server.erl

-module(ws_server).
-author("raide").

%% API
-export([start/1]).

start(PORT) ->
    io:format("ws_server start at ~p ~p~n", [PORT, self()]),

    ets:new(tcp_server_data, [set, public, named_table]),

    % 设置路由和handler
    Dispatch = cowboy_router:compile([
        {'_', [{"/ws", ws_handler, []}]}
    ]),
    
    % 启动服务
    {ok, Pid} = cowboy:start_clear(tcplab_ws, [{port, PORT}], #{
        env => #{dispatch => Dispatch}
    }),

    {ok, Pid}.

ws_handler#

  • cowboy也是遵循otp思想。用各种回调实现功能。请看一下cowboy的文档和例子。

  • init回调 新客户端接入会走init。可以获取对端的ip端口等信息。 注意ws_server里启动的方式,ws和http并没有区别。 区别在于init的返回。默认返回ok的话是http服务,如果返回cowboy_websocket,会转化为websocket服务。 init里可以返回一些wb的配置,比如idle时间。

  • websocket_init回调 转化为websocket服务后,会走websocket_init。此时的pid即是ws客户端的pid,可以在这里做业务初始化。 在这里起此ws客户端对应的tcp_server。

  • websocket_handle回调 所有ws对端发来的消息会走websocket_handle回调。 在这里解数据进行处理。

  • websocket_info回调 其他erlang消息会走这里。 比如要发数据给ws对端,可以在这里实现。 websocket_开头的几个回调都可以根据返回值给对端发数据。 例如返回{[{text, Msg}], State};就会发Msg。返回{[], State}.就不发。

  • terminate回调 ws客户端断开时会走terminate。需要根据不同情况做善后处理。

ws_handler.erl

-module(ws_handler).
-author("raide").

%% API
-export([init/2, websocket_handle/2, websocket_info/2, websocket_init/1, terminate/3]).

-import(tcp_server, [start/1]).

init(Req, State) ->
    % 新的ws客户端接入

    io:format("ws_handler init ~p~n", [self()]),
    %io:format("Req ~p~n", [Req]),
    io:format("State ~p~n", [State]),

    % 获取client信息
    #{peer := Peer, pid := ClientPid} = Req,
    io:format("Peer ~p~n", [Peer]),

    % 设置超时时间
    Opts = #{idle_timeout => 1000000},
    NewState = {Peer, ClientPid},
    {cowboy_websocket, Req, NewState, Opts}.

websocket_init(State) ->
    % init进程会很快销毁。如果要起进程,可在此起。
    io:format("ws_handler websocket_init ~p ~p~n", [State, self()]),

    {Peer, ClientPid} = State,

    % 向ClientPid发消息会走websocket_info。
    % 注意self()与收消息的进程不是同一个。
    % https://ninenines.eu/docs/en/cowboy/2.9/manual/cowboy_websocket/
    % websocket_info返回{[{text, Msg}], State}即可向ws客户端发数据。

    % 新的ws客户端接入后起一个tcp_server
    % 连上tcp_server的所有tcp_client加ws_client,形成消息群组。
    TcpPort = rand:uniform(10000) + 50000,
    %TcpServerPid = spawn(tcp_server, start, [TcpPort]), 这样会出noproc
    case tcp_server:start(TcpPort, self()) of
        {ok, _TcpServerPid} ->
            % 可能有微小的时序瑕疵。
            % 暂时以Port作为唯一标识
            ets:insert(tcp_server_data, {TcpPort, {self(), dict:new()}}),

            NewState = {Peer, TcpPort, ClientPid},
            {[], NewState};
        {error} -> % listen失败。比如端口被占用。
            {[{text, "busy"}], State}
    end.

% ws客户端消息
websocket_handle({text, Msg}, State) ->
    io:format("websocket_handle 1 Msg ~p~n", [Msg]),
    io:format("websocket_handle 1 State ~p~n", [State]),

    {_, ID, _} = State, % {Peer, ID, ClientPid}

    try jsone:decode(Msg) of
        JsonData ->
            case maps:is_key(<<"msg_type">>, JsonData) of
                true ->
                    #{<<"msg_type">> := MsgType} = JsonData,

                    case MsgType of
                        4 -> % 4为约定的命令id。踢掉tcp client。
                            #{<<"ip">> := Ip, <<"port">> := Port} = JsonData,

                            io:format("websocket_handle 1 Ip ~p ~p~n", [Ip, binary_to_list(Ip)]),

                            % 通知管理
                            manager ! {tcplab_kick_tcp_client, ID, {binary_to_list(Ip), Port}},
                            {[], State};
                        5 -> % 5为约定的命令id。标识普通群组消息。
                            #{<<"data">> := Data} = JsonData,

                            % 通知管理
                            manager ! {tcplab_group_msg, ws_client, self(), ID, Data},
                            {[], State}
                    end;
                false ->
                    io:format("websocket_handle msg_type not found~n")
            end
    catch
        Error:Reason ->
            io:format("websocket_handle jsone:decode error ~p ~p~n", [Error, Reason])
    end;
websocket_handle(_Data, State) ->
    io:format("websocket_handle 2~n"),
    io:format("websocket_handle 2 _Data ~p~n", [_Data]),
    io:format("websocket_handle 2 State ~p~n", [State]),
    {[], State}.

% erlang消息
websocket_info({tcplab_new_tcp_server, Peer}, State) -> % 新的tcp server
    {Ip, Port} = Peer,
    Msg = jsone:encode(#{<<"msg_type">> => 0, <<"error">> => 0, <<"ip">> => iolist_to_binary(Ip), <<"port">> => Port}),
    %io:format("~p~n", [MSG]),
    {[{text, Msg}], State};
websocket_info({tcplab_new_tcp_client, Peer}, State) -> % 新的tcp client
    {Ip, Port} = Peer,
    Msg = jsone:encode(#{<<"msg_type">> => 1, <<"error">> => 0, <<"ip">> => iolist_to_binary(Ip), <<"port">> => Port}),
    %io:format("~p~n", [MSG]),
    {[{text, Msg}], State};
websocket_info({tcplab_group_msg, _FromPid, Peer, Data}, State) -> % 群组消息
    io:format("websocket_info group_msg~n"),
    {Ip, Port} = Peer,
    try jsone:try_encode(#{<<"msg_type">> => 3, <<"error">> => 0, <<"ip">> => iolist_to_binary(Ip), <<"port">> => Port, <<"data">> => Data}) of
        Msg ->
            {[{text, Msg}], State}
    catch
        Error:Reason ->
            io:format("websocket_info jsone:try_encode error ~p ~p~n", [Error, Reason])
    end;
websocket_info({tcplab_tcp_client_closed, Peer}, State) -> % client断开
    {Ip, Port} = Peer,
    Msg = jsone:encode(#{<<"msg_type">> => 2, <<"error">> => 0, <<"ip">> => iolist_to_binary(Ip), <<"port">> => Port}),
    %io:format("~p~n", [MSG]),
    {[{text, Msg}], State};
websocket_info(_Info, State) -> % 其他
    io:format("websocket_info unknonw ~p~p~n", [_Info, self()]),
    {[], State}.

clean_ws_client(State) ->
    io:format("clean_ws_client State ~p~n", [State]),

    {_, ID, _} = State, % {Peer, ID, ClientPid}
    [{_, {_, Clients}} | _] = ets:lookup(tcp_server_data, ID),

    % 关闭所有tcp client
    ClientList = dict:to_list(Clients),
    [TcpClientPid ! {tcplab_close_socket} || {_, TcpClientPid} <- ClientList],

    ets:delete(tcp_server_data, ID),
    ok.

% client断开后会走terminate回调。进行清理。
terminate(timeout, _PartialReq, State) ->
    io:format("ws_handler terminate client timeout~n"),
    clean_ws_client(State);
terminate({remote, _, _}, _PartialReq, State) ->
    io:format("ws_handler terminate client remote close~n"),
    clean_ws_client(State);
terminate(Reason, PartialReq, State) ->
    io:format("ws_handler terminate~n"),
    io:format("ws_handler terminate Reason ~p~n", [Reason]),
    io:format("ws_handler terminate PartialReq ~p~n", [PartialReq]),
    io:format("ws_handler terminate State ~p~n", [State]),
    clean_ws_client(State).

tcp_server#

  • tcp_server实现一个tcp服务。

    • 在websocket_init会用tcp_server:start启动tcp服务器。

    • tcp服务器开始accept,每次accept成功会进client_loop。同时新起一个进程继续accept。

    • 即每个客户端都在各自的进程进行处理。

    • client_loop处理对端tcp数据和tcplab各种功能消息。

tcp_server.erl

-module(tcp_server).
-author("raide").

%% API
-export([start/2]).

client_loop(Socket, ID, Peer) ->
    io:format("tcp_server client_loop ~p ~p~n", [Peer, self()]),

    % 收各种消息
    receive
        {tcp_closed, Socket} -> % socket关闭
            io:format("tcp_server recv tcp_closed ~n"),
            manager ! {tcplab_tcp_client_closed, ID, self(), Peer};

        {tcp, Socket, Data} -> % 正常的tcp数据
            io:format("tcp_server recv ~p~n", [Data]),

            % echo
            %gen_tcp:send(Socket, Data),

            % active once模式。每次收到消息后要主动设置一下,才会收到后续消息。
            %inet:setopts(Socket, [{active, once}]),

            % 通知管理
            manager ! {tcplab_group_msg, tcp_client, self(), ID, Peer, Data},

            % 继续loop
            client_loop(Socket, ID, Peer);
        {tcplab_group_msg, _FromClientPid, Data} -> % 群组消息
            io:format("tcp_server group_msg ~p~n", [Data]),
            gen_tcp:send(Socket, Data),
            client_loop(Socket, ID, Peer);
        tcplab_close_socket ->
            io:format("tcp_server tcplab_close_socket~n"),
            gen_tcp:close(Socket),
            manager ! {tcplab_tcp_client_closed, ID, self(), Peer}
    end.

% ID作为tcp server的唯一标识
start_accept(ListenSocket, ID) ->
    io:format("tcp_server start_accept ~p ~p~n", [self(), ListenSocket]),

    % 开始accept。一旦成功马上另起一个accept进程。自己继续处理连上来的socket。
    case gen_tcp:accept(ListenSocket) of
        {ok, Socket} ->
            case inet:peername(Socket) of
                {ok, Peer} ->
                    io:format("tcp_server new client ~p~n", [Peer]),
                    {{Ip0, Ip1, Ip2, Ip3}, Port} = Peer,
                    Ip = integer_to_list(Ip0) ++ "." ++ integer_to_list(Ip1) ++ "." ++ integer_to_list(Ip2) ++ "." ++ integer_to_list(Ip3),
                    IpPort = {Ip, Port},

                    spawn(fun() -> start_accept(ListenSocket, ID) end),

                    io:format("tcp_server_data -> ~p~n", [ets:tab2list(tcp_server_data)]),

                    % 添加client到ets
                    [{_, {WsClientPid, Clients}} | _] = ets:lookup(tcp_server_data, ID),
                    %io:format("lookup ~p~n", [Clients]),
                    NewClients = dict:store(IpPort, self(), Clients),
                    %io:format("new GG ~p~n", [NewClients]),
                    ets:insert(tcp_server_data, {ID, {WsClientPid, NewClients}}),

                    WsClientPid ! {tcplab_new_tcp_client, IpPort},

                    % 起loop后继续accept
                    client_loop(Socket, ID, IpPort);
                    %start_accept(Listen);
                {error, POSIXErrorCode} ->
                    io:format("tcp_server peername() failed ~p~n", [POSIXErrorCode]),
                    {error}
            end;
        {error, Reason} ->
            io:format("gen_tcp:accept failed ~p~n", [Reason]),
            {error}
    end.


start(Port, PPid) -> % 启动tcp服务器的入口
    io:format("tcp_server start at port ~p ~p~n", [Port, self()]),

    % 标准的socket流程。先listen。第二个参数是各种选项。binary表示数据包是以二进制数据展示。
    % {packet, N}是erlang处理tcp包分包的一种简单机制,一包前N个字节规定包的总长度,从而定下边界。
    % {active, once}。socket是active还是passive。可选true,false,once,N。类似预取消息数。

    case gen_tcp:listen(Port, [binary, {packet, 0}, {reuseaddr, false}, {active, true}]) of
        {ok, ListenSocket} ->
            io:format("tcp_server start ~p~n", [ListenSocket]),
            
            % listen成功后开始start_accept
            % 暂用Port作为tcp server的唯一标识
            Pid = spawn(fun() -> start_accept(ListenSocket, Port) end),
            PPid ! {tcplab_new_tcp_server, {<<"0.0.0.0">>, Port}},
            {ok, Pid};
        {error, Reason} ->
            io:format("gen_tcp:listen failed ~p~n", [Reason]),
            {error}
    end.

manager#

  • 完全在tcp_server和ws_handler里处理业务的话可能会越来越混乱。 可以做一个manager看情况管理一下消息和时序,让逻辑变得清晰。

  • 比如tcp客户端发了一个消息,需要群发给群组成员。 这时可以把群发任务交给manager处理,而不是原地阻塞处理。

    还有一些接入断开之类的消息也可以交给manager统一处理,更新数据。

-module(manager).
-author("raide").

%% API
-export([start/0]).

-import(tcp_server, [start/1]).

loop() ->
    io:format("manager loop~n"),
    receive
        {tcplab_group_msg, tcp_client, FromPid, ID, Peer, Data} ->
            % tcp client发了数据。这里要给所属tcp服务器的每个client发消息。
            % 也要通知对应的ws client

            % 获取数据
            [{_, {WsClientPid, Clients}} | _] = ets:lookup(tcp_server_data, ID),

            % 发给ws client
            WsClientPid ! {tcplab_group_msg, FromPid, Peer, Data},

            % 发给所有tcp client
            ClientList = dict:to_list(Clients),
            [TcpClientPid ! {tcplab_group_msg, FromPid, Data} || {_, TcpClientPid} <- ClientList],

            loop();
        {tcplab_group_msg, ws_client, FromWsClientPid, ID, Data} ->
            % tcp client发了数据。这里要给所属tcp服务器的每个client发消息。
            % 也要通知对应的ws client

            [{_, {_WsClientPid, Clients}} | _] = ets:lookup(tcp_server_data, ID),

            % 发给所有tcp client
            ClientList = dict:to_list(Clients),
            io:format("manager loop ClientList-> ~p~n", [ClientList]),
            [TcpClientPid ! {tcplab_group_msg, FromWsClientPid, Data} || {_, TcpClientPid} <- ClientList],

            loop();
        {tcplab_tcp_client_closed, ID, _ClientPid, Peer} ->
            % client tcp_closed
            % 删除client
            [{_, {WsClientPid, Clients}} | _] = ets:lookup(tcp_server_data, ID),
            %io:format("lookup ~p~n", [Clients]),
            NewClients = dict:erase(Peer, Clients),
            %io:format("new GG ~p~n", [NewClients]),
            ets:insert(tcp_server_data, {ID, {WsClientPid, NewClients}}),

            WsClientPid ! {tcplab_tcp_client_closed, Peer},

            loop();
        {tcplab_kick_tcp_client, ID, Peer} ->
            % 主动关闭一个tcp client
            io:format("tcp_server_data -> ~p~n", [ets:tab2list(tcp_server_data)]),
            [{_, {_, Clients}} | _] = ets:lookup(tcp_server_data, ID),

            case dict:find(Peer, Clients) of
                {ok, ClientPid} ->
                    ClientPid ! tcplab_close_socket;
                error ->
                    io:format("tcplab_kick_tcp_client Peer not found ~p~n", [Peer])
            end,

            loop();
        Any ->
            io:format("manager loop Any-> ~p~n", [Any]),
            loop()
    end.

manager_loop() ->
    io:format("manager_loop start ~p~n", [self()]),
    register(manager, self()),
    loop().

start() ->
    io:format("manager start ~p~n", [self()]),
    {ok, spawn(fun() -> manager_loop() end)}.