美文网首页
RocketMQ消费者

RocketMQ消费者

作者: 杨健kimyeung | 来源:发表于2020-08-26 15:42 被阅读0次

    官方文档

    概要

    MQ中Pull和Push的两种消费方式

    Push方式:由消息中间件主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;

    Pull方式:由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;

    消费模式:有序消费和并发消费。

    有序消费模式是按照消息的顺序进行消费,并发消费的消费速度要比有序消费更快。

    消息模式: Clustering 和Broadcasting

    CLUSTERING:同组里的每个Consumer 只消费所订阅消息的一部分内容。

    BROADCASTING:同组里的每个Consumer 消费所订阅消息的全部内容

    • 顺序消费

    • 消息轨迹

    • ACL

    • pull消费

    • push消费

    核心注解

    <pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n24" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface RocketMQMessageListener {
    // 指定consumerGroup
    String consumerGroup();
    // 指定消费的topic
    String topic();
    // 指定消费过滤方式: TAG
    SelectorType selectorType() default SelectorType.TAG;
    // 根据过滤方式,定义选择表达式
    String selectorExpress() default "*";
    // 消费方式:并发,顺序
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
    // 消费模式: 集群, 广播
    MessageModel messageModel() default MessageModel.CLUSTERING;
    //消费的并发线程数
    int consumeThreadMax() default 64;
    }</pre>

    使用

    application.yml配置

    <pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="yaml" cid="n27" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">rocketmq:
    name-server: 127.0.0.1:9876</pre>

    添加监听

    <pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n29" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">@Service
    @Slf4j
    @RocketMQMessageListener(consumerGroup = "hello", topic = "test-topic")
    public class RocketMqConsumerListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
    log.error(message);
    }
    }
    ​</pre>

    相关文章

      网友评论

          本文标题:RocketMQ消费者

          本文链接:https://www.haomeiwen.com/subject/hhffsktx.html