Integrating RabbitMQ with the Spring Family

Author: 青衣敖王侯 | Published: 2019-08-10 16:15

    1. Spring AMQP component: RabbitAdmin

    RabbitAdmin is the most important class here: it lets you administer RabbitMQ from code, so the first step is to register it as a bean in Spring.

        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses("49.234.231.49:5672");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
        
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    

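    RabbitAdmin depends on a ConnectionFactory, so in general we need to define two beans. autoStartup must be true here (true is also the default, so the explicit call mainly documents the intent); if it is false, RabbitAdmin skips its startup logic and never declares the beans defined in the container. We will see why in the source code shortly. Under the hood, RabbitAdmin collects the Exchange, Queue, and Binding @Bean declarations from the Spring container (a routing key is a property of a Binding, not a bean of its own). It then uses RabbitTemplate's execute method to perform the corresponding declare, modify, and delete operations on RabbitMQ, for example adding an exchange, removing a binding, or purging the messages in a queue.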

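    1.1 Key parts of the RabbitAdmin source

    1.1.1 Why must autoStartup be true? As the figure below shows, when it is false the initialization method returns immediately and never reads anything from the container.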



    1.1.2 How does RabbitAdmin obtain the Exchange and other declarations from the Spring container?




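    It first looks up the beans of type Exchange, Queue, and Binding in the container and puts them into collections, then calls rabbitTemplate to declare those exchanges, queues, and bindings on the broker.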
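The two points above (the autoStartup guard and the scan for declarable beans) can be modeled without Spring at all. The sketch below is a simplified stand-in, not RabbitAdmin's actual source: `MiniAdmin`, `Declarable`, the nested `Exchange`/`Queue`/`Binding` classes, and the `container` map are all hypothetical names, and "declaring" only records a string instead of talking to a broker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MiniAdminDemo {

    // Hypothetical stand-ins for Spring AMQP's Exchange, Queue and Binding types.
    static class Declarable {
        final String name;
        Declarable(String name) { this.name = name; }
    }
    static class Exchange extends Declarable { Exchange(String name) { super(name); } }
    static class Queue extends Declarable { Queue(String name) { super(name); } }
    static class Binding extends Declarable { Binding(String name) { super(name); } }

    // Toy admin: scans a "container" and records what it would declare.
    static class MiniAdmin {
        final Map<String, Object> container;
        final boolean autoStartup;
        final List<String> declared = new ArrayList<>();

        MiniAdmin(Map<String, Object> container, boolean autoStartup) {
            this.container = container;
            this.autoStartup = autoStartup;
        }

        void initialize() {
            if (!autoStartup) {
                return; // guard: with autoStartup=false nothing is read from the container
            }
            collect(Exchange.class);
            collect(Queue.class);
            collect(Binding.class);
        }

        // Picks every bean of the given type and records its "declaration".
        private void collect(Class<? extends Declarable> type) {
            for (Object bean : container.values()) {
                if (type.isInstance(bean)) {
                    declared.add(type.getSimpleName() + ":" + type.cast(bean).name);
                }
            }
        }
    }

    static Map<String, Object> sampleContainer() {
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("queue001", new Queue("queue001"));
        beans.put("topic001", new Exchange("topic001"));
        beans.put("someOtherBean", "not a declarable");
        beans.put("binding001", new Binding("queue001->topic001"));
        return beans;
    }

    public static void main(String[] args) {
        MiniAdmin enabled = new MiniAdmin(sampleContainer(), true);
        enabled.initialize();
        System.out.println(enabled.declared); // [Exchange:topic001, Queue:queue001, Binding:queue001->topic001]

        MiniAdmin disabled = new MiniAdmin(sampleContainer(), false);
        disabled.initialize();
        System.out.println(disabled.declared); // []
    }
}
```

The second run shows the effect described in 1.1.1: with the flag off, the beans exist in the container but nothing is ever declared.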

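    1.2 Declarative usage of Spring AMQP

    The basic API usage from earlier is shown in the figure below:



    The declarative usage of Spring AMQP is shown in the figure below: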
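As a rough illustration of the declarative style, the configuration below declares one exchange, one queue, and one binding as beans, which RabbitAdmin can then pick up from the container. This is a sketch under assumptions: the names topic001 and queue001 are borrowed from later sections of this article, while the class name `DeclarativeRabbitConfig`, the durable flags, and the routing pattern "spring.*" are chosen for illustration only.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeclarativeRabbitConfig {

    @Bean
    public TopicExchange exchange001() {
        // arguments: name, durable, autoDelete (flag values are assumptions)
        return new TopicExchange("topic001", true, false);
    }

    @Bean
    public Queue queue001() {
        // arguments: name, durable
        return new Queue("queue001", true);
    }

    @Bean
    public Binding binding001() {
        // "spring.*" is an assumed routing pattern, not taken from the article
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
    }
}
```

Compared with calling declareExchange, declareQueue, and declareBinding by hand, the beans only describe the topology; the actual declaration happens when RabbitAdmin processes them.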


    2 RabbitTemplate in practice



    Code example:

        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        
        @Test
        public void testSendMessage() throws Exception {
            // 1. Create the message
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.getHeaders().put("desc", "message description..");
            messageProperties.getHeaders().put("type", "custom message type..");
            Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
            
            // 2. Send it; the post-processor can still modify the message before it goes out
            rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    System.err.println("------ applying extra settings ---------");
                    message.getMessageProperties().getHeaders().put("desc", "description overwritten by the post-processor");
                    message.getMessageProperties().getHeaders().put("attr", "attribute added by the post-processor");
                    return message;
                }
            });
        }
        
        @Test
        public void testSendMessage2() throws Exception {
            // 1. Create the message
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("text/plain");
            Message message = new Message("mq message 1234".getBytes(), messageProperties);
            
            rabbitTemplate.send("topic001", "spring.abc", message);
            
            rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
            rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
        }
    

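    3 SimpleMessageListenerContainer

    • Can listen on multiple queues, start automatically, and declare resources automatically
    • Configures transaction attributes, the transaction manager, transaction concurrency, and whether transactions are enabled (rarely used)
    • Configures the number of consumers, the minimum and maximum concurrency, and batch consumption
    • Configures the acknowledge mode (including automatic acknowledgement), whether rejected messages are requeued, and a handler for exceptions
    • Configures the consumer tag strategy, exclusive mode, consumer arguments, and so on
    • Configures the concrete listener, the message converter, and so on
    • Allows the consumer count, the message receiving mode, and similar settings to be changed while the application is running

    Code example: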
        @Bean
        public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
            
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
            container.setConcurrentConsumers(1);
            container.setMaxConcurrentConsumers(5);
            container.setDefaultRequeueRejected(false);
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setExposeListenerChannel(true);
            container.setConsumerTagStrategy(new ConsumerTagStrategy() {
                @Override
                public String createConsumerTag(String queue) {
                    return queue + "_" + UUID.randomUUID().toString();
                }
            });
            
            container.setMessageListener(new ChannelAwareMessageListener() {
                @Override
                public void onMessage(Message message, Channel channel) throws Exception {
                    String msg = new String(message.getBody());
                    System.err.println("---------- consumer: " + msg);
                }
            });
            return container;
        }
    

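    4 Using MessageListenerAdapter

    MessageListenerAdapter is a message listener adapter. Its queueOrTagToMethodName map pairs a queue name (or consumer tag) with a method name, so that messages from a given queue are received and handled by the method bound to it. Alternatively, you can set a single method name directly.

        @Bean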
        public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
            
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
            container.setConcurrentConsumers(1);
            container.setMaxConcurrentConsumers(5);
            container.setDefaultRequeueRejected(false);
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setExposeListenerChannel(true);
            container.setConsumerTagStrategy(new ConsumerTagStrategy() {
                @Override
                public String createConsumerTag(String queue) {
                    return queue + "_" + UUID.randomUUID().toString();
                }
            });
            
            /**
             * 1. Adapter style. The default listener method name is handleMessage.
                // You can specify a different method name: consumeMessage
                // You can also add a converter: byte array to String
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
            adapter.setMessageConverter(new TextMessageConverter());
            container.setMessageListener(adapter);*/
            
            
            /**
             * 2. Adapter style: queue names can also be mapped one-to-one to method names
             * 
             * */
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setMessageConverter(new TextMessageConverter());
            Map<String, String> queueOrTagToMethodName = new HashMap<>();
            queueOrTagToMethodName.put("queue001", "method1");
            queueOrTagToMethodName.put("queue002", "method2");
            adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
            container.setMessageListener(adapter);
    
            return container;
        }
    

    By default the adapter invokes the handleMessage method on the target class; here we map each queue to a method of our own instead.
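The queue-to-method mapping described above can be pictured as a lookup followed by a reflective call. The following is a minimal stdlib-only sketch of that idea, not the adapter's real implementation: `QueueMethodDispatchDemo`, `Delegate`, `resolveMethodName`, and `dispatch` are hypothetical names, and the delegate methods return a String only so that the result is easy to observe.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class QueueMethodDispatchDemo {

    // Hypothetical delegate; the article's MessageDelegate prints instead of returning.
    public static class Delegate {
        public String handleMessage(String body) { return "handleMessage:" + body; }
        public String method1(String body) { return "method1:" + body; }
        public String method2(String body) { return "method2:" + body; }
    }

    static final String DEFAULT_METHOD = "handleMessage";

    // Uses the method mapped to the queue, or the default method when no mapping exists.
    static String resolveMethodName(Map<String, String> queueToMethod, String queue) {
        return queueToMethod.getOrDefault(queue, DEFAULT_METHOD);
    }

    // Looks up the resolved method on the delegate and invokes it with the message body.
    static Object dispatch(Object delegate, Map<String, String> queueToMethod,
                           String queue, String body) {
        String methodName = resolveMethodName(queueToMethod, queue);
        try {
            Method method = delegate.getClass().getMethod(methodName, String.class);
            return method.invoke(delegate, body);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> queueToMethod = new HashMap<>();
        queueToMethod.put("queue001", "method1");
        queueToMethod.put("queue002", "method2");

        Delegate delegate = new Delegate();
        System.out.println(dispatch(delegate, queueToMethod, "queue001", "a")); // method1:a
        System.out.println(dispatch(delegate, queueToMethod, "queue002", "b")); // method2:b
        System.out.println(dispatch(delegate, queueToMethod, "queue003", "c")); // handleMessage:c
    }
}
```

The third call shows the fallback: a queue without an entry in the map ends up in the default method.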


    TextMessageConverter

    public class TextMessageConverter implements MessageConverter {
    
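        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            // Encode with an explicit charset so the result does not depend on the platform default
            return new Message(object.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8), messageProperties);
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            String contentType = message.getMessageProperties().getContentType();
            // Only text content types are decoded to a String; everything else stays a byte[]
            if (null != contentType && contentType.contains("text")) {
                return new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            }
            return message.getBody();
        }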
    
    }
    

    Message delegate

    public class MessageDelegate {
    
        // Default listener method used by MessageListenerAdapter
        public void handleMessage(byte[] messageBody) {
            System.err.println("default method, message body: " + new String(messageBody));
        }
        
        public void consumeMessage(byte[] messageBody) {
            System.err.println("byte[] method, message body: " + new String(messageBody));
        }
        
        public void consumeMessage(String messageBody) {
            System.err.println("String method, message body: " + messageBody);
        }
        
        // Bound to queue001 through queueOrTagToMethodName
        public void method1(String messageBody) {
            System.err.println("method1 received message body: " + messageBody);
        }
        
        // Bound to queue002 through queueOrTagToMethodName
        public void method2(String messageBody) {
            System.err.println("method2 received message body: " + messageBody);
        }
        
        public void consumeMessage(Map messageBody) {
            System.err.println("Map method, message body: " + messageBody);
        }
        
        public void consumeMessage(Order order) {
            System.err.println("Order object, message body, id: " + order.getId() + 
                    ", name: " + order.getName() + 
                    ", content: "+ order.getContent());
        }
        
        public void consumeMessage(Packaged pack) {
            System.err.println("Packaged object, message body, id: " + pack.getId() + 
                    ", name: " + pack.getName() + 
                    ", content: "+ pack.getDescription());
        }
        
        public void consumeMessage(File file) {
            System.err.println("File method, message body: " + file.getName());
        }
    
    }
    

    Test:

        @Test
        public void testSendMessage4Text() throws Exception {
            // 1. Create the message
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("text/plain");
            Message message = new Message("mq message 1234".getBytes(), messageProperties);
            
            rabbitTemplate.send("topic001", "spring.abc", message);
            rabbitTemplate.send("topic002", "rabbit.abc", message);
        }
    

    5 MessageConverter




    Code example:

        @Bean
        public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
            
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
            container.setConcurrentConsumers(1);
            container.setMaxConcurrentConsumers(5);
            container.setDefaultRequeueRejected(false);
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setExposeListenerChannel(true);
            container.setConsumerTagStrategy(new ConsumerTagStrategy() {
                @Override
                public String createConsumerTag(String queue) {
                    return queue + "_" + UUID.randomUUID().toString();
                }
            });
            /**
            container.setMessageListener(new ChannelAwareMessageListener() {
                @Override
                public void onMessage(Message message, Channel channel) throws Exception {
                    String msg = new String(message.getBody());
                    System.err.println("---------- consumer: " + msg);
                }
            });
            */
            
            /**
             * 1. Adapter style. The default listener method name is handleMessage.
                // You can specify a different method name: consumeMessage
                // You can also add a converter: byte array to String
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
            adapter.setMessageConverter(new TextMessageConverter());
            container.setMessageListener(adapter);*/
            
            
            /**
             * 2. Adapter style: queue names can also be mapped one-to-one to method names
             * 
             * 
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setMessageConverter(new TextMessageConverter());
            Map<String, String> queueOrTagToMethodName = new HashMap<>();
            queueOrTagToMethodName.put("queue001", "method1");
            queueOrTagToMethodName.put("queue002", "method2");
            adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
            container.setMessageListener(adapter);      
            */
            
            // 1.1 Converter with JSON support
            /** 
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
            
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
            adapter.setMessageConverter(jackson2JsonMessageConverter);
            
            container.setMessageListener(adapter);
            */
            
            
            
            // 1.2 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter: conversion to Java objects
            /** 
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
            
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
            
            DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
            jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
            
            adapter.setMessageConverter(jackson2JsonMessageConverter);
            container.setMessageListener(adapter);
            */
            
            
            // 1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter: mapping several type ids to Java classes
            /** */
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
            DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
            
            Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
            idClassMapping.put("order", com.bfxy.spring.entity.Order.class);
            idClassMapping.put("packaged", com.bfxy.spring.entity.Packaged.class);
            
            javaTypeMapper.setIdClassMapping(idClassMapping);
            
            jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
            adapter.setMessageConverter(jackson2JsonMessageConverter);
            container.setMessageListener(adapter);
            
            
    //        //1.4 ext convert
    //        
    //        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    //        adapter.setDefaultListenerMethod("consumeMessage");
    //        
    //        // Global converter:
    //      ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
    //      
    //      TextMessageConverter textConvert = new TextMessageConverter();
    //      convert.addDelegate("text", textConvert);
    //      convert.addDelegate("text/html", textConvert);
    //      convert.addDelegate("text/xml", textConvert);
    //      convert.addDelegate("text/plain", textConvert);
    //      
    //      Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
    //      convert.addDelegate("json", jsonConvert);
    //      convert.addDelegate("application/json", jsonConvert);
    //      
    //      ImageMessageConverter imageConverter = new ImageMessageConverter();
    //      convert.addDelegate("image/png", imageConverter);
    //      convert.addDelegate("image", imageConverter);
    //      
    //      PDFMessageConverter pdfConverter = new PDFMessageConverter();
    //      convert.addDelegate("application/pdf", pdfConverter);
    //        
    //      
    //      adapter.setMessageConverter(convert);
    //      container.setMessageListener(adapter);
            
            return container;
            
        }
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApplicationTests {
    
        @Test
        public void contextLoads() {
        }
        
        @Autowired
        private RabbitAdmin rabbitAdmin;
        
        @Test
        public void testAdmin() throws Exception {
            rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
            
            rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
            
            rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
            
            rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
            
            rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
            
            rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
            
            rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                    Binding.DestinationType.QUEUE,
                    "test.direct", "direct", new HashMap<>()));
            
            rabbitAdmin.declareBinding(
                    BindingBuilder
                    .bind(new Queue("test.topic.queue", false))     // create the queue inline
                    .to(new TopicExchange("test.topic", false, false))  // create the exchange inline and link the two
                    .with("user.#"));   // routing key pattern
            
            
            rabbitAdmin.declareBinding(
                    BindingBuilder
                    .bind(new Queue("test.fanout.queue", false))        
                    .to(new FanoutExchange("test.fanout", false, false)));
            
            // Purge the messages in the queue
            rabbitAdmin.purgeQueue("test.topic.queue", false);
        }
        
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        
        @Test
        public void testSendMessage() throws Exception {
            // 1. Create the message
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.getHeaders().put("desc", "message description..");
            messageProperties.getHeaders().put("type", "custom message type..");
            Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
            
            rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    System.err.println("------ applying extra settings ---------");
                    message.getMessageProperties().getHeaders().put("desc", "description overwritten by the post-processor");
                    message.getMessageProperties().getHeaders().put("attr", "attribute added by the post-processor");
                    return message;
                }
            });
        }
        
        @Test
        public void testSendMessage2() throws Exception {
            // 1. Create the message
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("text/plain");
            Message message = new Message("mq message 1234".getBytes(), messageProperties);
            
            rabbitTemplate.send("topic001", "spring.abc", message);
            
            rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
            rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
        }
        
        @Test
        public void testSendMessage4Text() throws Exception {
            // 1. Create the message
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("text/plain");
            Message message = new Message("mq message 1234".getBytes(), messageProperties);
            
            
            rabbitTemplate.send("topic002", "rabbit.abc", message);
            rabbitTemplate.send("topic001", "spring.abc", message);
        }
        
        
        @Test
        public void testSendJsonMessage() throws Exception {
            
            Order order = new Order();
            order.setId("001");
            order.setName("message order");
            order.setContent("description");
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(order);
            System.err.println("order 4 json: " + json);
            
            MessageProperties messageProperties = new MessageProperties();
            // Note: the contentType must be set to application/json here
            messageProperties.setContentType("application/json");
            Message message = new Message(json.getBytes(), messageProperties);
            
            rabbitTemplate.send("topic001", "spring.order", message);
        }
        
        @Test
        public void testSendJavaMessage() throws Exception {
            
            Order order = new Order();
            order.setId("001");
            order.setName("order message");
            order.setContent("order description");
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(order);
            System.err.println("order 4 json: " + json);
            
            MessageProperties messageProperties = new MessageProperties();
            // Note: the contentType must be set to application/json here
            messageProperties.setContentType("application/json");
            messageProperties.getHeaders().put("__TypeId__", "com.bfxy.spring.entity.Order");
            Message message = new Message(json.getBytes(), messageProperties);
            
            rabbitTemplate.send("topic001", "spring.order", message);
        }
        
        @Test
        public void testSendMappingMessage() throws Exception {
            
            ObjectMapper mapper = new ObjectMapper();
            
            Order order = new Order();
            order.setId("001");
            order.setName("order message");
            order.setContent("order description");
            
            String json1 = mapper.writeValueAsString(order);
            System.err.println("order 4 json: " + json1);
            
            MessageProperties messageProperties1 = new MessageProperties();
            // Note: the contentType must be set to application/json here
            messageProperties1.setContentType("application/json");
            messageProperties1.getHeaders().put("__TypeId__", "order");
            Message message1 = new Message(json1.getBytes(), messageProperties1);
            rabbitTemplate.send("topic001", "spring.order", message1);
            
            Packaged pack = new Packaged();
            pack.setId("002");
            pack.setName("package message");
            pack.setDescription("package description");
            
            String json2 = mapper.writeValueAsString(pack);
            System.err.println("pack 4 json: " + json2);
    
            MessageProperties messageProperties2 = new MessageProperties();
            // Note: the contentType must be set to application/json here
            messageProperties2.setContentType("application/json");
            messageProperties2.getHeaders().put("__TypeId__", "packaged");
            Message message2 = new Message(json2.getBytes(), messageProperties2);
            rabbitTemplate.send("topic001", "spring.pack", message2);
        }
        
        @Test
        public void testSendExtConverterMessage() throws Exception {
    //          byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));
    //          MessageProperties messageProperties = new MessageProperties();
    //          messageProperties.setContentType("image/png");
    //          messageProperties.getHeaders().put("extName", "png");
    //          Message message = new Message(body, messageProperties);
    //          rabbitTemplate.send("", "image_queue", message);
            
                byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));
                MessageProperties messageProperties = new MessageProperties();
                messageProperties.setContentType("application/pdf");
                Message message = new Message(body, messageProperties);
                rabbitTemplate.send("", "pdf_queue", message);
        }
        
    
    }
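The tests above set the `__TypeId__` header in two ways: testSendJavaMessage uses a fully qualified class name, while testSendMappingMessage uses the short ids "order" and "packaged" registered through idClassMapping. The stdlib-only sketch below models that lookup order as I understand it: explicit id mapping first, then the header value treated as a class name. It is not the mapper's real code (which, among other things, also checks trusted packages); `TypeIdResolverDemo`, `resolve`, and the nested `Order`/`Packaged` classes are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;

public class TypeIdResolverDemo {

    // Hypothetical stand-ins for the article's Order and Packaged entities.
    public static class Order { }
    public static class Packaged { }

    // Resolves a type id: explicit id mapping first, then fall back to a class name.
    static Class<?> resolve(Map<String, Class<?>> idClassMapping, String typeId) {
        Class<?> mapped = idClassMapping.get(typeId);
        if (mapped != null) {
            return mapped;
        }
        try {
            return Class.forName(typeId);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown type id: " + typeId, e);
        }
    }

    static Map<String, Class<?>> sampleMapping() {
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("order", Order.class);
        idClassMapping.put("packaged", Packaged.class);
        return idClassMapping;
    }

    public static void main(String[] args) {
        Map<String, Class<?>> mapping = sampleMapping();
        System.out.println(resolve(mapping, "order").getSimpleName());    // Order
        System.out.println(resolve(mapping, "packaged").getSimpleName()); // Packaged
        System.out.println(resolve(mapping, "java.util.ArrayList"));      // class java.util.ArrayList
    }
}
```

Once the target class is known, the JSON converter can deserialize the body into it, and the adapter can select the consumeMessage overload that accepts that type.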
    
    

    6 Integrating RabbitMQ with Spring Boot 2.0





    6.1 Hands-on

    6.1.1 Producer configuration

    pom.xml configuration

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.bfxy</groupId>
        <artifactId>rabbitmq-springboot-producer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>rabbitmq-springboot-producer</name>
        <description>rabbitmq-springboot-producer</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.0.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    

    application.properties

    spring.rabbitmq.addresses=49.234.231.49:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
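    # Enable publisher confirms (broker acknowledgements) and publisher returns (unroutable messages)
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    # mandatory=true asks the broker to return unroutable messages instead of silently dropping them
    spring.rabbitmq.template.mandatory=true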
    

    Define a sender class

    @Component
    public class RabbitSender {
    
        // Inject the auto-configured RabbitTemplate
        @Autowired
        private RabbitTemplate rabbitTemplate;  
        
        // Callback: publisher confirm (the broker acknowledged or rejected the message)
        final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.err.println("correlationData: " + correlationData);
                System.err.println("ack: " + ack);
                if(!ack){
                    System.err.println("handling the failure....");
                }
            }
        };
        
        // Callback: publisher return (the message could not be routed)
        final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                    String exchange, String routingKey) {
                System.err.println("return exchange: " + exchange + ", routingKey: " 
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
            }
        };
        
        // Send method: builds a Message from the payload and headers
        public void send(Object message, Map<String, Object> properties) throws Exception {
            MessageHeaders mhs = new MessageHeaders(properties);
            Message msg = MessageBuilder.createMessage(message, mhs);
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            // Should be globally unique, e.g. id + timestamp; a fixed value is used here only for the demo
            CorrelationData correlationData = new CorrelationData("1234567890");
            rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
        }
        
        // Send method: sends a custom object as the message
        public void sendOrder(Order order) throws Exception {
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            // Should be globally unique, e.g. id + timestamp; a fixed value is used here only for the demo
            CorrelationData correlationData = new CorrelationData("0987654321");
            rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
        }
        
    }
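The comments in the sender call for a globally unique correlation id built from an id plus a timestamp, while the demo passes fixed strings. One possible way to generate such ids is sketched below. `CorrelationIds` and `next` are hypothetical names, and the random UUID suffix is my own addition, since an id plus a millisecond timestamp alone can collide when two messages are sent within the same millisecond.

```java
import java.util.UUID;

public class CorrelationIds {

    // Builds "<businessId>-<epoch millis>-<random UUID>".
    static String next(String businessId) {
        return businessId + "-" + System.currentTimeMillis() + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String first = next("order001");
        String second = next("order001");
        System.out.println(first);
        System.out.println(second);
        System.out.println(first.equals(second)); // false
    }
}
```

In the sender this could replace the fixed value, for example `new CorrelationData(CorrelationIds.next(order.getId()))`, so that each confirm callback can be traced back to exactly one message.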
    

    Application class

    @SpringBootApplication
    public class Application {
    
        public static void main(String[] args) {
            try {
                SpringApplication.run(Application.class, args);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    Test class

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApplicationTests {
    
        @Test
        public void contextLoads() {
        }
        
        @Autowired
        private RabbitSender rabbitSender;
    
        private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        
        @Test
        public void testSender1() throws Exception {
             Map<String, Object> properties = new HashMap<>();
             properties.put("number", "12345");
             properties.put("send_time", simpleDateFormat.format(new Date()));
             rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
        }
        
        @Test
        public void testSender2() throws Exception {
             Order order = new Order("001", "first order");
             rabbitSender.sendOrder(order);
        }
    }
    

    6.1.2 Consumer configuration

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.bfxy</groupId>
        <artifactId>rabbitmq-springboot-consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>rabbitmq-springboot-consumer</name>
        <description>rabbitmq-springboot-consumer</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.0.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    

    application.properties

    spring.rabbitmq.addresses=49.234.231.49:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.concurrency=5
    spring.rabbitmq.listener.simple.max-concurrency=10
    
    spring.rabbitmq.listener.order.queue.name=queue-2
    spring.rabbitmq.listener.order.queue.durable=true
    spring.rabbitmq.listener.order.exchange.name=exchange-2
    spring.rabbitmq.listener.order.exchange.durable=true
    spring.rabbitmq.listener.order.exchange.type=topic
    spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
    spring.rabbitmq.listener.order.key=springboot.*
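
    The binding key springboot.* uses the wildcards of an AMQP topic exchange: * stands for exactly one dot-separated word and # stands for zero or more words. The routing keys used by the producer, springboot.abc and springboot.def, therefore match, while a key such as springboot.abc.def would not. The sketch below reproduces that rule in plain Java purely for illustration. TopicMatcher is a hypothetical name; the real matching is done by the broker, and edge cases such as empty words are ignored here.

```java
// Hypothetical illustration of topic-exchange matching; the broker does this itself.
final class TopicMatcher {

    private TopicMatcher() {
    }

    // Returns true if routingKey would be routed by a binding with bindingKey.
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length; // pattern used up: every word must be consumed too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int k = j; k <= w.length; k++) {
                if (match(p, i + 1, w, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == w.length) {
            return false; // pattern still expects a word but none is left
        }
        if (p[i].equals("*") || p[i].equals(w[j])) {
            return match(p, i + 1, w, j + 1); // '*' or a literal consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("springboot.*", "springboot.abc"));
        System.out.println(matches("springboot.*", "springboot.abc.def"));
    }
}
```

    If messages with longer routing keys should also reach the queue, the binding key would need # instead of *.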
    

    Listener

    @Component
    public class RabbitReceiver {
    
        
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "queue-1", 
                durable="true"),
                exchange = @Exchange(value = "exchange-1", 
                durable="true", 
                type= "topic", 
                ignoreDeclarationExceptions = "true"),
                key = "springboot.*"
                )
        )
        @RabbitHandler
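        // Message here is org.springframework.messaging.Message, which provides getPayload() and getHeaders()
        public void onMessage(Message message, Channel channel) throws Exception {
            System.err.println("--------------------------------------");
            System.err.println("Consumer payload: " + message.getPayload());
            Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            // Manual ACK (acknowledge-mode=manual in application.properties)
            channel.basicAck(deliveryTag, false);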
        }
        
        
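        /**
         * Listener for Order messages. The queue, exchange and binding key are read from application.properties:
         *
         *  spring.rabbitmq.listener.order.queue.name=queue-2
         *  spring.rabbitmq.listener.order.queue.durable=true
         *  spring.rabbitmq.listener.order.exchange.name=exchange-2
         *  spring.rabbitmq.listener.order.exchange.durable=true
         *  spring.rabbitmq.listener.order.exchange.type=topic
         *  spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
         *  spring.rabbitmq.listener.order.key=springboot.*
         * @param order the converted message payload
         * @param channel the RabbitMQ channel, used for the manual ACK
         * @param headers the message headers, including the delivery tag
         * @throws Exception if the acknowledgement fails
         */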
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
                durable="${spring.rabbitmq.listener.order.queue.durable}"),
                exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
                durable="${spring.rabbitmq.listener.order.exchange.durable}", 
                type= "${spring.rabbitmq.listener.order.exchange.type}", 
                ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
                key = "${spring.rabbitmq.listener.order.key}"
                )
        )
        @RabbitHandler
        public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, 
                Channel channel, 
                @Headers Map<String, Object> headers) throws Exception {
            System.err.println("--------------------------------------");
            System.err.println("Consumer order: " + order.getId());
            Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            // Manual ACK
            channel.basicAck(deliveryTag, false);
        }
    }
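
    Both listener methods call channel.basicAck(deliveryTag, false). The second argument is the multiple flag: false acknowledges only the delivery with that tag, while true acknowledges every outstanding delivery on the channel up to and including that tag. The toy model below illustrates the difference with an in-memory set. UnackedDeliveries is a hypothetical class written for this explanation and does not use the RabbitMQ client.

```java
import java.util.TreeSet;

// Hypothetical in-memory model of a channel's unacknowledged deliveries.
final class UnackedDeliveries {

    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    // Records a new delivery and returns its tag (tags increase per channel).
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // Mirrors the two arguments of basicAck(deliveryTag, multiple).
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // acknowledge every outstanding delivery up to and including deliveryTag
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // acknowledge only this one delivery
            unacked.remove(deliveryTag);
        }
    }

    int pending() {
        return unacked.size();
    }

    public static void main(String[] args) {
        UnackedDeliveries channel = new UnackedDeliveries();
        channel.deliver();
        long second = channel.deliver();
        long third = channel.deliver();
        channel.ack(second, false);
        System.out.println("pending after single ack: " + channel.pending());
        channel.ack(third, true);
        System.out.println("pending after multiple ack: " + channel.pending());
    }
}
```

    With acknowledge-mode=manual, a message that is never acknowledged stays unacked on the broker, so each listener method needs to reach its basicAck call (or reject the message) on every code path.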
    

    7. Integrating RabbitMQ with Spring Cloud Stream







    7.1 Hands-on example

    Producer setup
    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.bfxy</groupId>
        <artifactId>rabbitmq-springcloudstream-producer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>rabbitmq-springcloudstream-producer</name>
        <description>rabbitmq-spring</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.8.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>   
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-autoconfigure</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
                <version>1.3.4.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    application.properties

    server.port=8001
    server.context-path=/producer
    
    spring.application.name=producer
    spring.cloud.stream.bindings.output_channel.destination=exchange-3
    # Note: group applies only to consumer (input) bindings; it has no effect on this output binding
    spring.cloud.stream.bindings.output_channel.group=queue-3
    spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster
    
    spring.cloud.stream.binders.rabbit_cluster.type=rabbit
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=49.234.231.49:5672
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
    

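    The value of spring.cloud.stream.bindings.output_channel.binder must be exactly the same as the binder name (rabbit_cluster) used in the spring.cloud.stream.binders.* keys below it.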
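
    A mismatch between the two names is easy to miss, because both are plain strings in the properties file. The sketch below checks a configuration for that mistake before the application starts. BinderNameCheck is a hypothetical helper that uses only java.util.Properties, and it assumes that a binder counts as defined when a spring.cloud.stream.binders.<name>.type key exists, as is the case in the configuration above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: finds bindings that refer to a binder name that is not defined.
final class BinderNameCheck {

    private static final String BINDINGS = "spring.cloud.stream.bindings.";
    private static final String BINDERS = "spring.cloud.stream.binders.";

    private BinderNameCheck() {
    }

    // Returns one "key -> binder" entry per binding whose binder has no ".type" key.
    static List<String> undefinedBinders(Properties props) {
        List<String> problems = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(BINDINGS) && key.endsWith(".binder")) {
                String binder = props.getProperty(key).trim();
                // Assumption: a binder is "defined" if its type is configured
                if (props.getProperty(BINDERS + binder + ".type") == null) {
                    problems.add(key + " -> " + binder);
                }
            }
        }
        return problems;
    }

    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                "spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster",
                "spring.cloud.stream.binders.rabbit_cluster.type=rabbit");
        System.out.println(undefinedBinders(parse(config)));
    }
}
```

    An empty list means every binding refers to a binder that is configured; any entry in the list points at the binding key whose binder name needs correcting.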
    Custom interface Barista

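    /**
     * <B>Class name:</B><BR>
     * <B>Summary:</B><BR>
     * The Barista interface is passed as the argument of @EnableBinding; it defines the channel types and channel names.
     * The channel name is used in the configuration; the channel type determines whether the app sends messages to the channel or receives messages from it.
     */
    public interface Barista {
          
        //String INPUT_CHANNEL = "input_channel";  
        String OUTPUT_CHANNEL = "output_channel";  
    
        // @Input declares an input channel named Barista.INPUT_CHANNEL, i.e. input_channel. The name must match the binding name in the consumer's configuration (spring.cloud.stream.bindings.input_channel.*), whose destination is the exchange the channel subscribes to.
    //    @Input(Barista.INPUT_CHANNEL)  
    //    SubscribableChannel loginput();  
        // @Output declares an output channel named output_channel. The name must match the binding name in application.properties (spring.cloud.stream.bindings.output_channel.*); messages sent to it are published to the configured destination, exchange-3.
        @Output(Barista.OUTPUT_CHANNEL)
        MessageChannel logoutput();  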
    
    //  String INPUT_BASE = "queue-1";  
    //  String OUTPUT_BASE = "queue-1";  
    //  @Input(Barista.INPUT_BASE)  
    //  SubscribableChannel input1();  
    //  MessageChannel output1();  
          
    }  
    

    Define the message sender class

    @EnableBinding(Barista.class)
    @Service  
    public class RabbitmqSender {  
      
        @Autowired  
        private Barista barista;  
        
        // Send a message with the given payload and headers
        public String sendMessage(Object message, Map<String, Object> properties) throws Exception {  
            try{
                MessageHeaders mhs = new MessageHeaders(properties);
                Message msg = MessageBuilder.createMessage(message, mhs);
                boolean sendStatus = barista.logoutput().send(msg);
                System.err.println("--------------sending -------------------");
                System.out.println("Sent data: " + message + ", sendStatus: " + sendStatus);
            }catch (Exception e){  
                System.err.println("-------------error-------------");
                e.printStackTrace();
                // Keep the original exception as the cause so its stack trace is not lost
                throw new RuntimeException(e.getMessage(), e);
               
            }  
            return null;
        }  
        
    }  
    

    Test

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApplicationTests {
    
        @Autowired
        private RabbitmqSender rabbitmqSender;
        
        
        @Test
        public void sendMessageTest1() {
           for(int i = 0; i < 1; i ++){
               try {
                   Map<String, Object> properties = new HashMap<String, Object>();
                   properties.put("SERIAL_NUMBER", "12345");
                   properties.put("BANK_NUMBER", "abc");
                   properties.put("PLAT_SEND_TIME", DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
                   rabbitmqSender.sendMessage("Hello, I am amqp sender num :" + i, properties);
                  
               } catch (Exception e) {
                   System.out.println("--------error-------");
                   e.printStackTrace(); 
               }
           }
           //TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
        }
        
    }
    

    Consumer setup:
    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.bfxy</groupId>
        <artifactId>rabbitmq-springcloudstream-consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>rabbitmq-springcloudstream-consumer</name>
        <description>rabbitmq-spring</description>
     
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.8.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>   
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>       
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
                <version>1.3.4.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    
    </project>
    

    application.properties

    server.port=8002
    server.context-path=/consumer
    
    spring.application.name=consumer
    spring.cloud.stream.bindings.input_channel.destination=exchange-3
    spring.cloud.stream.bindings.input_channel.group=queue-3
    spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
    spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5
    
    spring.cloud.stream.binders.rabbit_cluster.type=rabbit
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=49.234.231.49:5672
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
    

    Define the Barista interface

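    /**
     * <B>Class name:</B><BR>
     * <B>Summary:</B><BR>
     * The Barista interface is passed as the argument of @EnableBinding; it defines the channel types and channel names.
     * The channel name is used in the configuration; the channel type determines whether the app sends messages to the channel or receives messages from it.
     * @author ashen(Alienware)
     * @since 2016-07-22
     */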
    
    public interface Barista {
          
        String INPUT_CHANNEL = "input_channel";  
    
        // @Input declares an input channel named Barista.INPUT_CHANNEL, i.e. input_channel. The name must match the binding name in application.properties (spring.cloud.stream.bindings.input_channel.*); the channel subscribes to the configured destination, exchange-3.
        @Input(Barista.INPUT_CHANNEL)  
        SubscribableChannel loginput();  
        
          
    }  
    

    Define the receiver

    @EnableBinding(Barista.class)
    @Service
    public class RabbitmqReceiver {  
    
        @StreamListener(Barista.INPUT_CHANNEL)  
        public void receiver(Message message) throws Exception {  
            Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
            Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
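            System.out.println("Input Stream 1 received data: " + message);
            System.out.println("Consumption complete------------");
            // Manual ACK, required because acknowledge-mode=MANUAL is set for this binding
            channel.basicAck(deliveryTag, false);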
        }  
    }  
    

          Article title: Integrating RabbitMQ with the Spring Family

          Article link: https://www.haomeiwen.com/subject/rybedctx.html