美文网首页
利用线程池调度维护设备会话状态

利用线程池调度维护设备会话状态

作者: 大风过岗 | 来源:发表于2020-05-21 16:28 被阅读0次

WaterMachineThreadFactory

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author: chihaojie
 * @Date: 2020/5/21 15:14
 * @Version 1.0
 * @Note
 */
public class WaterMachineThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public static WaterMachineThreadFactory forName(String name) {
        return new WaterMachineThreadFactory(name);
    }

    private WaterMachineThreadFactory(String name) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        namePrefix = name + "-" +
                poolNumber.getAndIncrement() +
                "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                namePrefix + threadNumber.getAndIncrement(),
                0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

SessionMetaData

package com.emqx;

import lombok.Data;

/**
 * @Author: chihaojie
 * @Date: 2020/5/21 14:19
 * @Version 1.0
 * @Note
 */
@Data
public class SessionMetaData {

    private volatile String deviceId;
    private volatile long lastActivityTime;

    public SessionMetaData(String deviceId, long lastActivityTime) {
        this.deviceId = deviceId;
        this.lastActivityTime = lastActivityTime;
    }

    void updateLastActivityTime() {
        this.lastActivityTime = System.currentTimeMillis();
    }

}

DeviceSessionManager

package com.emqx;

import java.util.concurrent.*;

/**
 * @Author: chihaojie
 * @Date: 2020/5/21 15:36
 * @Version 1.0
 * @Note
 */
public class DeviceSessionManager {
    //TODO code refactor
    //1.为心跳设备建立session
    //2.session中维护最新活跃时间
    //3.定期检查session中的僵尸设备,修改设备状态,踢出会话

    private long sessionInactivityTimeout=10000;

    private long sessionReportTimeout=5000;

    protected ScheduledExecutorService schedulerExecutor;

    private ConcurrentMap<String, SessionMetaData> sessions = new ConcurrentHashMap<>();

    /**
     * 注册会话
     */
    private void registerSession(){
        SessionMetaData currentSession = new SessionMetaData("deviceId",System.currentTimeMillis());
        sessions.putIfAbsent("deviceId", currentSession);
    }

    /**
     * 检查会话的活跃状态
     */
    private void checkInactivityAndReportActivity() {
        //System.out.println("【检查状态】");
        long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
        sessions.forEach((k,v)->{
            if (v.getLastActivityTime() < expTime) {
                //过期
            }else {
                //活跃
                v.updateLastActivityTime();
            }
        });
    }

    /**
     *  更新活跃时间
     * @param sessionId
     * @return
     */
    private SessionMetaData reportActivityInternal(String sessionId) {
        SessionMetaData sessionMetaData = sessions.get(sessionId);
        if (sessionMetaData != null) {
            sessionMetaData.updateLastActivityTime();
        }
        return sessionMetaData;
    }



    /**
     * 清除会话
     * @param session
     */
    private void deregisterSession(SessionMetaData session) {
        SessionMetaData currentSession = sessions.get(session.getDeviceId());
        sessions.remove(currentSession.getDeviceId());
    }

    public void init() {
        //System.out.println("【构建线程池】");
        this.schedulerExecutor = Executors.newScheduledThreadPool(2,WaterMachineThreadFactory.forName("device-online-scheduler"));
        this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, 2000, sessionReportTimeout, TimeUnit.MILLISECONDS);
    }

    public void destroy() {
        //销毁线程池
        if (schedulerExecutor != null) {
            schedulerExecutor.shutdownNow();
        }

    }

}

相关文章

  • 利用线程池调度维护设备会话状态

    WaterMachineThreadFactory SessionMetaData DeviceSessionMa...

  • 线程池介绍与使用

    一、线程池介绍 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等...

  • 多线程 | 4.线程池

    Java并发编程:线程池的使用 线程池基础 请求队列 线程池维护一定数量的线程,当线程池在运行状态的线程数量达上...

  • Spring Boot之ThreadPoolTaskExecut

    初始化线程池 corePoolSize 线程池维护线程的最少数量keepAliveSeconds 线程池维护线程...

  • Java 多线程

    多线程主要技术进程与线程线程状态阻塞状态分类线程的调度常用函数说明Thread类方法创建线程线程池线程安全向线程传...

  • 线程池

    线程池解决的核心问题:资源管理问题。 线程池运行机制最主要的三个点: 线程池如何维护自身状态; 线程池如何管理任务...

  • Java线程池的使用

    线程类型: 固定线程 cached线程 定时线程 固定线程池使用 cache线程池使用 定时调度线程池使用

  • 线程池 --------常见的四中线程池

    由于线程的频繁调度,而影响性能,通过线程池来维护,减少线程的频繁的创建和销毁。 在Executors统一管理,看一...

  • Android核心框架记录OkHttp、EventBus、Gli

    1. OkHttp框架的使用和原理 框架原理是建立线程池,利用调度线程,不断的取任务进行处理: Request,a...

  • 线程池ThreadPoolTaskExecutor 配置

    参数说明:corePoolSize:线程池维护线程最小数量maxPoolSize:线程池维护线程最大数量keepA...

网友评论

      本文标题:利用线程池调度维护设备会话状态

      本文链接:https://www.haomeiwen.com/subject/fhftahtx.html