美文网首页
EMQX源码阅读(三)

EMQX源码阅读(三)

作者: andywangzhen | 来源:发表于2020-01-13 12:35 被阅读0次

本次主要走一下客户端创建连接和接收数据的流程。

接上篇,创建Socket成功后,回调函数:emqx_connection, start_link, [Options -- SockOpts]。

emqx_connection.erl:
本模块为一个gen_server模块,所以它会给每一个客户端启动一个进程,并在初始化时,从acceptor接管Socket套接字。
init callback:

init(Parent, Transport, RawSocket, Options) ->
    case Transport:wait(RawSocket) of
        {ok, Socket} ->
            run_loop(Parent, init_state(Transport, Socket, Options));
        {error, Reason} ->
            ok = Transport:fast_close(RawSocket),
            exit_on_sock_error(Reason)
    end.

这里重点函数有两个:init_state/3和run_loop/2
init_state,顾名思义,是将进程state中的数据或对象初始化,其中主要有套接字信息、Frame、Parse、Channel等相关帮助模块的初始化、GC初始化等等。
run_loop,处理Socket数据的轮询函数:

  1. 通过activate_socket,设置允许接收数据包的个数,有效的控制接收流量
  2. 调用hibernate/2
hibernate(Parent, State) ->
    proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).

这里重点查了一下proc_lib:hibeernate的用法,类似erlang:hibernate,意思是,使调用进程处于等待状态,当有数据接收时,唤醒并调用MFA。

  1. wakeup_from_hib/2
    其中是一个receive的流程,处理收到的数据:
receive
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
        {'EXIT', Parent, Reason} ->
            terminate(Reason, State);
        Msg ->
            process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State))
    after
        IdleTimeout ->
            hibernate(Parent, cancel_stats_timer(State))
    end.
  1. process_msg:
    主要处理分包,解MQTT协议包。并将完整的解析后数据,交给channel处理。
%%--------------------------------------------------------------------
%% Handle incoming packet

handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
    ok = inc_incoming_stats(Packet),
    ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
    with_channel(handle_in, [Packet], State);

handle_incoming(FrameError, State) ->
    with_channel(handle_in, [FrameError], State).

%%--------------------------------------------------------------------
%% With Channel

with_channel(Fun, Args, State = #state{channel = Channel}) ->
    case erlang:apply(emqx_channel, Fun, Args ++ [Channel]) of
        ok -> {ok, State};
        {ok, NChannel} ->
            {ok, State#state{channel = NChannel}};
        {ok, Replies, NChannel} ->
            {ok, next_msgs(Replies), State#state{channel = NChannel}};
        {shutdown, Reason, NChannel} ->
            shutdown(Reason, State#state{channel = NChannel});
        {shutdown, Reason, Packet, NChannel} ->
            NState = State#state{channel = NChannel},
            ok = handle_outgoing(Packet, NState),
            shutdown(Reason, NState)
    end.

emqx_channel.erl & emqx_session.erl:
为什么将这两个放一起说呢,是因为他们俩是配合做事的。
主要是处理MQTT的各种协议包了:CONNECT,SUBSCRIBE,PUBLISH,UNSUB,DISCONN等等。有兴趣的同学可以深入进去,看看每一个协议包的处理流程,本次就不再赘述了。

截取connect的流程性代码片段:
handle_in:

handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
    case pipeline([fun enrich_conninfo/2,
                   fun check_connect/2,
                   fun enrich_client/2,
                   fun set_logger_meta/2,
                   fun check_banned/2,
                   fun auth_connect/2
                  ], ConnPkt, Channel#channel{conn_state = connecting}) of
        {ok, NConnPkt, NChannel} ->
            process_connect(NConnPkt, NChannel);
        {error, ReasonCode, NChannel} ->
            handle_out(connack, {ReasonCode, ConnPkt}, NChannel)
    end;

process_connect:

%%--------------------------------------------------------------------
%% Process Connect
%%--------------------------------------------------------------------

process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
                Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
    case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
        {ok, #{session := Session, present := false}} ->
            NChannel = Channel#channel{session = Session},
            handle_out(connack, {?RC_SUCCESS, sp(false), ConnPkt}, NChannel);
        {ok, #{session := Session, present := true, pendings := Pendings}} ->
            %%TODO: improve later.
            Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
            NChannel = Channel#channel{session  = Session,
                                       resuming = true,
                                       pendings = Pendings1
                                      },
            handle_out(connack, {?RC_SUCCESS, sp(true), ConnPkt}, NChannel);
        {error, client_id_unavailable} ->
            handle_out(connack, {?RC_CLIENT_IDENTIFIER_NOT_VALID, ConnPkt}, Channel);
        {error, Reason} ->
            ?LOG(error, "Failed to open session due to ~p", [Reason]),
            handle_out(connack, {?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel)
    end.

可以简单看出来:

  1. 收到connect请求后,会尝试建立session数据
  2. connect的返回结果,是调用handle_out
-spec(handle_out(atom(), term(), channel())
      -> {ok, channel()}
       | {ok, replies(), channel()}
       | {shutdown, Reason :: term(), channel()}
       | {shutdown, Reason :: term(), replies(), channel()}).
handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel) ->
    AckProps = run_fold([fun enrich_connack_caps/2,
                         fun enrich_server_keepalive/2,
                         fun enrich_assigned_clientid/2
                        ], #{}, Channel),
    AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
    return_connack(AckPacket,
                   ensure_keepalive(AckProps,
                                    ensure_connected(ConnPkt, Channel)));

handle_out(connack, {ReasonCode, _ConnPkt},
           Channel = #channel{conninfo   = ConnInfo,
                              clientinfo = ClientInfo}) ->
    ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
    AckPacket = ?CONNACK_PACKET(
                   case maps:get(proto_ver, ConnInfo) of
                       ?MQTT_PROTO_V5 -> ReasonCode;
                       _Other -> emqx_reason_codes:compat(connack, ReasonCode)
                   end),
    shutdown(emqx_reason_codes:name(ReasonCode), AckPacket, Channel);

相关文章

网友评论

      本文标题:EMQX源码阅读(三)

      本文链接:https://www.haomeiwen.com/subject/cdbgactx.html