美文网首页
rocketmq功能

rocketmq功能

作者: sunpy | 来源:发表于2022-08-23 23:23 被阅读0次

架构图


导包


<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>${rocketmq.version}</version>
</dependency>

生产者常用方法


方法名 说明
setRetryTimesWhenSendFailed 同步方式发送消息重试次数,默认为2
setRetryTimesWhenSendAsyncFailed 异步方式发送消息重试次数,默认为2
setSendMsgTimeout 发送消息默认超时时间,默认3000ms
setMaxMessageSize 允许发送的最大消息长度,默认为4M
setCompressMsgBodyOverHowmuch 消息体超过该值则启用压缩,默认4k
setRetryAnotherBrokerWhenNotStoreOK 消息重试时选择另外一个Broker时
setNamesrvAddr 设置NameServer的地址
send 发送消息,可以指定回调函数,同步异步
sendOneway 单向发送消息,不等待broker响应
shutdown 关闭当前生产者实例并释放相关资源
start 启动生产者
viewMessage 根据给定的msgId查询消息,还可指定topic
queryMessage 按关键字查询消息

消费者常用方法


方法名 说明
setNamesrvAddr 设置NameServer的地址
setMessageModel 设置消息消费模式(默认集群消费)
setConsumeThreadMin 消费者最小线程数量(默认20)
setConsumeThreadMax 消费者最大线程数量(默认20)
setPullInterval 推模式下任务间隔时间(推模式也是基于不断的轮训拉取的封装)
setPullBatchSize 推模式下任务拉取的条数,默认32条(一批批拉)
setMaxReconsumeTimes 消息重试次数,-1代表16次 (超过 次数成为死信消息)
setConsumeTimeout 消息消费超时时间(消息可能阻塞正在使用的线程的最大时间:以分钟为单位)

普通消息


生产者思路:

  • 启动生产者
  • 发送消息
  • 关闭生产者

消费者思路:

  • 注册消费的主题
  • 启动消费者
  • 关闭消费者

生产者实现:

public class ProducerUtil {

    private static DefaultMQProducer producer = null;

    public static void start() {
        producer = new DefaultMQProducer("defaultGroup");
        producer.setNamesrvAddr("IP:9876");
        producer.setRetryTimesWhenSendFailed(3);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public static ResultModel<String> send(String topic, String tags, String content) {
        Message msg = new Message(topic, tags, "", content.getBytes());
        try {
            producer.send(msg);

            return new ResultModel<>();
        } catch (Exception e) {
            e.printStackTrace();
        }

        return new ResultModel<>();
    }

    public static void shutDownProducer() {
        if(producer != null) {
            producer.shutdown();
        }
    }
}

消费者实现:

@Log
@Service
public class ConsumerService {

    private DefaultMQPushConsumer consumer = null;

    @PostConstruct
    public void initMQConsumer() {
        consumer = new DefaultMQPushConsumer("defaultGroup");
        consumer.setNamesrvAddr("IP:9876");
        consumer.setConsumeTimeout(10000);
        try {
            consumer.subscribe("demo", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        log.info("Message Received: " + new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void shutDownConsumer() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}

调用工具类测试:

顺序消息


顺序消息设计思路:
顺序消息就是指的,按顺序存入,按顺序取出。实现思路就是将同一类消息按照消费顺序放到同一种队列中去,这样就可以保证消费的先后顺序。
生产者实现:

public class ProducerUtil {

    private static DefaultMQProducer producer = null;

    public static void start() {
        producer = new DefaultMQProducer("defaultGroup");
        producer.setNamesrvAddr("49.235.73.14:9876");
        producer.setRetryTimesWhenSendFailed(3);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public static ResultModel<String> send(String topic) {
        List<Treat> treatList = new ProducerUtil().buildList();
        // tags数组 使用tag区分
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD"};

        for (int i = 0; i < treatList.size(); i++) {
            String body = " treat:" + treatList.get(i);
            Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, body.getBytes());

            try {
                producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        /**
                         * 此处代码逻辑:就是找到消息对应的队列,一类消息一种队列
                         * 一种医疗治疗过程对应一种队列
                         * 20220819001L
                         * 20220819002L
                         * 20220819003L
                         * 20220819004L
                         */
                        String dbId = (String) arg;
                        String indexStr = dbId.substring(dbId.length() - 1);
                        return mqs.get(Integer.parseInt(indexStr) - 1);
                    }
                }, treatList.get(i).getDbId());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        return new ResultModel<>();
    }

    public static void shutDownProducer() {
        if(producer != null) {
            producer.shutdown();
        }
    }

    /**
     * 医疗过程
     */
    static class Treat {
        private String dbId;
        private String desc;

        public Treat() {
        }

        public Treat(String dbId, String desc) {
            this.dbId = dbId;
            this.desc = desc;
        }

        public String getDbId() {
            return dbId;
        }

        public void setDbId(String dbId) {
            this.dbId = dbId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "Treat{" +
                    "dbId='" + dbId + '\'' +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    /**
     * 生成模拟订单数据
     */
    private List<Treat> buildList() {
        List<Treat> treatList = new ArrayList<>();
        treatList.add(new Treat("20220819004", "慢病检查"));
        treatList.add(new Treat("20220819001", "孕产妇产前检查"));
        treatList.add(new Treat("20220819003", "老人身体检查"));
        treatList.add(new Treat("20220819001", "孕产妇产中处理"));
        treatList.add(new Treat("20220819002", "婴儿身体检查"));
        treatList.add(new Treat("20220819001", "孕产妇产后护理"));
        treatList.add(new Treat("20220819004", "慢病治疗"));
        treatList.add(new Treat("20220819003", "老人专家会诊"));
        treatList.add(new Treat("20220819002", "婴儿产房护理"));
        treatList.add(new Treat("20220819003", "老人定期随访"));
        treatList.add(new Treat("20220819002", "婴儿定期随访"));
        treatList.add(new Treat("20220819004", "慢病随访"));
        treatList.add(new Treat("20220819004", "慢病观察"));
        return treatList;
    }
}

测试:

延时消息


延时消息指的是间隔一段时间后传给消费者。
setDelayTimeLevel(4);方法:
level(1~18个等级):1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

     public static ResultModel<String> send() {
        List<Treat> treatList = new ProducerUtil().buildList();
        // tags数组 使用tag区分
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD"};

        for (int i = 0; i < treatList.size(); i++) {
            String body = " treat:" + treatList.get(i);
            Message msg = new Message(TOPIC, tags[i % tags.length], "KEY" + i, body.getBytes());
            // 第四个等级 30s
            msg.setDelayTimeLevel(4);
            try {
                producer.send(msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        return new ResultModel<>();
    }

批量消息


批量消息就是一次性传入多个Message。

     public static ResultModel<String> send() {
        List<Treat> treatList = new ProducerUtil().buildList();
        // tags数组 使用tag区分
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD"};
        List<Message> msgList = new ArrayList<>();

        for (int i = 0; i < treatList.size(); i++) {
            String body = " treat:" + treatList.get(i);
            Message msg = new Message(TOPIC, tags[i % tags.length], "KEY" + i, body.getBytes());
            msgList.add(msg);
        }

        try {
            producer.send(msgList);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new ResultModel<>();
    }

过滤消息


通过Tag标签来过滤不同消息,在消费者端实现。

    @PostConstruct
    public void initMQConsumer() {
        consumer = new DefaultMQPushConsumer(PRODUCER_GROUP);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.setConsumeTimeout(10000);

        try {
            consumer.subscribe(TOPIC, "TagC");
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt msg : msgs) {
                            log.info("QueueId Received: " + msg.getQueueId() + " Message Received: " + new String(msg.getBody()));
                        }
                    } catch (Exception e) {
                        log.info(e.getMessage());
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

事务消息


https://www.jianshu.com/p/ed5c3c62a3c1

rocketmq顺序消息解决方案


将消息按照顺序放在同一个队列中就可以了。

rocketmq消息重复消费问题(幂等性问题)解决方案


  • 业务方式:消费者业务逻辑处理,譬如消息要我们新增一条记录,那么我们业务逻辑处理就是先去查询,如果该记录已存在,那么我们就不去新增记录。
  • 数据库方式:利用记录的unique索引唯一性,不让记录插入。
  • mq方式:利用日志表去记录每次成功消费的id,消费前先判断下日志表中是否已经消费过(还未尝试使用过)。

rocketmq消息丢失问题解决方案


使用rocketmq提供的自身事务。

参考


https://gitee.com/apache/rocketmq/blob/master/docs/cn/client/java/API_Reference_DefaultMQProducer.md

https://blog.csdn.net/weixin_38880770/article/details/118447350

相关文章

  • RocketMQ分布式事务消息

    1、RocketMQ事务消息概念 RocketMQ事务消息:RocketMQ 提供分布事务功能,通过 Rocket...

  • RocketMQ介绍

    RocketMQ介绍 RocketMQ介绍什么是消息队列产品功能功能概览图多协议支持管理工具特色功能专有云部署消息...

  • rocketmq功能

    架构图 导包 生产者常用方法 方法名说明setRetryTimesWhenSendFailed同步方式发送消息重试...

  • RocketMQ学习笔记-基础概念

    RocketMQ的功能 流量削锋 RocketMQ的引入,可以大大减少系统的流量压力,同时,rokecktmq能支...

  • RocketMQ源码分析之路由中心

    早期的rocketmq版本的路由功能是使用zookeeper实现的,后来rocketmq为了追求性能,自己实现了一...

  • RocketMQ系列(六)批量发送与过滤

    今天我们再来看看RocketMQ的另外两个小功能,消息的批量发送和过滤。这两个小功能提升了我们使用RocketMQ...

  • RocketMQ-Broker

    Broker在RocketMQ中承担着消息存储的功能。(待补充)

  • RocketMq消息队列服务化方案

    一、rocketMq目前设计 1. 目前已有功能概况 架构模式 RocketMQ 与大部分消息中间件 样,采用...

  • RocketMQ使用场景

    RocketMQ使用场景 RocketMQ使用场景场景介绍重要功能实例用户注册传统处理异步解耦分布式事务的数据一致...

  • 大厂生产环境的RocketMQ都是这样部署的

    昨天我们已经学习了RocketMQ的一些基本概念,架构设计和各个角色的功能。今天我们来聊聊RocketMQ的集群部...

网友评论

      本文标题:rocketmq功能

      本文链接:https://www.haomeiwen.com/subject/evhsgrtx.html