Integrating Kafka with Spring Boot: Simple Message Sending and Receiving

By 意识流丶 | Published 2018-11-22 14:25

    Before you start, make sure the JDK, ZooKeeper, and Kafka are all installed and configured.

    Kafka itself needs a little configuration first.

    In Kafka's config directory, open the server.properties configuration file.

    Uncomment the listeners and advertised.listeners settings. You can point them at your server's public IP and port as needed; this demo uses localhost and the default port 9092, as sketched below.
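
    A minimal sketch of those two server.properties lines after uncommenting (substitute your own host and port if the broker must be reachable from outside):

    listeners=PLAINTEXT://localhost:9092
    advertised.listeners=PLAINTEXT://localhost:9092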

    Integrating Kafka with Spring Boot

    1. Add the dependency

    <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka</artifactId>
    </dependency>
    

    2. Write the producer and consumer

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class KafkaController {
        private static final Logger logger = LoggerFactory.getLogger(KafkaController.class);

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        // Producer: send 99 messages (key1..key99) to the "test" topic
        @GetMapping("/kafka")
        public String testKafka() {
            int iMax = 100;
            for (int i = 1; i < iMax; i++) {
                kafkaTemplate.send("test", "key" + i, "data" + i);
            }
            return "success";
        }

        // Consumer: listen on the "test" topic and log topic, key, and value
        @KafkaListener(topics = "test")
        public void receive(ConsumerRecord<?, ?> consumer) {
            logger.info("{} - {}:{}", consumer.topic(), consumer.key(), consumer.value());
        }
    }
    

    Notes on the code

    KafkaTemplate wraps a producer (Producer) and provides convenient methods for sending data to a Kafka topic.
    Here is the source of the send() method used above; KafkaTemplate overloads send() in many other ways, so check the source if you need them.

    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
        return this.doSend(producerRecord);
    }
    

    Data is sent through the KafkaTemplate template class.
    In kafkaTemplate.send(String topic, K key, V data), the first argument is the topic, the second is the message key, and the third is the message value. The @KafkaListener annotation configures which topics a listener consumes.
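
    send() is asynchronous and returns a ListenableFuture<SendResult<K, V>>. If you want to know whether the broker actually accepted a record, you can attach a callback to that future. A minimal sketch against the spring-kafka 2.x API (the class and method names below are made up for illustration):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.core.KafkaTemplate;

    public class SendCallbackExample {
        private static final Logger logger = LoggerFactory.getLogger(SendCallbackExample.class);

        // send() returns immediately; the callback fires once the broker responds
        public static void sendWithCallback(KafkaTemplate<String, String> kafkaTemplate) {
            kafkaTemplate.send("test", "key1", "data1").addCallback(
                    result -> logger.info("sent, offset = {}",
                            result.getRecordMetadata().offset()),
                    ex -> logger.error("send failed", ex));
        }
    }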

    Configuration file: application.yml

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: kafka2
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

    bootstrap-servers: the Kafka broker address (several can be listed, comma-separated)
    consumer.group-id: a default consumer group name
    If no group is specified anywhere, you will get:

    java.lang.IllegalStateException: No group.id found in consumer config, 
    container properties, or @KafkaListener annotation; 
    a group.id is required when group management is used.
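
    Alternatively, spring-kafka (1.3+) lets each listener declare its own group through the annotation's groupId attribute, which overrides spring.kafka.consumer.group-id for that listener; a minimal sketch:

    // groupId on the annotation overrides the default from application.yml
    @KafkaListener(topics = "test", groupId = "kafka2")
    public void receive(ConsumerRecord<?, ?> consumer) {
        logger.info("{} - {}:{}", consumer.topic(), consumer.key(), consumer.value());
    }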
    
    auto-offset-reset: the offset reset policy, i.e. where to start consuming when there is no usable committed offset

    1. earliest: if a partition has a committed offset, consume from that offset; if not, consume from the beginning
    2. latest: if a partition has a committed offset, consume from that offset; if not, consume only data newly produced to that partition
    3. none: if every partition of the topic has a committed offset, consume from after those offsets; if any partition has no committed offset, throw an exception

    Note that an invalid value for this property also causes an error:

    org.apache.kafka.common.config.ConfigException: 
    Invalid value  for configuration auto.offset.reset: 
    String must be one of: latest, earliest, none
    

    Message serialization and deserialization

    When sending and receiving Kafka messages, the producer side must serialize and the consumer side must deserialize: what travels over the network is byte[], and only after deserialization does the consumer recover the actual message content the producer sent.
    consumer.key-deserializer and consumer.value-deserializer set the consumer's key/value deserializers.
    producer.key-serializer and producer.value-serializer set the producer's key/value serializers.

    StringDeserializer is the built-in string deserializer:

    public class StringDeserializer implements Deserializer<String> {
        public String deserialize(String topic, byte[] data) {
            try {
                // If data is null, return null directly; otherwise deserialize
                // the byte[] back into a String
                return data == null ? null : new String(data, this.encoding);
            } catch (UnsupportedEncodingException var4) {
                throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + this.encoding);
            }
        }
        ......
    }
    

    StringSerializer is the built-in string serializer:

    public class StringSerializer implements Serializer<String> {
        public byte[] serialize(String topic, String data) {
            try {
                // If data is null, return null directly; otherwise serialize
                // the String into a byte[]
                return data == null ? null : data.getBytes(this.encoding);
            } catch (UnsupportedEncodingException var4) {
                throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
            }
        }
        ......
    }
    

    The org.apache.kafka.common.serialization package also ships serializers and deserializers for many other types.
    To customize serialization, implement the Serializer interface.
    To customize deserialization, implement the Deserializer interface; a sketch follows the reference below.

    For more detail, see
    https://blog.csdn.net/shirukai/article/details/82152172
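
    As an illustration, a hand-rolled JSON value serializer might look roughly like this (a sketch assuming Jackson is on the classpath; the class name MyJsonSerializer is made up for this example):

    import java.util.Map;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class MyJsonSerializer<T> implements Serializer<T> {
        private final ObjectMapper mapper = new ObjectMapper();

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // no configuration needed in this sketch
        }

        @Override
        public byte[] serialize(String topic, T data) {
            try {
                // null stays null, mirroring the built-in StringSerializer
                return data == null ? null : mapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("Error serializing value to JSON", e);
            }
        }

        @Override
        public void close() {
            // nothing to release
        }
    }

    To use it, point producer.value-serializer at the class's fully qualified name in application.yml.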

    Start the project and test it

    This is the Kafka consumer's (Consumer) configuration; every consumer logs it on startup:

        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = kafka2
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    
    2018-11-22 14:16:53.465  INFO 11980 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.0
    2018-11-22 14:16:53.465  INFO 11980 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 3402a8361b734732
    2018-11-22 14:16:57.664  INFO 11980 --- [           main] org.apache.kafka.clients.Metadata        : Cluster ID: d3n7Snc2TFmSFcNsHjqgVw
    

    Visit http://localhost:8080/kafka and you will see the messages printed in the console.
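
    Given the log format in receive(), the output should look roughly like this (log prefixes abbreviated):

    ... INFO ... : test - key1:data1
    ... INFO ... : test - key2:data2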
