关于Mqtt
Mqtt3.1.1中文文档
这是一个客户端服务端架构的发布/订阅模式的消息传输协议;轻巧、开放、简单、规范,易于实现,国内很多关于Mqtt的文章都是讲推送的,其实Mqtt远远不止这个功能,具体详情可自行文档查阅。
场景说明
在Android上,往往具体的业务逻辑要回到Activity中,而Mqtt连接则是建立在service中(考虑到内存资源及退出应用后续操作),同时Mqtt的消息透传回调也是在service中。
通常做法是要写很多类(通常是Boardcast)来实现确保service同Activity组件之间的通信,就像Eclipse paho的Android service一样。
改进方案
本文基于Eclipse的paho框架的java client端,在Android上引入EventBus来做事件分发,简化了代码量,同时方便实现。
以下是我写的一个简单的工具操作类:
public class MqttManager {
// 单例
private static MqttManager mInstance = null;
// 回调
private MqttCallback mCallback;
// Private instance variables
private MqttClient client;
private MqttConnectOptions conOpt;
private boolean clean = true;
private MqttManager() {
mCallback = new MqttCallbackBus();
}
public static MqttManager getInstance() {
if (null == mInstance) {
mInstance = new MqttManager();
}
return mInstance;
}
/** * 释放单例, 及其所引用的资源 */
public static void release() {
try {
if (mInstance != null) {
mInstance.disConnect();
mInstance = null;
}
} catch (Exception e) {
}
}
/**
* 创建Mqtt 连接
*
* @param brokerUrl Mqtt服务器地址(tcp://xxxx:1863)
* @param userName 用户名
* @param password 密码
* @param clientId clientId
* @return
*/
public boolean creatConnect(String brokerUrl, String userName, String password, String clientId) {
boolean flag = false;
String tmpDir = System.getProperty("java.io.tmpdir");
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
try {
// Construct the connection options object that contains connection parameters
// such as cleanSession and LWT
conOpt = new MqttConnectOptions();
conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
conOpt.setCleanSession(clean);
if (password != null) {
conOpt.setPassword(password.toCharArray());
}
if (userName != null) {
conOpt.setUserName(userName);
}
// Construct an MQTT blocking mode client
client = new MqttClient(brokerUrl, clientId, dataStore);
// Set this wrapper as the callback handler
client.setCallback(mCallback);
flag = doConnect();
} catch (MqttException e) {
Logger.e(e.getMessage());
}
return flag;
}
/**
* 建立连接
*
* @return
*/
public boolean doConnect() {
boolean flag = false;
if (client != null) {
try {
client.connect(conOpt);
Logger.d("Connected to " + client.getServerURI() + " with client ID " + client.getClientId());
flag = true;
} catch (Exception e) {
}
}
return flag;
}
/**
* Publish / send a message to an MQTT server
*
* @param topicName the name of the topic to publish to
* @param qos the quality of service to delivery the message at (0,1,2)
* @param payload the set of bytes to send to the MQTT server
* @return boolean
*/
public boolean publish(String topicName, int qos, byte[] payload) {
boolean flag = false;
if (client != null && client.isConnected()) {
Logger.d("Publishing to topic \"" + topicName + "\" qos " + qos);
// Create and configure a message
MqttMessage message = new MqttMessage(payload); message.setQos(qos);
// Send the message to the server, control is not returned until
// it has been delivered to the server meeting the specified
// quality of service.
try {
client.publish(topicName, message);
flag = true;
} catch (MqttException e) {
}
}
return flag;
}
/**
* Subscribe to a topic on an MQTT server
* Once subscribed this method waits for the messages to arrive from the server
* that match the subscription. It continues listening for messages until the enter key is
* pressed.
*
* @param topicName to subscribe to (can be wild carded)
* @param qos the maximum quality of service to receive messages at for this subscription
* @return boolean
*/
public boolean subscribe(String topicName, int qos) {
boolean flag = false;
if (client != null && client.isConnected()) {
// Subscribe to the requested topic
// The QoS specified is the maximum level that messages will be sent to the client at.
// For instance if QoS 1 is specified, any messages originally published at QoS 2 will
// be downgraded to 1 when delivering to the client but messages published at 1 and 0
// will be received at the same level they were published at.
Logger.d("Subscribing to topic \"" + topicName + "\" qos " + qos);
try {
client.subscribe(topicName, qos);
flag = true;
} catch (MqttException e) {
}
}
return flag;
}
/**
* 取消连接
*
* @throws MqttException
*/
public void disConnect() throws MqttException {
if (client != null && client.isConnected()) {
client.disconnect();
}
}
}
在Mqtt Callback方法中发送Evenet事件:
public class MqttCallbackBus implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
Logger.e(cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
Logger.d(topic + "====" + message.toString());
EventBus.getDefault().post(message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
}
问题
- 由于EventBus不支持跨进程通信,所以当在service是独立进程时,无法在主进程中接收到EventBus分发的事件。
可以考虑通过广播做中转,即在Mqtt的回调中发送一个广播,在广播中进行事件分发。
- 连接等操作是较费时的,为了避免ANR,应在子线程中完成操作,也可以考虑使用RxJava来简化。
网友评论
//取得SSL的SSLContext实例
SSLContext sslContext = SSLContext.getInstance("TLS");
//取得KeyManagerFactory和TrustManagerFactory的X509密钥管理器实例
TrustManagerFactory trustManager = TrustManagerFactory.getInstance("X509");
//取得BKS密库实例
KeyStore tks = KeyStore.getInstance("BKS");
//加客户端载证书和私钥,通过读取资源文件的方式读取密钥和信任证书
tks.load(MyApplication.getInstance()
.getResources()
.openRawResource(R.raw.client),"123456".toCharArray());
//初始化密钥管理器
trustManager.init(tks);
//初始化SSLContext
sslContext.init(null,trustManager.getTrustManagers(),null);
//生成SSLSocket
factory = sslContext.getSocketFactory();
} catch (Exception e) {
e.printStackTrace();
Log.e("SSL",e.getMessage());
}
factory是这样写的吗?
客户端和服务器订阅相同的主题 然后客户端向这个主题发布消息 客户端会连接中断:connectionLost
很容易出现 代理程序不可用 (3)的情况,或者reconnect很久没有反应
1为什么倒包指引入org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0',我看很多项目都引入了compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1';
2如果反复断网重连的话有什么好的方法重连么,释放资源还需要client.unresoucse之后在disconnect 么;
3.这种方式连接订阅没有用到回调方式,那么我们不用管是么,成功了怎么知道成功呢。。
只有一个MqttCallback,不用ConnectCallBackHandler么,
2. 连接状态变更有回调的, 在那处理就可以了, paho本身并不支持重连
3. 剩下的不知道怎么回答, 看不懂你的提问 :)
05-24 06:04:02.285 2363-2400/com.lichfaker.mqttclientandroid W/OpenGLRenderer: Failed to set EGL_SWAP_BEHAVIOR on surface 0xaf81c120, error=EGL_BAD_MATCH
05-24 06:04:02.301 2363-2363/com.lichfaker.mqttclientandroid W/art: Before Android 4.1, method int android.support.v7.widget.ListViewCompat.lookForSelectablePosition(int, boolean) would have incorrectly overridden the package-private method in android.widget.ListView
05-24 06:04:50.580 2363-3050/com.lichfaker.mqttclientandroid D/MainActivity.java: ╔════════════════════════════════════════════════════════════════════════════════════════
05-24 06:04:50.580 2363-3050/com.lichfaker.mqttclientandroid D/MainActivity.java: ║[ (MainActivity.java:40)#Run ]
05-24 06:04:50.580 2363-3050/com.lichfaker.mqttclientandroid D/MainActivity.java: ╟────────────────────────────────────────────────────────────────────────────────────────
05-24 06:04:50.586 2363-3050/com.lichfaker.mqttclientandroid D/MainActivity.java: ║ isConnected: false
05-24 06:04:50.587 2363-3050/com.lichfaker.mqttclientandroid D/MainActivity.java: ╚══════════════════════════════════════════════════════
这是啥情况?就是连接不了