美文网首页
Spring Cloud Stream Kafka

Spring Cloud Stream Kafka

作者: dudy | 来源:发表于2019-12-15 12:00 被阅读0次

    Spring Cloud Stream Kafka 基本使用

    RocketMQ对Spring Cloud Stream 的介绍

    Spring Cloud Stream 体系及原理介绍

    主要概念

    • 应用模型
    • Binder 抽象
    • 持久化发布/订阅支持
    • 消费分组支持
    • 分区支持

    基本概念

    Source:Stream 发送源

    Sink:Stream 接收器

    Processor:同时具备 Source 与 Sink 的能力,从输入通道接收消息,处理后再发送到输出通道

    相关注解

    激活:

    • @EnableBinding
    • @Configuration
    • @EnableIntegration

    Source:

    • @Output
    • MessageChannel

    Sink:

    • @Input
    • SubscribableChannel
    • @ServiceActivator
    • @StreamListener

    生产者

    配置

    # Producer-side binding configuration
    spring:
      application:
        name: stream-sink
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092
          bindings:
            goods-out: # output channel
              destination: goods  # target topic
              contentType: application/json
            # more than one channel can be bound
            log-out:
              destination: log  # target topic
              contentType: application/json
          default-binder: kafka  # the binder must be specified when used together with Consul
    

    定义通道

    /**
     * Channel contract for the producing side.
     */
    public interface GreetingsStreams {

        /** Name of the output binding; it has to be the same as the one used in the configuration. */
        String OUTPUT = "goods-out";

        /** Exposes the message channel bound under {@link #OUTPUT}. */
        @Output(GreetingsStreams.OUTPUT)
        MessageChannel outboundGreetings();
    }
    

    激活

    /** Marker class that enables the bindings declared by {@link GreetingsStreams}. */
    @EnableBinding({GreetingsStreams.class})
    public class StreamsConfig {}
    

    发送消息

    /** Publishes {@link Greetings} payloads on the outbound channel of {@link GreetingsStreams}. */
    @Service
    @Slf4j
    public class GreetingsService {

        private final GreetingsStreams greetingsStreams;

        /**
         * @param greetingsStreams channel contract used to obtain the outbound channel
         */
        public GreetingsService(GreetingsStreams greetingsStreams) {
            this.greetingsStreams = greetingsStreams;
        }

        /**
         * Logs the payload, wraps it in a message whose content-type header is JSON
         * and sends that message on the outbound channel.
         *
         * @param greetings payload to publish
         */
        public void sendGreeting(final Greetings greetings) {
            log.info("Sending greetings {}", greetings);

            greetingsStreams.outboundGreetings().send(
                    MessageBuilder.withPayload(greetings)
                            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                            .build());
        }
    }
    

    消费者

    配置

    # Consumer-side binding configuration
    spring:
      application:
        name: stream-sink
    
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092
          bindings:
            goods-in:
              destination: goods
              contentType: application/json
              group: finance # specifies the consumer group
          default-binder: kafka
    

    定义通道

    /**
     * Channel contract for the consuming side.
     */
    public interface GreetingsStreams {

        /** Name of the input binding. */
        String INPUT = "goods-in";

        /** Exposes the subscribable channel bound under {@link #INPUT}. */
        @Input(GreetingsStreams.INPUT)
        SubscribableChannel inboundGreetings();
    }
    
    

    激活

    /** Marker class that enables the bindings declared by {@link GreetingsStreams}. */
    @EnableBinding({GreetingsStreams.class})
    public class StreamsConfig {}
    

    接收消息

    /** Consumes greetings arriving on the channel named by {@link GreetingsStreams#INPUT}. */
    @Component
    @Slf4j
    public class GreetingsListener {

        /**
         * Logs every received payload together with the partition it was read from.
         *
         * @param greetings the message payload
         * @param partition the partition the data was fetched from
         */
        @StreamListener(GreetingsStreams.INPUT)
        public void handleGreetings(
                @Payload Greetings greetings,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            log.info("Received message: {},from partition : {}", greetings, partition);
        }
    }
    

    消费分区

    kafka 的partition 是一个有序队列,指定key可以将相同key的数据发送到同一个partition,可以保证消息有序消费

    # Producer-side configuration with a partition key expression
    spring:
      application:
        name: stream-source
    
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092
              auto-add-partitions: true
          bindings:
            goods-out:
              destination: goods
              contentType: application/json
              producer:
                partition-key-expression:  headers['partitionKey'] # partition key expression
                partition-count: 4  # number of partitions
          default-binder: kafka
    

    发送端

    /** Name of the message header that carries the partition key. */
    private static final String PARTITION_KEY = "partitionKey";

    /** Channel contract used to obtain the outbound channel. */
    private final GreetingsStreams greetingsStreams;
    
    /**
     * Stores the channel contract for later sends.
     *
     * @param greetingsStreams channel contract used to obtain the outbound channel
     */
    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }
    
    /**
     * Logs the payload and sends it on the outbound channel as a JSON message
     * that additionally carries {@code key} in the {@code PARTITION_KEY} header.
     *
     * @param greetings payload to publish
     * @param key       value stored in the partition key header
     */
    public void sendGreeting(final Greetings greetings, String key) {
        log.info("Sending greetings {}", greetings);

        greetingsStreams.outboundGreetings().send(
                MessageBuilder.withPayload(greetings)
                        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                        .setHeader(PARTITION_KEY, key)
                        .build());
    }
    

    接收端

    @StreamListener( target = GreetingsStreams.INPUT, condition = "headers['partitionKey']=='2'")
    可以指定condition,接收指定条件的消息

        /**
         * Listener restricted by its condition to messages whose {@code partitionKey} header equals {@code '1'}.
         *
         * @param greetings the message payload
         * @param partition value of the received-partition header
         */
        @StreamListener(
                target = GreetingsStreams.INPUT,
                condition = "headers['partitionKey']=='1'")
        public void handleKey1(
                @Payload Greetings greetings,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            log.info("Received message: {},from partition : {}", greetings, partition);
        }
    
        /**
         * Listener restricted by its condition to messages whose {@code partitionKey} header equals {@code '2'}.
         *
         * @param greetings the message payload
         * @param partition value of the received-partition header
         */
        @StreamListener(
                target = GreetingsStreams.INPUT,
                condition = "headers['partitionKey']=='2'")
        public void handleKey2(
                @Payload Greetings greetings,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            log.info("Received message: {},from partition : {}", greetings, partition);
        }
    
    
        /**
         * Listener on the input channel declared without any condition.
         *
         * @param greetings the message payload
         * @param partition value of the received-partition header
         */
        @StreamListener(target = GreetingsStreams.INPUT)
        public void handle(
                @Payload Greetings greetings,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            log.info("Received message: {},from partition : {}", greetings, partition);
        }
    

    手动应答

    消费者端:

    可以设置 spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset 为false

    /**
     * Boot application whose listener acknowledges a message by hand whenever the
     * message carries an {@link Acknowledgment} header.
     */
    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class ManuallyAcknowdledgingConsumer {

        public static void main(String[] args) {
            SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
        }

        /**
         * Looks up the acknowledgment header of the incoming message and, if one
         * is present, prints a notice and acknowledges the message.
         *
         * @param message the incoming message
         */
        @StreamListener(Sink.INPUT)
        public void process(Message<?> message) {
            Acknowledgment ack =
                    message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            if (ack == null) {
                // No acknowledgment header on this message, so there is nothing to acknowledge.
                return;
            }
            System.out.println("Acknowledgment provided");
            ack.acknowledge();
        }
    }
    

    测试不成功:控制台输出的 ConsumerConfig 中 enable.auto.commit 依然为 true,原因暂不清楚

    auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true  一直为自动提交
    

    生产端 可以 将应答配置为同步的

    spring.cloud.stream.kafka.bindings.output.producer.sync=true
    

    https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html

    https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/

    相关文章

      网友评论

          本文标题:Spring Cloud Stream Kafka

          本文链接:https://www.haomeiwen.com/subject/rlmknctx.html