RocketMQ

作者: 追风还是少年 | 来源:发表于2023-08-23 23:45 被阅读0次

RocketMQ组成

image.png

RocketMQ由NameServer、Broker、Producer、Consumer组成:

  • NameServer
    NameServer节点之间无任何信息同步,topic和路由信息管理
  • Broker
    master与slave的对应关系通过指定相同brokerName、不同brokerId来定义,brokerId为0表示master,非0表示slave
    每个broker与NameServer集群中的所有节点建立长连接,定时注册topic信息到所有NameServer
  • Producer
    与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取topic路由信息,并向提供topic服务的master建立长连接,且定时向master发送心跳
  • Consumer
    与NameServerr集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取topic路由信息,并向提供topic服务的master、slave建立长连接,且定时向master、slave发送心跳,即可从master订阅消息,也可从slave订阅消息,由Broker配置决定

Producer Group作用

主要的用途是事务消息,Broker 需要向消息发送者回查事务状态

顺序消息

消息的顺序性分为两部分,生产顺序性和消费顺序性

Apache RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。
如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者
    消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。

  • 串行发送
    Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

  • 相同消息组的消息按照先后顺序被存储在同一个队列。

  • 不同消息组的消息可以混合在同一个队列中,且不保证连续。

Apache RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。
由于顺序消息都存储在同一个队列里,而同一个队列只会由消费组中的一个消费者消费
如需保证消息消费的顺序性,则必须满足以下条件:

  • 投递顺序
    Apache RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收---处理---应答的语义处理消息,避免因异步处理导致消息乱序。

  • 有限重试
    Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。
    对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序。

顺序消息的缺陷:

  • 发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试

  • 因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大

  • 消费的并行读依赖于分区数量
    一个队列,只能被一个消费者串行消费,不能多线程消费

使用建议:

  • 串行消费,避免批量消费导致乱序
    消息消费建议串行处理,避免一次消费多条消费,否则可能出现乱序情况。
    例如:发送顺序为1->2->3->4,消费时批量消费,消费顺序为1->23(批量处理,失败)->23(重试处理)->4,此时可能由于消息3的失败导致消息2被重复处理,最后导致消息消费乱序。

  • 消息组尽可能打散,避免集中导致热点
    Apache RocketMQ 保证相同消息组的消息存储在同一个队列中,如果不同业务场景的消息都集中在少量或一个消息组中,则这些消息存储压力都会集中到服务端的少量队列或一个队列中。容易导致性能热点,且不利于扩展。一般建议的消息组设计会采用订单ID、用户ID作为顺序参考,即同一个终端用户的消息保证顺序,不同用户的消息无需保证顺序
    因此建议将业务以消息组粒度进行拆分,例如,将订单ID、用户ID作为消息组关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。

public class SelectMessageQueueByHash implements MessageQueueSelector {
    public SelectMessageQueueByHash() {
    }

    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object shardingKey) {
        int value = shardingKey.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value %= mqs.size();
        return (MessageQueue)mqs.get(value);
    }
}
SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ 
, new SelectMessageQueueByHash ()
, shardingKey);

事务消息

事务消息支持在分布式场景下保障消息生产和本地事务的最终一致性
事务消息交互流程如下图所示:

image.png

使用限制:

  • 消费事务性
    Apache RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

  • 中间状态可见性
    Apache RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

  • 事务超时机制
    Apache RocketMQ 事务消息的命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。

事务消息涉及的主题:

  • RMQ_SYS_TRANS_HALF_TOPIC
    prepare消息的主题,事务消息首先先进入到该主题。

  • RMQ_SYS_TRANS_OP_HALF_TOPIC
    当消息服务器收到事务消息的提交或回滚请求后,会将消息存储在该主题下。

回查流程:


image.png

事务消息在RocketMQ中处理流程:


image.png image.png
@Component
@Slf4j
public class OrderTransactionalListener implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("开始执行本地事务....");
        LocalTransactionState state;
        try{
            String body = new String(message.getBody());
            OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);
            orderService.createOrder(order,message.getTransactionId());
            state = LocalTransactionState.COMMIT_MESSAGE;
            log.info("本地事务已提交。{}",message.getTransactionId());
        }catch (Exception e){
            log.error("执行本地事务失败。{}",e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        log.info("开始回查本地事务状态。{}",messageExt.getTransactionId());
        LocalTransactionState state;
        String transactionId = messageExt.getTransactionId();
        if (transactionLogService.get(transactionId)>0){
            state = LocalTransactionState.COMMIT_MESSAGE;
        }else {
            state = LocalTransactionState.UNKNOW;
        }
        log.info("结束本地事务状态查询:{}",state);
        return state;
    }
}
@Component
@Slf4j
public class TransactionalMsgProducer implements InitializingBean, DisposableBean {
    private String GROUP = "order_transactional";
    private TransactionMQProducer msgProducer;
    //用于执行本地事务和事务状态回查的监听器
    @Autowired
    private OrderTransactionalListener orderTransactionListener;
    //执行任务的线程池
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));
    private void start(){
        try {
            this.msgProducer.start();
        } catch (MQClientException e) {
            log.error("msg producer starter occur error;",e);
        }
    }
    private void shutdown() {
        if(null != msgProducer) {
            try {
                msgProducer.shutdown();
            } catch (Exception e) {
                log.error("producer shutdown occur error;",e);
            }
        }
    }
    public TransactionSendResult send(String data, String topic) throws MQClientException {
        Message message = new Message(topic,data.getBytes());
        return this.msgProducer.sendMessageInTransaction(message, null);
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        msgProducer = new TransactionMQProducer(GROUP);
        msgProducer.setNamesrvAddr("namesrvHost:ip");
        msgProducer.setSendMsgTimeout(Integer.MAX_VALUE);
        msgProducer.setExecutorService(executor);
        msgProducer.setTransactionListener(orderTransactionListener);
        this.start();
    }
    @Override
    public void destroy() throws Exception {
        this.shutdown();
    }
}
@Service
@Slf4j
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private  TransactionLogMapper transactionLogMapper;
    @Autowired
    private TransactionalMsgProducer producer;
    //执行本地事务时调用,将订单数据和事务日志写入本地数据库
    @Transactional
    @Override
    public void createOrder(OrderDTO orderDTO,String transactionId){
        //1.创建订单
        Order order = new Order();
        BeanUtils.copyProperties(orderDTO,order);
        orderMapper.createOrder(order);
        //2.写入事务日志
        TransactionLog log = new TransactionLog();
        log.setId(transactionId);
        log.setBusiness("order");
        log.setForeignKey(String.valueOf(order.getId()));
        transactionLogMapper.insert(log);
        log.info("create order success,order={}",orderDTO);
    }
    //前端调用,只用于向RocketMQ发送事务消息
    @Override
    public void createOrder(OrderDTO order) throws MQClientException {
        order.setId(snowflake.nextId());
        order.setOrderNo(snowflake.nextIdStr());
        producer.send(JSON.toJSONString(order),"order");
    }
}

定时/延时消息

当消息写入到Broker后,不能立刻被消费者消费,需要等待指定的时长后才可被消费处理的消息,称为延时消息。

延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。默认支持18个等级的延迟消息,延时等级定义为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,设置延时等级的时候是设置延迟时间对应的序号(从1开始)

延时消息写入Commitlog文件的时,是先写入SCHEDULE_TOPIC_XXX主题的对应延时等级队列中的,SCHEDULE_TOPIC_XXX一共有18个消息队列,分别对应每个延时等级,每个消息队列的queueId=延时等级-1


image.png

延时消息处理过程:

  • 1.发送消息
    修改消息Topic名称和队列信息并存储到CommitLog文件
  • 2.投递
    转发消息到延迟主题的CosumeQueue中
  • 3.消息
    延迟服务消费SCHEDULE_TOPIC_XXXX消息
    1. 把消息复原并存储
      将信息重新存储到CommitLog中
  • 5.重新投递
    将消息投递到目标Topic中
  • 6.消费
    消费者消费目标topic中的数据
Message msg = new Message("TopicTest" ,"TagA" ,("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) );
// 设置延时消息的级别
msg.setDelayTimeLevel(2);

重试消息

消息收发过程中,若Consumer消费某条消息失败或消费超时,则会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。您可以通过消费死信队列中的死信消息来恢复业务异常。
重试消息Topic为%RETRY%{ConsumeGroup}

消息重试主要功能行为包括:

  • 重试间隔:上一次消费失败或超时后,距下次消息可被重新消费的间隔时间。
    1.无序消息
    间隔时间根据重试次数阶梯变化,取值范围:10秒~2小时。不支持自定义配置。
    (1)若最大重试次数小于等于16次,则每次重试的间隔时间会阶梯变化,具体时间见下面表格
    (2)若最大重试次数大于16次,则超过16次的间隔时间均为2小时。
    2.顺序消息
    间隔时间可通过自定义参数suspendTimeMillis取值进行配置。参数取值范围:10~30000,单位:毫秒,默认值:1000毫秒,即1秒。
  • 最大重试次数:消息消费失败后,可被重复投递的最大次数。
    1.无序消息
    最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值。
    2.顺序消息
    最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX。

配置采用覆盖的方式生效,即最后启动的Consumer实例会覆盖之前启动的实例的配置。因此,请确保同一Group ID下的所有Consumer实例设置的最大重试次数和重试间隔相同,否则各实例间的配置将会互相覆盖

image.png

一条消息无论重试多少次,这些重试消息的Message ID都不会改变。
消息重试只针对集群消费模式生效。
广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

集群消费模式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 方式1:返回ConsumeConcurrentlyStatus.RECONSUME_LATER(推荐)
  • 方式2:返回Null
  • 方式3:抛出异常
consumer.setMaxReconsumeTimes(20);

死信队列

死信队列用于处理无法被正常消费的消息,即死信消息
当一条消息初次消费失败,RocketMQ会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,RocketMQ不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中
在RocketMQ中,这种正常情况下无法被消费的消息称为死信消息,存储死信消息的特殊队列称为死信队列,死信消息的Topic为%DLQ%{ConsumerGroup}。

死信消息具有以下特性:

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列具有以下特性:

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列RocketMQ版不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,您可以在RocketMQ控制台重新发送该消息,让消费者重新消费一次。

负载均衡

producer发送消息的负载均衡

对于非顺序消息(普通消息、定时/延时消息、事务消息)场景,默认且只能使用RoundRobin模式的负载均衡策略。
(1)轮询模式(默认),RoundRobin
RoundRobin模式下,生产者发送消息时,以消息为粒度,按照轮询方式将消息依次发送到指定主题中的所有可写目标队列中,保证消息尽可能均衡地分布到所有队列。


image.png

consumer订阅消息的负载均衡

image.png

负载均衡策略算法:
(1)平均负载(默认策略),AllocateMessageQueueAveragely
分配方式类似分页,对topic下的所有MessageQueue进行排序,对同一个消费组的所有ConsumerId进行排序
MessageQueue作为需要分页的记录,Consumer作为页码,计算每页多少个MessageQueue,每页有哪些MessageQueue
(2)环形平均负载,AllocateMessageQueueAveragelyByCircle
每个Consumer分配到MessageQueue的个数与平均负载相同,只是每个Consumer不是分配到连续的MessageQueue
(3)用户自定义配置,AllocateMessageQueueByConfig
(4)机房负载策略,AllocateMessageQueueByMachineRoom
(5)机房负载策略改进版本,AllocateMachineRoomNearBy
(6)一致性哈希策略,AllocateMessageQueueConsistentHash


image.png

一致性哈希有个哈希环的概念,哈希环由0到2^31-1,哈希上的点都是虚拟的,将所有的Consumer使用Consumer的Id进行哈希计算,得到是哈希环上的点,然后把点存储到TreeMap里,将所有的MessageQueue一次进行相同的哈希计算,按顺时针方向找到距离计算出的哈希值最近的Consumer点,MessageQueue最终就归属这个Consumer

负载均衡策略由每个消费者自己执行计算的
负载均衡触发时机:

  • 消费者启动
    每当实例的数量有变更,都会触发一次消费组的所有实例的负载均衡
  • 定时任务,每隔20s执行

消息存储

消息存储的整体架构 消息存储目录结构 每条消息格式

RocketMQ存储的文件主要有Commitlog文件、ConsumeQueue文件、IndexFile文件,ConsumeQueue文件和IndexFile文件是基于Commitlog文件异步生成的:

  • Commitlog文件
    RocketMQ 将所有主题的消息存储在同一个文件 ,确保消息发送时顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量
    文件名为文件的第一个消息的物理偏移量,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。
    因为消息是一条一条写入到 commitlog 文件 ,写入完成后,我们可以得到这条消息的物理偏移量。
    每条消息的物理偏移量是唯一的, commitlog 文件名是递增的,可以根据消息的物理偏移量通过二分查找,定位消息位于那个文件中,并获取到消息实体数据。

  • ConsumeQueue文件
    用于方便、快速基于topic检索消息并消费消息。
    每个topic包含多个消息队列,每个消息队列有多个ConsumerQueue。
    consumequeue文件名由20位数字构成,表示当前文件的第一个索引条目的起始偏移量。与commitLog文件名不同的是,consumequeue后续文件名是固定的,由于consumequeue文件大小是固定不变的。
    ConsumeQueue文件采取定长设计,每一个条目共20个字节,分别为8字节的CommitLog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。
    MessageQueue与ConsumeQueue为1:N关系

  • IndexFile文件
    加速消息的检索性能,根据消息属性快速从Commitlog文件文件中检索消息。
    IndexFile文件名为文件创建的时间戳,单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引条目,索引条目只存储消息key的HashCode而不存消息key,是为了将索引条目设计为定长,方便检索与定位条目,IndexFile的底层存储设计为在文件系统中实现HashMap结构
    indexFile的具体结构,主要包含三部分:索引头Header、槽位表SlotTable(500W个槽位)、索引链表index list(2000W个索引数据)


    image.png
image.png

消息查询

  • 按照Message Id查询消息
    RocketMQ中的MessageId的长度总共有16字节,其中包含了消息存储主机地址(IP地址和端口),消息Commit Log offset。
    “按照MessageId查询消息”在RocketMQ中具体做法是:
    (1)Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后通过Remoting通信层发送(业务请求码:VIEW_MESSAGE_BY_ID)。
    (2)Broker端走的是QueryMessageProcessor,读取消息的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回。
  • 按照Message Key查询消息
    主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程
    (1)找槽位=40byte +hash(topic + "#" + key) %500W*4byte ,槽位值slotValue=最新插入index的位置
    (2)遍历单向链表:从slotValue找到最新index在整个索引文件中位置=40byte +500w * 4byte + slotValue * 20byte,然后根据单个索引文件的pre index值找到前一个索引,一直遍历下去。index数据中key hash和时间区间都满足,则匹配。添加到 List<Long> phyOffsets(commitLog的偏移量list)中。最终根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。


    image.png
    image.png

消费进度

  • 广播消息消费时,消费进度由消费者本地保存
    本地存储路径为[{rocketmq.client.localOffsetStoreDir}|{user.home}/.rocketmq_offsets]/clientId/consumerGroup/offsets.json
  • 集群消息消费时,消费进度由消费者主动上报给broker
    消费者消费消息后的消费进度更新是先更新本地内存,当消费者重新负载均衡活消费者关闭时,会上报消费进度到broker
    RemoteBrokerOffsetStore:
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

消费者上报到broker的消费进度,在broker中也是先保存在在本地内存
ConsumerOffsetManager:

    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

broker中保存的消费进度通过在broker BrokerController 启动定时任务每隔5秒持
久化到文件/store/config/consumerOffset.json,该文件存储的json格式为


image.png
{
    "Topic@ConsumeGroup": {
        QueueId: Offset
    }
}
public class BrokerController {
    public boolean initialize() throws CloneNotSupportedException {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    }
}

消息刷盘

消息发送到broker后,broker不是直接把消息写到commitlog文件,而是写到操作系统的PageCache,之后再从PageCache刷盘到commitlog文件
消息刷盘方式有三种:

  • 同步刷盘
    使用GroupCommitService
  • 异步刷盘
    未开启 TransientStorePool 缓存,使用 FlushRealTimeService
    开启 TransientStorePool 缓存,使用 CommitRealService

高性能

  • 内存映射mmap
    mmap 把文件映射到用户空间里的虚拟内存,省去了从内核缓冲区复制到用户空间的过程,文件中的位置在虚拟内存中有了对应的地址,可以像操作内存一样操作这个文件,相当于已经把整个文件放入内存,但在真正使用到这些数据前却不会消耗物理内存,也不会有读写磁盘的操作,只有真正使用这些数据时。
    这个地址映射的过程,具体到代码层面,RocketMQ是利用JDK NIO包下的MappedByteBuffer.map()来实现的,其底层就是基于mmap技术。
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, position, fileSize)

使用Mmap的限制:
a.Mmap映射的内存空间释放的问题;由于映射的内存空间本身就不属于JVM的堆内存区(Java Heap),因此其不受JVM GC的控制,卸载这部分内存空间需要通过系统调用 unmap()方法来实现。然而unmap()方法是FileChannelImpl类里实现的私有方法,无法直接显示调用。RocketMQ中的做法是,通过Java反射的方式调用“sun.misc”包下的Cleaner类的clean()方法来释放映射占用的内存空间;
b.MappedByteBuffer内存映射大小限制;因为其占用的是虚拟内存(非JVM的堆内存),大小不受JVM的-Xmx参数限制,但其大小也受到OS虚拟内存大小的限制。一般来说,一次只能映射1.5~2G 的文件至用户态的虚拟内存空间,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了;
c.使用MappedByteBuffe的其他问题;会存在内存占用率较高和文件关闭不确定性的问题;


image.png image.png image.png
  • PageCache
    PageCache是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写访问,这里的主要原因就是在于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache
    (1)对于数据文件的读取
    如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。这样,只要下次访问的文件已经被加载至PageCache时,读取操作的速度基本等于访问内存。
    (2)对于数据文件的写入
    OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。
    对于文件的顺序读写操作来说,读和写的区域都在OS的PageCache内,此时读写性能接近于内存。
    RocketMQ的大致做法是,将数据文件映射到OS的虚拟内存中(通过JDK NIO的MappedByteBuffer),写消息的时候首先写入PageCache,并通过异步刷盘的方式将消息批量的做持久化(同时也支持同步刷盘)。
    订阅消费消息时(对CommitLog操作是随机读取),由于PageCache的局部性热点原理且整体情况下还是从旧到新的有序读,因此大部分情况下消息还是可以直接从Page Cache中读取,不会产生太多的缺页(Page Fault)中断而从磁盘读取。


    image.png

PageCache机制也不是完全无缺点的,当遇到OS进行脏页回写,内存回收,内存swap等情况时,就会引起较大的消息读写延迟。
对于这些情况,RocketMQ采用了多种优化技术,比如内存预分配,文件预热,mlock系统调用等,来保证在最大可能地发挥PageCache机制优点的同时,尽可能地减少其缺点带来的消息读写延迟。

  • 预先分配MappedFile
    在消息写入过程中(调用CommitLog的putMessage()方法),CommitLog会先从MappedFileQueue队列中获取一个 MappedFile,如果没有就新建一个。
    这里,MappedFile的创建过程是将构建好的一个AllocateRequest请求(具体做法是,将下一个文件的路径、下下个文件的路径、文件大小为参数封装为AllocateRequest对象)添加至队列中,后台运行的AllocateMappedFileService服务线程(在Broker启动时,该线程就会创建并运行),会不停地run,只要请求队列里存在请求,就会去执行MappedFile映射文件的创建和预分配工作,分配的时候有两种策略,一种是使用Mmap的方式来构建MappedFile实例,另外一种是从TransientStorePool堆外内存池中获取相应的DirectByteBuffer来构建MappedFile(ps:具体采用哪种策略,也与刷盘的方式有关)。并且,在创建分配完下个MappedFile后,还会将下下个MappedFile预先创建并保存至请求队列中等待下次获取时直接返回。RocketMQ中预分配MappedFile的设计非常巧妙,下次获取时候直接返回就可以不用等待MappedFile创建分配所产生的时间延迟。
image.png
  • 文件预热、mlock系统调用
    (1)mlock系统调用:其可以将进程使用的部分或者全部的地址空间锁定在物理内存中,防止其被交换到swap空间。对于RocketMQ这种的高吞吐量的分布式消息队列来说,追求的是消息读写低延迟,那么肯定希望尽可能地多使用物理内存,提高数据读写访问的操作效率。
    (2)文件预热:预热的目的主要有两点;第一点,由于仅分配内存并进行mlock系统调用后并不会为程序完全锁定这些内存,因为其中的分页可能是写时复制的。因此,就有必要对每个内存页面中写入一个假的值。其中,RocketMQ是在创建并分配MappedFile的过程中,预先写入一些随机值至Mmap映射出的内存空间里。第二,调用Mmap进行内存映射后,OS只是建立虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中。程序要访问数据时OS会检查该部分的分页是否已经在内存中,如果不在,则发出一次缺页中断。这里,可以想象下1G的CommitLog需要发生多少次缺页中断,才能使得对应的数据才能完全加载至物理内存中(ps:X86的Linux中一个标准页面大小是4KB)?RocketMQ的做法是,在做Mmap内存映射的同时进行madvise系统调用,目的是使OS做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。

高可用

相关文章

网友评论

      本文标题:RocketMQ

      本文链接:https://www.haomeiwen.com/subject/hcgrmdtx.html