public class MqttManage {
private static final String TAG = "MqttManage";
private static final String CLIENT_ID = "123";
private static final short KEEP_ALIVE = 30;//心跳
private static final int PERIOD_RETRY = 10 * 1000;
private static final String HIGHWAY = "highway";
private static final String PUSH = HIGHWAY + "_device";
private MqttAndroidClient mqttAndroidClient;
private MqttConnectOptions mqttConnectOptions;
private Context context;
private boolean connected = false;
private ScheduledExecutorService reconnectPool;//重连线程池
private IMqttListener iMqttListener = null;
private final String imei;
public interface IMqttListener {
void onMqttConnect();
void onMqttDisConnect();
void onMqttMessage(@NonNull Protocol message);
}
public MqttManage(@NonNull Context context, @NonNull String imei) {
this.context = context;
this.imei = imei;
mqttAndroidClient = new MqttAndroidClient(context, BuildConfig.Host_Mqtt, CLIENT_ID);
mqttAndroidClient.setCallback(mqttCallback);
mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setConnectionTimeout(30);
mqttConnectOptions.setKeepAliveInterval(KEEP_ALIVE);
mqttConnectOptions.setCleanSession(true);
}
public void setMqttListener(IMqttListener iMqttListener) {
this.iMqttListener = iMqttListener;
}
/**
* 连接MQTT服务器
*/
private synchronized void doClientConnection() {
if (!mqttAndroidClient.isConnected()) {
try {
mqttAndroidClient.connect(mqttConnectOptions, context, iMqttActionListener);
Log.d(TAG, "mqttAndroidClient-connecting-" + mqttAndroidClient.getClientId());
} catch (MqttException e) {
Log.e(TAG, e.toString());
}
}
}
private final IMqttActionListener iMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.d(TAG, "connect-" + "onSuccess");
connected = true;
closeReconnectTask();
startSubscribe();
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
//connect-onFailure-MqttException (0) - java.net.UnknownHostException
Log.e(TAG, "connect-" + "onFailure-" + exception);
connected = false;
if (iMqttListener != null) {
iMqttListener.onMqttDisConnect();
}
startReconnectTask();
}
};
public boolean isConnected(){
return connected;
}
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
//close-connectionLost-等待来自服务器的响应时超时 (32000)
//close-connectionLost-已断开连接 (32109)
Log.d(TAG, "close-" + "connectionLost-" + cause);
if (cause != null) {//null表示被关闭
startReconnectTask();
}
}
@Override
public void messageArrived(String topic, MqttMessage message) {
Log.d(TAG, String.format(Locale.getDefault(), "messageArrived topic[%s], message[%s]", topic, new String(message.getPayload())));
Protocol msg = new Gson().fromJson(new String(message.getPayload()), Protocol.class);
if(!topic.endsWith(msg.getImei())){
Log.w(TAG, "消息imei错误,不予处理");
return;
}
if(iMqttListener != null){
iMqttListener.onMqttMessage(msg);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
Log.d(TAG, "deliveryComplete-" + token.getMessage().toString());
} catch (MqttException e) {
e.printStackTrace();
}
}
};
public void connect() {
doClientConnection();
}
private synchronized void startReconnectTask() {
if (reconnectPool != null) {
return;
}
reconnectPool = Executors.newScheduledThreadPool(1);
reconnectPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Log.d(TAG, "re-connect");
doClientConnection();
}
}, 0, PERIOD_RETRY, TimeUnit.MILLISECONDS);
}
private synchronized void closeReconnectTask() {
if (reconnectPool != null) {
reconnectPool.shutdownNow();
reconnectPool = null;
}
}
public void startSubscribe(){
final String topic = HIGHWAY + this.imei;
try {
mqttAndroidClient.unsubscribe(topic);
mqttAndroidClient.subscribe(topic, 0, context, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
if (iMqttListener != null) {
iMqttListener.onMqttConnect();
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.e(TAG, String.format(Locale.getDefault(), "subscribe topic[%s] failure", topic));
Log.e(TAG, exception.toString());
}
});
} catch (MqttException e) {
Log.e(TAG, e.toString());
}
}
public void pushMessage(@NonNull Protocol.Builder builder){
if(!mqttAndroidClient.isConnected()){
return;
}
try {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(new Gson().toJson(new Protocol(builder)).getBytes());
mqttMessage.setQos(0);
mqttMessage.setRetained(false);
mqttAndroidClient.publish(PUSH, mqttMessage, context, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.v(TAG, "publish success");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.e(TAG, "publish failure");
}
});
} catch (MqttException e) {
Log.v(TAG, e.toString());
}
}
public void release() {
if (mqttAndroidClient != null) {
mqttAndroidClient.close();
mqttAndroidClient.unregisterResources();
}
}
}
网友评论