WaterMachineThreadFactory
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * {@link ThreadFactory} that creates non-daemon, normal-priority threads named
 * {@code <name>-<poolSequence>-thread-<threadSequence>}.
 *
 * <p>The pool sequence comes from a static counter shared by all instances of this class
 * and advances each time a factory is created. The thread sequence is per factory and
 * advances on every {@link #newThread(Runnable)} call. Both counters start at 1.
 *
 * <p>Created: 2020/5/21 15:14
 *
 * @author chihaojie
 * @version 1.0
 */
public class WaterMachineThreadFactory implements ThreadFactory {

    /** Static counter that supplies the pool part of the thread-name prefix. */
    private static final AtomicInteger poolNumber = new AtomicInteger(1);

    /** Thread group every thread created by this factory is placed in. */
    private final ThreadGroup group;

    /** Per-factory counter that supplies the trailing number of each thread name. */
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    /** Common prefix of all thread names produced by this factory. */
    private final String namePrefix;

    /**
     * Builds a factory whose threads join the security manager's thread group when a
     * security manager is installed, and the creating thread's group otherwise.
     *
     * @param name leading part of the generated thread names
     */
    private WaterMachineThreadFactory(String name) {
        SecurityManager securityManager = System.getSecurityManager();
        if (securityManager == null) {
            this.group = Thread.currentThread().getThreadGroup();
        } else {
            this.group = securityManager.getThreadGroup();
        }
        int poolSequence = poolNumber.getAndIncrement();
        this.namePrefix = name + "-" + poolSequence + "-thread-";
    }

    /**
     * Creates a new factory for the given pool name.
     *
     * @param name leading part of the generated thread names
     * @return a new factory instance
     */
    public static WaterMachineThreadFactory forName(String name) {
        return new WaterMachineThreadFactory(name);
    }

    /**
     * Creates a thread for {@code r}, named with this factory's prefix followed by the
     * next thread sequence number, and normalizes it to non-daemon / normal priority.
     *
     * @param r the task the new thread will run
     * @return the newly created, not yet started thread
     */
    @Override
    public Thread newThread(Runnable r) {
        String threadName = namePrefix + threadNumber.getAndIncrement();
        // Stack size 0 leaves the choice of stack size to the Thread implementation.
        Thread worker = new Thread(group, r, threadName, 0);
        if (worker.isDaemon()) {
            worker.setDaemon(false);
        }
        if (worker.getPriority() != Thread.NORM_PRIORITY) {
            worker.setPriority(Thread.NORM_PRIORITY);
        }
        return worker;
    }
}
SessionMetaData
package com.emqx;
import lombok.Data;
/**
 * Mutable per-device session record holding the device identifier and the time of the
 * device's last recorded activity. Accessors are generated by Lombok's {@code @Data}.
 *
 * <p>Created: 2020/5/21 14:19
 *
 * @author chihaojie
 * @version 1.0
 */
@Data
public class SessionMetaData {

    /** Identifier of the device this session belongs to. */
    private volatile String deviceId;

    /**
     * Time of the last recorded activity; {@link #updateLastActivityTime()} stores
     * {@link System#currentTimeMillis()} here.
     */
    private volatile long lastActivityTime;

    /**
     * Creates a session record.
     *
     * @param deviceId         identifier of the device
     * @param lastActivityTime initial last-activity timestamp
     */
    public SessionMetaData(String deviceId, long lastActivityTime) {
        this.deviceId = deviceId;
        this.lastActivityTime = lastActivityTime;
    }

    /** Marks the session as active right now by storing the current wall-clock time. */
    void updateLastActivityTime() {
        final long now = System.currentTimeMillis();
        this.lastActivityTime = now;
    }
}
DeviceSessionManager
package com.emqx;
import java.util.concurrent.*;
/**
 * Tracks device sessions and periodically evicts the ones that have gone quiet.
 *
 * <p>Design notes carried over from the original header:
 * <ol>
 *   <li>create a session for every device that sends heartbeats;</li>
 *   <li>keep the latest activity time in that session;</li>
 *   <li>periodically look for zombie devices in the session table, update the device
 *       state and kick them out of the session table.</li>
 * </ol>
 *
 * <p>TODO: code refactor.
 *
 * <p>Created: 2020/5/21 15:36
 *
 * @author chihaojie
 * @version 1.0
 */
public class DeviceSessionManager {

    /** A session idle for longer than this many milliseconds is treated as expired. */
    private final long sessionInactivityTimeout = 10000;

    /** Period, in milliseconds, between two runs of the inactivity check. */
    private final long sessionReportTimeout = 5000;

    /** Scheduler running the periodic inactivity check; {@code null} until {@link #init()}. */
    protected ScheduledExecutorService schedulerExecutor;

    /** Registered sessions keyed by device id. */
    private final ConcurrentMap<String, SessionMetaData> sessions = new ConcurrentHashMap<>();

    /**
     * Registers a session stamped with the current time, unless one already exists
     * under the same key.
     *
     * <p>NOTE(review): the literal {@code "deviceId"} key looks like a placeholder for a
     * real device identifier -- confirm and parameterize when callers are added.
     */
    private void registerSession() {
        SessionMetaData currentSession = new SessionMetaData("deviceId", System.currentTimeMillis());
        sessions.putIfAbsent("deviceId", currentSession);
    }

    /**
     * Scans all sessions and evicts those whose last activity is older than
     * {@link #sessionInactivityTimeout}.
     *
     * <p>Active sessions are deliberately left untouched. Refreshing their timestamp here
     * (as the previous implementation did) made the periodic check itself count as
     * device activity, so no session could ever expire.
     */
    private void checkInactivityAndReportActivity() {
        long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
        sessions.forEach((deviceId, session) -> {
            if (session.getLastActivityTime() < expTime) {
                // Expired: kick the zombie session out. The two-argument remove only drops
                // the mapping if it still points at the session that was just inspected.
                // TODO: also update the device's online state once that hook exists.
                sessions.remove(deviceId, session);
            }
        });
    }

    /**
     * Records activity for a session by refreshing its last-activity time.
     *
     * @param sessionId key of the session to refresh; may be {@code null}
     * @return the refreshed session, or {@code null} if no session is registered under
     *         {@code sessionId}
     */
    private SessionMetaData reportActivityInternal(String sessionId) {
        if (sessionId == null) {
            // ConcurrentHashMap rejects null keys; treat it as "no such session".
            return null;
        }
        SessionMetaData sessionMetaData = sessions.get(sessionId);
        if (sessionMetaData != null) {
            sessionMetaData.updateLastActivityTime();
        }
        return sessionMetaData;
    }

    /**
     * Removes the session registered under the given session's device id. Does nothing
     * when the session (or its device id) is {@code null} or is not registered.
     *
     * @param session the session to remove
     */
    private void deregisterSession(SessionMetaData session) {
        if (session == null) {
            return;
        }
        String deviceId = session.getDeviceId();
        if (deviceId != null) {
            // Remove by key directly; looking the entry up first and dereferencing the
            // result threw a NullPointerException for sessions that were already gone.
            sessions.remove(deviceId);
        }
    }

    /**
     * Starts the scheduler and schedules the inactivity check: first run after 2000 ms,
     * then every {@link #sessionReportTimeout} ms. Calling this while a scheduler is
     * already running is a no-op, so the existing pool is not leaked.
     */
    public void init() {
        if (schedulerExecutor != null && !schedulerExecutor.isShutdown()) {
            return;
        }
        this.schedulerExecutor = Executors.newScheduledThreadPool(
                2, WaterMachineThreadFactory.forName("device-online-scheduler"));
        this.schedulerExecutor.scheduleAtFixedRate(
                this::checkInactivityAndReportActivity, 2000, sessionReportTimeout, TimeUnit.MILLISECONDS);
    }

    /** Shuts the scheduler down immediately, if it was ever started. */
    public void destroy() {
        if (schedulerExecutor != null) {
            schedulerExecutor.shutdownNow();
        }
    }
}
网友评论