美文网首页
JAVA开发MQTT总结

JAVA开发MQTT总结

作者: 魏春雨 | 来源:发表于2020-02-24 18:12 被阅读0次

    JAVA开发MQTT总结

    MQTT 介绍

    • 它是一种 机器之间通讯 machine-to-machine (M2M)、物联网 Internet of Things (IoT)常用的一种轻量级消息传输协议
    • 适用于网络带宽较低的场合
    • 包含发布、订阅模式,通过一个代理服务器(broker),任何一个客户端(client)都可以订阅或者发布某个主题的消息,然后订阅了该主题的客户端则会收到该消息

    mqtt还是之前公司有需求所以写的一个demo,在这里记录下来,方便有人使用的时候查阅,不涉及mqtt的具体讲解,只是贴代码和运行过程。

    MQTT的入门,以及特性,协议,结构的讲解,请看下面这篇文章

    https://www.runoob.com/w3cnote/mqtt-intro.html

    什么是MQTT,它能干什么,它的应用场景在哪里?请参考下面这篇文章

    https://www.ibm.com/developerworks/cn/iot/iot-mqtt-why-good-for-iot/index.html

    本文中采用的MQTT服务器Apache-Apollo的下载配置搭建过程,请参考下面这篇文章

    https://blog.csdn.net/qq_29350001/article/details/76680646

    下面就开始创建broker,

    RaindeMacBook-Pro:bin rain$ ./apollo create mybroker
    Creating apollo instance at: mybroker
    Generating ssl keystore...
    
    Warning:
    JKS 密钥库使用专用格式。建议使用 "keytool -importkeystore -srckeystore keystore -destkeystore keystore -deststoretype pkcs12" 迁移到行业标准格式 PKCS12。
    
    You can now start the broker by executing:
    
       "/Users/rain/Documents/Soft/apache-apollo-1.7.1/bin/mybroker/bin/apollo-broker" run
    
    Or you can run the broker in the background using:
    
       "/Users/rain/Documents/Soft/apache-apollo-1.7.1/bin/mybroker/bin/apollo-broker-service" start
    
    
    

    进入新生成的broker中

    RaindeMacBook-Pro:bin rain$ ls
    apollo      apollo.cmd  mybroker    testbroker
    RaindeMacBook-Pro:bin rain$ cd mybroker/
    RaindeMacBook-Pro:mybroker rain$ ls
    bin data    etc log tmp
    RaindeMacBook-Pro:mybroker rain$ cd bin
    RaindeMacBook-Pro:bin rain$ ls
    apollo-broker       apollo-broker-service
    

    可以看到有两个文件,启动apollo-broker

    image

    启动成功以后,就可以在浏览器中访问了,默认用户名和密码是admin,password

    image

    刚进去是,Topics选项卡是空的,我是在运行程序后截图的,所以有一个topic列表

    image

    配置Maven

    在pom.xml中添加以下配置

    <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.0</version>
            </dependency>
    

    再创建下面的类

    MqttServer

    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MqttServer2 {
        /**
         * 代理服务器ip地址
         */
        public static final String MQTT_BROKER_HOST = "tcp://127.0.0.1:61613";
    
        /**
         * 订阅标识
         */
        public static final String MQTT_TOPIC = "test2";
    
        private static String userName = "admin";
        private static String password = "password";
    
        /**
         * 客户端唯一标识
         */
        public static final String MQTT_CLIENT_ID = "android_server_xiasuhuei32";
        private static MqttTopic topic;
        private static MqttClient client;
    
        public static void main(String... args) {
            // 推送消息
            MqttMessage message = new MqttMessage();
            try {
                client = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
                MqttConnectOptions options = new MqttConnectOptions();
                options.setCleanSession(true);
                options.setUserName(userName);
                options.setPassword(password.toCharArray());
                options.setConnectionTimeout(10);
                options.setKeepAliveInterval(20);
    
                topic = client.getTopic(MQTT_TOPIC);
    
                message.setQos(1);
                message.setRetained(false);
                message.setPayload("message from server222222".getBytes());
                client.connect(options);
    
                while (true) {
                    MqttDeliveryToken token = topic.publish(message);
                    token.waitForCompletion();
                    System.out.println("已经发送222");
                    Thread.sleep(10000);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    MqttClient

    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MyMqttClient {
        /**
         * 代理服务器ip地址
         */
        public static final String MQTT_BROKER_HOST = "tcp://127.0.0.1:61613";
    
        /**
         * 客户端唯一标识
         */
        public static final String MQTT_CLIENT_ID = "android_xiasuhuei321";
    
        /**
         * 订阅标识
         */
    //    public static final String MQTT_TOPIC = "xiasuhuei321";
    
        /**
         *
         */
        public static final String USERNAME = "admin";
        /**
         * 密码
         */
        public static final String PASSWORD = "password";
        public static final String TOPIC_FILTER = "test2";
    
        private volatile static MqttClient mqttClient;
        private static MqttConnectOptions options;
    
        public static void main(String... args) {
            try {
                // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
                // MemoryPersistence设置clientid的保存形式,默认为以内存保存
    
                mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
                // 配置参数信息
                options = new MqttConnectOptions();
                // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
                // 这里设置为true表示每次连接到服务器都以新的身份连接
                options.setCleanSession(true);
                // 设置用户名
                options.setUserName(USERNAME);
                // 设置密码
                options.setPassword(PASSWORD.toCharArray());
                // 设置超时时间 单位为秒
                options.setConnectionTimeout(10);
                // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                options.setKeepAliveInterval(20);
                // 连接
                mqttClient.connect(options);
                // 订阅
                mqttClient.subscribe(TOPIC_FILTER);
                // 设置回调
                mqttClient.setCallback(new MqttCallback() {
                    @Override
                    public void connectionLost(Throwable throwable) {
                        System.out.println("connectionLost");
                    }
    
                    @Override
                    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                        System.out.println("Topic: " + s + " Message: " + mqttMessage.toString());
                    }
    
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
    }
    

    PublishSample

    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    /**
     *发布端
     */
    public class PublishSample {
        public static void main(String[] args) {
    
            String topic = "test2";
            String content = "hello 哈哈";
            int qos = 1;
            String broker = "tcp://127.0.0.1:61613";
            String userName = "admin";
            String password = "password";
            String clientId = "pubClient";
            // 内存存储
            MemoryPersistence persistence = new MemoryPersistence();
    
            try {
                // 创建客户端
                MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
                // 创建链接参数
                MqttConnectOptions connOpts = new MqttConnectOptions();
                // 在重新启动和重新连接时记住状态
                connOpts.setCleanSession(false);
                // 设置连接的用户名
                connOpts.setUserName(userName);
                connOpts.setPassword(password.toCharArray());
                // 建立连接
                sampleClient.connect(connOpts);
                // 创建消息
                MqttMessage message = new MqttMessage(content.getBytes());
                // 设置消息的服务质量
                message.setQos(qos);
                // 发布消息
                sampleClient.publish(topic, message);
                // 断开连接
                sampleClient.disconnect();
                // 关闭客户端
                sampleClient.close();
            } catch (MqttException me) {
                System.out.println("reason " + me.getReasonCode());
                System.out.println("msg " + me.getMessage());
                System.out.println("loc " + me.getLocalizedMessage());
                System.out.println("cause " + me.getCause());
                System.out.println("excep " + me);
                me.printStackTrace();
            }
        }
    }
    

    SubscribeSample

    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    /**
     *订阅端
     */
    public class SubscribeSample {
    
        public static void main(String[] args) throws MqttException {
            String HOST = "tcp://127.0.0.1:61613";
            String TOPIC = "test2";
            int qos = 1;
            String clientid = "subClient";
            String userName = "admin";
            String passWord = "password";
            try {
                // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
                MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
                // MQTT的连接设置
                MqttConnectOptions options = new MqttConnectOptions();
                // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
                options.setCleanSession(true);
                // 设置连接的用户名
                options.setUserName(userName);
                // 设置连接的密码
                options.setPassword(passWord.toCharArray());
                // 设置超时时间 单位为秒
                options.setConnectionTimeout(10);
                // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                options.setKeepAliveInterval(20);
                // 设置回调函数
                client.setCallback(new MqttCallback() {
    
                    public void connectionLost(Throwable cause) {
                        System.out.println("connectionLost");
                    }
    
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                        System.out.println("topic:"+topic);
                        System.out.println("Qos:"+message.getQos());
                        System.out.println("message content:"+new String(message.getPayload()));
    
                    }
    
                    public void deliveryComplete(IMqttDeliveryToken token) {
                        System.out.println("deliveryComplete---------"+ token.isComplete());
                    }
    
                });
                client.connect(options);
                //订阅消息
                client.subscribe(TOPIC, qos);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    启动程序

    1.启动MqttServer2以后,开始循环发送消息。

    image

    2.启动MyMqttClient开始接收消息。

    image

    到这里,整个程序基本可以运行。

    3.启动PublishSample,发布一条消息,在启动SubscribeSample来订阅发布的消息。

    image

    4.发布的消息在MyMqttClient中也会显示出来

    image

    相关文章

      网友评论

          本文标题:JAVA开发MQTT总结

          本文链接:https://www.haomeiwen.com/subject/jvzbqhtx.html