美文网首页
利用线程池调度维护设备会话状态

利用线程池调度维护设备会话状态

作者: 大风过岗 | 来源:发表于2020-05-21 16:28 被阅读0次

    WaterMachineThreadFactory

    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @Author: chihaojie
     * @Date: 2020/5/21 15:14
     * @Version 1.0
     * @Note
     */
    public class WaterMachineThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        public static WaterMachineThreadFactory forName(String name) {
            return new WaterMachineThreadFactory(name);
        }
    
        private WaterMachineThreadFactory(String name) {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = name + "-" +
                    poolNumber.getAndIncrement() +
                    "-thread-";
        }
    
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    

    SessionMetaData

    package com.emqx;
    
    import lombok.Data;
    
    /**
     * Mutable record of one device session: which device it belongs to and when that
     * device was last seen active.
     *
     * <p>Getters, setters and the other members implied by Lombok's {@code @Data} are
     * generated at compile time. Both fields are {@code volatile}, so a value written by
     * one thread is visible to subsequent reads from other threads.
     *
     * <p>Created 2020/5/21 14:19.
     *
     * @author chihaojie
     * @version 1.0
     */
    @Data
    public class SessionMetaData {

        /** Identifier of the device this session belongs to. */
        private volatile String deviceId;

        /**
         * Time of the most recent activity. {@link #updateLastActivityTime()} stores
         * {@link System#currentTimeMillis()} here, so constructor callers are expected
         * to pass a value on the same clock.
         */
        private volatile long lastActivityTime;

        /**
         * @param deviceId         identifier of the device
         * @param lastActivityTime initial last-activity timestamp
         */
        public SessionMetaData(String deviceId, long lastActivityTime) {
            this.deviceId = deviceId;
            this.lastActivityTime = lastActivityTime;
        }

        /** Marks the session as active right now. */
        void updateLastActivityTime() {
            final long now = System.currentTimeMillis();
            this.lastActivityTime = now;
        }

    }
    
    

    DeviceSessionManager

    package com.emqx;
    
    import java.util.concurrent.*;
    
    /**
     * Keeps an in-memory table of device sessions and periodically evicts the ones that
     * have been silent for too long.
     *
     * <p>Design notes carried over from the original author:
     * <ol>
     *   <li>create a session for each device that sends heartbeats;</li>
     *   <li>keep the latest activity time in the session;</li>
     *   <li>periodically look for zombie devices, update their status and drop their
     *       sessions.</li>
     * </ol>
     *
     * <p>Call {@link #init()} to start the periodic check and {@link #destroy()} to stop it.
     *
     * <p>Created 2020/5/21 15:36.
     *
     * @author chihaojie
     * @version 1.0
     */
    public class DeviceSessionManager {
        //TODO code refactor

        /** A session with no activity for this many milliseconds is treated as expired. */
        private long sessionInactivityTimeout=10000;

        /** Period, in milliseconds, between two runs of the inactivity check. */
        private long sessionReportTimeout=5000;

        /** Scheduler that runs the inactivity check; {@code null} until {@link #init()} is called. */
        protected ScheduledExecutorService schedulerExecutor;

        /** Live sessions keyed by device id. */
        private final ConcurrentMap<String, SessionMetaData> sessions = new ConcurrentHashMap<>();

        /**
         * Registers a session under the placeholder id {@code "deviceId"}.
         * Kept for backward compatibility; prefer {@link #registerSession(String)}.
         */
        private void registerSession(){
            registerSession("deviceId");
        }

        /**
         * Registers a session for {@code deviceId} unless one already exists; an existing
         * session (and its last-activity time) is left untouched.
         *
         * @param deviceId id of the device; must not be {@code null} because the backing
         *                 {@link ConcurrentHashMap} rejects null keys
         */
        private void registerSession(String deviceId) {
            sessions.computeIfAbsent(deviceId, id -> new SessionMetaData(id, System.currentTimeMillis()));
        }

        /**
         * Scans all sessions and evicts those whose last activity is older than
         * {@code sessionInactivityTimeout}.
         *
         * <p>Sessions that are still within the timeout are deliberately left alone: only
         * real device activity (see {@link #reportActivityInternal(String)}) may move the
         * last-activity time forward. Refreshing it here would keep every session alive
         * forever, because this check runs more often than the timeout elapses.
         */
        private void checkInactivityAndReportActivity() {
            long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
            sessions.forEach((deviceId, session) -> {
                if (session.getLastActivityTime() < expTime) {
                    // Expired: kick the device out of the session table.
                    // TODO: also update the device's status here (step 3 of the design notes).
                    // The two-argument remove only evicts if this exact mapping is still present.
                    sessions.remove(deviceId, session);
                }
            });
        }

        /**
         * Records activity for a session by moving its last-activity time to now.
         *
         * @param sessionId key of the session (the device id it was registered under)
         * @return the updated session, or {@code null} if no session is registered under
         *         {@code sessionId}
         */
        private SessionMetaData reportActivityInternal(String sessionId) {
            SessionMetaData sessionMetaData = sessions.get(sessionId);
            if (sessionMetaData != null) {
                sessionMetaData.updateLastActivityTime();
            }
            return sessionMetaData;
        }



        /**
         * Removes the session registered under {@code session}'s device id.
         * Does nothing when {@code session} is {@code null}, has no device id, or is not
         * (or no longer) registered.
         *
         * @param session the session to remove
         */
        private void deregisterSession(SessionMetaData session) {
            if (session == null) {
                return;
            }
            String deviceId = session.getDeviceId();
            if (deviceId != null) {
                // remove() is a no-op for an unknown key, so no prior lookup is needed.
                sessions.remove(deviceId);
            }
        }

        /**
         * Creates the scheduler (two threads named via {@code WaterMachineThreadFactory})
         * and starts the inactivity check: first run after 2000 ms, then every
         * {@code sessionReportTimeout} ms.
         */
        public void init() {
            //System.out.println("【构建线程池】");
            this.schedulerExecutor = Executors.newScheduledThreadPool(2,WaterMachineThreadFactory.forName("device-online-scheduler"));
            this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, 2000, sessionReportTimeout, TimeUnit.MILLISECONDS);
        }

        /**
         * Stops the scheduler immediately, if {@link #init()} has created one.
         */
        public void destroy() {
            // Tear down the thread pool.
            if (schedulerExecutor != null) {
                schedulerExecutor.shutdownNow();
            }

        }

    }
    
    

    相关文章

      网友评论

          本文标题:利用线程池调度维护设备会话状态

          本文链接:https://www.haomeiwen.com/subject/fhftahtx.html