本文主要是讲解Mqtt3.1协议下jar包的相关Api
Mqtt对应jar包首先来看一下两个接口:
IMqttClient和IMqttAsyncClient为了更好的对比这两个接口我就将二者的方法写在一起了(注释的方法为IMqttAsyncClient的接口方法)
public interface IMqttClient {
void connect() throws MqttSecurityException, MqttException;
void connect(MqttConnectOptions var1) throws MqttSecurityException, MqttException;
IMqttToken connectWithResult(MqttConnectOptions var1) throws MqttSecurityException, MqttException;
/**
*以下是IMqttAsyncClient的connect方法
*/
IMqttToken connect() throws MqttException, MqttSecurityException;
IMqttToken connect(MqttConnectOptions var1) throws MqttException,
MqttSecurityException;
IMqttToken connect(Object var1, IMqttActionListener var2) throws
MqttException, MqttSecurityException;
IMqttToken connect(MqttConnectOptions var1, Object var2, IMqttActionListener var3) throws MqttException, MqttSecurityException;
============================================================
void disconnect() throws MqttException;
void disconnect(long var1) throws MqttException;
void disconnectForcibly() throws MqttException;
void disconnectForcibly(long var1) throws MqttException;
void disconnectForcibly(long var1, long var3) throws MqttException;
/**
*以下是IMqttAsyncClient的disconnect方法
*/
IMqttToken disconnect() throws MqttException;
IMqttToken disconnect(long var1) throws MqttException;
IMqttToken disconnect(Object var1, IMqttActionListener var2) throws MqttException;
IMqttToken disconnect(long var1, Object var3, IMqttActionListener var4) throws MqttException;
void disconnectForcibly() throws MqttException;
void disconnectForcibly(long var1) throws MqttException;
void disconnectForcibly(long var1, long var3) throws MqttException;
============================================================
void subscribe(String var1) throws MqttException, MqttSecurityException;
void subscribe(String[] var1) throws MqttException;
void subscribe(String var1, int var2) throws MqttException;
void subscribe(String[] var1, int[] var2) throws MqttException;
void subscribe(String var1, IMqttMessageListener var2) throws MqttException, MqttSecurityException;
void subscribe(String[] var1, IMqttMessageListener[] var2) throws MqttException;
void subscribe(String var1, int var2, IMqttMessageListener var3) throws MqttException;
void subscribe(String[] var1, int[] var2, IMqttMessageListener[] var3) throws MqttException;
/**
*以下是IMqttAsyncClient的subscribe方法
*/
IMqttToken subscribe(String var1, int var2) throws MqttException;
IMqttToken subscribe(String var1, int var2, Object var3, IMqttActionListener var4) throws MqttException;
IMqttToken subscribe(String[] var1, int[] var2) throws MqttException;
IMqttToken subscribe(String[] var1, int[] var2, Object var3, IMqttActionListener var4) throws MqttException;
IMqttToken subscribe(String var1, int var2, Object var3, IMqttActionListener var4, IMqttMessageListener var5) throws MqttException;
IMqttToken subscribe(String var1, int var2, IMqttMessageListener var3) throws MqttException;
IMqttToken subscribe(String[] var1, int[] var2, IMqttMessageListener[] var3) throws MqttException;
IMqttToken subscribe(String[] var1, int[] var2, Object var3, IMqttActionListener var4, IMqttMessageListener[] var5) throws MqttException;
============================================================
void unsubscribe(String var1) throws MqttException;
void unsubscribe(String[] var1) throws MqttException;
/**
*以下是IMqttAsyncClient的unsubscribe方法
*/
IMqttToken unsubscribe(String var1) throws MqttException;
IMqttToken unsubscribe(String[] var1) throws MqttException;
IMqttToken unsubscribe(String var1, Object var2, IMqttActionListener var3) throws MqttException;
IMqttToken unsubscribe(String[] var1, Object var2, IMqttActionListener var3) throws MqttException;
============================================================
void publish(String var1, byte[] var2, int var3, boolean var4) throws MqttException, MqttPersistenceException;
void publish(String var1, MqttMessage var2) throws MqttException, MqttPersistenceException;
/**
*以下是IMqttAsyncClient的publish方法
*/
IMqttDeliveryToken publish(String var1, byte[] var2, int var3, boolean var4) throws MqttException, MqttPersistenceException;
IMqttDeliveryToken publish(String var1, byte[] var2, int var3, boolean var4, Object var5, IMqttActionListener var6) throws MqttException, MqttPersistenceException;
IMqttDeliveryToken publish(String var1, MqttMessage var2) throws MqttException, MqttPersistenceException;
IMqttDeliveryToken publish(String var1, MqttMessage var2, Object var3, IMqttActionListener var4) throws MqttException, MqttPersistenceException;
============================================================
void setCallback(MqttCallback var1);
/**
*以下是IMqttAsyncClient的setCallback方法
*/
void setCallback(MqttCallback var1);
============================================================
MqttTopic getTopic(String var1);//IMqttAsyncClient无此方法
============================================================
boolean isConnected();
/**
*以下是IMqttAsyncClient的isConnected方法
*/
boolean isConnected();
============================================================
String getClientId();
String getServerURI();
/**
*以下是IMqttAsyncClient的getClientId和getServerURI方法
*/
String getClientId();
String getServerURI();
============================================================
IMqttDeliveryToken[] getPendingDeliveryTokens();
void setManualAcks(boolean var1);
void messageArrivedComplete(int var1, int var2) throws MqttException;
void close() throws MqttException;
/**
*以下是IMqttAsyncClient对应的方法
*/
IMqttDeliveryToken[] getPendingDeliveryTokens();
void setManualAcks(boolean var1);
void messageArrivedComplete(int var1, int var2) throws MqttException;
void close() throws MqttException;
}
可以看到这两个接口定义的方法基本相同,从接口名也可看出一个是同步的一个是异步的,这两个接口对应的唯一实现类分别是:MqttClient和MqttAsyncClient使用的时候根据实际需求使用;接下来是初步实现的一个Mqtt连接(采用了前台服务,守护服务,广播监听来实现互相唤醒尽最大可能不被系统杀掉)连接的建立核心部分在initData()方法:
public class IqPushService extends Service {
private MqttAsyncClient mqttClient;
private MqttConnectOptions options;
private RegisterDeviceResult registerDeviceResult;
private Subscription subscription;
private boolean currentStatus;
private static String message = "";
private static long preTime;
private static String pakageName = "";
@Override
public void onCreate() {
super.onCreate();
//使用了前台服务,防杀
NotificationCompat.Builder builder = new NotificationCompat.Builder(this);
builder.setSmallIcon(android.R.drawable.ic_menu_manage);
builder.setContentTitle("前台服务");
builder.setContentText("推送");
Notification notification = builder.build();
startForeground(110, notification);
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return null;
}
@Override
public int onStartCommand(final Intent intent, int flags, final int startId) {
pakageName = getPackageName();
initData();
if (subscription != null && !subscription.isUnsubscribed()) {
subscription.unsubscribe();
}
subscription = Observable.interval(5, 20, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
boolean flag = ProcessCheckUtils.checkServiceRunning(IqPushService.this, ProtectedService.class.getName());
if (!flag) {
Intent intent1 = new Intent(getApplicationContext(), ProtectedService.class);
startService(intent1);
}
reConnect();
}
});
return super.onStartCommand(intent, flags, startId);
}
//连接的建立
private void initData() {
registerDeviceResult = AppDeviceInfo.getDeviceResult();
if (registerDeviceResult != null && NetworkUtils.isConnected()) {
RegisterDeviceResult.DataBean dataBean = registerDeviceResult.getData();
if (dataBean != null) {
try {
mqttClient = new MqttAsyncClient("tcp://" + dataBean.getHost() + ":" + dataBean.getPort(), dataBean.getClient_id(), new MemoryPersistence());
options = new MqttConnectOptions();
options.setUserName(dataBean.getUsername());
options.setPassword(dataBean.getPassword().toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
options.setAutomaticReconnect(false);
options.setCleanSession(false);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
reConnect();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
if (!message.equals(mqttMessage.toString()) || System.currentTimeMillis() - preTime > 5000) {
message = mqttMessage.toString();
preTime = System.currentTimeMillis();
sendMessage(s, mqttMessage.toString());
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
mqttClient.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
if (!TextUtils.isEmpty(registerDeviceResult.getData().getTopic())) {
try {
JSONArray jsonArray = new JSONArray(registerDeviceResult.getData().getTopic());
for (int i = 0; i < jsonArray.length(); i++) {
mqttClient.subscribe(jsonArray.getString(i), 2);
}
sendStatus(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
sendStatus(false);
reConnect();
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
}
}
private void reConnect() {
if (mqttClient == null) {
sendStatus(false);
if (NetworkUtils.isConnected()) {
Intent intent = new Intent("com.ebanswers.reiq");
intent.setPackage(pakageName);
getApplicationContext().sendBroadcast(intent);
}
} else if (!mqttClient.isConnected()) {
if (currentStatus)
sendStatus(false);
if (NetworkUtils.isConnected()) {
Log.d("IqPushService_connect", "connect");
try {
mqttClient.reconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
} else {
if (!currentStatus)
sendStatus(true);
}
}
private void sendMessage(String topic, String message) {
Intent intent = new Intent("com.ebanswers.message");
intent.putExtra("topic", topic);
intent.putExtra("message", message);
intent.setPackage(pakageName);
sendBroadcast(intent);
}
private void sendStatus(boolean isSuccess) {
currentStatus = isSuccess;
Intent intent = new Intent("com.ebanswers.status");
intent.putExtra("isSucess", isSuccess);
intent.setPackage(pakageName);
sendBroadcast(intent);
}
@Override
public void onDestroy() {
super.onDestroy();
subscription.unsubscribe();
Intent intent = new Intent("com.ebanswers.reiq");
intent.setPackage(pakageName);
getApplicationContext().sendBroadcast(intent);
}
}
网友评论