基于ScheduledFuture来完成任务取消
代码如下:
public class ScheduledExecutorPingSender implements MqttPingSender {
private static final String CLASS_NAME = ScheduledExecutorPingSender.class.getName();
private final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT,CLASS_NAME);
private ClientComms comms;
private ScheduledExecutorService executorService;
private ScheduledFuture scheduledFuture;
private String clientid;
public ScheduledExecutorPingSender(ScheduledExecutorService executorService){
if(executorService == null){
throw new IllegalArgumentException("ExecutorService cannot be null");
}
this.executorService = executorService;
}
@Override
public void init(ClientComms comms) {
if(comms == null){
throw new IllegalArgumentException("ClientComms cannot be null");
}
this.comms = comms;
clientid = comms.getClient().getClientId();
}
@Override
public void start() {
final String methodName = "start";
log.fine(CLASS_NAME,methodName,"659",new Object[]{clientid});
//第一个心跳间隔过完之后,要检查ping
schedule(comms.getKeepAlive());
}
@Override
public void stop() {
final String methodName = "stop";
log.fine(CLASS_NAME,methodName,"661",null);
if(scheduledFuture !=null){
//基于异步调度的任务取消行为
scheduledFuture.cancel(true);
}
}
@Override
public void schedule(long delayInMillseconds) {
scheduledFuture = executorService.schedule(new PingRunnable(),delayInMillseconds, TimeUnit.MILLISECONDS);
}
private class PingRunnable implements Runnable{
private static final String methodName = "PingTask.run";
@Override
public void run() {
String originalThreadName = Thread.currentThread().getName();
//设置当前线程的名称
Thread.currentThread().setName("MQTT Ping: " + clientid);
log.fine(CLASS_NAME,methodName,"660",new Object[]{Long.valueOf(System.nanoTime())});
comms.checkForActivity();
Thread.currentThread().setName(originalThreadName);
}
}
}
网友评论