美文网首页
Centos7 搭建v2.3.11 emq broker及编写k

Centos7 搭建v2.3.11 emq broker及编写k

作者: Cherron | 来源:发表于2019-01-08 17:12 被阅读0次

    背景:

    • emq开源版集成了许多插件,但没有提供kafka的。但官方提供了插件的模板,我们只需要照葫芦画瓢即可;
    • 由于emq是基于Erlang编程语言,所以我们需要搭建Erlang环境(Erlang这语言,不到万不得已,千万别碰!!);
    • 选择v2.3.11是因为截止写这篇博客的时候,v2版本才是真正的稳定版。

    Ok,开始吧!


    一、Erlang环境
    1. 安装依赖
      yum install fop
      yum install gcc gcc-c++ glibc-devel kernel-devel make openssl-devel autoconf
      yum -y install ncurses ncurses-devel
      yum install m4 openssl-devel unixODBC unixODBC-devel
    2. 下载安装包
      wget http://erlang.org/download/otp_src_20.3.tar.gz
    3. 解压、安装
      tar -zxvf otp_src_20.3.tar.gz
      ./configure --prefix=/usr/local/erlang
      make
      make install
    4. 添加环境变量
      vi /etc/profile
    #set erlang environment
    export PATH=$PATH:/usr/local/erlang/bin
    

    source /etc/profile

    1. 验证


      Erlang成功!ctrl + c >press 'a'退出
    二、测试安装Emq

    因为我们后续要加上自己的kafka插件代码,所以这步只是测试是否能正常编译

    1. 下载源码
      wget https://codeload.github.com/emqx/emqx-rel/tar.gz/v2.3.11
      mv v2.3.11 emq-v2.3.11.tar.gz
    2. 解压
      tar -zxvf emq-v2.3.11.tar.gz
      cd emqx-rel-2.3.11
    3. 编译
      make
      编译需要花点时间,如果编译成功,就可以进入下一步了。
      如果有错误,那么一定是linux环境的问题,因为这个Erlang版本和Emq版本是我测试过的。我之前就因为一直没通过,所以重新在本地建了一个centos7虚拟机,再按照步骤来,就成功了。
    三、编写插件

    非常感谢写这篇博客《EMQ集成Kafka插件编写过程 emq_plugin_kafka》的作者啊,帮了大忙!
    除了有一些小问题~

    1. 第一次编译之后,文件结构如下:


      emq文件结构
    2. 复制一份插件模板,并更名:
      cd deps
      cp -r emq_plugin_template emq_plugin_kafka
      cd emq_plugin_kafka
    3. 插件目录结构


      插件目录结构
    4. 修改关键字“template”为“kafka”
      将插件目录emq_plugin_kafka中所有文件名和里面的内容中的“template”改为“kafka”
      (你总不希望别人看你的代码代码时,上面的“template”几个大字赫然在目吧?),包括src目录下、Makefile文件、etc下等
    5. 初步处理src下的文件
    • 因为没用到权限验证和登录授权验证,所以删除acl和auth代码
      rm -rf emq_acl_demo.erl
      rm -rf emq_auth_demo.erl
    • 修改emq_plugin_kafka_app.erl文件,把acl和auth的模块注册代码去掉,并加些打印语句
      vim emq_plugin_kafka_app.erl
    -module(emq_plugin_kafka_app).
    
    -behaviour(application).
    
    %% Application callbacks
    -export([start/2, stop/1]).
    
    start(_StartType, _StartArgs) ->
        {ok, Sup} = emq_plugin_kafka_sup:start_link(),
        %% ok = emqttd_access_control:register_mod(auth, emq_auth_demo, []),
        %% ok = emqttd_access_control:register_mod(acl, emq_acl_demo, []),
        emq_plugin_kafka:load(application:get_all_env()),
        io:format("emq_plugin_kafka start.~n", []),
        {ok, Sup}.
    
    stop(_State) ->
        %% ok = emqttd_access_control:unregister_mod(auth, emq_auth_demo),
        %% ok = emqttd_access_control:unregister_mod(acl, emq_acl_demo),
        emq_plugin_kafka:unload(),
        io:format("emq_plugin_kafka stop.~n", []).
    
    • 暂时测试一下
      ①修改配置文件relx.config:vim relx.config
      添加一行:{emq_plugin_kafka, load},
      ②修改Makefile:vim Makefile
      添加emq_plugin_kafka:
    DEPS += emqttd emq_modules emq_dashboard emq_retainer emq_recon emq_reloader \
            emq_auth_clientid emq_auth_username emq_auth_ldap emq_auth_http \
            emq_auth_mysql emq_auth_pgsql emq_auth_redis emq_auth_mongo \
            emq_sn emq_coap emq_stomp emq_plugin_template emq_web_hook \
            emq_lua_hook emq_auth_jwt emq_plugin_kafka
    

    ③设置emq启动时加载插件:vim data/loaded_plugins

    emq_recon.
    emq_modules.
    emq_retainer.
    emq_dashboard.
    emq_plugin_kafka.
    

    ④在emq根目录make,如果不成功,可能是文件/内容修改不完全
    控制台启动emq
    ./_rel/emqttd/bin/emqttd console
    启动日志:

    启动日志,也可以通过ip:18083来查看插件启动状态
    6.1. 连接单节点的kafka的代码——ekaf工具

    如何搭建kafka,可参考我的另一篇博客《三台虚拟机搭建kafka集群(和zookeeper集群)
    假设你已经搭建好了kafka。
    为了连接上kafka,需要用到基于Erlang写的工具ekaf

    • 修改配置文件
      ① 配置文件变更历史,官方介绍
      通过我的后期实践的理解,conf后缀对应key=value这种配置格式,config后缀对应Erlang原始配置格式
      Makefile里会有对应配置文件路径:
    app.config::
            ./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.conf -i priv/emq_plugin_kafka.schema -d data
    

    ② 进入emq_plugin_kafka/etc,修改文件名后缀为conf,因为Makefile里写的是conf
    mv emq_plugin_kafka.config emq_plugin_kafka.conf
    ③ 编辑该文件:vim emq_plugin_kafka.conf

    emq.plugin.kafka.server = 192.168.220.129:9092
    emq.plugin.kafka.topic = kafka-topic
    

    ④ 在deps/emq_plugin_kafka目录下,新建priv文件夹,然后新建schema文件
    mkdir priv
    vim priv/emq_plugin_kafka.schema
    我复制上面提到的博客《EMQ集成Kafka插件编写过程 emq_plugin_kafka》里的代码,老是解析这个文件出错,不得已,自己一点一点敲,最后变成了这个样子:

    {
        mapping,
        "emq.plugin.kafka.server",
        "emq_plugin_kafka.server",
        [
    {
        mapping,
        "emq.plugin.kafka.server",
        "emq_plugin_kafka.server",
        [
            {default, {"127.0.0.1", 9092}},
            {datatype, [integer, ip, string]}
        ]
    }.
    
    {
        mapping,
        "emq.plugin.kafka.topic",
        "emq_plugin_kafka.server",
        [
            {default, "test"},
            {datatype, string},
            hidden
        ]
    }.
    
    {
        translation,
        "emq_plugin_kafka.server",
        fun(Conf) ->
                {RHost, RPort} = case cuttlefish:conf_get("emq.plugin.kafka.server", Conf) of
                                     {Ip, Port} -> {Ip, Port};
                                     S          -> case string:tokens(S, ":") of
                                                       [Domain]       -> {Domain, 9092};
                                                       [Domain, Port] -> {Domain, list_to_integer(Port)}
                                                   end
                                 end,
                Topic = cuttlefish:conf_get("emq.plugin.kafka.topic", Conf),
                [
                 {host, RHost},
                 {port, RPort},
                 {topic, Topic}
                ]
        end
    }.
    

    ⑤ 修改Makefile文件,增加ekaf依赖

    PROJECT = emq_plugin_kafka
    PROJECT_DESCRIPTION = EMQ Plugin Kafka
    PROJECT_VERSION = 2.3.11
    
    BUILD_DEPS = emqttd cuttlefish ekaf
    dep_emqttd = git https://github.com/emqtt/emqttd v2.3.11
    dep_cuttlefish = git https://github.com/emqtt/cuttlefish v2.0.11
    dep_ekaf = git https://github.com/helpshift/ekaf master
    
    ERLC_OPTS += +debug_info
    ERLC_OPTS += +'{parse_transform, lager_transform}'
    
    NO_AUTOPATCH = cuttlefish
    
    COVER = true
    
    include erlang.mk
    
    app:: rebar.config
    
    app.config::
            ./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.conf -i priv/emq_plugin_kafka.schema -d data
    

    ⑥ 编写逻辑代码vim src/emq_plugin_kafka.erl
    主要是写了个 ekaf_send(Message, _Env) 方法,然后再消息到来的时候调用

    -module(emq_plugin_kafka).
    
    -include_lib("emqttd/include/emqttd.hrl").
    
    -define(APP, emq_plugin_kafka).
    
    -export([load/1, unload/0]).
    
    %% Hooks functions
    
    -export([on_client_connected/3, on_client_disconnected/3]).
    
    -export([on_client_subscribe/4, on_client_unsubscribe/4]).
    
    -export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
    
    -export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
    
    %% Called when the plugin application start
    load(Env) ->
        ekaf_init(Env),
        emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
        emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
        emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
        emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
        emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
        emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
        emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
        emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
        emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
        emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
        emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
    
    on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
        io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
        {ok, Client}.
    
    on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
        io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
        ok.
    
    on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
        io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
        {ok, TopicTable}.
    
    on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
        io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
        {ok, TopicTable}.
    
    on_session_created(ClientId, Username, _Env) ->
        io:format("session(~s/~s) created.", [ClientId, Username]).
    
    on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
        io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
        {ok, {Topic, Opts}}.
    
    on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
        io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
        ok.
    
    on_session_terminated(ClientId, Username, Reason, _Env) ->
        io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).
    
    %% transform message and return
    on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
        {ok, Message};
    
    on_message_publish(Message, _Env) ->
        io:format("publish ~s~n", [emqttd_message:format(Message)]),
        ekaf_send(Message, _Env),
        {ok, Message}.
    
    on_message_delivered(ClientId, Username, Message, _Env) ->
        io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
        {ok, Message}.
    
    on_message_acked(ClientId, Username, Message, _Env) ->
        io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
        {ok, Message}.
    
    %% Called when the plugin application stop
    unload() ->
        emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
        emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
        emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
        emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
        emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
        emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
        emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
        emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
        emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
        emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
        emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).
    
    ekaf_init(_Env) ->
        {ok, Kafka_Env} = application:get_env(?APP, server),
        Host = proplists:get_value(host, Kafka_Env),
        Port = proplists:get_value(port, Kafka_Env),
        Broker = {Host, Port},
        %Broker = {"192.168.52.130", 9092},
        Topic = proplists:get_value(topic, Kafka_Env),
        %Topic = "test-topic",
    
        application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),
        application:set_env(ekaf, ekaf_bootstrap_broker, Broker),
        application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(Topic)),
        %%设置数据上报间隔,ekaf默认是数据达到1000条或者5秒,触发上报
        application:set_env(ekaf, ekaf_buffer_ttl, 100),
    
        {ok, _} = application:ensure_all_started(ekaf).
        %io:format("Init ekaf with ~p~n", [Broker]),
        %Json = mochijson2:encode([
        %    {type, <<"connected">>},
        %    {client_id, <<"test-client_id">>},
        %    {cluster_node, <<"node">>}
        %]),
        %io:format("send : ~w.~n",[ekaf:produce_async_batched(list_to_binary(Topic), list_to_binary(Json))]).
    
    
    ekaf_send(Message, _Env) ->
        From = Message#mqtt_message.from,
        Topic = Message#mqtt_message.topic,
        Payload = Message#mqtt_message.payload,
        Qos = Message#mqtt_message.qos,
        Dup = Message#mqtt_message.dup,
        Retain = Message#mqtt_message.retain,
        ClientId = get_form_clientid(From),
        Username = get_form_username(From),
        io:format("publish ~s~n", [emqttd_message:format(Message)]),
        Str = [
                  {client_id, ClientId},
                  {message, [
                                {username, Username},
                                {topic, Topic},
                                {payload, Payload},
                                {qos, Qos},
                                {dup, Dup},
                                {retain, Retain}
                            ]},
                   {cluster_node, node()},
                   {ts, emqttd_time:now_ms()}
               ],
        %io:format("Str : ~s~n", [Str]),
        Json = mochijson2:encode(Str),
        KafkaTopic = get_topic(),
        ekaf:produce_sync_batched(KafkaTopic, list_to_binary(Json)).
    
    get_form_clientid({ClientId, Username}) -> ClientId;
    get_form_clientid(From) -> From.
    get_form_username({ClientId, Username}) -> Username;
    get_form_username(From) -> From.
    
    get_topic() ->
        {ok, Topic} = application:get_env(ekaf, ekaf_bootstrap_topics),
        Topic.
    
    6.2 连接集群的kafka的代码——brod工具

    ekaf工具不支持集群,brod支持,当然,它也支持单节点~
    因为我用了git工具,所以大家可以看看,在6.1的基础上,做了哪些改动

    [root@localhost emq_plugin_kafka]# git status
    # 位于分支 based-on-brod
    # 尚未暂存以备提交的变更:
    #   (使用 "git add/rm <file>..." 更新要提交的内容)
    #   (使用 "git checkout -- <file>..." 丢弃工作区的改动)
    #
    #   修改:      Makefile
    #   删除:      etc/emq_plugin_kafka.conf
    #   修改:      src/emq_plugin_kafka.erl
    #
    # 未跟踪的文件:
    #   (使用 "git add <file>..." 以包含要提交的内容)
    #
    #   etc/emq_plugin_kafka.config
    修改尚未加入提交(使用 "git add" 和/或 "git commit -a")
    

    所以你们只需要:
    ① 修改Makefile

    PROJECT = emq_plugin_kafka
    PROJECT_DESCRIPTION = EMQ Plugin Kafka
    PROJECT_VERSION = 2.3.11
    
    BUILD_DEPS = emqttd cuttlefish ekaf brod
    dep_emqttd = git https://github.com/emqtt/emqttd v2.3.11
    dep_cuttlefish = git https://github.com/emqtt/cuttlefish v2.0.11
    dep_ekaf = git https://github.com/helpshift/ekaf master
    dep_brod = git https://github.com/klarna/brod.git 3.7.3
    
    ERLC_OPTS += +debug_info
    ERLC_OPTS += +'{parse_transform, lager_transform}'
    
    NO_AUTOPATCH = cuttlefish
    
    COVER = true
    
    include erlang.mk
    
    app:: rebar.config
    
    app.config::
            ./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.config -i priv/emq_plugin_kafka.schema -d data
    

    ② 删除etc/emq_plugin_kafka.conf
    rm -f etc/emq_plugin_kafka.conf
    ③ 修改src/emq_plugin_kafka.erl

    -module(emq_plugin_kafka).
     
    -include_lib("emqttd/include/emqttd.hrl").
     
    -include_lib("brod/include/brod_int.hrl").
     
    -define(TEST_TOPIC, <<"test-topic">>).
     
    -export([load/1, unload/0]).
     
    %% Hooks functions
     
    -export([on_client_connected/3, on_client_disconnected/3]).
     
    -export([on_client_subscribe/4, on_client_unsubscribe/4]).
     
    -export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
     
    -export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
     
    %% Called when the plugin application start
    load(Env) ->
        brod_init([Env]),
        emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
        emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
        emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
        emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
        emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
        emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
        emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
        emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
        emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
        emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
        emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
     
    on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
        io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
        Json = mochijson2:encode([
            {type, <<"connected">>},
            {client_id, ClientId},
            {cluster_node, node()},
            {ts, emqttd_time:now_ms()}
        ]),
        
        %%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_1">>, list_to_binary(Json)),
        {ok, Kafka} = application:get_env(?MODULE, kafka),
        KafkaTopic = proplists:get_value(kafka_topic, Kafka),
        {ok, CallRef} = brod:produce(brod_client_1, KafkaTopic, 0, <<"mykey_1">>, list_to_binary(Json)),
        receive
            #brod_produce_reply{ call_ref = CallRef
                               , result   = brod_produce_req_acked
                           } ->
            io:format("brod_produce_reply:ok ~n"),
            ok
        after 5000 ->
            io:format("brod_produce_reply:exit ~n"),
            erlang:exit(timeout)
            %%ct:fail({?MODULE, ?LINE, timeout})
        end,
     
        {ok, Client}.
     
    on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
        io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
        Json = mochijson2:encode([
            {type, <<"disconnected">>},
            {client_id, ClientId},
            {reason, Reason},
            {cluster_node, node()},
            {ts, emqttd_time:now_ms()}
        ]),
     
        %%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_2">>, list_to_binary(Json)),
        {ok, Kafka} = application:get_env(?MODULE, kafka),
        KafkaTopic = proplists:get_value(kafka_topic, Kafka),
        {ok, CallRef} = brod:produce(brod_client_1, KafkaTopic, 0, <<"mykey_2">>, list_to_binary(Json)),
        receive
            #brod_produce_reply{ call_ref = CallRef
                               , result   = brod_produce_req_acked
                           } ->
            ok
        after 5000 ->
            ct:fail({?MODULE, ?LINE, timeout})
        end,
     
        ok.
     
    on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
        io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
        {ok, TopicTable}.
        
    on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
        io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
        {ok, TopicTable}.
     
    on_session_created(ClientId, Username, _Env) ->
        io:format("session(~s/~s) created.", [ClientId, Username]).
     
    on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
        io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
        {ok, {Topic, Opts}}.
     
    on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
        io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
        ok.
     
    on_session_terminated(ClientId, Username, Reason, _Env) ->
        io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).
     
    %% transform message and return
    %%根据topic前缀来分发到对应方法:以$SYS/开头
    on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
        {ok, Message};
     
    on_message_publish(Message, _Env) ->
        io:format("publish ~s~n", [emqttd_message:format(Message)]),
        
        Id = Message#mqtt_message.id,
        From = Message#mqtt_message.from, %需要登录和不需要登录这里的返回值是不一样的
        Topic = Message#mqtt_message.topic,
        Payload = Message#mqtt_message.payload,
        Qos = Message#mqtt_message.qos,
        Dup = Message#mqtt_message.dup,
        Retain = Message#mqtt_message.retain,
        Timestamp = Message#mqtt_message.timestamp,
     
        ClientId = c(From),
        Username = u(From),
    
        %%ClientId作为Key
        Key = iolist_to_binary(ClientId),
        %%获得分区数
        Partition = getPartition(Key),
        %%读取配置文件
        {ok, Kafka} = application:get_env(?MODULE, kafka),
        KafkaTopic = proplists:get_value(kafka_topic, Kafka),
     
        Json = mochijson2:encode([
                      {type, <<"publish">>},
                                  {partition_num, Partition},
                      {client_id, ClientId},
                      {message, [
                         {username, Username},
                         {topic, Topic},
                         {payload, Payload},
                         {qos, i(Qos)},
                         {dup, i(Dup)},
                         {retain, i(Retain)}
                        ]},
                      {cluster_node, node()},
                      {ts, emqttd_time:now_ms()}
                     ]),
        %%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_3">>, list_to_binary(Json)),
        {ok, CallRef} = brod:produce(brod_client_1, KafkaTopic, Partition, ClientId, list_to_binary(Json)),
        receive
            #brod_produce_reply{ call_ref = CallRef
                               , result   = brod_produce_req_acked
                           } ->
            ok
        after 5000 ->
            ct:fail({?MODULE, ?LINE, timeout})
        end,
     
        {ok, Message}.
    
    %%key的md5的最后一位进行取模,获取分区数
    getPartition(Key) ->
        {ok, Kafka} = application:get_env(?MODULE, kafka),
        PartitionNum = proplists:get_value(kafka_producer_partition, Kafka),
        <<Fix:120, Match:8>> = crypto:hash(md5, Key),
        abs(Match) rem PartitionNum.
     
    on_message_delivered(ClientId, Username, Message, _Env) ->
        io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
        {ok, Message}.
     
    on_message_acked(ClientId, Username, Message, _Env) ->
        io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
        {ok, Message}.
     
    %% Called when the plugin application stop
    unload() ->
        %%application:stop(brod),
        emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
        emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
        emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
        emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
        emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
        emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
        emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
        emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
        emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
        emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
        emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).
     
    %% ===================================================================
    %% brod_init https://github.com/klarna/brod
    %% ===================================================================
    brod_init(_Env) ->
        {ok, _} = application:ensure_all_started(brod),
        {ok, Kafka} = application:get_env(?MODULE, kafka),
        KafkaBootstrapEndpoints = proplists:get_value(bootstrap_broker, Kafka),
        %%KafkaBootstrapEndpoints = [{"127.0.0.1", 9092}], %%localhost,172.16.6.161
        %%KafkaBootstrapEndpoints = [{"localhost", 9092}], %%localhost,172.16.6.161
        %%ClientConfig = [{reconnect_cool_down_seconds, 10}],%% socket error recovery
        ClientConfig = [],%% socket error recovery
        {ok, Kafka} = application:get_env(?MODULE, kafka),
        Topic = proplists:get_value(kafka_topic, Kafka),
        Partition = 0,
    %%下面两行是初始化client,一个client只能发送到一个topic,如果要多个topic,则创建多个client
        ok = brod:start_client(KafkaBootstrapEndpoints, brod_client_1, ClientConfig),
        ok = brod:start_producer(brod_client_1, Topic, _ProducerConfig = []),
        %%ok = brod:produce_sync(brod_client_1, Topic, Partition, <<"key1">>, <<"value1">>),
        %%{ok, CallRef} = brod:produce(brod_client_1, Topic, Partition, <<"key1">>, <<"value2">>),
        io:format("Init brod with ~p~n", [KafkaBootstrapEndpoints]).
     
    i(true) -> 1;
    i(false) -> 0;
    i(I) when is_integer(I) -> I.
    c({ClientId, Username}) -> ClientId;
    c(From) -> From.
    u({ClientId, Username}) -> Username;
    u(From) -> From.
    
    

    ④ 新增etc/emq_plugin_kafka.config

    [
      {emq_plugin_kafka, [
            {kafka, [
          { bootstrap_broker, [{"192.168.220.129", 9092},{"192.168.220.130", 9092},{"192.168.220.131", 9092}] },
          { query_api_versions, false },
          { reconnect_cool_down_seconds, 10},
          {kafka_producer_partition, 3},
          {kafka_topic, <<"kafka-new-topic">>}
        ]}
      ]}
    ].
    
    1. 再修改一下emq的配置文件:vim relx.config
      添加:{ekaf, load},
    四、大功告成
    1. 先删除_rel文件夹
    2. make
    3. 控制台启动:./_rel/emqttd/bin/emqttd console
    4. 打开kafka控制台查看日志,这里的“kafka-topic”就是emq_plugin_kafka.conf文件里的配置项:
      ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-topic
    5. 网页测试:


      网页测试

    相关文章

      网友评论

          本文标题:Centos7 搭建v2.3.11 emq broker及编写k

          本文链接:https://www.haomeiwen.com/subject/ntfbrqtx.html