美文网首页消息中间件
Java emq客户端的简单使用

Java emq客户端的简单使用

作者: LssTechnology | 来源:发表于2019-10-08 19:41 被阅读0次
    前言
    在java开发中,经常会用到消息服务,emq是我们现在通常使用的消息中间件,搭建一个emq服务器此处就不做过多讲解,本文主要说的是基于java环境搭建一个mqtt客户端。
    你也可以参考emq官网的示例进行搭建,但官网过于简单,本文会有一些详细的参数配置 emq官方文档
    1、引入pom依赖
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.0</version>
    </dependency>
    
    2、application.yml配置
    # emq
    mqtt:
      clientID: client-test
      username: admin
      password: public
      serverURI: tcp://127.0.0.1:1883
      isReconnect: true
      keepAliveInterval: 10
      topic: topic-test
    
    3、ClientMQTT.java 相关代码
    package cn.bdp.ws.emq;
    
    import cn.bdp.ws.service.DmcMapService;
    import cn.bdp.ws.service.MessageConsumeService;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import java.util.concurrent.ScheduledExecutorService;
    
    @Component
    @Slf4j
    public class ClientMQTT {
    
        @Value("${mqtt.serverURI}")
        private String serverURI;
    
        @Value("${mqtt.clientID}")
        private String clientId;
    
        @Value("${mqtt.username}")
        private String username;
    
        @Value("${mqtt.password}")
        private String password;
    
        @Value("${mqtt.isReconnect}")
        private boolean isReconnect;
    
        @Value("${mqtt.keepAliveInterval}")
        private int keepAliveInterval;
    
        @Value("${mqtt.topic}")
        private String mqttTopic;
    
        private MqttClient client;
        private MqttConnectOptions options;
    
        @Autowired
        DmcMapService dmcMapService;
    
        @Autowired
        MessageConsumeService messageConsumeService;
    
        @PostConstruct
        private void start() {
    
            try {
                client = new MqttClient(serverURI, clientId, new MemoryPersistence());
                options = new MqttConnectOptions();
                options.setCleanSession(true);
                options = new MqttConnectOptions();
                // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
                options.setCleanSession(true);
                // 设置连接的用户名
                options.setUserName(username);
                // 设置连接的密码
                options.setPassword(password.toCharArray());
                // 设置超时时间 单位为秒
                options.setConnectionTimeout(10);
                // 设置会话心跳时间 单位为秒
                options.setKeepAliveInterval(keepAliveInterval);
                // 设置自动重连
                options.setAutomaticReconnect(true);
                System.out.println("Connecting to broker:" + serverURI);
                client.connect(options);
                System.out.println("Connected");
                System.out.println("Subscribe to topic:" + mqttTopic);
                client.subscribe(mqttTopic);
                // 此处使用的MqttCallbackExtended类而不是MqttCallback,是因为如果emq服务出现异常导致客户端断开连接后,重连后会自动调用connectComplete方法
                client.setCallback(new MqttCallbackExtended() {
                    @Override
                    public void connectComplete(boolean reconnect, String serverURI) {
                        System.out.println("连接完成...");
                        try {
                            // 重连后要自己重新订阅topic,这样emq服务发的消息才能重新接收到,不然的话,断开后客户端只是重新连接了服务,并没有自动订阅,导致接收不到消息
                            client.subscribe(mqttTopic);
                            log.info("订阅成功");
                        }catch (Exception e){
                            log.info("订阅出现异常:{}", e);
                        }
    
                    }
    
                    @Override
                    public void connectionLost(Throwable cause) {
                        System.out.println("失去连接....");
                    }
    
                    @Override
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                        // subscribe后得到的消息会执行到这里面
                        String content = new String(message.getPayload());
                        System.out.println("接收消息主题 : " + topic);
                        System.out.println("接收消息Qos : " + message.getQos());
                        System.out.println("接收消息内容 : " + content);
                        // 处理数据
                        messageConsumeService.dealEmqContent(content);
                    }
    
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken token) {
                        System.out.println("deliveryComplete....");
                    }
                });
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @PreDestroy
        public void end(){
            try {
                client.disconnect();
                client.close();
            } catch (MqttException e) {
                log.info("断开连接失败:{}", e);
            }
        }
    }
    

    此时,重启服务,如果emq服务器正常,客户端就可以正常连接

    注意点(两个):

    1、client.setCallback(new MqttCallbackExtended()
    此处使用的MqttCallbackExtended类而不是MqttCallback,是因为如果emq服务出现异常导致客户端断开连接后,重连后会自动调用connectComplete方法
    2、client.subscribe(mqttTopic)
    重连后要自己重新订阅topic,这样emq服务发的消息才能重新接收到,不然的话,断开后客户端只是重新连接了服务,并没有自动订阅,导致接收不到消息

    如有不对,欢迎指正,希望对你有所帮助!

    相关文章

      网友评论

        本文标题:Java emq客户端的简单使用

        本文链接:https://www.haomeiwen.com/subject/vlripctx.html