美文网首页
[Erlang开发之路]二十二点三、加入gen_tcp的job_

[Erlang开发之路]二十二点三、加入gen_tcp的job_

作者: 循环不计次 | 来源:发表于2019-07-26 18:02 被阅读0次

    意义

    加入gen_tcp之后,可以跨机器、网络去操作job_center

    过程中遇到的问题:
    clien收到rpc来的消息后,向tcp服务端发送消息,然后在接收到tcp服务端的消息之前老是先接收到进程rpc来的消息{From,Bin},也就是在第二个receive的时候收到了非服务端来的消息,导致没有匹配成功4

    loopRecv(Socket)->
        receive
                {tcp,Socket,Bin}->
                        Term=binary_to_term(Bin),
                        io:format("Recv From Job_center:~p~n",[Term]);
                {tcp_closed,Socket}->
                        io:format("Drop Connect with Job_center~n");
                {From,Bin}->
                        gen_tcp:send(Socket,term_to_binary(Bin)),
                        receive
                                {tcp,Socket,Bin}->
                                        From ! binary_to_term(Bin);
                                {tcp_closed,Socket}->
                                        From ! {tcp_closed,Socket}
                        after 0->
                            do_nothings
                        end;
                _->
                        unknown
        end,
        io:format("finish a recv!~n"),
        loopRecv(Socket).
    

    后来加入after优先处理之后就好了,因为after在第二个receive中没有匹配到的时候,直接到after去,但是我after那里do_nothings,也就是不做事,然后就回到第一个receive去匹配了,但我还是没想明白为啥会收到一条来自进程的消息。

    job_center.app

    {application,job_center_app,
    [{description,"A Job Deliver Center 1.0"},
     {vsn,"1.0"},
     {modules,[job_center_app,job_center_server,job_center,job_center_supervisor,job_center_alarm]},
     {registered,[job_center]},
     {applications,[kernel,stdlib]},
     {mod,{job_center_app,[]}},
     {start_phases,[]}
    ]}.
    
    

    job_center.erl (job_center核心功能

    -module(job_center).
    -behaviour(gen_server).
    -export([loopAccept/1,loopRecv/1,code_change/3,terminate/2,test/0,start_link/0,job_getStatistic/1,stop/0,init/1,add_job/1,work_wanted/0,job_done/2,handle_call/3,handle_cast/2,handle_info/2,job_statistic/0]).
    -import(gen_server,[start_link/4,call/2]).
    -spec add_job(fun())->integer().
    
    -define(debug_flag,true).
    -ifdef(debug_flag).
    -define(DEBUG(X),io:format("DEBUG ~p:~p ~p~n",[?MODULE,?LINE,X])).
    -else.
    -define(DEBUG(X),void).
    -endif.
    
    
    test()->
            start_link(),
            job_center:add_job(fun()->good_job end).
    
    
    start_link()->
            io:format("start_link_now!~n"),
            start_link({local,?MODULE},?MODULE,[],[]).
    
    stop()->
            call(?MODULE,stop).
    
    init([])->
            io:format("job_center Start!~n"),
            %process_flag(trap_exit,true),
            {ok,Listen}=gen_tcp:listen(6612,[binary,{packet,4}]),
            io:format("Listen:~p~n",[Listen]),
            spawn(fun()->loopAccept(Listen) end),
            State=ets:new(?MODULE,[]),
            ets:insert(State,{job_index,0}),
            {ok,State}.
    %                      gen_tcp function
    loopAccept(Listen)->
        {ok,Socket}=gen_tcp:accept(Listen),
        spawn(fun()->loopAccept(Listen) end),
        io:format("new Client In!~n"),
        loopRecv(Socket).
    loopRecv(Socket)->
            receive
                    {tcp,Socket,Binary} when is_binary(Binary)->
                            Packet=binary_to_term(Binary),
                            io:format("Recv a message:~p~n",[Packet]),
                            Ret=case Packet of
                                    {add_job,Fun}->
                                            add_job(Fun);
                                    {work_wanted}->
                                            work_wanted();
                                    {job_done,JobNumber,Status}->
                                            job_done(JobNumber,Status);
                                    {job_statistic}->
                                            job_statistic();
                                    {job_getStatistic,JobNumber}->
                                            job_getStatistic(JobNumber);
                                    _->
                                            command_not_found
                            end,
                            gen_tcp:send(Socket,term_to_binary(Ret)),
                            loopRecv(Socket);
                    {tcp_closed,Socket}->
                            io:format("Client Disconnent~n");
                    UnKnown->
                            gen_tcp:send(Socket,term_to_binary({command_not_found,UnKnown})),
                            loopRecv(Socket)
            end,
            io:format("finish recv a msg!~n").
    %                      user api
    add_job(Fun)->
            call(?MODULE,{add_job,Fun}).
    
    work_wanted()->
            call(?MODULE,{work_wanted}).
    
    job_done(JobNumber,Status)->
            io:format("Request change Job:~p Status:~p~n",[JobNumber,Status]),
            call(?MODULE,{job_done,JobNumber,Status}).
    
    job_statistic()->
            call(?MODULE,{job_statistic}).
    
    job_getStatistic(Index)->
            call(?MODULE,{job_getStatistic,Index}).
    
    %                    callback Function 
    handle_call({job_getStatistic,Index},_From,State)->
            io:format("someone want to get statistic~n"),
            case ets:member(State,Index) of
                true->
                    io:format("getStatistic:~p~n",[Index]),
                    {reply,ets:lookup_element(State,Index,3),State};
                false->
                    {reply,job_not_found,State}
            end;
    handle_call({job_statistic},_From,State)->
            %未完成的任务数
            Match_not_done=ets:match(State,{'$1','$2',not_done}),
            io:format("未完成的数量:~p~n",[length(Match_not_done)]),
            Match_doing=ets:match(State,{'$1','$2',doing}),
            io:format("正在进行的数量:~p~n",[length(Match_doing)]),
            Match_done=ets:match(State,{'$1','$2',done}),
            io:format("已完成的数量:~p~n",[length(Match_done)]),
            {reply,show_all_finish,State};
    handle_call({add_job,Fun},_From,State)->
            [{job_index,Index}]=ets:lookup(State,job_index),
            ets:insert(State,[{job_index,Index+1},{Index,Fun,not_done}]),
            Reply=ok,
            {reply,Reply,State};
    
    handle_call({work_wanted},{Pid,_Ref},State)->
            Match_Ret=ets:match(State,{'$1','$2',not_done},1),
            case Match_Ret of
                '$end_of_table'->%没job了
                        Fun=void,
                        Index=-1,
                        {reply,{no_job_now,Index,Fun},State};
                 {[[Index,Fun]],_}->%判断表中还有未完成的job
                        ets:update_element(State,Index,{3,doing}),%更新状态为doing正在进行
                        %-------------------检测崩溃
                        spawn_link(fun()->
                        Ref=monitor(process,Pid),
                        receive
                                {'DOWN',Ref,process,Pid,Why}->
                                        io:format("worker:~p leave for ~p ~n",[Pid,Why]),
                                        Bool=Why=:=normal,
                                        if 
                                                Bool=:=false-> 
                                                        job_center:job_done(Index,not_done);
                                                Bool=:=true->
                                                        void
                                        end
                        end
                              end),%监控是否崩溃防止崩溃时数据停留
                        %-------------------检测超时
                        spawn(fun()->
                            timer:sleep(4000),
                            case ets:lookup_element(State,Index,3) of
                                        doing->Pid ! {hurry_up};
                                        _->ok
                            end,
                            timer:sleep(2000),
                            exit(Pid,you_are_fired)
                              end),%检测超时,超过6秒就炒鱿鱼
                        {reply,{get_job_succ,Index,Fun,5000},State}
            end;
            
    
    handle_call({job_done,JobNumber,Status},_From,State)->
            case ets:member(State,JobNumber) of%首先判断这个键值是否存在,存在才可以去lookup否则报错
                    true->
                            Status_change_before=ets:lookup_element(State,JobNumber,3),
                            %io:format("Match:~p~n",[Status_change_before]),
                            case Status_change_before of
                                    done->
                                            Reply=aleady_done;
                                    doing->
                                            Reply=good_job,
                                            io:format("comfirm to change the status:~p of Index~p~n",[Status,JobNumber]),
                                            ets:update_element(State,JobNumber,{3,Status});
                                    not_done->
                                            Reply=did_not_been_got
                            end,
                            {reply,Reply,State};
                    false->
                            {reply,job_not_found,State}
            end;
    handle_call(stop,_From,State)->
            io:format("good bye!~n"),
            {stop,normal,stopped,State}.
    
    handle_cast(_Msg,State)->
            {noreply,State}.
    
    handle_info(_Info,State)->
            {noreply,State}.
    
    terminate(_Reason,_State)->ok.
    
    code_change(_OldVsn,State,_Extra)->{ok,State}.
    

    job_center_alarm.erl (警报处理器

    -module(job_center_alarm).
    -export([init/1,handle_event/2,handle_call/2,handle_info/2,code_change/3,terminate/2]).
    -behaviour(gen_event).
    
    init(Args)->{ok,0}.
    
    handle_event({set_alarm,What},State)->
            io:format("Alarm comming:~p!~n",[What]),
            {ok,State};
    
    handle_event({clear_alarm,What},State)->
            io:format("Alarm_clean:~p!~n",[What]),
            {ok,State}.
    
    handle_call(_Req,State)->
            {ok,ignore,State}.
    
    handle_info(_Info,State)->
            {ok,State}.
    
    code_change(_OldVsn,State,_Extra)->{ok,State}.
    
    terminate(_Reason,_State)->ok.
    

    job_center_app.erl (application打包管理

    -module(job_center_app).
    -behaviour(application).
    -export([start/2,stop/1]).
    start(_Type,StartArgs)->
            job_center_supervisor:start_link(StartArgs).
    stop(_State)->
            ok.
    

    job_center_client.erl (Tcp客户端

    -module(job_center_client).
    -export([start_client/0,loopRecv/1,rpc/1,job_done/2,job_statistic/0,job_getStatistic/1,work_wanted/0,add_job/1]).
    
    start_client()->
            case gen_tcp:connect("localhost",6612,[binary,{packet,4}]) of
                    {ok,Socket}->
                            register(job_center_client,spawn(fun()->loopRecv(Socket) end));
                    {error,Desc}->
                            io:format("connect error!~p~n",[Desc])
            end.
    loopRecv(Socket)->
        receive
                {tcp,Socket,Bin}->
                        Term=binary_to_term(Bin),
                        io:format("Recv From Job_center:~p~n",[Term]);
                {tcp_closed,Socket}->
                        io:format("Drop Connect with Job_center~n");
                {process,From,Bin}->
                        gen_tcp:send(Socket,term_to_binary(Bin)),
                        receive
                                {tcp,Socket,Bin}->
                                        From ! binary_to_term(Bin);
                                {tcp_closed,Socket}->
                                        From ! {tcp_closed,Socket}
                        after 0->
                            %gen_tcp:send(Socket,term_to_binary({err}))
                            do_nothing
                        end;
                _->
                        unknown
        end,
        io:format("finish a recv!~n"),
        loopRecv(Socket).
    job_done(Index,State)->
            rpc({job_done,Index,State}).
    job_statistic()->
            rpc({job_statistic}).
    job_getStatistic(Index)->
            rpc({job_getStatistic,Index}).
    add_job(Fun)->
            rpc({add_job,Fun}).
    work_wanted()->
            rpc({work_wanted}).
    rpc(Req)->
            io:format("start to send~n"),
            job_center_client ! {process,self(),Req},
            io:format("send finish!~n"),
            receive 
                    {tcp_closed,_Socket}->
                            io:format("Drop Connect with Job_center~n");
                    {tcp,_Socket,Bin}->
                            io:format("Ret:~p~n",[binary_to_term(Bin)]);
                    Any->
                            io:format("unknow:~p~n",[Any])
            end,
            io:format("receive finish").
    
    

    job_center_server.erl(job_center服务端

    -module(job_center_server).
    -export([start_server/0,loopAccept/1,loopRecv/1]).
    
    start_server()->
            {ok,Listen}=gen_tcp:listen(6612,[binary,{packet,4},{reuseaddr,true},{active,true}]),
            spawn(fun()-> loopAccept(Listen) end).
    
    
    loopAccept(Listen)->
        {ok,Socket}=gen_tcp:accept(Listen),
        spawn(fun()->loopAccept(Listen) end),
        loopRecv(Socket).
    
    loopRecv(Socket)->
            receive
                    {tcp,Socket,Binary}->
                            Packet=binary_to_term(Binary),
                            Ret=rpc:call(job_center,Packet),
                            gen_tcp:send(Socket,term_to_binary(Ret)),
                            loopRecv(Socket);
                    {tcp_close,Socket}->
                            io:format("Server socket closed!~n",[])
            end.    
    

    job_center_supervisor.erl (job_center监控树

    -module(job_center_supervisor).
    -export([start/0,test/0,init/1,start_link/1]).
    -behaviour(supervisor).
    -define(MAXRESTARTS,3).
    -define(TIME,10).
    start()->
            spawn(fun()-> supervisor:start_link({local,?MODULE},?MODULE,[]) end).
    start_link(Args)->
            supervisor:start_link({local,?MODULE},?MODULE,Args).
    test()->
            {ok,Pid}=supervisor:start_link({local,?MODULE},?MODULE,[]),
            unlink(Pid).
    init([])->
            gen_event:swap_handler(alarm_handler,{alarm_handler,swap},{job_center_alarm,[]}),
            {ok,{{one_for_one,?MAXRESTARTS,?TIME},
            [{job_center,
             {job_center,start_link,[]},
             permanent,
             10000,
             worker,
             [job_center]
             }
            ]}}.
    

    相关文章

      网友评论

          本文标题:[Erlang开发之路]二十二点三、加入gen_tcp的job_

          本文链接:https://www.haomeiwen.com/subject/roedrctx.html