美文网首页程序员
Kafka将EMQX的消息传递给消费者处理(Windows平台)

Kafka将EMQX的消息传递给消费者处理(Windows平台)

作者: delicacylee | 来源:发表于2020-11-23 14:06 被阅读0次

1、Docker安装Kafka

打开Cmd命令行

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

创建docker-compose.yml文件

version: '1'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 本机IP
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - 本地路径(如D:\)docker.sock:/var/run/docker.sock

在docker-compose.yml文件目录进行服务打包

docker-compose build

启动服务

docker-compose up -d

创建两个Topic为后面的程序使用

kafka-topics.sh --zookeeper 本机IP:2181 --create --replication-factor 1 --partitions 3 --topic first  // 生产者使用 
kafka-topics.sh --zookeeper 本机IP:2181 --create --replication-factor 1 --partitions 3 --topic second // 消费者使用 

2、用Java获取Emq(见之前的文档)的数据,并由生产者发出

pom.xml(同消费者)

<dependencies>
    <!--mqtt-->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>
     <!--kafka-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.6.0</version>
    </dependency>
    <!--日志-->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.22</version>
    </dependency>
</dependencies>

MqttKafkaProducer.java

package cc.hiver.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

/**
 * MQTT生产者
 */
public class MqttKafkaProducer {
    /**
     * 向Kafka传入数据
     * @param msgData
     */
    public static void pushData(String msgData) {
        Properties props = new Properties();
        // 集群地址,多个服务器用","分隔
        props.put("bootstrap.servers", "本机IP:9092");
        // 重新发送消息次数,到达次数返回错误
        props.put("retries", 0);
        // Producer会尝试去把发往同一个Partition的多个Requests进行合并,batch.size指明了一次Batch合并后Requests总大小的上限。如果这个值设置的太小,可能会导致所有的Request都不进行Batch。
        props.put("batch.size", 163840);
        // Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。
        props.put("linger.ms", 1);
        // 在Producer端用来存放尚未发送出去的Message的缓冲区大小
        props.put("buffer.memory", 33554432);
        // key、value的序列化,此处以字符串为例,使用kafka已有的序列化类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // props.put("partitioner.class", "com.kafka.demo.Partitioner");//分区操作,此处未写
        props.put("acks", "1");
        props.put("request.timeout.ms", "60000");
        props.put("compression.type", "lz4");
        //创建生产者
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        //通过时间做轮循,均匀分布设置的partition,提高效率。
        int partition = (int) (System.currentTimeMillis() % 3);
        //写入名为"test-partition-1"的topic
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first", partition, UUID.randomUUID().toString(), msgData);
        try {
            producer.send(producerRecord).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("写入emqtopic到first:" + msgData);
    }
}

OnMessageCallback.java

package cc.hiver.producer;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * 消息回调函数
 */
public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息内容:" + new String(message.getPayload()));

        //接收到的消息发送到Kafka
        MqttKafkaProducer.pushData(new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

ProducerApp.java

package cc.hiver.producer;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 生产者App
 */
public class ProducerApp {
    /**
     * 主程序
     *
     * @param args
     */
    public static void main(String[] args) {
        // 订阅的主题
        String subTopic = "testtopic/#";
        // Broker服务
        String broker = "tcp://本机IP:1883";
        // 客户端名称
        String clientId = "mqtt_java_hiver";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);
            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("admin");    // 用户名
            connOpts.setPassword("public".toCharArray());     // 密码
            // 保留会话
            connOpts.setCleanSession(true);
            // 设置回调
            client.setCallback(new OnMessageCallback());
            // 建立连接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected");
            // 订阅
            client.subscribe(subTopic);
        } catch (MqttException me) {
            // 异常捕捉
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

3、消费者

LogProcessor.java

package cc.hiver.consumer;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * 日志清理
 */
public class LogProcessor implements Processor<byte[], byte[]> {

    private ProcessorContext context;

    public void init(ProcessorContext context) {
        this.context = context;
    }

    public void process(byte[] key, byte[] value) {
        String input = new String(value);
        if(input.contains("hello")) {
            System.out.println("logProcessor:" + input.toString());
            context.forward("logProcessor".getBytes(), input.getBytes());
        } else {
            // 这里可以进行数据清理
            // 输出到下一个topic
            context.forward("logProcessor".getBytes(), input.getBytes());
        }
    }

    public void punctuate(long timestamp) {

    }

    public void close() {

    }
}

ConsumerApp.java

package cc.hiver.consumer;

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class ConsumerApp {
    public static void main(String[] args) {
        // 定义输入的topic
        String from = "first";
        // 定义输出的topic
        String to = "second";

        // 设置参数
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "本机IP:9092");

        StreamsConfig config = new StreamsConfig(settings);

        // 构建拓扑
        Topology builder = new Topology();

        builder.addSource("SOURCE", from)
                .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {

                    @Override
                    public Processor<byte[], byte[]> get() {
                        // 具体分析处理
                        return new LogProcessor();
                    }
                }, "SOURCE")
                .addSink("SINK", to, "PROCESS");

        // 创建kafka stream
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
}

4、测试数据

Emq X Broker消息模拟
接收消息并通过Kafka转发至消费者
消费者接收消息进行处理

附录:kafka集群管理界面

docker run -itd --name=kafka-manager -p 9000:9000 -e ZK_HOSTS="本机IP:2181" sheepkiller/kafka-manager

访问:http://本机IP:9000


管理后台

相关文章

  • Kafka将EMQX的消息传递给消费者处理(Windows平台)

    1、Docker安装Kafka 打开Cmd命令行 创建docker-compose.yml文件 在docker-c...

  • Pulsar的Key_Shared消费模式与Kafka的消费者再

    Kafka消费者再均衡 在Kafka中,一个分区只能有一个消费者,处于一个消费组的消费者,处理消息的时候是...

  • Kafka集群部署指南

    一、前言 1、Kafka简介 Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。...

  • Kafka集群部署指南

    一、前言 1、Kafka简介 Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。...

  • Kafka 学习笔记

    一、Kafka简介 Kafka (科技术语)。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规...

  • Kafka集群部署

    一、前言 1.Kafka简介 Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。...

  • Kafka集群

    一、简介 Kafka简介Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kak...

  • 清华架构师熬夜整理,带你走进Kafka

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所在动作流数据。 Kafka基础 消息...

  • 如何快速全面掌握Kafka?5000字吐血整理

    Kafka 是目前主流的分布式消息引擎及流处理平台,经常用做企业的消息总线、实时数据管道,本文挑选了 Kafka ...

  • Kafka灵魂30问 - 核心篇

    1、 Kafka只是消息引擎系统吗? 除了作为消息引擎,Kafka 能够被用作流处理平台和分布式存储系统。 2、K...

网友评论

    本文标题:Kafka将EMQX的消息传递给消费者处理(Windows平台)

    本文链接:https://www.haomeiwen.com/subject/uyamiktx.html