美文网首页
5:RocketMq实战(生产者与消费者 各种实战)(核心)(文

5:RocketMq实战(生产者与消费者 各种实战)(核心)(文

作者: _River_ | 来源:发表于2021-04-20 21:07 被阅读0次
    目录
    1:RocketMQ 生产和消费消息重试及处理 (重要 重要 重要)
            设置重试次数(报错消息存储)
            消费端去重(消息唯一性)
            测试集群消费与广播消费的重试
    

    顺序消息
    
    2:RocketMQ生产者之MessageQueueSelector实战(指定队列)(顺序消息前置学习)
    3:讲解顺序消息(发送与消费)在电商 和 证券系统中应用场景
    4:RocketMQ顺序消息概念
    5:RocketMQ顺序消息发送  实战代码
    6:RocketMQ顺序消息消费  实战代码
    
    1:RocketMQ生产和消费消息重试及处理 (保证消息的唯一性)
    生产者Producer重试(异步和SendOneWay下配置无效)
        消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是2,
        如果网络情况比较差,或者跨集群则建改多几次
    

    消费端重试
        原因:消息处理异常、broker端到consumer端各种问题
                如网络原因闪断, 消费处理失败,ACK返回失败等等问题。
         
         注意 :      
                重试的间隔时间(默认重试16次):
                默认:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                
                1:设置重试次数
                
                2:消费端去重:一条消息无论重试多少次,这些重试消息的Message ID, key不会改变。
    
                3:消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试
    
    public  ConsumerReconsume() throws MQClientException {
    
            DefaultMQPushConsumer defaultMQPushConsumer = creatDefaultMQPushConsumer();
    
            // 确认问题 3:消费重试只针对集群消费方式生效 广播方式不提供失败重试特性,即消费失败后,失败消息不再重试
    //        defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
    
            //监听器
            defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
    
                MessageExt msg = messages.get(0);
                int times = msg.getReconsumeTimes();
                System.out.println("重试次数:"+times);
    
                //判断该msgId是否已经在redis中
                // 解决问题 2:消费端去重:一条消息无论重试多少次,这些重试消息的Message ID, key不会改变。
    
                //遗留问题 假如在下面逻辑成功之前 有另一个一样msgId的消息进入 那么该如何解决
                //考虑再添加一个Synchronized的锁  但这个性能消耗大 同时概率极低 因此不推荐
    
                try {
                    log.info(" Receive New Messages: {},{} ",Thread.currentThread().getName(), new String(messages.get(0).getBody()));
                    String topic = msg.getTopic();
                    String tags = msg.getTags();
                    String keys = msg.getKeys();
                    String body = new String(msg.getBody(), "utf-8");
                    log.info("topic=" + topic + ", tags=" + tags + ", keys = " + keys + ", msg = " + body);
    
                    // 使消费一定失败
                    if(true){
                        throw new Exception();
                    }
    
                    System.out.println("重试时的megId:"+ msg.getMsgId());
                    //解决问题 2: 可以把这个msgId存放到redis中设置对应的过期时间 如果这个Id已经存在则不需要重复进入该逻辑
    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    log.info("rocket消费者异常:{}",e.getMessage(),e);
    
                    //times的初始值为0 等于等于2就是一共尝试了3次
                    //解决问题 1:设置重试次数
                    if (times>=2){
                        System.out.println("记录数据库 通知运维人员");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
    
                    System.out.println("重试次数异常:"+times);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
            log.info("consumer  start");
            defaultMQPushConsumer.start();
        }
    
    
    2:RocketMQ生产者之MessageQueueSelector实战(指定队列)(顺序消息前置学习)
    简介:生产消息使用MessageQueueSelector投递到Topic下指定的queue 
    默认topic下的queue数量是4  可以进行配置
    
    应用场景:配合  顺序消息  分摊负载 的使用
     
    顺序消息:消息是有序的(同步发送)
    分摊负载:可以自定义规则 选择发到topic的队列
                 某种事件的消息 发送到 队列里面的消息特别多 导致该topic里面的4个队列都不可用
                 该事件可以特殊选的某个队列  那么其他3个队列就可用
    
    注意:
        1:支持同步,异步发送:
        2:选择的queue数量必须小于配置的,否则会出错
    
    //顺序发送可以根据一个订单号进行取模 然后该订单选择对应的队列
    //与业务是强耦合的 需要根据业务做具体的分析
    
    //public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
    //同步发送
    //选择对应的队列
    SendResult sendResultSyn =  defaultMQProducer.send(message, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> messageQueueList, Message message, Object arg) {
           //2:选择使用该topic下的哪个  queueId   该Id需要在 arg获取
            int queueNum = Integer.parseInt(arg.toString());
            return messageQueueList.get(queueNum);
        }
        // 1:arg 0 为选择队列  queueId =0
    },0);
    
    log.info("RocketMQ order 消息同步发送 结果:{}",  JSON.toJSONString(sendResultSyn));
    
    //public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
    //异步发送
    //选择对应的队列
     defaultMQProducer.send(message, (messageQueueList, message1, arg) -> {
        int queueNum = Integer.parseInt(arg.toString());
        return messageQueueList.get(queueNum);
    }, 1, new SendCallback() {
    
        @Override
        public void onSuccess(SendResult sendResultAsyn) {
            log.info("RocketMQ order 消息异步发送 结果:{}",  JSON.toJSONString(sendResultAsyn));
        }
        @Override
        public void onException(Throwable throwable) {
    
        }
    });
    
    3:讲解顺序消息(发送与消费)在电商 和 证券系统中应用场景
    简介:基础介绍顺序消息和对应可以使用的场景,订单系统,
    
    什么是顺序消息:消息的生产和消费顺序一致
    
    全局顺序:topic下面全部消息都要有序(少用)(该topic下只有一个队列)(了解即可)
        性能要求不高,所有的消息严格按照FIFO原则进行消息发布和消费的场景,吞吐量不够.
        在证券处理中,以人民币兑换美元为例子,在价格相同的情况下,
        有多个出价者,先出价者优先处理, 则FIFO的方式进行发布和消费
    注意:也可以使用局部顺序配合其他方案处理
    
    局部顺序:只要保证一组消息被顺序消费即可(RocketMQ使用)
           电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单交易成功、订单退款消息
        (阿里巴巴集团内部电商系统均使用局部顺序消息,既保证业务的顺序,同时又能保证 业业务的高性能
    
    
    顺序发布:对于指定的一个Topic,客户端将按照一定的先后顺序发送消息
    顺序消费:对于指定的一个Topic,按照一定的先后顺序接收消息,即先发送的消息一定会先被客 户端接收到。
    
    注意:
        顺序消息暂不支持广播模式
        顺序消息不支持异步发送方式,否则将无法严格保证顺序
    
    4:RocketMQ顺序消息 概念
    生产端保证发送消息有序,且发送到同一个Topic的同个queue里面,RocketMQ的确是能保证FIFO
    
    例子:订单的顺序流程是:
        创建、付款、物流、完成,订单号相同的消息会被先后发送到同 —个队列中,
        根据MessageQueueSelector里面自定义策略,
        根据同个业务id放置到同个queue里面,如订 单号取模运算再放到selector中,同一个模的值都会投递到同一条queue
    

    消费端要在保证消费同个topic里的同个队列,
    不应该用MessageListenerConcurrently  并发使用多线程去消费
    
    应该使用MessageListenerOrderly       自带单线程消费消息,消费端分配到的queue数量是固定的,
                                                    消费者集群会锁住当前正在消费的队列集合的消息,所以会保证顺序消费。
    MessageListenerOrderly                                       
            1:Consumer会平均分配Queue的数量 
            2:一个Consumer需要先处理完一个Consumer Queue 才能切换到其他的Consumer Queue
            3 :为每个Consumer Quene加个锁,消费每个消息前,需要获得这个消息所在的Queue的锁,
                  这样同个时间,同个Queue的消息不被其他的Consumer Quene并发消费,但是不同Queue的消息可以并发处理
     
            锁的机制类似于ConcurrentHashMap
    
      小技巧:启动两个节点:
     然后修改yml里面的端口 再次启动发现同一个服务可以启动两次 
    
    5:RocketMQ顺序消息发送 实战代码
      private void  sendOrderMessage(RocketEvent<?> rocketEvent) {
            //主题
            String topic = "pay_Sequence_topic";
            Message message ;
            try {
                if (rocketEvent.getTag() != null && !"".equals(rocketEvent.getTag()) ) {
                    message = new Message(topic,rocketEvent.getTag(), JSON.toJSONString(rocketEvent).getBytes());
                }else {
                    message = new Message(topic, JSON.toJSONString(rocketEvent).getBytes());
                }
                log.info("RocketMQ order 消息发送:{}", JSON.toJSONString(rocketEvent));
    
                //转换为Order
                Order  order = JSONObject.parseObject(JSONObject.toJSONString(rocketEvent.getData()), Order.class);
    
                //顺序发送可以根据一个订单号进行取模 然后该订单选择对应的队列
                SendResult sendResultSyn =  defaultMQProducer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> messageQueueList, Message message, Object arg) {
                       Long orderId = (Long) arg;
                       //根据 订单Id 取模后循序投递到队列
                       long queueNum =  orderId%messageQueueList.size();
                       return messageQueueList.get((int) queueNum);
                    }
                    // 取订单Id作为入参
                },order.getOrderId());
                log.info("RocketMQ order 消息同步发送 结果:{}",  JSON.toJSONString(sendResultSyn));
            } catch (Exception e) {
                log.error("RocketMQ order 消息发送异常: " + e.getMessage(), e);
            }
        }
    
    6:RocketMQ顺序消息消费 实战代码
    public ConsumerConsumeSequence() throws MQClientException {
    
            //创建DefaultMQPushConsumer
            DefaultMQPushConsumer defaultMQPushConsumer = creatDefaultMQPushConsumer();
    
            //MessageListenerOrderly用于顺序消费
            //该消费者在处理一个 topic下的 某个队列的时候 其他消费者不能再去处理该队列
    
            //监听器
            defaultMQPushConsumer.registerMessageListener((MessageListenerOrderly) (messages, context) -> {
                try {
                    Message msg = messages.get(0);
                    log.info(" Receive New Messages: {},{} ",Thread.currentThread().getName(), new String(messages.get(0).getBody()));
                    String topic = msg.getTopic();
                    String tags = msg.getTags();
                    String keys = msg.getKeys();
                    String body = new String(msg.getBody(), "utf-8");
                    log.info("topic=" + topic + ", tags=" + tags + ", keys = " + keys + ", msg = " + body);
                    return ConsumeOrderlyStatus.SUCCESS;
                } catch (UnsupportedEncodingException e) {
                    log.info("rocket消费者异常:{}",e.getMessage(),e);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            });
            log.info("consumer  start");
            defaultMQPushConsumer.start();
        }
    

    项目连接

    请配合项目代码食用效果更佳:
    项目地址:
    https://github.com/hesuijin/hesuijin-study-project
    Git下载地址:
    https://github.com.cnpmjs.org/hesuijin/hesuijin-study-project.git
    
    rocketmq-module项目模块下 
    
    注意:因为需要修改相应配置 相关测试的代码
        生产者代码主要在  单元测试 
        消费者代码主要在  项目代码 com.example.rocketmq.demo.consumer.Junit包下
    

    相关文章

      网友评论

          本文标题:5:RocketMq实战(生产者与消费者 各种实战)(核心)(文

          本文链接:https://www.haomeiwen.com/subject/tekolltx.html