美文网首页软件工程师成长日记
Maven项目Spring整合RocketMQ消费者实现

Maven项目Spring整合RocketMQ消费者实现

作者: 麦克劳林 | 来源:发表于2019-04-15 09:54 被阅读0次

    前言

    RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,已经于2016年11月成为 Apache 孵化项目,相信RocketMQ的未来会发挥着越来越大的作用,将有更多的开发者因此受益。
    本文仅对RocketMQ的简单实用做入门性介绍,不对RocketMQ的底层原理进行深入介绍,后续文章将对RocketMQ的原理做详细介绍。

    1、RocketMQ的Maven依赖,pom.xml中引入jar包:

    <!-- RocketMQ --> 
    <dependency>
        <groupId>com.alibaba.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>3.5.8</version>
    </dependency>
    <dependency>
       <groupId>com.alibaba.rocketmq</groupId>
       <artifactId>rocketmq-all</artifactId>
       <version>3.5.8</version>
       <type>pom</type>
      </dependency>
    

    2、Spring bean 配置单例

    <!-- 监听实现 -->
    <bean id="rocketMqListener" class="com.rocketMq.service.impl.RocketMqListenerImpl"></bean>
    <!-- 监听配置 -->
    <bean id="consumer" class="com.rocketMq.utils.Consumer" init-method="init">
        <property name="consumerGroup" value="PushConsumer"/>
        <property name="namesrvAddr" value="192.168.0.1:19876"/>
        <property name="topic" value="node_topic"/>
        <property name="subExpression" value="slu"/>
        <property name="rocketMqListener" ref="rocketMqListener"/>
    </bean>  
    

    3、Java源码

    1)、RocketMqListener.java

    package com.rocketMq.dao;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.common.message.MessageExt;
     /**
    * 消费者监听接口,业务需要实现此接口并配置到Consumer中
    * Created by 麦克劳林
    */
    public interface RocketMqListener {
    boolean RocketMqMessage(MessageExt ext,ConsumeConcurrentlyContext Context);
    }
    

    2)、

    package com.rocketMq.dao;
    import java.util.List;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.common.message.MessageExt;
    /**
     * 监听wrapper,用于处理共通业务,并转发消息到业务监听中。
     * Created by 麦克劳林
     */
    public class RocketMqWrapper implements MessageListenerConcurrently{
    private RocketMqListener rocketMqListener;
    public RocketMqListener getRocketMqMessageListener() {
        return rocketMqListener;
    }
    public void setRocketMqListener(RocketMqListener rocketMqListener) {
        this.rocketMqListener = rocketMqListener;
    }
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messagesList,ConsumeConcurrentlyContext consumeConcurrentlyContext) {  
            if(ext.getTags().equals("RFID")){
                if(rocketMqListener.RFIDMessage(ext, consumeConcurrentlyContext)){
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } else{
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }   
        }
    }
    

    }

    相关文章

      网友评论

        本文标题:Maven项目Spring整合RocketMQ消费者实现

        本文链接:https://www.haomeiwen.com/subject/fwizzxtx.html