美文网首页分布式
延迟队列(一)zset轮询服务

延迟队列(一)zset轮询服务

作者: 无聊之园 | 来源:发表于2019-06-13 21:34 被阅读2次

    mq因为延迟队列这种有优先级的队列实现很影响性能,所以很多mq都没有很灵活的支持。比如rocketmq,只支持固定的1min,2min什么的延迟时间。

    基于redis的zset做延迟队列。
    参考:有赞延迟队列设计

    大概的思想:
    一、把延迟队列独立成一个服务。

    1.1 通过http请求,发送put延迟任务。
    1)指定topic(比如,订单topic)。
    2) id(任务标识)。
    3)delay(延迟多久执行)。
    4)ttl(job超时时间,超过了多少秒还没有执行,则删除,或者,移入其他地方,之后进行排查,为什么这个任务执行不成功)。
    5) body(延迟队列的具体的数据,json格式。比如,订单id)。

    1.2 设计数据结构


    image.png

    1.3 流程
    1、客户端,put一个job任务。服务端接收job,放入job pool(job pool是任务存放的真正地方),再根据delay时间,转换成绝对时间,然后以(value:topic 连接 jobid,score:delayTime)按delayTime排序放入redis的zset中。这个zset就是delay bucket。topic和jobid连接时因为,所有的topic统一轮询,一个轮询就够了。
    2、有一个timer,轮询delay bucket,如果delayTime大于当前时间,则说明时间到了。只需要遍历zset的前面的元素,前面没有到期,则后面肯定没有到期。比如:每一秒轮询一次,一次轮询,则取出所有到期的job。
    3、如果到时间了,则把这个delay bucket中的元素取出来,然后放到,然后解析出topic和jobid, 然后放到对一个的topic的ready queue(这个ready Queue也可以有序,不过无序也没关系)。
    4、客户端,每次发送订阅topic请求, 然后这个topic如果没有到期的任务,则把这个http夯住,直到topic有数据,或者,http超时了。所以客户端需要while true不断发送http请求。
    5、重试机制怎么做?
    每次客户端从服务端拉取任务的时候,要有重试机制,把任务延长5秒,重新放入添加延迟队列,直到超时时间。
    6、超时了怎么办?
    如果超时了,则可以持久化到数据库,事后运维检查,这些任务为什么会超时。
    7、应答机制怎么做?
    客户端处理了这个topic后,发送finish给服务端,服务端,把这个任务删掉。
    8、客户端处理任务的幂等性要做:比如订单id,则查一下这个订单id是否已处理。或者update where 这种本身就是幂等的操作。

    其实有很多细节:

    思考:
    1、这种机制,其实延迟队列服务端压力不会很大,因为总共也就一秒轮询一次。
    2、服务端的压力很大应该来自于,客户端的轮询http,如果每个topic对应一个消费者,然后可能有很多台客户端服务器,也就是一个topic对应很多个http连接。
    那么,服务端的服务端连接池可能就几百个上千个。所以,如果topic太多,可能需要扩容。
    3、如果扩容的话。上面提到的各个数据结构的操作,可能就要用分布式锁锁住了。
    4、客户端一个一个取任务,任务太多的话,网络传输压力太大,可以考虑,一次指定取多少个,如果没有这么多个,则有多少个取多少个。

    缺点:
    1、需要引入一个定时服务器。所以,如果数据库轮询能处理,就数据库轮询吧。
    1、http轮询,比较麻烦。最好时做到mq那样,直接推。
    2、总感觉,这种方式很麻烦。

    这里有一个别人的实现:这里有一些地方值得商榷,比如,分布式锁方面(他没有做),消息重试方面(无限重试,客户端收到消息后,把过期时间设置到超时时间,然后重新放入队列),消息超时方面(消息超时无限重试)。http轮询方面。
    但是,如果有需要,可以直接修改这个代码,或者直接推翻重写。只是作为一个参考。
    https://github.com/yangwenjie88/delay-queue.git

    相关文章

      网友评论

        本文标题:延迟队列(一)zset轮询服务

        本文链接:https://www.haomeiwen.com/subject/awpsfctx.html