美文网首页
微服务-springcloud-stream-kafka

微服务-springcloud-stream-kafka

作者: jianshuqiang | 来源:发表于2020-03-10 19:34 被阅读0次

    stream的作用:

    引入依赖

       <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>
    

    添加配置项

    server.port=20001
    spring.application.name=sys-tool-kafka-stream
    kafka.topic=test
    spring.kafka.bootstrap-servers=localhost:9092
    spring.cloud.stream.bindings.output.destination=${kafka.topic}
    spring.cloud.stream.bindings.input.destination=${kafka.topic}
    

    功能测试

    生产者bean

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    
    
    @EnableBinding(Source.class)
    public class MessageProducerBean {
        @Autowired
        @Qualifier(Source.OUTPUT)
        private MessageChannel messageChannel;
        public void send(String message){
            final boolean send = messageChannel.send(MessageBuilder.withPayload(message).build());
        }
    }
    

    消费者bean

    package com.tool.stream.kafka.stream.consumer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.messaging.MessagingException;
    import org.springframework.messaging.SubscribableChannel;
    
    import javax.annotation.PostConstruct;
    
    @EnableBinding(Sink.class)
    public class MessageConsumerBean {
        @Autowired
        @Qualifier(Sink.INPUT)
        private SubscribableChannel subscribableChannel;
        @Autowired
        private Sink sink;
    
        //当bean注入完成后调用,以下三中方式用一种即可
        //第一种方式
        @PostConstruct
        public void init() {
            subscribableChannel.subscribe(new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    Object payload = message.getPayload();
                    System.out.println(new String((byte[]) payload));
                }
            });
    
        }
        //第二种方式
        @ServiceActivator(inputChannel = Sink.INPUT)
        public void onMessage(Object message) {
            System.out.println("@ServiceActivator-->" +message);
    
        }
        //第三种方式
        @StreamListener(Sink.INPUT)
        public void onMessage(String message) {
            System.out.println("@StreamListener-->" + message);
    
        }
    
    }
    
    

    消息测试

    注意这些和kafka有关,所以应该先开启kafka

    package com.tool.stream.kafka;
    
    import com.tool.stream.kafka.stream.producer.MessageProducerBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @SpringBootApplication
    @RequestMapping("kafkastream")
    @RestController
    public class SysToolKafkaStreamApplication {
        @Autowired
        private MessageProducerBean messageProducerBean;
        @RequestMapping("test")
        public String send(String msg){
            messageProducerBean.send(msg);
            return "success";
        }
    
        public static void main(String[] args) {
            SpringApplication.run(SysToolKafkaStreamApplication.class, args);
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:微服务-springcloud-stream-kafka

          本文链接:https://www.haomeiwen.com/subject/quetlctx.html