理论知识这里就不作说明了,自己去补,下面提供的是阿里云IoT和华为IoT的对接(踩坑)经验:)
1、添加依赖
GitHub 链接 paho.mqtt.android
根目录下 build.gradle
repositories {
maven {
url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
}
app/build.gradle
dependencies {
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
2、功能实现
- 为了提高消息到达率,项目中同时接了JPush(经常挂)、阿里云IoT和华为IoT,所以MQTTService服务中有个mChannel变量(1_JPush,2_阿里云IoT,3_华为IoT)
- 阿里云IoT的是MQTT协议和华为IoT的是MQTTS协议,所以对接华为IoT时需要添加证书,对
于pem证书的解析,网上有很多的方法,但是感觉都好麻烦。。。推荐使用下面的库,简单便捷
implementation 'org.bouncycastle:bcpkix-jdk15on:1.59'
SSLUtil.getSingleSocketFactory()方法在下面也有提供,可以放心使用 - 关于MQTT断线重连问题,只有连接成功后,断线重连机制才会生效,对于第一次连接失败的,需要自己重新连接
- 还有需要注意的是阿里云IoT和华为IoT返回的数据格式问题,华为IoT多包装了一层,在下面的代码里也有注释
- 心跳间隔时间设置,不要设置太小,阿里云IoT的范围是 [60, 300] 秒,低于60会连接失败,会一直看到 无效客户机标识(2) 的提示
public class MQTTService extends Service {
private static final String TAG = MQTTService.class.getSimpleName();
private MqttAndroidClient mClient;
private MqttConnectOptions mConnectOptions;
private String mTopic;
private int mChannel;
@Override
public void onCreate() {
super.onCreate();
AhsLog.tagD(TAG, "-- MQTTService onCreate --");
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
super.onStartCommand(intent, flags, startId);
init();
return START_STICKY;
}
private void init() {
String url = SharedPrefs.Builder.getDefault(this).getString(Config.MQTT_ADDRESS);
String port = SharedPrefs.Builder.getDefault(this).getString(Config.MQTT_PORT);
String clientId = SharedPrefs.Builder.getDefault(this).getString(Config.MQTT_CLIENT_ID);
String userName = SharedPrefs.Builder.getDefault(this).getString(Config.MQTT_USER_NAME);
String password = SharedPrefs.Builder.getDefault(this).getString(Config.MQTT_PASSWORD);
mTopic = SharedPrefs.Builder.getDefault(this).getString(Config.MQTT_TOPIC);
String broker = url + ":" + port;
mChannel = SharedPrefs.Builder.getDefault(this).getInteger(Config.MQTT_CHANNEL, -1);
if (mChannel == 3) {
broker = "ssl://" + broker;
} else {
broker = "tcp://" + broker;
}
mClient = new MqttAndroidClient(this, broker, clientId);
mClient.setCallback(mMqttCallback);
mConnectOptions = new MqttConnectOptions();
mConnectOptions.setServerURIs(new String[]{broker});
// 清除缓存
mConnectOptions.setCleanSession(true);
// 设置超时时间,单位秒
mConnectOptions.setConnectionTimeout(10);
// 心跳包发送间隔,单位秒
mConnectOptions.setKeepAliveInterval(60);
// 用户名
mConnectOptions.setUserName(userName);
// 密码
mConnectOptions.setPassword(password.toCharArray());
// 断线重连
mConnectOptions.setAutomaticReconnect(true);
// 添加证书 (华为MQTT)
if (mChannel == 3) {
SocketFactory socketFactory = null;
try {
InputStream inputStream = App.getInstance().getAssets().open("rootcert.pem");
socketFactory = SSLUtil.getSingleSocketFactory(inputStream);
} catch (Exception e) {
e.printStackTrace();
}
if (socketFactory != null) {
mConnectOptions.setSocketFactory(socketFactory);
}
}
connectToServer();
}
public void connectToServer() {
if (mClient != null && !mClient.isConnected() && NetworkUtils.isNetworkConnected(this)) {
try {
AhsLog.tagD(TAG, "-- 开始MQTT连接 --");
mClient.connect(mConnectOptions, null, mIMqttActionListener);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* MQTT连接回调
*/
private IMqttActionListener mIMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
AhsLog.tagD(TAG, "--- 连接成功 ---");
try {
// 订阅主题
mClient.subscribe(mTopic, 0);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
AhsLog.tagE(TAG, "--- 连接失败 ---");
AhsLog.tagE(TAG, exception.toString());
exception.printStackTrace();
connectToServer();
}
};
/**
* 获取消息回调
*/
private MqttCallback mMqttCallback = new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
AhsLog.tagE(TAG, "-- MQTT连接中断 -- " + cause.toString());
cause.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws JSONException {
String str = "topic: " + topic + "; qos: " + message.getQos() + "; retained: " + message.isRetained();
AhsLog.tagD(TAG, str);
String msg = new String(message.getPayload());
// 华为IOT数据多包装了一层
if (mChannel == 3) {
JSONObject object = new JSONObject(msg);
msg = object.getString("paras");
}
AhsLog.tagD(TAG, "messageArrived: " + msg);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
};
@Override
public void onDestroy() {
super.onDestroy();
try {
mClient.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
stopSelf();
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return new MQTTServiceBinder();
}
public class MQTTServiceBinder extends Binder {
public MQTTService getService() {
return MQTTService.this;
}
}
}
/**
* 获取单向认证 SSLSocketFactory
*/
public static SSLSocketFactory getSingleSocketFactory(InputStream interMediateCrtFileInputStream) throws Exception {
Security.addProvider(new BouncyCastleProvider());
X509Certificate caCert = null;
BufferedInputStream bis = new BufferedInputStream(interMediateCrtFileInputStream);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
tmf.init(caKs);
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, tmf.getTrustManagers(), null);
return sslContext.getSocketFactory();
}
网友评论