Erlang并发基于Actor并发模型,容错 Actor之间的通信是异步的,发送方只管发送,不关心超时以及错误,这些都由框架层和独立的错误处理机制接管。Erlang/OTP 是Actor模型的标杆,OTP行为模式的理解非常像面向对象语言的接口,OTP行为模式分成通用部分和具体实现部分,行为模式包括gen_server, gen_statem, gen_event, gen_fsm, supervisor,下面分析它们实现的原理,
可以看到gen_event, gen_fsm, gen_statem, gen_event,依赖于gen这个通用模块,这种关于类似于
sequenceDiagram
调用者->>行为模块: 请求
行为模块->>调用者: 响应
行为模块->>gen: 请求
gen-->>调用者: 响应
下面分析gen源代码
- gen暴露的方法,
-export([start/5, start/6, debug_options/2, hibernate_after/1,
name/1, unregister_name/1, get_proc_name/1, get_parent/0,
call/3, call/4, reply/2, stop/1, stop/3]).
-export([init_it/6, init_it/7]).
-export([format_status_header/2]).
首先看start方法实现代码块
-spec start(module(), linkage(), emgr_name(), module(), term(), options()) ->
start_ret().
start(GenMod, LinkP, Name, Mod, Args, Options) ->
case where(Name) of
undefined ->
do_spawn(GenMod, LinkP, Name, Mod, Args, Options);
Pid ->
{error, {already_started, Pid}}
end.
-spec start(module(), linkage(), module(), term(), options()) -> start_ret().
start(GenMod, LinkP, Mod, Args, Options) ->
do_spawn(GenMod, LinkP, Mod, Args, Options).
%%-----------------------------------------------------------------
%% Spawn the process (and link) maybe at another node.
%% If spawn without link, set parent to ourselves 'self'!!!
%%-----------------------------------------------------------------
do_spawn(GenMod, link, Mod, Args, Options) ->
Time = timeout(Options),
proc_lib:start_link(?MODULE, init_it,
[GenMod, self(), self(), Mod, Args, Options],
Time,
spawn_opts(Options));
do_spawn(GenMod, _, Mod, Args, Options) ->
Time = timeout(Options),
proc_lib:start(?MODULE, init_it,
[GenMod, self(), self, Mod, Args, Options],
Time,
spawn_opts(Options)).
do_spawn(GenMod, link, Name, Mod, Args, Options) ->
Time = timeout(Options),
proc_lib:start_link(?MODULE, init_it,
[GenMod, self(), self(), Name, Mod, Args, Options],
Time,
spawn_opts(Options));
do_spawn(GenMod, _, Name, Mod, Args, Options) ->
Time = timeout(Options),
proc_lib:start(?MODULE, init_it,
[GenMod, self(), self, Name, Mod, Args, Options],
Time,
spawn_opts(Options)).
-type linkage() :: 'link' | 'nolink'.
-type emgr_name() :: {'local', atom()}
| {'global', term()}
| {'via', Module :: module(), Name :: term()}.
-type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}.
-type debug_flag() :: 'trace' | 'log' | 'statistics' | 'debug'
| {'logfile', string()}.
-type option() :: {'timeout', timeout()}
| {'debug', [debug_flag()]}
| {'hibernate_after', timeout()}
| {'spawn_opt', [proc_lib:spawn_option()]}.
-type options() :: [option()].
第一个start传入6个参数,GenMod表示属于哪个行为,LinkP表示否启动Link监控进程,link方式可以建立进程之间的双向链接关系,当其中一个进程退出时,另一个进程会收到该进程退出的消息,Name表示进程名称,Mod表示模块名称,Args表示多余参数,Options表示是否debug hibernate spawn_otp某一选项,
第一个start方法进行了where方法判断 判断是否已经存在,如果存在返回改模块已经启动。没有启动则进入do_spwan方法中
where({global, Name}) -> global:whereis_name(Name);
where({via, Module, Name}) -> Module:whereis_name(Name);
where({local, Name}) -> whereis(Name).
do_spwan根据不同的参数,调用proc_lib:start_link或者proc_lib:start,proc_lib用来同步启动符合OTP设计原则的进程,start_link带了进程监控,proc_lib:start_link和proc_lib:spawn_link的不同之处在于,前者是同步创建子进程,后者是异步创建子进程,proc_lib:start_link调用后会阻塞,直到子进程初始化完毕,调用proc_lib:init_ack后才返回。而proc_lib:spawn_link一调用就会立即返回子进程ID,启动完do_spwan将回调启动init_it方法
init_it(GenMod, Starter, Parent, Mod, Args, Options) ->
init_it2(GenMod, Starter, Parent, self(), Mod, Args, Options).
init_it(GenMod, Starter, Parent, Name, Mod, Args, Options) ->
case register_name(Name) of
true ->
init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options);
{false, Pid} ->
proc_lib:init_ack(Starter, {error, {already_started, Pid}})
end.
init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options) ->
GenMod:init_it(Starter, Parent, Name, Mod, Args, Options).
init_it方法,进行Name注册进程,如果注册失败 则返回,成功则启动行为模块中的init_it方法。
- debug_options方法
debug_options(Name, Opts) ->
case lists:keyfind(debug, 1, Opts) of
{_,Options} ->
try sys:debug_options(Options)
catch _:_ ->
error_logger:format(
"~tp: ignoring erroneous debug options - ~tp~n",
[Name,Options]),
[]
end;
false ->
[]
end.
- call方法
call(Process, Label, Request) ->
call(Process, Label, Request, ?default_timeout).
%% Optimize a common case.
call(Process, Label, Request, Timeout) when is_pid(Process),
Timeout =:= infinity orelse is_integer(Timeout) andalso Timeout >= 0 ->
do_call(Process, Label, Request, Timeout);
call(Process, Label, Request, Timeout)
when Timeout =:= infinity; is_integer(Timeout), Timeout >= 0 ->
Fun = fun(Pid) -> do_call(Pid, Label, Request, Timeout) end,
do_for_proc(Process, Fun).
do_call(Process, Label, Request, Timeout) when is_atom(Process) =:= false ->
Mref = erlang:monitor(process, Process),
erlang:send(Process, {Label, {self(), Mref}, Request}, [noconnect]),
receive
{Mref, Reply} ->
erlang:demonitor(Mref, [flush]),
{ok, Reply};
{'DOWN', Mref, _, _, noconnection} ->
Node = get_node(Process),
exit({nodedown, Node});
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
after Timeout ->
erlang:demonitor(Mref, [flush]),
exit(timeout)
end.
do_for_proc(Pid, Fun) when is_pid(Pid) ->
Fun(Pid);
%% Local by name
do_for_proc(Name, Fun) when is_atom(Name) ->
case whereis(Name) of
Pid when is_pid(Pid) ->
Fun(Pid);
undefined ->
exit(noproc)
end;
%% Global by name
do_for_proc(Process, Fun)
when ((tuple_size(Process) == 2 andalso element(1, Process) == global)
orelse
(tuple_size(Process) == 3 andalso element(1, Process) == via)) ->
case where(Process) of
Pid when is_pid(Pid) ->
Node = node(Pid),
try Fun(Pid)
catch
exit:{nodedown, Node} ->
%% A nodedown not yet detected by global,
%% pretend that it was.
exit(noproc)
end;
undefined ->
exit(noproc)
end;
%% Local by name in disguise
do_for_proc({Name, Node}, Fun) when Node =:= node() ->
do_for_proc(Name, Fun);
%% Remote by name
do_for_proc({_Name, Node} = Process, Fun) when is_atom(Node) ->
if
node() =:= nonode@nohost ->
exit({nodedown, Node});
true ->
Fun(Process)
end.
将监控Process进程,并向该进程发送消息,收到消息,则取消监控。do_for_proc调用Fun函数传入Pid,
- reply发送消息给客户端
reply({To, Tag}, Reply) ->
Msg = {Tag, Reply},
try To ! Msg catch _:_ -> Msg end.
gen_server(通用客户端-服务端)源码分析
暴露的方法:
%% API
-export([start/3, start/4,
start_link/3, start_link/4,
stop/1, stop/3,
call/2, call/3,
cast/2, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/6]).
需要回调的方法为: init, handle_call, handle_cast, handle_info, handle_continue, terminate,
code_change,format_status,其中init, hand_call, hand_cast, terminate必须回调,
- 启动方法:
start(Mod, Args, Options) ->
gen:start(?MODULE, nolink, Mod, Args, Options).
start(Name, Mod, Args, Options) ->
gen:start(?MODULE, nolink, Name, Mod, Args, Options).
start_link(Mod, Args, Options) ->
gen:start(?MODULE, link, Mod, Args, Options).
start_link(Name, Mod, Args, Options) ->
gen:start(?MODULE, link, Name, Mod, Args, Options).
其中start_link实际上启动了一个带监控的进程, 进入gen:start, 在进入回调gen_server中的init_it方法,init其中调用Mod中的init方法进行操作。
init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options);
init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Name = gen:name(Name0),
Debug = gen:debug_options(Name, Options),
HibernateAfterTimeout = gen:hibernate_after(Options),
case init_it(Mod, Args) of
{ok, {ok, State}} ->
proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug);
{ok, {ok, State, Timeout}} ->
proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, State, Mod, Timeout, HibernateAfterTimeout, Debug);
{ok, {stop, Reason}} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
%% the parent process is notified about the failure.
%% (Otherwise, the parent process could get
%% an 'already_started' error if it immediately
%% tried starting the process again.)
gen:unregister_name(Name0),
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
{ok, ignore} ->
gen:unregister_name(Name0),
proc_lib:init_ack(Starter, ignore),
exit(normal);
{ok, Else} ->
Error = {bad_return_value, Else},
proc_lib:init_ack(Starter, {error, Error}),
exit(Error);
{'EXIT', Class, Reason, Stacktrace} ->
gen:unregister_name(Name0),
proc_lib:init_ack(Starter, {error, terminate_reason(Class, Reason, Stacktrace)}),
erlang:raise(Class, Reason, Stacktrace)
end.
init_it(Mod, Args) ->
try
{ok, Mod:init(Args)}
catch
throw:R -> {ok, R};
Class:R:S -> {'EXIT', Class, R, S}
end.
init_it完成之后判断结果的值,成功之后,进入loop方法,这个方法是,预先处理hand_call, hand_cast这些回调方法,当cast call请求后 可以 回调响应的hand_call hand_cast方法。
loop(Parent, Name, State, Mod, {continue, Continue} = Msg, HibernateAfterTimeout, Debug) ->
Reply = try_dispatch(Mod, handle_continue, Continue, State),
case Debug of
[] ->
handle_common_reply(Reply, Parent, Name, undefined, Msg, Mod,
HibernateAfterTimeout, State);
_ ->
Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, Msg),
handle_common_reply(Reply, Parent, Name, undefined, Msg, Mod,
HibernateAfterTimeout, State, Debug1)
end;
loop(Parent, Name, State, Mod, hibernate, HibernateAfterTimeout, Debug) ->
proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, HibernateAfterTimeout, Debug]);
loop(Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug) ->
receive
Msg ->
decode_msg(Msg, Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug, false)
after HibernateAfterTimeout ->
loop(Parent, Name, State, Mod, hibernate, HibernateAfterTimeout, Debug)
end;
loop(Parent, Name, State, Mod, Time, HibernateAfterTimeout, Debug) ->
Msg = receive
Input ->
Input
after Time ->
timeout
end,
decode_msg(Msg, Parent, Name, State, Mod, Time, HibernateAfterTimeout, Debug, false).
wake_hib(Parent, Name, State, Mod, HibernateAfterTimeout, Debug) ->
Msg = receive
Input ->
Input
end,
decode_msg(Msg, Parent, Name, State, Mod, hibernate, HibernateAfterTimeout, Debug, true).
decode_msg(Msg, Parent, Name, State, Mod, Time, HibernateAfterTimeout, Debug, Hib) ->
case Msg of
{system, From, Req} ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
[Name, State, Mod, Time, HibernateAfterTimeout], Hib);
{'EXIT', Parent, Reason} ->
terminate(Reason, ?STACKTRACE(), Name, undefined, Msg, Mod, State, Debug);
_Msg when Debug =:= [] ->
handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout);
_Msg ->
Debug1 = sys:handle_debug(Debug, fun print_event/3,
Name, {in, Msg}),
handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout, Debug1)
end.
将进入handle_msg 方法,hangle_msg将处理回调方法,
例如:
try_handle_call(Mod, Msg, From, State) ->
try
{ok, Mod:handle_call(Msg, From, State)}
catch
throw:R ->
{ok, R};
Class:R:Stacktrace ->
{'EXIT', Class, R, Stacktrace}
end.
call是同步调用,cast异步调用。
三个handle开头的回调函数对应着三种不同的使用服务器的方式。如下:
gen_server:call ------------- handle_call/3
gen_server:cast ------------- handle_cast/2
用!向服务进程发消息 ------------- handle_info/2
call是有返回值的调用;cast是无返回值的调用,即通知;而直接向服务器进程发的 消息则由handle_info处理。
- call
call对应的回调函数handle_call/3在正常情况下的返回值是{reply, Reply, NewState},使用call要小心的是,两个服务器进程不能互相call,不然会死锁。gen_server将调用Module:handle_call/3来处理该请求。
其中ServerRef 可以是:
• 进程ID,
• 进程名:如果gen_server被注册为本地进程的话;
• {Name,Node}:如果gen_server在其它结点注册的话;
• {global,GlobalName}:如果gen_server注册为全局进程的话
call源码如下
call(Name, Request) ->
case catch gen:call(Name, '$gen_call', Request) of
{ok,Res} ->
Res;
{'EXIT',Reason} ->
exit({Reason, {?MODULE, call, [Name, Request]}})
end.
call(Name, Request, Timeout) ->
case catch gen:call(Name, '$gen_call', Request, Timeout) of
{ok,Res} ->
Res;
{'EXIT',Reason} ->
exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
end.
- cast
cast是没有返回值的调用,实际是往这个Pid发送消息。它是一个“异步”的调用,调用后会直接收到 ok,无需等待回调函数执行完毕,直接发送消息,无需返回消息,gen_server将调用Module:handle_cast/2来处理请求。
cast({global,Name}, Request) ->
catch global:send(Name, cast_msg(Request)),
ok;
cast({via, Mod, Name}, Request) ->
catch Mod:send(Name, cast_msg(Request)),
ok;
cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
do_cast(Dest, Request);
cast(Dest, Request) when is_atom(Dest) ->
do_cast(Dest, Request);
cast(Dest, Request) when is_pid(Dest) ->
do_cast(Dest, Request).
do_cast(Dest, Request) ->
do_send(Dest, cast_msg(Request)),
ok.
do_send(Dest, Msg) ->
try erlang:send(Dest, Msg)
catch
error:_ -> ok
end,
ok.
- enter_loop方法
enter_loop(Mod, Options, State) ->
enter_loop(Mod, Options, State, self(), infinity).
enter_loop(Mod, Options, State, ServerName = {Scope, _})
when Scope == local; Scope == global ->
enter_loop(Mod, Options, State, ServerName, infinity);
enter_loop(Mod, Options, State, ServerName = {via, _, _}) ->
enter_loop(Mod, Options, State, ServerName, infinity);
enter_loop(Mod, Options, State, Timeout) ->
enter_loop(Mod, Options, State, self(), Timeout).
enter_loop(Mod, Options, State, ServerName, Timeout) ->
Name = gen:get_proc_name(ServerName),
Parent = gen:get_parent(),
Debug = gen:debug_options(Name, Options),
HibernateAfterTimeout = gen:hibernate_after(Options),
loop(Parent, Name, State, Mod, Timeout, HibernateAfterTimeout, Debug).
调用enter_loop将进入loop,使一个已存在的进程成为gen_server.函数不会返回,调用该函数的进程将进入gen_server的接收循环,并成为一个gen_server进程。调用进程必须是用proc_lib中的启动函数之一启动的。使用者需要负责对进程做所有初始化工作,包括给进程注册名字。当初始化过程非常复杂,而gen_server行为提供的初始化接口不能完成时,这个函数非就常有用。
Module,Options 和ServerName跟调用gen_server:start[link]/3,4时的意义一样。但是,如果指定了ServerName的话,调用进程必须在调用该函数前已经被注册为ServerName。
State 和Timeout跟Module:init/1返回值中的意义一样。并且回调模块不必导出init/1函数。
失败情况:调用进程不是通过proc_lib中的启动函数启动的,或者没用注册为ServerName
- abcast 向指定的节点中所有注册为Name的gen_server发送异步请求。不管节点和gen_server Name是否存在,函数都立即返回.gen_server将调用Module:handle_cast/2来处理请求。
abcast(Name, Request) when is_atom(Name) ->
do_abcast([node() | nodes()], Name, cast_msg(Request)).
abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
do_abcast(Nodes, Name, cast_msg(Request)).
do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) ->
do_send({Name,Node},Msg),
do_abcast(Nodes, Name, Msg);
do_abcast([], _,_) -> abcast.
- multi_call
对所指定的多个节点中注册为Name的gen_server进程发送请求作同步调用,然后等待返回。gen_server将调用Module:handle_call/3来处理请求,函数返回一个元组{Replies,BadNodes},其中Replies是以{Node,Reply}为元素的列表,BadNodes是以下情况的节点列表:节点不存在,或以Name为名的gen_server在该节点不存在或无返回。Nodes是发送请求的目标节点名的列表,默认值是所有已知节点的列表[node()|nodes()].Name是每gen_servr在其本地注册的名字。
Request 可以为任意项,其作为参数之一传递给Module:handle_call/3。
Timeout为大于0的整数,指定了等待返回值的最长毫秒数,或者设为infinity让函数无限等待。默认值为infinity.如果在指定的时间内没有收到节点的返回
do_multi_call(Nodes, Name, Req, infinity) ->
Tag = make_ref(),
Monitors = send_nodes(Nodes, Name, Tag, Req),
rec_nodes(Tag, Monitors, Name, undefined);
do_multi_call(Nodes, Name, Req, Timeout) ->
Tag = make_ref(),
Caller = self(),
Receiver =
spawn(
fun() ->
%% Middleman process. Should be unsensitive to regular
%% exit signals. The sychronization is needed in case
%% the receiver would exit before the caller started
%% the monitor.
process_flag(trap_exit, true),
Mref = erlang:monitor(process, Caller),
receive
{Caller,Tag} ->
Monitors = send_nodes(Nodes, Name, Tag, Req),
TimerId = erlang:start_timer(Timeout, self(), ok),
Result = rec_nodes(Tag, Monitors, Name, TimerId),
exit({self(),Tag,Result});
{'DOWN',Mref,_,_,_} ->
%% Caller died before sending us the go-ahead.
%% Give up silently.
exit(normal)
end
end),
Mref = erlang:monitor(process, Receiver),
Receiver ! {self(),Tag},
receive
{'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
Result;
{'DOWN',Mref,_,_,Reason} ->
%% The middleman code failed. Or someone did
%% exit(_, kill) on the middleman process => Reason==killed
exit(Reason)
end.
multi_call调用了do_multi_call,do_multi_call将启动启动会一个进程,去监控process,向Receiver发送消息,为了避免延时返回污染调用者的消息队列,有一个中间进程来做实际的调用。当延时返回发送结已终止进程时将被丢弃,
网友评论