美文网首页
微服务消息队列-RabbitMQ的简单使用

微服务消息队列-RabbitMQ的简单使用

作者: 侧耳倾听y | 来源:发表于2022-02-27 20:53 被阅读0次

1. 下载RabbitMQ

可以使用docker镜像:
https://python.iitter.com/other/170428.html

2. 添加依赖

生产者和消费者都需要添加以下依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3. 生产者

@Service
@Slf4j
public class Producer {

    @Autowired
    private StreamBridge streamBridge;

    public void send(Object message) {
        log.info("sent: {}", message);
        streamBridge.send("testMessage-out-0", message);
    }

}

4. 消费者


@Slf4j
@Service
public class Consumer {

    @Autowired
    private ConsumerService consumerService;

    @Bean
    public Consumer<Object> testMessage() {
        return request -> {
            log.info("received: {}", request);
            consumerService.testConsumer(request);
        };
    }
}

在默认情况下,框架会使用消费者方法的 method name 作为当前消费者的标识,如果消费者标识和配置文件中的名称不一致,那么 Spring 应用就不知道该把当前的消费者绑定到哪一个 Stream 信道上去。

5. 配置文件

  • Binder

如果程序中只使用了单一的中间件,比如只接入了 RabbitMQ,那么可以直接在 spring.rabbitmq 节点下配置连接串,不需要特别指定 binders 配置。

spring:
  cloud:
    stream:
      # 如果你项目里只对接一个中间件,那么不用定义binders
      # 当系统要定义多个不同消息中间件的时候,使用binders定义
      binders:
        my-rabbit:
          type: rabbit # 消息中间件类型
          environment: # 连接信息
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
  • bindings

配置了生产者、消费者、binder 和 RabbitMQ 四方的关联关系。

spring:
  cloud:
    stream:
      bindings:
        # 添加 Producer
        testMessage-out-0:
          destination: testMessage-topic
          content-type: application/json
          binder: my-rabbit
        # Consumer
        testMessage-in-0:
          destination: testMessage-topic
          content-type: application/json
          # 消费组,同一个组内只能被消费一次
          group: add-coupon-group
          binder: my-rabbit
      function:
        definition: testMessage

如上就可以生成和消费消息了。

6. 异常处理

  • 重试

消息重试是一种简单高效的异常恢复手段,当 Consumer 端抛出异常的时候,Stream 会自动执行 2 次重试。

spring:
  cloud:
    stream:
      bindings:
        testMessage-in-0:
          destination: testMessage-topic
          content-type: application/json
          # 消费组,同一个组内只能被消费一次
          group: testMessage-group
          binder: my-rabbit
          consumer:
            # 如果最大尝试次数为1,即不重试
            # 默认是做3次尝试
            max-attempts: 5
            # 两次重试之间的初始间隔
            backOffInitialInterval: 2000
            # 重试最大间隔
            backOffMaxInterval: 10000
            # 每次重试后,间隔时间乘以的系数
            backOffMultiplier: 2
            # 如果某个异常你不想重试,写在这里
            retryableExceptions:
              java.lang.IllegalArgumentException: false

除了本地重试以外,还可以把这个失败的消息丢回到原始队列中,做一个 requeue 的操作。

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          # requeue重试
          testMessage-in-0:
            consumer:
              requeue-rejected: true
  • 降级

通过 spring-integration 的注解 @ServiceActivator 做了一个桥接,将指定 Channel 的异常错误转到本地方法里。

@ServiceActivator(inputChannel = "testMessage-topic.testMessage-group.errors")
public void requestCouponFallback(ErrorMessage errorMessage) throws Exception {
    log.info("consumer error: {}", errorMessage);
    // 实现自己的逻辑
}

  • 死信队列

如果想要保留这条出错的 Message,可以选择将它发送到另一个 Queue 里。这个特殊的 Queue 就叫做死信队列。

开启

rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

配置

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          testMessage-in-0:
            consumer:
              auto-bind-dlq: true

死信队列的名称和第一个队列几乎一样,唯一区别就是末尾多了一个.dlq,这个 dlq 就是死信队列的标志。

7.延迟消息

下载rabbitmq_delayed_message_exchange插件

https://blog.csdn.net/u010375456/article/details/106323962/

  • 生产者

// 使用延迟消息发送
public void sendInDelay(Object message) {
    log.info("sent: {}", coupon);
    streamBridge.send("testMessageDelay-out-0"
            MessageBuilder.withPayload(message)
                    .setHeader("x-delay", 10 * 1000)
                    .build());
}
  • 消费者

@Bean
public Consumer<Object> testMessageDelay() {
    return request -> {
        log.info("received: {}", request);
        service.consumer(request);
    };
}
  • 配置文件

spring:
  cloud:
    stream:
      bindings:
        # 延迟发券 - producer
        testMessageDelay-out-0:
          destination: testMessage-delayed-topic
          content-type: application/json
          binder: my-rabbit
        # 延迟发券 - Consumer
        testMessageDelay-in-0:
          destination: testMessage-delayed-topic
          content-type: application/json
          # 消费组,同一个组内只能被消费一次
          group: testMessage-group
          binder: my-rabbit
          consumer:
            # 如果最大尝试次数为1,即不重试
            # 默认是做3次尝试
            max-attempts: 1
      function:
        definition: testMessageDelay
      rabbit:
        bindings:
          testMessageDelay-out-0:
            producer:
              delayed-exchange: true
          testMessageDelay-in-0:
            consumer:
              delayed-exchange: true

相关文章

  • win10使用RabbitMQ实现消息队列

    第三方消息队列服务RabbitMQ RabbitMQ简介 可靠性(Reliability):RabbitMQ 使用...

  • 异步消息队列

    1. 消息队列 rabbitmq - 提供消息队列服务 rabbitmq 常用指令 docker run -d -...

  • docker安装RabbitMQ

    摘要 本文主要介绍了使用docker安装rabbitmq消息队列服务,并简述了使用php操作rabbitmq的必要...

  • 消息队列——了解一下

    本文将从以下几点展开 为什么使用消息队列? 消息队列的缺点 消息队列如何选型 RabbitMQ RabbitMQ ...

  • SpringBoot使用RabbitMQ及RabbitMQ介绍

    SpringBoot使用RabbitMQ RabbitMQ 是 消息队列 实现AMQP(高级消息队列协议Advan...

  • 微服务消息队列-RabbitMQ的简单使用

    1. 下载RabbitMQ 可以使用docker镜像:https://python.iitter.com/othe...

  • Nova 组件

    Nova 子服务协作流程 消息队列使用rabbitMQ:OpenStack组件的各个子服务之间不直接调用,全部通过...

  • RabbitMQ-1使用

    一. RabbitMQ使用 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只...

  • RabbbitMQ RabbitListener 使用IP动态队

    在RabbitMQ消息队列使用 @RabbitListener 接收消息,队列名称使用常量命名,但是如果使用动态队...

  • RabbitMQ-简单队列

    简单队列 添加依赖 P:消息的生产者-->队列-->消费者 连接rabbitmq 发送消息 监听(接收消息) 简单...

网友评论

      本文标题:微服务消息队列-RabbitMQ的简单使用

      本文链接:https://www.haomeiwen.com/subject/frdqrrtx.html