RabbitMQ

Author: kjy_112233 | Published 2022-05-16 11:38

I. Introduction and Installation

Install Erlang first, then install RabbitMQ.
To start: open the RabbitMQ Command Prompt (run as administrator).
Commands:
rabbitmq-service.bat remove
set RABBITMQ_BASE=D:\android_tool\java\erl_rabbit_mq\rabbitmq_server-3.9.15\data
rabbitmq-service.bat install
rabbitmq-plugins enable rabbitmq_management
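rabbitmq-service.bat start
Management UI: http://127.0.0.1:15672/
Username and password: both are guest (by default this account can only log in from localhost)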

II. Basic Message Queue Usage

1. Add the dependency to pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. YAML configuration
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: guest
    password: guest
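3. Implementation

new Queue("simple_queue") declares the queue.
RabbitTemplate.convertAndSend("simple_queue", "message body") sends a message. The first argument is the routing key, which the default exchange resolves to the queue of the same name.
A method annotated with @RabbitListener(queues = "simple_queue") receives the messages.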

III. WorkQueue

1. WorkQueue configuration
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # each consumer holds at most 1 unacknowledged message at a time
2. Declare the queue
    @Bean
    public Queue simpleQueue() {
        return new Queue("simple_queue");
    }
3. Send messages
    // Assumes the test class has an @Autowired RabbitTemplate field named rabbitTemplate.
    @Test
    public void testRabbitTemplate() throws InterruptedException {
        String queueName = "simple_queue";
        String message = "hello spring amqp";
        for (int i = 0; i < 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message);
            Thread.sleep(20);
        }
    }
4. Receive messages
    // Fast consumer: about 20 ms per message
    @RabbitListener(queues = "simple_queue")
    public void listenerSimpleQueue1(String msg) throws InterruptedException {
        System.out.println("Message 1: " + msg + " " + LocalTime.now());
        Thread.sleep(20);
    }

    // Slow consumer: about 200 ms per message
    @RabbitListener(queues = "simple_queue")
    public void listenerSimpleQueue2(String msg) throws InterruptedException {
        System.out.println("Message 2: " + msg + " " + LocalTime.now());
        Thread.sleep(200);
    }
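
The effect of prefetch on these two listeners can be estimated with a small plain-Java simulation. This is only a sketch: the class PrefetchSimulation and its simulate method are hypothetical names, the code does not connect to RabbitMQ, and it assumes the numbers used above (50 messages published 20 ms apart, one consumer taking 20 ms per message and the other 200 ms). It compares plain round-robin dispatch with a simplified model of prefetch = 1, where a consumer is handed a new message only once it is idle.

```java
import java.util.Arrays;

/** Hypothetical model of queue dispatch; it does not connect to RabbitMQ. */
class PrefetchSimulation {

    /**
     * Returns how many messages each consumer handles.
     *
     * @param messages       number of messages published
     * @param arrivalGapMs   gap between two published messages
     * @param costMs         processing time per message for each consumer
     * @param roundRobinOnly true models dispatch with no effective prefetch limit,
     *                       false models prefetch = 1
     */
    static int[] simulate(int messages, int arrivalGapMs, int[] costMs, boolean roundRobinOnly) {
        int[] handled = new int[costMs.length];
        long[] freeAt = new long[costMs.length]; // time at which each consumer becomes idle
        for (int m = 0; m < messages; m++) {
            long arrival = (long) m * arrivalGapMs;
            int target;
            if (roundRobinOnly) {
                // messages are handed out in turn, regardless of consumer speed
                target = m % costMs.length;
            } else {
                // prefetch = 1: the message goes to the consumer that is idle first
                target = 0;
                for (int c = 1; c < costMs.length; c++) {
                    if (freeAt[c] < freeAt[target]) {
                        target = c;
                    }
                }
            }
            long start = Math.max(arrival, freeAt[target]);
            freeAt[target] = start + costMs[target];
            handled[target]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] costMs = {20, 200};
        System.out.println("round-robin:  " + Arrays.toString(simulate(50, 20, costMs, true)));
        System.out.println("prefetch = 1: " + Arrays.toString(simulate(50, 20, costMs, false)));
    }
}
```

Under these assumptions round-robin gives each consumer 25 messages, so the slow consumer needs about 5 seconds to work through its share, while the prefetch = 1 model shifts most of the load to the fast consumer. Real brokers add network latency and acknowledgement timing, so actual counts will differ.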

IV. FanoutExchange

1. Publish/subscribe configuration: a FanoutExchange delivers each message to every queue bound to it
@Configuration
public class FanoutConfig {

    /**
     * @return the declared FanoutExchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("li.fanout");
    }

    /**
     * @return a declared Queue
     */
    @Bean
    public Queue queue1() {
        return new Queue("fanout_queue1");
    }

    @Bean
    public Queue queue2() {
        return new Queue("fanout_queue2");
    }

    /**
     * Binds the queue to the exchange.
     *
     * @param queue1         the queue
     * @param fanoutExchange the exchange
     * @return the binding
     */
    @Bean
    public Binding bindingQueue1(Queue queue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }

    /**
     * Binds the queue to the exchange.
     *
     * @param queue2         the queue
     * @param fanoutExchange the exchange
     * @return the binding
     */
    @Bean
    public Binding bindingQueue2(Queue queue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}
2. Send a message to the exchange
    @Test
    public void testSendFanoutExchange(){
        // exchange name
        String exchangeName = "li.fanout";
        // message
        String message = "hello FanoutExchange";
        // a fanout exchange ignores the routing key, so an empty string is passed
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
3. Receive messages from the bound queues
    @RabbitListener(queues = "fanout_queue1")
    public void listenerLiQueue1(String msg) {
        System.out.println("Message 1: " + msg + " " + LocalTime.now());
    }

    @RabbitListener(queues = "fanout_queue2")
    public void listenerLiQueue2(String msg) {
        System.out.println("Message 2: " + msg + " " + LocalTime.now());
    }
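
The fanout rule itself is simple enough to model in a few lines of plain Java. The sketch below is an in-memory illustration only: FanoutModel and its methods are hypothetical names, and nothing here is RabbitMQ or Spring code. It shows that every bound queue receives its own copy of a message and that the routing key plays no part.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical in-memory model of a fanout exchange; not RabbitMQ code. */
class FanoutModel {
    // queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    /** The routing key is accepted but ignored, as with a fanout exchange. */
    void publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message); // every bound queue gets its own copy
        }
    }

    Queue<String> queue(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        exchange.bind("fanout_queue1");
        exchange.bind("fanout_queue2");
        exchange.publish("", "hello FanoutExchange");
        System.out.println("fanout_queue1: " + exchange.queue("fanout_queue1").peek());
        System.out.println("fanout_queue2: " + exchange.queue("fanout_queue2").peek());
    }
}
```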

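V. DirectExchange: routes each message to the queues whose binding key exactly matches the message's routing key

1. Send a message
    @Test
    public void testSendDirectExchange(){
        // exchange name
        String exchangeName = "li.direct";
        // message
        String message = "hello DirectExchange";
        // "red" is bound by both queues below, so both listeners receive this message
        rabbitTemplate.convertAndSend(exchangeName, "red", message);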
    }
2. Receive messages
    /**
     * Receives messages from the DirectExchange.
     *
     * @param msg the message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct_queue1"),
            exchange = @Exchange(name = "li.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenerDirectExchange1(String msg) {
        System.out.println("listenerDirectExchange1:" + msg + LocalTime.now());
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct_queue2"),
            exchange = @Exchange(name = "li.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenerDirectExchange2(String msg) {
        System.out.println("listenerDirectExchange2:" + msg + LocalTime.now());
    }
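
To see which listener receives which routing key, the exact-match rule can be modelled in plain Java. This is a sketch under stated assumptions: DirectRoutingModel and its methods are hypothetical names, and it reproduces only the bindings declared above, not RabbitMQ's implementation.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of direct-exchange routing; not RabbitMQ code. */
class DirectRoutingModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    void bind(String queueName, String... bindingKeys) {
        for (String key : bindingKeys) {
            bindings.computeIfAbsent(key, k -> new LinkedHashSet<>()).add(queueName);
        }
    }

    /** Returns the queues that would receive a message published with this routing key. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectRoutingModel exchange = new DirectRoutingModel();
        exchange.bind("direct_queue1", "red", "blue");
        exchange.bind("direct_queue2", "red", "yellow");

        System.out.println("red    -> " + exchange.route("red"));    // both queues
        System.out.println("blue   -> " + exchange.route("blue"));   // direct_queue1 only
        System.out.println("yellow -> " + exchange.route("yellow")); // direct_queue2 only
        System.out.println("green  -> " + exchange.route("green"));  // no queue
    }
}
```

A routing key that no queue is bound with, such as "green" here, matches nothing in this model. On a real broker such a message is dropped unless the publisher asks for it to be returned.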

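VI. TopicExchange: unlike DirectExchange, the routing key is a list of words separated by ".", and binding keys may contain wildcards: # matches zero or more words, * matches exactly one word

1. Send a message
    @Test
    public void testSendTopicExchange(){
        // exchange name
        String exchangeName = "li.topic";
        // message
        String message = "hello topic exchange";
        // The routing key of a published message is a concrete key; wildcards belong in binding keys only.
        // "china.news" matches both "china.#" and "#.news".
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);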
    }
2. Receive messages
    /**
     * Receives messages from the TopicExchange.
     *
     * @param msg the message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic_queue1"),
            exchange = @Exchange(name = "li.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenerTopicExchange1(String msg) {
        System.out.println("listenerTopicExchange1:" + msg + LocalTime.now());
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic_queue2"),
            exchange = @Exchange(name = "li.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenerTopicExchange2(String msg) {
        System.out.println("listenerTopicExchange2:" + msg + LocalTime.now());
    }
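
The wildcard rules are easiest to check with a small matcher. The following is a plain-Java sketch of the matching rule only: TopicMatcher is a hypothetical helper, not part of RabbitMQ or Spring AMQP. It treats # in a binding key as zero or more words and * as exactly one word, and it treats the routing key as literal text.

```java
/** Hypothetical helper that models topic-exchange matching; not RabbitMQ code. */
class TopicMatcher {

    /** True if the binding key (which may contain * and #) matches the routing key. */
    static boolean matches(String bindingKey, String routingKey) {
        return matches(words(bindingKey), 0, words(routingKey), 0);
    }

    // Splits a key into its dot-separated words; an empty key has no words.
    private static String[] words(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.", -1);
    }

    private static boolean matches(String[] pattern, int pi, String[] key, int ki) {
        if (pi == pattern.length) {
            return ki == key.length; // pattern used up: the key must be used up too
        }
        if (pattern[pi].equals("#")) {
            // '#' absorbs zero or more words: try every possible number of words
            for (int next = ki; next <= key.length; next++) {
                if (matches(pattern, pi + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == key.length) {
            return false; // pattern still expects a word but the key has none left
        }
        // '*' matches any single word; any other pattern word must be equal
        boolean wordMatches = pattern[pi].equals("*") || pattern[pi].equals(key[ki]);
        return wordMatches && matches(pattern, pi + 1, key, ki + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("china.#", "china.news"));          // true
        System.out.println(matches("#.news", "china.news"));           // true
        System.out.println(matches("china.*", "china.weather.today")); // false
        System.out.println(matches("china.#", "#.news"));              // false
    }
}
```

The last call shows why a wildcard should not be used as the routing key of a published message: the key "#.news" is compared literally, so its first word "#" does not equal "china" and the binding "china.#" does not match.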

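VII. Message Conversion

1. Add the dependency to pom.xml
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
</dependency>
Jackson2JsonMessageConverter needs jackson-databind on the classpath; jackson-dataformat-xml brings it in transitively.
2. Replace the default converter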
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
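
One motivation for swapping the converter: without a custom bean, Spring AMQP's default SimpleMessageConverter sends Serializable objects using JDK serialization, which produces a binary payload that is larger than JSON and unreadable in the management UI. The sketch below illustrates the size difference with the JDK alone. PayloadSizeDemo is a hypothetical class, the JSON string is written by hand for comparison, and neither Spring nor Jackson is involved.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical comparison of payload encodings; it does not use Spring or Jackson. */
class PayloadSizeDemo {

    /** Encodes the object with JDK serialization, as a serialization-based converter would. */
    static byte[] jdkSerialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, Object> user = new LinkedHashMap<>();
        user.put("name", "Jack");
        user.put("age", 21);

        byte[] binary = jdkSerialize(user);
        // Handwritten JSON for the same data, used for the size comparison only
        byte[] json = "{\"name\":\"Jack\",\"age\":21}".getBytes(StandardCharsets.UTF_8);

        System.out.println("JDK serialization: " + binary.length + " bytes");
        System.out.println("JSON text:         " + json.length + " bytes");
    }
}
```

With the converter bean above in place, the same Map can be passed to convertAndSend and received as a Map parameter in a @RabbitListener method, provided sender and receiver use the same converter.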

Article link: https://www.haomeiwen.com/subject/ahmbyrtx.html