美文网首页
30 分钟学 Erlang (二)

30 分钟学 Erlang (二)

作者: Shawn_xiaoyu | 来源:发表于2018-03-21 03:16 被阅读0次

    并发

    创建进程

    使用 erlang:spawn/1,2,3,4 用来创建一个 erlang 进程。Erlang 进程不是操作系统的进程,而是类似其他语言里“协程”的概念,它由 Erlang 虚拟机调度。本文以后说“进程”,就是指 Erlang 进程。

    进程之间是互相独立的,一个进程要想与另外一个进程通信,就必须通过消息传递。消息会被发送到对方进程的信箱存储起来,对方进程可以在合适的时间,按照自定的顺序读取信箱里的消息。

    Erlang 里进程非常轻量,启动速度很快,并且可以同时运行千千万万个,默认的进程个数上限是 262144 ,但可以在启动时使用 erl +P 修改这个配置。

    1> HelloParallel = fun() -> io:format("hello parallel!~n") end.
    #Fun<erl_eval.20.99386804>
    2> spawn(HelloParallel).  %% spawn/1 BIF 接受一个函数做为参数。
    hello parallel!
    <0.63.0>
    3> PID = pid(0,63,0).   %% 使用 pid 来生成一个 PID
    4> is_pid(PID).  %% 检查是否是 PID 类型
    true
    5> is_process_alive(PID). %%  检查 Process 是否还活着。显示 false 是因为它已经运行完成终止了。
    false
    

    spawn 函数返回一个新进程的 pid,我们可以使用这个 pid 与其交互。

    erlang shell 也是有 pid 的。前面说到一个运行时错误会使得当前的shell 进程崩溃,并重新启动一个新的进程,我们验证一下:

    1> self().   %% self/1 返回当前进程的 pid
    <0.60.0>
    2> 1 = 2.
    ** exception error: no match of right hand side value 2
    3> self().
    <0.63.0>
    

    消息发送和接收

    使用消息发送运算符 ! 发送消息。

    4> self() ! "hello".    %% 向自己所在的进程发送一个 List 类型的 "hello". `!` 操作的返回值是消息内容, "hello".
    "hello"
    5> flush().  %% flush() 将当前 process 的信箱里的所有消息清空并打印。
    Shell got "hello"
    ok
    

    receive ... end 语句使用 pattern matching 来从自己进程的信箱里读取消息,可以使用 after 语句来设置等待超时时间:

    1> self() ! "msg1".
    "msg1"
    2> self() ! "msg2".
    "msg2"
    3> self() ! "msg3".
    "msg3"
    4> receive Msg -> Msg after 3000 -> no_more end. %% 读取任意消息并返回这条消息,如果信箱里没有消息了,等待 3 秒后结束并返回 no_more.
    "msg1"
    5> receive Msg -> Msg after 3000 -> no_more end.  %% 后面这两条为什么返回 no_more ? 不应该是 "msg2", "msg3" 吗?
    no_more
    6> receive Msg -> Msg after 3000 -> no_more end.
    no_more
    

    上面的第 4 行 receive 语句里,erlang shell 进程查看邮箱,查到第一个消息是 "msg1", Msg 被绑定为 "msg1"。再次运行 receive 语句的时候,由于 Msg 的值已经为 "msg1",与信箱里的 "msg2", "msg3" 都不匹配,所以后面两条 receive 语句都没有从信箱里读取新消息,"msg2" 和 "msg3" 仍然存储在信箱里:

    16> flush().
    Shell got "msg2"
    Shell got "msg3"
    ok
    

    注意虽然后面两个 receive 语句都没有从信箱里读取消息,但在 receive 语句的执行过程中,它仍然是从头到尾遍历了整个邮箱,并尝试拿邮箱里的各个消息跟代码里的 Msg 进行匹配,这是消耗资源的,等后面消息堆积越多越麻烦。这个叫 Selective Message Reception. 消息的读取顺序是接收方决定的。

    所以一般情况下我们在读取信箱消息时,读到我们不感兴趣的消息也取出来,打个 error log 然后扔掉它,不要让它一直在信箱里耗费资源。

    在 Erlang shell 已经伸展不开拳脚了。让我们来写个复杂点的程序:
    我们的程序实现一个 消息缓存,具体需求是:

    • 我们需要一个消息栈,用于存储用户发来的消息。
    • 考虑到用户发来的消息可能有很多,我们需要好几个这样的消息栈来分担负载。
    • 我们还想能够给消息栈命名,以便区分。
    -module(msg_cache).
    
    %% APIs
    -export([start_one/1]).
    
    %% for spawns
    -export([loop/1]).
    
    %% 定义进程的 state。
    %% 我们一般说,一个服务、或 “对象” 会维护自己内部的 '状态'
    %% 状态可能是一个字符串缓存,可能是某个资源的引用,这个跟业务相关。
    %% 状态存在于内存中,跟外界隔离,通过 API 接口与外界交互。
    %% 面向对象语言里用 类和对象来存储状态,Erlang 里我们用 process。
    %% 所以我们又说 Erlang 是 “面向Process 编程的”
    -record(state, {
                name,      %% 消息栈的名字
                length = 0,  %% 消息栈长度
                buff = []   %% 消息栈的存储列表
             }).
    
    loop(State = #state{name = Name, length = Len, buff = Buff}) ->
      receive
        {get_name, From}->
          From ! {ok, Name},
          loop(State);
        {get_length, From}->
          From ! {ok, Len},
          loop(State);
        {set_name, NewName, From} ->
          From ! ok,
          loop(State#state{name = NewName});
        {push, Msg, From} ->
          From ! ok,
          loop(State#state{buff = [Msg | Buff], length = Len + 1});
        {pop, [], From} ->
          From ! {error, empty},
          loop(State);
        {pop, [TopMsg | Msgs], From} ->
          From ! {ok, TopMsg},
          loop(State#state{buff = Msgs, length = Len - 1});
        _Unsupported ->
          erlang:error(io_libs:format("unsupported msg: ", [_Unsupported]) )
      end.
    
    start_one(BuffName) ->
      %% 启动一个消息栈,并返回其 PID
      Pid = spawn(msg_cache, loop, [#state{name=BuffName}]),
      io:format("Buff ~s created! Pid = ~p~n", [BuffName, Pid]),
      Pid
    

    其实除了 loop/1 长一点,其他的都挺容易理解的。
    注意 loop/1 里的每个分支的最后一个语句都是尾递归,意味着只要不出错,loop/1 就一直循环下去,所以进程就不会停止。

    思考:如果把上面代码里 receive 语句的最后一个 _Unsupported -> 分支删掉的话,会发生什么?

    receive 语句里,接受消息时,都要求消息发送方将自己的 Pid 带过来,放到 From 变量里,以便我们回复消息给对方。

    我们来试试:

    1> PID = msg_cache:start_one("cache2").
    Buff cache2 created! Pid = <0.62.0>
    <0.62.0>
    2> PID ! {get_length, self()}.
    {get_length,<0.60.0>}
    3> flush().
    Shell got {ok,0}
    ok
    
    4> PID ! {pop, self()}.
    {pop,<0.60.0>}
    5> flush().
    Shell got {error,empty}
    ok
    
    6> PID ! {push, "msg1", self()}.
    {push,"msg1",<0.60.0>}
    7> PID ! {push, "msg2", self()}.
    {push,"msg2",<0.60.0>}
    8> PID ! {push, "msg3", self()}.
    {push,"msg3",<0.60.0>}
    9> PID ! {get_length, self()}.
    {get_length,<0.60.0>}
    10> flush().
    Shell got ok
    Shell got ok
    Shell got ok
    Shell got {ok,3}
    ok
    
    11> PID ! {pop, self()}.
    {pop,<0.60.0>}
    12> flush().
    Shell got {ok,"msg3"}
    ok
    
    13> PID ! {get_length, self()}.
    {get_length,<0.60.0>}
    14> flush().
    Shell got {ok,2}
    ok
    

    继续往下阅读之前,仔细看一下这个例子,确保你完全理解了这段代码。

    挺厉害的吧?但我们还有两个问题没有解决:

    • 没有一个易用易维护的 API。 PID ! {get_length, self()}. 这种调用方式实在有些反人类。
    • 没有管理进程。我们调用一次 msg_cache:start_one/1 就启动了一个msg_cache, 但是现在我不知道当前已经启动了几个 msg_cache.

    我们来解决这第一个问题,重新整理一下代码:

    -module(msg_cache).
    
    %% APIs
    -export([start_one/1,
             get_name/1,
             get_length/1,
             pop/1,
             set_name/2,
             push/2
            ]).
    
    %% for spawns
    -export([loop/1]).
    
    -define(API_TIMEOUT, 3000).
    
    -record(state, {
                name,
                length = 0,
                buff = []
             }).
    
    start_one(BuffName) ->
      Pid = spawn(msg_cache, loop, [#state{name=BuffName}]),
      io:format("Buff ~s created! Pid = ~p~n", [BuffName, Pid]),
      Pid.
    
    %% 加了这几个 API
    get_name(CacheID) ->
      call(CacheID, {get_name, self()}).
    get_length(CacheID) ->
      call(CacheID, {get_length, self()}).
    set_name(CacheID, NewName) ->
      call(CacheID, {set_name, NewName, self()}).
    pop(CacheID) ->
      call(CacheID, {pop, self()}).
    push(CacheID, Msg) ->
      call(CacheID, {push, Msg, self()}).
    
    %% 由于发送和接受消息的处理方面,各个 API 都差不多,就提取出来专门写个 call 函数,提高代码复用。
    call(Pid, Request) ->
      Pid ! Request,
      receive
        Response -> Response
      after ?API_TIMEOUT ->
        {error, api_timeout}
      end.
    
    %% loop 这一部分我们没改动任何代码
    loop(State = #state{name = Name, length = Len, buff = Buff}) ->
      receive
        {get_name, From}->
          From ! {ok, Name},
          loop(State);
        {get_length, From}->
          From ! {ok, Len},
          loop(State);
        {set_name, NewName, From} ->
          From ! ok,
          loop(State#state{name = NewName});
        {push, Msg, From} ->
          From ! ok,
          loop(State#state{buff = [Msg | Buff], length = Len + 1});
        {pop, From} ->
          case Buff of
            [] ->
              From ! {error, empty},
              loop(State);
            [TopMsg | Msgs] ->
              From ! {ok, TopMsg},
              loop(State#state{buff = Msgs, length = Len - 1})
          end;
        _Unsupported ->
          erlang:error(io_libs:format("unsupported msg: ", [_Unsupported]) )
      end.
    

    再试一下:

    1> PID = msg_cache:start_one("cache_worker_1").
    Buff cache_worker_1 created! Pid = <0.62.0>
    <0.62.0>
    2> msg_cache:get_name(PID).
    {ok,"cache_worker_1"}
    3> msg_cache:get_length(PID).
    {ok,0}
    4> msg_cache:pop(PID).
    {error,empty}
    5> msg_cache:push(PID, "msg1").
    ok
    6> msg_cache:push(PID, "msg2").
    ok
    7> msg_cache:get_length(PID).
    {ok,2}
    8> msg_cache:pop(PID).
    {ok,"msg2"}
    9> msg_cache:pop(PID).
    {ok,"msg1"}
    10> msg_cache:pop(PID).
    {error,empty}
    11> msg_cache:get_length(PID).
    {ok,0}
    

    还阔以吧?

    留个作业

    上面那个 "管理进程" 我们没有实现。你来实现它。

    我想这么调用:

    %% 启动两个 worker:
    1> msg_cache:start_cache_workers(["c_worker_1", "c_worker_2"]).
    [<0.62.0>, <0.65.0>]
    
    %% 列出所有 workers, 返回值是个 worker 列表, 元素展示了每个 worker 的 name, pid, 和 length 。
    2> CachePidList = msg_cache:list_cache_workers().
    [{"c_worker_1", <0.62.0>, 0}, {"c_worker_2", <0.65.0>, 0}]
    
    %% 负载均衡, 会往随机的一个 cache worker 里 push.
    %% 注意我这里调用 msg_cache:push 的时候,没有提供某个 cache worker 的 PID
    3> ok = msg_cache:push("msg1").
    ok
    4> ok = msg_cache:push("msg2").
    ok
    5> CachePidList = msg_cache:list_cache_workers().
    [{"c_worker_1", <0.62.0>, 1}, {"c_worker_2", <0.65.0>, 1}]
    
    %% 至于 pop 不用管顺序了,有消息就随便 pop 出一个来。
    4> msg_cache:pop().
    {ok, "msg1"}
    

    提示:

    • erlang:register/2 可以给一个 PID 注册一个名字,以后使用这个 PID 就可以使用这个名字代替。比如
    register(msg_cache_manger, Pid).
    
    msg_cache:list_cache_workers() ->
      msg_cache_manger ! get_all_workers.
    
    课后必读文章

    Erlang 中的错误处理机制, Link、Monitor:
    Errors and Processes

    ETS

    ETS (Erlang Term Storage) 是设计来存放大量的 Erlang 数据的。跟 ETS 打交道不用消息格式转换,可直接存放 Erlang 数据格式 (erlang 各种数据格式的统称叫做 erlang terms)。
    ETS 非常快,访问时间是常数级的,自动帮你解决了多进程访问的各种竞态条件问题,让我们在 Erlang 中做并发编程一身轻松。ETS 是非常优秀的缓存系统,是我们开发中不可或缺的利器之一。这比起用某种流行语言来说,舒服太多[1]
    ETS 只将数据存储在内存里,如果想保存到磁盘,或者要在多个 Erlang Node 之间共享数据,OTP 基于 ETS 和 DETS 实现了 mnesia.
    NODE: mnesia 只适合用来做缓存,在多个 Node 之间共享少量数据,非常快速。但是并不适合当做数据库存储大量的数据,因为 mnesia 在启动时会加载所有数据到内存里,导致启动缓慢、新节点加入缓慢。并且 mnesia 是强一致性的数据库,其本身并不处理由于集群脑裂导致的不一致性,这可能不太符合你的预期。

    ETS 支持几种数据类型:

    • set: set 是普通的 key - value 存储类型,一个 ETS table 里,两个数据的 key 不能相同。重复插入 key 相同的两条数据,后面的那条会覆盖前面的那条。
    • ordered_set: 有序的 set 表。
    • bag: bag 允许多个 key 相同的数据的存在,但 key, value 都完全相同的数据只能留一个。
    • duplicate_bag: 允许多个 key, value 完全相同的数据的存在。

    我们来试试 set 类型的 table,这也是最常用的类型。我们创建一个命名表,叫 users, 然后插入两条数据:

    1> ets:new(users, [set, named_table]).
    users
    2> ets:info(users).   %% 注意默认的权限是 protected
    [{read_concurrency,false},
     {write_concurrency,false},
     {compressed,false},
     {memory,304},
     {owner,<0.57.0>},
     {heir,none},
     {name,users},
     {size,0},
     {node,nonode@nohost},
     {named_table,true},
     {type,set},
     {keypos,1},
     {protection,protected}]
    3> ets:insert(users, {1, <<"Shawn">>, 27}).
    true
    4> ets:insert(users, {2, <<"Scarlett">>, 25}).
    true
    5> ets:lookup(users, 1).
    [{1,<<"Shawn">>,27}]
    6> ets:lookup(users, 2).
    [{2,<<"Scarlett">>,25}]
    7> ets:info(users).
    [{read_concurrency,false},
     {write_concurrency,false},
     {compressed,false},
     {memory,332},
     {owner,<0.57.0>},
     {heir,none},
     {name,users},
     {size,2},
     {node,nonode@nohost},
     {named_table,true},
     {type,set},
     {keypos,1},
     {protection,protected}]
    8>
    

    注意上边的示例里:

    • 创建 ETS table 时给了两个 Options 参数:[set, named_table]。set 是指定创建 set 类型的表,named_table 是创建命名表,命名为 users,后面可以用这个表名来引用。
    • 插入数据 {1, <<"Shawn">>, 27}{2, <<"Scarlett">>, 25} 时,两个 tuple 的第一项就是默认的 key,tuple 里其他项都是 values。如果你想用其他的项作为 key,可以在 ets:new 的时候,指定 {keypos, Pos} 参数,设置 key 在 tuple 中的位置。

    ETS 表的其他类型你可以自己试验一下。

    需要注意的是:

    • ETS 表里的任何数据都不参加 GC
    • ETS 表有自己的 owner 进程,默认情况下,创建表的那个进程就是 ETS table 的 owner。如果 owner 进程挂了,ETS 表也就被释放了。我们上边的例子里,erlang shell 进程就是 user table 的 owner。
    • ETS 表也是有访问权限的,默认是 protected:
      • public:任何人可以读写这张表。
      • protected: owner 可以读写,但其他进程只能读。
      • private:只有 owner 可以读写。别的进程无法访问。

    由于 ETS 表非常高效,一般情况下我们都直接使用 public,然后设置 {read_concurrency, true}{write_concurrency,true} 选项来提高并发读或写的效率,在写一个管理模块来直接访问 ets 表,让什么封装什么设计模式都去 shi。

    OTP

    OTP 已经失去了字面意思,基本上指的就是 Erlang 生态环境的官方部分。Erlang 世界的组成是这样的:

    • Erlang 以及 Elixir 等语言。
    • 工具和函数库,包括 erlang runtime,kernel,stdlib(像 lists 这种的官方库), sasl, 还有像 ETS,dbg 之类的很多。
    • 系统设计原则, 包括本章要讲的一众 Behaviors。是一堆应用于并发世界的设计模式,他们包含了解决通用问题的通用代码。
    • 开源社区生态环境,包括各种开源软件和社区。

    OTP 指的是前三个,Elixir 的话还不大算。

    Erlang 的逻辑是,架构的设计应该由有经验的人负责,由专家做好基础代码框架,解决好最困难的问题。而使用者只需要写自己的逻辑代码。这就是 OTP behaviors,他们已经在通信、互联网领域,经历了几十年的战火考验。

    本文要讲的有三个:

    • gen_server
    • application
    • supervisor

    本章只讲解 gen_server。 application 和 supervisor 放到后面 Hello World 工程里讲解。

    gen_server 要解决的问题,就是我们上面那个 msg_cache 面临的问题:怎样做一个服务来响应用户的请求。

    我们之前写的代码很短,可以工作,但是很多东西都没有考虑。比如请求者如果同时收到来自服务端的两个 Response 的话,不知道是对应哪个请求的:

    %% 服务端:
        {get_name, From}->
          From ! {ok, Name},
          loop(State);
        {get_length, From}->
          From ! {ok, Len},
          loop(State);
    
    %% 客户端:
        ServerPID ! {get_length, self()},   %% 客户端连续调用了两次
        ServerPID ! {get_length, self()},  
        receive
          {ok, Len} ->  %% 你知道这次匹配到的消息,是上面哪次调用的回复吗?
             success;
          _ ->
             failed
        end.
    

    上面代码里连续调用了两次 {get_length}, 但是由于发送消息是异步的,消息通过网络回来,你并不能确定第一次收到的回复就是第一次调用产生的。

    这个问题可以加一个随机生成的 RequestID 的字段来解决,客户端发送请求消息的时候带 RequestID 过去,服务端返回的时候再传回来。客户端通过匹配 RequestID,就能知道当前的回复是对应的哪个请求。

    但这种需求其实是通用的,你现在写 msg_cache 用得到,改天写其他代码也一样用得到。另外我们也没有过多考虑异常的情况:如果程序崩溃了怎么办?发送消息怎么知道对方是不是还活着?

    诸如此类的问题应该由专家来解决,所以我们有了 gen_server.
    gen_server 的模板是这样的:

    -module(gen_server_demo).
    -behaviour(gen_server).
    
    %% API functions
    -export([start_link/0]).
    
    %% gen_server callbacks
    -export([init/1,
             handle_call/3,
             handle_cast/2,
             handle_info/2,
             terminate/2,
             code_change/3]).
    
    -record(state, {}).
    %%%% %%%% %%%% %%%% %%%% 
    %%%% 这是给客户端调用的接口部分
    %%%% %%%% %%%% %%%% %%%% 
    %% 启动一个服务,后台会启动一个 erlang process, 并进入 loop 函数, 回想一下我们实现 msg_cache 时写的那个 loop/1.
    %% 但是这个 loop 函数属于通用部分的代码,是由 OTP 官方实现的,所以代码不在这里,在 OTP 代码的 lib/stdlib/src/gen_server.erl 里。
    start_link() ->
        %% gen_server:start_link 启动 process, 然后将 process 注册在当前
        %% node 上,注册名字就是当前 Module 名:gen_server_demo
        gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
    
    %%%% %%%% %%%% %%%% %%%% 
    %%%% 这是 gen_server 发生某事件时的回调函数部分
    %%%% %%%% %%%% %%%% %%%%
    
    %% gen_server:start_link 被调用,服务启动时,回调 init/1
    init([]) ->
        {ok, #state{}}.
    
    %% gen_server:call 被调用。gen_server:call 是“同步”调用,调用方可以设置一个超时时间。
    %% 返回值里的 Reply 是返回给调用者的内容。
    handle_call(_Request, _From, State) ->
        Reply = ok,
        {reply, Reply, State}.
    
    %% gen_server:cast 被调用。gen_server:cast 是“异步”调用。
    %% 调用者一般是想发一个消息给我们的 gen_server,然后继续做自己的事情,他不想收到来自 gen_server 的回复。
    handle_cast(_Msg, State) ->
        {noreply, State}.
    
    %% gen_server 进程收到一个普通 erlang 消息:一个不是通过 gen_server:call 和 gen_server:cast 发来的消息。
    handle_info(_Info, State) ->
        {noreply, State}.
    
    %% 上面的三个函数 handle_call, handle_cast, handle_info
    %%   都可以返回一个 {stop, Reason, State},这样的话 gen_server 会退出。
    %%   但退出之前,可能会回调 terminate(_Reason, _State)。
    
    
    %% gen_server 将要退出时,回调 terminate/2.
    %% 注意
    %% 1) 要想 terminate 在 gen_server 退出前被回调,gen_server 必须捕获退出信号:
    %%    需要在 init 回调里,加这么一行:process_flag(trap_exit, true).
    %% 2) 有几个特定的 Reason 被认为是正常退出:normal, shutdown, or {shutdown,Term},
    %%    其他的 Reason,sasl 是会报错打日志的。
    terminate(_Reason, _State) ->
        ok.
    
    code_change(_OldVsn, State, _Extra) ->
        {ok, State}.
    

    gen_server 真正的进程代码在 OTP 库里,运行 start_link(),gen_server 就在后台跑起来了。你需要实现的只是这个模板里的各个回调函数,将你的业务逻辑放到这些回调里。

    仔细看一下上面的 gen_server 模板和注释,确保你能完全理解。

    我不想重新实现之前的 msg_cache,一点都不酷。我们重新写个其他的,让你对 Erlang 程序的基本设计理念有更深的印象。

    我们要实现一个多用户聊天的程序:

    • 用户能够查询在线的其他用户。
    • 用户之间能够聊天。
    • 要容易扩展,因为后面我们的 Client 会通过TCP、WebSocket 等连接上来,不会是 Erlang 写的 Client。
    • 要容易伸缩,因为我们业务发展很快,用户量会越来越大,我们希望程序能很容易的部署在多台服务器上。

    先来设计我们程序的架构:


    chat_server-2.png
    • 每个 client 连接上来,都会启动一个新的 Process,叫做 ChatServer.
    • ChatServer 负责维护这个 Client 的 TCP 连接。
    • Route 是一个Module,它提供了数据库的管理,数据库里维护了从 User 到其 ChatServer 的 PID 的映射关系。

    注意我们的设计思想:

    • 为每一个连接上来的请求启动一个 Erlang 进程 "ChatServer",不要担心进程个数,百万也没问题。
    • 两个用户之间的消息传递,体现在服务端就是两个 "ChatServer" 之间的 Erlang 消息传递。
    • Route 部分只是一个 Module,不是进程。每一个 ChatServer 调用 Route 里的代码的时候,执行过程其实是在每个 ChatServer 进程内部的。这样我们就避免了集中向一个进程发送消息带来的瓶颈。我们把这种瓶颈的处理留给了 ETS 来解决。
    • 如何伸缩?ChatServer 在不在同一个服务器上没什么关系。ChatServerPID !{send, Msg} 会将消息发送到ChatServerPID,即使 ChatServerPID 在远端的服务器上。分布式部署的时候,这行代码根本不用改,你要做的仅仅是添加一个新的 Erlang Node。分布式 Erlang 后面还要讲。
    • 如何扩展?ETS 使用 Route Module 管理,为的就是当以后换用其他的缓存数据库的时候简单一些。我们设想后面为了做分布式集群,要用 mnesia 替代 ETS,只需要写一个新的 Route Module,内部改用 mnesia 存储,然后替换线上已经加载的老的 Route Module。线上系统都不用停止,客户端的连接一个都不会断!

    你现在能否体会到 Erlang 的实用主义呢?完全没废话,就是解决问题!

    Client 部分我们现在不做,让前端的同学帮我们实现。但假设我们的前端程序员还没到岗,所以我们可以先放着 WebSocket 部分后面再做。但有两个过程必须现在实现:

    • 当 Client 登录时,我们需要使用 Route 注册 user 所在的 ChatServer 的 PID。
    • 当 Client 发消息时,我们需要使用 Route 查找对方的 ChatServer 的 PID。

    首先我们来定义我们的消息协议。我们的消息体内包含几部分,发送者ID,接收者ID,以及消息内容:

    -record(msg, {
      from_userid,
      to_userid,
      payload
    }).
    

    接下来让我们来实现 Route 模块,实现数据库创建,注册,查找与注销功能:

    -module(route).
    -export([ensure_db/0,
             lookup_server/1,
             register_server/2,
             unregister_server/1]).
    
    ensure_db() ->
      case ets:info(servers) of
        undefined ->
          %% 为了演示方便,我们启动一个临时进程来创建 ETS 表,
          %% 如果直接在 erlang shell 里创建ETS的话,出错时 shell 的崩溃连带着我们的ETS也丢了。
          %% 当然线上系统不会这么做。
          spawn(fun() -> ets:new(servers, [named_table, public]), receive after infinity->ok end end);
        _ -> ok
      end.
    
    lookup_server(UserID) ->
      case ets:lookup(servers, UserID) of
        [{UserID, ServerID}] -> {ok, ServerID};
        _ -> {error, no_server}
      end.
    
    register_server(UserID, ServerID) ->
      ets:insert(servers, {UserID, ServerID}).
    
    unregister_server(UserID) ->
      ets:delete(servers, UserID).
    

    接下来实现我们的 ChatServer:

    -module(chat_server).
    -behaviour(gen_server).
    %% state 保存用户的 userid,以及 client 端连上来的 socket.
    -record(state, {
      userid,
      socket
    }).
    
    %% 后面当一个新连接连接到服务器的时候,我们会调用 start_link 启动一个新的 ChatServer 为之服务。
    %% 注意这里使用的是 gen_server:start_link/3,没有注册进程名,我们直接使用 PID. 因为我们要启动很多个 ChatServer。
    start_link(UserID, Socket) ->
      {ok, ServerID}  = gen_server:start_link(?MODULE, [UserID, Socket], []),
      ServerID.
    
    %% 在 init 回调里注册用户的 ChatServer。
    %% 注意我们捕获了 exit signal, 以便程序退出的时候回调 terminate/2. 
    %% 我们在 terminate/2 里取消注册。
    init([UserID, Socket]) ->
        process_flag(trap_exit, true),
        route:register_server(UserID, self()),
        {ok, #state{userid=UserID, socket=Socket}}.
    
    %% 如果我们的 ChatServer 收到一条来自 Socket 的消息,它会收到一条类似 {tcp, Sock, Data} 的普通消息。
    %% 我们需要在 handle_info 里处理,转发给对方的 ChatServer。
    handle_info({tcp, #msg{to_userid = ToUserID, payload = Payload} = Msg}, State) ->
      io:format("Chat Server(User: ~p) - received msg from tcp client, Msg: ~p~n",[State#state.userid, Msg]),
      case route:lookup_server(ToUserID) of
        {error, Reason} ->
          io:format("Chat Server(User: ~p) - cannot forward to Chat Server(User: ~p): ~p~n",
              [State#state.userid, ToUserID, Reason]);
        {ok, TargetServerID} ->
          io:format("Chat Server(User: ~p) - forward msg to Chat Server(User: ~p), Payload: ~p~n",
            [State#state.userid, ToUserID, Payload]),
          ok = gen_server:call(TargetServerID, {send, Msg})
      end,
      {noreply, State};
    
    %% 我们的 ChatServer 收到一条来自对端 ChatServer 的转发请求
    handle_call({send, #msg{payload = Payload}}, _From, State) ->
      io:format("Chat Server(User: ~p) - deliver msg to tcp client, Payload: ~p~n",
        [State#state.userid, Payload]),
      send_to_client_via_tcp(State#state.socket, Payload),
      {reply, ok, State};
    
    %% Socket 部分我们没有实现,暂时就简单打印一下
    send_to_client_via_tcp(_Socket, Payload) ->
      %gen_tcp:send(_Socket, Payload),
      io:format("Sent To Client: ~p~n",[Payload]).
    

    完工了!我们测试一下:

    1> c(chat_server).
    {ok,chat_server}
    2> c(route).
    {ok,route}
    
    %% 现在模拟系统启动时,初始化 DB 的过程。
    %% 后续这个会在启动代码里写。
    3> route:ensure_db().
    <0.73.0>
    
    %% 现在我们模拟一个新的用户登录上来,启动新的 ChatServer 的过程。
    %% 后续这个过程当然是由 WebSocket 模块调用。
    4> ServerIDUser1 = chat_server:start_link(<<"user1">>, fake_socket).
    <0.75.0>
    5> ServerIDUser2 = chat_server:start_link(<<"user2">>, fake_socket).
    <0.77.0>
    
    %% 我们来做一个 #msg{} 消息体。
    %% 后续我们应该在收到 socket 上来的消息解析成功之后,打包一个 #msg{} 消息体。
    6> rr("chat_protocol.hrl").
    [msg]
    7> Msg = #msg{from_userid= <<"user1">>, to_userid = <<"user2">>, payload = <<"hello?">>}.
    #msg{from_userid = <<"user1">>,to_userid = <<"user2">>,
         payload = <<"hello?">>}
    
    
    %% 模拟从 socket 收到消息的过程。
    8> ServerIDUser1 ! {tcp, Msg}.
    Chat Server(User: <<"user1">>) - received msg from tcp client, Msg: {msg,
                                                                         <<"user1">>,
                                                                         <<"user2">>,
                                                                         <<"hello?">>}
    {tcp,#msg{from_userid = <<"user1">>,to_userid = <<"user2">>,
              payload = <<"hello?">>}}
    Chat Server(User: <<"user1">>) - forward msg to Chat Server(User: <<"user2">>), Payload: <<"hello?">>
    Chat Server(User: <<"user2">>) - deliver msg to tcp client, Payload: <<"hello?">>
    Sent To Client: <<"hello?">>
    9>
    

    我们看到服务端的路由已经走通了,接下来只要写一个 web socket 模块,listen 在某个端口,当有连接请求时,像上面第 4、第 5 行一样调用 chat_server:start_link/2 就行了。当然 send_to_client_via_tcp 也要改为真正往 socket 发送消息。

    完整代码:
    https://github.com/terry-xiaoyu/learn-erlang-in-30-mins/tree/master/chat

    一个完整的线上演示:
    (即将上线)

    书接下文30 分钟学 Erlang (三)


    1. Golang 里你需要自己找多线程安全的 maps 库,写并发没有安全感。Golang 官方也没有下文要说到的 OTP 里提供的各种 Behavior,代码写起来天马行空最后一团糟。然后又不能支持函数式的 pattern matching 等写法... 总之用 golang 写代码从来不会给人愉快的感觉。流行是流行的,但那叫“普通”吧?第一次这么吐槽 golang,但这篇是 erlang 的教程,应该不算过分吧。等到写 go 的时候我再来吐槽 erlang 。我是不会写 go 的 ...

    相关文章

      网友评论

          本文标题:30 分钟学 Erlang (二)

          本文链接:https://www.haomeiwen.com/subject/fixwqftx.html