美文网首页
RocketMQ 消息的类型

RocketMQ 消息的类型

作者: 黄靠谱 | 来源:发表于2019-02-07 16:00 被阅读56次

    参考资料

    阿里云官方英文、最新的Demo和Guidence
    http://rocketmq.apache.org/docs/transaction-example/

    阿里云的帮助文档啊,超级详细而且有Demo
    https://help.aliyun.com/document_detail/29551.html

    阿里云在github上的Demos(包括整合Spring 和简单TCP的形式)
    https://github.com/AliwareMQ/mq-demo

    消息的类型

    http://rocketmq.apache.org/docs/simple-example/

    按照发送的特点分:
    https://help.aliyun.com/document_detail/29547.html?spm=a2c4g.11186623.2.13.496e2379jnaAlt

    1. 同步消息
    2. 异步消息
    3. 单向消息

    按照使用功能特点分:

    1. 普通消息(订阅)
    2. 顺序消息
    3. 广播消息
    4. 延时消息
    5. 批量消息
    6. 事务消息

    同步发送(可靠)

    1. 同步发送,线程阻塞,投递completes阻塞结束
    2. 如果发送失败,会在默认的超时时间3秒内进行重试,最多重试2次
    3. 投递completes不代表投递成功,要check SendResult.sendStatus来判断是否投递成功
    //Send message in synchronous mode. This method returns only when the sending procedure totally completes.
    SendResult sendResult = producer.send(msg);
    
    1. SendResult里面有发送状态的枚举:SendStatus,同步的消息投递有一个状态返回值的
    public enum SendStatus {
        SEND_OK,
        FLUSH_DISK_TIMEOUT,
        FLUSH_SLAVE_TIMEOUT,
        SLAVE_NOT_AVAILABLE,
    }
    
    1. retry的实现原理:只有ack的SendStatus=SEND_OK才会停止retry
     int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
      for (; times < timesTotal; times++) {
            trySend();
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
            switch (communicationMode) {
                    case ASYNC:
                        return null;
                    case ONEWAY:
                        return null;
                    case SYNC:
                        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                            if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                continue;
                            }
                        }
                        return sendResult;
                    default:
                        break;
                }
        }
    

    注意事项:发送同步消息且Ack为SEND_OK,只代表该消息成功的写入了MQ当中,并不代表该消息成功的被Consumer消费了

    异步发送(可靠)

    1. 异步调用的话,当前线程一定要等待异步线程回调结束再关闭producer啊,因为是异步的,不会阻塞,提前关闭producer会导致未回调链接就断开了
    2. 异步消息不retry,投递失败回调onException()方法,只有同步消息才会retry,源码参考 DefaultMQProducerImpl.class
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    
    1. 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());
            }
            @Override
            public void onException(Throwable e) {
                System.out.printf("%-10d Exception %s %n", index, e);
                e.printStackTrace();
            }
        });               
    

    Oneway(不可靠,类似于UPD)

    1. 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功
    2. 此方式发送消息的过程耗时非常短,一般在微秒级别
        for (int i = 0; i < 50; i++) {
             Message msg = new Message("TopicTest2" ,"TagA" ,("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
             producer.sendOneway(msg);
        }
    

    延迟发送

    艿艿的博客
    https://blog.csdn.net/github_38592071/article/details/72230984
    延迟的机制是在 服务端实现的,也就是Broker收到了消息,但是经过一段时间以后才发送
    服务器按照1-N定义了如下级别: “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;若要发送定时消息,在应用层初始化Message消息对象之后,调用Message.setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s

     msg.setDelayTimeLevel(2);
     SendResult sendResult = producer.send(msg);
    

    实现原理:

    1. 发送消息的时候如果消息设置了DelayTimeLevel,那么该消息会被丢到ScheduleMessageService.SCHEDULE_TOPIC这个Topic里面
    2. 根据DelayTimeLevel选择对应的queue
    3. 再把真实的topic和queue信息封装起来,set到msg里面
    4. 然后每个SCHEDULE_TOPIC_XXXX的每个DelayTimeLevelQueue,有定时任务去刷新,是否有待投递的消息
    5. 每 10s 定时持久化发送进度

    源码参考 :rocketmq-store.4.4.0.jar
    CommitLog.putMessage()

    1. 存储延迟消息
    if (msg.getDelayTimeLevel() > 0) {
        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
        // Backup real topic, queueId
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    
        msg.setTopic(topic);
        msg.setQueueId(queueId);
     }
    
    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    
    1. MQ对SCHEDULE_TOPIC_XXXX 每条消费队列对应单独一个定时任务进行轮询,发送 到达投递时间【计划消费时间】 的消息。
      ScheduleMessageService.java类里面有个内部类 DeliverDelayedMessageTimerTask,定时执行check待执行消息的功能,每个类有delayLevel、offset2个变量
      class DeliverDelayedMessageTimerTask extends TimerTask {
            private final int delayLevel;
            private final long offset;
    
            public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
                this.delayLevel = delayLevel;
                this.offset = offset;
            }
    
            @Override
            public void run() {
                    this.executeOnTimeup();
            }
    }
    

    批量发送

    • 需要rocketmq的版本要高,4.2以上是支持的,4.1不知道,4.0不支持
    • 提升性能,建议一次消息的大小不超过1M,超大的话,官网有方法Split
    • Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support.
        String topic = "TopicTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "Order1", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "TagA", "Order2", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "TagA", "Order3", "Hello world 2".getBytes()));
        producer.send(messages);
    

    广播、集群订阅(只需要对接收端做控制即可)

    通过在Consumer做配置,实现广播消息,或者集群订阅消息(默认)

    consumer.setMessageModel(MessageModel.CLUSTERING); //默认
    consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式
    

    相关文章

      网友评论

          本文标题:RocketMQ 消息的类型

          本文链接:https://www.haomeiwen.com/subject/rbfgsqtx.html