美文网首页
Spring 整合RabbitMQ实例

Spring 整合RabbitMQ实例

作者: 浪客行1213 | 来源:发表于2020-11-19 17:39 被阅读0次

    了解RabbitMQ

    RabbitMQ使用场景

    Spring 整合RabbitMQ

    配置maven : pom.xml
    <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>
    
    1.配置连接信息 : rabbit.properties 文件
    //连接信息
    rabbit.vhost=/test  //虚拟机
    rabbit.addresses=10.166.27.51:5672//服务器 端口
    rabbit.username=test//用户名
    rabbit.password=test//密码
    channel.cache.size=50
    //exchange 交换机配置名称
    exchange.direct=test//交换机名
    //配置队列名
    queue.sync_nc=newretail.queue.sync_nc
    queue.sync_nc_error=newretail.queue.sync_nc_error
    queue.reply=queue.reply
    //队列交换机的路由键
    route.sync_nc=route.queue.sync_nc
    route.reply=route.reply
    route.sync_nc_error=route.queue.sync_nc_error
    
    2. 依赖注入 : spring-rabbit.xml
    rabbit 命名空间包含了多个元素,帮助我们声明队列、Exchange 以及将它们结合在一起的 binding
    
    元素 作用
    <queue> 创建一个队列
    <fanout-exchange> 创建一个 fanout 类型的 Exchange
    <header-exchange> 创建一个 header 类型的 Exchange
    <topic-exchange> 创建一个 topic 类型的 Exchange
    <direct-exchange> 创建一个 direct 类型的 Exchange
    <bindings><binding></bindings> 元素定义一个或多个元素的集合。元素创建 Exchange 和队列之间的 binding
    这些配置元素要与 <admin> 元素一起使用。
    <admin> 元素会创建一个 RabbitMQ 管理组件(administrative component),
    它会自动创建 (如果它们在 RabbitMQ 代理中尚未存在的话)上述这些元素所声明的队列、Exchange 以及 binding。
    
    <?xml version="1.0" encoding="UTF-8" ?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
            http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    
          <!--引用ExecutorService或ThreadPoolTask​​Executor(由<task:executor />元素定义)。创建连接时传递给Rabbit库。如果未提供,Rabbit库当前使用具有5个线程的固定线程池ExecutorService-->
            <task:executor id="poolTaskExecutor"
                       pool-size="10"
                       queue-capacity="10"
                       keep-alive="20"
                       rejection-policy="DISCARD_OLDEST"/>
    
        <!-- RabbitMQ 连接工厂公共配置 连接服务-->
        <rabbit:connection-factory id="rabbitConnectionFactory"
            addresses="${rabbit.addresses}" 
            virtual-host="${rabbit.vhost}"
            username="${rabbit.username}" 
            password="${rabbit.password}"
    <!--上面配置的线程池 一般不需配置默认 即可-->
                    executor="poolTaskExecutor"
    <!-- channel-cache-size,channel的缓存数量,默认值为25-->
            channel-cache-size="${channel.cache.size}" 
    <!-- 设置此属性配置可以确保消息成功发送到交换器-->
            publisher-confirms="true"
    <!-- 可以确保消息在未被队列接收时返回-->
            publisher-returns="true"    
                    <!-- cache-mode 一般不需要配置,缓存连接模式,
    默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
                    cache-mode="CHANNEL" />
    <!-- <admin> 元素会创建一个 RabbitMQ 管理组件(administrative component)-->
        <rabbit:admin connection-factory="rabbitConnectionFactory" />
        <!-- 队列配置 -->
    <!--定义消息队列,durable:是否持久化,
    如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,
    需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,
    而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性
    public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
    exclusive: 仅创建者可以使用的私有队列,断开后自动删除;
    auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
        <rabbit:queue name="${queue.sync_nc}" durable="true" auto-delete="false" exclusive="false">
            <rabbit:queue-arguments>
                <entry key="x-max-priority">
                    <value type="java.lang.Integer">10</value> 
    <!-- 设定队列支持的最大优先级:rabbit3.5以上支持,3.5以下 需要安装插件 -->
                </entry>  
            </rabbit:queue-arguments>
        </rabbit:queue>
    <rabbit:queue name="${queue.reply}" durable="true" auto-delete="false" exclusive="false" />
        <!-- 交换机配置 -->
    <!--绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,
    我们用direct模式,即rabbit:direct-exchange标签,
    Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心,
    auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
        <rabbit:direct-exchange name="${exchange.direct}" durable="true" auto-delete="false">
            <rabbit:bindings>
                <rabbit:binding queue="${queue.sync_nc}" key="${route.sync_nc}" />
                    <rabbit:binding queue="${queue.reply}" key="${route.reply}" />
            </rabbit:bindings>
        </rabbit:direct-exchange>
        <!-- 生产者配置 -->
          <!--实例化两个ben处理消息到达交换机,和消息进入队列的节点-->
        <bean id="messageConfirm" class="com.sjky.platform.common.rabbit.MessageConfirm" />
        <bean id="messageReturn" class="com.sjky.platform.common.rabbit.MessageReturn" />
            <!--spring 为amqp默认的是jackson的一个插件生产者生产的数据转换为json存入消息队列-->
        <bean id="messageConverter"
               class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
        </bean>
        <rabbit:template id="rabbitTemplate"
            connection-factory="rabbitConnectionFactory" //连接工厂
            message-converter="messageConverter"
            exchange="${exchange.direct}" 
            reply-timeout="2000"   //发送和接收操作的超时时间(以毫秒为单位)。默认值为5000(5秒)
            retry-template="retryTemplate" 
            mandatory="true"
    <!--引用上面实体,消息到达交换机时被调用-->
            confirm-callback="messageConfirm"
    <!--应用上面定义的实体,消息无法到达队列是使用-->
            return-callback="messageReturn"
        />
        <!-- retryTemplate为连接失败时的重试模板 -->
        <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
            <property name="backOffPolicy">
                <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                    <property name="initialInterval" value="5000" />
                    <property name="multiplier" value="10.0" />
                    <property name="maxInterval" value="10000" />
                </bean>
            </property>
            <property name="retryPolicy">
                <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                    <property name="maxAttempts" value="3"/>
                </bean>
            </property>
        </bean>
        <!--在任何Spring管理的对象上启用对@RabbitListener批注的检测-->
        <rabbit:annotation-driven />
    <!-- 消费者配置AUTO方式 -->
        <bean id="rabbitListenerContainerFactory"
            class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
            <property name="messageConverter" ref="messageConverter" />
            <property name="connectionFactory" ref="rabbitConnectionFactory" />
            <property name="concurrentConsumers" value="9" />
            <property name="maxConcurrentConsumers" value="50" />
            <property name="taskExecutor" ref="taskExecutor" />
            <property name="prefetchCount" value="3" /> <!-- 每次1个 -->
        <!--选项: NONE,MANUAL,AUTO 默认:AUTO,当为MANUAL时必须调用Channel.basicAck()来手动应答所有消息 -->
            <property name="acknowledgeMode" value="AUTO" />
            <property name="errorHandler" ref="mqErrorHandler" /><!-- 引用注册的实体,处理未捕获异常 -->
        </bean>
          <!--处理未捕获异常-->
        <bean id="mqErrorHandler" class="com.sjky.platform.common.rabbit.MQErrorHandler" />
        <!-- 消费者配置MANUAL方式 -->
        <!-- <bean id="rabbitListenerContainerFactory"
            class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
            <property name="messageConverter" ref="messageConverter" />
            <property name="connectionFactory" ref="rabbitConnectionFactory" />
            <property name="concurrentConsumers" value="1" />
            <property name="maxConcurrentConsumers" value="100" />
            <property name="taskExecutor" ref="taskExecutor" />
            <property name="prefetchCount" value="1" /> 
            选项: NONE,MANUAL,AUTO 默认:AUTO,当为MANUAL时必须调用Channel.basicAck()来手动应答所有消息
            <property name="acknowledgeMode" value="MANUAL" />
            <property name="errorHandler" ref="mqErrorHandler" /> 处理未捕获异常
            <property name="adviceChain" ref="retryOperationsInterceptorFactoryBean" />
        </bean> 
        <bean id="mqMessageRecover" class="com.sjky.platform.common.rabbit.MQMsgRecover"/>
        -->
        <!-- 实现异常事件处理逻辑 -->
        <!-- <bean id="retryOperationsInterceptorFactoryBean"
            class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
            <property name="messageRecoverer" ref="mqMessageRecover" />配置消息恢复者
            <property name="retryOperations" ref="retryTemplate" />配置重试模板
        </bean> -->
    </beans>
    
    3. 注入时需要的Rabbit工具类
    /**
     * 消息到达交换机时被调用
     * ack=true 成功 ,ack=false 失败
    */
    
    public class MessageConfirm implements ConfirmCallback {
    
        private static final Logger logger = LoggerFactory.getLogger(MessageConfirm.class);
        
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    
            if (!ack) {
                logger.error("消息无法到达交换机[ack: " + ack + ",correlationData: " + correlationData + ",cause : " + cause+"].");
            }else {
                logger.info("消息到达交换机[ack: " + ack + ",correlationData: " + correlationData + ",cause : " + cause+"].");
            }
        }
    
    }
    
    
    /**
     * Exchange无法将消息路由到任何队列时会被调用
    */
    public class MessageReturn implements ReturnCallback {
        
        private static final Logger logger = LoggerFactory.getLogger(MessageReturn.class);
        
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            //将消息发送到错误队列
            RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate,RabbitConfig.getDirectExchange(), RabbitConfig.getErrorRoute());
            Exception cause = new Exception(new Exception("route fail and republish"));
            recoverer.recover(message,cause);
            logger.error("消息无法到达队列[Returned Message: " + replyText + ",code: " + replyCode + ",exchange: " + exchange + ",routingKey :" + routingKey+"].");
        }
    
    }
    
    /**
     * 消费时  消费失败 抛出异常时 调用
    */
    public class MQErrorHandler implements ErrorHandler {
        
        private static final Logger logger = LoggerFactory.getLogger(MQErrorHandler.class);
    
        @Override
        public void handleError(Throwable cause) {
            logger.error("一个错误发生了:", cause);
        }
        
    }
    
    /**
     * acknowledgeMode=MANUL时的队列监听出现异常时消息恢复到队列
     * 注:必须调用channel.basicAck()确认回执
     *  <!-- 消费者配置示例 -->
        <bean id="rabbitListenerContainerFactory"
            class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
            <property name="messageConverter" ref="messageConverter" />
            <property name="connectionFactory" ref="rabbitConnectionFactory" />
            <property name="concurrentConsumers" value="1" />
            <property name="maxConcurrentConsumers" value="100" />
            <property name="taskExecutor" ref="taskExecutor" />
            <property name="prefetchCount" value="1" /> 
            <!--选项: NONE,MANUAL,AUTO 默认:AUTO,当为MANUAL时必须调用Channel.basicAck()来手动应答所有消息 -->
            <property name="acknowledgeMode" value="MANUAL" />
            <property name="errorHandler" ref="mqErrorHandler" /> <!-- 处理未捕获异常 -->
            <property name="adviceChain" ref="retryOperationsInterceptorFactoryBean" />
        </bean>
        <bean id="mqErrorHandler" class="com.sjky.platform.common.rabbit.MQErrorHandler" />
        <bean id="mqMessageRecover" class="com.sjky.platform.common.rabbit.MQMsgRecover"/>
        <!-- 实现异常事件处理逻辑 -->
        <bean id="retryOperationsInterceptorFactoryBean"
            class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
            <property name="messageRecoverer" ref="mqMessageRecover" />
            <property name="retryOperations" ref="retryTemplate" />
        </bean>
     *
     */
    public class MQMessageRecover implements MessageRecoverer {
    
        private static final Logger logger = LoggerFactory.getLogger(MQMessageRecover.class);
        
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @Autowired
        private Jackson2JsonMessageConverter msgConverter;
        
        @Override
        public void recover(Message message, Throwable cause) {
            String data=msgConverter.fromMessage(message).toString();
            MessageProperties messageProperties=message.getMessageProperties();
            Map<String, Object> headers = messageProperties.getHeaders();
            headers.put("x-exception-stacktrace", getStackTraceAsString(cause));
            headers.put("x-exception-message", cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
            headers.put("x-original-exchange", message.getMessageProperties().getReceivedExchange());
            headers.put("x-original-routingKey", message.getMessageProperties().getReceivedRoutingKey());
            messageProperties.setReceivedDeliveryMode(MessageDeliveryMode.PERSISTENT);
            //重新将数据放回队列中
            rabbitTemplate.send(messageProperties.getReceivedExchange(), messageProperties.getReceivedRoutingKey(), message);
            logger.error("处理消息(" + data + ") 错误, 重新发布去队列.", cause);
        }
        
        private String getStackTraceAsString(Throwable cause) {
            StringWriter stringWriter = new StringWriter();
            PrintWriter printWriter = new PrintWriter(stringWriter, true);
            cause.printStackTrace(printWriter);
            return stringWriter.getBuffer().toString();
        }
    
    4. 将消息添加到队列
    /**
     * @description: 添加请求到队列
     * @author: xhh
     * @create: 2020-10-22 14:24
     **/
    @Controller
    @RequestMapping("/nc/***/")
    public class RabbirController extends BaseController {
    
        @PostMapping("addOrderToQueue")
        @ResponseBody
        public ResultData addOrderToQueue(@RequestParam String orderCode, HttpServletRequest request){
            ResultData resultData = new ResultData();
            resultData.setResult(true);
            resultData.setMessage("加入队列成功!");
            logger.info("=================系统请求将订单code:{}放入队列=====================",orderCode);
            try {
                //判断订单是否加入过队列
                String userCode = WebUtils.getLoginUserCode(request);
                HashMap<String, String> stringStringHashMap = new HashMap<>(2);
                stringStringHashMap.put("code",orderCode);
                stringStringHashMap.put("createUserCode",userCode);
                //将订单号发送到指定交换机的队列 (交换机,陆游键,消息内容)
    rabbitTemplate.convertAndSend(RabbitConfig.getDirectExchange(),RabbitConfig.getSyncNcRoute(),stringStringHashMap);
              } catch (Exception e) {
                resultData.setResult(false);
                resultData.setMessage(e.getMessage());
                e.printStackTrace();
            }
            return resultData;
        }
    }
    
    5. 监听 消费队列
    /**
     * @description: 监听同步nc的队列
     * @author: xhh
     * @create: 2020-10-23 16:36
     **/
    @Service
    public class ToNcQueueListener {
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
    
        /*****
         * @Description: 监听同步NC的队列
         * @Param: [message]
         * @return: void
         * @Author: xh
         * @Date: 2020/10/27 16:32
         */
        @RabbitListener(queues = "${queue.sync_nc}")//此注解 监听的队列名称
        public void processMsg(Message message) throws Exception {
            String data = new String(message.getBody());
            JSONObject jsonObject = JSONObject.parseObject(data);
            //订单号
            String orderCode = jsonObject.getString("code");
            //登录员工code
            String userCode = jsonObject.getString("createUserCode");
            logger.info("======================监听到订单信息:{}=============================", data);
        }
    }
    

    浪客行1213的简书


    xhh

    相关文章

      网友评论

          本文标题:Spring 整合RabbitMQ实例

          本文链接:https://www.haomeiwen.com/subject/skxpmktx.html