美文网首页MQ基础知识
springboot2 整合rocketMQ4.3

springboot2 整合rocketMQ4.3

作者: JSM_SIMONS | 来源:发表于2018-09-18 18:31 被阅读1240次

    前言
    前端时间更新技术架构的时候,看到rocketmq出了4的版本,而且本身这个mq有事务消息,在分布式的场景中有很好的启发性,和作用,而且本身它也是阿里开源到apache的一个项目,从出身还是实力来说都很不错,所以今天就抱着支持国产的心态开看一看这个mq

    1.项目代码
    maven

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.0</version>
     </dependency>
    

    producer
    单从分类producer的官网doc来看主要分成3种,
    DefaultMQProducer
    TransactionMQProducer
    messagingAccessPoint.createProducer()
    本文主要说的是,DefaultMQProducer,TransactionMQProducer
    DefaultMQProducer
    默认的producer,从官方的文档来看,前四个都是对这个producer的运用只是set的值不同而已,而且是很细微的变化而已


    1B48.tmp.png

    那么从最简单的开始
    application.yml文件

    rocketmq: 
      # 生产者配置
      producer: 
        groupName: ${spring.application.name}
        namesrvAddr: 192.168.40.133:9876
        default: false
    

    yml文件配置读取类

    @Getter
    @Setter
    @ConfigurationProperties(prefix = "rocketmq.producer")
    @Configuration
    @ToString
    public class ProducerConfig {
        private String namesrvAddr;
        
        private String groupName;
    }
    

    producer类的创建类,需要注意的是这个producer一个程序里面只能出现一个,当重复创建时就会报错

    @Log4j2
    @Configuration
    public class ProducerConfigure {
    
        @Autowired
        private ProducerConfig producerConfigure;
    
        /**
         * 创建普通消息发送者实例
         * 
         * @return
         * @throws MQClientException
         */
        @Bean
        @ConditionalOnProperty(prefix = "rocketmq.producer", value = "default", havingValue = "true")
        public DefaultMQProducer defaultProducer() throws MQClientException {
            log.info(producerConfigure.toString());
            log.info("defaultProducer 正在创建---------------------------------------");
            DefaultMQProducer producer = new DefaultMQProducer(producerConfigure.getGroupName());
            producer.setNamesrvAddr(producerConfigure.getNamesrvAddr());
            producer.setVipChannelEnabled(false);
            producer.setRetryTimesWhenSendAsyncFailed(10);
            producer.start();
            log.info("rocketmq producer server开启成功---------------------------------.");
            return producer;
        }
    }
    

    当producer创建完毕之后就是consumer的公用设置
    首先也是yml和配置类的定义

    rocketmq: 
      # 消费者配置
      consumer: 
        groupName: ${spring.application.name}
        namesrvAddr: 192.168.40.133:9876
    
    @Getter
    @Setter
    @ConfigurationProperties(prefix = "rocketmq.consumer")
    @Configuration
    @ToString
    public class ConsumerConfig {
        private String groupName;
        
        private String namesrvAddr;
    }
    
    @Configuration
    @Log4j2
    public abstract class DefaultConsumerConfigure {
    
        @Autowired
        private ConsumerConfig consumerConfig;
    
        // 开启消费者监听服务
        public void listener(String topic, String tag) throws MQClientException {
            log.info("开启" + topic + ":" + tag + "消费者-------------------");
            log.info(consumerConfig.toString());
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());
    
            consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());
    
            consumer.subscribe(topic, tag);
    
            // 开启内部类实现监听
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    return DefaultConsumerConfigure.this.dealBody(msgs);
                }
            });
    
            consumer.start();
    
            log.info("rocketmq启动成功---------------------------------------");
    
        }
        
        // 处理body的业务
        public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs);
    
    }
    

    值得注意的是,这里DefaultConsumerConfigure没有定义在什么时候运行,还有对body的操作也抽象出来了,提供给实现类做处理,方便业务抽取

    @Log4j2
    @Configuration
    public class TestConsumer extends DefaultConsumerConfigure implements ApplicationListener<ContextRefreshedEvent>{
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent arg0) {
            try {
                super.listener("t_TopicTest", "Tag1");
            } catch (MQClientException e) {
                log.error("消费者监听器启动失败", e);
            }
            
        }
    
        @Override
        public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs)  {
            int num = 1;
            log.info("进入");
            for(MessageExt msg : msgs) {
                log.info("第" + num + "次消息");
                try {
                    String msgStr = new String(msg.getBody(), "utf-8");
                    log.info(msgStr);
                } catch (UnsupportedEncodingException e) {
                    log.error("body转字符串解析失败");
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
    
    

    那么这些写完之后基本就做测试类的编写了
    这个地实现了ApplicationListener,让他在启动的时候就开始执行这个consumer,相信有些同学会喜欢用@PostConstruct,但是不要这么做,因为他会在init之前执行,那么有些类会加载不完全,会导致无法开机启动的

    然后再controller里面引入producer,然后直接调用即可

    @RestController
    @RequestMapping("/test")
    @Log4j2
    public class TestController {
    
        @Autowired
        private DefaultMQProducer defaultMQProducer;
        
    //  @Autowired
    //  private TransactionMQProducer producer;
        
        @Autowired
        private TestTransactionListener testTransactionListener;
    
        @GetMapping("/test")
        public void test(String info) throws Exception {
            Message message = new Message("TopicTest", "Tag1", "12345", "rocketmq测试成功".getBytes());
                    // 这里用到了这个mq的异步处理,类似ajax,可以得到发送到mq的情况,并做相应的处理
                   //不过要注意的是这个是异步的
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("传输成功");
                    log.info(GsonUtil.GSON.toJson(sendResult));
                }
                @Override
                public void onException(Throwable e) {
                    log.error("传输失败", e);
                }
            });
        }
    }
    

    跑一跑,就可以看到结果了

    说完了DefaultMQProducer那么就来说transactionMQProducer
    同样参考官网的例子来做整合
    在原来的ProducerConfigure类的基础上加上即可

    需要注意的是ConditionalOnProperty这个必须得有,而且配置文件中
    transaction和default中只能有一个是true,不然就会同时创建两个producer,那么就会报错

        /**
         * 创建事务消息发送者实例
         * 
         * @return
         * @throws MQClientException
         */
        @Bean
        @ConditionalOnProperty(prefix = "rocketmq.producer", value = "transaction", havingValue = "true")
        public TransactionMQProducer transactionMQProducer() throws MQClientException {
            log.info(producerConfigure.toString());
            log.info("TransactionMQProducer 正在创建---------------------------------------");
            TransactionMQProducer producer = new TransactionMQProducer(producerConfigure.getGroupName());
    
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r);
                            thread.setName("client-transaction-msg-check-thread");
                            return thread;
                        }
                    });
            producer.setNamesrvAddr(producerConfigure.getNamesrvAddr());
            producer.setExecutorService(executorService);
            producer.start();
            log.info("TransactionMQProducer server开启成功---------------------------------.");
            return producer;
        }
    

    因为transaction的流程下,rocketmq会先发送一个consumer不可见的消息,然后在调用
    TransactionListener这个接口中的executeLocalTransaction,中的方法执行事务,然后方法内部需要返回
    一个LocalTransactionState的枚举信息,分别为

    public enum LocalTransactionState {
        COMMIT_MESSAGE, // 提交
        ROLLBACK_MESSAGE, // 回滚
        UNKNOW, // 未知
    }
    

    相应的当我们返回的是COMMIT_MESSAGE时,那么producer会把消息提交到mq上,
    如果是ROLLBACK_MESSAGE那么producer就会结束,并且不提交到mq,

    public interface TransactionListener {
        /**
         * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
         *
         * @param msg Half(prepare) message
         * @param arg Custom business parameter
         * @return Transaction state
         */
        LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
    
        /**
         * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
         * method will be invoked to get local transaction status.
         *
         * @param msg Check message
         * @return Transaction state
         */
        LocalTransactionState checkLocalTransaction(final MessageExt msg);
    }
    

    需要注意的是checkLocalTransaction是用作mq长时间没有收到producer的executeLocalTransaction响应的时候调用的,这个类在3.0之后的版本就被阉割了,只有接口,却没有实现,那么直接写一个空实现即可,在我这边的代码上,我做了一个抽象,把需要实现的executeLocalTransaction抽象出来

    @Configuration
    public abstract class AbstractTransactionListener implements TransactionListener {
    
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
    }
    

    这个是executeLocalTransaction的实现类,简单的做了些业务,然后返回了一个commit

    @Configuration
    @Log4j2
    public class TestTransactionListener extends AbstractTransactionListener {
    
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    log.info(new String(msg.getBody()));
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
    }
    

    consumer是没有变化的,基本相同,那么就直接贴上controller的测试代码

        
        @GetMapping("t_test")
        public void Ttest(String info) throws Exception {
            Message message = new Message("t_TopicTest", "Tag1", "12345", "rocketmq测试成功".getBytes());
            producer.setTransactionListener(testTransactionListener);
            producer.setSendMsgTimeout(10000);
            producer.sendMessageInTransaction(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("传输成功");
                    log.info(GsonUtil.GSON.toJson(sendResult));
                }
                @Override
                public void onException(Throwable e) {
                    log.error("传输失败", e);
                }
            });
        }
    

    跑一跑即可看到结果参数

    相关文章

      网友评论

        本文标题:springboot2 整合rocketMQ4.3

        本文链接:https://www.haomeiwen.com/subject/iftmnftx.html