美文网首页
使用Spring AMQP

使用Spring AMQP

作者: 嗷大彬彬 | 来源:发表于2017-07-26 16:40 被阅读119次

    本文以 Spring Boot 作为框架;直接使用 Spring Framework 的项目也可以参考本文的配置。

    前提

    安装部署并启动rabbitmq,参见 http://www.jianshu.com/p/9a32dca0c6aa

    配置连接

    constant.properties:

    # Broker addresses as a comma-separated list of host:port entries
    spring.rabbitmq.addresses=192.168.253.133:5672,192.168.253.134:5672
    spring.rabbitmq.username=rabbitmq
    spring.rabbitmq.password=rabbitmq
    # Number of channels cached per connection
    spring.rabbitmq.cachesize=25
    

    配置maven依赖

    pom.xml:

    <!-- Build section: runs the Spring Boot Maven plugin's "repackage" goal -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
    <!-- Left empty: no additional managed dependency versions are declared here -->
    <dependencyManagement>
        <dependencies>
        </dependencies>
    </dependencyManagement>
    
    <!-- The dependencies below declare no <version>; presumably they are inherited from this parent (verify) -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.4.RELEASE</version>
    </parent>
    
    <dependencies>
        <!-- Spring AMQP (RabbitMQ support) -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>
    
        <!-- Spring Boot web starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <!-- Spring Boot unit-test starter (test scope only) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency> 
        
    </dependencies>
    

    配置

    RabbitConfig.java:

    /**
     * Hand-written RabbitMQ wiring: connection factory, admin, template, three
     * exchanges ({@code shark.direct}, {@code shark.fanout}, {@code shark.topic}),
     * two durable queues ({@code shark.info}, {@code shark.error}) and the
     * bindings between them.
     *
     * <p>Connection settings are read from the {@link Environment} under the
     * {@code spring.rabbitmq.*} keys.
     */
    @Configuration
    public class RabbitConfig {

        /** Channel cache size used when {@code spring.rabbitmq.cachesize} is not set. */
        private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;

        @Autowired
        private Environment env;

        /**
         * Connection factory built from the {@code spring.rabbitmq.*} properties.
         *
         * <p>{@code spring.rabbitmq.cachesize} is optional; when it is absent
         * {@link #DEFAULT_CHANNEL_CACHE_SIZE} is used instead of failing on a
         * {@code null} string. A present but non-numeric value still fails.
         *
         * @return a caching connection factory for the configured broker addresses
         */
        @Bean
        public ConnectionFactory connectionFactory() {

            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(env.getProperty("spring.rabbitmq.addresses"));
            connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
            connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
            // Optional: connectionFactory.setCacheMode(CacheMode.CHANNEL); // CHANNEL is the default
            // Typed lookup with a default avoids Integer.parseInt(null) when the key is missing.
            connectionFactory.setChannelCacheSize(env.getProperty("spring.rabbitmq.cachesize",
                    Integer.class, DEFAULT_CHANNEL_CACHE_SIZE));
            // Optional: enable publisher confirms
            // connectionFactory.setPublisherConfirms(true);
            // Optional: enable publisher returns
            // connectionFactory.setPublisherReturns(true);
            return connectionFactory;
        }

        /**
         * Admin bean responsible for declaring queues, exchanges and bindings.
         * Spring AMQP declares them automatically by default.
         *
         * @return an admin bound to {@link #connectionFactory()}
         */
        @Bean
        public RabbitAdmin rabbitAdmin() {

            return new RabbitAdmin(connectionFactory());
        }

        /**
         * Template used to send and (synchronously) receive messages.
         *
         * @return a template bound to {@link #connectionFactory()}
         */
        @Bean
        public RabbitTemplate rabbitTemplate() {

            return new RabbitTemplate(connectionFactory());
        }

        /**
         * Durable direct exchange {@code shark.direct}. A direct exchange routes a
         * message to every queue whose binding key equals the message's routing
         * key exactly.
         */
        @Bean
        public Exchange directExchange() {

            return ExchangeBuilder.directExchange("shark.direct").durable(true).build();
        }

        /**
         * Durable fanout exchange {@code shark.fanout}. Works like a broadcast:
         * every bound queue receives every message.
         */
        @Bean
        public Exchange fanoutExchange() {

            return ExchangeBuilder.fanoutExchange("shark.fanout").durable(true).build();
        }

        /**
         * Durable topic exchange {@code shark.topic}. Similar to a direct exchange,
         * but binding keys may contain the wildcards {@code *} (exactly one word)
         * and {@code #} (zero or more words).
         */
        @Bean
        public Exchange topicExchange() {

            return ExchangeBuilder.topicExchange("shark.topic").durable(true).build();
        }

        /**
         * Queue 1: durable queue {@code shark.info}.
         */
        @Bean
        public Queue infoQueue() {

            return QueueBuilder.durable("shark.info").build();
        }

        /**
         * Queue 2: durable queue {@code shark.error}.
         */
        @Bean
        public Queue errorQueue() {

            return QueueBuilder.durable("shark.error").build();
        }

        /**
         * Binds queue 2 ({@code shark.error}) to the fanout exchange with an
         * empty routing key.
         */
        @Bean
        public Binding bindingFanoutAnony() {

            return BindingBuilder.bind(errorQueue()).to(fanoutExchange()).with("").noargs();
        }

        /**
         * Binds queue 1 ({@code shark.info}) to the fanout exchange with an
         * empty routing key.
         */
        @Bean
        public Binding bindingFanoutAnony2() {

            return BindingBuilder.bind(infoQueue()).to(fanoutExchange()).with("").noargs();
        }

        /**
         * Binds queue 1 ({@code shark.info}) to the direct exchange with routing
         * key {@code shark.info}.
         *
         * <p>{@code @Primary} makes this the default {@link Binding} bean, i.e. the
         * one injected by {@code @Autowired} or by a {@code @Resource} without a name.
         */
        @Primary
        @Bean
        public Binding bindingDirect1() {

            return BindingBuilder.bind(infoQueue()).to(directExchange()).with("shark.info").noargs();
        }

        /**
         * Binds queue 2 ({@code shark.error}) to the direct exchange with routing
         * key {@code shark.error}.
         */
        @Bean
        public Binding bindingDirect2() {

            return BindingBuilder.bind(errorQueue()).to(directExchange()).with("shark.error").noargs();
        }

        /**
         * Binds queue 2 ({@code shark.error}) to the topic exchange with the
         * pattern {@code shark.*}.
         */
        @Bean
        public Binding bindingTopic1() {

            return BindingBuilder.bind(errorQueue()).to(topicExchange()).with("shark.*").noargs();
        }

        /**
         * Binds queue 1 ({@code shark.info}) to the topic exchange with the
         * pattern {@code shark.*}.
         */
        @Bean
        public Binding bindingTopic2() {

            return BindingBuilder.bind(infoQueue()).to(topicExchange()).with("shark.*").noargs();
        }

    }
    

    启动Spring boot应用

    Application.java

    /**
     * Spring Boot entry point. Loads {@code constant.properties} from the
     * classpath and starts the application with the banner written to the log.
     */
    @SpringBootApplication(exclude = RabbitAutoConfiguration.class) // exclude Spring Boot's RabbitMQ auto-configuration
    @EnableAspectJAutoProxy(proxyTargetClass = true) // enable AspectJ auto-proxying
    @PropertySource("classpath:/cn/com/ut/config/properties/constant.properties")
    public class Application {

        /**
         * Boots the application.
         *
         * @param args command-line arguments passed through to Spring Boot
         */
        public static void main(String[] args) {

            SpringApplication app = new SpringApplication(Application.class);
            app.setBannerMode(Banner.Mode.LOG);
            app.run(args);
        }
    }
    

    测试

    TestSendController.java

    /**
     * Test endpoints under {@code /test} that publish sample messages to the
     * fanout, direct and topic exchanges and synchronously read from a queue.
     *
     * <p>Each send endpoint returns immediately; the messages are published from
     * a background thread with a pause between consecutive messages.
     */
    @RestController
    @RequestMapping(value = "/test")
    public class TestSendController {

        /** Number of messages published by each send endpoint. */
        private static final int MESSAGE_COUNT = 10;

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @Resource(name = "fanoutExchange")
        private Exchange fanoutExchange;

        @Resource(name = "bindingDirect1")
        private Binding bindingDirect1;

        // Injected but not used by any endpoint in this class.
        @Resource(name = "bindingDirect2")
        private Binding bindingDirect2;

        @Resource(name = "topicExchange")
        private Exchange topicExchange;

        /**
         * Broadcasts 10 messages through the fanout exchange, one per second.
         *
         * <p>The three-argument {@code convertAndSend} is required here: the
         * two-argument overload takes {@code (routingKey, message)}, so passing the
         * exchange name as the first argument would publish to the template's
         * default exchange instead of the fanout exchange.
         *
         * @param msg optional text appended to every message
         * @return a fixed confirmation string
         */
        @GetMapping("/sendFanout")
        public String sendFanout(@RequestParam(value = "msg", required = false) final String msg) {

            // The routing key is left empty, matching the empty key used by the fanout bindings.
            sendInBackground(fanoutExchange.getName(), "", msg, 1000L);
            return "success send 10 msg to fanout Exchange!";
        }

        /**
         * Publishes 10 messages to the exchange of {@code bindingDirect1} using
         * that binding's routing key, pausing 100 ms between messages.
         *
         * @param msg optional text appended to every message
         * @return a fixed confirmation string
         */
        @GetMapping("/sendDirect")
        public String sendDirect(@RequestParam(value = "msg", required = false) final String msg) {

            sendInBackground(bindingDirect1.getExchange(), bindingDirect1.getRoutingKey(), msg, 100L);
            return "success send 10 msg to direct Exchange!";
        }

        /**
         * Publishes 10 messages to the topic exchange with routing key
         * {@code shark.info}, one per second.
         *
         * @param msg optional text appended to every message
         * @return a fixed confirmation string
         */
        @GetMapping("/sendTopic")
        public String sendTopic(@RequestParam(value = "msg", required = false) final String msg) {

            sendInBackground(topicExchange.getName(), "shark.info", msg, 1000L);
            return "success send 10 msg to topic Exchange!";
        }

        /******************** Receive (synchronous) ****************** */
        /**
         * Reads one message from the given queue via
         * {@code rabbitTemplate.receiveAndConvert}.
         *
         * @param queueName name of the queue to read from
         * @return a text containing the queue name and the converted payload; the
         *         payload is appended via string concatenation, so a {@code null}
         *         result appears as the text "null" and a non-String payload is
         *         rendered with {@code toString()} instead of failing a cast
         */
        @GetMapping("/receive")
        public String receive(@RequestParam(value = "queue") String queueName) {

            Object payload = rabbitTemplate.receiveAndConvert(queueName);
            return " 收到來自队列 " + queueName + "的消息:" + payload;
        }

        /**
         * Starts a thread that publishes {@link #MESSAGE_COUNT} numbered messages
         * to the given exchange, sleeping {@code pauseMillis} after each one.
         * If the thread is interrupted, the interrupt flag is restored and the
         * remaining messages are not sent.
         */
        private void sendInBackground(final String exchange, final String routingKey,
                final String msg, final long pauseMillis) {

            new Thread(new Runnable() {
                @Override
                public void run() {

                    for (int i = 0; i < MESSAGE_COUNT; i++) {
                        rabbitTemplate.convertAndSend(exchange, routingKey,
                                "msg" + i + " : " + msg);
                        try {
                            Thread.sleep(pauseMillis);
                        } catch (InterruptedException e) {
                            // Preserve the interrupt for whoever owns the thread and stop sending.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            }).start();
        }
    }
    

    监听器接收

    1. 在 RabbitConfig.java 中添加 @EnableRabbit 注解和 SimpleRabbitListenerContainerFactory 的 bean
    @Configuration
    @EnableRabbit
    public class RabbitConfig {

        /**
         * Container factory for message listeners; requires the
         * {@code @EnableRabbit} annotation on this class. Configured with 3
         * concurrent consumers and a maximum of 10.
         *
         * @return the listener container factory bound to {@code connectionFactory()}
         */
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {

            SimpleRabbitListenerContainerFactory containerFactory =
                    new SimpleRabbitListenerContainerFactory();
            containerFactory.setConnectionFactory(connectionFactory());
            containerFactory.setConcurrentConsumers(3);
            containerFactory.setMaxConcurrentConsumers(10);
            return containerFactory;
        }
    }
    
    2. 在 TestSendController.java 中添加监听器
    /******************** 接收监听器****************** */
    /**
     * Listener on the queue named by the {@code infoQueue} bean; prints each
     * received payload to standard output.
     * NOTE(review): the printed label says "receive1" although the method is
     * named receive3 - confirm which numbering is intended.
     *
     * @param reciveMsg the received message text
     */
    @RabbitListener(queues = "#{infoQueue.name}")
    public void receive3(String reciveMsg) {

        final String line = "receive1(订阅了infoQueue) 收到信息:" + reciveMsg;
        System.out.println(line);
    }
    
    /**
     * Listener on the queue named by the {@code errorQueue} bean; prints each
     * received payload to standard output.
     * NOTE(review): the printed label says "receive2" although the method is
     * named receive4 - confirm which numbering is intended.
     *
     * @param reciveMsg the received message text
     */
    @RabbitListener(queues = "#{errorQueue.name}")
    public void receive4(String reciveMsg) {

        final String line = "receive2(订阅了errorQueue) 收到信息:" + reciveMsg;
        System.out.println(line);
    }
    
    /**
     * Second listener on the queue named by the {@code infoQueue} bean; prints
     * each received payload to standard output.
     * NOTE(review): the printed label says "receive3" although the method is
     * named receive5 - confirm which numbering is intended.
     *
     * @param reciveMsg the received message text
     */
    @RabbitListener(queues = "#{infoQueue.name}")
    public void receive5(String reciveMsg) {

        final String line = "receive3(订阅了infoQueue) 收到信息:" + reciveMsg;
        System.out.println(line);
    }
    

    相关文章

      网友评论

          本文标题:使用Spring AMQP

          本文链接:https://www.haomeiwen.com/subject/penvkxtx.html