美文网首页
2021-10-11_如何通过rabbitmq实现前后台的多系统

2021-10-11_如何通过rabbitmq实现前后台的多系统

作者: kikop | 来源:发表于2021-10-13 22:37 被阅读0次

    20211011_如何通过rabbitmq实现前后台的多系统实时通信

    1概述

    本文基于SpringBoot(v2.2.1)介绍如何通过rabbitmq(v3.7.8)实现前后台的多系统实时通信,具体实现思路有2种方式,本节主要实现1.1.2。系统架构如下:

    image-20211013223446871.png

    项目结构:

    image-20211013220457366.png

    涉及的知识点如下:

    1. rabbitmq的生产投递
    2. rabbitmq消费手动确认应答及消费限流。
    3. rabbitmq的数据json序列化。
    4. aqs互斥锁及条件变量(Lock+Condition)的运用。
    5. 前端Web系统、后台系统A、后台系统B、rabbitmq的多系统的实时通信。

    1.1实现思路

    1.1.1rabbitmq整合websocket(未实现)

    1. 后台系统A,开启webSocket服务监听。
    2. 后台系统A开启对rabbitmq结果任务队列的消费监听。
    3. 前端Web系统作为websocket的客户端,连接webSocket服务(webSocket服务缓存所有连接的客户端),并进行消息订阅
    4. 前端Web系统发起请求到后台系统A后一直处于等待状态,后台系统A将请求发给rabbitmq服务中的请求任务队列。
    5. 后台系统B消费rabbitmq请求队列中的任务,消费完成后,将结果推送到rabbitmq中的结果任务队列。
    6. 后台系统A将监听到的结果,进行反推请求发起者,最终将结果通过webSocket推送到前端Web系统。

    1.1.2rabbitmq整合aqs

    1. 后台系统A开启对rabbitmq结果任务队列的消费监听。
    2. 前端Web系统作为websocket的客户端,连接webSocket服务(webSocket服务缓存所有连接的客户端),并进行消息订阅
    3. 前端Web系统发起请求到后台系统A后一直处于等待状态,后台系统A以线程的方式将请求发给rabbitmq服务中的请求任务队列,同时进行future.get()同步阻塞等待,线程中通过Lock+condition实现条件等待await。
    4. 后台系统B消费rabbitmq请求队列中的任务,消费完成后,将结果推送到rabbitmq中的结果任务队列。
    5. 后台系统A将监听到的结果,进行反推请求发起者,唤醒指定的condition,从而将结束同步阻塞中的任务。

    1.2时序错误场景分析

    // lock对应以个condition可能引起的唤醒时序错误场景
    1.前端Web系统发起请求任务1、前端Web系统发起请求任务2
    2.rabbitmq请求任务队列:
    请求任务1、请求任务2
    2.处理完成后,结果任务队列可能的情况:
    请求任务2、请求任务1
    3.rabbitmq消费顺序:
    消费任务2-->signal2WaitQueueBySequenceTo1(错误)
    消费任务1-->signal2WaitQueueBySequenceTo2(错误)
    

    2代码实现

    2.1配置

    2.1.1yml配置

    spring:
     profiles:
      active: dev
    
    # 配置 RabbitMQ的基本信息
     rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: guest
      virtual-host: /
    #  生产投递机制(1.事务、2.Confirm、3.异步监听Return)
      # 开启Exchange消息发送确认功能
      publisher-confirm-type: correlated
      # 开启Queue失败退回功能
      publisher-returns: true
      listener:
        type: direct
        direct:
    #    手动确认
          acknowledge-mode: manual
    #      basicQos
          prefetch: 1
    #      消息拒绝是否重写入队
          default-requeue-rejected: true
    #      重试配置
          retry:
            enabled: true
            max-attempts: 3
    
    server.port=8085
    server.servlet.context-path=/myrabbitwebrequest
    

    2.1.2生产端配置

    package com.kikop.config;
    
    import com.kikop.ConstRabbit;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: myrabbitwebrequest
     * @file Name: RabbitProducerConfig
     * @desc mq生产端
     * @date 2021/10/10
     * @time 8:00
     * @by IDE: IntelliJ IDEA
     */
    @Configuration
    public class RabbitProducerConfig implements InitializingBean {
    
    
        @Autowired
        public RabbitTemplate rabbitTemplate;
    
    
        /**
         * 设置一个简单的队列2
         */
        @Bean(name = ConstRabbit.QUEUE_WEBREQUEST_DIRECT)
        public Queue queue2() {
    
            /*
             * 参数1:队列名称
             * 参数2:是否定义持久化队列
             * 参数3:是否独占本次连接
             * 参数4:是否在不使用的时候自动删除队列,一直保留
             * 参数5:队列其它参数
             */
            return new Queue(ConstRabbit.QUEUE_WEBREQUEST_DIRECT,
                    true, false, false, null);
        }
    
    
        /**
         * 设置一个简单的队列2
         */
        @Bean(name = ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT)
        public Exchange exchange2() {
    
            return ExchangeBuilder.topicExchange(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT).durable(true).build();
        }
    
    
        /*
            1. 知道哪个队列
            2. 知道哪个交换机
            3. routing key
         */
        @Bean
        public Binding bindQueueExchange2(@Qualifier(ConstRabbit.QUEUE_WEBREQUEST_DIRECT) Queue queue,
                                          @Qualifier(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT) Exchange exchange) {
            // import org.springframework.amqp.core.Exchange;
            return BindingBuilder.bind(queue).to(exchange).with(ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT).noargs();
        }
    
    
        /**
         * 设置一个简单的队列2
         */
        @Bean(name = ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT)
        public Queue queue3() {
    
            /*
             * 参数1:队列名称
             * 参数2:是否定义持久化队列
             * 参数3:是否独占本次连接
             * 参数4:是否在不使用的时候自动删除队列,一直保留
             * 参数5:队列其它参数
             */
            return new Queue(ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT,
                    true, false, false, null);
        }
    
    
        /**
         * 设置一个简单的队列2
         */
        @Bean(name = ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT)
        public Exchange exchange3() {
    
            return ExchangeBuilder.topicExchange(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT).durable(true).build();
        }
    
        /*
            1. 知道哪个队列
            2. 知道哪个交换机
            3. routing key
         */
        @Bean
        public Binding bindQueueExchange3(@Qualifier(ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT) Queue queue,
                                          @Qualifier(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT) Exchange exchange) {
            // import org.springframework.amqp.core.Exchange;
            return BindingBuilder.bind(queue).to(exchange).with(ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT_RESULT)
                    .noargs();
        }
    
    
    
    
    //    @Bean(name = "myMqCondition")
    //    public Condition myMqCondition(@Qualifier("myMqLock") ReentrantLock reentrantLock) {
    //        Condition condition = reentrantLock.newCondition();
    //        return condition;
    //    }
    
    
        @Bean
        public Jackson2JsonMessageConverter converter() {
            return new Jackson2JsonMessageConverter();
        }
    
    
        @Override
        public void afterPropertiesSet() throws Exception {
            // 生产端序列化
            rabbitTemplate.setMessageConverter(converter());
        }
    }
    

    2.1.3消费端配置

    package com.kikop.config;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.converter.MappingJackson2MessageConverter;
    import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: myrabbitwebrequest
     * @file Name: RabbitConfig
     * @desc mq消费端
     * @date 2021/10/10
     * @time 8:00
     * @by IDE: IntelliJ IDEA
     */
    @Configuration
    public class RabbitConsumerConfig implements RabbitListenerConfigurer {
    
    
        // 注意,引入包的类型
        // org.springframework.amqp.rabbit.connection
    //    connectionFactory instance CachingConnectionFactory
        @Autowired
        public ConnectionFactory connectionFactory;
    
    
        @Bean
        public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
    
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            // 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body
            factory.setMessageConverter(new MappingJackson2MessageConverter());
            return factory;
        }
    
        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
        }
    
    }
    

    2.1.4独占锁配置

    package com.kikop.config;
    
    import com.kikop.ConstRabbit;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: myrabbitwebrequest
     * @file Name: ReentrantLockConfig
     * @desc mq生产端
     * @date 2021/10/10
     * @time 8:00
     * @by IDE: IntelliJ IDEA
     */
    @Configuration
    public class ReentrantLockConfig  {
    
        /**
         * 向SpringIoc容器中注入可重入锁
         * 一个任务一个条件对象Condition,每个Condition只关联一个等待节点(是不是很浪费,你们说呢)
         * 主要为了解决:
         * 生产、消费的数据不一致性,请求响应数据错乱问题
         * @return
         */
        @Bean(name = "myMqLock")
        public ReentrantLock myMqLock() {
            ReentrantLock myMqLock = new ReentrantLock();
            return myMqLock;
        }
    
    }
    

    2.1.4常量配置

    package com.kikop;
    
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.Condition;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: myrabbitwebrequest
     * @file Name: ConstRabbit
     * @desc 配置类
     * @date 2021/10/10
     * @time 16:59
     * @by IDE: IntelliJ IDEA
     */
    public class ConstRabbit {
    
        // 正常交换机
        public static final String EXCHANGE_WEBREQUEST_DIRECT = "ex_webrequest";
        // 正常路由
        public static final String ROUTINGKEY_WEBREQUEST_DIRECT = "rk_webrequest";
        // 正常队列
        public static final String QUEUE_WEBREQUEST_DIRECT = "queue_webrequest";
    
        // 正常交换机
        public static final String EXCHANGE_WEBREQUEST_DIRECT_RESULT = "ex_webrequest_result";
        // 正常路由
        public static final String ROUTINGKEY_WEBREQUEST_DIRECT_RESULT = "rk_webrequest_result";
        // 正常队列
        public static final String QUEUE_WEBREQUEST_DIRECT_RESULT = "queue_webrequest_result";
    
        /**
         * 缓存所有的条件对象
         * 一个任务一个条件对象Condition,每个Condition只关联一个等待节点(是不是很浪费,你们说呢)
         * 主要为了解决:
         * 生产、消费的数据不一致性,请求响应数据错乱问题
         */
        public static final ConcurrentHashMap<String, Condition> conditionCache = new ConcurrentHashMap<String, Condition>();
    }
    

    2.2消息请求体

    package com.kikop.model;
    
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: myrabbitwebrequest
     * @file Name: MqRequest
     * @desc mq消息请求体(8:00,13:30,19:30,21:00)
     * @date 2021/10/13
     * @time 8:00
     * @by IDE: IntelliJ IDEA
     */
    public class MqRequest {
        public String reqId;
        public String reqInfo;
    
        public String getReqId() {
            return reqId;
        }
    
        public void setReqId(String reqId) {
            this.reqId = reqId;
        }
    
        public String getReqInfo() {
            return reqInfo;
        }
    
        public void setReqInfo(String reqInfo) {
            this.reqInfo = reqInfo;
        }
    }
    

    2.3web层

    package com.kikop.controller;
    
    
    import com.alibaba.fastjson.JSONObject;
    import com.kikop.ConstRabbit;
    import com.kikop.handler.MyMqRequestTask;
    import com.kikop.model.MqRequest;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    
    import javax.annotation.PreDestroy;
    import java.util.concurrent.*;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: myrabbitwebrequest
     * @file Name: OrderPayController
     * @desc
     * @date 2021/10/10
     * @time 8:00
     * @by IDE: IntelliJ IDEA
     */
    @RestController
    @RequestMapping("/orderpay")
    public class OrderPayController {
    
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        @Autowired
        private ReentrantLock myMqLock;
    
    
        private ExecutorService executorService = Executors.newFixedThreadPool(10);
    
        /**
         * communicateWithMq
         *
         * @return
         */
        @RequestMapping(value = "communicateWithMq", method = {RequestMethod.GET, RequestMethod.POST})
        @ResponseBody
        public JSONObject communicateWithMq(String task_uuid) {
    
            // http://localhost:8085/myrabbitwebrequest/aform/communicateWithMq?task_uuid=1
    
            // 1.后台请求
            System.out.println("------------开始后台系统A请求:" + task_uuid);
    
            JSONObject result = new JSONObject();
            result.put("success", false);
    
            Future<String> stringFuture = executorService.submit(new MyMqRequestTask(
                    rabbitTemplate, myMqLock, task_uuid));
            try {
                String strMqResult = null;
                // 2.同步等待后台处理,但执行时在其它系统中完成
                strMqResult = stringFuture.get(5, TimeUnit.MINUTES);
                result.put("data", strMqResult);
                result.put("success", true);
    
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
    
            // 3.gc释放
            if (ConstRabbit.conditionCache.contains(task_uuid)) {
                Condition currentCndition = ConstRabbit.conditionCache.remove(task_uuid);
                if (null != currentCndition) {
                    currentCndition = null; // for gc
                }
            }
            System.out.println("------------结束后台系统A请求:" + task_uuid);
            return result;
        }
    
    
        /**
         * communicateWithMq
         * RequestBody作用:序列化json发送
         *
         * @return
         */
        @RequestMapping(value = "communicateWithMqByReqObj", method = {RequestMethod.GET, RequestMethod.POST})
        @ResponseBody
        public JSONObject communicateWithMqByReqObj(@RequestBody MqRequest mqRequest) {
    
            // 1.后台请求
            String task_uuid = mqRequest.getReqId();
            System.out.println("------------前端Web系统开始后台系统A请求:" + task_uuid);
            JSONObject result = new JSONObject();
            result.put("success", false);
    
            Future<String> stringFuture = executorService.submit(new MyMqRequestTask(
                    rabbitTemplate, myMqLock, mqRequest));
            try {
                String strMqResult = null;
    
                // 2.同步等待后台处理,但执行时在其它系统中完成
                strMqResult = stringFuture.get(5, TimeUnit.MINUTES);
                result.put("data", strMqResult);
                result.put("success", true);
    
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
    
            // 3.gc释放
            if (ConstRabbit.conditionCache.contains(task_uuid)) {
                Condition currentCndition = ConstRabbit.conditionCache.remove(task_uuid);
                if (null != currentCndition) {
                    currentCndition = null; // for gc
                }
            }
            System.out.println("------------前端Web系统结束后台系统A请求:" + task_uuid);
            return result;
        }
    
        @PreDestroy
        public void destroy() {
            executorService.shutdown();
        }
    
    }
    

    2.4业务线程

    package com.kikop.handler;
    
    import com.kikop.ConstRabbit;
    import com.kikop.model.MqRequest;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: myrabbitwebrequest
     * @file Name: MyMqRequestTask
     * @desc 线程池任务处理器(mq请求任务)
     * @date 2021/10/10
     * @time 8:00
     * @by IDE: IntelliJ IDEA
     */
    public class MyMqRequestTask implements Callable<String> {
    
    
        private RabbitTemplate rabbitTemplate;
    
    
        private ReentrantLock myMqLock;
    
    
        private Condition myMqCondition;
    
        private MqRequest mqRequest;
    
        public MyMqRequestTask(RabbitTemplate rabbitTemplate, ReentrantLock myMqLock, String task_uuid) {
        }
    
        public MyMqRequestTask(RabbitTemplate rabbitTemplate, ReentrantLock myMqLock, MqRequest mqRequest) {
            this.rabbitTemplate = rabbitTemplate;
            this.myMqLock = myMqLock;
            Condition condition = this.myMqLock.newCondition();
            this.mqRequest = mqRequest;
            ConstRabbit.conditionCache.put(this.mqRequest.getReqId(), condition);
            this.myMqCondition = condition;
        }
    
        @Override
        public String call() throws Exception {
    
            try {
                System.out.println("------------业务线程发送到rabbitmq服务中的请求队列,开始获取锁:" + this.mqRequest.getReqId());
                myMqLock.lock();
    
                System.out.println("------------业务线程发送到rabbitmq服务中的请求队列,获取锁成功:" + this.mqRequest.getReqId());
    
    
    
                // 发送的对象 Object:String类型
                rabbitTemplate.convertAndSend(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT,
                        ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT,
                        this.mqRequest.getReqId());
    
                // 等待mq处理结果
                System.out.println("------------业务线程发送到rabbitmq服务中的请求队列,await等待结果:" + this.mqRequest.getReqId() + ",准备唤醒AQS节点中下一个锁");
                myMqCondition.await(); // release aqs state,node-->condition queue
    
                return "result_" + this.mqRequest.getReqId();
    
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                System.out.println("------------业务线程开始释放锁:" + this.mqRequest.getReqId());
                myMqLock.unlock(); // 释放锁
            }
            return "";
        }
    }
    

    2.5mq消费端

    package com.kikop.listener;
    
    import com.fasterxml.jackson.databind.ser.std.RawSerializer;
    import com.kikop.ConstRabbit;
    import com.kikop.model.MqRequest;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: myrabbitwebrequest
     * @file Name: MyRabbitListener
     * @desc
     * @date 2021/10/10
     * @time 8:00
     * @by IDE: IntelliJ IDEA
     */
    @Slf4j
    @Component
    public class MyRabbitListener {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        private ReentrantLock myMqLock;
    
        private Condition myMqCondition;
    
        private static Random sleepRandom;
    
        static {
            sleepRandom = new Random(System.currentTimeMillis());
        }
    
    // 1.Java原生数据类型消费
    //    @RabbitListener(queues = ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT)
    //    public void helloRabbitMq(Message message, Channel channel) throws IOException {
    
        // 2.序列化Json数据消费
        @RabbitHandler
        @RabbitListener(queues = ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT)
        public void helloRabbitMq(Message message, @Payload MqRequest mqRequest, Channel channel) throws IOException {
    
            MessageProperties messageProperties = message.getMessageProperties();
            log.info(messageProperties.toString());
    
            try {
    
                // 1.队列结果
                log.info(message.toString());
                // body:payLoad负载
                log.info(new String(message.getBody()));
                byte[] messageBody = message.getBody();
                // 这个task_uuid能是sync队列中的第一节点吗
                // 不一定
                String task_uuid = mqRequest.getReqId();
    
                // 2.注意:
                // 1.手动应答模式需要,消息中带:getDeliveryTag,用于重写投递
                //            listener:
                //            simple:
                //            # manual 手动确认
                //                acknowledge-mode: manual
    
                // 2.报错信息
                //    Channel shutdown: channel error; protocol method: #method<channel.close>
                //    (reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1,
                //        class-id=60, method-id=80)
                channel.basicAck(messageProperties.getDeliveryTag(), false);
    
    
                // 3.推送处理结果到-->mq
    //            rabbitTemplate.convertAndSend(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT,
    //                    ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT_RESULT,
    //                    message.getBody());
    
    
                // 模拟XXX系统处理耗时(理论上应该在XXX系统中)
    //            int sleepTimes = sleepRandom.nextInt(60) + 1;
    //            System.out.println("------------task_uuid:" + task_uuid + ",sleepTimes:" + sleepTimes);
    //            TimeUnit.SECONDS.sleep(sleepTimes);
    
    
                // 4.条件变量通知
                try {
                    System.out.println("------------Mq消费者(后台系统A中)解析XXX系统理结果,开始获取锁:" + task_uuid);
                    myMqLock.lock();
    
                    System.out.println("------------Mq消费者(后台系统A中)解析XXX系统理结果,获取锁成功:" + task_uuid);
    
                    System.out.println("------------Mq消费者解析XXX系统处理结果,激活信号:" + task_uuid);
    
                    // 按顺序唤醒条件队列中的节点(11,22),和task_uuid没有直接的关系绑定(22,11)
    
                    Condition condition = ConstRabbit.conditionCache.get(task_uuid);
                    this.myMqCondition = condition;
                    if (this.myMqCondition != null) {
                        myMqCondition.signal();  // node-->aqs
                    } else {
                        System.out.println("------------Mq消费者(后台系统A中)解析XXX系统处理结果,无效的条件:" + task_uuid);
                    }
    
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    System.out.println("------------Mq消费者(后台系统A中)开始释放锁:" + task_uuid);
                    myMqLock.unlock(); // 唤醒aqs节点
                }
    
            } catch (Exception ex) {
                ex.printStackTrace();
    
                // begin_手动消费方式是启用
                if (messageProperties.getRedelivered()) {
                    // 当前的消息是否重新投递的消息,也就是该消息是重新回到队列里的消息
    
                    // 主要防止死循环消费
                    log.info("------------Mq消费者消息已重复处理失败,拒绝再次接收...");
                    // 拒绝消息
                    channel.basicReject(messageProperties.getDeliveryTag(), false);
                } else {
                    log.info("------------Mq消费者消息即将再次返回队列处理...");
                    channel.basicNack(messageProperties.getDeliveryTag(), false, true);
                }
                // end_手动消费方式是启用
            }
        }
    }
    

    2.6启动类

    package com.kikop;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: myrabbitwebrequest
     * @file Name: MyRabbitWebRequestApplication
     * @desc
     * @date 2021/10/11
     * @time 8:00
     * @by IDE: IntelliJ IDEA
     */
    
    // 默认组件扫描当前包空间
    @SpringBootApplication
    public class MyRabbitWebRequestApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(MyRabbitWebRequestApplication.class, args);
        }
    
    }
    
    

    2.7后台系统B

    package com.kikop;
    
    
    import com.alibaba.fastjson.JSONObject;
    import com.kikop.model.MqRequest;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.boot.test.web.client.TestRestTemplate;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.util.stream.IntStream;
    
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: myrabbitreliableproducer
     * @file Name: ProducerTest
     * @desc
     * @date 2021/10/10
     * @time 8:00
     * @by IDE: IntelliJ IDEA
     */
    //@SpringBootTest
    @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
    @RunWith(SpringRunner.class)
    
    public class RabbitWebRequestTest {
    
    
        // 引入 SpringBootTest
        // 模拟 rest请求
        @Autowired
        private TestRestTemplate testRestTemplate;
    
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 传输字符串
         * get请求
         */
        @Test
        public void testConsurrentSend() {
    
            // http://localhost:8085/myrabbitwebrequest/aform/communicateWithMq?task_uuid=1
    
            // 本机CPU为4核,同时并发最多4个
    //        IntStream.range(0, 3).parallel().forEach(i ->
    //                {
    //                    MqRequest mqRequest = new MqRequest();
    //                    mqRequest.setReqId(String.valueOf(i + 1));
    //                    mqRequest.setReqInfo("info_" + String.valueOf(i + 1));
    //
    //                    JSONObject result = testRestTemplate.getForObject("/orderpay/communicateWithMqByReqObj?task_uuid={task_uuid}",
    //                            JSONObject.class, mqRequest);
    //                    System.out.println(result.toJSONString());
    //                }
    //        );
        }
    
        /**
         * 传输字符串
         * post请求
         */
        @Test
        public void testConsurrentSendByObj() {
    
            // 本机CPU为4核,同时并发最多4个
            MqRequest mqRequest = new MqRequest();
            mqRequest.setReqId(String.valueOf(1));
            mqRequest.setReqInfo("info_" + String.valueOf(1));
    
            JSONObject result = testRestTemplate.postForObject("/orderpay/communicateWithMqByReqObj",
                    mqRequest,
                    JSONObject.class, "");
            System.out.println(result.toJSONString());
    
        }
    
        @Test
        public void testProcResult() {
            MqRequest mqRequest = new MqRequest();
            mqRequest.setReqId(String.valueOf(1));
            mqRequest.setReqInfo("info_" + String.valueOf(1));
    
            System.out.println("------------模拟后台系统B进行请求处理");
            rabbitTemplate.convertAndSend(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT,
                    ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT_RESULT,
                    mqRequest);
        }
    }
    

    2.8postman测试

    image-20211013215913972.png image-20211013215947627.png
    // 后台系统A
    2021-10-13 21:58:11,271 [INFO] [http-nio-8085-exec-1] [org.springframework.web.servlet.DispatcherServlet:547] [] Completed initialization in 5 ms
    ------------前端Web系统开始后台系统A请求:1
    ------------业务线程发送到rabbitmq服务中的请求队列,开始获取锁:1
    ------------业务线程发送到rabbitmq服务中的请求队列,获取锁成功:1
    ------------业务线程发送到rabbitmq服务中的请求队列,await等待结果:1,准备唤醒AQS节点中下一个锁
    
    // 后台系统B
    ------------模拟后台系统B进行请求处理
    
    // 后台系统A
    2021-10-13 22:00:20,063 [INFO] [pool-1-thread-4] [com.kikop.listener.MyRabbitListener:63] [] MessageProperties [headers={__TypeId__=com.kikop.model.MqRequest}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ex_webrequest_result, receivedRoutingKey=rk_webrequest_result, deliveryTag=1, consumerTag=amq.ctag-j7dC0hzytf8DF2_SQ3Jhsw, consumerQueue=queue_webrequest_result]
    2021-10-13 22:00:20,064 [INFO] [pool-1-thread-4] [com.kikop.listener.MyRabbitListener:68] [] (Body:'{"reqId":"1","reqInfo":"info_1"}' MessageProperties [headers={__TypeId__=com.kikop.model.MqRequest}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ex_webrequest_result, receivedRoutingKey=rk_webrequest_result, deliveryTag=1, consumerTag=amq.ctag-j7dC0hzytf8DF2_SQ3Jhsw, consumerQueue=queue_webrequest_result])
    2021-10-13 22:00:20,064 [INFO] [pool-1-thread-4] [com.kikop.listener.MyRabbitListener:70] [] {"reqId":"1","reqInfo":"info_1"}
    ------------Mq消费者(后台系统A中)解析XXX系统理结果,开始获取锁:1
    ------------Mq消费者(后台系统A中)解析XXX系统理结果,获取锁成功:1
    ------------Mq消费者解析XXX系统处理结果,激活信号:1
    ------------Mq消费者(后台系统A中)开始释放锁:1
    ------------业务线程开始释放锁:1
    ------------前端Web系统结束后台系统A请求:1
    

    参考

    1RabbitMQ笔记(七)-SimpleMessageListenerContainer和DirectMessageListenerContainer

    https://blog.csdn.net/yingziisme/article/details/86418580

    2postman发送json格式的post请求

    https://www.cnblogs.com/shimh/p/6093229.html

    相关文章

      网友评论

          本文标题:2021-10-11_如何通过rabbitmq实现前后台的多系统

          本文链接:https://www.haomeiwen.com/subject/ljzeoltx.html