美文网首页
2020-07-20

2020-07-20

作者: fly林十一 | 来源:发表于2020-07-20 16:26 被阅读0次

<meta name="source" content="lake">

《RocketMQ实战与原理解析》读书笔记和总结

基本介绍

核心部分

NameServer集群、Broker集群、生产者、消费者

  • NameServer

负责管理所有的Broker消息

让生产者和消费者鬼知道集群里有哪些Broker,然后与之通信

  • Broker

实现数据多副本存储和高可用,使用主从架构

  • 生产者

向MQ发送消息

  • 消费者

从MQ获取消息

其他

  • topic

用于区分不同类型的消息

  • message queue

Topic可以根据需求设置一个或多个Message Queue,能支持增加并行处理的机器来提高处理速度。

工作原理

先启动NameServer,再启动Broker。这时消息队列已经可以提供服务了,想发送消息队列就使用producter,想接收消息队列就使用consumer。

消费者

一个是DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理;(被动接收并自动处理)

另一个是DefaultMQPullConsumer,读取操作中的大部分功能由使用者自主控制。(主动拉取处理)

DefaultMQPushConsumer

  • Consumer的GroupName

  • Consumer的GroupName用于把多个Consumer组织到一起,提高并发处理能力,GroupName需要和消息模式(MessageModel)配合使用。

  • NameServer的地址和端口号

  • 可以填写多个,用分号隔开,达到消除单点故障的目的,比如 “ip1:port; ip2:port; ip3:port”。

  • Topic名称

  • Topic名称用来标识消息类型,需要提前创建。如果不需要消费某个Topic下的所有消息,可以通过指定消息的Tag进行消息过滤,比如:Consumer.subscribe("TopicTest", "tag1 ||tag2 || tag3"),表示这个Consumer要消费“TopicTest”下带有tag1或tag2或tag3的消息(Tag是在发送消息时设置的标签)。在填写Tag参数的位置,用null或者“*”表示要消费这个Topic的所有消息。

RocketMQ支持两种消息模式:Clustering和Broadcasting。

在Clustering模式下,同一个ConsumerGroup(GroupName相同)里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的。

在Broadcasting模式下,同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。

长轮询方式

DefaultMQPushConsuer的源码中有很多PullRequest语句。

Push方式是Server端接收到消息后,主动把消息推送给Client端,实时性高。对于一个提供队列服务的Server来说,用Push方式主动推送有很多弊端:首先是加大Server端的工作量,进而影响Server的性能;其次,Client的处理能力各不相同,Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题。

Pull方式是Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量消息后,处理妥当了再接着取。Pull方式的问题是循环拉取消息的间隔不好设定,间隔太短就处在一个“忙等”的状态,浪费资源;每个Pull的时间间隔太长,Server端有消息到来时,有可能没有被及时处理。

“长轮询”方式通过Client端和Server端的配合,达到既拥有Pull的优点,又能达到保证实时性的目的。

“长轮询”的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。

“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer

DefaultMQPushConsumer的流量控制

PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度,任何一个值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。此外ProcessQueue还可以辅助实现顺序消费的逻辑。

DefaultMQPullConsumer

  • 获取Message Queue并遍历

一个Topic包括多个Message Queue,如果这个Consumer需要获取Topic下所有的消息,就要遍历多有的Message Queue。如果有特殊情况,也可以选择某些特定的Message Queue来读取消息。

  • 维护Offsetstore

从一个Message Queue里拉取消息的时候,要传入Offset参数(long类型的值),随着不断读取消息,Offset会不断增长。这个时候由用户负责把Offset存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等。

  • 根据不同的消息状态做不同的处理

拉取消息的请求发出后,会返回:FOUND、NO_MATCHED_MSG、NO_NEW_MSG、OFFSET_ILLEGAL四种状态,需要根据每个状态做不同的处理。比较重要的两个状态是FOUNT和NO_NEW_MSG,分别表示获取到消息和没有新的消息。实际情况中可以把while(true)放到外层,达到无限循环的目的。因为PullConsumer需要用户自己处理遍历Message Queue、保存Offset,所以PullConsumer有更多的自主性和灵活性。

Consumer的启动、关闭流程

Consumer分为Push和Pull两种方式,对于PullConsumer来说,使用者主动权很高,可以根据实际需要暂停、停止、启动消费过程。需要注意的是Offset的保存,要在程序的异常处理部分增加把Offset写入磁盘方面的处理,记准了每个Message Queue的Offset,才能保证消息消费的准确性。

DefaultMQPushConsumer的退出,要调用shutdown()函数,以便释放资源、保存Offset等。这个调用要加到Consumer所在应用的退出逻辑中。

RocketMQ集群可以有多个NameServer、Broker,某个机器出异常后整体服务依然可用。所以DefaultMQPushConsumer被设计成当发现某个连接异常时不立刻退出,而是不断尝试重新连接。

生产者

生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。比如同步发送、异步发送、延迟发送、自定义发送规则、发送事务消息等。

发送消息的步骤

  1. 设置Producer的GroupName。
  2. 设置InstanceName,当一个Jvm需要启动多个Producer的时候,通过设置不同的InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。
  3. 设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。
  4. 设置NameServer地址。
  5. 组装消息并发送。

发送延迟消息

RocketMQ支持发送延迟消息,Broker收到这类消息后,延迟一段时间再处理,使消息在规定的一段时间后生效。

延迟消息的使用方法是在创建Message对象时,调用setDelayTimeLevel(int level)方法设置延迟时间,然后再把这个消息发送出去。目前延迟的时间不支持任意设置,仅支持预设值的时间长度(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)。比如setDelayTimeLevel(3)表示延迟10s。

自定义发送规则

可以通过把MessageQueueSelector的对象作为参数,根据传入的Object参数,或者根据Message消息内容确定把消息发往那个Message Queue,返回被选中的Message Queue。

存储队列位置信息

Offset

Offset是指某个Topic下的一条消息在某个Message Queue里的位置,通过Offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后继续处理。

DefaultMQPushConsumer类里有个函数用来设置从哪儿开始消费消息:比如setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET),这个语句设置从最小的Offset开始读取。如果从队列开始到感兴趣的消息之间有很大的范围,用CONSUME_FROM_FIRST_OFFSET参数就不合适了,可以设置从某个时间开始消费消息,比如Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP),Consumer. setConsumeTimestamp("20131223171201"),时间戳格式是精确到秒的。

自定义日志输出

Log是监控系统状态,排查问题的重要手段,RocketMQ的默认Log存储位置是:${user.home}/Logs/rocketmqLogs, Log配置文件的设置可以通过JVM启动参数、环境变量、代码中的设置语句这三种方式来配置。

NameServer

  • 整个消息队列的状态服务器,集群的各个组件通过它来了解全局的信息。
  • 各个角色的机器都要定期向NameServer上报自己的状态,超时不上报的话,NameServer会认为某个机器出故障不可用了。
  • NameServer可以部署多个,相互之间独立。

集群状态的存储结构

image.png image.png

状态维护逻辑

其他角色会主动向NameServer上报状态,所以NameServer的主要逻辑在DefaultRequestProcessor类中,根据上报消息里的请求码做相应的处理,更新存储的对应信息。此外,连接断开的事件也会触发状态更新,具体逻辑在org.apache.rocketmq.namesrv.routeinfo的BrokerHousekeepingService类中。

为何不用ZooKeeper

ZooKeeper是Apache的一个开源软件,为分布式应用程序提供协调服务。那为什么RocketMQ要自己造轮子,开发集群的管理程序呢?

答案是ZooKeeper的功能很强大,包括自动Master选举等,RocketMQ的架构设计决定了它不需要进行Master选举,用不到这些复杂的功能,只需要一个轻量级的元数据服务器就足够了。中间件对稳定性要求很高,RocketMQ的NameServer只有很少的代码,容易维护,所以不需要再依赖另一个中间件,从而减少整体维护成本。

底层通信机制

待完善

RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体的通信实现的

相关文章

  • 魏城《终点站》

    终点站 作者:魏城 写于2020-07-20 伦敦 ..................................

  • Elasticsearch + Kibana本地搭建(windo

    2020-07-20 1.Elasticsearch需要至少3个节点才能构成集群,所以搭建时至少启动成功3个节点后...

  • 苹果电脑出现问好文件夹

    2020-07-20 解决办法:1、先强制关机2、按住cmd+R不放,然后点击开机,直至出现苹果logo,放开3、...

  • 2020-07-20

    2020-07-20 日精进打卡 姓名:彭新 宁波蓝天白云供应链管理有限公司 【日精进打卡第865天】 【知学习】...

  • 向量检索排序

    2020-07-20 如何计算特征距离 欧式距离: 两点间的真实距离,值越小,说明距离越近; 余弦距离:就是两个向...

  • 【D207】我们有选择跳出旧有模式的权力——写作营共读打卡第17

    2020-07-20,周一,晴 今天阅读《被讨厌的勇气》第一章(前部分)。 Day174《我们有选择跳出旧有模式的...

  • 独立开发者记录的第四周

    记录日期(2020-07-20) 抱歉更新晚了,本来打算不更新了的,但是想想,一定要坚持才行,所以熬个夜也要把它写...

  • 2020-09-28

    2020-07-20 22:09 中原焦点团队中20李倩坚持分享第185天2020年7月20日 万能问句:是什么让...

  • 相处难?

    很多时候,善良的建议反而会让人变得平庸,人生总需要一些一意孤行 2020-07-20 早起 晴了几天又是一上午的暴...

  • 杂记

    2020-07-20 你想成为什么样的人,完全取决于你能吃多少苦和有着日复一日不可懈怠的i自律。 真的,人要走出去...

网友评论

      本文标题:2020-07-20

      本文链接:https://www.haomeiwen.com/subject/sqjykktx.html