美文网首页
RocketMQ demo

RocketMQ demo

作者: Lucie_xxm | 来源:发表于2019-04-28 16:51 被阅读0次

    解决连接超时问题

    我们采用 Docker 部署了 RocketMQ 服务,此时 RocketMQ Broker 暴露的地址和端口(10909,10911)是基于容器的,会导致我们开发机无法连接,从而引发 org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout 异常
    注意下图中的 IP 地址,这个是容器的 IP,开发机与容器不在一个局域网所以无法连接。


    解决方案是在 broker.conf配置文件中增加 brokerIP1=宿主机IP 即可

    本文章项目都是基于spring boot 构建web项目

    RocketMQ 生产者

    POM

           <!-- Spring Cloud Begin -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            </dependency>
            <!-- Spring Cloud End -->
    

    主要增加了org.springframework.cloud:spring-cloud-starter-stream-rocketmq依赖

    消息生产者服务

    @Service
    public class ProviderService {
        @Autowired
        private MessageChannel output;
    
        public void send(String message) {
            output.send(MessageBuilder.withPayload(message).build());
        }
    }
    

    Application

    配置 Output(Source.class) 的 Binding 信息并配合 @EnableBinding 注解使其生效

    @SpringBootApplication
    @EnableBinding({Source.class})
    public class RocketMQProviderApplication implements CommandLineRunner {
    
        @Autowired
        private ProviderService providerService;
    
        public static void main(String[] args) {
            SpringApplication.run(RocketMQProviderApplication.class, args);
        }
    
        /**
         * 实现了 CommandLineRunner 接口,只是为了 Spring Boot 启动时执行任务,不必特别在意
         * @param args
         * @throws Exception
         */
        @Override
        public void run(String... args) throws Exception {
            providerService.send("Hello RocketMQ");
        }
    }
    

    application.yml

    spring:
      application:
        name: rocketmq-provider
      cloud:
        stream:
          rocketmq:
            binder:
              # RocketMQ 服务器地址
              namesrv-addr: 192.168.10.149:9876
          bindings:
            # 这里是个 Map 类型参数,{} 为 YAML 中 Map 的行内写法
            output: {destination: test-topic, content-type: application/json}
    
    server:
      port: 9093
    
    management:
      endpoints:
        web:
          exposure:
            include: '*'
    

    运行成功后即可在 RocketMQ 控制台的 消息 列表中选择 test-topic 主题即可看到发送的消息

    RocketMQ 消费者

    主要增加了 org.springframework.cloud:spring-cloud-starter-stream-rocketmq 依赖

    POM

         <!-- Spring Cloud Begin -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            </dependency>
            <!-- Spring Cloud End -->
    

    消息消费者服务

    主要使用 @StreamListener("input") 注解来订阅从名为 input 的 Binding 中接收的消息

    @Service
    public class ConsumerReceive {
    
        @StreamListener("input")
        public void receiveInput(String message) {
            System.out.println("Receive input: " + message);
        }
    }
    

    Application

    配置 Input(Sink.class) 的 Binding 信息并配合 @EnableBinding注解使其生效

    @SpringBootApplication
    @EnableBinding({Sink.class})
    public class RocketMQConsumerApplication {
        public static void main(String[] args) {
            SpringApplication.run(RocketMQConsumerApplication.class, args);
        }
    }
    

    application.yml

    spring:
      application:
        name: rocketmq-consumer
      cloud:
        stream:
          rocketmq:
            binder:
              namesrv-addr: 192.168.10.149:9876
            bindings:
              input: {consumer.orderly: true}
          bindings:
            input: {destination: test-topic, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
    
    server:
      port: 9094
    
    management:
      endpoints:
        web:
          exposure:
            include: '*'
    

    运行成功后即可在控制台接收到消息:Receive input: Hello RocketMQ

    相关文章

      网友评论

          本文标题:RocketMQ demo

          本文链接:https://www.haomeiwen.com/subject/rqljnqtx.html