美文网首页
Android MQTT(阿里云IoT和华为IoT)

Android MQTT(阿里云IoT和华为IoT)

作者: Leero丶 | 来源:发表于2020-03-30 19:12 被阅读0次

    理论知识这里就不作说明了,自己去补,下面提供的是阿里云IoT和华为IoT的对接(踩坑)经验:)

    1、添加依赖

    GitHub 链接 paho.mqtt.android
    根目录下 build.gradle

    repositories {
        // Eclipse Paho repository that hosts the MQTT client artifacts declared in app/build.gradle.
        maven { url 'https://repo.eclipse.org/content/repositories/paho-snapshots/' }
    }
    

    app/build.gradle

    dependencies {
        // `compile` is deprecated (and removed in Gradle 7); `implementation` is the replacement
        // configuration and matches the bouncycastle dependency line used later in this article.
        // Paho MQTT v3 Java client.
        implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
        // Paho Android service wrapper (provides MqttAndroidClient).
        implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
    }
    

    2、功能实现

    • 为了提高消息到达率,项目中同时接了JPush(经常挂)、阿里云IoT和华为IoT,所以MQTTService服务中有个mChannel变量(1_JPush,2_阿里云IoT,3_华为IoT)
    • 阿里云IoT使用的是MQTT协议,而华为IoT使用的是MQTTS协议,所以对接华为IoT时需要添加证书。
      对于pem证书的解析,网上有很多方法,但都比较麻烦,推荐使用下面的库,简单便捷:
      implementation 'org.bouncycastle:bcpkix-jdk15on:1.59'
      SSLUtil.getSingleSocketFactory()方法在下面也有提供,可以放心使用
    • 关于MQTT断线重连问题,只有连接成功后,断线重连机制才会生效,对于第一次连接失败的,需要自己重新连接
    • 还有需要注意的是阿里云IoT和华为IoT返回的数据格式问题,华为IoT多包装了一层,在下面的代码里也有注释
    • 心跳间隔时间设置,不要设置太小,阿里云IoT的范围是 [60, 300] 秒,低于60会连接失败,会一直看到 无效客户机标识(2) 的提示
    /**
     * Android service that keeps one MQTT connection open and logs incoming messages.
     *
     * <p>Connection parameters (address, port, client id, credentials, topic and channel) are read
     * from {@code SharedPrefs}. Channel {@code 3} (Huawei IoT) is connected over {@code ssl://} with
     * a root certificate loaded from the {@code rootcert.pem} asset; every other channel is
     * connected over {@code tcp://}.
     *
     * <p>A failed connection attempt is retried after {@link #RETRY_DELAY_SECONDS} seconds rather
     * than immediately, so an unreachable broker does not cause a tight retry loop.
     */
    public class MQTTService extends Service {

        private static final String TAG = MQTTService.class.getSimpleName();

        /** Channel value that selects Huawei IoT (MQTTS transport, payload wrapped in "paras"). */
        private static final int CHANNEL_HUAWEI = 3;

        /** Delay between a failed connection attempt and the next attempt. */
        private static final long RETRY_DELAY_SECONDS = 5L;

        /** Runs the delayed reconnect attempts; shut down in {@link #onDestroy()}. */
        private final java.util.concurrent.ScheduledExecutorService mRetryExecutor =
                java.util.concurrent.Executors.newSingleThreadScheduledExecutor();

        private MqttAndroidClient mClient;
        private MqttConnectOptions mConnectOptions;

        private String mTopic;
        private int mChannel;

        @Override
        public void onCreate() {
            super.onCreate();
            AhsLog.tagD(TAG, "-- MQTTService onCreate --");
        }

        @Override
        public int onStartCommand(Intent intent, int flags, int startId) {
            super.onStartCommand(intent, flags, startId);
            init();
            return START_STICKY;
        }

        /**
         * Reads the stored configuration, builds the client and connect options, and starts
         * connecting. Any client created by an earlier call is released first, because
         * {@link #onStartCommand} (and therefore this method) can run more than once.
         */
        private void init() {
            releaseClient();

            String url = SharedPrefs.Builder.getDefault(this).getString(Config.MQTT_ADDRESS);
            String port = SharedPrefs.Builder.getDefault(this).getString(Config.MQTT_PORT);
            String clientId = SharedPrefs.Builder.getDefault(this).getString(Config.MQTT_CLIENT_ID);
            String userName = SharedPrefs.Builder.getDefault(this).getString(Config.MQTT_USER_NAME);
            String password = SharedPrefs.Builder.getDefault(this).getString(Config.MQTT_PASSWORD);
            mTopic = SharedPrefs.Builder.getDefault(this).getString(Config.MQTT_TOPIC);
            mChannel = SharedPrefs.Builder.getDefault(this).getInteger(Config.MQTT_CHANNEL, -1);

            boolean secure = mChannel == CHANNEL_HUAWEI;
            String broker = (secure ? "ssl://" : "tcp://") + url + ":" + port;

            mClient = new MqttAndroidClient(this, broker, clientId);
            mClient.setCallback(mMqttCallback);

            mConnectOptions = new MqttConnectOptions();
            mConnectOptions.setServerURIs(new String[]{broker});
            // Start with a clean session (no state kept from earlier connections).
            // NOTE(review): with a clean session, subscriptions are presumably not restored after
            // an automatic reconnect; confirm whether a re-subscribe on reconnect is needed.
            mConnectOptions.setCleanSession(true);
            // Connection timeout, in seconds.
            mConnectOptions.setConnectionTimeout(10);
            // Keep-alive (heartbeat) interval, in seconds.
            mConnectOptions.setKeepAliveInterval(60);
            mConnectOptions.setUserName(userName);
            // A missing password must not crash the service with a NullPointerException.
            if (password != null) {
                mConnectOptions.setPassword(password.toCharArray());
            }
            // Reconnect automatically after an established connection is lost.
            mConnectOptions.setAutomaticReconnect(true);

            // Huawei IoT needs the root certificate for the TLS connection.
            if (secure) {
                SocketFactory socketFactory = null;
                try (InputStream inputStream = App.getInstance().getAssets().open("rootcert.pem")) {
                    socketFactory = SSLUtil.getSingleSocketFactory(inputStream);
                } catch (Exception e) {
                    AhsLog.tagE(TAG, "Failed to load rootcert.pem: " + e);
                }
                if (socketFactory != null) {
                    mConnectOptions.setSocketFactory(socketFactory);
                }
            }

            connectToServer();
        }

        /**
         * Starts an asynchronous connection attempt. Does nothing when there is no client, the
         * client is already connected, or the network is reported as unavailable.
         */
        public void connectToServer() {
            // Local copy: this method may also run on the retry thread while the service is torn down.
            MqttAndroidClient client = mClient;
            if (client == null || client.isConnected() || !NetworkUtils.isNetworkConnected(this)) {
                return;
            }
            try {
                AhsLog.tagD(TAG, "-- 开始MQTT连接 --");
                client.connect(mConnectOptions, null, mIMqttActionListener);
            } catch (MqttException e) {
                AhsLog.tagE(TAG, "MQTT connect threw: " + e);
            }
        }

        /** Schedules one delayed call to {@link #connectToServer()} unless the service is shutting down. */
        private void scheduleReconnect() {
            if (mRetryExecutor.isShutdown()) {
                return;
            }
            try {
                mRetryExecutor.schedule(new Runnable() {
                    @Override
                    public void run() {
                        connectToServer();
                    }
                }, RETRY_DELAY_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
            } catch (java.util.concurrent.RejectedExecutionException e) {
                // The executor was shut down between the check and the call; nothing left to retry.
                AhsLog.tagD(TAG, "Reconnect not scheduled, service is stopping");
            }
        }

        /** Disconnects and unregisters the current client, if any, and clears the field. */
        private void releaseClient() {
            MqttAndroidClient client = mClient;
            mClient = null;
            if (client == null) {
                return;
            }
            try {
                if (client.isConnected()) {
                    client.disconnect();
                }
            } catch (MqttException e) {
                AhsLog.tagE(TAG, "MQTT disconnect threw: " + e);
            }
            client.unregisterResources();
        }

        /**
         * Callback for the connection attempt.
         */
        private final IMqttActionListener mIMqttActionListener = new IMqttActionListener() {

            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                AhsLog.tagD(TAG, "--- 连接成功 ---");
                MqttAndroidClient client = mClient;
                if (client == null || mTopic == null || mTopic.isEmpty()) {
                    AhsLog.tagE(TAG, "Connected but nothing to subscribe to");
                    return;
                }
                try {
                    // Subscribe to the configured topic with QoS 0.
                    client.subscribe(mTopic, 0);
                } catch (MqttException e) {
                    AhsLog.tagE(TAG, "MQTT subscribe threw: " + e);
                }
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                AhsLog.tagE(TAG, "--- 连接失败 ---");
                AhsLog.tagE(TAG, String.valueOf(exception));
                // Retry after a delay instead of immediately, to avoid a tight retry loop.
                scheduleReconnect();
            }
        };

        /**
         * Callback for connection loss and incoming messages.
         */
        private final MqttCallback mMqttCallback = new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                // cause may be null, so it must not be dereferenced.
                AhsLog.tagE(TAG, "-- MQTT连接中断 -- " + cause);
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                String str = "topic: " + topic + "; qos: " + message.getQos() + "; retained: " + message.isRetained();
                AhsLog.tagD(TAG, str);
                String msg = new String(message.getPayload());

                // Huawei IoT wraps the payload in one extra JSON layer; the content is under "paras".
                if (mChannel == CHANNEL_HUAWEI) {
                    try {
                        msg = new JSONObject(msg).getString("paras");
                    } catch (JSONException e) {
                        // Handle a malformed payload here instead of throwing out of the callback.
                        AhsLog.tagE(TAG, "Malformed Huawei IoT payload: " + e);
                        return;
                    }
                }
                AhsLog.tagD(TAG, "messageArrived: " + msg);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // This service only receives messages; nothing to do on delivery.
            }
        };

        @Override
        public void onDestroy() {
            super.onDestroy();
            // Stop pending retries first so no reconnect starts during teardown.
            mRetryExecutor.shutdownNow();
            releaseClient();
        }

        @Nullable
        @Override
        public IBinder onBind(Intent intent) {
            return new MQTTServiceBinder();
        }

        /** Binder that gives bound components direct access to this service instance. */
        public class MQTTServiceBinder extends Binder {
            public MQTTService getService() {
                return MQTTService.this;
            }
        }

    }
    
        /**
         * Builds an {@link SSLSocketFactory} for one-way TLS authentication: the returned factory
         * trusts only the certificate(s) read from the given stream and presents no client
         * certificate.
         *
         * <p>Every certificate found in the stream is added to the trust store, so a PEM bundle
         * with several certificates is trusted in full rather than only its last entry.
         *
         * @param interMediateCrtFileInputStream stream of X.509 certificate data (for example a
         *        PEM file); it is read but not closed by this method, so the caller closes it
         * @return a TLSv1.2 socket factory backed by a trust store holding the supplied certificates
         * @throws IllegalArgumentException if the stream is {@code null}
         * @throws java.security.cert.CertificateException if the stream holds no certificate or
         *         cannot be parsed
         * @throws Exception if the key store, trust manager or SSL context cannot be set up
         */
        public static SSLSocketFactory getSingleSocketFactory(InputStream interMediateCrtFileInputStream) throws Exception {
            if (interMediateCrtFileInputStream == null) {
                throw new IllegalArgumentException("certificate stream must not be null");
            }
            Security.addProvider(new BouncyCastleProvider());

            CertificateFactory cf = CertificateFactory.getInstance("X.509");
            KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
            caKs.load(null, null);

            // generateCertificates parses the whole stream; InputStream.available() is only an
            // estimate and is not a reliable end-of-stream test.
            BufferedInputStream bis = new BufferedInputStream(interMediateCrtFileInputStream);
            int count = 0;
            for (java.security.cert.Certificate cert : cf.generateCertificates(bis)) {
                // A unique alias per certificate keeps later entries from replacing earlier ones.
                caKs.setCertificateEntry("ca-certificate-" + count, cert);
                count++;
            }
            if (count == 0) {
                throw new java.security.cert.CertificateException("no X.509 certificate found in stream");
            }

            TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
            tmf.init(caKs);
            SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
            // No key managers: only the server is authenticated.
            sslContext.init(null, tmf.getTrustManagers(), null);
            return sslContext.getSocketFactory();
        }
    

    相关文章

      网友评论

          本文标题:Android MQTT(阿里云IoT和华为IoT)

          本文链接:https://www.haomeiwen.com/subject/wpjinctx.html