添加引用
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
功能封装,我这里订阅的topic 和clientId一样,有需要的可以自行修改
class PushClient : MqttCallback {
private val TAG = "PushClient"
//mqtt连接client
var mClient: MqttAndroidClient? = null
//mqtt连接参数设置
var options: MqttConnectOptions? = null;
/*连接成功之后设置连接断开的缓冲配置*/
var disconnectedBufferOptions: DisconnectedBufferOptions? = null
var context: Context? = null
var countDownTimer: CountDownTimer? = null//倒计时重连
var exit = false//是否退出
constructor(host: String, port: Int, context: Context, clientId: String) {
this.context = context;
exit = false
var uri = "tcp://$host:$port"
// var uri = "ssl://$host:$port"
mClient = MqttAndroidClient(context, uri, clientId)
mClient?.setCallback(this)
//mqtt连接参数设置
options = MqttConnectOptions();
//设置自动重连
options?.isAutomaticReconnect = false;
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录
// 这里设置为true表示每次连接到服务器都以新的身份连接
options?.isCleanSession = false;
// 设置超时时间 单位为秒
options?.connectionTimeout = 5;
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options?.keepAliveInterval = 20;
/*连接成功之后设置连接断开的缓冲配置*/
disconnectedBufferOptions = DisconnectedBufferOptions();
//开启
disconnectedBufferOptions?.isBufferEnabled = true;
//离线后最多缓存100调
disconnectedBufferOptions?.bufferSize = 100;
//不一直持续留存
disconnectedBufferOptions?.isPersistBuffer = false;
//删除旧消息
disconnectedBufferOptions?.isDeleteOldestMessages = false;
}
companion object {
fun create(host: String, port: Int, context: Context, clientId: String): PushClient {
return PushClient(host = host, port = port, context = context, clientId = clientId)
}
}
/**
* 链接
*/
fun connect() {
Log.e(TAG, "链接${mClient?.serverURI}")
mClient?.connect(options, null, object : IMqttActionListener {
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Log.e(TAG, "链接失败")
resetConnect()
}
override fun onSuccess(asyncActionToken: IMqttToken?) {
Log.e(TAG, "链接成功")
if (countDownTimer != null) {
countDownTimer?.cancel()
countDownTimer = null
}
subscribeToTopic()
}
})
}
/**
* 3s后重新链接
*/
fun resetConnect() {
countDownTimer = object : CountDownTimer(3000, 1000) {
override fun onTick(millisUntilFinished: Long) {
Log.e(TAG, "等待重新链接 $millisUntilFinished")
}
override fun onFinish() {
Log.e(TAG, "重新链接")
connect()
}
}.start()
}
override fun messageArrived(topic: String?, message: MqttMessage?) {
Log.e(TAG, "messageArrived")
}
/**
* @desc 连接断开回调
* 可在这里做一些重连等操作
*/
override fun connectionLost(cause: Throwable?) {
Log.e(TAG, "connectionLost")
if (!exit) {
resetConnect()
}
}
override fun deliveryComplete(token: IMqttDeliveryToken?) {
Log.e(TAG, "deliveryComplete")
}
/**
* 断开链接
*/
fun disConnect() {
exit = true
if (countDownTimer != null) {
countDownTimer?.cancel()
countDownTimer = null
}
Log.e(TAG, "断开链接")
mClient?.disconnect(null, object : IMqttActionListener {
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Log.e(TAG, "断开链接失败")
}
override fun onSuccess(asyncActionToken: IMqttToken?) {
Log.e(TAG, "断开链接成功")
}
})
}
/**
* 订阅主题
*/
var set = false
fun subscribeToTopic() {
Log.e(TAG, "消息订阅${mClient?.clientId}")
mClient?.setBufferOpts(disconnectedBufferOptions)
//主题、QOS、context,订阅监听,消息监听
var token = mClient?.subscribe(mClient?.clientId, 2, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
Log.e(TAG, "订阅成功")
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Log.e(TAG, "订阅失败")
}
}) { topic, message ->
Log.e(TAG, "$topic 消息到达 $message")
showMessage(message = message.toString())
}
Log.e(TAG, "token ${token?.isComplete}")
}
/**
* 是否链接
*/
fun isConnected(): Boolean {
return if (mClient == null) {
false
} else {
mClient!!.isConnected
}
}
private fun showMessage(message: String) {
//项目之前有极光推送,为了方便就直接极光本地推送
val jsonObject = JSONObject(message)
val ln = JPushLocalNotification()
ln.extras = message
ln.builderId = 0
ln.content = jsonObject.getString("content")
// ln.extras = " {\"content\":\"您收到一个案件处理任务,请及时处理\",\"messageId\":215,\"realId\":285,\"summary\":\"\",\"title\":\"处理案件提醒\",\"type\":200,\"url\":\"\"}";
ln.title = jsonObject.getString("title")
ln.notificationId = 11111111
ln.broadcastTime = System.currentTimeMillis() + 1000 * 1
JPushInterface.addLocalNotification(context, ln)
}
}
网友评论