美文网首页
[Erlang开发之路]二十三点二、用OTP去改造构建Job_C

[Erlang开发之路]二十三点二、用OTP去改造构建Job_C

作者: 循环不计次 | 来源:发表于2019-07-25 11:00 被阅读0次

文件结构:


|--job_center_alarm.erl [gen_event通用事件处理]
|--job_center_app.erl [application回调模块]
|--job_center_supervisor.erl [supervisor监控树的应用]
|--job_center.app [application资源文件]
|--job_center.erl [gen_server构建的服务器]

1.job_center_alarm.erl [gen_event通用事件处理]

-module(job_center_alarm).
-export([init/1,handle_event/2,handle_call/2,handle_info/2,code_change/3,terminate/2]).
-behaviour(gen_event).

%% @doc gen_event callback: set up the handler.
%% The argument is ignored (hence the underscore, which also silences the
%% unused-variable warning); the handler state starts as the integer 0.
init(_Args) ->
    {ok, 0}.

%% @doc gen_event callback: print one line for a `{set_alarm, What}' or
%% `{clear_alarm, What}' event and leave the handler state untouched.
%% No other event shape is matched by this function.
handle_event({Kind, What}, State) when Kind =:= set_alarm; Kind =:= clear_alarm ->
    io:format(alarm_format(Kind), [What]),
    {ok, State}.

%% Format string printed for each supported alarm event.
alarm_format(set_alarm) -> "Alarm comming:~p!~n";
alarm_format(clear_alarm) -> "Alarm_clean:~p!~n".

%% @doc gen_event callback: every synchronous request gets the reply
%% `ignore'; the handler state is returned unchanged.
handle_call(_Request, HandlerState) ->
    {ok, ignore, HandlerState}.

%% @doc gen_event callback: plain messages are dropped and the handler
%% state is returned unchanged.
handle_info(_Message, HandlerState) ->
    {ok, HandlerState}.

%% @doc gen_event callback: the handler state is carried over unchanged
%% across a code upgrade.
code_change(_FromVsn, HandlerState, _UpgradeExtra) ->
    {ok, HandlerState}.

%% @doc gen_event callback: nothing to clean up, always returns `ok'.
terminate(_Why, _HandlerState) ->
    ok.

2.job_center_app.erl [application回调模块]

-module(job_center_app).
-behaviour(application).
-export([start/2,stop/1]).
%% @doc application callback: start the top-level supervisor, handing it
%% the start arguments unchanged. The start type is ignored. The result of
%% job_center_supervisor:start_link/1 is returned as is.
start(_StartType, SupervisorArgs) ->
    job_center_supervisor:start_link(SupervisorArgs).
%% @doc application callback: nothing to clean up, always returns `ok'.
stop(_AppState) ->
    ok.

3.job_center_supervisor.erl [supervisor监控树的应用]

-module(job_center_supervisor).
-export([start/0,test/0,init/1,start_link/1]).
-behaviour(supervisor).
-define(MAXRESTARTS,3).
-define(TIME,10).
%% @doc Start the supervisor from a short-lived helper process and return
%% the pid of that helper (not of the supervisor).
%%
%% The helper exits as soon as its fun returns. A supervisor traps exits and
%% shuts down when the process it was started from exits, so the helper
%% unlinks from the supervisor first (the same thing test/0 does); without
%% that the supervisor would be taken down right after it came up.
%% A failed start (for example the name is already registered) is ignored.
start() ->
    spawn(fun() ->
        case supervisor:start_link({local, ?MODULE}, ?MODULE, []) of
            {ok, Sup} -> unlink(Sup);
            _NotStarted -> ok
        end
    end).
%% @doc Start the supervisor linked to the caller and registered locally
%% under the module name. `InitArgs' is passed on to init/1, which only has
%% a clause for the empty list.
start_link(InitArgs) ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, InitArgs).
%% @doc Start the supervisor and then unlink it from the caller.
%% Crashes with a badmatch unless the start returns `{ok, Pid}'.
%% Returns the result of unlink/1.
test() ->
    {ok, SupPid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
    erlang:unlink(SupPid).
%% @doc supervisor callback.
%% First swaps the `alarm_handler' event handler of the `alarm_handler'
%% event manager for `job_center_alarm' (the result of the swap is not
%% checked), then returns the supervisor specification: strategy
%% `one_for_one' with at most ?MAXRESTARTS restarts in ?TIME seconds and a
%% single permanent worker, `job_center', started through
%% job_center:start_link/0 with a shutdown timeout of 10000 ms.
init([]) ->
    gen_event:swap_handler(alarm_handler,
                           {alarm_handler, swap},
                           {job_center_alarm, []}),
    SupFlags = {one_for_one, ?MAXRESTARTS, ?TIME},
    JobCenter = {job_center,
                 {job_center, start_link, []},
                 permanent,
                 10000,
                 worker,
                 [job_center]},
    {ok, {SupFlags, [JobCenter]}}.

4.job_center.app [application资源文件]

%% Application resource file for the job center.
%% The application name must equal the base name of this file
%% (job_center.app), as the application resource file format requires.
{application,job_center,
[{description,"A Job Deliver Center 1.0"},
 {vsn,"1.0"},
 {modules,[job_center_app,job_center,job_center_supervisor,job_center_alarm]},
 %% Both the server and its supervisor register a local name.
 {registered,[job_center,job_center_supervisor]},
 {applications,[kernel,stdlib]},
 %% Callback module and the argument handed to job_center_app:start/2.
 {mod,{job_center_app,[]}},
 {start_phases,[]}
]}.

5.job_center.erl [gen_server构建的服务器]

-module(job_center).
-behaviour(gen_server).
-export([code_change/3,terminate/2,test/0,start_link/0,job_getStatistic/1,stop/0,init/1,add_job/1,work_wanted/0,job_done/2,handle_call/3,handle_cast/2,handle_info/2,job_statistic/0]).
-import(gen_server,[start_link/4,call/2]).
%% add_job/1 returns the reply of the {add_job,Fun} clause of handle_call/3,
%% which is the atom `ok', not an integer.
-spec add_job(fun())->ok.

-define(debug_flag,true).
-ifdef(debug_flag).
-define(DEBUG(X),io:format("DEBUG ~p:~p ~p~n",[?MODULE,?LINE,X])).
-else.
-define(DEBUG(X),void).
-endif.

%% @doc Smoke test: start the server (the result of the start is not
%% checked) and queue one job whose fun returns `good_job'.
%% Returns whatever add_job/1 returns.
test() ->
    start_link(),
    SampleJob = fun() -> good_job end,
    job_center:add_job(SampleJob).


%% @doc Print a start notice, then start the gen_server linked to the caller
%% and registered locally under the module name. init/1 receives `[]'.
start_link() ->
    io:format("start_link_now!~n"),
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% @doc Ask the registered server to stop. The `stop' clause of
%% handle_call/3 replies `stopped'.
stop() ->
    gen_server:call(?MODULE, stop).

%% @doc gen_server callback.
%% Prints a start notice and creates an ETS table (default options) that
%% becomes the server state. The table is seeded with `{job_index, 0}', the
%% index the next added job will receive. Exits are not trapped.
init([]) ->
    io:format("job_center Start!~n"),
    JobTable = ets:new(?MODULE, []),
    true = ets:insert(JobTable, {job_index, 0}),
    {ok, JobTable}.
%                      user api
%% @doc Queue `Fun' as a new job on the registered server.
%% Returns the server's reply to the `{add_job, Fun}' request.
add_job(Fun) ->
    Request = {add_job, Fun},
    gen_server:call(?MODULE, Request).

%% @doc Ask the registered server for a job to work on.
%% Returns the server's reply to the `{work_wanted}' request.
work_wanted() ->
    Request = {work_wanted},
    gen_server:call(?MODULE, Request).

%% @doc Print the requested change, then ask the registered server to set
%% the status of job `JobNumber' to `Status'.
%% Returns the server's reply to the `{job_done, JobNumber, Status}' request.
job_done(JobNumber, Status) ->
    io:format("Request change Job:~p Status:~p~n", [JobNumber, Status]),
    Request = {job_done, JobNumber, Status},
    gen_server:call(?MODULE, Request).

%% @doc Ask the registered server for its job statistics.
%% Returns the server's reply to the `{job_statistic}' request.
job_statistic() ->
    Request = {job_statistic},
    gen_server:call(?MODULE, Request).

%% @doc Ask the registered server for the status of the job stored under
%% `Index'. Returns the server's reply to the `{job_getStatistic, Index}'
%% request.
job_getStatistic(Index) ->
    Request = {job_getStatistic, Index},
    gen_server:call(?MODULE, Request).

%                    callback Function 
%% @doc gen_server callback serving every synchronous request.
%%
%% `State' is the ETS table created in init/1. It holds one
%% `{job_index, Next}' counter row and one `{Index, Fun, Status}' row per job.
%%
%% Requests and replies:
%%   {job_getStatistic, Index} -> Status | job_not_found
%%   {job_statistic}           -> show_all_finish (the counts are printed)
%%   {add_job, Fun}            -> ok
%%   {work_wanted}             -> {get_job_succ, Index, Fun, 5000}
%%                              | {no_job_now, -1, void}
%%   {job_done, Index, Status} -> good_job | aleady_done
%%                              | did_not_been_got | job_not_found
%%   stop                      -> stopped (server stops with reason normal)
%%
%% Asking about a key that is not a job row (an unknown index, or the
%% counter key `job_index') is answered with `job_not_found' instead of
%% crashing the server, which would also destroy the table.
handle_call({job_getStatistic, Index}, _From, State) ->
    Reply =
        case lookup_status(State, Index) of
            {ok, Status} -> Status;
            error -> job_not_found
        end,
    {reply, Reply, State};
handle_call({job_statistic}, _From, State) ->
    %% Jobs nobody has picked up yet.
    io:format("未完成的数量:~p~n", [count_status(State, not_done)]),
    %% Jobs currently handed out to a worker.
    io:format("正在进行的数量:~p~n", [count_status(State, doing)]),
    %% Jobs reported as finished.
    io:format("已完成的数量:~p~n", [count_status(State, done)]),
    {reply, show_all_finish, State};
handle_call({add_job, Fun}, _From, State) ->
    [{job_index, Index}] = ets:lookup(State, job_index),
    %% Advance the counter and store the job in one insert.
    ets:insert(State, [{job_index, Index + 1}, {Index, Fun, not_done}]),
    {reply, ok, State};
handle_call({work_wanted}, {Pid, _Tag}, State) ->
    case ets:match(State, {'$1', '$2', not_done}, 1) of
        '$end_of_table' ->
            %% No job is waiting.
            {reply, {no_job_now, -1, void}, State};
        {[[Index, Fun]], _Continuation} ->
            %% Hand the job to the caller and mark it as in progress.
            ets:update_element(State, Index, {3, doing}),
            watch_worker_crash(Pid, Index),
            watch_job_timeout(State, Pid, Index),
            {reply, {get_job_succ, Index, Fun, 5000}, State}
    end;
handle_call({job_done, JobNumber, Status}, _From, State) ->
    Reply =
        case lookup_status(State, JobNumber) of
            error ->
                job_not_found;
            {ok, done} ->
                aleady_done;
            {ok, doing} ->
                io:format("comfirm to change the status:~p of Index~p~n",
                          [Status, JobNumber]),
                ets:update_element(State, JobNumber, {3, Status}),
                good_job;
            {ok, not_done} ->
                did_not_been_got
        end,
    {reply, Reply, State};
handle_call(stop, _From, State) ->
    io:format("good bye!~n"),
    {stop, normal, stopped, State}.

%% Status of the job stored under Index: `{ok, Status}', or `error' when the
%% key is absent or is not a three-element job row.
lookup_status(Tab, Index) ->
    case ets:lookup(Tab, Index) of
        [{_Index, _Fun, Status}] -> {ok, Status};
        _ -> error
    end.

%% Number of job rows whose status equals Status.
count_status(Tab, Status) ->
    length(ets:match(Tab, {'$1', '$2', Status})).

%% Crash watcher: a linked helper monitors the worker and, when the worker
%% goes down for any reason other than `normal', asks the server to put the
%% job back to `not_done' so it does not stay stuck in `doing'.
watch_worker_crash(Worker, Index) ->
    spawn_link(fun() ->
        Ref = monitor(process, Worker),
        receive
            {'DOWN', Ref, process, Worker, Why} ->
                io:format("worker:~p leave for ~p ~n", [Worker, Why]),
                case Why of
                    normal -> void;
                    _ -> job_center:job_done(Index, not_done)
                end
        end
    end).

%% Timeout watcher: after 4 s a job still in `doing' earns the worker a
%% `{hurry_up}' message; if it is still `doing' 2 s later the worker is
%% killed with reason `you_are_fired'. A worker whose job is no longer
%% `doing' is left alone (it used to be killed unconditionally at 6 s).
watch_job_timeout(Tab, Worker, Index) ->
    spawn(fun() ->
        timer:sleep(4000),
        case lookup_status(Tab, Index) of
            {ok, doing} ->
                Worker ! {hurry_up},
                timer:sleep(2000),
                case lookup_status(Tab, Index) of
                    {ok, doing} -> exit(Worker, you_are_fired);
                    _ -> ok
                end;
            _ ->
                ok
        end
    end).

%% @doc gen_server callback: casts are ignored and the state is kept.
handle_cast(_Request, JobTable) ->
    {noreply, JobTable}.

%% @doc gen_server callback: plain messages are dropped and the state is
%% kept.
handle_info(_Message, JobTable) ->
    {noreply, JobTable}.

%% @doc gen_server callback: nothing to clean up, always returns `ok'.
terminate(_Why, _JobTable) ->
    ok.

%% @doc gen_server callback: the state is carried over unchanged across a
%% code upgrade.
code_change(_FromVsn, JobTable, _UpgradeExtra) ->
    {ok, JobTable}.


相关文章

网友评论

      本文标题:[Erlang开发之路]二十三点二、用OTP去改造构建Job_C

      本文链接:https://www.haomeiwen.com/subject/yumcrctx.html