美文网首页
RocketMQ生产者核心知识

RocketMQ生产者核心知识

作者: JBryan | 来源:发表于2020-04-26 14:53 被阅读0次

本篇文章代码基于:SpringBoot整合RocketMQhttps://www.jianshu.com/p/0028969d5e17

1、生产者核心配置

compressMsgBodyOverHowmuch :消息超过默认字节4096后进行压缩
retryTimesWhenSendFailed : 失败重发次数
maxMessageSize : 最大消息配置,默认128k
topicQueueNums : 主题下面的队列数量,默认是4

autoCreateTopicEnable : 是否自动创建主题Topic, 开发建议为true,生产要为false
defaultTopicQueueNums : 自动创建服务器不存在的topic,默认创建的队列数
autoCreateSubscriptionGroup: 是否允许 Broker 自动创建订阅组,建议线下开发开启,线上关闭
brokerClusterName : 集群名称

brokerId : 0表示Master主节点 大于0表示从节点
brokerIP1 : Broker服务地址
brokerRole : broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE
deleteWhen : 每天执行删除过期文件的时间,默认每天凌晨4点

flushDiskType :刷盘策略, 默认为 ASYNC_FLUSH(异步刷盘), 另外是SYNC_FLUSH(同步刷盘)
listenPort : Broker监听的端口号
mapedFileSizeCommitLog : 单个conmmitlog文件大小,默认是1GB
mapedFileSizeConsumeQueue:ConsumeQueue每个文件默认存30W条,可以根据项目调整
storePathRootDir : 存储消息以及一些配置信息的根目录 默认为用户的 ${HOME}/store

storePathCommitLog:commitlog存储目录默认为${storePathRootDir}/commitlog
storePathIndex: 消息索引存储路径
syncFlushTimeout : 同步刷盘超时时间
diskMaxUsedSpaceRatio : 检测可用的磁盘空间大小,超过后会写入报错

2、消息发送状态

RocketMQ源码中,有一个SendStatus枚举类,定义了消息投递的状态

package org.apache.rocketmq.client.producer;

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}

FLUSH_DISK_TIMEOUT
没有在规定时间内完成刷盘 (刷盘策略需要为SYNC_FLUSH 才会出这个错误)

FLUSH_SLAVE_TIMEOUT
主从模式下,broker是SYNC_MASTER, 没有在规定时间内完成主从同步

SLAVE_NOT_AVAILABLE
从模式下,broker是SYNC_MASTER, 但是没有找到被配置成Slave的Broker

SEND_OK
发送成功,没有发生上面的三种问题

3、生产和消费消息重试及处理

3.1、生产者Producer重试

消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是2

//生产者投递消息重复次数
producer.setRetryTimesWhenSendFailed(3);
3.2、消费端重试

原因:消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等等问题。

package com.ljessie.rocketmqdemo.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

@Component
public class PayConsumer {

    private DefaultMQPushConsumer consumer;

    public PayConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(JMSConfig.consumerGroup);
        consumer.setNamesrvAddr(JMSConfig.nameServer);
        //从队首开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //订阅主题,第二个参数*代表注册这个主题下面的所有tag
        consumer.subscribe(JMSConfig.TOPIC,"*");
        //默认是集群模式,可以更改为广播,更改为广播之后,不支持重试
//        consumer.setMessageModel(MessageModel.BROADCASTING);
        //配置消费监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                MessageExt message = list.get(0);
                int retryTimes = message.getReconsumeTimes();
                try {
                    System.out.println("重试次数:"+retryTimes);
                    if(1==1){
                        throw new Exception();
                    }
                    System.out.println("Consumer消费成功:"+new String(message.getBody(),"utf-8"));
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    System.out.println("消费异常");
                    //如果重复次数超过两次不成功,则人工介入。广播方式不提供重试机制。
                    if(retryTimes >= 2){
                        System.out.println("复次数超过两次不成功,人工介入");
                        //告诉Broker消费成功
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

            }
        });
        consumer.start();
    }
}

控制台打印:

java.lang.Exception
重试次数:0
    at com.ljessie.rocketmqdemo.jms.PayConsumer$1.consumeMessage(PayConsumer.java:40)
消费异常
    at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
重试次数:1
java.lang.Exception
消费异常
    at com.ljessie.rocketmqdemo.jms.PayConsumer$1.consumeMessage(PayConsumer.java:40)
    at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
重试次数:2
消费异常
复次数超过两次不成功,人工介入

4、异步发送消息和回调

异步发送:不会重试,发送总次数等于1。
在PayController添加下面的方法:

/**
     * 异步发送消息
     * @param txt
     * @return
     */
    @RequestMapping("async")
    public  Object async(String txt){
        Message message = new Message(JMSConfig.TOPIC,"tag_async",("Hello Async RocketMQ:"+txt).getBytes());
        //设置延迟消息
//        message.setDelayTimeLevel(2);
        try {
                payProducer.getProducer().send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("Producer发送成功:"+sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    //补偿机制,根据业务需求,设置是否重试
                }
            });
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

5、OneWay发送消息及多种场景对比

SYNC :同步
应用场景:重要通知邮件、报名短信通知、营销短信系统等

ASYNC :异步
应用场景:对RT时间敏感,可以支持更高的并发,回调成功触发相对应的业务,比如注册成功后通知积分系统发放优惠券

ONEWAY : 无需要等待响应
官方文档:https://rocketmq.apache.org/docs/simple-example/
使用场景:主要是日志收集,适用于某些耗时非常短,但对可靠性要求并不高的场景, 也就是LogServer, 只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。

6、延迟消息

Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息,目前支持固定精度的消息。

延迟消息的级别 MessageStoreConfig.java 属性 messageDelayLevel

"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

设置第2个级别,会立刻发送成功,但是在5秒后,Consumer才会消费此消息。

 message.setDelayTimeLevel(2);

使用场景
通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息

消息生产和消费有时间窗口要求:比如在交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。

7、指定消息队列MessageQueueSelector

生产消息使用MessageQueueSelector投递到Topic下指定的queue。
PayController添加下面代码

 /**
     * 指定消息队列
     * @return
     */
    @RequestMapping("queue_selector")
    public Object queueSelector(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Message message = new Message(JMSConfig.TOPIC,"tag_msg",("Hello MsgQueueSelector RocketMQ:"+text).getBytes());

        //第三个参数指定第几个队列
        SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
            /**
             *第三个参数Object,就是send()方法里面的第三个参数2
             */
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                int queueNum = Integer.valueOf(o.toString());
                return list.get(queueNum);
            }
        },2);
        System.out.println("Producer发送成功:"+sendResult);
        return sendResult;
    }

访问:http://localhost:8080/api/queue_selector?text=77777
控制台打印:

Producer发送成功:SendResult [sendStatus=SEND_OK, msgId=C0A80A67677C18B4AAC2831E75F50000, offsetMsgId=C0A80A6900002A9F000000000002E758, messageQueue=MessageQueue [topic=JESSIE_PAY_TEST_TOPIC, brokerName=broker-a, queueId=2], queueOffset=1]
重试次数:0
Consumer消费成功:Hello MsgQueueSelector RocketMQ:77777

queueId = 2,消息投递到第三个队列里面去了。


指定消息队列.jpg

控制台里面,查看Topic的status,第三个队列中的maxOffset加了1。

8、顺序消息

顺序消息:消息的生产和消费顺序一致。

全局顺序:topic下面全部消息都要有序(少用)
性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景,并行度成为消息系统的瓶颈, 吞吐量不够.
在证券处理中,以人民币兑换美元为例子,在价格相同的情况下,先出价者优先处理,则可以通过全局顺序的方式按照 FIFO 的方式进行发布和消费

局部顺序:只要保证一组消息被顺序消费即可(RocketMQ使用)
性能要求高
电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息、订单交易成功消息 都会按照先后顺序来发布和消费。

顺序发布:对于指定的一个 Topic,客户端将按照一定的先后顺序发送消息
顺序消费:对于指定的一个 Topic,按照一定的先后顺序接收消息,即先发送的消息一定会先被客户端接收到。

注意:
顺序消息暂不支持广播模式
顺序消息不支持异步发送方式,否则将无法严格保证顺序

8.1、生产者

生产端保证发送消息有序,且发送到同一个Topic的同个queue里面
例子:订单的顺序流程是:创建、付款、物流、完成,订单号相同的消息会被先后发送到同一个队列中
ProductOrder 模拟订单

package com.ljessie.rocketmqdemo.domain;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * 简写订单
 */
public class ProductOrder implements Serializable {
    private long orderId;
    private String type;

    public ProductOrder(){}

    public ProductOrder(long orderId, String type) {
        this.orderId = orderId;
        this.type = type;
    }

    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    /**
     * 创建模拟数据
     * @return
     */
    public static List<ProductOrder> getProductOrderList(){
        List<ProductOrder> list = new ArrayList<>();
        list.add(new ProductOrder(1,"创建订单"));
        list.add(new ProductOrder(2,"创建订单"));
        list.add(new ProductOrder(1,"支付订单"));
        list.add(new ProductOrder(2,"支付订单"));
        list.add(new ProductOrder(3,"创建订单"));
        list.add(new ProductOrder(1,"完成订单"));
        list.add(new ProductOrder(3,"支付订单"));
        list.add(new ProductOrder(2,"完成订单"));
        list.add(new ProductOrder(3,"完成订单"));
        return list;
    }

    @Override
    public String toString() {
        return "ProductOrder{" +
                "orderId=" + orderId +
                ", type='" + type + '\'' +
                '}';
    }
}

PayController添加代码

/**
     * 把同一个订单ID的不同消息,投放到相同的消息队列里面
     * @return
     * @throws InterruptedException
     * @throws RemotingException
     * @throws MQClientException
     * @throws MQBrokerException
     */
    @RequestMapping("send_msg_orderly")
    public Object sendByOrder() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        List<ProductOrder> productOrders = ProductOrder.getProductOrderList();
        List<SendResult> sendResults = new ArrayList<>();
        for(int i = 0; i<productOrders.size();i++){
            ProductOrder order = productOrders.get(i);
            Message message = new Message(JMSConfig.ORDERLY_TOPIC,"",
                    order.getOrderId()+"",order.toString().getBytes());
            SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    long id = (long) o;
                    int index = (int) (id % list.size());
                    return list.get(index);
                }
            },order.getOrderId());
            sendResults.add(sendResult);
            System.out.println("prodoctOrder:"+order.toString()+"----->msgQueue:"+sendResult.getMessageQueue());
        }
        return sendResults;
    }
8.2、消费者
package com.ljessie.rocketmqdemo.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

@Component
public class PayOrderlyConsumer {

    private DefaultMQPushConsumer consumer;

    public PayOrderlyConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(JMSConfig.consumerOrdlerlyGroup);
        consumer.setNamesrvAddr(JMSConfig.nameServer);
        //消费地点,从最后一个开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //订阅主题
        consumer.subscribe(JMSConfig.ORDERLY_TOPIC,"*");
        //配置消费监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeConcurrentlyContext) {
                MessageExt message = list.get(0);
                int retryTimes = message.getReconsumeTimes();
                try {
                    System.out.println("Consumer消费成功:"+new String(message.getBody(),"utf-8"));
                } catch (Exception e) {
                    System.out.println("重试次数:"+retryTimes);
                    //如果重复次数超过两次不成功,则人工介入。广播方式不提供重试机制。
                    if(retryTimes >= 2){
                        //告诉Broker消费成功
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}

相关文章

网友评论

      本文标题:RocketMQ生产者核心知识

      本文链接:https://www.haomeiwen.com/subject/gdarwhtx.html