美文网首页
4:RocketMq实战(生产者与消费者 各种实战)(常见)(文

4:RocketMq实战(生产者与消费者 各种实战)(常见)(文

作者: _River_ | 来源:发表于2021-04-20 21:03 被阅读0次
    目录
    1:服务器配置文件核心配置 config 
    2:生产者核心配置(Java代码)
    3:消费者核心配置(Java代码)
    4:RocketMQ消息常见发送状态
    5: 集群和广播模式下RocketMQ消费端处理
    
    1:服务器配置文件核心配置 config
    namesrvAddr:该节点所在集群地址 (192.168.159.129:9876;192.168.159.130:9876)
    brokerClusterName :集群名称       (XdclassCluster)
    
    brokerIP1 : Broker服务地址(设置为本服务所在服务器的具体IP  防止拿了内网IP)
    brokerld : 0表示Master主节点  大于0表示从节点
    brokerName :Broker节点名称(主从模式下  主节点和从节点的brokerName一致)
    listenPort : Broker监听的端口号
    brokerRole : Broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE
        
    flushDiskType :刷盘策略,默认为ASYNC_FLUSH(异步刷盘),另外是SYNC_FLUSH(同步刷盘)
    syncFlushTimeout :同步刷盘超时时间    
    
    autoCreateTopicEnable :是否自动创建主题Topic,开发建议为true,生产要为false
    autoCreateSubscriptionGroup:是否允许Broker自动创建订阅组,建议线下开发开启,线上关闭
    
    
    deleteWhen :每天执行删除过期文件的时间,默认每天凌晨4点        
    mapedFileSizeCommitLog :单个Conmmitlog文件大小,默认是 1GB
    mapedFileSizeConsumeQueue: ConsumeQueue每个文件默认存30W条,可以根据项目调整
    storePathCommitLog: Commitlog存储目录默认为${storePathRootDir}/commitlog
    storePathRootDir :存储消息以及一些配置信息的根目录默认为用户的${HOME}/store
    storePathlndex:消息索引存储路径
    
    diskMaxUsedSpaceRatio :检测可用的磁盘空间大小,超过后会写入报错
    
    2:生产者核心配置(Java代码)
     DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
    
            //该生产者所在group
            defaultMQProducer.setProducerGroup(producerGroup);
            //如果是 namesrvAddr形成集群模式 以 ; 分开   "IP1:9876;IP2:9876;"
            defaultMQProducer.setNamesrvAddr(nameSrvAddr);
            //是否走Vip通道
            defaultMQProducer.setVipChannelEnabled(false);
            //设置发送失败重试次数
            defaultMQProducer.setRetryTimesWhenSendAsyncFailed(3);
    
            //消息同步发送失败重试次数
            defaultMQProducer.setRetryTimesWhenSendFailed(3);
            //消息异步发送失败重试次数
            defaultMQProducer.setRetryTimesWhenSendAsyncFailed(3);
    
    //        消息超过默认字节4096后进行压缩
    //        defaultMQProducer.setCompressMsgBodyOverHowmuch(4096);
    //        最大消息配置,默认128k
    //        defaultMQProducer.setMaxMessageSize(12);
    //        自动创建服务器不存在的Topic,默认创建的队列数
    //        defaultMQProducer.setDefaultTopicQueueNums(4);
        
    
    3:消费者核心配置(Java代码)
    1:messageModel :消费者消费模式
        CLUSTERING――默认是集群模式(该消费者组里面只能有一个消费者去消费该消息)
       BROADCASTING — —广播模式(该消费者组里面所有的消费者都需要消费该消息)
    
    2:offsetStore:消息消费进度存储器 (现在已经不允许设置)
        有两个策略:RemoteBrokerOffsetStore 和 LocalFileOffsetStore
        集群模式默认使用 RemoteBrokerOffsetStore
        广播模式默认使用 LocalFileOffsetStore 
         
    
    3:队列初次创建时候 从哪里开始消费
        CONSUME_FROM丄AST_OFFSET:默认策略,初次从该队列最尾开始消费,即跳过历史 消息,后续再启动接着上次消费的进
        CONSUME_FROM_FIRST_OFFSET:初次从消息队列头部开始消费,即历史消息(还储存 在broker的)全部消费一遍,
                                                    后续再启动接着上次消费的进度开始消费
        CONSUME_FROM_TIMESTAMP :从某个时间点开始消费,默认是半个小时以前,后续再启 动接着上次消费的进度开始消费
    
    4:allocateMessageQueueStrategy:
        负载均衡策略算法,即消费者分配到queue的算法,
        默认值是 AllocateMessageQueueAveragely 即取模平均分配
    
    5:线程数
        consumeThreadMin最小消费线程池数量
        consumeThreadMax最大消费线程池数量
    
    6:消费者从broker一次性拉取消息的数量
        pullBatchSize:默认32 可选配置
    
        private DefaultMQPushConsumer creatDefaultMQPushConsumer() throws MQClientException {
            DefaultMQPushConsumer  defaultMQPushConsumer = new DefaultMQPushConsumer();
            //如果是 namesrvAddr形成集群模式 以 ; 分开   "IP1:9876;IP2:9876;"
            defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
            //设置消费者存放的组
            defaultMQPushConsumer.setConsumerGroup(pay_consumer_group);
            //订阅的主题 topic
            defaultMQPushConsumer.subscribe(topic, "*");
    
    //        消费者消费模式
    //        defaultMQPushConsumer.setMessageModel(CLUSTERING);
    //        offsetStore:消息消费进度存储器 (现在已经不允许设置)
    //        defaultMQPushConsumer.setOffsetStore(RemoteBrokerOffsetStore);
    //
    //        队列初次创建时候 从哪里开始消费
    //        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    //        
    //        负载均衡策略算法  消费者分配队列的算法  默认就是AllocateMessageQueueAveragely 即取模平均分配
    //        defaultMQPushConsumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
    //       
    //        最小消费线程池数量
    //        defaultMQPushConsumer.setConsumeThreadMin(20);
    //        最大消费线程池数量
    //        defaultMQPushConsumer.setConsumeThreadMax(20);
    //       
    //       一次性拉取数据的数量
    //        defaultMQPushConsumer.setPullBatchSize(32);
    
            return defaultMQPushConsumer;
        }
    
    4:RocketMQ消息常见发送状态
    消息发送有同步和异步
        Broker消息投递状态讲解
            FLUSH_DISK_TIMEOUT:
                没有在规定时间内完成刷盘(刷盘策略需要为SYNC_FLUSH才会出这个错误)
                
            FLUSH_SLAVE_TIMEOUT:
                主从模式下,broker是SYNC_MASTER,没有在规定时间内完成主从同步
        
            SLAVE_NOT_AVAILABLE:
                主从模式下,broker是SYNC_MASTER,但是没有找到被配置成Slave的Broker
    
            SEND_OK:
                 发送成功,没有发生上面的三种问题
    
    5: 集群和广播模式下RocketMQ消费端处理
    Topic下队列的奇偶数会影响Customer个数里面的消费数量    
        如果是4个队列,8个消息,4个节点则会各消费2条,如果不对等,则负载均衡会分配不均。
        由于Cnnsumer是监听某个队列进行消费的,因此也会消费不均衡
    
        如果consumer实例的数量比message queue的总数量还多的话,
        多出来的consumer实例将 无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用,
        所以需要控制让 queue的总数量大于等于consumer的数量
    

    集群模式(默认)CLUSTERING 
    同一个Consumer_Group内订阅了该Topic的Consumer 
    只有一个Consumer 能消费该消息
    
    例子:订单消息,一般是只被消费一次
    

    广播模式 BROADCASTING
    同一个Consumer_Group内订阅了该Topic的Consumer 
    所有的Consumer  都会消费该消息
    
    例子:群公告,每个人都需要消费这个消息
    

    通过消费端的 setMessageModel
    
    测试:
        同一个服务内 创建两个完全相同的消费者   消费者名称分别为
        defaultMQPushConsumer.setInstanceName("consumer-instance");
        defaultMQPushConsumerOther.setInstanceName("consumer-instance-Other");
    
        设置为集群模式  发送消息  其中一个消费者收到消息
        发一条 一个consumer消费一条
    
        设置为广播模式 发送消息  其中两个消费者收到消息
        发一条  两个consumer各消费一条
    

    项目连接

    请配合项目代码食用效果更佳:
    项目地址:
    https://github.com/hesuijin/hesuijin-study-project
    Git下载地址:
    https://github.com.cnpmjs.org/hesuijin/hesuijin-study-project.git
    
    rocketmq-module项目模块下 
    注意:因为需要修改相应配置 相关测试的代码
        生产者代码主要在  单元测试 
        消费者代码主要在  项目代码 com.example.rocketmq.demo.consumer.Junit包下

    相关文章

      网友评论

          本文标题:4:RocketMq实战(生产者与消费者 各种实战)(常见)(文

          本文链接:https://www.haomeiwen.com/subject/efkolltx.html