美文网首页RocketMQ系列
RocketMQ系列(三):producer

RocketMQ系列(三):producer

作者: 范柏柏 | 来源:发表于2020-06-06 19:27 被阅读0次

rocketMq消息体

消息体.png

properties扩展中存了什么呢

  • tag: 消息tag,用于消息过滤

  • keys:message索引键,多个空格隔开,rocketMq可以根据这些key快速检索到消息。具体怎么快速检索,在消息存储章节细聊。

  • waitStoreMsgOk: 消息发送时是否等消息存储完成后再返回

  • delayTimeLevel: 消息延迟级别,用于定时消息和消息重试

producer启动过程

启动过程就是新建一个MQClientInstance实例。整个JVM实例中只存在一个MQClientInstance实例。

clientId为 本机ip + JVM线程id。
MQClientInstance作用是:与nameServer交互。网络请求,心跳检测。

发送消息过程

1、获取路由信息

路由信息其实就是消息队列列表
topic路由信息会缓存在producer中,以一个list变量的形式存在在内存中。
如果本地没有topic路由信息,向nameServer发送请求,获取路由信息,更新本地路由表。

所以问题了,本地有了,这个路由表什么时候更新呢?
起一个定时任务,每隔30s从nameServer获取topic路由表,更新本地路由。
producer会跟topic涉及的所有broker建立长连接,没隔30秒发送一个心跳。
broker端也会每10秒扫描一次注册的producer,如果2分钟没有心跳,则断开连接。

2、按照负载均衡策略,选择路由

2.1 默认投递方式:轮训

  • 对queue list进行排序
  • 获取一个全局自增的计数变量。获取一次自增一次。
  • 用这个变量对队列size取模
  • 模到了几,就选择哪个队列。因为变量是自增的,所以模的值也是根据队列size自增的,也就是轮训的。
  • 当自增值增加到int最大值后,该值重置为0

2.2 默认投递方式增强:轮训算法和延迟最小策略

默认的投递方式比较简单,但是也暴露了一个问题,就是有些queue可能由于自身数量积压等原因,可能投递的过程比较长,就尽量不要选择这样的queue了。rocketMq在每次发送一个MQ消息后,都会统计一下消息投递的时间延迟,根据这个时间延迟,可以知道往哪些queue投递的速度快。在这种场景下,会优先使用消息投递最小延迟策略。

2.3 顺序消息的投递方式

如果不用默认方式,可以自己选择MessageQueueSelector。
recketMq也提供了集中选择器实现,当然也可以自己实现。

生产者在消息投递的过程中,使用了 MessageQueueSelector 作为队列选择的策略接口,其定义如下:

package org.apache.rocketmq.client.producer;

import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public interface MessageQueueSelector {
        /**
         * 根据消息体和参数,从一批消息队列中挑选出一个合适的消息队列
         * @param mqs  待选择的MQ队列选择列表
         * @param msg  待发送的消息体
         * @param arg  附加参数
         * @return  选择后的队列
         */
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
投递策略 策略实现类 说明
随机分配策略 SelectMessageQueueByRandom 使用了简单的随机数选择算法
基于Hash分配策略 SelectMessageQueueByHash 根据附加参数的Hash值,按照消息队列列表的大小取余数,得到消息队列的index
基于机器机房位置分配策略 SelectMessageQueueByMachineRoom 开源的版本没有具体的实现,基本的目的应该是机器的就近原则分配

hash代码实现

public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

3、根据选择出的路由,发送消息到broker

发送消息的三种方式

  • 可靠同步发送
    发送者执行发送消息api时,同步等待,直到消息服务器返回发送结果。

  • 可靠异步发送
    发送者执行发送消息api时,指定消息发送成功后的回调函数,然后立即返回,线程不阻塞,消息发送成功或失败,在回调函数(一个新的线程)中执行。

  • 单向发送
    发送者执行发送消息api时,直接返回,也没有回调函数。简单的说,就是只管发,不在乎消息是否成功存储在消息服务器上。

怎么控制是使用哪种方式??
在producer调用send函数的时候,有不同的send函数。

如何保证消息一定发送成功

rocketMq有重试机制。调用api的时候,会返回成功。如果返回不成功,则进行下一次投递,往下一个queue投。直到server端返回了成功。
如果在设置的重置次数用完了,还没成功。那就是真失败了。这个时候用户端就会感知了,会抛异常给用户端。

相关文章

网友评论

    本文标题:RocketMQ系列(三):producer

    本文链接:https://www.haomeiwen.com/subject/zeqgzhtx.html