美文网首页
「物联网」编译emqtt桥接kafka消息队列插件

「物联网」编译emqtt桥接kafka消息队列插件

作者: SmartKarren | 来源:发表于2018-05-01 17:42 被阅读0次

    前言

    emqtt插件开发网络上可参考的地方很少,这里做一个记录,希望能帮你节约一点时间。

    引用

      -emqtt
      -emq-plugin-template插件模板
      -参考的emqtt-kafka插件

    环境

    -编译环境 erlang OTP19
    -运行环境 64位Centos7
    -emqtt-2.3.7

    编译

    添加引用

    #修改emq-relx的Makefile
    DEPS += emqttd emq_modules emq_dashboard emq_retainer emq_recon emq_reloader \
            emq_auth_clientid emq_auth_username emq_auth_ldap emq_auth_http \
            emq_auth_mysql emq_auth_pgsql emq_auth_redis  emq_auth_mongo \
            emq_sn emq_coap emq_stomp emq_plugin_template emq_web_hook \
            emq_lua_hook emq_auth_jwt emqttd_plugin_kafka_bridge
    
    #添加插件git路径,或者用svn、cp(本地路径)
    dep_emqttd_plugin_kafka_bridge = git https://github.com/msdevanms/emqttd_plugin_kafka_bridge master
    

    添加插件自启动

    #在emq-relx/data/loaded_plugins文件末尾添加
    emqttd_plugin_kafka_bridge.
    

    添加配置

    #修改emq-relx的relx.config,只列出添加项
    {release, {emqttd, "2.3.7"}, [
      ...
      %% 添加
      kafkamocker,
      ekaf,
      ranch,
      {emqttd_plugin_kafka_bridge, load},
      %% 添加结束
      {emq_plugin_template, load}
    ]}.
    {overlay, [
        %% 添加配置文件
        {copy, "rel/conf/plugins/plugin.config", "etc/plugins/emqttd_plugin_kafka_bridge.config"}
        %% 添加结束
    ]}.
    

    运行

    $ cd _rel/emqttd/bin
    $ ./emqttd start
    

    kafka桥接修改的示例代码如下

    %%%-----------------------------------------------------------------------------
    %%% Copyright (c) 2016 Huang Rui<vowstar@gmail.com>, All Rights Reserved.
    %%%
    %%% Permission is hereby granted, free of charge, to any person obtaining a copy
    %%% of this software and associated documentation files (the "Software"), to deal
    %%% in the Software without restriction, including without limitation the rights
    %%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    %%% copies of the Software, and to permit persons to whom the Software is
    %%% furnished to do so, subject to the following conditions:
    %%%
    %%% The above copyright notice and this permission notice shall be included in all
    %%% copies or substantial portions of the Software.
    %%%
    %%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    %%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    %%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    %%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    %%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    %%% SOFTWARE.
    %%%-----------------------------------------------------------------------------
    %%% @doc
    %%% emqttd_plugin_kafka_bridge.
    %%%
    %%% @end
    %%%-----------------------------------------------------------------------------
    -module(emqttd_plugin_kafka_bridge).
    
    -include("../../emqttd/include/emqttd.hrl").
    
    -include("../../emqttd/include/emqttd_protocol.hrl").
    
    -include("../../emqttd/include/emqttd_internal.hrl").
    
    -export([load/1, unload/0]).
    
    %% Hooks functions
    -export([on_client_connected/3, on_client_disconnected/3]).
    
    -export([on_client_subscribe/4, on_client_unsubscribe/4]).
    
    -export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
    
    -record(struct, {lst=[]}).
    
    %% @doc Called when the plugin application starts.
    %% Initialises the ekaf Kafka client via ekaf_init/1 and then registers every
    %% hook callback of this module with emqttd. `Env' is wrapped in a list and
    %% passed both to ekaf_init/1 and as the init args of each hook.
    %% Returns the result of the last emqttd:hook/3 call.
    load(Env) ->
        HookArgs = [Env],
        ekaf_init(HookArgs),
        Hooks = [
            {'client.connected', fun ?MODULE:on_client_connected/3},
            {'client.disconnected', fun ?MODULE:on_client_disconnected/3},
            {'client.subscribe', fun ?MODULE:on_client_subscribe/4},
            {'client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4},
            {'message.publish', fun ?MODULE:on_message_publish/2},
            {'message.delivered', fun ?MODULE:on_message_delivered/4},
            {'message.acked', fun ?MODULE:on_message_acked/4}
        ],
        %% A fold (not foreach) so that the value of the final registration is
        %% what load/1 returns.
        lists:foldl(
            fun({HookPoint, Callback}, _Previous) ->
                emqttd:hook(HookPoint, Callback, HookArgs)
            end,
            ok,
            Hooks
        ).
    
    %%-----------client connect start-----------------------------------%%
    
    %% @doc 'client.connected' hook callback.
    %% Prints the connection to stdout, then synchronously produces a JSON
    %% "connected" event (client id and local node name) to the
    %% <<"broker_message">> Kafka topic. Returns {ok, Client} with the client
    %% record untouched.
    on_client_connected(ConnAck, #mqtt_client{client_id = ClientId} = Client, _Env) ->
        io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
        Event = [
            {type, <<"connected">>},
            {client_id, ClientId},
            {cluster_node, node()}
        ],
        %% The encoder output is converted to a flat binary before producing.
        Encoded = mochijson2:encode(Event),
        ekaf:produce_sync(<<"broker_message">>, list_to_binary(Encoded)),
        {ok, Client}.
    
    %%-----------client connect end-------------------------------------%%
    
    
    
    %%-----------client disconnect start---------------------------------%%
    
    %% @doc 'client.disconnected' hook callback.
    %% Synchronously produces a JSON "disconnected" event (client id, reason and
    %% local node name) to the <<"broker_message">> Kafka topic and returns ok.
    %%
    %% Atoms, binaries and numbers are passed to the JSON encoder unchanged.
    %% Any other reason term (for example a tuple) is rendered with ~p into a
    %% UTF-8 binary first, because mochijson2 cannot encode arbitrary tuples
    %% and would otherwise crash this hook.
    on_client_disconnected(Reason, #mqtt_client{client_id = ClientId}, _Env) ->
        JsonReason =
            case is_atom(Reason) orelse is_binary(Reason) orelse is_number(Reason) of
                true ->
                    Reason;
                false ->
                    unicode:characters_to_binary(io_lib:format("~p", [Reason]))
            end,

        Json = mochijson2:encode([
            {type, <<"disconnected">>},
            {client_id, ClientId},
            {reason, JsonReason},
            {cluster_node, node()}
        ]),

        ekaf:produce_sync(<<"broker_message">>, list_to_binary(Json)),

        ok.
    
    %%-----------client disconnect end-----------------------------------%%
    
    
    
    %%-----------client subscribed start---------------------------------------%%
    
    %% @doc 'client.subscribe' hook callback.
    %% Prints the request to stdout and, for a non-empty TopicTable, produces one
    %% JSON "subscribed" event per distinct topic key to the <<"broker_message">>
    %% Kafka topic. Anything other than a non-empty list only prints
    %% "empty topic". Always returns {ok, TopicTable} so the table is retained
    %% unchanged.
    %%
    %% proplists:get_keys/1 returns the keys in no particular order, so picking
    %% only the "last" key reported an arbitrary topic and silently dropped the
    %% others when several topics were subscribed at once; every key is
    %% reported now.
    on_client_subscribe(ClientId, _Username, TopicTable, _Env) ->
        io:format("client ~s will subscribe ~p~n", [ClientId, TopicTable]),

        case TopicTable of
            [_ | _] ->
                lists:foreach(
                    fun(Topic) ->
                        Json = mochijson2:encode([
                            {type, <<"subscribed">>},
                            {client_id, ClientId},
                            {topic, Topic},
                            {cluster_node, node()}
                        ]),
                        ekaf:produce_sync(<<"broker_message">>, list_to_binary(Json))
                    end,
                    proplists:get_keys(TopicTable)
                );
            _ ->
                %% TopicTable is empty (or not a non-empty list).
                io:format("empty topic ~n")
        end,
        {ok, TopicTable}.
       
    %on_client_subscribe_after(ClientId, TopicTable, _Env) ->
    %    io:format("client ~s subscribed ~p~n", [ClientId, TopicTable]),
        
    %    case TopicTable of
    %        [_|_] -> 
    %            %% If TopicTable list is not empty
    %            Key = proplists:get_keys(TopicTable),
    %            %% build json to send using ClientId
    %            Json = mochijson2:encode([
    %                {type, <<"subscribed">>},
    %                {client_id, ClientId},
    %                {topic, lists:last(Key)},
    %                {cluster_node, node()}
    
    %                ,{ts, emqttd_time:now_to_secs()}
    %            ]),
    %            ekaf:produce_sync(<<"broker_message">>, list_to_binary(Json));
    %        _ -> 
    %            %% If TopicTable is empty
    %            io:format("empty topic ~n")
    %    end,
    
    %    {ok, TopicTable}.
    
    %%-----------client subscribed end----------------------------------------%%
    
    
    
    %%-----------client unsubscribed start----------------------------------------%%
    
    %% @doc 'client.unsubscribe' hook callback.
    %% Prints the request to stdout and produces one JSON "unsubscribed" event
    %% per entry of Topics to the <<"broker_message">> Kafka topic. Returns
    %% {ok, Topics} unchanged.
    %%
    %% An empty Topics list produces no event (lists:last/1 used to crash on
    %% it), and every topic is reported instead of only the last one.
    %% NOTE(review): entries are accepted both as bare topic names and as
    %% {Topic, Opts} pairs, since a pair cannot be JSON-encoded as a topic
    %% value - confirm which shape emqttd passes to this hook.
    on_client_unsubscribe(ClientId, _Username, Topics, _Env) ->
        io:format("client ~s unsubscribe ~p~n", [ClientId, Topics]),

        TopicName =
            fun
                ({Name, _Opts}) -> Name;
                (Name) -> Name
            end,

        lists:foreach(
            fun(Entry) ->
                Json = mochijson2:encode([
                    {type, <<"unsubscribed">>},
                    {client_id, ClientId},
                    {topic, TopicName(Entry)},
                    {cluster_node, node()}
                ]),
                ekaf:produce_sync(<<"broker_message">>, list_to_binary(Json))
            end,
            Topics
        ),

        {ok, Topics}.
    
    %%-----------client unsubscribed end----------------------------------------%%
    
    
    
    %%-----------message publish start--------------------------------------%%
    
    %% @doc 'message.publish' hook callback.
    %% Messages whose topic starts with "$SYS/" are returned untouched and are
    %% not forwarded. Every other message is printed to stdout and forwarded as
    %% a JSON "published" event (topic, payload, qos, local node name) to the
    %% <<"broker_message">> Kafka topic. Always returns {ok, Message}.
    on_message_publish(#mqtt_message{topic = <<"$SYS/", _/binary>>} = SysMessage, _Env) ->
        {ok, SysMessage};
    on_message_publish(Message, _Env) ->
        io:format("publish ~s~n", [emqttd_message:format(Message)]),

        %% NOTE(review): the original author's note suggests using
        %% binary_to_list(Payload) when the payload is raw binary - confirm
        %% what the JSON encoder requires for such payloads.
        Event = [
            {type, <<"published">>},
            {topic, Message#mqtt_message.topic},
            {payload, Message#mqtt_message.payload},
            {qos, Message#mqtt_message.qos},
            {cluster_node, node()}
        ],
        Encoded = mochijson2:encode(Event),

        ekaf:produce_sync(<<"broker_message">>, list_to_binary(Encoded)),

        {ok, Message}.
    
    %%-----------message delivered start--------------------------------------%%
    %% @doc 'message.delivered' hook callback.
    %% Prints the delivery to stdout and forwards a JSON "delivered" event
    %% (receiving client id, topic, payload, qos, local node name) to the
    %% <<"broker_message">> Kafka topic. Returns {ok, Message} unchanged.
    on_message_delivered(ClientId, _Username, Message, _Env) ->
        io:format("delivered to client ~s: ~s~n", [ClientId, emqttd_message:format(Message)]),

        %% NOTE(review): the original author's note suggests using
        %% binary_to_list(Payload) when the payload is raw binary - confirm
        %% what the JSON encoder requires for such payloads.
        Event = [
            {type, <<"delivered">>},
            {client_id, ClientId},
            {topic, Message#mqtt_message.topic},
            {payload, Message#mqtt_message.payload},
            {qos, Message#mqtt_message.qos},
            {cluster_node, node()}
        ],
        Encoded = mochijson2:encode(Event),

        ekaf:produce_sync(<<"broker_message">>, list_to_binary(Encoded)),

        {ok, Message}.
    %%-----------message delivered end----------------------------------------%%
    
    %%-----------acknowledgement publish start----------------------------%%
    %% @doc 'message.acked' hook callback.
    %% Prints the acknowledgement to stdout and forwards a JSON "acked" event
    %% (acknowledging client id, topic, payload, qos, local node name) to the
    %% <<"broker_message">> Kafka topic. Returns {ok, Message} unchanged.
    on_message_acked(ClientId, _Username, Message, _Env) ->
        io:format("client ~s acked: ~s~n", [ClientId, emqttd_message:format(Message)]),

        %% NOTE(review): the original author's note suggests using
        %% binary_to_list(Payload) when the payload is raw binary - confirm
        %% what the JSON encoder requires for such payloads.
        Event = [
            {type, <<"acked">>},
            {client_id, ClientId},
            {topic, Message#mqtt_message.topic},
            {payload, Message#mqtt_message.payload},
            {qos, Message#mqtt_message.qos},
            {cluster_node, node()}
        ],
        Encoded = mochijson2:encode(Event),

        ekaf:produce_sync(<<"broker_message">>, list_to_binary(Encoded)),
        {ok, Message}.
    
    %% ===================================================================
    %% ekaf_init
    %% ===================================================================
    
    %% @doc Configures and starts the ekaf Kafka client.
    %% Reads the `kafka' entry of the emqttd_plugin_kafka_bridge application
    %% environment (crashes with badmatch if it is not set), copies its
    %% `bootstrap_broker' and `partition_strategy' values into the ekaf
    %% application environment, sets the bootstrap topic to <<"broker_message">>
    %% and then starts kafkamocker, gproc, ranch and ekaf in that order.
    %% The argument is ignored. Returns the result of the final io:format/2.
    ekaf_init(_Env) ->
        {ok, KafkaConf} = application:get_env(emqttd_plugin_kafka_bridge, kafka),
        Broker = proplists:get_value(bootstrap_broker, KafkaConf),
        Strategy = proplists:get_value(partition_strategy, KafkaConf),

        %% e.g. strict_round_robin
        application:set_env(ekaf, ekaf_partition_strategy, Strategy),
        %% e.g. {"127.0.0.1", 9092}
        application:set_env(ekaf, ekaf_bootstrap_broker, Broker),
        application:set_env(ekaf, ekaf_bootstrap_topics, <<"broker_message">>),

        %% Each start is asserted; a failure aborts initialisation with badmatch.
        lists:foreach(
            fun(App) -> {ok, _} = application:ensure_all_started(App) end,
            [kafkamocker, gproc, ranch, ekaf]
        ),

        io:format("Init ekaf with ~p~n", [Broker]).
    
    %% @doc Called when the plugin application stops.
    %% Removes every hook registered by load/1. Each fun reference uses the same
    %% function and arity that load/1 registered; the previous arity-3
    %% references for subscribe, unsubscribe, delivered and acked named
    %% different funs than the arity-4 ones that were hooked.
    %% The 'client.subscribe.after' unhook was dropped: load/1 never registers
    %% that hook and this module defines no on_client_subscribe_after function.
    %% Returns the result of the last emqttd:unhook/2 call.
    unload() ->
        emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
        emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
        emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
        emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
        emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
        emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4),
        emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4).
    
    

    相关文章

      网友评论

          本文标题:「物联网」编译emqtt桥接kafka消息队列插件

          本文链接:https://www.haomeiwen.com/subject/jrqfrftx.html