前言
项目中有用到mqtt,碰巧没人负责这一块,所以毛遂自荐就看了一波,下面是一些简单的使用记录,写得不好,仅供参考。若没有mqtt服务器的朋友,建议先建一个mqtt服务,不然看不到效果。
什么是Mqtt?
MQTT 的全称为 Message Queue Telemetry Transport,是轻量级基于代理的发布/订阅的消息传输协议,它可以通过很少的代码和带宽和远程设备连接。例如通过卫星和代理连接,通过拨号和医疗保健提供者连接,以及在一些自动化或小型设备上,而且由于小巧,省电,协议开销小和能高效的向一和多个接收者传递信息,故同样适用于称动应用设备上。MQTT就包含了以下一些特点:
- 实现简单
- 提供数据传输的 QoS
- 轻量、占用带宽低
- 可传输任意类型的数据
- 可保持的会话(session)
Android 下如何使用Mqtt?
在Android中使用Mqtt可以分为6个步骤:
- 导入mqtt包;
- 配置MqttConnectOptions;
- 调用connect并将配置好的参数写入;
- 通过指定的消息进行消息订阅;
- 向订阅的topic中发布消息;
- 通过mqttCallBack的回调对接收到的消息进行处理;
// mqtt 包导入
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
导入类:
public class MQTTManager {
private static final String TAG = "MQTTManager";
public static final String SERVER_HOST = "tcp://52.80.116.245:1883";
private String clientid = "2df8aabfb8b6088953664f413a446bbc";
private static MQTTManager mqttManager=null;
private MqttClient client;
private MqttConnectOptions options;
private Context mContext;
private MessageHandlerCallBack callBack;
private MQTTManager(Context context){
mContext = context;
clientid+=MqttClient.generateClientId();
}
/**
* 获取一个MQTTManager单例
* @param context
* @return 返回一个MQTTManager的实例对象
*/
public static MQTTManager getInstance(Context context) {
Log.d(TAG,"mqttManager="+mqttManager);
if (mqttManager==null) {
mqttManager=new MQTTManager(context);
synchronized (Object.class) {
Log.d(TAG,"synchronized mqttManager="+mqttManager);
if (mqttManager!=null) {
return mqttManager;
}
}
}else {
Log.d(TAG,"else mqttManager="+mqttManager);
return mqttManager;
}
return null;
}
/**
* 连接服务器
*/
public void connect(){
Log.d(TAG,"开始连接MQtt");
try {
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(SERVER_HOST, "2df8aabfb8b6088953664f413a446bbc", new MemoryPersistence());
// MQTT的连接设置
options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName("7302");
// 设置连接的密码
options.setPassword("64ec6f32366ccb80f0dacc804546d62e623e4b72".toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(30);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(30);
// 设置回调
// MqttTopic topic = client.getTopic(TOPIC);
//setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
// options.setWill(topic, "close".getBytes(), 2, true);
SSLSocketFactory sslSocketFactory = null;
/* try {
sslSocketFactory = sslContextFromStream(mContext.getAssets().open("server.pem")).getSocketFactory();
} catch (Exception e) {
e.printStackTrace();
}
options.setSocketFactory(sslSocketFactory);*/
client.setCallback(new PushCallback());
client.connect(options);
Log.d(TAG,"ClientId="+client.getClientId());
} catch (MqttException e) {
e.printStackTrace();
Log.e(TAG, "connect: " + e );
}
}
/**
* 订阅消息
* @param topic 订阅消息的主题
*/
public void subscribeMsg(String topic,int qos){
if (client!=null) {
int[] Qos = {qos};
String[] topic1 = {topic};
try {
client.subscribe(topic1, Qos);
Log.d(TAG,"开始订阅topic="+topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 发布消息
* @param topic 发布消息主题
* @param msg 消息体
* @param isRetained 是否为保留消息
*/
public void publish(String topic,String msg,boolean isRetained,int qos) {
try {
if (client!=null) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(isRetained);
message.setPayload(msg.getBytes());
client.publish(topic, message);
Log.d(TAG,"topic="+topic+"--msg="+msg+"--isRetained"+isRetained);
}
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
int count=0;
/**
* 发布和订阅消息的回调
*
*/
public class PushCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
Log.e(TAG, "connectionLost: " + cause );
if (count<5) {
count++;//5次重连
Log.d(TAG,"断开连接,重新连接"+count+"次"+cause);
try {
client.close();
connect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 发布消息的回调
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后会执行到这里
Log.d(TAG,"发布消息成功的回调"+token.isComplete());
}
/**
* 接收消息的回调方法
*/
@Override
public void messageArrived(final String topicName, final MqttMessage message)
throws Exception {
//subscribe后得到的消息会执行到这里面
Log.d(TAG,"接收消息=="+new String(message.getPayload()));
if (callBack!=null) {
callBack.messageSuccess(topicName,new String(message.getPayload()));
}
}
}
/**
* 设置接收消息的回调方法
* @param callBack
*/
public void setMessageHandlerCallBack(MessageHandlerCallBack callBack){
this.callBack = callBack;
}
public MessageHandlerCallBack getMessageHandlerCallBack(){
if (callBack!=null) {
return callBack;
}
return null;
}
/**
* 断开链接
*/
public void disconnect(){
if (client!=null&&client.isConnected()) {
try {
client.disconnect();
mqttManager=null;
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 释放资源
*/
public void release(){
if (mqttManager!=null) {
mqttManager=null;
}
}
/**
* 判断服务是否连接
* @return
*/
public boolean isConnected(){
if (client!=null) {
return client.isConnected();
}
return false;
}
public SSLContext sslContextFromStream(InputStream inputStream) throws Exception {
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
Certificate certificate = certificateFactory.generateCertificate(inputStream);
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null, null);
keyStore.setCertificateEntry("ca", certificate);
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(keyStore);
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
return sslContext;
}
}
调用:
MQTTManager instance = MQTTManager.getInstance(mContext);
instance.setMessageHandlerCallBack(new MessageHandlerCallBack() {
@Override
public void messageSuccess(String topicName, String s) {
Log.e(TAG, "messageSuccess: " + s);
}
});
instance.connect();
连接成功后,就可以实现和服务端消息的发送和接收。
项目地址
AserbaosAndroid
aserbao的个人Android总结项目,希望这个项目能成为最全面的Android开发学习项目,这是个美好的愿景,项目中还有很多未涉及到的地方,有很多没有讲到的点,希望看到这个项目的朋友,如果你在开发中遇到什么问题,在这个项目中没有找到对应的解决办法,希望你能够提出来,给我留言或者在项目github地址提issues,我有时间就会更新项目没有涉及到的部分!项目会一直维护下去。当然,我希望是Aserbao'sAndroid 能为所有Android开发者提供到帮助!也期望更多Android开发者能参与进来,只要你熟悉Android某一块,都可以将你的代码pull上分支供大家学习!
总结
mqtt Android客户端的代码很有限,没有过多的操作!简单的配置,连接,然后接受回调处理接受消息就可以了,协议内容今后有时间再细看吧!
网友评论