美文网首页RabbitQ
Springboot 技术整合--笔记7--集成rabbitmq

Springboot 技术整合--笔记7--集成rabbitmq

作者: 牵手生活 | 来源:发表于2019-05-31 17:16 被阅读0次

注意点

  • 消息的唯一id
  • 生产者发送消息之前需要先创建路由key,否则消息无法发送
  • 消费者手动签收时,依赖消息通道Channel
  • 通常情况下生产者与消费者是在不同的web app
  • 消费者的@RabbitListener 会自动帮助我们创建队列、路由key及绑定关系

rabbitmq常用地址

#管理后台
http://192.168.0.19:15672

最佳测试方式是启动2个服务

如8081端口发布消息(测试用例)
8080接收消息

websocke+rabbitmqt流程.jpg
image.png

pom.xml导入依赖

spring-boot-starter-test是为了后面写测试类用,
spring-boot-starter-amqp才是真正的使用rabbitmq的依赖

        <!--添加rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>1.5.2.RELEASE</version>
        </dependency>
        <!--添加rabbitmq  测试用例用 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
<!--单元测试系列:Mock工具之Mockito实战 https://www.cnblogs.com/zishi/p/6780719.html -->

        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>2.7.19</version>
            <scope>test</scope>
        </dependency>

application.properties配置文件当中引入RabbitMQ基本的配置信息

############################################################
#
# RabbitMQ 配置
#
############################################################
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#虚拟主机如virtual-task
spring.rabbitmq.virtual-host=/
#链接超时时间15秒
spring.rabbitmq.connection-timeout=15000s

配置jackson部分--非必须的

############################################################
#
#jackson 部分
#如data部分的时间格式化
# 不允许传空
############################################################
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

创建RabbitConfig

使用@Value注解获取application.properties配置信息

/**
 Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
 Queue:消息的载体,每个消息都会被投到一个或多个队列。
 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
 vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
 Producer:消息生产者,就是投递消息的程序.
 Consumer:消息消费者,就是接受消息的程序.
 Channel:消息通道,在客户端的每个连接里,可建立多个channel.
 */

@Configuration
public class RabbitConfig {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.rabbitmq.host}")  //在application.properties中配置
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;


    public static final String EXCHANGE_TASK = "my-mq-exchange_task";
    public static final String EXCHANGE_B = "my-mq-exchange_B";
    public static final String EXCHANGE_C = "my-mq-exchange_C";


    public static final String QUEUE_A = "QUEUE_A";
    public static final String QUEUE_B = "QUEUE_B";
    public static final String QUEUE_C = "QUEUE_C";

    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
    public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必须是prototype类型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     HeadersExchange :通过添加属性key-value匹配
     DirectExchange:按照routingkey分发到指定队列
     TopicExchange:多关键字匹配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE_TASK);
    }
    /**
     * 获取队列A
     * @return
     */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); //队列持久
    }
    @Bean
    public Binding binding() {

        return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

}

创建消息的生产者MsgProducer


/**
 * 消息的生产者
 */
@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    //由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
    private RabbitTemplate rabbitTemplate;
    /**
     * 构造方法注入rabbitTemplate
     */
    @Autowired
    public MsgProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
    }

    public void sendMsg(String content) {
        //消息ID
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); //
        //把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
        logger.info("rabbitMq 生产者消息  内容content = {} CorrelationData= {}", content,correlationId);
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TASK, RabbitConfig.ROUTINGKEY_A, content, correlationId);
    }
    /**
     * 回调
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info(" 回调id:" + correlationData);
        if (ack) {
            logger.info("消息成功消费");
        } else {
            logger.info("消息消费失败:" + cause);
        }
    }
}

创建测试类RabbitMQTest

@SpringBootTest(classes= WechatTaskApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class RabbitMQTest {
    @Autowired
    private MsgProducer msgProducer;

    @Test
    public void testRabbit_Producer() {
        String mq = "{\n" +
                "    \"task_id\": 1462,\n" +
                "    \"wechat_id\": \"wxid_on8oksh88zo22\"\n" +
                "}";
        msgProducer.sendMsg(mq);
    }


}
运行testRabbit_Producer测试方法

运行测试类RabbitMQTestd的testRabbit_Producer


image.png

rabbitmq--消费端配置


############################################################
#
# Springboot RabbitMQ 基本配置 192.168.0.19
#192.168.2.34 ubuntu的ip被自动变为192.168.0.19
############################################################
spring.rabbitmq.host=192.168.0.19
spring.rabbitmq.port=5672
spring.rabbitmq.username=czg-admin
spring.rabbitmq.password=czg_pass
#虚拟主机如virtual-task
spring.rabbitmq.virtual-host=/
#链接超时时间15秒
spring.rabbitmq.connection-timeout=15000s



############################################################
#
# Springboot RabbitMQ 消费者配置(依赖RabbitMQ的基本配置)
#
############################################################
#最大并发数
spring.rabbitmq.listener.simple.concurrency=5
#最大并发支持
spring.rabbitmq.listener.simple.max-concurrency=10
#签收模式--可以手工或自动
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
#限流(一条条过来,每一个线程即并发数一次消费一条)---在大流量时控制一次消费消息个数
spring.rabbitmq.listener.simple.prefetch=1
############################################################

添加消息的消费者MsgReceiver

**
 * 消费方
 * 默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual
 */
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiver {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

   /* //方式1=不处理消费确认
   @RabbitHandler
    public void process(String content) {
        logger.info("接收处理队列A当中的消息: " + content);
    }
*/

    /**
     * 方式2
     * Channel
     * @param message
     * @param channel com.rabbitmq.client.Channel https://blog.csdn.net/asdfsadfasdfsa/article/details/79671097
     * @param tag  好像都是1,2之类 deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
     *
     * RabbitMQ(4)SpringBoot+RabbitMQ发送确认和消费手动确认机制
     * https://www.jianshu.com/p/fae8fca98522
     */
    @RabbitHandler
    public void processMessage2(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        logger.info("接收处理队列A当中的message = {},消息tag ={}" + message,tag);

        try {
            // 模拟执行任务
            Thread.sleep(1000);
            channel.basicAck(tag,false);            // 确认收到消息(需要在application.properties消费消息时指定非自动确认),消息将被队列移除,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息。
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

再运行刚才的测试方法

image.png

关于rabbitmq的文章收录

rabbitmq消息队列的简单入门-csdn
Spring Boot整合RabbitMQ详细教程-csdn
springboot学习笔记-6 springboot整合RabbitMQ

RabbitMQ:消息发送确认 与 消息接收确认(ACK)
RabbitMQ(4)SpringBoot+RabbitMQ发送确认和消费手动确认机制
RabbitMQ消息中间件极速入门与实战--慕课

相关文章

网友评论

    本文标题:Springboot 技术整合--笔记7--集成rabbitmq

    本文链接:https://www.haomeiwen.com/subject/xxvwtctx.html