Paho MQTT Android Source Code Analysis: MqttSer

Author: braincs | Published: 2021-11-07 22:40

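An MQTT client implemented as an Android Service.

The MqttService API closely mirrors IMqttAsyncClient, with a few adaptations for the Android platform:

  1. invocationContext: a String used to pass application context back to the caller

  2. activityToken: a String used to identify the Activity callback, or to carry context-related data for it

Most of the code exists to support multiple client connections: each connection is identified by a client handle, which the higher-level APIs use to address it.

Usage

  • An Activity binds to the service with the BIND_AUTO_CREATE flag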

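Design:

  • Most operations are asynchronous

  • Results are passed back to the Activity as a broadcast Intent

    • The Intent's action is MqttServiceConstants.CALLBACK_TO_ACTIVITY, which makes it easy to filter on

    • The Intent extras carry the remaining data

    • The extras are listed in the table below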

| Name | Data Type | Value | Operations used for |
| --- | --- | --- | --- |
| MqttServiceConstants.CALLBACK_CLIENT_HANDLE | String | The clientHandle identifying the client which initiated this operation | All operations |
| MqttServiceConstants.CALLBACK_STATUS | Serializable | A Status value indicating success or otherwise of the operation | All operations |
| MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN | String | The activityToken passed into the operation | All operations |
| MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT | String | The invocationContext passed into the operation | All operations |
| MqttServiceConstants.CALLBACK_ACTION | String | One of MqttServiceConstants.SEND_ACTION, UNSUBSCRIBE_ACTION, SUBSCRIBE_ACTION, DISCONNECT_ACTION, CONNECT_ACTION, MESSAGE_ARRIVED_ACTION, MESSAGE_DELIVERED_ACTION, ON_CONNECTION_LOST_ACTION | All operations |
| MqttServiceConstants.CALLBACK_ERROR_MESSAGE | String | A suitable error message (taken from the relevant exception where possible) | All failing operations |
| MqttServiceConstants.CALLBACK_ERROR_NUMBER | int | A suitable error code (taken from the relevant exception where possible) | All failing operations |
| MqttServiceConstants.CALLBACK_EXCEPTION_STACK | String | The stacktrace of the failing call | The Connection Lost event |
| MqttServiceConstants.CALLBACK_MESSAGE_ID | String | The identifier for the message in the message store, used by the Activity to acknowledge the arrival of the message, so that the service may remove it from the store | The Message Arrived event |
| MqttServiceConstants.CALLBACK_DESTINATION_NAME | String | The topic on which the message was received | The Message Arrived event |
| MqttServiceConstants.CALLBACK_MESSAGE_PARCEL | Parcelable | The new message encapsulated in Android Parcelable format as a ParcelableMqttMessage | The Message Arrived event |
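
To make the table more concrete, here is a minimal plain-Java sketch of how a receiver might branch on the callback action. A Map stands in for the Intent extras so that the example runs without Android. The key and action strings are placeholders chosen for illustration; they are not the actual values of the MqttServiceConstants fields, and the class itself is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical dispatcher; a Map stands in for the Intent extras. */
final class CallbackDispatchSketch {
    // Placeholder keys and action names (assumptions, not the real constant values)
    static final String KEY_ACTION = "callbackAction";
    static final String KEY_CLIENT_HANDLE = "clientHandle";
    static final String KEY_DESTINATION = "destinationName";
    static final String ACTION_CONNECT = "connect";
    static final String ACTION_MESSAGE_ARRIVED = "messageArrived";
    static final String ACTION_CONNECTION_LOST = "onConnectionLost";

    /** Returns a short description of the callback carried by the extras. */
    static String describe(Map<String, Object> extras) {
        String handle = (String) extras.get(KEY_CLIENT_HANDLE);
        String action = (String) extras.get(KEY_ACTION);
        if (action == null) {
            return "ignored: no action";
        }
        switch (action) {
            case ACTION_CONNECT:
                return handle + " connected";
            case ACTION_MESSAGE_ARRIVED:
                // Only the Message Arrived event carries the destination name
                return handle + " received a message on " + extras.get(KEY_DESTINATION);
            case ACTION_CONNECTION_LOST:
                return handle + " lost its connection";
            default:
                return handle + " completed " + action;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> extras = new HashMap<>();
        extras.put(KEY_CLIENT_HANDLE, "tcp://broker.example:1883:clientA:ctx");
        extras.put(KEY_ACTION, ACTION_MESSAGE_ARRIVED);
        extras.put(KEY_DESTINATION, "sensors/temperature");
        System.out.println(describe(extras));
    }
}
```

In a real Activity the same branching would read the extras from the broadcast Intent, using the MqttServiceConstants names from the table as keys.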

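onCreate

On creation, the service instantiates the following two objects:

  • MqttServiceBinder: the Binder through which callers send commands to the service

  • DatabaseMessageStore: persists arrived messages in an SQLite database until they have been delivered to the service's caller

    See DatabaseMessageStore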

@Override
public void onCreate() {
    super.onCreate();

    // Create the Binder so that callers can send commands to the service
    mqttServiceBinder = new MqttServiceBinder(this);

    // Persist arrived messages until they have been delivered to the service's caller
    messageStore = new DatabaseMessageStore(this, this);
}

MqttServiceBinder is the object the Service hands back to the Activity on binding. It holds a reference to the service together with the String activityToken associated with it.

class MqttServiceBinder extends Binder {

 private MqttService mqttService;
 private String activityToken;
 //...

Binding:

@Override
public IBinder onBind(Intent intent) {
    // What we pass back to the Activity on binding -
    // a reference to ourself, and the activityToken
    // we were given when started
    String activityToken = intent.getStringExtra(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN);
    mqttServiceBinder.setActivityToken(activityToken);
    return mqttServiceBinder;
}

Starting:

@Override
public int onStartCommand(final Intent intent, int flags, final int startId) {
    // run till explicitly stopped, restart when
    // process restarted
    // Register the broadcast receivers
    registerBroadcastReceivers();

    return START_STICKY;
}

Registering the broadcast receivers

@SuppressWarnings("deprecation")
private void registerBroadcastReceivers() {
    // Register the network connectivity receiver
    if (networkConnectionMonitor == null) {
        networkConnectionMonitor = new NetworkConnectionIntentReceiver();
        registerReceiver(networkConnectionMonitor, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
    }

    // Can be ignored on current devices: background-data monitoring for API levels below 14
    if (Build.VERSION.SDK_INT < 14 /**Build.VERSION_CODES.ICE_CREAM_SANDWICH**/) {
        // Support the old system for background data preferences
        ConnectivityManager cm = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
        backgroundDataEnabled = cm.getBackgroundDataSetting();
        if (backgroundDataPreferenceMonitor == null) {
            backgroundDataPreferenceMonitor = new BackgroundDataPreferenceReceiver();
            registerReceiver(backgroundDataPreferenceMonitor, new IntentFilter(ConnectivityManager.ACTION_BACKGROUND_DATA_SETTING_CHANGED));
        }
    }
}

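Network connectivity broadcast

On each connectivity broadcast the receiver:

  • Obtains the PowerManager
  • Acquires a partial WakeLock so the CPU keeps running until handling finishes
  • If the device is online, attempts to reconnect
  • If the device is offline, notifies all clients that they are offline

private class NetworkConnectionIntentReceiver extends BroadcastReceiver {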

    @Override
    @SuppressLint("Wakelock")
    public void onReceive(Context context, Intent intent) {
        traceDebug(TAG, "Internal network status receive.");
        // we protect against the phone switching off
        // by requesting a wake lock - we request the minimum possible wake
        // lock - just enough to keep the CPU running until we've finished
        PowerManager pm = (PowerManager) getSystemService(POWER_SERVICE);
        WakeLock wl = pm.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MQTT");
        wl.acquire();
        traceDebug(TAG, "Reconnect for Network recovery.");
        if (isOnline()) {
            traceDebug(TAG, "Online,reconnect.");
            // we have an internet connection - have another try at
            // connecting
            reconnect();
        } else {
            notifyClientsOffline();
        }

        wl.release();
    }
}

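Checking connectivity

public boolean isOnline() {
    // Get the ConnectivityManager system service
    ConnectivityManager cm = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    // Get the state of the active network
    NetworkInfo networkInfo = cm.getActiveNetworkInfo();
    //noinspection RedundantIfStatement
    // Return true only if the network is available and connected, and background data is enabled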
    if (networkInfo != null && networkInfo.isAvailable() && networkInfo.isConnected() && backgroundDataEnabled) {
        return true;
    }

    return false;
}

Notifying clients that the device is offline

/**
* Notify clients we're offline
* Calls offline() on each MqttConnection
*/
private void notifyClientsOffline() {
    for (MqttConnection connection : connections.values()) {
        connection.offline();
    }
}

See the MqttConnection section.

reconnect

/**
 * Request all clients to reconnect if appropriate
 */
void reconnect() {
    traceDebug(TAG, "Reconnect to server, client size=" + connections.size());
    // Iterate over all connections and reconnect each one
    for (MqttConnection client : connections.values()) {
        traceDebug("Reconnect Client:", client.getClientId() + '/' + client.getServerURI());
        if (this.isOnline()) {
            client.reconnect();
        }
    }
}

This calls MqttConnection.reconnect() on each connection; see the reconnect section under MqttConnection.

onDestroy

On destruction, the service:

  • Disconnects all clients
  • Releases resources
  • Unregisters the broadcast receivers

// Concurrent hash map holding all connections
// ConcurrentHashMap <clientHandle, client>
private Map<String/* clientHandle */, MqttConnection/* client */> connections = new ConcurrentHashMap<>();

@Override
public void onDestroy() {
    // Disconnect every client
    for (MqttConnection client : connections.values()) {
        client.disconnect(null, null);
    }

    // Release the Binder
    if (mqttServiceBinder != null) {
        mqttServiceBinder = null;
    }

    // Unregister the broadcast receivers
    unregisterBroadcastReceivers();

    // Close the message store (database)
    if (this.messageStore != null) {
        this.messageStore.close();
    }

    super.onDestroy();
}
//...

// Unregister the broadcast receivers
private void unregisterBroadcastReceivers() {
    // Unregister the network connectivity receiver
    if (networkConnectionMonitor != null) {
        unregisterReceiver(networkConnectionMonitor);
        networkConnectionMonitor = null;
    }

    if (Build.VERSION.SDK_INT < 14 /**Build.VERSION_CODES.ICE_CREAM_SANDWICH**/) {
        if (backgroundDataPreferenceMonitor != null) {
            unregisterReceiver(backgroundDataPreferenceMonitor);
        }
    }
}

Buffering messages while disconnected

/**
 * Sets the DisconnectedBufferOptions for this client
 *
 * @param clientHandle identifier for the client
 * @param bufferOpts   the DisconnectedBufferOptions for this client
 */
public void setBufferOpts(String clientHandle, DisconnectedBufferOptions bufferOpts) {
    MqttConnection client = getConnection(clientHandle);
    client.setBufferOpts(bufferOpts);
}

/**
* Gets the number of buffered messages
* @param clientHandle identifier for the client
* @return the number of messages in the buffer
*/
public int getBufferedMessageCount(String clientHandle) {
    MqttConnection client = getConnection(clientHandle);
    return client.getBufferedMessageCount();
}

// Gets the buffered MqttMessage at the given index
public MqttMessage getBufferedMessage(String clientHandle, int bufferIndex) {
    MqttConnection client = getConnection(clientHandle);
    return client.getBufferedMessage(bufferIndex);
}

// Deletes the buffered message at the given index
public void deleteBufferedMessage(String clientHandle, int bufferIndex) {
    MqttConnection client = getConnection(clientHandle);
    client.deleteBufferedMessage(bufferIndex);
}
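
The buffer these methods operate on is bounded, and two DisconnectedBufferOptions settings (bufferSize and deleteOldestMessages) decide what happens when it fills up. The plain-Java sketch below models one plausible reading of that behaviour. The class is hypothetical, messages are simplified to Strings, and rejecting a message with an IllegalStateException when the buffer is full is an assumption made for illustration rather than a statement about Paho's internals.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical bounded buffer illustrating bufferSize and deleteOldestMessages. */
final class DisconnectedBufferSketch {
    private final int bufferSize;
    private final boolean deleteOldestMessages;
    private final List<String> buffer = new ArrayList<>();

    DisconnectedBufferSketch(int bufferSize, boolean deleteOldestMessages) {
        this.bufferSize = bufferSize;
        this.deleteOldestMessages = deleteOldestMessages;
    }

    /** Adds a message, evicting the oldest one or failing when the buffer is full. */
    void put(String message) {
        if (buffer.size() < bufferSize) {
            buffer.add(message);
        } else if (deleteOldestMessages) {
            buffer.remove(0);      // drop the oldest entry to make room
            buffer.add(message);
        } else {
            // Assumed behaviour: refuse the message when the buffer is full
            throw new IllegalStateException("disconnected buffer is full");
        }
    }

    int getBufferedMessageCount() { return buffer.size(); }

    String getBufferedMessage(int bufferIndex) { return buffer.get(bufferIndex); }

    void deleteBufferedMessage(int bufferIndex) { buffer.remove(bufferIndex); }

    public static void main(String[] args) {
        DisconnectedBufferSketch sketch = new DisconnectedBufferSketch(2, true);
        sketch.put("m1");
        sketch.put("m2");
        sketch.put("m3"); // evicts "m1" because deleteOldestMessages is true
        System.out.println(sketch.getBufferedMessageCount() + " " + sketch.getBufferedMessage(0));
    }
}
```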

DisconnectedBufferOptions

Defined in the mqttv3 package:

// Options controlling how messages are buffered while the client is disconnected
public class DisconnectedBufferOptions {
    
    /**
     * The default size of the disconnected buffer
     * (5000 messages)
     */
    public static final int DISCONNECTED_BUFFER_SIZE_DEFAULT = 5000;
    // The disconnected buffer is disabled by default
    public static final boolean DISCONNECTED_BUFFER_ENABLED_DEFAULT = false;
    // The buffer is not persisted by default
    public static final boolean PERSIST_DISCONNECTED_BUFFER_DEFAULT = false;
    // The oldest messages are not deleted by default when the buffer is full
    public static final boolean DELETE_OLDEST_MESSAGES_DEFAULT = false;

    private int bufferSize = DISCONNECTED_BUFFER_SIZE_DEFAULT;
    private boolean bufferEnabled = DISCONNECTED_BUFFER_ENABLED_DEFAULT;
    private boolean persistBuffer = PERSIST_DISCONNECTED_BUFFER_DEFAULT;
    private boolean deleteOldestMessages = DELETE_OLDEST_MESSAGES_DEFAULT;
    //...
}

Functional methods

Passing data back to the Activity

callbackToActivity

  • Data is delivered by broadcasting an Intent through LocalBroadcastManager

Implementation:

void callbackToActivity(String clientHandle, Status status, Bundle dataBundle) {
    // Don't call traceDebug, as it will try to callbackToActivity leading
    // to recursion.
    // Create the Intent to broadcast; its action is MqttService.TAG + ".callbackToActivity" + "." + VERSION
    Intent callbackIntent = new Intent(MqttServiceConstants.CALLBACK_TO_ACTIVITY);
    // Add the client handle
    if (clientHandle != null) {
        callbackIntent.putExtra(MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle);
    }
    // Add the operation status
    callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status);
    // Add the accompanying Bundle
    if (dataBundle != null) {
        callbackIntent.putExtras(dataBundle);
    }
    // Broadcast through LocalBroadcastManager
    LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent);
}

Creating / obtaining a client

MqttService keeps track of every connection in a ConcurrentHashMap<String, MqttConnection>:

private Map<String/* clientHandle */, MqttConnection/* client */> connections = new ConcurrentHashMap<>();

/**
* Get an MqttConnection object to represent a connection to a server
*
* @param serverURI   specifies the protocol, host name and port to be used to connect to an MQTT server
* @param clientId    specifies the name by which this connection should be identified to the server
* @param contextId   specifies the app context info to make a difference between apps
* @param persistence specifies the persistence layer to be used with this client
* @return a string to be used by the Activity as a "handle" for this
* MqttConnection
*/
public String getClient(String serverURI, String clientId, String contextId, MqttClientPersistence persistence) {
    // The unique handle for a connection: serverURI + ":" + clientId + ":" + contextId
    String clientHandle = serverURI + ":" + clientId + ":" + contextId;
    // Create and store a connection only if the handle is not already present; an existing entry is never overwritten
    if (!connections.containsKey(clientHandle)) {
        MqttConnection client = new MqttConnection(this, serverURI, clientId, persistence, clientHandle);
        connections.put(clientHandle, client);
    }
    return clientHandle;
}

Note

  • Creating the same client again has no effect: the service keeps exactly one MqttConnection per handle
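
The one-connection-per-handle behaviour can be reproduced in plain Java. The sketch below is a hypothetical stand-in, not Paho code: Connection is a placeholder for MqttConnection, and computeIfAbsent is used in place of the containsKey/put pair from the excerpt so that the check and the insert happen as a single atomic step.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry mirroring the handle bookkeeping of getClient. */
final class ClientRegistrySketch {
    /** Placeholder for MqttConnection; it only records its inputs. */
    static final class Connection {
        final String serverURI;
        final String clientId;

        Connection(String serverURI, String clientId) {
            this.serverURI = serverURI;
            this.clientId = clientId;
        }
    }

    private final Map<String, Connection> connections = new ConcurrentHashMap<>();

    /** Builds the handle and registers a connection only if none exists yet. */
    String getClient(String serverURI, String clientId, String contextId) {
        String clientHandle = serverURI + ":" + clientId + ":" + contextId;
        connections.computeIfAbsent(clientHandle, h -> new Connection(serverURI, clientId));
        return clientHandle;
    }

    /** Looks up a connection; unknown handles are treated as a caller error. */
    Connection getConnection(String clientHandle) {
        Connection connection = connections.get(clientHandle);
        if (connection == null) {
            throw new IllegalArgumentException("Unknown client handle: " + clientHandle);
        }
        return connection;
    }

    int size() { return connections.size(); }

    public static void main(String[] args) {
        ClientRegistrySketch registry = new ClientRegistrySketch();
        String first = registry.getClient("tcp://broker.example:1883", "clientA", "com.example.app");
        String second = registry.getClient("tcp://broker.example:1883", "clientA", "com.example.app");
        System.out.println(first);
        System.out.println(first.equals(second) + " " + registry.size());
    }
}
```

Because the handle is a plain concatenation, two apps that use the same server URI and client id still get different handles as long as their contextId differs.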

connect

Connects a single client (unlike reconnect at the service level, which reconnects every client):

  1. Look up the MqttConnection in the map
  2. Call MqttConnection.connect to establish the connection

/**
 * Connect to the MQTT server specified by a particular client
 *
 * @param clientHandle   identifies the MqttConnection to use
 * @param connectOptions the MQTT connection options to be used
 * @param activityToken  arbitrary identifier to be passed back to the Activity
 * @throws MqttSecurityException thrown if there is a security exception
 * @throws MqttException         thrown for all other MqttExceptions
 */
public void connect(String clientHandle, MqttConnectOptions connectOptions, String activityToken)
        throws MqttException {
    // Look up the connection by its client handle (a hash map lookup)
    MqttConnection client = getConnection(clientHandle);
    // Delegate to MqttConnection.connect to establish the connection
    client.connect(connectOptions, null, activityToken);
}

disconnect

Disconnects a single client:

  1. Look up the MqttConnection in the map
  2. Call MqttConnection.disconnect
  3. Remove the MqttConnection from the map
  4. Stop the service with stopSelf(). The author considers this at odds with the one-to-many design, since it seems to make the connection map pointless. Note, though, that Android keeps a bound service alive until its last BIND_AUTO_CREATE binding is released, so stopSelf() alone does not destroy the service while an Activity is still bound

/**
 * Disconnect from the server
 *
 * @param clientHandle      identifies the MqttConnection to use
 * @param invocationContext arbitrary data to be passed back to the application
 * @param activityToken     arbitrary identifier to be passed back to the Activity
 */
public void disconnect(String clientHandle, String invocationContext, String activityToken) {
    // Look up the connection
    MqttConnection client = getConnection(clientHandle);
    // Call MqttConnection.disconnect
    client.disconnect(invocationContext, activityToken);
    // Remove it from the map
    connections.remove(clientHandle);


    // the activity has finished using us, so we can stop the service
    // the activities are bound with BIND_AUTO_CREATE, so the service will
    // remain around until the last activity disconnects
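    // Stop the service. The article's author questions this: it is fine when the
    // service and the Activity are one-to-one, but looks wrong when the service
    // holds several clients. As the comment above points out, though, a bound
    // service stays alive until its last BIND_AUTO_CREATE binding is released.
    stopSelf();
}

// Overload of disconnect that takes a quiesce timeout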
public void disconnect(String clientHandle, long quiesceTimeout, String invocationContext, String activityToken) {
    MqttConnection client = getConnection(clientHandle);
    client.disconnect(quiesceTimeout, invocationContext, activityToken);
    connections.remove(clientHandle);

    // the activity has finished using us, so we can stop the service
    // the activities are bound with BIND_AUTO_CREATE, so the service will
    // remain around until the last activity disconnects
    stopSelf();
}
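
If the concern about stopSelf() were to be addressed inside the service itself, one option would be to stop only once the last connection has gone. The plain-Java sketch below models that idea. Everything in it is hypothetical: the class is not part of Paho, and the stopped flag merely stands in for the call to stopSelf().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model: stop the service only after the last client disconnects. */
final class LastClientStopSketch {
    private final Map<String, Object> connections = new ConcurrentHashMap<>();
    private boolean stopped = false; // stands in for stopSelf() having been called

    void register(String clientHandle) {
        connections.put(clientHandle, new Object());
    }

    /** Removes one client and "stops" only when no clients remain. */
    void disconnect(String clientHandle) {
        connections.remove(clientHandle);
        if (connections.isEmpty()) {
            stopped = true;
        }
    }

    boolean isStopped() { return stopped; }

    public static void main(String[] args) {
        LastClientStopSketch service = new LastClientStopSketch();
        service.register("handleA");
        service.register("handleB");
        service.disconnect("handleA");
        System.out.println(service.isStopped()); // one client is still registered
        service.disconnect("handleB");
        System.out.println(service.isStopped()); // the last client has gone
    }
}
```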

isConnected

Reports whether a single client is connected:

public boolean isConnected(String clientHandle) {
    MqttConnection client = getConnection(clientHandle);
    return client.isConnected();
}

publish

There are two ways to publish a message:

  1. publish with the payload, qos and retained flag given explicitly
  2. publish with an MqttMessage (which wraps the payload, qos and retained flag)

public IMqttDeliveryToken publish(String clientHandle, String topic, byte[] payload, int qos, boolean retained, String invocationContext,
        String activityToken) {
    MqttConnection client = getConnection(clientHandle);
    return client.publish(topic, payload, qos, retained, invocationContext, activityToken);
}

public IMqttDeliveryToken publish(String clientHandle, String topic, MqttMessage message, String invocationContext, String activityToken) {
    MqttConnection client = getConnection(clientHandle);
    return client.publish(topic, message, invocationContext, activityToken);
}

subscribe

There are three ways to subscribe:

  1. A single topic and qos
  2. An array of topics and qos values
  3. An array of topic filters and qos values, plus a message listener per filter to handle incoming messages

// A single topic and qos
public void subscribe(String clientHandle, String topic, int qos, String invocationContext, String activityToken) {
    MqttConnection client = getConnection(clientHandle);
    client.subscribe(topic, qos, invocationContext, activityToken);
}

// An array of topics and qos values
public void subscribe(String clientHandle, String[] topic, int[] qos, String invocationContext, String activityToken) {
    MqttConnection client = getConnection(clientHandle);
    client.subscribe(topic, qos, invocationContext, activityToken);
}

// An array of topic filters and qos values, with a message listener for each filter
public void subscribe(String clientHandle, String[] topicFilters, int[] qos, String invocationContext, String activityToken,
        IMqttMessageListener[] messageListeners) {
    MqttConnection client = getConnection(clientHandle);
    client.subscribe(topicFilters, qos, invocationContext, activityToken, messageListeners);
}

unsubscribe

There are two ways to unsubscribe:

  1. From a single topic
  2. From several topics

// A single topic
public void unsubscribe(String clientHandle, final String topic, String invocationContext, String activityToken) {
    MqttConnection client = getConnection(clientHandle);
    client.unsubscribe(topic, invocationContext, activityToken);
}

// Several topics
public void unsubscribe(String clientHandle, final String[] topic, String invocationContext, String activityToken) {
    MqttConnection client = getConnection(clientHandle);
    client.unsubscribe(topic, invocationContext, activityToken);
}

Getting pending delivery tokens

/**
 * Get tokens for all outstanding deliveries for a client
 *
 * @param clientHandle identifies the MqttConnection
 * @return an array (possibly empty) of tokens
 */
public IMqttDeliveryToken[] getPendingDeliveryTokens(String clientHandle) {
    MqttConnection client = getConnection(clientHandle);
    return client.getPendingDeliveryTokens();
}

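Acknowledging message arrival

Once the Activity has received a message, it acknowledges the arrival to the service so that the service can remove the message from its store.

The logic:

  1. Delete the corresponding message from the persistent store of arrived messages; see DatabaseMessageStore
  2. Return the result as a Status: Status.OK (0) on success, Status.ERROR (1) otherwise

/**
 * Called by the Activity when a message has been passed back to the
 * application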
 *
 * @param clientHandle identifier for the client which received the message
 * @param id           identifier for the MQTT message
 * @return {@link Status}
 */
public Status acknowledgeMessageArrival(String clientHandle, String id) {
    if (messageStore.discardArrived(clientHandle, id)) {
        return Status.OK;
    } else {
        return Status.ERROR;
    }
}
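
The acknowledge step boils down to "remove the stored message and report whether it was there". The sketch below illustrates that with an in-memory map. It is a hypothetical stand-in for DatabaseMessageStore (which the article says uses SQLite); the key format and the storeArrived helper are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the arrived-message store. */
final class ArrivedStoreSketch {
    enum Status { OK, ERROR }

    private final Map<String, String> arrived = new HashMap<>();

    // Assumed key format: the client handle and message id joined by a separator
    private static String key(String clientHandle, String id) {
        return clientHandle + "|" + id;
    }

    /** Records an arrived message until the application acknowledges it. */
    void storeArrived(String clientHandle, String id, String payload) {
        arrived.put(key(clientHandle, id), payload);
    }

    /** Removes the message; returns false if no such message was stored. */
    boolean discardArrived(String clientHandle, String id) {
        return arrived.remove(key(clientHandle, id)) != null;
    }

    /** Same shape as acknowledgeMessageArrival in the excerpt above. */
    Status acknowledgeMessageArrival(String clientHandle, String id) {
        return discardArrived(clientHandle, id) ? Status.OK : Status.ERROR;
    }

    public static void main(String[] args) {
        ArrivedStoreSketch store = new ArrivedStoreSketch();
        store.storeArrived("handleA", "msg-1", "hello");
        System.out.println(store.acknowledgeMessageArrival("handleA", "msg-1"));
        System.out.println(store.acknowledgeMessageArrival("handleA", "msg-1"));
    }
}
```

Acknowledging the same message twice yields ERROR the second time, because the first acknowledgement has already removed it from the store.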

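MqttConnection

Offline vs. disconnected

  • offline: the device has no usable network
  • disconnected: the device's network is available, but the connection to the server has been lost

/**
* Receive notification that we are offline<br>
* if cleanSession is true, we need to regard this as a disconnection
* Triggered when the Android network state broadcast is received.
* Note that when cleanSession is true, the condition below fails and the method does nothing.
*/
void offline() {
    // No-op when cleanSession is true (or when already disconnected)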
    if (!disconnected && !cleanSession) {
        Exception e = new Exception("Android offline");
        connectionLost(e);
    }
}
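
The guard in offline() is easy to misread, so here is a small plain-Java model of it. The class is hypothetical: it keeps only the two flags involved and counts how often connectionLost would be invoked, instead of doing any real MQTT work.

```java
/** Hypothetical model of the offline() guard in MqttConnection. */
final class OfflineGuardSketch {
    private boolean disconnected;
    private final boolean cleanSession;
    private int connectionLostCalls = 0;

    OfflineGuardSketch(boolean disconnected, boolean cleanSession) {
        this.disconnected = disconnected;
        this.cleanSession = cleanSession;
    }

    /** Mirrors the condition from the excerpt: act only if connected and not cleanSession. */
    void offline() {
        if (!disconnected && !cleanSession) {
            connectionLost();
        }
    }

    // The real method does much more; here it only flips the flag and counts the call
    private void connectionLost() {
        disconnected = true;
        connectionLostCalls++;
    }

    int getConnectionLostCalls() { return connectionLostCalls; }

    public static void main(String[] args) {
        OfflineGuardSketch persistent = new OfflineGuardSketch(false, false);
        persistent.offline();
        persistent.offline(); // second call is a no-op: already marked disconnected
        OfflineGuardSketch clean = new OfflineGuardSketch(false, true);
        clean.offline();      // no-op: cleanSession is true
        System.out.println(persistent.getConnectionLostCalls() + " " + clean.getConnectionLostCalls());
    }
}
```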

connectionLost

public void connectionLost(Throwable why) {
    service.traceDebug(TAG, "connectionLost(" + why.getMessage() + ")");
    disconnected = true;
    try {
        // When automatic reconnect is not configured, disconnect the underlying MqttAsyncClient
        if (!this.connectOptions.isAutomaticReconnect()) {
            myClient.disconnect(null, new IMqttActionListener() {

                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    // No action
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken,
                        Throwable exception) {
                    // No action
                }
            });
        } else {
            // Using the new Automatic reconnect functionality.
            // We can't force a disconnection, but we can speed one up
            //
            alarmPingSender.schedule(100);

        }
    } catch (Exception e) {
        // ignore it - we've done our best
    }

    Bundle resultBundle = new Bundle();
    resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.ON_CONNECTION_LOST_ACTION);
    if (why != null) {
        resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, why.getMessage());
        if (why instanceof MqttException) {
            resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, why);
        }
        resultBundle.putString(MqttServiceConstants.CALLBACK_EXCEPTION_STACK, Log.getStackTraceString(why));
    }
    service.callbackToActivity(clientHandle, Status.OK, resultBundle);
    // client has lost connection no need for wake lock
    releaseWakeLock();
}

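reconnect

When the device is online, reconnect() is called to re-establish the connection:

  • If isAutomaticReconnect is set: delegate to MqttAsyncClient.reconnect
  • Otherwise, if the connection is disconnected and cleanSession is false: make a fresh MqttAsyncClient.connect call
  • Otherwise (that is, with cleanSession set): nothing happens, and the application has to reconnect manually

synchronized void reconnect() {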

    if (myClient == null) {
        service.traceError(TAG, "Reconnect myClient = null. Will not do reconnect");
        return;
    }

    if (isConnecting) {
        service.traceDebug(TAG, "The client is connecting. Reconnect return directly.");
        return;
    }

    if (!service.isOnline()) {
        service.traceDebug(TAG, "The network is not reachable. Will not do reconnect");
        return;
    }

    // Automatic reconnect
    if (connectOptions.isAutomaticReconnect()) {
        //The Automatic reconnect functionality is enabled here
        Log.i(TAG, "Requesting Automatic reconnect using New Java AC");
        // Record the callback data in a Bundle
        final Bundle resultBundle = new Bundle();
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, reconnectActivityToken);
        resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, null);
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.CONNECT_ACTION);
        try {
            // Delegate to MqttAsyncClient.reconnect
            myClient.reconnect();
        } catch (MqttException ex) {
            Log.e(TAG, "Exception occurred attempting to reconnect: " + ex.getMessage());
            setConnectingState(false);
            // Send the bundle back to the caller
            handleException(resultBundle, ex);
        }
    } else if (disconnected && !cleanSession) {
        // Disconnected, and the session is not to be cleaned
        // use the activityToke the same with action connect
        service.traceDebug(TAG, "Do Real Reconnect!");
        final Bundle resultBundle = new Bundle();
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, reconnectActivityToken);
        resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, null);
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.CONNECT_ACTION);

        try {

            IMqttActionListener listener = new MqttConnectionListener(resultBundle) {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    // since the device's cpu can go to sleep, acquire a
                    // wakelock and drop it later.
                    service.traceDebug(TAG, "Reconnect Success!");
                    service.traceDebug(TAG, "DeliverBacklog when reconnect.");
                    resultBundle.putBoolean(MqttServiceConstants.SESSION_PRESENT, asyncActionToken.getSessionPresent());
                    doAfterConnectSuccess(resultBundle);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, exception.getLocalizedMessage());
                    resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, exception);
                    service.callbackToActivity(clientHandle, Status.ERROR, resultBundle);

                    doAfterConnectFail(resultBundle);
                }
            };

            // Connect again; the listener's callbacks report the outcome to the caller
            myClient.connect(connectOptions, null, listener);
            setConnectingState(true);
        } catch (MqttException e) {
            service.traceError(TAG, "Cannot reconnect to remote server." + e.getMessage());
            setConnectingState(false);
            handleException(resultBundle, e);
        } catch (Exception e) {
            /* TODO: Added due to: https://github.com/eclipse/paho.mqtt.android/issues/101
               For some reason in a small number of cases, myClient is null here and so
               a NullPointer Exception is thrown. This is a workaround to pass the exception
               up to the application. myClient should not be null so more investigation is
               required.
            */
            service.traceError(TAG, "Cannot reconnect to remote server." + e.getMessage());
            setConnectingState(false);
            MqttException newEx = new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR, e.getCause());
            handleException(resultBundle, newEx);
        }
    }
}
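
The branching in reconnect() can be summarised as a pure function of six flags. The sketch below is a hypothetical condensation in plain Java: the enum and method names are invented for illustration, and the actual MQTT calls are replaced by returning which path would be taken.

```java
/** Hypothetical summary of the decision made by MqttConnection.reconnect(). */
final class ReconnectDecisionSketch {
    enum Action {
        SKIP,            // an early-return guard fired
        AUTO_RECONNECT,  // would call MqttAsyncClient.reconnect
        FULL_CONNECT,    // would call MqttAsyncClient.connect with a listener
        NONE             // falls through; the application must reconnect itself
    }

    static Action decide(boolean hasClient, boolean isConnecting, boolean online,
                         boolean automaticReconnect, boolean disconnected, boolean cleanSession) {
        // The three guards at the top of reconnect()
        if (!hasClient || isConnecting || !online) {
            return Action.SKIP;
        }
        if (automaticReconnect) {
            return Action.AUTO_RECONNECT;
        }
        if (disconnected && !cleanSession) {
            return Action.FULL_CONNECT;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // Automatic reconnect takes priority over the cleanSession check
        System.out.println(decide(true, false, true, true, true, true));
        // A disconnected client with a persistent session gets a full connect
        System.out.println(decide(true, false, true, false, true, false));
        // With cleanSession set and no automatic reconnect, nothing happens
        System.out.println(decide(true, false, true, false, true, true));
    }
}
```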
