美文网首页
一种简单的Java延迟处理器

一种简单的Java延迟处理器

作者: coder_jerry | 来源:发表于2018-04-18 20:03 被阅读170次

    最近做一个新项目时,有这么一个场景:使用TDDL数据库分库分表,且按SQL读写分离。在为系统做缓存层时,考虑到并发读写,可能会出现这么个问题:

    缓存未设置或已过期时,写数据,主库已完成但未拉起缓存,从库也未完成同步,此时有查询请求,将会访问到从库,把旧值查询出来,如果此时写数据线程已经用最新数据拉起缓存,那读数据线程将会用旧值将缓存覆盖,导致缓存与数据库不一致,在缓存失效或下次修改之前,缓存数据将一直是过期的。

    这种情况虽然比较少见,但还是有一定概率出现的。为了屏蔽主从同步可能带来的问题,一种比较简单的解决方法是延迟更新缓存,给数据库一定时间完成同步再进行缓存设置,这样就能保证最后缓存的是新数据。当然这种方案只能解决数据库同步延迟不大的普通情况。

    如果是一些读多写少的数据,也可以在修改完成改一遍缓存,再放到延迟任务改一遍。

    为此实现了一个简单的延迟任务处理器,主要思路是将任务放入DelayQueue,由一个专用线程从DelayQueue中取出到期的任务,放入执行线程池中执行。整体比较简单。

    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import java.lang.Thread.UncaughtExceptionHandler;
    import java.util.List;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.apache.commons.collections.CollectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    
    /**
     * 延迟任务处理器
     * @author jerryli
     * @date 2018/3/27
     */
    public class DelayedTaskExecutor implements InitializingBean, DisposableBean{
    
        private static final Logger logger = LoggerFactory.getLogger(DelayedTaskExecutor.class);
    
        private String name;
    
        private ThreadPoolExecutor executor;
    
        private volatile boolean running;
    
        private ThreadPoolExecutor putExecutor ;
    
        private DelayQueue<Task> delayQueue = new DelayQueue<>();
    
        public DelayedTaskExecutor(String name){
            this.name = name;
        }
    
        public void submit(Runnable runnable, long delayedTime){
            Task task = new Task(runnable,delayedTime);
            delayQueue.offer(task);
            logger.info("{}添加延迟任务 {} 延迟{}ms",this.name,task,delayedTime);
        }
    
        @Override
        public void destroy() throws Exception {
            running = false;
    
            if(putExecutor != null){
                putExecutor.shutdown();
                boolean shutdown = putExecutor.awaitTermination(8000, TimeUnit.MILLISECONDS);
                if(!shutdown){
                    List<Runnable> notRunList = putExecutor.shutdownNow();
                    if(CollectionUtils.isNotEmpty(notRunList)){
                        logger.warn("强制关闭延迟任务转移线程...未执行任务数量{}",notRunList.size());
                    }
                }
            }
            if(executor != null){
                executor.shutdown();
                boolean shutdown = executor.awaitTermination(8000, TimeUnit.MILLISECONDS);
                if(!shutdown){
                    List<Runnable> notRunList = executor.shutdownNow();
                    if(CollectionUtils.isNotEmpty(notRunList)){
                        logger.warn("强制关闭延迟任务执行线程池...未执行任务数量{}",notRunList.size());
                    }
                }
            }
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            running = true;
    
            executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1,
                Runtime.getRuntime().availableProcessors() * 2,0,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                new ThreadFactoryBuilder()
                    .setNameFormat(this.name + "-pool-%d")
                    .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
                        @Override
                        public void uncaughtException(Thread t, Throwable e) {
                            logger.error("延迟任务执行线程异常,thread={}",t,e);
                        }
                    })
                    .build());
    
            putExecutor = new ThreadPoolExecutor(1,1,0,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                new ThreadFactoryBuilder()
                    .setNameFormat(this.name + "-submiter")
                    .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
                        @Override
                        public void uncaughtException(Thread t, Throwable e) {
                            logger.error("延迟任务转移线程异常,thread={}",t,e);
                        }
                    })
                    .build());
    
            putExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    while(running || delayQueue.size() > 0){
                        try {
                            Task task = delayQueue.poll(1000, TimeUnit.MILLISECONDS);
                            if(task == null){
                                continue;
                            }
                            logger.info("开始执行延迟任务 {}",task);
                            executor.execute(task.getRunnable());
                        } catch (InterruptedException e) {
                            logger.error("延迟任务转移线程异常interrupted",e);
                        } catch (Throwable e){
                            logger.error("延迟任务转移线程异常",e);
                        }
                    }
                    logger.info("延迟任务转移线程退出");
                }
            });
        }
    
        class Task implements Delayed{
    
            // 提交时间
            private long submitTime;
    
            private long runningTime;
    
            private Runnable runnable;
    
            Task(Runnable runnable,long delayTime){
                this.runnable = runnable;
                this.submitTime = System.currentTimeMillis();
                this.runningTime = submitTime + delayTime;
            }
    
            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
    
            @Override
            public int compareTo(Delayed o) {
                if(o == null || !(o instanceof Task)){
                    return 1;
                }
                if(o == this){
                    return 0;
                }
                Task otherTask = (Task) o;
                if(this.runningTime > otherTask.runningTime){
                    return 1;
                }else if(this.runningTime == otherTask.runningTime){
                    return 0;
                }else{
                    return -1;
                }
            }
    
            public Runnable getRunnable() {
                return runnable;
            }
        }
    
    }
    
    

    测试代码:

    public static void main(String[] args) throws Exception {
            // 使用Spring可不用调用afterPropertiesSet和destroy方法
            DelayedTaskExecutor delayedTaskExecutor = new DelayedTaskExecutor("cache-delay");
            delayedTaskExecutor.afterPropertiesSet();
            delayedTaskExecutor.submit(() -> System.out.println("hello"), 2000);
            delayedTaskExecutor.destroy();
        }
    

    相关文章

      网友评论

          本文标题:一种简单的Java延迟处理器

          本文链接:https://www.haomeiwen.com/subject/zhybkftx.html