美文网首页rocketmq
RocketMQ与Springboot封装

RocketMQ与Springboot封装

作者: 邓启翔 | 来源:发表于2018-02-01 16:33 被阅读0次

    消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,日志记录,流量削锋、分布式事务等问题,实现高性能,高可用,可伸缩和最终一致性架构。

    zebra架构选用RocketMQ作为消息队列组件,下面介绍下RocketMQ如何与Springboot进行组合封装。

    1、引入依赖包

    image

    2、设置配置项信息

    namesrvAddr地址

    zebra.rocketmq.namesrvAddr=0.0.0.0:9876

    生产者group名称

    zebra.rocketmq.producerGroupName=producerGroupName

    事务生产者group名称

    zebra.rocketmq.transactionProducerGroupName=transactionProducerGroupName

    消费者group名称

    zebra.rocketmq.consumerGroupName=consumerGroupName

    生产者实例名称

    zebra.rocketmq.producerInstanceName=producerInstanceName

    消费者实例名称

    zebra.rocketmq.consumerInstanceName=consumerInstanceName

    事务生产者实例名称

    zebra.rocketmq.producerTranInstanceName=producerTranInstanceName

    一次最大消费多少数量消息

    zebra.rocketmq.consumerBatchMaxSize=1

    广播消费

    zebra.rocketmq.consumerBroadcasting=false

    消费的topic:tag

    zebra.rocketmq.subscribe[0]=TopicTest1:TagA

    启动的时候是否消费历史记录

    zebra.rocketmq.enableHisConsumer=false

    启动顺序消费

    zebra.rocketmq.enableOrderConsumer=false

    3、编写配置类

    
    @ConfigurationProperties(RocketmqProperties.PREFIX)
    public class RocketmqProperties {
       public static final String PREFIX = "zebra.rocketmq";
       private String namesrvAddr;
       private String producerGroupName; 
       private String transactionProducerGroupName; 
       private String consumerGroupName; 
       private String producerInstanceName;
       private String consumerInstanceName;
       private String producerTranInstanceName;
       private int consumerBatchMaxSize;
       private boolean consumerBroadcasting;
       private boolean enableHisConsumer;
       private boolean enableOrderConsumer;
       private List subscribe = new ArrayList<>();
    }
    

    4、编写producer和consumer初始化类

    @Configuration
    @EnableConfigurationProperties(RocketmqProperties.class)
    @ConditionalOnProperty(prefix = RocketmqProperties.PREFIX, value = "namesrvAddr")
    public class RocketmqAutoConfiguration {
       private static final Logger log = LogManager.getLogger(RocketmqAutoConfiguration.class);
       @Autowired
       private RocketmqProperties properties;
       @Autowired
       private ApplicationEventPublisher publisher;
    
       private static boolean isFirstSub = true;
    
       private static long startTime = System.currentTimeMillis();
    
       /**
        * 初始化向rocketmq发送普通消息的生产者
        */
       @Bean
       @ConditionalOnProperty(prefix = RocketmqProperties.PREFIX, value = "producerInstanceName")
       @ConditionalOnBean(EtcdClient.class)
       public DefaultMQProducer defaultProducer() throws MQClientException {
           /**
            * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
            * 注意:ProducerGroupName需要由应用来保证唯一<br>
            * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
            * 因为服务器会回查这个Group下的任意一个Producer
            */
           DefaultMQProducer producer = new DefaultMQProducer(properties.getProducerGroupName());
           producer.setNamesrvAddr(properties.getNamesrvAddr());
           producer.setInstanceName(properties.getProducerInstanceName());
           producer.setVipChannelEnabled(false);
           producer.setRetryTimesWhenSendAsyncFailed(10);
    
           /**
            * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
            * 注意:切记不可以在每次发送消息时,都调用start方法
            */
           producer.start();
           log.info("RocketMq defaultProducer Started.");
           return producer;
       }
    
       /**
        * 初始化向rocketmq发送事务消息的生产者
        */
       @Bean
       @ConditionalOnProperty(prefix = RocketmqProperties.PREFIX, value = "producerTranInstanceName")
       @ConditionalOnBean(EtcdClient.class)
       public TransactionMQProducer transactionProducer() throws MQClientException {
           /**
            * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
            * 注意:ProducerGroupName需要由应用来保证唯一<br>
            * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
            * 因为服务器会回查这个Group下的任意一个Producer
            */
           TransactionMQProducer producer = new TransactionMQProducer(properties.getTransactionProducerGroupName());
           producer.setNamesrvAddr(properties.getNamesrvAddr());
           producer.setInstanceName(properties.getProducerTranInstanceName());
           producer.setRetryTimesWhenSendAsyncFailed(10);
    
           // 事务回查最小并发数
           producer.setCheckThreadPoolMinSize(2);
           // 事务回查最大并发数
           producer.setCheckThreadPoolMaxSize(2);
           // 队列数
           producer.setCheckRequestHoldMax(2000);
    
           // TODO 由于社区版本的服务器阉割调了消息回查的功能,所以这个地方没有意义
           // TransactionCheckListener transactionCheckListener = new
           // TransactionCheckListenerImpl();
           // producer.setTransactionCheckListener(transactionCheckListener);
    
           /**
            * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
            * 注意:切记不可以在每次发送消息时,都调用start方法
            */
           producer.start();
    
           log.info("RocketMq TransactionMQProducer Started.");
           return producer;
       }
    
       /**
        * 初始化rocketmq消息监听方式的消费者
        */
       @Bean
       @ConditionalOnProperty(prefix = RocketmqProperties.PREFIX, value = "consumerInstanceName")
       @ConditionalOnBean(EtcdClient.class)
       public DefaultMQPushConsumer pushConsumer() throws MQClientException {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(properties.getConsumerGroupName());
           consumer.setNamesrvAddr(properties.getNamesrvAddr());
           consumer.setInstanceName(properties.getConsumerInstanceName());
           if (properties.isConsumerBroadcasting()) {
               consumer.setMessageModel(MessageModel.BROADCASTING);
           }
           consumer.setConsumeMessageBatchMaxSize(
                   properties.getConsumerBatchMaxSize() == 0 ? 1 : properties.getConsumerBatchMaxSize());// 设置批量消费,以提升消费吞吐量,默认是1
           /**
            * 订阅指定topic下tags
            */
           List<String> subscribeList = properties.getSubscribe();
           for (String sunscribe : subscribeList) {
               consumer.subscribe(sunscribe.split(":")[0], sunscribe.split(":")[1]);
           }
           if (properties.isEnableOrderConsumer()) {
               consumer.registerMessageListener((List<MessageExt> msgs, ConsumeOrderlyContext context) -> {
                   try {
                       context.setAutoCommit(true);
                       msgs =filter(msgs);
                       if(msgs.size()==0) return ConsumeOrderlyStatus.SUCCESS;
                       this.publisher.publishEvent(new RocketmqEvent(msgs, consumer));
                   } catch (Exception e) {
                       e.printStackTrace();
                       return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                   }
                   // 如果没有return success,consumer会重复消费此信息,直到success。
                   return ConsumeOrderlyStatus.SUCCESS;
               });
           } else {
               consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
                   try {
                       msgs=filter(msgs);
                       if(msgs.size()==0) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                       this.publisher.publishEvent(new RocketmqEvent(msgs, consumer));
                   } catch (Exception e) {
                       e.printStackTrace();
                       return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
                   }
                   // 如果没有return success,consumer会重复消费此信息,直到success。
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               });
           }
           new Thread(new Runnable() {
               @Override
               public void run() {
                   try {
                       Thread.sleep(5000);// 延迟5秒再启动,主要是等待spring事件监听相关程序初始化完成,否则,回出现对RocketMQ的消息进行消费后立即发布消息到达的事件,然而此事件的监听程序还未初始化,从而造成消息的丢失
                       /**
                        * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
                        */
                       try {
                           consumer.start();
                       } catch (Exception e) {
                           log.info("RocketMq pushConsumer Start failure!!!.");
                           log.error(e.getMessage(), e);
                       }
                       log.info("RocketMq pushConsumer Started.");
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               }
    
           }).start();
    
           return consumer;
       }
       
       private List<MessageExt> filter(List<MessageExt> msgs){
           if(isFirstSub&&!properties.isEnableHisConsumer()){
               msgs =msgs.stream().filter(item ->startTime - item.getBornTimestamp() < 0).collect(Collectors.toList());
           }
           if(isFirstSub && msgs.size()>0){
               isFirstSub = false;
           }
           return msgs;
       }
    

    4、编写Event,方便Consumer使用

    public class RocketmqEvent extends ApplicationEvent {
       private static final long serialVersionUID = -4468405250074063206L;
       private DefaultMQPushConsumer consumer;
       private List<MessageExt> msgs;
    
       public RocketmqEvent(List<MessageExt> msgs, DefaultMQPushConsumer consumer) throws Exception {
           super(msgs);
           this.consumer = consumer;
           this.setMsgs(msgs);
       }
    
       public String getMsg(int idx) {
           try {
               return new String(getMsgs().get(idx).getBody(), "utf-8");
           } catch (UnsupportedEncodingException e) {
               return null;
           }
       }
    
       public String getMsg(int idx,String code) {
           try {
               return new String(getMsgs().get(idx).getBody(), code);
           } catch (UnsupportedEncodingException e) {
               return null;
           }
       }
    
       public DefaultMQPushConsumer getConsumer() {
           return consumer;
       }
    
       public void setConsumer(DefaultMQPushConsumer consumer) {
           this.consumer = consumer;
       }
    
       public MessageExt getMessageExt(int idx) {
           return getMsgs().get(idx);
       }
    
    
       public String getTopic(int idx) {
           return getMsgs().get(idx).getTopic();
       }
    
    
       public String getTag(int idx) {
           return getMsgs().get(idx).getTags();
       }
    
    
       public byte[] getBody(int idx) {
           return getMsgs().get(idx).getBody();
       }
    
    
       public String getKeys(int idx) {
           return getMsgs().get(idx).getKeys();
       }
    
       public List<MessageExt> getMsgs() {
           return msgs;
       }
    
       public void setMsgs(List<MessageExt> msgs) {
           this.msgs = msgs;
       }
    }
    

    范例

    Producer
    @RestController
    public class ProducerDemo {
       @Autowired
       private DefaultMQProducer defaultProducer;
    
       @Autowired
       private TransactionMQProducer transactionProducer;
    
       private int i = 0;
    
       @RequestMapping(value = "/sendMsg", method = RequestMethod.GET)
       public void sendMsg() {
           Message msg = new Message("TopicTest1", // topic
                   "TagA", // tag
                   "OrderID00" + i, // key
                   ("Hello zebra mq" + i).getBytes());// body
           try {
               defaultProducer.send(msg, new SendCallback() {
    
                   @Override
                   public void onSuccess(SendResult sendResult) {
                       System.out.println(sendResult);
                       // TODO 发送成功处理
                   }
    
                   @Override
                   public void onException(Throwable e) {
                       System.out.println(e);
                       // TODO 发送失败处理
                   }
               });
               i++;
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
    
       @RequestMapping(value = "/sendTransactionMsg", method = RequestMethod.GET)
       public String sendTransactionMsg() {
           SendResult sendResult = null;
           try {
               // 构造消息
               Message msg = new Message("TopicTest1", // topic
                       "TagA", // tag
                       "OrderID001", // key
                       ("Hello zebra mq").getBytes());// body
    
               // 发送事务消息,LocalTransactionExecute的executeLocalTransactionBranch方法中执行本地逻辑
               sendResult = transactionProducer.sendMessageInTransaction(msg, (Message msg1, Object arg) -> {
                   int value = 1;
    
                   // TODO 执行本地事务,改变value的值
                   // ===================================================
                   System.out.println("执行本地事务。。。完成");
                   if (arg instanceof Integer) {
                       value = (Integer) arg;
                   }
                   // ===================================================
    
                   if (value == 0) {
                       throw new RuntimeException("Could not find db");
                   } else if ((value % 5) == 0) {
                       return LocalTransactionState.ROLLBACK_MESSAGE;
                   } else if ((value % 4) == 0) {
                       return LocalTransactionState.COMMIT_MESSAGE;
                   }
                   return LocalTransactionState.ROLLBACK_MESSAGE;
               }, 4);
               System.out.println(sendResult);
           } catch (Exception e) {
               e.printStackTrace();
           }
           return sendResult.toString();
       }
    
       @RequestMapping(value = "/sendMsgOrder", method = RequestMethod.GET)
       public void sendMsgOrder() {
           Message msg = new Message("TopicTest1", // topic
                   "TagA", // tag
                   "OrderID00" + i, // key
                   ("Hello zebra mq" + i).getBytes());// body
           try {
               defaultProducer.send(msg, new MessageQueueSelector() {
                   @Override
                   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                       System.out.println("MessageQueue" + arg);
                       int index = ((Integer) arg) % mqs.size();
                       return mqs.get(index);
                   }
               }, i);// i==arg
               i++;
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
    }
    
    Consumer
    @Component
    public class ConsumerDemo {
       @EventListener(condition = "#event.msgs[0].topic=='TopicTest1' && #event.msgs[0].tags=='TagA'")
       public void rocketmqMsgListen(RocketmqEvent event) {
    //      DefaultMQPushConsumer consumer = event.getConsumer();
           try {
               System.out.println("com.guosen.client.controller.consumerDemo监听到一个消息达到:" + event.getMsgs().get(0).getMsgId());
               // TODO 进行业务处理
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
    }
    
    zebra

    相关文章

      网友评论

        本文标题:RocketMQ与Springboot封装

        本文链接:https://www.haomeiwen.com/subject/ffndzxtx.html