美文网首页
RoketMQ 顺序消息

RoketMQ 顺序消息

作者: 程序猿TT | 来源:发表于2023-07-30 10:11 被阅读0次

    顺序消息DEMO

    1. 准备

    ProductOrder

    public class ProductOrder {
        private String orderId;
        private String type;
    
        public ProductOrder(String orderId, String type) {
            this.orderId = orderId;
            this.type = type;
        }
    
        public String getOrderId() {
            return orderId;
        }
    
        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }
    
        public String getType() {
            return type;
        }
    
        public void setType(String type) {
            this.type = type;
        }
    
        @Override
        public String toString() {
            return "ProductOrder{" +
                    "orderId='" + orderId + '\'' +
                    ", type='" + type + '\'' +
                    '}';
        }
    }
    

    OrderType

    public enum OrderType {
        ORDER_CREATE, ORDER_PAYMENT, ORDER_FINISHED
    }
    

    Producer

    public class OrderProducer {
        private static final List<ProductOrder> orderList = new ArrayList<>();
    
        static {
            orderList.add(new ProductOrder("ORDER_001", OrderType.ORDER_CREATE.name()));
            orderList.add(new ProductOrder("ORDER_001", OrderType.ORDER_PAYMENT.name()));
            orderList.add(new ProductOrder("ORDER_001", OrderType.ORDER_FINISHED.name()));
            orderList.add(new ProductOrder("ORDER_002", OrderType.ORDER_CREATE.name()));
            orderList.add(new ProductOrder("ORDER_002", OrderType.ORDER_PAYMENT.name()));
            orderList.add(new ProductOrder("ORDER_002", OrderType.ORDER_FINISHED.name()));
            orderList.add(new ProductOrder("ORDER_003", OrderType.ORDER_CREATE.name()));
            orderList.add(new ProductOrder("ORDER_003", OrderType.ORDER_PAYMENT.name()));
            orderList.add(new ProductOrder("ORDER_003", OrderType.ORDER_FINISHED.name()));
            orderList.add(new ProductOrder("ORDER_004", OrderType.ORDER_CREATE.name()));
            orderList.add(new ProductOrder("ORDER_004", OrderType.ORDER_PAYMENT.name()));
            orderList.add(new ProductOrder("ORDER_004", OrderType.ORDER_FINISHED.name()));
        }
    
        public static void main(String[] args) throws Exception {
            //创建一个消息生产者,并设置一个消息生产者组
            DefaultMQProducer producer = new DefaultMQProducer("default-producer");
            //指定 NameServer 地址
            producer.setNamesrvAddr("localhost:9876");
            //初始化 Producer,整个应用生命周期内只需要初始化一次
            producer.start();
    
            for (int i = 0; i < orderList.size(); i++) {
                //获取当前order
                ProductOrder order = orderList.get(i);
                //创建一条消息对象,指定其主题、标签和消息内容
                Message message = new Message(
                        /* 消息主题名 */
                        "topicTest_1",
                        /* 消息标签 */
                        order.getType(),
                        /* 消息KEY */
                        order.getOrderId(),
                        /* 消息内容 */
                        (order.toString()).getBytes(StandardCharsets.UTF_8)
                );
    
                //发送消息并返回结果 使用hash选择策略
                SendResult sendResult = producer.send(message, new SelectMessageQueueByHash(), order.getOrderId());
    
                System.out.println("product: 发送状态:" + sendResult.getSendStatus() + ",存储queue:" + sendResult.getMessageQueue().getQueueId() + ",orderID:" + order.getOrderId() + ",type:" + order.getType());
            }
    
            // 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
            producer.shutdown();
        }
    }
    

    Consumer

    public class OrderConsumer {
        private static final Random random = new Random();
    
        public static void main(String[] args) throws Exception {
            //创建一个消息消费者,并设置一个消息消费者组
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_order");
            //指定 NameServer 地址
            consumer.setNamesrvAddr("localhost:9876");
            //设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //订阅指定 Topic 下的所有消息
            consumer.subscribe("topicTest_1", "*");
    
            //注册消费的监听 这里注意顺序消费为MessageListenerOrderly
            consumer.registerMessageListener((MessageListenerOrderly) (list, context) -> {
                //默认 list 里只有一条消息,可以通过设置参数来批量接收消息
                if (list != null) {
                    for (MessageExt ext : list) {
                        try {
                            try {
                                //模拟业务逻辑处理中...
                                TimeUnit.SECONDS.sleep(random.nextInt(10));
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            //重试次数
                            int retryTimes = ext.getReconsumeTimes();
                            //获取接收到的消息
                            String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET);
                            //获取队列ID
                            int queueId = context.getMessageQueue().getQueueId();
                            //打印消息
                            System.out.println("Consumer-线程名称=[" + Thread.currentThread().getId() + "],重试次数:[" + retryTimes + "],接收queueId:[" + queueId + "],接收时间:[" + new Date().getTime() + "],消息=[" + message + "]");
    
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                }
                // 模拟异常
                int num = random.nextInt(10);
                if (num % 3 == 0) {
                    System.out.println("系统出现异常,阻塞当前队列...");
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            });
    
            // 消费者对象在使用之前必须要调用 start 初始化
            consumer.start();
            System.out.println("消息消费者已启动");
        }
    }
    

    相关文章

      网友评论

          本文标题:RoketMQ 顺序消息

          本文链接:https://www.haomeiwen.com/subject/rkcspdtx.html