美文网首页Erlang 源码分析
Erlang 源码之OTP通用服务器行为模式分析(1)

Erlang 源码之OTP通用服务器行为模式分析(1)

作者: 涷野 | 来源:发表于2018-12-30 23:38 被阅读0次

    Erlang 的并发基于 Actor 模型。Actor 之间的通信是异步的:发送方只管发送,不关心超时和错误,这些都由框架层和独立的错误处理(容错)机制接管。Erlang/OTP 是 Actor 模型的标杆。OTP 行为模式(behaviour)可以类比为面向对象语言中的接口:它分为通用部分和具体实现(回调模块)两部分。行为模式包括 gen_server、gen_statem、gen_event、gen_fsm 和 supervisor,下面分析它们的实现原理。


    可以看到 gen_server、gen_event、gen_fsm、gen_statem 都依赖于 gen 这个通用模块,它们之间的调用关系大致如下:

    sequenceDiagram
    调用者->>行为模块: 请求
    行为模块->>调用者: 响应
    行为模块->>gen: 请求
    gen-->>调用者: 响应
    
    下面分析gen源代码
    • gen暴露的方法,
    -export([start/5, start/6, debug_options/2, hibernate_after/1,
         name/1, unregister_name/1, get_proc_name/1, get_parent/0,
         call/3, call/4, reply/2, stop/1, stop/3]).
    
    -export([init_it/6, init_it/7]).
    
    -export([format_status_header/2]).
    

    首先看start方法实现代码块

    %% Start a behaviour process that will be registered under Name.
    %% If some process is already registered under Name (looked up with
    %% where/1), nothing is spawned and {error, {already_started, Pid}}
    %% is returned instead.
    -spec start(module(), linkage(), emgr_name(), module(), term(), options()) ->
        start_ret().
    start(GenMod, LinkP, Name, Mod, Args, Options) ->
        case where(Name) of
            undefined ->
                do_spawn(GenMod, LinkP, Name, Mod, Args, Options);
            Existing ->
                {error, {already_started, Existing}}
        end.

    %% Start an anonymous (unregistered) behaviour process; there is no
    %% name to check, so it goes straight to do_spawn/5.
    -spec start(module(), linkage(), module(), term(), options()) -> start_ret().
    start(GenMod, LinkP, Mod, Args, Options) ->
        do_spawn(GenMod, LinkP, Mod, Args, Options).
    
    %%-----------------------------------------------------------------
    %% Spawn the behaviour process synchronously through proc_lib and
    %% wait (up to timeout(Options)) for it to acknowledge its start.
    %% The first init_it argument (Starter) is always the caller.
    %% LinkP =:= link : proc_lib:start_link/5, Parent is the caller.
    %% any other LinkP: proc_lib:start/5, Parent is the atom 'self', so
    %%                  the new process becomes its own parent.
    %%-----------------------------------------------------------------
    do_spawn(GenMod, LinkP, Mod, Args, Options) ->
        Timeout = timeout(Options),
        case LinkP of
            link ->
                proc_lib:start_link(?MODULE, init_it,
                                    [GenMod, self(), self(), Mod, Args, Options],
                                    Timeout,
                                    spawn_opts(Options));
            _ ->
                proc_lib:start(?MODULE, init_it,
                               [GenMod, self(), self, Mod, Args, Options],
                               Timeout,
                               spawn_opts(Options))
        end.

    %% Same as do_spawn/5, but the new process registers itself as Name.
    do_spawn(GenMod, LinkP, Name, Mod, Args, Options) ->
        Timeout = timeout(Options),
        case LinkP of
            link ->
                proc_lib:start_link(?MODULE, init_it,
                                    [GenMod, self(), self(), Name, Mod, Args, Options],
                                    Timeout,
                                    spawn_opts(Options));
            _ ->
                proc_lib:start(?MODULE, init_it,
                               [GenMod, self(), self, Name, Mod, Args, Options],
                               Timeout,
                               spawn_opts(Options))
        end.
    
    %% Whether the started process is linked to the caller (see do_spawn:
    %% only the atom 'link' links; anything else starts unlinked).
    -type linkage()    :: 'link' | 'nolink'.
    %% Registration name forms accepted by start/6 and looked up by where/1.
    -type emgr_name()  :: {'local', atom()}
                        | {'global', term()}
                        | {'via', Module :: module(), Name :: term()}.
    
    %% Return value of start/5 and start/6.
    -type start_ret()  :: {'ok', pid()} | 'ignore' | {'error', term()}.
    
    %% Flags allowed inside the {debug, Flags} start option
    %% (handed to sys:debug_options/1 by debug_options/2).
    -type debug_flag() :: 'trace' | 'log' | 'statistics' | 'debug'
                        | {'logfile', string()}.
    %% A single start option; options() is the list passed as Options.
    -type option()     :: {'timeout', timeout()}
                | {'debug', [debug_flag()]}
                | {'hibernate_after', timeout()}
                | {'spawn_opt', [proc_lib:spawn_option()]}.
    -type options()    :: [option()].
    

    第一个 start 接收 6 个参数:GenMod 表示所属的行为模块(如 gen_server);LinkP 表示是否与调用者建立链接(link)——链接是进程之间的双向关系,其中一个进程退出时,另一个进程会收到退出信号;Name 表示注册的进程名;Mod 表示回调模块;Args 是传给回调模块 init/1 的参数;Options 是启动选项,可以包含 timeout、debug、hibernate_after、spawn_opt 等(见上面的 option() 类型)。
    第一个 start 先用 where 判断该名字是否已被注册:如果已存在,返回 {error, {already_started, Pid}};否则进入 do_spawn 函数。

    %% Resolve a registration name to the registered pid, or undefined when
    %% nothing is registered under it.
    where({local, Name})       -> whereis(Name);
    where({global, Name})      -> global:whereis_name(Name);
    where({via, Module, Name}) -> Module:whereis_name(Name).
    

    do_spawn 根据 LinkP 参数调用 proc_lib:start_link 或 proc_lib:start。proc_lib 用来同步启动符合 OTP 设计原则的进程,start_link 还会在调用者与新进程之间建立链接(link)。proc_lib:start_link 和 proc_lib:spawn_link 的不同之处在于:前者同步创建子进程,后者异步创建子进程。proc_lib:start_link 调用后会阻塞,直到子进程初始化完毕并调用 proc_lib:init_ack 后才返回;而 proc_lib:spawn_link 一调用就会立即返回子进程的 Pid。新进程启动后将执行 gen:init_it 函数。

    %% First code run by the freshly spawned process (see do_spawn/5).
    %% An anonymous process uses its own pid as its name.
    init_it(GenMod, Starter, Parent, Mod, Args, Options) ->
        init_it2(GenMod, Starter, Parent, self(), Mod, Args, Options).

    %% Named variant (see do_spawn/6): register Name first. If the name is
    %% already taken, tell the starter {error, {already_started, Pid}} and
    %% return without running the behaviour.
    init_it(GenMod, Starter, Parent, Name, Mod, Args, Options) ->
        case register_name(Name) of
            {false, Owner} ->
                proc_lib:init_ack(Starter, {error, {already_started, Owner}});
            true ->
                init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options)
        end.

    %% Hand control to the behaviour module's own init_it/6.
    init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options) ->
        GenMod:init_it(Starter, Parent, Name, Mod, Args, Options).
    

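    init_it 负责注册进程名:如果注册失败(名字已被占用),就通过 proc_lib:init_ack 向启动者返回 {error, {already_started, Pid}};注册成功则调用行为模块(GenMod)中的 init_it 函数。匿名启动(init_it/6)时不注册,直接以 self() 作为名字。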

    • debug_options方法
    %% Turn the {debug, Flags} entry of Opts into a sys debug structure.
    %% Returns [] when there is no debug entry, and also when
    %% sys:debug_options/1 rejects Flags (a warning tagged with Name is
    %% logged in that case).
    debug_options(Name, Opts) ->
        case lists:keyfind(debug, 1, Opts) of
            false ->
                [];
            {_, Flags} ->
                try
                    sys:debug_options(Flags)
                catch
                    _:_ ->
                        error_logger:format(
                          "~tp: ignoring erroneous debug options - ~tp~n",
                          [Name, Flags]),
                        []
                end
        end.
    
    • call方法
    %% call/3: synchronous request using the module's default timeout.
    call(Process, Label, Request) -> 
        call(Process, Label, Request, ?default_timeout).
    
    %% call/4: Timeout must be infinity or a non-negative integer.
    %% A pid needs no name resolution, so it goes straight to do_call/4
    %% (the common case); every other form of Process is resolved to a
    %% pid or {Name, Node} by do_for_proc/2 first.
    call(Process, Label, Request, Timeout)
      when is_pid(Process), Timeout =:= infinity;
           is_pid(Process), is_integer(Timeout), Timeout >= 0 ->
        do_call(Process, Label, Request, Timeout);
    call(Process, Label, Request, Timeout)
      when Timeout =:= infinity; is_integer(Timeout), Timeout >= 0 ->
        do_for_proc(Process,
                    fun(Target) -> do_call(Target, Label, Request, Timeout) end).
    
    %%-----------------------------------------------------------------
    %% do_call(Process, Label, Request, Timeout)
    %% Send {Label, {self(), Mref}, Request} to Process (a pid or a remote
    %% {Name, Node}; never a bare atom) and wait for {Mref, Reply}.
    %% The monitor reference Mref doubles as the unique reply tag.
    %% Returns {ok, Reply}. Exits with {nodedown, Node} when the monitor
    %% reports noconnection, with the server's exit reason if it goes
    %% down, and with timeout when nothing arrives within Timeout.
    %% NOTE(review): after a timeout, a late {Mref, Reply} is not removed
    %% and stays in the caller's mailbox if the caller catches the exit.
    %%-----------------------------------------------------------------
    do_call(Process, Label, Request, Timeout) when is_atom(Process) =:= false ->
        Mref = erlang:monitor(process, Process),
        %% [noconnect]: see erlang:send/3; this relies on the monitor to
        %% report a lost connection ('DOWN' with noconnection, below).
        erlang:send(Process, {Label, {self(), Mref}, Request}, [noconnect]),
        receive
            {Mref, Reply} ->
                erlang:demonitor(Mref, [flush]),
                {ok, Reply};
            {'DOWN', Mref, _, _, noconnection} ->
                Node = get_node(Process),
                exit({nodedown, Node});
            {'DOWN', Mref, _, _, Reason} ->
                exit(Reason)
        after Timeout ->
                %% [flush] also discards a 'DOWN' already queued for Mref.
                erlang:demonitor(Mref, [flush]),
                exit(timeout)
        end.

    %%-----------------------------------------------------------------
    %% do_for_proc(Process, Fun)
    %% Resolve Process and apply Fun to the result:
    %%   pid                      -> Fun(Pid);
    %%   atom                     -> local registered name, exit(noproc)
    %%                               if nothing is registered;
    %%   {global, _} | {via, _, _} -> looked up with where/1; a
    %%                               {nodedown, Node} exit for that pid's
    %%                               node is turned into exit(noproc);
    %%   {Name, Node}             -> local lookup when Node is this node,
    %%                               otherwise Fun({Name, Node}); exits
    %%                               {nodedown, Node} if this node is not
    %%                               distributed (nonode@nohost).
    %%-----------------------------------------------------------------
    do_for_proc(Pid, Fun) when is_pid(Pid) ->
        Fun(Pid);
    %% Local by name
    do_for_proc(Name, Fun) when is_atom(Name) ->
        case whereis(Name) of
            Pid when is_pid(Pid) ->
                Fun(Pid);
            undefined ->
                exit(noproc)
        end;
    %% Global or via by name
    do_for_proc(Process, Fun)
      when ((tuple_size(Process) == 2 andalso element(1, Process) == global)
            orelse
              (tuple_size(Process) == 3 andalso element(1, Process) == via)) ->
        case where(Process) of
            Pid when is_pid(Pid) ->
                Node = node(Pid),
                try Fun(Pid)
                catch
                    exit:{nodedown, Node} ->
                        %% A nodedown not yet detected by global,
                        %% pretend that it was.
                        exit(noproc)
                end;
            undefined ->
                exit(noproc)
        end;
    %% Local by name in disguise
    do_for_proc({Name, Node}, Fun) when Node =:= node() ->
        do_for_proc(Name, Fun);
    %% Remote by name
    do_for_proc({_Name, Node} = Process, Fun) when is_atom(Node) ->
        if
            node() =:= nonode@nohost ->
                exit({nodedown, Node});
            true ->
                Fun(Process)
        end.
    

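    do_call 先监控 Process 进程,再向它发送请求消息;收到回复后取消监控并返回 {ok, Reply};如果收到 'DOWN' 消息或等待超时,则以相应原因退出(noconnection 会被转换为 {nodedown, Node})。do_for_proc 先把进程名解析成 Pid(或远程的 {Name, Node}),再以它为参数调用 Fun。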

    • reply发送消息给客户端
    %% Deliver Reply to a caller identified by From = {To, Tag}.
    %% Always returns the message {Tag, Reply}; a failed send is ignored.
    reply({To, Tag}, Reply) ->
        Msg = {Tag, Reply},
        try erlang:send(To, Msg) of
            Sent -> Sent
        catch
            _:_ -> Msg
        end.
    
    gen_server(通用客户端-服务端)源码分析

    暴露的方法:

    %% API
    -export([start/3, start/4,
         start_link/3, start_link/4,
         stop/1, stop/3,
         call/2, call/3,
         cast/2, reply/2,
         abcast/2, abcast/3,
         multi_call/2, multi_call/3, multi_call/4,
         enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/6]).
    

    需要实现的回调函数为:init、handle_call、handle_cast、handle_info、handle_continue、terminate、
    code_change、format_status。其中 init、handle_call、handle_cast 必须实现,其余(包括 terminate)都是可选回调。

    • 启动方法:
    %% Start a standalone (unlinked) gen_server through gen:start.
    %% The 3-arity form leaves the process unregistered; the 4-arity form
    %% registers it under Name.
    start(Mod, Args, Options) ->
        gen:start(?MODULE, nolink, Mod, Args, Options).
    start(Name, Mod, Args, Options) ->
        gen:start(?MODULE, nolink, Name, Mod, Args, Options).

    %% Same as start/3,4, but the new process is linked to the caller.
    start_link(Mod, Args, Options) ->
        gen:start(?MODULE, link, Mod, Args, Options).
    start_link(Name, Mod, Args, Options) ->
        gen:start(?MODULE, link, Name, Mod, Args, Options).
    

    其中 start_link 启动一个与调用者链接(link)的进程:先进入 gen:start,新进程再回调 gen_server 中的 init_it 函数,init_it 中调用回调模块 Mod 的 init/1 进行初始化。

    %% Body of a newly spawned gen_server, entered via gen:init_it2/7.
    %% For unlinked starts gen passes the atom 'self' as Parent; it is
    %% replaced by our own pid here.
    %% Runs Mod:init(Args) (through init_it/2) and then either enters the
    %% receive loop or reports the failure to Starter and exits.
    init_it(Starter, self, Name, Mod, Args, Options) ->
        init_it(Starter, self(), Name, Mod, Args, Options);
    init_it(Starter, Parent, Name0, Mod, Args, Options) ->
        Name = gen:name(Name0),
        Debug = gen:debug_options(Name, Options),
        HibernateAfterTimeout = gen:hibernate_after(Options),

        case init_it(Mod, Args) of
            {ok, {ok, State}} ->
                proc_lib:init_ack(Starter, {ok, self()}),
                loop(Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug);
            {ok, {ok, State, Timeout}} ->
                proc_lib:init_ack(Starter, {ok, self()}),
                loop(Parent, Name, State, Mod, Timeout, HibernateAfterTimeout, Debug);
            {ok, {stop, Reason}} ->
                %% For consistency, we must make sure that the
                %% registered name (if any) is unregistered before
                %% the parent process is notified about the failure.
                %% (Otherwise, the parent process could get
                %% an 'already_started' error if it immediately
                %% tried starting the process again.)
                gen:unregister_name(Name0),
                proc_lib:init_ack(Starter, {error, Reason}),
                exit(Reason);
            {ok, ignore} ->
                gen:unregister_name(Name0),
                proc_lib:init_ack(Starter, ignore),
                exit(normal);
            {ok, Else} ->
                %% Same rule as for {stop, Reason}: release the name before
                %% the parent learns about the failure, so an immediate
                %% restart does not hit 'already_started'.
                Error = {bad_return_value, Else},
                gen:unregister_name(Name0),
                proc_lib:init_ack(Starter, {error, Error}),
                exit(Error);
            {'EXIT', Class, Reason, Stacktrace} ->
                gen:unregister_name(Name0),
                proc_lib:init_ack(Starter, {error, terminate_reason(Class, Reason, Stacktrace)}),
                erlang:raise(Class, Reason, Stacktrace)
        end.
    %% Run Mod:init(Args) and normalise the outcome:
    %%   {ok, Result}                   - init returned (or threw) Result;
    %%   {'EXIT', Class, Reason, Stack} - init raised an error or exit.
    init_it(Mod, Args) ->
        try Mod:init(Args) of
            Result -> {ok, Result}
        catch
            throw:Thrown -> {ok, Thrown};
            Class:Reason:Stack -> {'EXIT', Class, Reason, Stack}
        end.
    

    init_it 根据 Mod:init/1 的返回值决定后续动作:成功时先通过 proc_lib:init_ack 通知启动者,再进入 loop 函数。loop 是 gen_server 的消息接收循环:收到 call、cast 等请求后,再分发给回调模块中对应的 handle_call、handle_cast 等函数处理。

    %% Main receive loop of a gen_server. The fifth argument is the
    %% current timeout action, dispatched on by clause:
    %%   {continue, Continue} - dispatch Continue to Mod's handle_continue
    %%                          callback (via try_dispatch/4) before receiving;
    %%   hibernate            - hibernate via proc_lib; wake_hib/6 resumes;
    %%   infinity             - wait for a message, but hibernate after
    %%                          HibernateAfterTimeout ms without one;
    %%   Time (anything else) - wait Time ms, then handle the atom 'timeout'.
    %% Received messages are passed on to decode_msg/9.
    loop(Parent, Name, State, Mod, {continue, Continue} = Msg, HibernateAfterTimeout, Debug) ->
        Reply = try_dispatch(Mod, handle_continue, Continue, State),
        %% With sys debugging enabled, record the continue event first.
        case Debug of
        [] ->
            handle_common_reply(Reply, Parent, Name, undefined, Msg, Mod,
                    HibernateAfterTimeout, State);
        _ ->
            Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, Msg),
            handle_common_reply(Reply, Parent, Name, undefined, Msg, Mod,
                    HibernateAfterTimeout, State, Debug1)
        end;
    
    %% Hibernate; the process restarts in wake_hib/6 on the next message.
    loop(Parent, Name, State, Mod, hibernate, HibernateAfterTimeout, Debug) ->
        proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, HibernateAfterTimeout, Debug]);
    
    %% No user timeout: idle for HibernateAfterTimeout, then hibernate.
    loop(Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug) ->
        receive
            Msg ->
                decode_msg(Msg, Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug, false)
        after HibernateAfterTimeout ->
            loop(Parent, Name, State, Mod, hibernate, HibernateAfterTimeout, Debug)
        end;
    
    %% User timeout: if nothing arrives within Time, the atom 'timeout'
    %% is decoded as if it were a received message.
    loop(Parent, Name, State, Mod, Time, HibernateAfterTimeout, Debug) ->
        Msg = receive
              Input ->
                Input
          after Time ->
              timeout
          end,
        decode_msg(Msg, Parent, Name, State, Mod, Time, HibernateAfterTimeout, Debug, false).
    
    %% Re-entry point after proc_lib:hibernate/3 (see loop/7): block for the
    %% next message and decode it with Time = hibernate and Hib = true.
    wake_hib(Parent, Name, State, Mod, HibernateAfterTimeout, Debug) ->
        receive
            Incoming ->
                decode_msg(Incoming, Parent, Name, State, Mod, hibernate,
                           HibernateAfterTimeout, Debug, true)
        end.
    
    %% Classify one received message:
    %%   {system, From, Req} -> handed to sys:handle_system_msg/7;
    %%   {'EXIT', Parent, _} -> our parent exited: terminate with its reason;
    %%   anything else       -> handle_msg, traced through sys:handle_debug/4
    %%                          first when Debug is non-empty.
    decode_msg({system, From, Req}, Parent, Name, State, Mod, Time,
               HibernateAfterTimeout, Debug, Hib) ->
        sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
                              [Name, State, Mod, Time, HibernateAfterTimeout], Hib);
    decode_msg({'EXIT', Parent, Reason} = Msg, Parent, Name, State, Mod, _Time,
               _HibernateAfterTimeout, Debug, _Hib) ->
        terminate(Reason, ?STACKTRACE(), Name, undefined, Msg, Mod, State, Debug);
    decode_msg(Msg, Parent, Name, State, Mod, _Time,
               HibernateAfterTimeout, [], _Hib) ->
        handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout);
    decode_msg(Msg, Parent, Name, State, Mod, _Time,
               HibernateAfterTimeout, Debug, _Hib) ->
        Traced = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}),
        handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout, Traced).
    

    普通消息将进入 handle_msg 函数,由 handle_msg 调用相应的回调函数,
    例如:

    %% Invoke Mod:handle_call(Msg, From, State) and normalise the outcome:
    %% {ok, Result} for a return value or a throw, and
    %% {'EXIT', Class, Reason, Stacktrace} for an error or exit.
    try_handle_call(Mod, Msg, From, State) ->
        try Mod:handle_call(Msg, From, State) of
            Result -> {ok, Result}
        catch
            throw:Thrown -> {ok, Thrown};
            Class:Reason:Stacktrace -> {'EXIT', Class, Reason, Stacktrace}
        end.
    

    call是同步调用,cast异步调用。
    三个handle开头的回调函数对应着三种不同的使用服务器的方式。如下:

    gen_server:call ------------- handle_call/3
    gen_server:cast ------------- handle_cast/2
    用!向服务进程发消息 ------------- handle_info/2
    call 是有返回值的调用;cast 是无返回值的调用,即通知;而直接向服务器进程发送的消息则由 handle_info 处理。

    • call
      call对应的回调函数handle_call/3在正常情况下的返回值是{reply, Reply, NewState},使用call要小心的是,两个服务器进程不能互相call,不然会死锁。gen_server将调用Module:handle_call/3来处理该请求。
      其中ServerRef 可以是:
      • 进程ID,
      • 进程名:如果gen_server被注册为本地进程的话;
      • {Name,Node}:如果gen_server在其它结点注册的话;
      • {global,GlobalName}:如果gen_server注册为全局进程的话;
      • {via,Module,ViaName}:如果gen_server是通过其他进程注册表(via 模块)注册的话。
      call源码如下
    %% Make a synchronous request to the gen_server Name and return its
    %% reply (call/2 uses gen's default timeout).
    %% Any exception from gen:call is re-raised with its original class and
    %% stacktrace, its reason wrapped as {Reason, {?MODULE, call, Args}}.
    %% For exits (timeout, noproc, {nodedown, Node}, server crash) this is
    %% exactly the exit reason the old `case catch` version produced, so
    %% callers matching exit:{timeout, _} etc. are unaffected. Errors
    %% (e.g. an invalid Timeout) now stay errors, instead of being turned
    %% into exits with the stacktrace buried in the reason.
    call(Name, Request) ->
        try gen:call(Name, '$gen_call', Request) of
            {ok, Res} ->
                Res
        catch
            Class:Reason:Stacktrace ->
                erlang:raise(Class,
                             {Reason, {?MODULE, call, [Name, Request]}},
                             Stacktrace)
        end.

    %% As call/2, waiting at most Timeout (ms or infinity) for the reply.
    call(Name, Request, Timeout) ->
        try gen:call(Name, '$gen_call', Request, Timeout) of
            {ok, Res} ->
                Res
        catch
            Class:Reason:Stacktrace ->
                erlang:raise(Class,
                             {Reason, {?MODULE, call, [Name, Request, Timeout]}},
                             Stacktrace)
        end.
    
    • cast
      cast 是没有返回值的调用(即通知),实际上只是向目标进程发送一条消息。它是“异步”调用:调用后立即返回 ok,无需等待回调函数执行完毕,也不会收到回复;gen_server 将调用 Module:handle_cast/2 来处理该请求。
    %% Asynchronous request; always returns ok, whether or not the target
    %% exists. global:send/2 and Mod:send/2 may raise (e.g. for an
    %% unregistered name), so any exception from them is discarded;
    %% local names, pids and {Name, Node} go through do_cast/2.
    cast({global, Name}, Request) ->
        try global:send(Name, cast_msg(Request)) catch _:_ -> ok end,
        ok;
    cast({via, Mod, Name}, Request) ->
        try Mod:send(Name, cast_msg(Request)) catch _:_ -> ok end,
        ok;
    cast({Name, Node} = Dest, Request) when is_atom(Name), is_atom(Node) ->
        do_cast(Dest, Request);
    cast(Dest, Request) when is_atom(Dest); is_pid(Dest) ->
        do_cast(Dest, Request).
    
    %% Wrap Request as a cast message and send it best-effort; always ok.
    do_cast(Dest, Request) ->
        CastMsg = cast_msg(Request),
        do_send(Dest, CastMsg),
        ok.
    
    %% Best-effort send: an error from erlang:send/2 (e.g. badarg for an
    %% unregistered local name) is ignored. Always returns ok.
    do_send(Dest, Msg) ->
        try erlang:send(Dest, Msg) of
            _ -> ok
        catch
            error:_ -> ok
        end.
    
    • enter_loop方法
    %% Turn the calling (proc_lib-started) process into a gen_server without
    %% going through init/1. ServerName defaults to self() and Timeout to
    %% infinity.
    enter_loop(Mod, Options, State) ->
        enter_loop(Mod, Options, State, self(), infinity).

    %% The fourth argument is either a server name ({via, _, _} or
    %% {local | global, _}) or, failing that, a timeout.
    enter_loop(Mod, Options, State, {via, _, _} = ServerName) ->
        enter_loop(Mod, Options, State, ServerName, infinity);
    enter_loop(Mod, Options, State, {Scope, _} = ServerName)
      when Scope =:= local; Scope =:= global ->
        enter_loop(Mod, Options, State, ServerName, infinity);
    enter_loop(Mod, Options, State, Timeout) ->
        enter_loop(Mod, Options, State, self(), Timeout).

    %% Collect name, parent, debug and hibernation settings, then run the
    %% receive loop; this call does not return.
    enter_loop(Mod, Options, State, ServerName, Timeout) ->
        Name = gen:get_proc_name(ServerName),
        Parent = gen:get_parent(),
        Debug = gen:debug_options(Name, Options),
        HibAfter = gen:hibernate_after(Options),
        loop(Parent, Name, State, Mod, Timeout, HibAfter, Debug).
    

    调用 enter_loop 将进入 loop,使一个已存在的进程成为 gen_server。该函数不会返回,调用它的进程将进入 gen_server 的接收循环,成为一个 gen_server 进程。调用进程必须是用 proc_lib 中的启动函数之一启动的。使用者需要负责对进程做所有初始化工作,包括给进程注册名字。当初始化过程非常复杂、gen_server 行为提供的初始化接口无法满足时,这个函数就非常有用。
    Module,Options 和ServerName跟调用gen_server:start[link]/3,4时的意义一样。但是,如果指定了ServerName的话,调用进程必须在调用该函数前已经被注册为ServerName。
    State 和Timeout跟Module:init/1返回值中的意义一样。并且回调模块不必导出init/1函数。
    失败情况:调用进程不是通过 proc_lib 中的启动函数启动的,或者没有被注册为 ServerName。

    • abcast 向指定的节点中所有注册为Name的gen_server发送异步请求。不管节点和gen_server Name是否存在,函数都立即返回.gen_server将调用Module:handle_cast/2来处理请求。
    %% Cast Request to the process registered as Name on this node and on
    %% every node in nodes(). Returns the atom abcast.
    abcast(Name, Request) when is_atom(Name) ->
        abcast([node() | nodes()], Name, Request).

    %% Cast Request to Name on each node in Nodes. Send errors are
    %% swallowed by do_send/2, so the call returns abcast immediately.
    abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
        do_abcast(Nodes, Name, cast_msg(Request)).

    do_abcast([], _Name, _Msg) ->
        abcast;
    do_abcast([Node | Rest], Name, Msg) when is_atom(Node) ->
        do_send({Name, Node}, Msg),
        do_abcast(Rest, Name, Msg).
    
    • multi_call
      对所指定的多个节点中注册为 Name 的 gen_server 进程发送请求进行同步调用,然后等待返回。gen_server 将调用 Module:handle_call/3 来处理请求。函数返回一个元组 {Replies,BadNodes},其中 Replies 是以 {Node,Reply} 为元素的列表,BadNodes 是以下情况的节点列表:节点不存在,或以 Name 为名的 gen_server 在该节点不存在或无返回。Nodes 是发送请求的目标节点名的列表,默认值是所有已知节点的列表 [node()|nodes()]。Name 是每个 gen_server 在其本地注册的名字。
      Request 可以为任意项,其作为参数之一传递给Module:handle_call/3。
      Timeout 为大于 0 的整数,指定了等待返回值的最长毫秒数;也可以设为 infinity 让函数无限等待,默认值为 infinity。如果在指定的时间内没有收到某个节点的返回,该节点会被加入 BadNodes。do_multi_call 的源码如下:
    %% Send Req to the process registered as Name on each of Nodes and
    %% collect the replies with send_nodes/4 and rec_nodes/4 (defined
    %% elsewhere in the module). A fresh reference Tag identifies this call.
    %% Timeout =:= infinity: everything runs in the calling process.
    %% Otherwise a middleman process does the work, presumably so that
    %% replies arriving after the timeout reach the (by then dead)
    %% middleman rather than the caller's mailbox (TODO confirm that
    %% send_nodes/4 uses self() as the reply address). The middleman hands
    %% its result back as its exit reason {Receiver, Tag, Result}, which
    %% the caller picks up through its monitor.
    do_multi_call(Nodes, Name, Req, infinity) ->
        Tag = make_ref(),
        Monitors = send_nodes(Nodes, Name, Tag, Req),
        rec_nodes(Tag, Monitors, Name, undefined);
    do_multi_call(Nodes, Name, Req, Timeout) ->
        Tag = make_ref(),
        Caller = self(),
        Receiver =
        spawn(
          fun() ->
              %% Middleman process. Should be insensitive to regular
              %% exit signals. The synchronization is needed in case
              %% the receiver would exit before the caller started
              %% the monitor.
              process_flag(trap_exit, true),
              Mref = erlang:monitor(process, Caller),
              receive
                  %% Go-ahead from the caller: do the calls, bounded by a
                  %% timer started with erlang:start_timer/3.
                  {Caller,Tag} ->
                  Monitors = send_nodes(Nodes, Name, Tag, Req),
                  TimerId = erlang:start_timer(Timeout, self(), ok),
                  Result = rec_nodes(Tag, Monitors, Name, TimerId),
                  exit({self(),Tag,Result});
                  {'DOWN',Mref,_,_,_} ->
                  %% Caller died before sending us the go-ahead.
                  %% Give up silently.
                  exit(normal)
              end
          end),
        %% Monitor first, then send the go-ahead, so the middleman's exit
        %% (which carries the result) is always observed.
        Mref = erlang:monitor(process, Receiver),
        Receiver ! {self(),Tag},
        receive
        {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
            Result;
        {'DOWN',Mref,_,_,Reason} ->
            %% The middleman code failed. Or someone did 
            %% exit(_, kill) on the middleman process => Reason==killed
            exit(Reason)
        end.
    

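    multi_call 调用 do_multi_call。Timeout 为 infinity 时,直接在调用者进程中发送请求并收集回复;否则 do_multi_call 会启动一个中间进程(Receiver)并监控它,再向 Receiver 发送开始信号。之所以由中间进程来做实际的调用,是为了避免超时后才到达的迟到回复污染调用者的消息队列:迟到的回复会发给已经终止的中间进程,从而被丢弃。中间进程通过退出原因 {Receiver, Tag, Result} 把结果传回调用者。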

    相关文章

      网友评论

        本文标题:Erlang 源码之OTP通用服务器行为模式分析(1)

        本文链接:https://www.haomeiwen.com/subject/vjyelqtx.html