美文网首页
RocketMQ学习

RocketMQ学习

作者: John13 | 来源:发表于2020-05-08 11:51 被阅读0次

    RocketMQ深度解析
    RocketMQ之一:RocketMQ整体介绍
    RocketMQ之二:分布式开放消息系统RocketMQ的原理与实践(消息的顺序问题、重复问题、可靠消息/事务消息)
    RocketMQ之六:RocketMQ消息存储
    《浅入浅出》-RocketMQ

    高可用(集群)

    使用镜像模式搭建高可用集群,可以配置数据同步到所有节点还是指定数量的节点以满足实际需求。

    RocketMQ Slave不可以写,可以读,类似于MySQL的主从机制。

    Broker是以Group为单位提供服务。一个Group里面分Master和Slave,Slave从Master同步数据,支持同步双写异步复制两种策略。

    • 单Master模式:
      无需多言,一旦单个broker重启或宕机,一切都结束了!很显然,线上不可以使用。
    • 多Master模式:
      全是Master,没有Slave。当然,一个broker宕机了,应用是无影响的,缺点在于宕机的Master上未被消费的消息在Master没有恢复之前不可以订阅。
    • 多Master多Slave模式(异步复制):
      多对Master-Slave,高可用!采用异步复制的方式,主备之间短暂延迟,MS级别。Master宕机,消费者可以从Slave上进行消费,不受影响,但是Master的宕机,会导致丢失掉极少量的消息。
    • 多Master多Slave模式(同步双写):
      和上面的区别点在于采用的是同步方式,也就是在Master/Slave都写成功的前提下,向应用返回成功,可见不论是数据,还是服务都没有单点,都非常可靠!缺点在于同步的性能比异步稍低。

    多master多slave模式部署架构图:

    • NameServer用于保存集群配置、选举Leader等
    • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Broker Master建立长连接,且定时向Master发送心跳。Producer只能将消息发送到Broker master。
    • Consumer则不一样,它同时与提供Topic服务的Master、Slave建立长连接,既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

    消息存储

    1、CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,顺序写,随机读。
    2、 ConsumeQueue:消息消费的逻辑队列,作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
    3、IndexFile:索引队列用来存储消息的索引key,为了消息查询提供了一种通过key或时间区间来查询消息的方法。
    4、MapedFileQueue:对连续物理存储的抽象封装类,源码中可以通过消息存储的物理偏移量位置快速定位该offset所在MappedFile(具体物理存储位置的抽象)、创建、删除MappedFile等操作;
    5、MappedFile:文件存储的直接内存映射业务抽象封装类,源码中通过操作该类,可以把消息字节写入PageCache缓存区(commit),或者原子性地将消息持久化的刷盘(flush);

    消息刷盘

    • 同步刷盘:
      只有在消息真正持久化至磁盘后,RocketMQ的Broker端才会真正地返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用领域。RocketMQ同步刷盘的大致做法是,基于生产者消费者模型,主线程创建刷盘请求实例—GroupCommitRequest并在放入刷盘写队列后唤醒同步刷盘线程—GroupCommitService,来执行刷盘动作(其中用了CAS变量和CountDownLatch来保证线程间的同步)。这里,RocketMQ源码中用读写双缓存队列(requestsWrite/requestsRead)来实现读写分离,其带来的好处在于内部消费生成的同步刷盘请求可以不用加锁,提高并发度。
    • 异步刷盘:
      能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。异步和同步刷盘的区别在于,异步刷盘时,主线程并不会阻塞,在将刷盘线程wakeup后,就会继续执行。

    消息类别

    • 普通消息
    • 顺序消息:
      一个topic下有多个队列,为了保证发送有序,RocketMQ提供了MessageQueueSelector队列选择机制,他有三种实现:

    我们可使用Hash取模法,让同一个订单发送到同一个队列中,再使用同步发送,只有同个订单的创建消息发送成功,再发送支付消息。这样,我们保证了发送有序。

    RocketMQ的topic内的队列机制,可以保证存储满足FIFO(First Input First Output 简单说就是指先进先出),剩下的只需要消费者顺序消费即可。

    RocketMQ仅保证顺序发送,顺序消费由消费者业务保证!!!

    这里很好理解,一个订单你发送的时候放到一个队列里面去,你同一个的订单号Hash一下是不是还是一样的结果,那肯定是一个消费者消费,那顺序是不是就保证了?

    但要实现严格顺序消息,简单且可行的办法就是:
    保证生产者 - MQServer - 消费者是一对一对一的关系

    • 广播消息
    • 定时消息:
    // 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如 2016-03-07 16:21:00 投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。    
    long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();    
    msg.setStartDeliverTime(timeStamp);​    
    // 发送消息,只要不抛异常就是成功    
    SendResult sendResult = producer.send(msg); 
    
    • 延时消息:
    Message sendMsg = new Message(topic, tags, message.getBytes());
    sendMsg.setDelayTimeLevel(delayLevel);
    // 默认3秒超时
    SendResult sendResult = rocketMQProducer.send(sendMsg);
    
    • 批量消息
    • 事务消息:


    消息发送

    • 同步消息(可靠同步发送):
      同步发送是指消息发送方发出数据后,会阻塞直到MQ服务方发回响应消息。
      应用场景:
      此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
    • 异步消息(可靠异步发送):
      异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收服务器响应,并对服务器的响应结果进行处理。
      应用场景:
      异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
    • 单向(one-way)消息:
      单向(Oneway)发送特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
      应用场景:
      适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

    消息消费

    • Push模式:即MQServer主动向消费端推送;
    • Pull模式:即消费端在需要时,主动到MQServer拉取。

    但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

    消费端的Push模式是通过长轮询的模式来实现的,就如同下图:

    Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。

    当然,Consumer端是通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的PullRequest发送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest时,如果发现没有消息,就会把PullRequest扔到ConcurrentHashMap中缓存起来。

    broker在启动时,会启动一个线程不停的从ConcurrentHashMap取出PullRequest检查,直到有数据返回。

    如果Consumer的数量比Message Queue的总数量还多的话,多出来的Consumer将无法分到Queue,实际上只是起到backup的作用。

    消费模式

    • 集群消费:



      当使用集群消费模式时,消息队列 RocketMQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。
      一个Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有 9 条消息,其中一个Consumer Group有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。

    • 广播消费:


    当使用广播消费模式时,消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
    一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次。在广播消费中的Consumer Group概念可以认为在消息划分方面无意义。

    负载均衡

    • 发送端负载均衡

    首先分析一下RocketMQ的客户端发送消息的源码:

    在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:

    1. 如果没有指定namesrv地址,将会自动寻址
    1. 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳...
    1. 启动负载均衡的服务

    初始化完成后,开始发送消息,发送消息的主要代码如下:

    代码中需要关注的两个方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,所以tryToFindTopicPublishInfo会首先从缓存中获取topic路由信息,如果没有获取到,则会自己去namesrv获取路由信息。selectOneMessageQueue方法通过轮询的方式,返回一个队列,以达到负载均衡的目的。

    如果Producer发送消息失败,会自动重试,重试的策略:

    1. 重试次数 < retryTimesWhenSendFailed(可配置)
    2. 总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
    3. 同时满足上面两个条件后,Producer会选择另外一个队列发送消息
    • 接收端负载均衡

    Producer向一些队列轮流发送消息,队列集合称为TopicConsumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列;如果做集群消费,则多个Consumer实例平均消费这个Topic对应的队列集合。

    上图的集群模式里,每个consumer消费部分消息,这里的负载均衡是怎样的呢?

    消费端会通过RebalanceService线程,20秒钟做一次基于topic下的所有队列负载:

    1. 遍历Consumer下的所有topic,然后根据topic订阅所有的消息
    2. 获取同一topic和Consumer Group下的所有Consumer
    3. 然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等

    如同上图所示:如果有 3 个队列,2 个 consumer,那么第一个 Consumer 消费 2 个队列,第二 consumer 消费 1 个队列。这里采用的就是平均分配策略,它类似于我们的分页,TOPIC下面的所有queue就是记录,Consumer的个数就相当于总的页数,那么每页有多少条记录,就类似于某个Consumer会消费哪些队列。

    通过这样的策略来达到大体上的平均消费,这样的设计也可以很方面的水平扩展Consumer来提高消费能力。

    消息重试

    消费失败就会重投递:
    1、返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
    2、返回 null
    3、抛出异常(如果是被捕获的异常,则不会进行消息重试。)

    RocketMQ 并不会无限制地重试下去,默认每条消息最多重试 16 次。
    如果消息重试 16 次之后还是消费失败怎么办呢?那么消息就不会再投递给消费者,而是将消息放到相对应的死信队列中。这时候我们就需要对死信队列的消息做一些人工补偿处理,因为这些消息可能本身就有问题,也有可能和消费逻辑调用的服务有关等,所以需要人工判断之后再进行处理。

    相关文章

      网友评论

          本文标题:RocketMQ学习

          本文链接:https://www.haomeiwen.com/subject/ztoxlqtx.html