目录
1:服务器配置文件核心配置 config
2:生产者核心配置(Java代码)
3:消费者核心配置(Java代码)
4:RocketMQ消息常见发送状态
5: 集群和广播模式下RocketMQ消费端处理
1:服务器配置文件核心配置 config
namesrvAddr:该节点所在集群地址 (192.168.159.129:9876;192.168.159.130:9876)
brokerClusterName :集群名称 (XdclassCluster)
brokerIP1 : Broker服务地址(设置为本服务所在服务器的具体IP 防止拿了内网IP)
brokerld : 0表示Master主节点 大于0表示从节点
brokerName :Broker节点名称(主从模式下 主节点和从节点的brokerName一致)
listenPort : Broker监听的端口号
brokerRole : Broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE
flushDiskType :刷盘策略,默认为ASYNC_FLUSH(异步刷盘),另外是SYNC_FLUSH(同步刷盘)
syncFlushTimeout :同步刷盘超时时间
autoCreateTopicEnable :是否自动创建主题Topic,开发建议为true,生产要为false
autoCreateSubscriptionGroup:是否允许Broker自动创建订阅组,建议线下开发开启,线上关闭
deleteWhen :每天执行删除过期文件的时间,默认每天凌晨4点
mapedFileSizeCommitLog :单个Conmmitlog文件大小,默认是 1GB
mapedFileSizeConsumeQueue: ConsumeQueue每个文件默认存30W条,可以根据项目调整
storePathCommitLog: Commitlog存储目录默认为${storePathRootDir}/commitlog
storePathRootDir :存储消息以及一些配置信息的根目录默认为用户的${HOME}/store
storePathlndex:消息索引存储路径
diskMaxUsedSpaceRatio :检测可用的磁盘空间大小,超过后会写入报错
2:生产者核心配置(Java代码)
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
//该生产者所在group
defaultMQProducer.setProducerGroup(producerGroup);
//如果是 namesrvAddr形成集群模式 以 ; 分开 "IP1:9876;IP2:9876;"
defaultMQProducer.setNamesrvAddr(nameSrvAddr);
//是否走Vip通道
defaultMQProducer.setVipChannelEnabled(false);
//设置发送失败重试次数
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(3);
//消息同步发送失败重试次数
defaultMQProducer.setRetryTimesWhenSendFailed(3);
//消息异步发送失败重试次数
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(3);
// 消息超过默认字节4096后进行压缩
// defaultMQProducer.setCompressMsgBodyOverHowmuch(4096);
// 最大消息配置,默认128k
// defaultMQProducer.setMaxMessageSize(12);
// 自动创建服务器不存在的Topic,默认创建的队列数
// defaultMQProducer.setDefaultTopicQueueNums(4);
3:消费者核心配置(Java代码)
1:messageModel :消费者消费模式
CLUSTERING――默认是集群模式(该消费者组里面只能有一个消费者去消费该消息)
BROADCASTING — —广播模式(该消费者组里面所有的消费者都需要消费该消息)
2:offsetStore:消息消费进度存储器 (现在已经不允许设置)
有两个策略:RemoteBrokerOffsetStore 和 LocalFileOffsetStore
集群模式默认使用 RemoteBrokerOffsetStore
广播模式默认使用 LocalFileOffsetStore
3:队列初次创建时候 从哪里开始消费
CONSUME_FROM丄AST_OFFSET:默认策略,初次从该队列最尾开始消费,即跳过历史 消息,后续再启动接着上次消费的进
CONSUME_FROM_FIRST_OFFSET:初次从消息队列头部开始消费,即历史消息(还储存 在broker的)全部消费一遍,
后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP :从某个时间点开始消费,默认是半个小时以前,后续再启 动接着上次消费的进度开始消费
4:allocateMessageQueueStrategy:
负载均衡策略算法,即消费者分配到queue的算法,
默认值是 AllocateMessageQueueAveragely 即取模平均分配
5:线程数
consumeThreadMin最小消费线程池数量
consumeThreadMax最大消费线程池数量
6:消费者从broker一次性拉取消息的数量
pullBatchSize:默认32 可选配置
private DefaultMQPushConsumer creatDefaultMQPushConsumer() throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
//如果是 namesrvAddr形成集群模式 以 ; 分开 "IP1:9876;IP2:9876;"
defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
//设置消费者存放的组
defaultMQPushConsumer.setConsumerGroup(pay_consumer_group);
//订阅的主题 topic
defaultMQPushConsumer.subscribe(topic, "*");
// 消费者消费模式
// defaultMQPushConsumer.setMessageModel(CLUSTERING);
// offsetStore:消息消费进度存储器 (现在已经不允许设置)
// defaultMQPushConsumer.setOffsetStore(RemoteBrokerOffsetStore);
//
// 队列初次创建时候 从哪里开始消费
// defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//
// 负载均衡策略算法 消费者分配队列的算法 默认就是AllocateMessageQueueAveragely 即取模平均分配
// defaultMQPushConsumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
//
// 最小消费线程池数量
// defaultMQPushConsumer.setConsumeThreadMin(20);
// 最大消费线程池数量
// defaultMQPushConsumer.setConsumeThreadMax(20);
//
// 一次性拉取数据的数量
// defaultMQPushConsumer.setPullBatchSize(32);
return defaultMQPushConsumer;
}
4:RocketMQ消息常见发送状态
消息发送有同步和异步
Broker消息投递状态讲解
FLUSH_DISK_TIMEOUT:
没有在规定时间内完成刷盘(刷盘策略需要为SYNC_FLUSH才会出这个错误)
FLUSH_SLAVE_TIMEOUT:
主从模式下,broker是SYNC_MASTER,没有在规定时间内完成主从同步
SLAVE_NOT_AVAILABLE:
主从模式下,broker是SYNC_MASTER,但是没有找到被配置成Slave的Broker
SEND_OK:
发送成功,没有发生上面的三种问题
5: 集群和广播模式下RocketMQ消费端处理
Topic下队列的奇偶数会影响Customer个数里面的消费数量
如果是4个队列,8个消息,4个节点则会各消费2条,如果不对等,则负载均衡会分配不均。
由于Cnnsumer是监听某个队列进行消费的,因此也会消费不均衡
如果consumer实例的数量比message queue的总数量还多的话,
多出来的consumer实例将 无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用,
所以需要控制让 queue的总数量大于等于consumer的数量
集群模式(默认)CLUSTERING
同一个Consumer_Group内订阅了该Topic的Consumer
只有一个Consumer 能消费该消息
例子:订单消息,一般是只被消费一次
广播模式 BROADCASTING
同一个Consumer_Group内订阅了该Topic的Consumer
所有的Consumer 都会消费该消息
例子:群公告,每个人都需要消费这个消息
通过消费端的 setMessageModel
测试:
同一个服务内 创建两个完全相同的消费者 消费者名称分别为
defaultMQPushConsumer.setInstanceName("consumer-instance");
defaultMQPushConsumerOther.setInstanceName("consumer-instance-Other");
设置为集群模式 发送消息 其中一个消费者收到消息
发一条 一个consumer消费一条
设置为广播模式 发送消息 其中两个消费者收到消息
发一条 两个consumer各消费一条
项目连接
请配合项目代码食用效果更佳:
项目地址:
https://github.com/hesuijin/hesuijin-study-project
Git下载地址:
https://github.com.cnpmjs.org/hesuijin/hesuijin-study-project.git
rocketmq-module项目模块下
注意:因为需要修改相应配置 相关测试的代码
生产者代码主要在 单元测试
消费者代码主要在 项目代码 com.example.rocketmq.demo.consumer.Junit包下
网友评论