美文网首页
RabbitMQ在Springboot下的使用

RabbitMQ在Springboot下的使用

作者: 冬天里的懒喵 | 来源:发表于2021-11-02 15:43 被阅读0次

在springboot下操作rabbitMQ。

1.pom文件配置

pom文件配置如下:

  <!-- Spring Boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- amqp协议 用来连接rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>

2.yml配置

server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.161.114
    port: 5672
    username: root
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 手动ack
        concurrency: 5 #消费端最小并发数
        max-concurrency: 10 #消费端最大并发数
        prefetch: 5 # 一次请求中预处理的消息数量
    cache:
      channel:
        size: 50 #缓存的channel数量

#自定义配置
rabbitmq-demo:
  defaultExchange: amqpExchange
  queue: queue
  routeKey: queue_key

3.java代码

自定义MQ配置:MQProperties:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "rabbitmq-demo")
@Data
public class MQProperties {

    private String defaultExchange;
    private String routeKey;
    private String queue;
}

RabbitMQ中队列及exchange的配置:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitMQConfig {
    
    @Autowired
    private MQProperties mqProperties;
    
    @Bean
    public Queue queue() {
        boolean durable = true;
        boolean exclusive = false;
        boolean autoDelete = false;
        return new Queue(mqProperties.getQueue(),durable,exclusive,autoDelete);
    }


    @Bean
    public DirectExchange defaultExchange() {
        boolean durable = true;
        boolean autoDelete = false;
        return new DirectExchange(mqProperties.getDefaultExchange(), durable, autoDelete);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue())
                .to(defaultExchange())
                .with(mqProperties.getRouteKey());
    }
}

生产者Producer:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MQProperties mqProperties;

    public void sendMessage(String msg) {
        rabbitTemplate.convertAndSend(mqProperties.getDefaultExchange(),
                mqProperties.getRouteKey(), msg);
    }
}

消费者 Consumer:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class Consumer {
    
    @RabbitListener(queues = "${rabbitmq-demo.queue}")
    public void  receive(String payload, Channel channel,
                         @Header(AmqpHeaders.DELIVERY_TAG) long tag){
        log.info("消费者获取消息内容:{}",payload);
        RabbitMQUtils.askMessage(channel, tag);
    }
}

定义一个发送数据的接口 Controller:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class Controller {
    @Autowired
    Producer  producer;

    @RequestMapping("/sendQueue")
    @ResponseBody
    public String sendQueue(String msg) {
        producer.sendMessage(msg);
        return "success";
    }
}

定义一个工具类RabbitMQUtils:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

@Slf4j
public class RabbitMQUtils {

    public static void askMessage(Channel channel, long tag) {
        askMessage(channel, tag, false);
    }

    public static void askMessage(Channel channel, long tag, boolean multiple) {
        try {
            channel.basicAck(tag, multiple);
        } catch (IOException e) {
            log.error("RabbitMQ,IO异常,异常原因为:{}", e.getMessage());
        }
    }

    public static void rejectMessage(Channel channel, long tag) {
        rejectMessage(channel, tag, false, false);
    }

    public static void rejectAndBackMQ(Channel channel, long tag) {
        rejectMessage(channel, tag, false, true);
    }

    public static void rejectMessage(Channel channel, long tag, boolean multiple, boolean request) {
        try {
            channel.basicNack(tag, multiple, request);
        } catch (IOException e) {
            log.error("RabbitMQ,IO异常,异常原因为:{}", e.getMessage());
        }
    }
}

4.测试

上述代码在springboot中启动以后,进行测试:

wget http://127.0.0.1:8080/sendQueue?msg=testmsg

测试结果:

2021-11-02 15:36:45.515  INFO 14692 --- [ntContainer#0-4] com.dhb.rabbitmq.demo.Consumer           : 消费者获取消息内容:testmsg

在RabbitMQ的后台界面,队列queue的绑定关系如下:

rabbitmq队列queue在控制台中的绑定关系

相关文章

网友评论

      本文标题:RabbitMQ在Springboot下的使用

      本文链接:https://www.haomeiwen.com/subject/rttfzltx.html