rabbitmq

作者: 原来_1361 | 来源:发表于2019-04-08 16:50 被阅读0次

    关于rabbitmq的用法参考:https://www.cnblogs.com/linkenpark/p/5393666.html

    一、 配置

    1. Exchange

    exchange是一个路由器,消息是经过路由器投递到对应的队列中。

    /**
     * 消息交换机配置  可以配置多个
     *
     * @Author administrator
     * @date 2019/4/5
     */
    @Configuration
    public class ExchangeConfig {
    
        /**
         * direct exchange: routing key完全匹配才转发
         *
         * @return
         */
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange(RabbitMqConfig.EXCHANGE, true, false);
        }
    
        /**
         * fanout exchange: 不理会routing key,消息直接广播到所有绑定的queue
         *
         * @return
         */
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(RabbitMqConfig.EXCHANGE, true, false);
        }
    
        /**
         * topic exchange : 对routing key模式匹配
         *
         * @return
         */
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange(RabbitMqConfig.EXCHANGE, true, false);
        }
    }
    

    2. Queue

    队列,存放消息的载体。

    @Configuration
    public class QueueConfig {
        @Bean
        public Queue queue(){
            return new Queue("whc");
        }
    }
    

    3. RouteKey

    映射规则,根据routekey将队列与路由器绑定。

    package com.whc.config;
    
    import com.whc.callback.MsgSendConfirmCallBack;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    /**
     *
     */
    @Configuration
    public class RabbitMqConfig {
        /**
         * 消息交换机的名字
         */
        public static final String EXCHANGE = "exchange_test";
        /**
         * 队列key1
         */
        public static final String ROUTE_KEY_1 = "query_one_key1";
        /**
         * 队列key2
         */
        public static final String ROUTE_KEY_2 = "query_one_key2";
    
        @Autowired
        private QueueConfig queueConfig;
        @Autowired
        private ExchangeConfig exchangeConfig;
    
        /**
         * 连接工厂
         */
        @Autowired
        private ConnectionFactory factory;
    
        /**
         * 将消息队列和交换机通过路由key绑定
         *
         * @return
         */
        @Bean
        public Binding bindingOne() {
            return BindingBuilder.bind(queueConfig.queue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTE_KEY_1);
        }
    
        /**
         * queue监听器,观察者模式
         * 当有消息到达时会通知监听在对应队列上的对象
         *
         * @return
         */
        @Bean
        public SimpleMessageListenerContainer simpleMessageListenerContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
            container.addQueues(queueConfig.queue());
            container.setExposeListenerChannel(true);
            container.setMaxConcurrentConsumers(5);
            container.setConcurrentConsumers(1);
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            return container;
        }
    
        /**
         * 消息确认机制
         * Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,
         * 哪些可能因为broker宕掉或者网络失败的情况而重新发布。
         * 确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)
         * 在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务
         *
         * @return
         */
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
            rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
            return rabbitTemplate;
        }
    
        @Bean
        public MsgSendConfirmCallBack msgSendConfirmCallBack() {
            return new MsgSendConfirmCallBack();
        }
    }
    
    

    4. pom依赖和配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.1.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.whc</groupId>
        <artifactId>springboot-rabbitmq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>springboot-rabbitmq</name>
        <description>rabitmq for springboot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!--rabbit-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    

    application.properties

    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    

    二、 生产者和消费者

    FirstConsumer

    package com.whc.consumer;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 这里有两个消费者FirstConsumer和HelloReceiver,都订阅了whc这个消息队列,消息消费的原则是:只能由一个消费者消费
     *
     * @author Administrator
     * @date 2019/4/8
     */
    @Component
    public class FirstConsumer {
        @RabbitListener(queues = {"whc"}, containerFactory = "rabbitListenerContainerFactory")
        public void handleMessage(String message) throws Exception {
            System.out.println("FirstConsumer {} handle message..." + message);
        }
    }
    
    package com.whc.consumer;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @ClassName HelloReceiver
     * @Description TODO
     * @Author Administrator
     * @Date 2018/12/8 19:48
     * @Version 1.0
     */
    @Component
    @RabbitListener(queues = "whc")
    public class HelloReceiver {
    
        @RabbitHandler
        public void process(String hello) {
            System.err.println("Receiver  : " + hello);
        }
    }
    
    package com.whc.sender;
    
    import com.whc.config.RabbitMqConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * @author Administrator
     * @date 2019/4/8
     */
    @Slf4j
    @Component
    public class FirstSender {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 发送消息
         * @param uuid
         * @param msg
         */
        public void send(String uuid, Object msg){
            CorrelationData correlationId = new CorrelationData(uuid);
            rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTE_KEY_1, msg, correlationId);
        }
    
    }
    
    package com.whc.sender;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * @ClassName HelloSender
     * @Description TODO
     * @Author Administrator
     * @Date 2018/12/8 19:47
     * @Version 1.0
     */
    @RestController
    public class HelloSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        /**
         * 发送单条信息
         */
        @GetMapping("/send")
        public void send() {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String context = "hello " + sdf.format(new Date());
            System.out.println("Sender : " + context);
            //通过rabbitTemplate.convertAndSend发送context到whc队列中
            this.rabbitTemplate.convertAndSend("whc", context);
        }
    
        /**
         * 循环发送多条信息
         */
        @GetMapping("/multiSend")
        public void multiSend() {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            for (int i = 0; i < 1000; i++) {
                String context = "你好,这是我的第 " + i + "条消息,当前时间为:" + sdf.format(new Date());
                System.out.println("Sender : " + context);
                this.rabbitTemplate.convertAndSend("whc", context);
            }
        }
    }
    
    

    三、 测试用例

    package com.whc;
    
    import com.whc.sender.FirstSender;
    import com.whc.sender.HelloSender;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.util.UUID;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApplicationTests {
    
        @Autowired
        private HelloSender helloSender;
    
        @Test
        public void contextLoads() {
        }
    
        @Test
        public void hello() throws Exception {
            helloSender.send();
        }
    
        @Autowired
        private FirstSender firstSender;
    
        @Test
        public void send() throws InterruptedException {
            String uuid = UUID.randomUUID().toString();
            for (int i = 0; i < 100; ) {
                firstSender.send(uuid, "hello world, this id send by the " +(++i)+ " one...");
                System.out.println("I am sending you " + i + " message.");
                Thread.sleep(100);
            }
    
        }
    }
    

    相关文章

      网友评论

          本文标题:rabbitmq

          本文链接:https://www.haomeiwen.com/subject/vddpiqtx.html