美文网首页
2020-07-29--stream消息驱动

2020-07-29--stream消息驱动

作者: 李霖神谷 | 来源:发表于2020-07-29 13:13 被阅读0次

    stream是消息驱动框架,类似于jdbc驱动,它整合了kafka、rabitmq。使得开发人员只专注于底层的业务逻辑,不关注用的是什么消息中间件。
    1.demo:

    pom文件:
     <!--rabbit驱动-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    
    配置文件:
    
    server:
      port: 8801
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:7001/eureka/
      instance:
        ##标识你的服务名称
        instance-id: spring-cloud-config-consume
        ##标识你的ip地址
        prefer-ip-address: true
    spring:
      application:
        name: springcloudstream8801
      cloud:
        stream:
          ##绑定rabbitmq信息
          binders:
            defaultRabbit:
              type: rabbit
              enviroment:
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          ##服务的整合处理
          bindings:
            output:
              ##使用exchange名称定义
              destination: studyExchange
              content-tpe: application/json
              binder: defaultRabbit
    
    业务逻辑:
    消息生产者:
    //定义消息的推送通道
    @EnableBinding(Source.class)
    public class IMessegeOutImpl implements IMessegeOut {
        //        消息发送管道
        @Resource
        private MessageChannel output;
        public String outMessage() {
            output.send(MessageBuilder.withPayload("一剪梅").build());
            return null;
        }
    }
    
    
    消息消费者:
    消费者配置文件里的output要改成input
    @Component
    @EnableBinding(Sink.class)
    public class StreamController {
        @StreamListener(Sink.INPUT)
        public  void  input(Message<String> message ){
            System.out.println("我是消息消费者============="+message.getPayload());
        }
    }
    
    
    

    2.消息重复消费问题、持久化问题:
    由于消费者被绑定之后都会生成不同的流水号的组,这里在配置文件中设置group,设置同一个组名。生产的消息会被均分到同组下的所有服务中去,解决了消息城府消费问题。
    如果某一个服务待机挂掉同组的服务会消费未被接受的消息。

        bindings:
            input:
              ##使用exchange名称定义
              destination: studyExchange
              content-tpe: application/json
              binder: defaultRabbit
              group: lishuaiA
    

    相关文章

      网友评论

          本文标题:2020-07-29--stream消息驱动

          本文链接:https://www.haomeiwen.com/subject/fwchrktx.html