美文网首页
redis 延时队列

redis 延时队列

作者: SingleException | 来源:发表于2018-08-30 21:42 被阅读0次
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.google.gson.Gson;
    import redis.clients.jedis.Jedis;
    
    /**
     * 延时队列
     * @author zhang
     *
     */
    public class MyRedisQueue<T> {
    
        
        private static final ExecutorService  eService = Executors.newCachedThreadPool();
        
        
        public void delay(T op,String queueKey){
            eService.execute(new PushThread(op,queueKey));
        }
        public void pop(String queueKey){
            eService.execute(new PopThread(queueKey));
        }
        
        class PopThread extends Thread{
            private String queueKey ;
            public PopThread(String queueKey){
                this.queueKey = queueKey;
            }
            public void run(){
                System.out.println("开始监听:");  //对消息的业务处理
                while(!Thread.interrupted()){
                     Jedis jedis = RedisPool.getJedis();
                     Set<String> values =  jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);//取出消息
                     if(values.isEmpty()){
                         try {
                            if(jedis!=null){
                                RedisPool.closeConn(jedis);
                            }
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                            break;
                        }
                         continue;
                     }
                     String s = values.iterator().next();
                     if(jedis.zrem(queueKey, s)>0){ //抢到了
                        System.out.println("处理消息:"+s);  //对消息的业务处理
                        if(jedis != null){
                            RedisPool.closeConn(jedis);
                        }
                     }
                }
            }
        };
        class PushThread extends Thread{
            private T op ;
            private String queueKey ;
            public PushThread(T op,String queueKey){
                this.op = op;
                this.queueKey = queueKey;
            }
            public void run(){
                Gson gson = new Gson();
                String value = gson.toJson(op);
                System.out.println("生产消息:"+value);  //对消息的业务处理
                Jedis jedis = RedisPool.getJedis();
                jedis.zadd(queueKey, System.currentTimeMillis() + 5000, value); // 塞入延时队列 ,5s 后再试
                if(jedis != null){
                    RedisPool.closeConn(jedis);
                }
            }
        };
    }
    
    
    

    相关文章

      网友评论

          本文标题:redis 延时队列

          本文链接:https://www.haomeiwen.com/subject/dtfjwftx.html