美文网首页Redis
03_redis_延时队列

03_redis_延时队列

作者: A_l_A_n | 来源:发表于2020-07-06 16:19 被阅读0次

对于只有一组消费者的队列,使用redis就可以了。但是没有太多的高级特性,没有ack保证。

异步消息队列

Redis的List数据结构常用来做异步消息队列。使用rpush /lpush入队,lpop/rpop出队

队列空了

客户端中使用pop来获取数据,然后处理数据,处理完接着pop获取数据,处理数据,如果的循环。
但是队列空了,pop出来数据为空,继续pop,陷入死循环。无用的空轮询,空轮询拉高了cpu,redis 的QPS(每秒查询率)也会被拉高。
解决:使用sleep,让线程睡个一个小时。

队列延时

使用sleep会导致消息的延迟增大。
使用blpop,brpop
b blocking - 阻塞读。
阻塞读在队列没有消息的时候会休眠,等到消息到了,会进行唤醒。

空闲连接 自动断开

当消息一直阻塞,redis客户端连接就会成为闲置连接,当闲置连接时间久了,服务器就会主动断开连接,减少资源浪费。这个时候brpop/blpop就会报错。
所以编码时需要捕获异常并重试。

锁冲突

分布式锁加锁失败了怎么办?

1. 直接抛出异常,让用户稍后重试

2. sleep,稍后重试

sleep会阻塞当前队列处理线程,也会导致之后的队列出现延时。如果消息较多、碰撞比较频繁,sleep不合适。如果因为个别线程死锁,导致后续的队列永远得不到处理

3. 将请求转移到延时队列中,之后再重试

比较适合异步队列处理,将冲突的队列扔到另一个队列中稍后重试。

延时队列

通过zset来实现。
将消息序列化到zset 的value中,消息的到期处理时间作为score。使用多个线程进行轮询获取到期的消息进行处理。使用多个线程是为了保证可用性,万一某个线程挂了,还会有别的线程继续处理。因为是多线程还需要考虑并发。

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import redis.clients.jedis.Jedis;

public class RedisDelayingQueue<T> {
    static class TaskItem<T> {
        public String id;
        public T msg;
    }

    // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
    private Type TaskType = new TypeReference<TaskItem<T>>() {
    }.getType();
    private Jedis jedis;
    private String queueKey;

    public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    public void delay(T msg) {
        TaskItem task = new TaskItem();
        task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
        task.msg = msg;
        String s = JSON.toJSONString(task); // fastjson 序列化
        jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
    }

    public void loop() {
        while (!Thread.interrupted()) {
            // 只取一条
            Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
            if (values.isEmpty()) {
                try {
                    Thread.sleep(500); // 歇会继续
                } catch (InterruptedException e) {
                    break;
                }
                continue;
            }
            String s = values.iterator().next();
            if (jedis.zrem(queueKey, s) > 0) { // 抢到了
                TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化
                this.handleMsg(task.msg);
            }
        }
    }

    public void handleMsg(T msg) {
        System.out.println(msg);
    }

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        RedisDelayingQueue queue = new RedisDelayingQueue<>(jedis, "q-demo");
        Thread producer = new Thread() {
            public void run() {
                for (int i = 0; i < 10; i++) {
                    queue.delay("codehole" + i);
                }
            }
        };
        Thread consumer = new Thread() {
            public void run() {
                queue.loop();
            }
        };
        producer.start();
        consumer.start();
        try {
            producer.join();
            Thread.sleep(6000);
            consumer.interrupt();

            consumer.join();
        } catch (InterruptedException e) {
        }
    }
}

上面的算法中同一个任务可能会被多个进程取到之后再使用 zrem 进行争抢,那些没抢到的进程都是白取了一次任务,这是浪费。可以考虑使用 lua scripting 来优化一下这个逻辑,将zrangebyscore 和 zrem 一同挪到服务器端进行原子化操作,这样多个进程之间争抢任务时就不会出现这种浪费了。

相关文章

  • 03_redis_延时队列

    对于只有一组消费者的队列,使用redis就可以了。但是没有太多的高级特性,没有ack保证。 异步消息队列 Redi...

  • RabbitMQ没有延时队列?我就教你一招,玩转延时队列

    什么是延时队列 延时队列:顾名思义,是一个用于做消息延时消费的队列。但是它也是一个普通队列,所以它具备普通队列的特...

  • RabbitMQ没有延时队列?我就教你一招,玩转延时队列

    什么是延时队列 延时队列:顾名思义,是一个用于做消息延时消费的队列。但是它也是一个普通队列,所以它具备普通队列的特...

  • 延迟队列

    延迟队列 延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后...

  • 延时队列

    一、什么是延时队列[https://www.jianshu.com/p/30f69aac6350] 延时队列相比于...

  • spring boot 集成rabbitmq 实现延迟队列

    rabbitmq 实现延迟队列 什么是延迟队列 延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息...

  • rabbitMQ-延时队列

    延时队列我们可以简单粗暴的理解它为延时发送消息的队列 那延时队列的应用场景有哪些呢,比如订单在一段时间内未支付则取...

  • RabbitMQ实现延时队列

    什么是延时队列 延迟队列首先它是一个队列,作为队列它的第一个特征是有序的,而之所以它被称为延时队列它还有一个更重要...

  • 关于时间轮

    最近翻看了去哪儿的QMQ的关于延时队列的源码,主要就是想要了解它在延时队列上实现的设计方案。 1、延时消息投递到d...

  • SpringBoot中,如何整合RabbitMQ实现延时队列?

    一、介绍 1、什么是延时队列?延时队列即就是放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费...

网友评论

    本文标题:03_redis_延时队列

    本文链接:https://www.haomeiwen.com/subject/czowqktx.html