美文网首页
redis-delayqueue

redis-delayqueue

作者: zhangsanzhu | 来源:发表于2018-12-29 09:32 被阅读0次

    redis-delayqueue

    适用场景

    相比于专业的消息队列,Redis的优点就是简单,适用于只有一个消费者的场景.

    重点

    没有ack机制保障,肯能会丢.可靠性相对于kafka差一点.
    brpop/blpop 属于阻塞读,队列为空的时候,会进入sleep,队列不为空,会被唤醒.
    brpop/blpop 有个问题,长时间阻塞会被认为是空闲连接,服务器会自动断开连接,要记得捕获异常,并重试.

    实现原理

    基本操作

    127.0.0.1:6379> rpush n kkkk kkkk lll
    (integer) 3
    127.0.0.1:6379> lpop n
    "kkkk"
    127.0.0.1:6379> lpop n
    "kkkk"
    127.0.0.1:6379> lpop n
    "lll"
    127.0.0.1:6379> rpush n kkkk kkkk lll
    (integer) 3
    127.0.0.1:6379> rpop n
    "lll"
    127.0.0.1:6379> rpop n
    "kkkk"
    127.0.0.1:6379> rpop n
    "kkkk"
    127.0.0.1:6379> rpush n kkkk kkkk lll  dasfasd asdfasd ddd assaa qqq eee rrr ooo ppp
    (integer) 12
    127.0.0.1:6379> brpop n 1
    1) "n"
    2) "ppp"
    127.0.0.1:6379> brpop n 1
    1) "n"
    2) "ooo"
    127.0.0.1:6379> brpop n 1
    1) "n"
    2) "rrr"
    127.0.0.1:6379> brpop n 1
    

    实现

    public class RedisDelayingQueue<T> {
    
        static class TaskItem<T> {
            public String id;
            public T msg;
        }
    
        private Type TaskType = new TypeReference<TaskItem<T>>() {}.getType();
    
        private Jedis jedis;
        private String queueKey;
    
        public RedisDelayingQueue(Jedis jedis, String queueKey) {
            this.jedis = jedis;
            this.queueKey = queueKey;
        }
    
        public void delay(T msg) {
            TaskItem<T> task = new TaskItem<T>();
            task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
            task.msg = msg;
            String s = JSON.toJSONString(task); // fastjson 序列化
            jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
        }
    
        public void loop() {
            while (!Thread.interrupted()) {
                // 只取一条
                Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
                if (values.isEmpty()) {
                    try {
                        Thread.sleep(500); // 歇会继续
                    } catch (InterruptedException e) {
                        break;
                    }
                    continue;
                }
                String s = values.iterator().next();
                if (jedis.zrem(queueKey, s) > 0) { // 抢到了
                    TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
                    this.handleMsg(task.msg);
                }
            }
        }
    
        public void handleMsg(T msg) {
            System.out.println(msg);
        }
    
        public static void main(String[] args) {
            Jedis jedis = new Jedis("127.0.0.1", 6379);
            RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
            Thread producer = new Thread() {
    
                public void run() {
                    for (int i = 0; i < 10; i++) {
                        queue.delay("codehole" + i);
                    }
                }
    
            };
            Thread consumer = new Thread() {
    
                public void run() {
                    queue.loop();
                }
    
            };
            producer.start();
            consumer.start();
            try {
                producer.join();
                Thread.sleep(6000);
                consumer.interrupt();
                consumer.join();
            } catch (InterruptedException e) {
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:redis-delayqueue

          本文链接:https://www.haomeiwen.com/subject/snfulqtx.html