美文网首页
mqtt 工具类

mqtt 工具类

作者: 一航jason | 来源:发表于2023-12-10 10:27 被阅读0次
    package com.realtop.mqttutils;
    
    import android.content.Context;
    import android.content.SharedPreferences;
    import android.os.Handler;
    import android.os.Looper;
    import android.text.TextUtils;
    import android.util.Log;
    
    import org.eclipse.paho.android.service.MqttAndroidClient;
    import org.eclipse.paho.client.mqttv3.IMqttActionListener;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.IMqttToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    import java.lang.ref.WeakReference;
    import java.util.Objects;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArraySet;
    
    public class MQTTHelper implements Runnable {
    
        private final static class Inner {
            public static final MQTTHelper OBJ = new MQTTHelper();
        }
    
        public static MQTTHelper getInstance() {
            return Inner.OBJ;
        }
    
        private MQTTHelper() {
            Log.i(TAG, "MQTTHelper_init_do_nothing: ");
        }
    
        public static final String TAG = "mqtt_helper";
    
        public static final String INTENT_ACTION_MQTT_MSG = "INTENT_ACTION_MQTT_MSG";
        public static final String MQTT_MSG = "MQTT_MSG";
    
        /**
         * 参数部分
         */
        private String receiveSubject = "";
        private String sendSubject = "";
        private String host = "";
        private String clientId = "";
        private String userName = "";
        private String password = "";
    
        private boolean needAutoConnect = true;
    
        private Context mContext;
        private MqttAndroidClient mMqtt;
        private Handler mHandler;
    
        private final ConcurrentHashMap<Integer, WeakReference<OnSendMsgListener>> mListeners
                = new ConcurrentHashMap<>();
    
        private final Set<MQTTReceiver> mReceivers=new CopyOnWriteArraySet<>();
    
        public Context getContext() {
            return mContext;
        }
    
        public static SharedPreferences getConfig(Context context){
            return context.getSharedPreferences("mqtt_config_prefer", Context.MODE_PRIVATE);
        }
    
        public boolean isNeedAutoConnect() {
            return needAutoConnect;
        }
    
        public void setNeedAutoConnect(boolean needAutoConnect) {
            this.needAutoConnect = needAutoConnect;
        }
    
        public String getSendSubject() {
            return sendSubject;
        }
    
        public MQTTHelper setSendSubject(String sendSubject) {
            this.sendSubject = sendSubject;
            return this;
        }
    
        public String getReceiveSubject() {
            return receiveSubject;
        }
    
        public MQTTHelper setReceiveSubject(String receiveSubject) {
            this.receiveSubject = receiveSubject;
            return this;
        }
    
        public String getHost() {
            return host;
        }
    
        public MQTTHelper setHost(String host) {
            this.host = host;
            return this;
        }
    
        public String getClientId() {
            return clientId;
        }
    
        public MQTTHelper setClientId(String clientId) {
            this.clientId = clientId;
            return this;
        }
    
        public String getUserName() {
            return userName;
        }
    
        public MQTTHelper setUserName(String userName) {
            this.userName = userName;
            return this;
        }
    
        public String getPassword() {
            return password;
        }
    
        public MQTTHelper setPassword(String password) {
            this.password = password;
            return this;
        }
    
        public void init(Context context) {
            mContext = context.getApplicationContext();
            mHandler = new Handler(Looper.getMainLooper());
            if (mMqtt != null)
                return;
            // todo id 服务器提前输入的id, 设备出场时刻录
            String id = MUtils.getFactoryMacAddress(context);
            if (TextUtils.isEmpty(id)){
                id = System.currentTimeMillis()+"_id";
            }
            if (TextUtils.isEmpty(clientId)) {
                clientId = id;
            }
            if (TextUtils.isEmpty(receiveSubject)) {
                receiveSubject = "$thing/down/";
            }
            if (TextUtils.isEmpty(sendSubject)) {
                sendSubject = "$thing/up/";
            }
            mMqtt = new MqttAndroidClient(mContext, host, clientId);
            mMqtt.setCallback(new MqttCallback());
            Log.i(TAG, "init_" + clientId + ": " + receiveSubject + ": " + sendSubject);
        }
    
        @Override
        public void run() {
            startConnect();
        }
    
        /**
         * 发送上行信息
         * @param str 消息json
         * @param listener 发送是否成功回调
         */
        public void sendMsg(String str, OnSendMsgListener listener) {
            sendMsg(getSendSubject(), str, listener);
        }
    
        public void sendMsg(String sendSubject, String str, OnSendMsgListener listener) {
            if (mMqtt == null)
                return;
            new Handler(Looper.getMainLooper()).post(() -> {
                try {
                    MqttMessage msg = new MqttMessage();
                    msg.setPayload(str.getBytes());
                    IMqttDeliveryToken publish = mMqtt.publish(sendSubject, msg);
                    Log.i(TAG, "sendMsg_send_msg_id: " + publish.getMessageId()+"; "+str);
    
                    if (listener == null)
                        return;
                    if (publish.getMessageId() == 0) {
                        listener.onSendFinish(false);
                        return;
                    }
                    WeakReference<OnSendMsgListener> reference = new WeakReference<>(listener);
                    mListeners.put(publish.getMessageId(), reference);
                } catch (Exception e) {
                    Log.i(TAG, "sendMsg_error_" + e.getMessage());
                }
            });
        }
    
        /**
         * 关闭自动重连 默认打开
         */
        public void cancelAutoConnect() {
            needAutoConnect = false;
            mHandler.removeCallbacksAndMessages(null);
        }
    
        public void startConnect() {
            if (mMqtt == null)
                return;
            mHandler.removeCallbacksAndMessages(null);
            try {
                MqttConnectOptions options = new MqttConnectOptions();
                options.setAutomaticReconnect(false);
                options.setCleanSession(true);
                options.setConnectionTimeout(60);
                options.setKeepAliveInterval(60);
                options.setMaxInflight(10);
                options.setUserName(userName);
                options.setPassword(password.toCharArray());
                options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_DEFAULT);
                Log.i(TAG, "startConnect_username_passwd: " +
                        userName + "; " + password + "; " + mMqtt.getClientId() + "; " + options.getUserName());
                mMqtt.connect(options, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        Log.i(TAG, "onSuccess_");
                        mHandler.removeCallbacksAndMessages(null);
                    }
    
                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        Log.i(TAG, "onFailure_" + exception.getMessage());
                        exception.printStackTrace();
                        mHandler.removeCallbacksAndMessages(null);
                        if (needAutoConnect) {
                            mHandler.postDelayed(getInstance(), 16000);
                        }
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
                Log.i(TAG, "onCreate_error_" + e.getMessage());
            }
        }
    
        public void release() {
            mReceivers.clear();
            clearListener();
            cancelAutoConnect();
            mHandler.removeCallbacksAndMessages(null);
            if (mMqtt == null)
                return;
            if (mMqtt.isConnected()) {
                try {
                    mMqtt.disconnect();
                } catch (Exception e) {
                    Log.i(TAG, "onDestroy_error_" + e.getMessage());
                }
            }
            mMqtt = null;
            Log.i(TAG, "release_end_now: ");
        }
    
        public void clearListener() {
            Set<Integer> integers = mListeners.keySet();
            for (Integer index : integers) {
                try {
                    Objects.requireNonNull(mListeners.get(index)).clear();
                } catch (Exception e) {
                    Log.i(TAG, "clearListener_item_error: " + e.getMessage());
                }
            }
            mListeners.clear();
        }
    
        public void registerCallback(MQTTReceiver callback) {
            mReceivers.add(callback);
        }
    
        public void unRegisterCallback(MQTTReceiver registerReceiver) {
            mReceivers.remove(registerReceiver);
        }
    
        public static class MQTTReceiver {
    
            protected void receiveMsg(String msg) {
    
            }
    
        }
    
    
        private class MqttCallback implements MqttCallbackExtended {
    
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                mHandler.removeCallbacksAndMessages(null);
                try {
                    mMqtt.subscribe(receiveSubject, 0);
                    Log.i(TAG, "connectComplete_start_subscribe");
                } catch (Exception e) {
                    Log.i(TAG, "onSuccess_error_" + e.getMessage());
                }
            }
    
            @Override
            public void connectionLost(Throwable cause) {
                try {
                    if (mMqtt != null)
                        mMqtt.unsubscribe(receiveSubject);
                    Log.i(TAG, "connectionLost_unsubscribe");
                } catch (Exception e) {
                    Log.i(TAG, "connectionLost_error_" + e.getMessage());
                }
                mHandler.removeCallbacksAndMessages(null);
                if (needAutoConnect) {
                    mHandler.postDelayed(getInstance(), 68000);
                }
            }
    
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                try {
                    byte[] payload = message.getPayload();
                    String msg = new String(payload);
                    Log.i(TAG, "messageArrived_msg_" + msg);
                    for (MQTTReceiver item : mReceivers) {
                        item.receiveMsg(msg);
                    }
                } catch (Exception e) {
                    Log.i(TAG, "messageArrived_error:" + e.getMessage());
                }
            }
    
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                try {
                    WeakReference<OnSendMsgListener> reference = mListeners.get(token.getMessageId());
                    if (reference!=null){
                        OnSendMsgListener onSendMsgListener = reference.get();
                        if (onSendMsgListener != null) {
                            onSendMsgListener.onSendFinish(token.isComplete());
                        }
                        reference.clear();
                        mListeners.remove(token.getMessageId());
                    }
                } catch (Exception e) {
                    Log.i(TAG, "deliveryComplete_error: " + e.getMessage());
                }
    
                Log.i(TAG, "deliveryComplete_send_msg_id: "
                        + token.getMessageId() + "; " + token.isComplete());
            }
    
        }
    
    
        public interface OnSendMsgListener {
            void onSendFinish(boolean isComplete);
        }
    
    
    
    
    }
    

    相关文章

      网友评论

          本文标题:mqtt 工具类

          本文链接:https://www.haomeiwen.com/subject/fubegdtx.html