美文网首页技术干货Android开发Android开发经验谈
MQTT从搭建代理服务器到推送消息过程

MQTT从搭建代理服务器到推送消息过程

作者: 孤独的二狗 | 来源:发表于2017-07-19 12:35 被阅读0次

    MQTT简介:

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议!
    

    MQTT消息的主要特点:

    使用(publish/subscribe)消息模式,简称p/s模式,即发布/订阅!提供一对多的发送方式!
    
    MQTT根据QoS定义的等级来传输消息:
    • level 0:最多一次的传输
    消息是基于TCP/IP网络传输的。没有回应,在协议中也没有定义重传的语义。消息可能到达服务器1次,也可能根本不会到达。
    
    • level 1:至少一次的传输
    服务器接收到消息会被确认,通过传输一个PUBACK信息。如果有一个可以辨认的传输失败,无论是通讯连接还是发送设备,还是过了一段时间确认信息没有收到,发送方都会将消息头的DUP位置1,然后再次发送消息。消息最少一次到达服务器。SUBSCRIBE和UNSUBSCRIBE都使用level 1 的QoS。
    如果客户端没有接收到PUBACK信息(无论是应用定义的超时,还是检测到失败然后通讯session重启),客户端都会再次发送PUBLISH信息,并且将DUP位置1。
    当它从客户端接收到重复的数据,服务器重新发送消息给订阅者,并且发送另一个PUBACK消息。
    
    • level 2: 只有一次的传输
    在QoS level 1上附加的协议流保证了重复的消息不会传送到接收的应用。这是最高级别的传输,当重复的消息不被允许的情况下使用。这样增加了网络流量,但是它通常是可以接受的,因为消息内容很重要。
    QoS level 2在消息头有Message ID。
    

    接下来开始我们的表演:

    下载代理服务器
    创建代理服务器
    • 下载完成然后解压目录

    • 打开dos窗口进入到apache-apollo-1.7.1\bin目录下

    • 执行apollo create testbroker命令创建一个名称为testbroker的代理服务器

    • Paste_Image.png
    • 下面就是我们创建的代理服务器


      Paste_Image.png
    启动代理服务器
    • 使用dos进入testbroker目录中的bin目录下

    • 执行apollo-broker run命令启动代理服务器

    • Paste_Image.png
    通过HTTP访问代理服务器

    现在我们可以打开浏览器看下我们的代理服务器
    输入网址http://127.0.0.1:61680/

    • Paste_Image.png
    • 用户名密码可到配置文件中查看

    • 进入testbroker目录下的etc目录

    Paste_Image.png
    • users.properties中配置的用户名和密码
    Paste_Image.png
    • 默认有个用户名为admin,密码为password的用户

    • 我们也可以自己配置用户

    • 现在就用默认用户登陆

    • Paste_Image.png

    OK登陆成功

    接下来我们编写Android客户端

    Paste_Image.png
    一定保证客户端和服务端以及代理服务器所在的电脑在同一网段下
    • 可以在电脑上生成wifi热点,手机客户端连接热点即可
    • 接下来直接贴Android客户端代码
    MqttService.java
    package com.example.jingwc.mqtt_demo;
    
    import android.app.Service;
    import android.content.Intent;
    import android.os.Binder;
    import android.os.IBinder;
    import android.util.Log;
    
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MqttService extends Service {
    
        /**
         * 代理服务器ip地址
         */
        public static final String MQTT_BROKER_HOST = "tcp://192.168.1.107:61613";
    
        /**
         * 客户端唯一标识
         */
        public static final String MQTT_CLIENT_ID = "android-jingwc";
    
        /**
         * 订阅标识
         */
        public static final String MQTT_TOPIC = "jingwc";
    
        /**
         * 用户名
         */
        public static final String USERNAME = "admin";
        /**
         *  密码
         */
        public static final String PASSWORD = "password";
    
        private MqttClient mqttClient;
    
        public MqttService() {
        }
    
        @Override
        public IBinder onBind(Intent intent) {
            return binder;
        }
    
        /**
         * 连接mqtt
         */
        public void connect(){
            try {
                // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
                // MemoryPersistence设置clientid的保存形式,默认为以内存保存
                mqttClient = new MqttClient(MQTT_BROKER_HOST,MQTT_CLIENT_ID,new MemoryPersistence());
                // 配置参数信息
                MqttConnectOptions options = new MqttConnectOptions();
                // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
                // 这里设置为true表示每次连接到服务器都以新的身份连接
                options.setCleanSession(true);
                // 设置用户名
                options.setUserName(USERNAME);
                // 设置密码
                options.setPassword(PASSWORD.toCharArray());
                // 设置超时时间 单位为秒
                options.setConnectionTimeout(10);
                // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                options.setKeepAliveInterval(20);
                // 连接
                mqttClient.connect(options);
    
                // 订阅
                mqttClient.subscribe(MQTT_TOPIC);
    
                // 设置回调
                mqttClient.setCallback(new MqttCallback() {
                    //连接丢失后,一般在这里面进行重连
                    @Override
                    public void connectionLost(Throwable throwable) {
                        Log.d("test","connectionLost");
                    }
    
    
                    //subscribe后得到的消息会执行到这里面
                    @Override
                    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                        Log.d("test","messageArrived"+mqttMessage.toString());
                    }
    
                    //publish后会执行到这里
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                        Log.d("test","deliveryComplete");
                    }
                });
    
            } catch (MqttException e) {
                e.printStackTrace();
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 断开连接
         */
        public void disconnect(){
            if(mqttClient != null){
                if(mqttClient.isConnected()){
                    try {
                        mqttClient.disconnect();
                        mqttClient = null;
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        private final Binder binder = new MyBinder();
    
        class MyBinder extends Binder{
    
            public MqttService getService(){
                return MqttService.this;
            }
        }
    }
    
    
    MainActivity.java
    package com.example.jingwc.mqtt_demo;
    
    import android.content.ComponentName;
    import android.content.Intent;
    import android.content.ServiceConnection;
    import android.os.IBinder;
    import android.support.v7.app.AppCompatActivity;
    import android.os.Bundle;
    import android.view.View;
    import android.widget.Button;
    import android.widget.EditText;
    
    public class MainActivity extends AppCompatActivity {
    
        MqttService service = null;
    
        private ServiceConnection mConnection = new ServiceConnection() {
            @Override
            public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
                service = ((MqttService.MyBinder)iBinder).getService();
            }
    
            @Override
            public void onServiceDisconnected(ComponentName componentName) {
                service = null;
            }
        };
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
    
            Button bt_connect = (Button) findViewById(R.id.bt_connect);
            Button bt_disconnect = (Button) findViewById(R.id.bt_disconnect);
    
            bt_connect.setOnClickListener(new View.OnClickListener() {
                @Override
                public void onClick(View view) {
                    // 连接
                    service.connect();
                }
            });
    
            bt_disconnect.setOnClickListener(new View.OnClickListener() {
                @Override
                public void onClick(View view) {
                    // 断开连接
                    service.disconnect();
                }
            });
    
            bindService(new Intent(this,MqttService.class),mConnection,BIND_AUTO_CREATE);
        }
    }
    

    服务端代码

    • 也可以在写一个android程序当作服务端
    • 我这里写的是java项目
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MqttServer {
        
        // 代理服务器ip地址
        private static String host = "tcp://192.168.1.107:61613";
        
        private static String userName = "admin";
        private static String password = "password";
        
        private static MqttClient client;
        
        // 主题
        private static MqttTopic topic;
        
        private static MqttMessage message;
        
        // 订阅标识
        private static String topicStr = "jingwc";
        
        public static void main(String[] args) throws MqttException{
            client = new MqttClient(host,"java-server-jingwc",new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(userName);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(10);
            options.setKeepAliveInterval(20);
            
            topic = client.getTopic(topicStr);
            
            message = new MqttMessage();
            message.setQos(1);
            message.setRetained(true);
            message.setPayload("from server message".getBytes());
            client.connect(options);
            MqttDeliveryToken token = topic.publish(message);
            token.waitForCompletion();
            System.out.println("token:"+token.isComplete());
            
        }
    
    }
    

    服务端通过代理服务器发送客户端订阅的消息图

    • Paste_Image.png
    Paste_Image.png

    相关文章

      网友评论

        本文标题:MQTT从搭建代理服务器到推送消息过程

        本文链接:https://www.haomeiwen.com/subject/exsckxtx.html