美文网首页java
MQTT JAVA paho实例

MQTT JAVA paho实例

作者: 黄河边的牧马人 | 来源:发表于2018-08-21 17:01 被阅读0次

    前面两篇文章介绍了 MQTT 服务的安装:

    https://www.jianshu.com/p/d421bb32fe4c

    https://www.jianshu.com/p/879bd7f2db92

    这里介绍一下如何用 Java 实现 MQTT 的订阅和消息发布,我们采用 Eclipse Paho 的 jar 包。

    1、创建java工程,添加依赖

    <dependency>

            <groupId>org.eclipse.paho</groupId>

            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>

            <version>1.2.0</version>

    </dependency>

    2、订阅者

    package com.chen.mqtt.mosquitto;

    import org.eclipse.paho.client.mqttv3.MqttClient;

    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

    import org.eclipse.paho.client.mqttv3.MqttException;

    import org.eclipse.paho.client.mqttv3.MqttTopic;

    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

    import java.util.concurrent.ScheduledExecutorService;

    /**

    * @author: ChenJie

    * @date 2018/8/10

    */

    public class Subscriber {

        public static final StringHOST ="tcp://172.16.1.86:1883";

        public static final StringTOPIC ="speedTopic";

        private static final Stringclientid ="client83";

        private MqttClientclient;

        private MqttConnectOptionsoptions;

    //    private String userName = "admin";

    //    private String passWord = "password";

        private ScheduledExecutorServicescheduler;

        public static void main(String[] args) {

            Subscriber client =new Subscriber();

            client.start();

        }

        private void start() {

            try {

                client =new MqttClient(HOST,clientid,new MemoryPersistence());

                options =new MqttConnectOptions();

                options.setCleanSession(true);

                options.setConnectionTimeout(10);

                options.setKeepAliveInterval(20);

                client.setCallback(new PushCallback());

                MqttTopic topic =client.getTopic(TOPIC);

                //遗言

                options.setWill(topic,"close".getBytes(),2,true);

                client.connect(options);

                int [] qos = {1};

                String  [] topics = {TOPIC};

                client.subscribe(topics,qos);

            }catch (MqttException e) {

                e.printStackTrace();

            }

        }

    }

    3、消息发布者

    package com.chen.mqtt.mosquitto;

    import org.eclipse.paho.client.mqttv3.*;

    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

    /**

    * @author: ChenJie

    * @date 2018/8/10

    */

    public class Publisher {

        //tcp://MQTT安装的服务器地址:MQTT定义的端口号

        public static final StringHOST ="tcp://172.16.1.86:1883";

        //定义一个主题

        public static final StringTOPIC ="speedTopic";

        //定义MQTT的ID,可以在MQTT服务配置中指定

        private static final Stringclientid ="server84";

        private MqttClientclient;

        private MqttTopictopic;

        private StringuserName ="mosquitto";

        private Stringpassword ="";

        private MqttMessagemessage;

        /**

        * 构造函数

        * @throws MqttException

        */

        public Publisher()throws MqttException {

            // MemoryPersistence设置clientid的保存形式,默认为以内存保存

            client =new MqttClient(HOST, clientid, new MemoryPersistence());

            connect();

        }

    private void connect() {

    MqttConnectOptions options =new MqttConnectOptions();

            options.setCleanSession(false);

    //        options.setUserName(userName);

    //        options.setPassword(password.toCharArray());

            //超时时长

            options.setConnectionTimeout(100);

            //心跳时长

            options.setKeepAliveInterval(20);

            options.setServerURIs(new String[]{HOST}  );

            try{

                client.setCallback(new PushCallback() );

                client.connect(options);

                topic =client.getTopic(TOPIC);

            }

            catch(Exception e){

                e.printStackTrace();

            }

    }

    public void publish(MqttTopic topic,MqttMessage message)throws MqttException {

    MqttDeliveryToken token = topic.publish(message);

            System.out.println("等待发送成功:"+token.isComplete());

            token.waitForCompletion();

            System.out.println("已经发送成功:"+token.isComplete());

        }

    public static void main(String[] args)throws MqttException {

            Publisher server =new Publisher();

            server.message =new MqttMessage();

            server.message.setQos(1);

            server.message.setRetained(true);

            for(int i=0;i<10;i++){

                server.message.setPayload(("hello,topic speed "+i).getBytes());

                server.publish(server.topic,server.message);

            }

        }

    }

    4、PushCallback

    package com.chen.mqtt.mosquitto;

    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

    import org.eclipse.paho.client.mqttv3.MqttCallback;

    import org.eclipse.paho.client.mqttv3.MqttMessage;

    import java.time.LocalDateTime;

    /**

    * 发布消息的回调类

    *

    * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。

    * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。

    * 在回调中,将它用来标识已经启动了该回调的哪个实例。

    * 必须在回调类中实现三个方法:

    *

    *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。

    *

    *  public void connectionLost(Throwable cause)在断开连接时调用。

    *

    *  public void deliveryComplete(MqttDeliveryToken token))

    *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。

    *  由 MqttClient.connect 激活此回调。

    */

    public class PushCallbackimplements MqttCallback{

    @Override

        public void connectionLost(Throwable throwable) {

            System.out.println("连接断开!");

            System.out.println(LocalDateTime.now());

        }

    @Override

        public void messageArrived(String topic, MqttMessage message)throws Exception {

            System.out.println("接收消息主题 : " + topic);

            System.out.println("接收消息Qos : " + message.getQos());

            System.out.println("接收消息内容 : " +new String(message.getPayload()));

        }

        @Override

        public void deliveryComplete(IMqttDeliveryToken token) {

            System.out.println("分发完成---------" + token.isComplete());

            System.out.println(LocalDateTime.now());

        }

    }

    相关文章

      网友评论

        本文标题:MQTT JAVA paho实例

        本文链接:https://www.haomeiwen.com/subject/qcyjiftx.html