美文网首页我爱编程
RocketMQ简单封装

RocketMQ简单封装

作者: July_lt | 来源:发表于2018-05-23 10:42 被阅读0次

    common-rocketmq介绍

    该项目对rocketmq在实际项目开发中做了简单的封装,使用简单,common-rocketmq是基于JDK8的,
    github地址:https://github.com/xinyi-lt/common-rocketmq
    下面是common-rocketmq结构:

    包含5个类和一个XML配置文件

    rocketmq---------common-------MessageData(消息传输DTO)
                |
                |---producer----------RocketMQProducer(生产者接口)
                |               |
                |               |---RocketMQProducerImpl(生产者接口实现)
                |---consumer------RocketMQConsumer(消费者)
                                |
                                |---ConsumerService(消费者业务逻辑需要实现的接口)
    common-rocketmq.xml
    

    使用举例

    demo的github地址:https://github.com/xinyi-lt/common-rocketmq-demo

    1.maven依赖

    <dependency>
        <groupId>com.sxzq.lt</groupId>
        <artifactId>common-rocketmq</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    

    2.producer生产者

    • 配置文件引入
        <import resource="classpath*:conf/common-rocketmq.xml"/>
    
    • 代码中注入producer
            RocketMQProducer producer = applicationContext.getBean(RocketMQProducer.class);
    
    • 发送消息
            //消息传输DTO
            MessageData<UserInfo> messageData = new MessageData<>();
    
            //时间戳
            messageData.setTimestamp(System.currentTimeMillis());
            //幂等控制根据UUID来控制
            messageData.setUuid(UUID.randomUUID().toString());
    
            UserInfo userInfo = new UserInfo();
            //useinfo数据
    
            messageData.setData(userInfo);
    
            logger.info("start to send message data:{}", JSON.toJSONString(messageData));
    
            //发送消息
            SendResult result = producer.sendMessage(
                    new Message(MQConstant.MQ_TOPIC_PRODUCER_DEMO,
                    MQConstant.MQ_TAG_PRODUCER_DEMO_FIRST,
                    UUID.randomUUID().toString(),
                    JSON.toJSONString(messageData).getBytes(Charset.forName("utf-8"))));
    
            logger.info("send message complete result:{} ", result);
    
    • 发送延时消息

            //消息
              Message msg = new Message(MQConstant.MQ_TOPIC_PRODUCER_DEMO,
                      MQConstant.MQ_TAG_PRODUCER_DEMO_FIRST,
                      msgKey,
                      JSON.toJSONString(messageData).getBytes(Charset.forName("utf-8")));
      
              //设置延迟消息级别
              // messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
              msg.setDelayTimeLevel(4);
      
              //发送消息
              SendResult result = producer.sendMessage(msg);
      

    3.consumer消费者

    • 配置文件引入

       <import resource="classpath*:conf/common-rocketmq.xml"/>
      
    • 实现common-rocketmq中的ConsumerService的接口
            public class DemoConsumerService implements ConsumerService {
               @Override
                public boolean consume(Message message) {
       
                      //消费者业务逻辑
                      return consumeResult;
                }
            }
    
    • DemoConsumerService添加到spring容器进行管理
           <bean id="demoConsumerService" class="com.sxzq.lt.demo.consumer.DemoConsumerService"/>
    
    • 创建该业务实例的消费者
             <bean id="demoConsumer" class="com.sxzq.lt.rocketmq.consumer.RocketMQConsumer"
                  init-method="start" destroy-method="shutdown">  
                     <!--consumerGroup -->
                     <constructor-arg value="MQ_CONSUMER_GROUP_DEMO_1" index="0"></constructor-arg>
                     <!-- topic -->
                     <constructor-arg value="MQ_TOPIC_PRODUCER_DEMO" index="1"></constructor-arg>
                     <!-- topic's subExpression -->
                     <constructor-arg value="MQ_TAG_PRODUCER_DEMO_1" index="2"></constructor-arg>
                     <property name="namesrvAddr" value="127.0.0.1:9876"></property>
                     <property name="consumeFromWhere" ref="CONSUME_FROM_FIRST_OFFSET"/>
                     <!--消费者业务逻辑实现-->
                     <property name="consumerHandler" ref="demoConsumerService"/>
                     <!--集群模式,默认为集群消费,不需要配置-->
                     <property name="messageModel" ref="CLUSTERING"/>
            </bean>
    

    相关文章

      网友评论

        本文标题:RocketMQ简单封装

        本文链接:https://www.haomeiwen.com/subject/nyzljftx.html