美文网首页Java 杂谈计算机杂谈
2019-08-26/SpringBoot 之RabbitMQ

2019-08-26/SpringBoot 之RabbitMQ

作者: 呼噜噜睡 | 来源:发表于2019-08-26 20:21 被阅读0次

    rabbitmq似乎用的挺多的,在消息可靠性要求比较高的项目上,首选它。为什么是它,我也不太清楚,如果有知道的小伙伴,要告诉我哟~
    好,进入正题,首先安装rabbitmq,光这一步就够折腾你半天的。安装方式一般有三种:
    一、linux或者linux虚拟机上安装rabbitmq,那么首先你要安装erlang。对于下载,安装,配置,让你怀疑人生。具体怎么怀疑人生,百度吧。
    二、使用docker安装。windows7 使用docker toolbox 安装docker,如果你能顺利安装docker,后面畅通无阻。难就难在顺利的安装docker。如果你是win10 旗舰版,那么在docker官网下载docker安装包,妥妥的搞定,爽哉。如果你是win10家庭版,对不起,我没有听清,请再说一遍。
    三、使用windows版的rabbitmq。具体参考https://www.cnblogs.com/ericli-ericli/p/5902270.html

    好的,经过愉快的安装之旅,我们假设你已经成功安装了RabbitMQ。接下来搭建一个springboot项目,不懂的同学可以上spring官网。引入rabbitmq pom依赖,完整pom如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.3.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.wjp</groupId>
        <artifactId>rabbitmq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rabbitmq</name>
        <description>Demo project for Spring Boot</description>

        <properties>
            <java.version>1.8</java.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>

            <!--
                The sample code uses org.springframework.amqp.* (RabbitTemplate, @RabbitListener)
                and the spring.rabbitmq.* properties, all of which come from the AMQP starter.
                The starter also brings in com.rabbitmq:amqp-client transitively, at the version
                managed by the Spring Boot parent, so no explicit version is declared here.
            -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>

    </project>
    
    

    application.properties的配置:

    # Broker connection settings (host:port, credentials, virtual host).
    # NOTE(review): credentials are hard-coded for the demo; externalize them for real deployments.
    spring.rabbitmq.addresses=127.0.0.1:5672
    spring.rabbitmq.username=wjp
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    # Connection timeout; presumably milliseconds (15 s) - confirm against the Spring Boot version in use.
    spring.rabbitmq.connection-timeout=15000
    #producer config
    # Enable publisher confirms and returns so the sender's ConfirmCallback / ReturnCallback are invoked;
    # mandatory=true asks the broker to return messages that cannot be routed.
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    #consumer config
    # Manual acknowledge mode: the listener acknowledges each delivery itself (see channel.basicAck in the receiver).
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    # One unacknowledged message at a time, handled by a single consumer.
    spring.rabbitmq.listener.simple.prefetch=1
    spring.rabbitmq.listener.simple.concurrency=1
    spring.rabbitmq.listener.simple.max-concurrency=1
    

    SpringBoot提供了一个现成的RabbitTemplate bean供我们使用,发送方代码:

    package com.example.demo.amqp;
    
    import java.util.Map;
    import java.util.UUID;
    
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    import org.springframework.util.IdGenerator;
    
    
    /**
     * Publishes messages to RabbitMQ through the Spring-provided {@link RabbitTemplate}.
     *
     * <p>The publisher-confirm and publisher-return callbacks are registered once, when the
     * template is injected, rather than on every {@link #send(Object, Map)} call. Each outgoing
     * message carries a unique {@link CorrelationData} id so a negative confirm can be tied
     * back to the message that caused it.
     */
    @Component
    public class RabbitSender {

        // Template used for publishing; injected through setRabbitTemplate.
        private RabbitTemplate rabbitTemplate;

        // Confirm callback: invoked with the broker's ack/nack for a published message.
        final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (!ack) {
                    // Keep the original marker text and append the details needed to diagnose the failure.
                    System.err.println("生产端---异常处理...."
                            + " correlationData: " + correlationData + ", cause: " + cause);
                }
            }
        };

        // Return callback: invoked when a message is handed back to the publisher.
        final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                    String exchange, String routingKey) {
                System.err.println("生产端---return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
            }
        };

        /**
         * Injects the template and registers the confirm/return callbacks on it exactly once.
         *
         * @param rabbitTemplate the template supplied by the Spring context
         */
        @Autowired
        public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
        }

        /**
         * Wraps the payload and headers into a message and publishes it with routing key
         * {@code test_queue} on the exchange named by the empty string.
         *
         * @param message    the payload to send
         * @param properties header values for the message; the demo caller passes {@code null}
         * @throws Exception declared for backward compatibility with existing callers
         */
        public void send(Object message, Map<String, Object> properties) throws Exception {
            MessageHeaders mhs = new MessageHeaders(properties);
            Message<Object> msg = MessageBuilder.createMessage(message, mhs);
            // Globally unique id so the confirm callback can identify this message.
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            // NOTE(review): msg is a spring-messaging Message handed to the template as the object
            // to convert - confirm the configured MessageConverter serializes it as intended.
            rabbitTemplate.convertAndSend("", "test_queue", msg, correlationData);
        }
    }
    
    

    接收方代码:

    package com.example.demo.amqp;
    
    import java.util.Map;
    
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import com.rabbitmq.client.Channel;
    
    /**
     * Consumes messages from the {@code test_queue} queue and acknowledges them manually.
     */
    @Component
    public class RabbitReceiver {

        /**
         * Handles one delivery: prints the payload, then acknowledges that single delivery.
         *
         * <p>The listener declares the durable queue {@code test_queue} and binds it to the
         * {@code amq.direct} exchange with routing key {@code test_queue}; declaration
         * exceptions are ignored.
         *
         * @param msg     the message payload
         * @param channel the channel the message arrived on, used for the manual ack
         * @param headers the message headers; the delivery tag is read from
         *                {@link AmqpHeaders#DELIVERY_TAG}
         * @throws Exception if acknowledging the delivery fails
         */
        @RabbitListener(bindings = @QueueBinding(
                // durable is a String attribute on @Queue, like ignoreDeclarationExceptions below.
                value = @Queue(value = "test_queue", durable = "true"),
                exchange = @Exchange(value = "amq.direct",
                ignoreDeclarationExceptions = "true"),
                key = "test_queue"
                )
        )
        public void onOrderMessage(@Payload String msg, Channel channel,
                @Headers Map<String, Object> headers) throws Exception {
            System.err.println("消费端----------------------------: " + msg);
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            // Manual ack; multiple=false acknowledges only this delivery.
            channel.basicAck(deliveryTag, false);
        }
    }
    
    

    直接在启动类上进行测试:

    package com.example.demo;
    
    import com.example.demo.amqp.RabbitSender;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    
    /**
     * Boot entry point that doubles as a smoke test: after the context starts it publishes
     * a series of demo messages through {@link RabbitSender}.
     */
    @SpringBootApplication
    public class DemoApplication {

        /**
         * Starts the application and then sends the demo messages.
         *
         * @param args command-line arguments passed on to Spring Boot
         * @throws Exception if sending fails or the pause between sends is interrupted
         */
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
            publishDemoMessages(context.getBean(RabbitSender.class));
        }

        // Sends 100 numbered messages with no extra headers, pausing 500 ms after each one.
        private static void publishDemoMessages(RabbitSender sender) throws Exception {
            int sequence = 0;
            while (sequence < 100) {
                sender.send("生产端---发送的消息--" + sequence, null);
                Thread.sleep(500);
                sequence++;
            }
        }

    }
    
    

    相关文章

      网友评论

        本文标题:2019-08-26/SpringBoot 之RabbitMQ

        本文链接:https://www.haomeiwen.com/subject/qhzdectx.html