美文网首页RabbitMQSpring Boot
RabbitMQ(3)SpringBoot+RabbitMQ基于

RabbitMQ(3)SpringBoot+RabbitMQ基于

作者: 枫子夜 | 来源:发表于2019-04-17 11:53 被阅读0次

上一章讲解了RabbitMQ的一些基础概念,包括:RabbitMQ概念、生产者(producer)、消费者(consumer)、信道(channel)、队列(queue)、交换器(exchange)(direct、fanout、topic)、绑定(binding)、路由键(routing key)、持久化(durable)等,本章开始写第一个HelloWorld程序,话不多说,直接上代码。

  • SpringBoot项目搭建
  • Maven引入RabbitMQ jar
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 配置RabbitMQ
# RabbitMQ配置:IP、端口、用户名、密码、vhost
spring.rabbitmq.host=192.168.89.168
spring.rabbitmq.port=5672
spring.rabbitmq.username=fzb
spring.rabbitmq.password=fzb2019
spring.rabbitmq.virtual-host=fzb_host
  • 生成User表
CREATE TABLE `user` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
    `name` varchar(255) DEFAULT NULL COMMENT '姓名',
    `age` int(11) DEFAULT NULL COMMENT '年龄',
    `birthday` timestamp NULL DEFAULT NULL COMMENT '生日',
    `salary` decimal(10,2) DEFAULT NULL COMMENT '年薪',
    `create_date` timestamp NULL DEFAULT NULL COMMENT '创建时间',
    `update_date` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
INSERT INTO `user` VALUES ('1', '张三', '18', '2008-02-29 15:47:42', '5000000.00', '2008-02-29 15:47:42', '2019-04-12 14:35:24');
INSERT INTO `user` VALUES ('2', '李四', '17', '2008-02-29 15:47:42', '5000000.00', '2019-03-01 15:48:09', '2019-04-12 14:35:29');
INSERT INTO `user` VALUES ('3', '王五', '3', '2018-02-28 15:49:15', '50000000.00', '2019-03-04 09:38:09', '2019-04-12 14:35:16');

一:基于代码消息队列

1. direct类型
  • 新建配置 MQConfig.java类,@Component 把该类注册成组件, @Bean 创建交换器、队列及他们的绑定关系。
  • 声明Exchange(交换器名称,durable,autoDelete)
  • 声明Queue(队列名称,durable,autoDelete)
  • 绑定:BindingBuilder绑定队列到交换器,并设置路由键。
package com.fzb.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;

/**
 * @Description MQ配置信息
 *
 * 基于代码的绑定交换器、队列、路由键设置
 * 1. 声明Exchange(交换器名称,durable,autoDelete)
 * 2. 声明Queue(队列名称,durable,autoDelete)
 * 3. 绑定:BindingBuilder绑定队列到交换器,并设置路由键
 * @Author jxb
 * @Date 2019-03-10 10:25:30
 */
@Component
public class MQConfig {

    /**
     * @Description 创建1:1 类型交换器(direct)
     * new DirectExchange(String,boolean,boolean)
     * new FanoutExchange(String,boolean,boolean)
     * new TopicExchange(String,boolean,boolean)
     * 1. 交换器名称
     * 2. durable 是否持久化 默认true
     * 3. autoDelete 是否自动删除 默认false
     * @Author jxb
     * @Date 2019-03-02 14:26:59
     */
    @Bean
    private DirectExchange directExchange() {
        return new DirectExchange("direct.exchange");
    }

    /**
     * @Description 创建队列
     * new Queue(String,boolean,boolean,boolean)
     * 1. 队列名称
     * 2. durable 是否持久化 默认true
     * 3. exclusive 排他队列,第一个链接消费后自动删除 默认 false
     * 4. autoDelete 是否自动删除 默认false
     * @Author jxb
     * @Date 2019-03-02 14:12:31
     */
    @Bean
    private Queue directQueue() {
        return new Queue("direct.queue");
    }

    /**
     * @Description 绑定队列、交换器、路由键
     * @Author jxb
     * @Date 2019-03-04 16:43:08
     */
    @Bean
    private Binding bindingDirect() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("HelloWorld");
    }
}
  • 新建生产者 MQProducer.java类,@RestController 注解为一个控制类,@RequestMapping("mqProducer")
    设置访问路径。
  • @Autowired 注入RabbitTemplate工具类(SpringBoot集成RabbitMQ自带的)
  • @Autowired 注入userService访问数据库(需连接数据库访问User表)
package com.fzb.rabbitmq.producer;

import com.fzb.user.bean.User;
import com.fzb.user.service.UserService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * @Description 生产者
 * @Author jxb
 * @Date 2019-03-09 09:43:47
 */
@RestController
@RequestMapping("mqProducer")
public class MQProducer {

    @Autowired
    public RabbitTemplate rabbitTemplate;

    @Autowired
    public UserService userService;

    /**
     * @Description direct 1:1 类型 交换器队列 生产
     * @Author jxb
     * @Date 2019-03-09 09:56:45
     */
    @RequestMapping(value = "/directMQ", method = {RequestMethod.GET})
    public List<User> directMQ() {
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));
            rabbitTemplate.convertAndSend("direct.exchange", "HelloWorld", user, correlationData);
        }
        return users;
    }

}
  • 新建消费者 MQConsumer.java类,@Component 注册为组件
package com.fzb.rabbitmq.consumer;

import com.fzb.user.bean.User;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * @Description 消费者
 * @Author jxb
 * @Date 2019-03-09 09:43:47
 */
@Component
public class MQConsumer {

    /**
     * @Description direct 1:1 类型 交换器队列 消费
     * @Author jxb
     * @Date 2019-03-09 09:58:12
     */
    @RabbitListener(queues = "direct.queue")
    public void getDirectMessage(User user) throws Exception {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getDirectMessage:" + user.toString());
    }

    /**
     * @Description 配合楼上的队列,消费同一个队列,均匀分配到两个消费者
     * @Author jxb
     * @Date 2019-03-02 14:53:28
     */
    @RabbitListener(queues = "direct.queue")
    public void getDirectMessageCopy(User user) throws Exception {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
    }

}
  • 至此一个基础的SpringBoot集成RabbitMQ的direct类型交换器队列创建完成。推荐基于注解试编程,更直观也符合SpringBoot约定大于配置的思想,所以本文会着重介绍基于注解讲解。

二:基于注解消息队列

1. direct类型
  • 注释掉 MQConfig.java的 @Component 注解
  • 生产者不做修改
  • 消费者
package com.fzb.rabbitmq.consumer;

import com.fzb.user.bean.User;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * 基于注解的绑定交换器、队列、路由键设置
 * 1. Queue配置:value=队列名称、durable=是否持久化(默认true)、exclusive=排他队列,只在当前connection可用(默认false)、autoDelete=如无消息是否自动删除(默认false)
 * 2. Exchange配置:value=交换器名称、type=类型(默认direct)、durable=是否持久化(默认true)、autoDelete=如无消息是否自动删除(默认false)
 * 3. QueueBinding配置:key=路由键(string数组,支持* # 匹配),*必须匹配一个单词,#匹配0个或N个单词,用.分隔
 * 4. RabbitListener配置: bindings=Queue配置+Exchange配置+QueueBinding配置
 * 注:如果代码创建交换器等且配置绑定关系,注解只需监听队列即可,如:@RabbitListener(queues = "direct.queue")
 *
 * @Description 消费者
 * @Author jxb
 * @Date 2019-03-09 09:43:47
 */
@Component
public class MQConsumer {

    /**
     * @Description direct 1:1 类型 交换器队列 消费
     * @Author jxb
     * @Date 2019-03-09 09:58:12
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.queue"), exchange = @Exchange(value = "direct.exchange"), key = "HelloWorld")})
    public void getDirectMessage(User user) throws Exception {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
    }

    /**
     * @Description 配合楼上的队列,消费同一个队列,均匀分配到两个消费者
     * @Author jxb
     * @Date 2019-03-02 14:53:28
     */
    @RabbitListener(queues = "direct.queue")
    public void getDirectMessageCopy(User user) throws Exception {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
    }
}
2. fanout类型
  • 生产者,1:N模式,所以不需要设置路由键,即使设置也会忽略
    /**
     * @Description fanout 1:n 类型 交换器队列 生产
     * @Author jxb
     * @Date 2019-03-09 09:56:45
     */
    @RequestMapping(value = "/fanoutMQ", method = {RequestMethod.GET})
    public List<User> fanoutMQ() {
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            rabbitTemplate.convertAndSend("fanout.exchange", "", user.getName());
        }
        return users;
    }
  • 消费者,定义了三个消费者,可以根据同一条消息做出不同的动作
    /**
     * @Description fanout 1:n 类型 交换器队列 消费(3个)
     * @Author jxb
     * @Date 2019-03-09 09:58:12
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "fanout.queue.01"), exchange = @Exchange(value = "fanout.exchange", type = "fanout"))})
    public void getFanoutMessage01(String message) throws InterruptedException {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getFanoutMessage01:" + "短信通知:您好," + message + "!感谢您成为FZB会员");
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "fanout.queue.02"), exchange = @Exchange(value = "fanout.exchange", type = "fanout"))})
    public void getFanoutMessage02(String message) throws InterruptedException {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getFanoutMessage02:" + "增加积分:您好," + message + "!您的当前积分为100");
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "fanout.queue.03"), exchange = @Exchange(value = "fanout.exchange", type = "fanout"))})
    public void getFanoutMessage03(String message) throws InterruptedException {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getFanoutMessage03:" + "通知好友:您好,您的朋友" + message + "已成为FZB会员,赶快一起互动吧");
    }
3. topic类型
  • 生产者,三种不同路由键的topic交换器,会根据规则路由到不同的队列
    /**
     * @Description topic n:1 类型 交换器队列 生产(3个)
     * @Author jxb
     * @Date 2019-03-09 09:56:45
     */
    @RequestMapping(value = "/topicMQ01", method = {RequestMethod.GET})
    public List<User> topicMQ01() {
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            rabbitTemplate.convertAndSend("topic.exchange", "jd.reg.msg", user.getName());
        }
        return users;
    }

    @RequestMapping(value = "/topicMQ02", method = {RequestMethod.GET})
    public List<User> topicMQ02() {
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            rabbitTemplate.convertAndSend("topic.exchange", "tm.reg.msg", user.getName());
        }
        return users;
    }

    @RequestMapping(value = "/topicMQ03", method = {RequestMethod.GET})
    public List<User> topicMQ03() {
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            rabbitTemplate.convertAndSend("topic.exchange", "super.fzb.reg.msg", user.getName());
        }
        return users;
    }
  • 消费者,模糊匹配规则:“.”把路由键分成了几部分,“*”匹配一个词,“#”匹配0个或N个。

    /**
     * @Description topic n:1 类型 交换器队列 消费(普通会员注册提醒)
     * @Author jxb
     * @Date 2019-03-02 14:55:16
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "topic.queue.01"), exchange = @Exchange(value = "topic.exchange", type = "topic"), key = {"*.reg.msg"})})
    public void getTopicMessage01(String message) throws InterruptedException {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getTopicMessage01:" + "短信通知:您好," + message + "!感谢您成为FZB会员");
    }

    /**
     * @Description topic n:1 类型 交换器队列 消费(超级会员注册提醒)
     * @Author jxb
     * @Date 2019-03-02 14:55:16
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "topic.queue.02"), exchange = @Exchange(value = "topic.exchange", type = "topic"), key = {"*.*.reg.msg.#"})})
    public void getTopicMessage02(String message) throws InterruptedException {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getTopicMessage02:" + "短信通知:您好," + message + "!感谢您成为FZB超级会员,祝您玩的开心");
    }
  • direct、fanout、topic介绍完成,运行结果,自行检验。

生活好苦,但你好甜

相关文章

网友评论

    本文标题:RabbitMQ(3)SpringBoot+RabbitMQ基于

    本文链接:https://www.haomeiwen.com/subject/ixuapqtx.html