美文网首页
Mqtt3.1的常用Api解析

Mqtt3.1的常用Api解析

作者: AirLan | 来源:发表于2017-09-19 13:47 被阅读441次

    本文主要是讲解Mqtt3.1协议下jar包的相关Api

    Mqtt对应jar包

    首先来看一下两个接口:

    IMqttClient和IMqttAsyncClient为了更好的对比这两个接口我就将二者的方法写在一起了(注释的方法为IMqttAsyncClient的接口方法)
    public interface IMqttClient {
    
         void connect() throws MqttSecurityException, MqttException;
         void connect(MqttConnectOptions var1) throws MqttSecurityException, MqttException;
         IMqttToken connectWithResult(MqttConnectOptions var1) throws MqttSecurityException, MqttException;  
      
      /**
        *以下是IMqttAsyncClient的connect方法
        */
       IMqttToken connect() throws MqttException, MqttSecurityException;
       IMqttToken connect(MqttConnectOptions var1) throws MqttException, 
         MqttSecurityException;
      IMqttToken connect(Object var1, IMqttActionListener var2) throws 
        MqttException, MqttSecurityException;
      IMqttToken connect(MqttConnectOptions var1, Object var2, IMqttActionListener var3) throws MqttException, MqttSecurityException;
    
    ============================================================
    
        void disconnect() throws MqttException;
        void disconnect(long var1) throws MqttException;
        void disconnectForcibly() throws MqttException;
        void disconnectForcibly(long var1) throws MqttException;
        void disconnectForcibly(long var1, long var3) throws MqttException;
        
      /**
        *以下是IMqttAsyncClient的disconnect方法
        */
        IMqttToken disconnect() throws MqttException;
        IMqttToken disconnect(long var1) throws MqttException;
        IMqttToken disconnect(Object var1, IMqttActionListener var2) throws MqttException;
        IMqttToken disconnect(long var1, Object var3, IMqttActionListener var4) throws MqttException;
        void disconnectForcibly() throws MqttException;
        void disconnectForcibly(long var1) throws MqttException;
        void disconnectForcibly(long var1, long var3) throws MqttException;
        
    ============================================================
    
        void subscribe(String var1) throws MqttException, MqttSecurityException;
        void subscribe(String[] var1) throws MqttException;
        void subscribe(String var1, int var2) throws MqttException;
        void subscribe(String[] var1, int[] var2) throws MqttException;
        void subscribe(String var1, IMqttMessageListener var2) throws MqttException, MqttSecurityException;
        void subscribe(String[] var1, IMqttMessageListener[] var2) throws MqttException;
        void subscribe(String var1, int var2, IMqttMessageListener var3) throws MqttException;
        void subscribe(String[] var1, int[] var2, IMqttMessageListener[] var3) throws MqttException;
    
        /**
        *以下是IMqttAsyncClient的subscribe方法
        */
        IMqttToken subscribe(String var1, int var2) throws MqttException;
        IMqttToken subscribe(String var1, int var2, Object var3, IMqttActionListener var4) throws MqttException;
        IMqttToken subscribe(String[] var1, int[] var2) throws MqttException;
        IMqttToken subscribe(String[] var1, int[] var2, Object var3, IMqttActionListener var4) throws MqttException;
        IMqttToken subscribe(String var1, int var2, Object var3, IMqttActionListener var4, IMqttMessageListener var5) throws MqttException;
        IMqttToken subscribe(String var1, int var2, IMqttMessageListener var3) throws MqttException;
        IMqttToken subscribe(String[] var1, int[] var2, IMqttMessageListener[] var3) throws MqttException;
        IMqttToken subscribe(String[] var1, int[] var2, Object var3, IMqttActionListener var4, IMqttMessageListener[] var5) throws MqttException;
    
     ============================================================
        void unsubscribe(String var1) throws MqttException;
        void unsubscribe(String[] var1) throws MqttException;
        
      /**
        *以下是IMqttAsyncClient的unsubscribe方法
        */
        IMqttToken unsubscribe(String var1) throws MqttException;
        IMqttToken unsubscribe(String[] var1) throws MqttException;
        IMqttToken unsubscribe(String var1, Object var2, IMqttActionListener var3) throws MqttException;
        IMqttToken unsubscribe(String[] var1, Object var2, IMqttActionListener var3) throws MqttException;
    
     ============================================================
    
        void publish(String var1, byte[] var2, int var3, boolean var4) throws MqttException, MqttPersistenceException;
        void publish(String var1, MqttMessage var2) throws MqttException, MqttPersistenceException;
    
      /**
        *以下是IMqttAsyncClient的publish方法
        */
        IMqttDeliveryToken publish(String var1, byte[] var2, int var3, boolean var4) throws MqttException, MqttPersistenceException;
        IMqttDeliveryToken publish(String var1, byte[] var2, int var3, boolean var4, Object var5, IMqttActionListener var6) throws MqttException, MqttPersistenceException;
        IMqttDeliveryToken publish(String var1, MqttMessage var2) throws MqttException, MqttPersistenceException;
        IMqttDeliveryToken publish(String var1, MqttMessage var2, Object var3, IMqttActionListener var4) throws MqttException, MqttPersistenceException;
    
     ============================================================
    
        void setCallback(MqttCallback var1);
      /**
        *以下是IMqttAsyncClient的setCallback方法
        */
        void setCallback(MqttCallback var1);
    
     ============================================================
    
        MqttTopic getTopic(String var1);//IMqttAsyncClient无此方法
    
     ============================================================
    
        boolean isConnected();
      /**
        *以下是IMqttAsyncClient的isConnected方法
        */
        boolean isConnected();
    
     ============================================================
    
        String getClientId();
        String getServerURI();
    
      /**
        *以下是IMqttAsyncClient的getClientId和getServerURI方法
        */
        String getClientId();
        String getServerURI();
    
     ============================================================
    
        IMqttDeliveryToken[] getPendingDeliveryTokens();
        void setManualAcks(boolean var1);
        void messageArrivedComplete(int var1, int var2) throws MqttException;
        void close() throws MqttException;
    
      /**
        *以下是IMqttAsyncClient对应的方法
        */
        IMqttDeliveryToken[] getPendingDeliveryTokens();
        void setManualAcks(boolean var1);
        void messageArrivedComplete(int var1, int var2) throws MqttException;
        void close() throws MqttException;
    }
    

    可以看到这两个接口定义的方法基本相同,从接口名也可看出一个是同步的一个是异步的,这两个接口对应的唯一实现类分别是:MqttClient和MqttAsyncClient使用的时候根据实际需求使用;接下来是初步实现的一个Mqtt连接(采用了前台服务,守护服务,广播监听来实现互相唤醒尽最大可能不被系统杀掉)连接的建立核心部分在initData()方法:

    public class IqPushService extends Service {
       
        private MqttAsyncClient mqttClient;
        private MqttConnectOptions options;
        private RegisterDeviceResult registerDeviceResult;
        private Subscription subscription;
        private boolean currentStatus;
        private static String message = "";
        private static long preTime;
        private static String pakageName = "";
    
        @Override
        public void onCreate() {
            super.onCreate();
            //使用了前台服务,防杀
            NotificationCompat.Builder builder = new NotificationCompat.Builder(this);
            builder.setSmallIcon(android.R.drawable.ic_menu_manage);
            builder.setContentTitle("前台服务");
            builder.setContentText("推送");
            Notification notification = builder.build();
            startForeground(110, notification);
        }
    
        @Nullable
        @Override
        public IBinder onBind(Intent intent) {
            return null;
        }
    
        @Override
        public int onStartCommand(final Intent intent, int flags, final int startId) {
            pakageName = getPackageName();
            initData();
            if (subscription != null && !subscription.isUnsubscribed()) {
                subscription.unsubscribe();
            }
            subscription = Observable.interval(5, 20, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    boolean flag = ProcessCheckUtils.checkServiceRunning(IqPushService.this, ProtectedService.class.getName());
                    if (!flag) {
                        Intent intent1 = new Intent(getApplicationContext(), ProtectedService.class);
                        startService(intent1);
                    }
                    reConnect();
                }
            });
            return super.onStartCommand(intent, flags, startId);
        }
    
        //连接的建立
        private void initData() {
            registerDeviceResult = AppDeviceInfo.getDeviceResult();
            if (registerDeviceResult != null && NetworkUtils.isConnected()) {
                RegisterDeviceResult.DataBean dataBean = registerDeviceResult.getData();
                if (dataBean != null) {
                    try {
                        mqttClient = new MqttAsyncClient("tcp://" + dataBean.getHost() + ":" + dataBean.getPort(), dataBean.getClient_id(), new MemoryPersistence());
                        options = new MqttConnectOptions();
                        options.setUserName(dataBean.getUsername());
                        options.setPassword(dataBean.getPassword().toCharArray());
                        options.setConnectionTimeout(10);
                        options.setKeepAliveInterval(20);
                        options.setAutomaticReconnect(false);
                        options.setCleanSession(false);
                        mqttClient.setCallback(new MqttCallback() {
                            @Override
                            public void connectionLost(Throwable throwable) {
                                    reConnect();
                            }
    
                            @Override
                            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                                if (!message.equals(mqttMessage.toString()) || System.currentTimeMillis() - preTime > 5000) {
                                    message = mqttMessage.toString();
                                    preTime = System.currentTimeMillis();
                                    sendMessage(s, mqttMessage.toString());
                                }
    
                            }
    
                            @Override
                            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    
                            }
                        });
                        mqttClient.connect(options, null, new IMqttActionListener() {
                            @Override
                            public void onSuccess(IMqttToken iMqttToken) {
                                if (!TextUtils.isEmpty(registerDeviceResult.getData().getTopic())) {
                                    try {
                                        JSONArray jsonArray = new JSONArray(registerDeviceResult.getData().getTopic());
                                        for (int i = 0; i < jsonArray.length(); i++) {
                                            mqttClient.subscribe(jsonArray.getString(i), 2);
                                        }
                                        sendStatus(true);
                                    } catch (Exception e) {
                                        e.printStackTrace();
                                    }
                                }
                            }
    
                            @Override
                            public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                                sendStatus(false);
                                reConnect();
                            }
                        });
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
        private void reConnect() {
            if (mqttClient == null) {
                sendStatus(false);
                if (NetworkUtils.isConnected()) {
                    Intent intent = new Intent("com.ebanswers.reiq");
                    intent.setPackage(pakageName);
                    getApplicationContext().sendBroadcast(intent);
                }
            } else if (!mqttClient.isConnected()) {
                if (currentStatus)
                    sendStatus(false);
                if (NetworkUtils.isConnected()) {
                    Log.d("IqPushService_connect", "connect");
                    try {
                        mqttClient.reconnect();
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            } else {
                if (!currentStatus)
                    sendStatus(true);
            }
    
        }
    
        private void sendMessage(String topic, String message) {
            Intent intent = new Intent("com.ebanswers.message");
            intent.putExtra("topic", topic);
            intent.putExtra("message", message);
            intent.setPackage(pakageName);
            sendBroadcast(intent);
        }
    
        private void sendStatus(boolean isSuccess) {
            currentStatus = isSuccess;
            Intent intent = new Intent("com.ebanswers.status");
            intent.putExtra("isSucess", isSuccess);
            intent.setPackage(pakageName);
            sendBroadcast(intent);
        }
    
        @Override
        public void onDestroy() {
            super.onDestroy();
            subscription.unsubscribe();
            Intent intent = new Intent("com.ebanswers.reiq");
            intent.setPackage(pakageName);
            getApplicationContext().sendBroadcast(intent);
        }
    }
    

    相关文章

      网友评论

          本文标题:Mqtt3.1的常用Api解析

          本文链接:https://www.haomeiwen.com/subject/ueqwsxtx.html