美文网首页RabbitMQSpring Boot
RabbitMQ(3)SpringBoot+RabbitMQ基于

RabbitMQ(3)SpringBoot+RabbitMQ基于

作者: 枫子夜 | 来源:发表于2019-04-17 11:53 被阅读0次

    上一章讲解了RabbitMQ的一些基础概念,包括:RabbitMQ概念、生产者(producer)、消费者(consumer)、信道(channel)、队列(queue)、交换器(exchange)(direct、fanout、topic)、绑定(binding)、路由键(routing key)、持久化(durable)等,本章开始写第一个HelloWorld程序,话不多说,直接上代码。

    • SpringBoot项目搭建
    • Maven引入RabbitMQ jar
    <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 配置RabbitMQ
    # RabbitMQ配置:IP、端口、用户名、密码、vhost
    spring.rabbitmq.host=192.168.89.168
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=fzb
    spring.rabbitmq.password=fzb2019
    spring.rabbitmq.virtual-host=fzb_host
    
    • 生成User表
    CREATE TABLE `user` (
        `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
        `name` varchar(255) DEFAULT NULL COMMENT '姓名',
        `age` int(11) DEFAULT NULL COMMENT '年龄',
        `birthday` timestamp NULL DEFAULT NULL COMMENT '生日',
        `salary` decimal(10,2) DEFAULT NULL COMMENT '年薪',
        `create_date` timestamp NULL DEFAULT NULL COMMENT '创建时间',
        `update_date` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
    INSERT INTO `user` VALUES ('1', '张三', '18', '2008-02-29 15:47:42', '5000000.00', '2008-02-29 15:47:42', '2019-04-12 14:35:24');
    INSERT INTO `user` VALUES ('2', '李四', '17', '2008-02-29 15:47:42', '5000000.00', '2019-03-01 15:48:09', '2019-04-12 14:35:29');
    INSERT INTO `user` VALUES ('3', '王五', '3', '2018-02-28 15:49:15', '50000000.00', '2019-03-04 09:38:09', '2019-04-12 14:35:16');
    

    一:基于代码消息队列

    1. direct类型
    • 新建配置 MQConfig.java类,@Component 把该类注册成组件, @Bean 创建交换器、队列及他们的绑定关系。
    • 声明Exchange(交换器名称,durable,autoDelete)
    • 声明Queue(队列名称,durable,autoDelete)
    • 绑定:BindingBuilder绑定队列到交换器,并设置路由键。
    package com.fzb.rabbitmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    
    /**
     * @Description MQ配置信息
     *
     * 基于代码的绑定交换器、队列、路由键设置
     * 1. 声明Exchange(交换器名称,durable,autoDelete)
     * 2. 声明Queue(队列名称,durable,autoDelete)
     * 3. 绑定:BindingBuilder绑定队列到交换器,并设置路由键
     * @Author jxb
     * @Date 2019-03-10 10:25:30
     */
    @Component
    public class MQConfig {
    
        /**
         * @Description 创建1:1 类型交换器(direct)
         * new DirectExchange(String,boolean,boolean)
         * new FanoutExchange(String,boolean,boolean)
         * new TopicExchange(String,boolean,boolean)
         * 1. 交换器名称
         * 2. durable 是否持久化 默认true
         * 3. autoDelete 是否自动删除 默认false
         * @Author jxb
         * @Date 2019-03-02 14:26:59
         */
        @Bean
        private DirectExchange directExchange() {
            return new DirectExchange("direct.exchange");
        }
    
        /**
         * @Description 创建队列
         * new Queue(String,boolean,boolean,boolean)
         * 1. 队列名称
         * 2. durable 是否持久化 默认true
         * 3. exclusive 排他队列,第一个链接消费后自动删除 默认 false
         * 4. autoDelete 是否自动删除 默认false
         * @Author jxb
         * @Date 2019-03-02 14:12:31
         */
        @Bean
        private Queue directQueue() {
            return new Queue("direct.queue");
        }
    
        /**
         * @Description 绑定队列、交换器、路由键
         * @Author jxb
         * @Date 2019-03-04 16:43:08
         */
        @Bean
        private Binding bindingDirect() {
            return BindingBuilder.bind(directQueue()).to(directExchange()).with("HelloWorld");
        }
    }
    
    • 新建生产者 MQProducer.java类,@RestController 注解为一个控制类,@RequestMapping("mqProducer")
      设置访问路径。
    • @Autowired 注入RabbitTemplate工具类(SpringBoot集成RabbitMQ自带的)
    • @Autowired 注入userService访问数据库(需连接数据库访问User表)
    package com.fzb.rabbitmq.producer;
    
    import com.fzb.user.bean.User;
    import com.fzb.user.service.UserService;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.List;
    
    /**
     * @Description 生产者
     * @Author jxb
     * @Date 2019-03-09 09:43:47
     */
    @RestController
    @RequestMapping("mqProducer")
    public class MQProducer {
    
        @Autowired
        public RabbitTemplate rabbitTemplate;
    
        @Autowired
        public UserService userService;
    
        /**
         * @Description direct 1:1 类型 交换器队列 生产
         * @Author jxb
         * @Date 2019-03-09 09:56:45
         */
        @RequestMapping(value = "/directMQ", method = {RequestMethod.GET})
        public List<User> directMQ() {
            List<User> users = userService.getUserList(null);
            for (User user : users) {
                CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));
                rabbitTemplate.convertAndSend("direct.exchange", "HelloWorld", user, correlationData);
            }
            return users;
        }
    
    }
    
    • 新建消费者 MQConsumer.java类,@Component 注册为组件
    package com.fzb.rabbitmq.consumer;
    
    import com.fzb.user.bean.User;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * @Description 消费者
     * @Author jxb
     * @Date 2019-03-09 09:43:47
     */
    @Component
    public class MQConsumer {
    
        /**
         * @Description direct 1:1 类型 交换器队列 消费
         * @Author jxb
         * @Date 2019-03-09 09:58:12
         */
        @RabbitListener(queues = "direct.queue")
        public void getDirectMessage(User user) throws Exception {
            // 模拟执行任务
            Thread.sleep(1000);
            System.out.println("--jxb--MQConsumer--getDirectMessage:" + user.toString());
        }
    
        /**
         * @Description 配合楼上的队列,消费同一个队列,均匀分配到两个消费者
         * @Author jxb
         * @Date 2019-03-02 14:53:28
         */
        @RabbitListener(queues = "direct.queue")
        public void getDirectMessageCopy(User user) throws Exception {
            // 模拟执行任务
            Thread.sleep(1000);
            System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
        }
    
    }
    
    • 至此一个基础的SpringBoot集成RabbitMQ的direct类型交换器队列创建完成。推荐基于注解试编程,更直观也符合SpringBoot约定大于配置的思想,所以本文会着重介绍基于注解讲解。

    二:基于注解消息队列

    1. direct类型
    • 注释掉 MQConfig.java的 @Component 注解
    • 生产者不做修改
    • 消费者
    package com.fzb.rabbitmq.consumer;
    
    import com.fzb.user.bean.User;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * 基于注解的绑定交换器、队列、路由键设置
     * 1. Queue配置:value=队列名称、durable=是否持久化(默认true)、exclusive=排他队列,只在当前connection可用(默认false)、autoDelete=如无消息是否自动删除(默认false)
     * 2. Exchange配置:value=交换器名称、type=类型(默认direct)、durable=是否持久化(默认true)、autoDelete=如无消息是否自动删除(默认false)
     * 3. QueueBinding配置:key=路由键(string数组,支持* # 匹配),*必须匹配一个单词,#匹配0个或N个单词,用.分隔
     * 4. RabbitListener配置: bindings=Queue配置+Exchange配置+QueueBinding配置
     * 注:如果代码创建交换器等且配置绑定关系,注解只需监听队列即可,如:@RabbitListener(queues = "direct.queue")
     *
     * @Description 消费者
     * @Author jxb
     * @Date 2019-03-09 09:43:47
     */
    @Component
    public class MQConsumer {
    
        /**
         * @Description direct 1:1 类型 交换器队列 消费
         * @Author jxb
         * @Date 2019-03-09 09:58:12
         */
        @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.queue"), exchange = @Exchange(value = "direct.exchange"), key = "HelloWorld")})
        public void getDirectMessage(User user) throws Exception {
            // 模拟执行任务
            Thread.sleep(1000);
            System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
        }
    
        /**
         * @Description 配合楼上的队列,消费同一个队列,均匀分配到两个消费者
         * @Author jxb
         * @Date 2019-03-02 14:53:28
         */
        @RabbitListener(queues = "direct.queue")
        public void getDirectMessageCopy(User user) throws Exception {
            // 模拟执行任务
            Thread.sleep(1000);
            System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
        }
    }
    
    2. fanout类型
    • 生产者,1:N模式,所以不需要设置路由键,即使设置也会忽略
        /**
         * @Description fanout 1:n 类型 交换器队列 生产
         * @Author jxb
         * @Date 2019-03-09 09:56:45
         */
        @RequestMapping(value = "/fanoutMQ", method = {RequestMethod.GET})
        public List<User> fanoutMQ() {
            List<User> users = userService.getUserList(null);
            for (User user : users) {
                rabbitTemplate.convertAndSend("fanout.exchange", "", user.getName());
            }
            return users;
        }
    
    • 消费者,定义了三个消费者,可以根据同一条消息做出不同的动作
        /**
         * @Description fanout 1:n 类型 交换器队列 消费(3个)
         * @Author jxb
         * @Date 2019-03-09 09:58:12
         */
        @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "fanout.queue.01"), exchange = @Exchange(value = "fanout.exchange", type = "fanout"))})
        public void getFanoutMessage01(String message) throws InterruptedException {
            // 模拟执行任务
            Thread.sleep(1000);
            System.out.println("--jxb--MQConsumer--getFanoutMessage01:" + "短信通知:您好," + message + "!感谢您成为FZB会员");
        }
    
        @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "fanout.queue.02"), exchange = @Exchange(value = "fanout.exchange", type = "fanout"))})
        public void getFanoutMessage02(String message) throws InterruptedException {
            // 模拟执行任务
            Thread.sleep(1000);
            System.out.println("--jxb--MQConsumer--getFanoutMessage02:" + "增加积分:您好," + message + "!您的当前积分为100");
        }
    
        @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "fanout.queue.03"), exchange = @Exchange(value = "fanout.exchange", type = "fanout"))})
        public void getFanoutMessage03(String message) throws InterruptedException {
            // 模拟执行任务
            Thread.sleep(1000);
            System.out.println("--jxb--MQConsumer--getFanoutMessage03:" + "通知好友:您好,您的朋友" + message + "已成为FZB会员,赶快一起互动吧");
        }
    
    3. topic类型
    • 生产者,三种不同路由键的topic交换器,会根据规则路由到不同的队列
        /**
         * @Description topic n:1 类型 交换器队列 生产(3个)
         * @Author jxb
         * @Date 2019-03-09 09:56:45
         */
        @RequestMapping(value = "/topicMQ01", method = {RequestMethod.GET})
        public List<User> topicMQ01() {
            List<User> users = userService.getUserList(null);
            for (User user : users) {
                rabbitTemplate.convertAndSend("topic.exchange", "jd.reg.msg", user.getName());
            }
            return users;
        }
    
        @RequestMapping(value = "/topicMQ02", method = {RequestMethod.GET})
        public List<User> topicMQ02() {
            List<User> users = userService.getUserList(null);
            for (User user : users) {
                rabbitTemplate.convertAndSend("topic.exchange", "tm.reg.msg", user.getName());
            }
            return users;
        }
    
        @RequestMapping(value = "/topicMQ03", method = {RequestMethod.GET})
        public List<User> topicMQ03() {
            List<User> users = userService.getUserList(null);
            for (User user : users) {
                rabbitTemplate.convertAndSend("topic.exchange", "super.fzb.reg.msg", user.getName());
            }
            return users;
        }
    
    • 消费者,模糊匹配规则:“.”把路由键分成了几部分,“*”匹配一个词,“#”匹配0个或N个。
    
        /**
         * @Description topic n:1 类型 交换器队列 消费(普通会员注册提醒)
         * @Author jxb
         * @Date 2019-03-02 14:55:16
         */
        @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "topic.queue.01"), exchange = @Exchange(value = "topic.exchange", type = "topic"), key = {"*.reg.msg"})})
        public void getTopicMessage01(String message) throws InterruptedException {
            // 模拟执行任务
            Thread.sleep(1000);
            System.out.println("--jxb--MQConsumer--getTopicMessage01:" + "短信通知:您好," + message + "!感谢您成为FZB会员");
        }
    
        /**
         * @Description topic n:1 类型 交换器队列 消费(超级会员注册提醒)
         * @Author jxb
         * @Date 2019-03-02 14:55:16
         */
        @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "topic.queue.02"), exchange = @Exchange(value = "topic.exchange", type = "topic"), key = {"*.*.reg.msg.#"})})
        public void getTopicMessage02(String message) throws InterruptedException {
            // 模拟执行任务
            Thread.sleep(1000);
            System.out.println("--jxb--MQConsumer--getTopicMessage02:" + "短信通知:您好," + message + "!感谢您成为FZB超级会员,祝您玩的开心");
        }
    
    • direct、fanout、topic介绍完成,运行结果,自行检验。

    生活好苦,但你好甜

    相关文章

      网友评论

        本文标题:RabbitMQ(3)SpringBoot+RabbitMQ基于

        本文链接:https://www.haomeiwen.com/subject/ixuapqtx.html