美文网首页
RocketMQ 从入门到放弃

RocketMQ 从入门到放弃

作者: 我是晓梦啊 | 来源:发表于2021-11-17 19:29 被阅读0次

rocketmq介绍

RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力

选择RocketMQ的理由:

  • 强调集群模式无单点,可扩展,任意一点高可用,水平扩展
  • 海量数据的堆积能力,消息堆积后,写入延迟低
  • 支持上万个队列
  • 消息失败重试机制
  • 消息可查询
  • 开源社区活跃
  • 成熟度(==Rocketmq经受住了多年双11的亿万级消息的考验。==)

常见mq对比

image.png

消息中间件解决微服务中的痛点

应用解耦(异步解耦)

微服务架构中,存在着众多子系统,共同完成对外部用户的服务。

举个例子:当用户在订单系统下单时,订单子系统除了需要执行自己系统的业务逻辑之外,可能还需要调用库存子系统去扣减库存;调用会员子系统去增加用户的积分;调用数据分析子系统去插入用户下单的分析数据等等。用户的一个下单行为横跨了N个业务子系统,如果按照传统的同步串行方式一个接一个的调用,用户的下单操作将会执行较长的时间,对用户不友好。同时,由于是==同步调用==,一旦某一个子系统出现了宕机,访问超时等问题,整个下单业务都将陷入瘫痪。

消息队列可以将同步的系统调用转为异步的消息投递,一定程度上解除业务子系统间的耦合。当订单子系统执行完本地逻辑后,只需发送一个标识下单成功的消息,让下游依赖的子系统订阅此消息,消费处理消息来完成对应的业务。这样,用户的下单操作将很快完成,也不必担心下游子系统的故障会波及到订单系统。

虽然消息队列解除了业务子系统间的耦合,但同时也让业务子系统对消息队列系统有了很强的依赖关系,如果消息队列出现了故障,业务系统将会出现严重故障。

但由于消息队列在设计之初的目的十分简单明确:就是为了可靠的收发消息。因此其可用性,稳定性比绝大多数业务系统要高的多。天下没有免费的午餐,在微服务系统中引入消息队列依然是利大于弊的。

流量削峰

   **大多数系统的访问流量并不是一天24小时均匀稳定的,而是存在着一定的突发性。**例如电商的秒杀活动,系统配置在平时能承受住500qps,可在进行秒杀活动时,瞬时的qps可能达到了5000,为平常的10倍,如果不进行处理防护,将会导致服务瘫痪。

可以选择扩容服务器来应对可能的高峰流量,但扩容的服务器在秒杀活动过去之后多数会被闲置,从而造成很大的浪费;也可以设定并发的阈值,在访问并发数达到一定程度时就进行熔断限流,拒绝手慢的秒杀用户下单,可这样会让用户体验很差。

这时,消息队列就能派上用场了。我们可以在系统中使用消息队列作为缓冲,将每一个用户下单请求都作为一条消息存入消息队列,消息队列会根据消费者的消费速度以一种稳定的方式将流量传递给下游消费者系统,在消费者系统处理完下单操作后异步的通知用户下单结果。虽然用户可能会延迟一段时间才能得到反馈,但无论如何也比无法下单要好。

消息队列就像一个漏桶,可以将瞬时的尖峰流量缓存起来,并以一种稳定的速度传递给下游消费者,从而达到流量削峰的目的。

消息分发

    沿用之前的例子,订单子系统的下单成功操作在业务上可能有许多其它系统需要对其做出响应(扣库存,加积分,核销优惠券等等)。

按照传统的方式,需要订单系统挨个调用其它子系统的接口。随着业务的变化,每当有新的子系统需要对下单成功操作做出响应时,就需要改动订单系统的代码逻辑去适应新的需求。

而如果引入了消息队列,则可以在下单成功之后由订单系统发送一条消息,让感兴趣的其它子系统去订阅下单成功消息。如果新的系统也出现了依赖下单成功动作的需求,自行订阅对应消息即可,并不需要订单系统做出任何的改变。

可以利用消息分发机制可以实现代码逻辑的解耦

rocketmq架构图

image.png

rocketmq中间件组成

rocketmq由四大核心模块组成:producer、consumer、brokerServer、nameServer。其中brokerServer和nameServer是rocketmq的服务端,两者一起独立的对外提供服务;而producer和consumer可看做是rocketmq的客户端,一般依附于业务应用程序。

1.nameServer

两个作用:

    Broker管理:接受Broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查Broker是否还存活

    路由信息管理:每个NameServer中都保存着Boker集群的整个路由信息和用于客户端查询的队列信息。Produce和consume通过NameSever可以获取整个Broker集群的整个路由信息,从而进行消息的投递和消费

   **nameServer负责提供路由元数据**。例如,brokerServer通常是集群部署的,其结构会经常的发生变化。如果每次集群中broker机器的上下线都需要通知所有的消费者、生产者,效率太低。

因此,rocketmq引入了nameServer作为brokerServer路由信息的维护者,broker的每次上下线都和nameServer通信,由 nameServer来维护broker的路由信息,而producer和consumer通过访问nameServer获得对应broker的访问地址后,再向对应的broker发起请求。nameServer解除了broker和客户端的耦合依赖关系,大大提高了效率。

在其它主流消息队列中也存在着类似的维护元信息功能的组件,如zookeeper等。rocketmq的设计者认为zk的功能过于强大,杀鸡焉用牛刀,通过一个精简版的元数据服务nameServer,以减少对外部系统的耦合依赖,得以提供更可靠的服务。

nameServer同样能以集群形式对外提供服务。但和zk集群不同的是,==集群内的nameServer服务器并不会互相通信==,而是保持相互独立。

路由注册

NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各个节点间相互不进行的信息通讯。==那各个 节点中数据是如何进行同步的呢?== 在Broker节点启动时,轮询NameServer列表,与每个NameServer 节点建立长连接,发起注册请求。在nameServer内部维护着一个Broker列表,用来动态存储Broker的信息

==注意==

这是与其他像ZK、Eureka、Nacos 等注册中心不同的地方

Nameserver无状态的优缺点:

    优点:name Server集群搭建简单,扩容简单

    缺点:对于Broker,必须明确指出所有NameServer地址。否则未指出的将不会去注册,也正是因为如此,NameServer并不能随便扩容.因为,如果Broker不重新配置,新增的NameServer对于Broker来说是不可见的

Broker节点为了证明自己是活着的,为了维护与nameServer间的长连接,会将最新的信息以心跳包的方式上报给Nameserver,每隔30秒发送一次心跳包。心跳包中包含brokerId、Broker地址、Broker名称、Broker所属集群名等等。NameServer在接收心跳包后,会更新 ==心跳时间戳==,记录这个Broker的最新存活时间

路由剔除

由于Broker关机、宕机或网络抖动等原因,nameServer没有收到Broker的心跳,Nameserver可能会将其从Broker列表中剔除

NameServer中有一个定时任务,每隔10秒就会扫描一次Broker表,查看每一个Broker的==最新心跳时间戳距离当前时间是否超过120秒==,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除

路由发现

RoketMq的路由发现采用的是pull模型。当ToPic路由信息出现变化时,==nameServer不会主动推送给客户端==,而是客户端定时拉取主题最新的路由。默认客户端每30秒会拉取一次最新的路由

  扩展: 

      1) push模型: 推送模型。其实时性较好,是一个“发布>订阅”模型,需要维护一个长连接。而长连接的维护是需要资源成本的,该模型适合于的场景
     * 实时性要求较高
      2) pull模型: 拉取模型,存在的问题是,实时性差
      3) Long polling模型: 长轮询模式,其实是对push和pull 模型的整合,充分利用了这两种的优势,屏蔽了劣势
客户端Nameserver选择策略

客户端 指producer 和 consumer

客户端在配置时必须写上Nameserver集群的地址,那么客户端到底连接的是哪个Namesever节点呢?

客户端首先会产生一个随机数,再与nameserver节点数量取模,此时得到的就是所要连接的节点索引,然后就会进行连接,如果连接失败,则会采用round -robin策略,逐个尝试这着去连接其他的节点

总结 : ==首先采用的随机策略进行选择,失败后 采用的是轮询策略==

扩展: zookeeper client 是如何选择zookeeper Server的?

简单来说就是:经过两次shuffle(打散),然后选择第一台Zookeeper Server。

详细的说就是,将配置文件中 的zk server地址进行第一次shuffle,然后随机选择一个。这个选择出的一般是一个hostname 。然后获取到该hostname 对应的索引ip,再对这些ip进行第二次shuffle,从shuffle的结果中去取第一个server地址进行来接。

2.Broker

   **brokerServer负责消息的接收,存储和分发,**是rocketmq最核心,最重量级的组成部分。

为实现高可用和高吞吐,brokerServer通常采用集群部署,共同对外提供服务。

模块构成
image.png

==Romoting module==:整个Broker的实体,负责处理来自clients端的请求。而这个broker实体则由以下模块构成

==client Manager==:客户端管理器。负责接收、解析客户端(produce/consume)请求,管理客户端, 例如,维护consume的topic订阅信息

==store Service==: 存储服务,提供方便简单的APi接口,处理==消息存储到物理硬盘==和==消息查询==功能

==HA serve==r : 高可用服务,提供Master broker和slave broker之间的数据同步功能

==Index server==:索引服务,根据特定的Message Key ,对投递到Broekr的消息进行索引服务,同时也提供根据Message key对消息进行快速查询的功能

3.Producer

producer负责发送消息。使用producer将消息发送到brokerServer,由brokerServer统一进行消息的分发。

rocketmq支持多种消息发送方式,如同步消息发送异步回调消息发送顺序消息发送以及单向消息发送(异步无回调)。除了单向消息发送,其余的发送方式均需要brokerServer返回发送结果的确认消息。

特别的,rocketmq的一大特色是支持发送事务消息(半消息),能一定程度上解决分布式事务的问题。

4.Consumer

  **consumer 负责消费producer发送的消息**。consumer会从brokerServer获取消息,并传递给应用程序。

rocketMQ使用的消息原语是At Least Once(至少一次成功消),如果一定时间内没有接收到consumer消息确认消费的响应结果,会将同一条消息再次投递给consumer。rocketmq采用ack机制保证消息的消费成功所以consumer可能会多次收到同一条消息,需要consumer的业务方做好==幂等==防护。最方便的幂等防护就是设置==业务标识key==,例如唯一流水号,订单号之类的

从使用者的角度来看,consumer分为两种方式来获取信息。一种是推模式(push consume),推模式看起来像是brokerServer将消息推给了consumer;另一种是拉模式(pull consume),拉模式看起来像是consumer主动的去brokerServer拉取消息(实际上,推模式是基于拉模式实现的)。

rocketmq基本模型概念

1.Topic 主题

    topic主题,代表一系列消息的集合,任何消息只能属于一个topic主题,主题是rocketmq进行消息发布订阅的最小单位。业务方可以通过创建并订阅各式各样的主题来满足自身的业务要求。**不同主题之间的消息在逻辑上没有关联。**

2.tag 标签

tag标签,tag从属于topic主题,主要用于对同一主题下的消息进行进一步区分。标签可以简单的认为是二级主题,通过tag标签功能,业务方可以方便的实现对各种二级主题的消费需求。

3.group组

    group组,代表着同一类客户端的集合。具体可分为**消费者组(\**consumer group\**)和\**生产者组(\*\*producer group\*\*)\****两种。**消费者组和生产者组之间没有任何关联(即使组名一样)。**
消费者组
    **消费者组代表着同一类型的消费者集群。**同一消费组内的消费者通常消费同样的消息且消息消费逻辑一致。消费者组的概念使得consumer集群在消费消息时,rocketmq可以通过负载均衡来做到消费消息时的高可用和容错。
生产者组
    **生产者组代表着同一类型的生产者集群。**一般来说,消息的生产者在发出了消息得到确认之后便完成了任务,似乎没有必要为此抽象出生产者集群的概念。

    **rocketmq具有发送事务消息的特性**,发送事务消息简单来说就是生产者先发送出一个**半消息(预消息),然后执行本地的事务,在事务完成提交之后再跟着发送一个事务确认消息**。半消息和普通消息的最大区别在于,半消息在投递给broker之后,broker不会马上让消费者进行消费,而是等待。只有当接收到生产者后续对应的的事务确认消息后,预消息和确认消息合二为一,才将对应的事务消息交给消费者去消费;而如果最终没有接收到事务确认消息,则会将消息直接删除不投递给消费者,以达到类似事务回滚的效果。事务消息对消费者来说是透明无感知的。

    可如果生产者在发送了预消息之后挂了怎么办?为解决这个问题,broker会在一定时间没有收到确认消息后,==定时的回查生产者当前事务消息的状态,回查的范围是整个生产者组中的某一个在线节点==。这种情况下,生产者和消费者一样,也构成了一个集群监听来自broker的回查。这样,即使发送消息的生产者发生了故障,在一定条件下整个生产者集群的事务消息发送功能依然可以正常运转。

通过生产者组的概念,rocketmq实现了事务消息投递的高可用。

4.Queue

队列,特定的生产者向特定的queue发送消息,消费者订阅特定的queue完成特定消息的接收

一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息,一个Topic的Queue也被称为一个Topic中的==消息分区(Partition)==

一个queue中的消息不允许一个消费组中的多个消费者同时消费

==分片(Sharding)==

在Rocketmq中,分片指的是存放相应Topic 的Broker。每个分片会创建出相应数量的分区,即queue


image.png

5.消息标识

RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询,MessageId有两个:在生产者send()消息是会自动生产一个MessaeId(msgId),当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId 、offsetMsgId与key都被称为消息标识。

  • msgId:由producer端生成,其生成规则为:==producerIp+进程pid+MessageClientIdSetter类的ClassLoader的hashCode+当前时间AutomicInteger自增计数器==
  • offsetMsgId:由broker端生成,其生成规则:==brokerIp+物理分区的offset(Queue中的偏移量)==
  • key:由用户指定业务相关的唯一标识, 比如订单流水号等

6.Message

    message消息是rocketmq中传递消息的主体,消息具有全局唯一的messageID属性,用户可以根据messageID查询进行消息的精确查询。

消息的内容可以是不超过rocketmq限制的、二进制的任意数据,rocketmq不会对消息承载的数据内容做任何干预。

持续更新中。。。

相关文章

网友评论

      本文标题:RocketMQ 从入门到放弃

      本文链接:https://www.haomeiwen.com/subject/qmqyaltx.html