Spring Boot MQTT 断开后自动重连
// Obtain the callback-based connection from the MQTT client and register a
// connection-level listener on it. The listener logs connect/disconnect
// events, logs and acknowledges every incoming message, and logs failures.
final CallbackConnection callbackConnection = mqtt.callbackConnection();
callbackConnection.listener(new Listener() {
    @Override
    public void onConnected() {
        log.info("mqtt callback onConnected");
    }

    @Override
    public void onDisconnected() {
        log.info("mqtt callback onDisconnected");
    }

    @Override
    public void onPublish(UTF8Buffer utf8Buffer, Buffer buffer, Runnable ack) {
        // Invoked when a device publishes a message to a topic this service has subscribed to.
        String receivedTopic = utf8Buffer.utf8().toString();
        String payload = buffer.utf8().toString();
        log.info("mq监听接收到的消息{},{}", receivedTopic, payload);
        // Acknowledge only after the message has been handled (here: logged).
        ack.run();
    }

    @Override
    public void onFailure(Throwable throwable) {
        // Previously empty: failures were dropped without a trace.
        log.error("mqtt callback onFailure", throwable);
    }
});
// Start the connection attempt; the outcome is reported through the callback.
callbackConnection.connect(new Callback<Void>() {
    @Override
    public void onSuccess(Void aVoid) {
        // Original note: after a successful connect the topic ($client/mengsu) is
        // subscribed by default. NOTE(review): no subscription is issued in this
        // callback itself — confirm where that default subscription comes from.
        log.info("连接成功");
    }

    @Override
    public void onFailure(Throwable throwable) {
        // Previously empty: a failed connection attempt was silently ignored.
        log.error("mqtt connect failed", throwable);
    }
});
// Build the list of MQTT topics to subscribe to (a single topic at QoS EXACTLY_ONCE)
// and request the subscription; the outcome is reported through the callback.
// NOTE(review): this is issued right after connect() without waiting for its
// onSuccess — confirm the client queues requests until the connection is up.
Topic[] topics = {new Topic(topic, QoS.EXACTLY_ONCE)};
callbackConnection.subscribe(topics, new Callback<byte[]>() {
    @Override
    public void onSuccess(byte[] qoses) {
        // Topic subscription succeeded.
        log.info("mqtt subscribe {} success", topic);
    }

    @Override
    public void onFailure(Throwable value) {
        // Topic subscription failed. Previously empty: the failure was silently ignored.
        log.error("mqtt subscribe {} failed", topic, value);
    }
});
网友评论