美文网首页后端java
spring cloud stream kafka实例

spring cloud stream kafka实例

作者: go4it | 来源:发表于2017-04-10 13:22 被阅读7432次

    maven

        <!-- Imports the spring-cloud-dependencies BOM (type=pom, scope=import) at release Camden.SR6. -->
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>Camden.SR6</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <!-- Kafka binder for Spring Cloud Stream. No <version> is declared here; presumably it is
             resolved from the BOM imported above. NOTE(review): shown as a fragment; in a real POM
             this element has to live inside the project's <dependencies> section. -->
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
    

    生产者配置

    # Producer application: HTTP on 8081, publishes to the "event-demo" destination
    # through the "output" binding.
    server:
      port: 8081
    spring:
      application:
        name: output-demo
      cloud:
        stream:
          # instance-count / instance-index are Spring Cloud Stream properties
          # (spring.cloud.stream.instanceCount / instanceIndex), so they must be nested
          # under "stream"; directly under "spring.cloud" they are not bound.
          instance-count: 1
          instance-index: 0
          kafka:
            binder:
              brokers: localhost:9092
              # NOTE(review): ZooKeeper's usual client port is 2181; confirm 2182 matches your setup.
              zk-nodes: localhost:2182
              auto-add-partitions: true
              auto-create-topics: true
              min-partition-count: 1
          bindings:
            output:
              destination: event-demo
              content-type: text/plain
              producer:
                partitionCount: 1
    
    
    
    • java代码
    /**
     * Publishes text payloads on the {@code output} channel of the bound {@link Source}.
     */
    @EnableBinding(Source.class)
    public class SendService {

        // Fully qualified because this snippet carries no import section of its own.
        private static final java.util.logging.Logger LOG =
                java.util.logging.Logger.getLogger(SendService.class.getName());

        @Autowired
        private Source source;

        /**
         * Wraps {@code msg} in a message and sends it on the output channel.
         *
         * <p>Sending is best-effort: a failure is logged and never propagated to the caller.
         * A {@code false} result from the channel is logged as well instead of being
         * silently discarded.
         *
         * @param msg the payload to publish
         */
        public void sendMessage(String msg) {
            try {
                boolean accepted = source.output().send(MessageBuilder.withPayload(msg).build());
                if (!accepted) {
                    LOG.warning("Message was not accepted by the output channel: " + msg);
                }
            } catch (RuntimeException e) {
                // Neither building nor sending declares checked exceptions, so this still
                // covers every failure the original catch-all handled.
                LOG.log(java.util.logging.Level.SEVERE, "Failed to send message: " + msg, e);
            }
        }
    }
    
    /**
     * REST entry point that forwards text taken from the request path to {@link SendService}.
     */
    @RestController
    public class ProducerController {

        @Autowired
        private SendService sendService;

        /**
         * Handles {@code GET /send/{msg}} by passing the path segment to
         * {@link SendService#sendMessage(String)}. Produces no response body.
         *
         * @param msg the value of the {@code msg} path variable
         */
        @RequestMapping(method = RequestMethod.GET, value = "/send/{msg}")
        public void send(@PathVariable("msg") String msg) {
            sendService.sendMessage(msg);
        }
    }
    

    消费者

    # Consumer application: reads the "event-demo" destination through the "input"
    # binding as consumer group "s1".
    spring:
      application:
        name: input-demo
      cloud:
        stream:
          # instance-count / instance-index are Spring Cloud Stream properties
          # (spring.cloud.stream.instanceCount / instanceIndex), so they must be nested
          # under "stream"; directly under "spring.cloud" they are not bound.
          instance-count: 1
          instance-index: 0
          kafka:
            binder:
              brokers: localhost:9092
              # NOTE(review): ZooKeeper's usual client port is 2181; confirm 2182 matches your setup.
              zk-nodes: localhost:2182
              auto-add-partitions: true
              auto-create-topics: true
              min-partition-count: 1
          bindings:
            input:
              destination: event-demo
              group: s1
              consumer:
                autoCommitOffset: false
                concurrency: 1
                partitioned: false
    
    • java代码
    /**
     * Listener bound to {@link Sink} that prints each incoming payload and acknowledges
     * the message when an acknowledgment handle is attached to it.
     */
    @EnableBinding(Sink.class)
    public class MsgSink {

        /**
         * Prints the payload of {@code message} to standard output, then looks up the
         * {@code KafkaHeaders.ACKNOWLEDGMENT} header and, if one is present, acknowledges it.
         *
         * @param message the message received on {@code Sink.INPUT}
         */
        @StreamListener(Sink.INPUT)
        public void process(Message<?> message) {
            System.out.println(message.getPayload());

            // NOTE(review): presumably the header is only populated when the consumer runs
            // with autoCommitOffset=false; confirm against the binder documentation.
            Acknowledgment ack =
                    message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            if (ack == null) {
                return;
            }

            System.out.println("Acknowledgment provided");
            ack.acknowledge();
        }
    }
    

    运行

    先启动生产者应用,再启动消费者应用,然后用下面的请求发送一条消息(消费者控制台应打印出 hello1):

    curl -i localhost:8081/send/hello1
    

    doc

    相关文章

      网友评论

        本文标题:spring cloud stream kafka实例

        本文链接:https://www.haomeiwen.com/subject/evysattx.html