by shihang.mai
1. 角色关系
RocketMq角色关系- broker向NameServer注册自己的信息
- producer向NameServer 获取broker信息,producer向某个topic发送消息时,拿topic去匹配获取的信息,获取该Topic对应的broker,消息就向这些broker去投放
- consumer向NameServer获取broker信息,consumer消费某个Topic时,拿topic去匹配获取的信息,获取该Topic对应的broker,consumer就去消费broker里面的消息
2 集群模型
RocketMq集群模型- 上图存在3组broker,分别为brokerName=s1,brokerName=s2,brokerName=s3
- 其中brokerName=s1有两个,brokerId=0的机为master,brokerId>0为salve
- brokerName =s2, brokerName =s3, brokerName =s1(brokerId=0)形成了master集群
- 有一个Topic1,Topic1是一个抽象,它是所有queue的抽象,Topic1在各个broker上都有queue,并且queue的数量是可以不一样的
- consumer可以向master消费,也可以向salve消费.master与salve需要数据同步,而数据同步分为两种方式
- 同步执行(同步双写):producer向broker发送msg,master同步msg给salve,然后salve返回ok给master,master再返回给producer
- 异步执行:producer向broker发送msg, broker立刻返回.而master和salve间的数据同步它们自行执行
Topic2只为了说明第5点.左上角的定位Ip,定位queue需要看完源码后才能理解
3. 生产者发出消息
3.1 同步发出消息
发出,等到broker返回消息
producer.send(msg);
3.2 异步发出消息
发出,利用事件监听broker返回消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable throwable) {
}
});
3.3 单向发出消息
发出,不等待broker返回消息
nmsProducer.sendOneway(msg);
3.4 事务发出消息
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//执行本地事务
/**
* 回滚 LocalTransactionState.ROLLBACK_MESSAGE;
* broker定时检查producer事务状态 LocalTransactionState.UNKNOW;
* 成功 LocalTransactionState.COMMIT_MESSAGE
*/
//
// LocalTransactionState.ROLLBACK_MESSAGE;
//LocalTransactionState.UNKNOW;
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
//broker端回调监测本地事务
return LocalTransactionState.COMMIT_MESSAGE;
}
});
3.5 发消息重试
//producer端
producer.setRetryTimesWhenSendFailed(2);
producer.setRetryAnotherBrokerWhenNotStoreOK(true)
3.6 顺序发出消息
//producer端利用MessageQueueSelector
4. 消费模式(同一组消费组内)
4.1 广播消费
即消费组内所有节点都消费该消息,但该模式broker不会重投。offset记录在consumer上。
consumer.setMessageModel(MessageModel.BROADCASTING);
4.2 集群消费(默认)
一组consumer派出一个节点消费,若消费不成功,broker会重投。offset记录在broker上。
consumer.setMessageModel(MessageModel.CLUSTERING);
4.3 消费时sql过滤
需要producer端和consumer端互相配合,并且需要配置broker.conf
//producer端
msg.putUserProperty("age","18");
//consumer端
MessageSelector messageSelector = MessageSelector.bySql("age >= 18 and age <= 28");
consumer.subscribe("topic",messageSelector);
//broker.conf添加配置
enablePropertyFilter=true
//broker启动时加载上面的文件broker.conf
4.4 consumer消费重试
/**
* consumer端 返回CONSUMER_LATER
* 只有在集群模式下,有消息重投机制. 1s 5s .....2h
/
4.5 顺序消费
//consumer端利用MessageListenerOrderly
4.6 订阅关系一致
订阅关系一致指的是同一个消费者Group ID下所有Consumer实例所订阅的Topic、Group ID、Tag必须完全一致
https://zhuanlan.zhihu.com/p/58728454
https://zhuanlan.zhihu.com/p/58755005
网友评论