美文网首页
6:RocketMq实战(生产者与消费者 各种实战)(扩展)(文

6:RocketMq实战(生产者与消费者 各种实战)(扩展)(文

作者: _River_ | 来源:发表于2021-04-20 21:14 被阅读0次
    目录
    1:RocketMQ生产消息 异步发送
    2:RocketMQ OneWay发送消息及多种场景对比
    3:RocketMQ 延迟消息实战 定时消息 与在电商系统中应用
    

    4: RocketMQ4.X里面的标签Tag实战和消息过滤原理
    5: PushConsumer(常用)、PullConsumer消费模式分析
    
    1:RocketMQ生产消息 异步发送
    可用于需求速度非常高的时候
    特别注意  最后defaultMQProducer的shutdown之前 需要先睡眠几秒钟
                避免异步结果没有回来就关闭了生产者
    
    
    //生产者同步发送消息
    //结果同步返回
    SendResult sendResult = defaultMQProducer.send(message);
    log.info("RocketMQ order 消息发送结果:{}", JSON.toJSONString(sendResult));
    
    
    //异步异步发送消息(需求要求速度非常高的时候 )
    //结果异步返回
     defaultMQProducer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("RocketMQ order 消息异步发送结果:{}", JSON.toJSONString(sendResult));
            //发送成功可以触发其他的逻辑
        }
    
        @Override
        public void onException(Throwable throwable) {
            log.info("异步发送返回异常:"+throwable,throwable.getMessage());
        }
    });
    
    //关闭生产者之前先睡眠几秒钟
    TimeUnit.SECONDS.sleep(5);
    defaultMQProducer.shutdown();
    
    
    2:RocketMQ OneWay发送消息及多种场景对比
    简介:讲解使用RocketMQ发送oneway消息和使用场景,多种发送模式对比
    
    SYNC :同步(不丢失)
    应用场景:重要通知邮件、报名短信通知、营销短信系统等
    
    ASYNC :异步(不丢失)
    应用场景:对RT时间敏感,可以支持更高的并发,回调成功触发相对应的业务,比如注册成功 后通知积分系统发放优惠券
    
    ONEWAY :无需要等待响应 (很快 有可能丢失)
    使用场景:适用于某些耗时非常短,,只负责发送消息,但对可靠性要求并不高的场景,也不做要求。
    主要是日志收集,如日志服务LogServer,该服务作为消费端 通过ONEWAY发送消息过来,里面的队列可以存放消息。
    
    //生产者同步发送消息
    SendResult sendResult = defaultMQProducer.send(message);
    log.info("RocketMQ order 消息发送结果:{}", JSON.toJSONString(sendResult));
    
    //生产者同步发送OneWay消息
    //没有结果的返回
    defaultMQProducer.sendOneWay(message);
    
    
    3:RocketMQ 延迟消息实战 定时消息 与在电商系统中应用
    延迟消息:    
        Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,
        而是推 迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息,
        目前 支持固定精度的消息
    
        代码:rocketmq-store > MessageStoreConfig.java 属性 messageDelayLevel
         "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    
        //xxx是级别,1表示配置里面的第一个级别,2表示第 二个级别
        message.setDelayTimeLevel(xxx) 
    
    定时消息:目前rocketmq开源版本还不支持,商业版本则有,两者使用场景类似
    

    使用场景:
    1:消息生产和消费有时间窗口要求:
        比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。
        这条消息将会在30分钟以后投递绐消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。
        如没有支付则取消  如已支付则忽略
    
    2:通过消息触发一些定时任务:
        比如在某一固定时间点向用户发送提醒消息(用户会员于到期那天的9点  发送提醒消息)
    
    4: RocketMQ4.X里面的标签Tag实战和消息过滤原理
     一个Message只有一个Tag, tag是二级分类
        订单:数码类订单、食品类订单
        过滤分为Broker端和Consumer端过滤
            Consumer端过滤:完全可以根据业务需求进行调整,但是增加了很多无用的消息传输
            Broker端过滤:减少了无用的消息的进行网络传输,增加了broker的负担
    

    Consumer端过滤 (tag(常用) 或者 sql(了解))
        消息发送端
       Message message = new Message(topic,tag, JSON.toJSONString(data).getBytes());      
    
        消费端
        一般是监听*,
        或者指定tag(tag性能高,逻辑简单)    
        1:defaultMQPushConsumer.subscribe(topic, "*");
        2:defaultMQPushConsumer.subscribe(topic, "my_tag");
        3:defaultMQPushConsumer.subscribe(topic, "my_tag_A || my_tag_B");
    

        消息发送端:
        Message message = new Message(topic,tag, JSON.toJSONString(data).getBytes());
        message.putUserProperty("amount",amount)
       
       消费端:
       也可以使用SQL92(支持复杂逻辑  只支持PushConsumer中使用MessageSelector.bySql()  (了解即可)
       
        获取该Topic下  消息发送端设置了 putUserProperty "amount >5"的所有消息
        defaultMQPushConsumer.subscribe(topic,MessageSelector.bySql("amount >5") )
    
        SQL92的常见错误:
            The broker does not support consumer to filter message by SQL92
             解决:broker.conf里面配置如下
             enablePropertyFilter=true
    

        注意:如果想使用多个Tag,可以使用sql表达式,但是不建议,单一职责,多个队列     
    

    Broker端过滤(了解)
    第一次对比(hashcode):
    在Broker端进行MessageTag过滤,遍历message queue存储的message tag和 Consumer订阅的tag
    如果两者的hashcode不一样则跳过。符合的则传输绐Consumer。
    
    第二次对比(tag)
    为了防止 通过第一次对比的hashcode 是由于hash碰撞产生的
    Consumer收到过滤消息后也会进行匹配操作 ,但是对比真实的message tag而不是hashcode。
    
    优点和保障
         1:过滤中不访问commit log(后续提及),可以高效过滤  (优点)
         2:consume queue存储使用hashcode定长,节约空间 (优点 保障)      
         3:如果存在hash冲突,Consumer端可以进行再次确认  (保障)
    
    5: PushConsumer(常用)、PullConsumer消费模式分析
    消费端的三种重要概念:
        1:Pull
        2:Push  
        3:长轮训
    

    消费端一共有几种处理消息的方式 
        1:PushConsumer 本质是长轮训
        2:PullConsumer  本质是Pull
    

    Pull(很少用)
        消费端从Broker拉取消息,主动权在消费者端,可控性好;
        但间隔时间不好设置:
            时间间隔太短,则有可能什么都拉取不到请求,浪费资源;
            时间间隔太长,则消息不能及时处理。                   
    

    Push(常用)
        实时性高;
        增加Broker的压力,消费端能力不同,如果Push推送过快,消费端有可能会出现消息堆积。
    

    长轮询(结合pull 与 Push 的优点):
        消费端请求Broker的时候,Broker会保持当前连接一段时间默认是 15s,
        如果这段时间内有消息到达,则立刻返回绐Consumer,
        如果没消息的时间超过15s,则返回为空,Consumer再进行重新请求;
        
        主动权在Consumer中(因为需要Consumer进行重新请求),Broker即使有大量的消息也不会主动提送 Consumer。
        
        缺点:Broker需要保持Consumer的请求,会占用资源,需要消费端连接数可控 否则会一堆连接
    
    PushConsumer本质是长轮训
        DefaultMQPushConsumerImpl
        系统收到消息后自动处理消息和offset(后续提及)
        
        在broker端可以通过longPollingEnable=true来开启长轮询(默认开启)
        
        broker端代码(RocketMQ源码):broker.longpolling
        消息消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback    
               
        优雅关闭:主要是释放资源和保存Offset,调用shutdown。        
               
        虽然方法名是PushConsumer,但是代码里面大量使用了pull,
        因为使用长轮训方式:既有Broker的Push的实时性 又有Consumer的Pull的主动性。
     
     Offset的存放(后续会提及):
     集群消费模式存放在Broker端     广播消费模式存放在Consumer端
    
    PullConsumer (了解)(需要自己维护Offset)
        DefaultMQPullConsumer
    
        参考官方例子 org.apache.rocketmq.example.simple.PullConsumer
    
        1:进行MessageQueue遍历
        2:每一次获取MessageQueue 的消息的时候  获取下一条消息的Offset
        3:消费端维护Offset,需用本地存储Offset,存储内存、磁盘、数据库等
        4:处理不同状态的消息 4种状态:
          FOUND(成功)、NO_NEW_MSG(没有新消息)、 NO_MATCHED_MSG(没有匹配)、OFFSET_ILLRGL(偏移量错误)
    
    优雅关闭:主要是释放资源和保存Offset,需用Consumer端保存好Offset,特别是异常处理的时候
    灵活性高可控性强,但是编码复杂度很高
    
    Offset的存放(后续会提及):
    为什么PullConsumer必须把Offset存放在本地:
    因为这样可以更方便 请求获取对Offert后  进行各种处理
    

    项目连接

    请配合项目代码食用效果更佳:
    项目地址:
    https://github.com/hesuijin/hesuijin-study-project
    Git下载地址:
    https://github.com.cnpmjs.org/hesuijin/hesuijin-study-project.git
    
    rocketmq-module项目模块下 
    注意:因为需要修改相应配置 相关测试的代码
        生产者代码主要在  单元测试 
        消费者代码主要在  项目代码 com.example.rocketmq.demo.consumer.Junit包下

    相关文章

      网友评论

          本文标题:6:RocketMq实战(生产者与消费者 各种实战)(扩展)(文

          本文链接:https://www.haomeiwen.com/subject/eokolltx.html