美文网首页
扩展RocketMQ 使其支持任意时间精度的消息延迟

扩展RocketMQ 使其支持任意时间精度的消息延迟

作者: 平凡人笔记 | 来源:发表于2020-05-31 10:30 被阅读0次

    前言

    本想使用rocketmq的延迟消息特性,但延迟的范围有限,仅支持
    1s 5s 10s 30s
    1m 2m 3m 4m
    5m 6m 7m 8m
    9m 10m 20m 30m
    1h 2h
    这18个等级,之外的延迟时间不支持,本文是为了说明如何利用rocketmq的延迟消息实现任意时间的消息延迟,如何进行扩展的

    技术架构

    rocketmq部署

    下载安装包

    https://mirror.bit.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip

    解压到目录

    /Users/mengfanxiao/Documents/third_software/mq/rocketmq/rocketmq-all-4.7.0-bin-release

    配置Java_home环境变量

    sudo vim /etc/profile
    jdk1.8安装报路径:/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home

    添加

    JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home"
    export JAVA_HOME
    CLASS_PATH="$JAVA_HOME/lib"

    安装nameserver

    启动 mqnamesrv

    sh bin/mqnamesrv

    安装broker


    sh bin/mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true -c /Users/mengfanxiao/Documents/third_software/mq/rocketmq/rocketmq-all-4.7.0-bin-release/conf/broker.conf

    安装console

    下载 rocketmq 插件

    • 代码下载
    https://gitee.com/pingfanrenbiji/rocketmq-externals.git
    • 修改配置
    vim rocketmq-externals/rocketmq-console/src/main/resources/application.properties
    • 代码编译
    进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成
    • 启动
    编译成功之后,Cmd进入‘target’文件夹,执行‘java -jar rocketmq-console-ng-1.0.1.jar’,启动‘rocketmq-console-ng-1.0.0.jar’。
    • 访问控制台页面
    吐槽下:页面和rabbitmq相比弱爆了

    代码

    producer

    producer配置

    producer 延迟消息生产流程解析

    延迟发送逻辑
    • 根据延迟时间和当前时间差值计算延迟等级
    Integer level = DelayLevelCalculate.calculateDefault(l);
    • 标记该消息为延迟消息 希望该消息被消费的时候 若发现是延迟消息即还未到消费的时间 则不消费 再次的触发生产者生产一次该消息
    fillMessage(msg, level, startSendTime);

    consumer

    consumer配置

    consumer 延迟消息消费流程解析

    消息消费逻辑
    若发现是延迟消息即还未到消费的时间 则不消费 再次的触发生产者生产一次该消息

    参考代码

    https://gitee.com/pingfanrenbiji/civism-rocket.git

    参考文献

    https://www.jianshu.com/p/4f968cd96b87

    相关文章

      网友评论

          本文标题:扩展RocketMQ 使其支持任意时间精度的消息延迟

          本文链接:https://www.haomeiwen.com/subject/agjlzhtx.html