美文网首页
RocketMQ-基础使用(二)

RocketMQ-基础使用(二)

作者: 石头耳东 | 来源:发表于2022-05-28 20:29 被阅读0次

前置文章:
RocketMQ-基础使用(一),该文主要涉及MQ基础、RocketMQ安装&集群搭建、RocketMQ监控平台。

官方基础使用样例,很多基础内容其实官方文档都有很详细的说明。日常使用如果时间充足,还是推荐查看官方文档。学习官方文档是一个良好的习惯。

零、本文纲要

一、RocketMQ-基础使用

  1. 前置文章基础指令

二、RocketMQ-发送消息

  1. 发送同步消息
  2. 发送异步消息
  3. 发送单向消息

三、RocketMQ-接收消息

  1. 消息接收
  2. 消息接收-负载均衡【默认】
  3. 消息接收-广播模式

四、RocketMQ-消息类型

  1. 顺序消息
  2. 延迟消息
  3. 批量消息
  4. 过滤消息
  5. 事务消息

一、RocketMQ-基础使用

0. 前置文章基础指令

Ⅰ 启动RocketMQ的基础指令

# Start Name Server
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

# Start Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log 

指定自定义配置文件启动nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

Ⅱ 关闭RocketMQ的基础指令

# Shutdown Servers
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

二、RocketMQ-发送消息

发送同步消息 / 发送异步消息 / 发送单向消息

1. 发送同步消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

  • ① 基础依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.0</version>
</dependency>
  • ② 同步消息代码
/**
 * 发送同步消息
 */
public class SyncProducer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址,多个NameServer则用“;”隔开
        producer.setNamesrvAddr("192.168.253.128:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("base", "tag1", ("Hello RocketMQ [" + i + "]").getBytes());
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus status = result.getSendStatus();
            String msgId = result.getMsgId();
            int queueId = result.getMessageQueue().getQueueId();
            System.out.printf("发送状态:%s,消息ID:%s,队列:%d%n", status, msgId, queueId);
            //线程睡1秒
            Thread.sleep(1000);
        }

        //6.关闭生产者producer
        producer.shutdown();
    }
}

截取控制台输出
发送状态:SEND_OK,消息ID:C0A8026AC05818B4AAC28D428B270000,队列:3

2. 发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

/**
 * 发送异步消息
 */
public class AsyncProducer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group2");
        //2.指定Nameserver地址,多个NameServer则用“;”隔开
        producer.setNamesrvAddr("192.168.253.128:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("base", "tag2", ("AsyncMsg [" + i + "]").getBytes());
            //5.发送异步消息
            producer.send(msg, new SendCallback() {
                /**
                 * 发送成功的回调函数
                 * @param sendResult 发送结果
                 */
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送结果:" + sendResult);
                }

                /**
                 * 发送失败的回调函数
                 * @param throwable 发送异常
                 */
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("发送异常:" + throwable);
                }
            });
            //线程睡1秒
            Thread.sleep(1000);
        }

        //6.关闭生产者producer
        producer.shutdown();
    }
}

与同步消息的不同之处在于通过回调函数来获取发送结果。

3. 发送单向消息

这种方式主要用在不特别关心发送结果的场景,比如:日志发送。

/**
 * 发送单向消息
 */
public class OneWayProducer {

    public static void main(String[] args) throws Exception, MQBrokerException {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group3");
        //2.指定Nameserver地址,多个NameServer则用“;”隔开
        producer.setNamesrvAddr("192.168.253.128:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("base", "tag3", ("OneWayMsg [" + i + "]").getBytes());
            //5.发送单向消息
            producer.send(msg);
            //线程睡1秒
            Thread.sleep(1000);
        }

        //6.关闭生产者producer
        producer.shutdown();
    }
}

三、RocketMQ-接收消息

1. 消息接收

/**
 * 消息的接受者
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group3");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.253.128:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("base", "tag1");
        //设定消费模式:负载均衡|广播模式

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //接收消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println(list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //5.启动消费者consumer
        consumer.start();
    }
}

2. 消息接收-负载均衡【默认】

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。【默认的消息消费方式】

consumer.setMessageModel(MessageModel.CLUSTERING);

3. 消息接收-广播模式

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。

consumer.setMessageModel(MessageModel.BROADCASTING);

四、RocketMQ-消息类型

1. 顺序消息

  • ① 基础分析

假定一个订单的顺序流程是:创建、付款、推送、完成。有张三、李四两人进行订单业务。

a、全局有序:
张三所有消息消费完,再消费李四消息,且内部有序;
一个Borker,一个MessageQueue;

b、局部有序:
只要保证各自消息内部的有序消费,交替消费两者的消息是可以的;
一个Borker,多个MessageQueue,一个MessageQueue对应一个订单。

所以,一般仅需保证局部有序即可。
实现方式:同一个用户的一个业务消息放到同一个队列,比如:订单号相同的消息进同一个队列。

  • ② 代码实现

Ⅰ 消息生产者producer的核心代码

/**
 * 参数一: 消息对象
 * 参数二: 消息队列选择器 MessageQueueSelector
 * 参数三: 选择队列业务标识,此处为订单ID
 */
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    /**
     *
     * @param list      消息队列
     * @param message   消息对象
     * @param o         业务标识的参数
     * @return 消息队列
     */
    @Override
    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
        long orderId = (long) o;
        long index = orderId % list.size(); //订单ID一致,则取模结果一致,最终选择的队列一致
        return list.get((int) index);
    }
}, order.getOrderId());

Ⅱ 消息消费者consumer的核心代码

此处是通过有序消息监听MessageListenerOrderly来实现的

//4.注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        for (MessageExt messageExt : list) {
            System.out.println("线程名称:" + Thread.currentThread().getName() + " → " +
                    "消费消息:" + new String(messageExt.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

2. 延迟消息

使用场景:比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

延迟消息使用限制:

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

Ⅰ 消息生产者producer的核心代码

msg.setDelayTimeLevel(2);

Ⅱ 消息消费者consumer的核心代码

无需调整。

注意:受限于网络情况,实际的延迟往往大于设置的延迟。

3. 批量消息

批量发送消息能显著提高传递小消息的性能。

限制:
a、相同的topic;
b、相同的waitStoreMsgOK;
c、不能是延时消息;
d、总大小不应超过4MB。

Ⅰ 消息生产者producer的核心代码

List<Message> messageList = new ArrayList<>();

Ⅱ 消息消费者consumer的核心代码

无需调整。

4. 过滤消息

一般过滤消息可通过 TAG / SQL92标准 来进行过滤

Ⅰ 消息生产者producer的核心代码

//方式一:通过Tag过滤的使用方法,消息发送方不做调整
//...

//方式二:通过sql过滤的使用方法,使用putUserProperty设置一些消息属性
msg.putUserProperty("a", String.valueOf(i));

Ⅱ 消息消费者consumer的核心代码

//方式一:通过Tag过滤的使用方法,consumer使用" || "分隔订阅不同的Tag即可
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

//方式二:通过sql过滤的使用方法,通过MessageSelector消息选择器的bySql方法过滤消息
consumer.subscribe("topic_sql_filter", MessageSelector.bySql("num > 5"));

方式二如果报错:The broker does not support consumer to filter message by SQL92
则需要在我们对应的Broker配置文件内做调整,添加enablePropertyFilter=true,重启服务即可生效。

关于SQL92基础语法,RocketMQ只定义了一些基本语法来支持这个特性:
数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;

常量支持类型为:
数值,比如:123,3.1415;
字符,比如:'abc',必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE

注意:只有使用push模式的消费者才能用使用SQL92标准的sql语句。

5. 事务消息

在【分布式事务-可靠消息最终一致性】的解决方案内使用的就是事务消息。

事务消息流程:正常事务消息的发送及提交,以及事务消息的补偿【事务状态回查】;

事务状态:
LocalTransactionState.COMMIT_MESSAGE 提交状态 允许消费消息;
LocalTransactionState.ROLLBACK_MESSAGE 回滚状态 删除消息,不允许被消费;
LocalTransactionState.UNKNOW 中间状态 需要回查事务。

Ⅰ 消息生产者producer的核心代码

/**
 * 发送同步消息
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        //2.指定Nameserver地址,多个NameServer则用“;”隔开
        producer.setNamesrvAddr("192.168.253.128:9876");
        //3.设置消息事务的监听器
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 在该方法中执行本地的事务
             * @param message 消息
             * @param o
             * @return 事务状态
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                String messageTags = message.getTags();
                if (StringUtils.equals("TagA", messageTags)) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.equals("TagB", messageTags)) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }

            /**
             * 该方法进行MQ事务状态的回查
             * @param messageExt 消息
             * @return 事务状态
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("消息的Tag:" + messageExt.getTags());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        String[] tags = new String[]{"TagA", "TagB", "TagC"};

        //4.启动producer
        producer.start();
        for (int i = 0; i < 3; i++) {
            //5.创建消息对象,指定Topic、Tag、消息体
            Message msg = new Message("topic_transaction", tags[i],
                    (tags[i] + " Hello transactionMsg " + i).getBytes(StandardCharsets.UTF_8));
            //6.发送消息
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.println("发送结果:" + sendResult);
            Thread.sleep(1000);
        }
        //7.关闭生产者producer,此处需要回查,所以不关闭
        //producer.shutdown();
    }
}

Ⅱ 消息消费者consumer的核心代码

/**
 * 消息的接受者
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.253.128:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("topic_transaction", "*");
        //4.注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println("消费消息:" + new String(messageExt.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动了...");
    }
}

注意:事务消息不支持延时消息和批量消息。

五、结尾

以上即为RocketMQ-基础使用(二)的全部内容,感谢阅读。

相关文章

网友评论

      本文标题:RocketMQ-基础使用(二)

      本文链接:https://www.haomeiwen.com/subject/cbhwprtx.html