美文网首页
【Kafka】Kafka入门手记

【Kafka】Kafka入门手记

作者: LamboChen | 来源:发表于2019-11-25 01:09 被阅读0次

    1. 前言

    本文为 Kafka 入门笔记,主要包括 Kafka 单节点部署、生产消费消息,以及新手踩坑记录。

    Kafka 作为大数据必备组件、消息中间件必学的 Apache 顶级开源项目,服务稳定、高吞吐的流数据处理平台。具体介绍可查看文末参考文档。

    目前 简书 暂不支持 markdown 收缩语句块,对于文章展示方面略有问题

    1.1. 可查看美观版本

    文档:Kafka 入门手记.md
    链接:http://note.youdao.com/noteshare?id=d1c65daf3c137f29a860b5efd5dff944&sub=F4E6A5E5FB3946499E7C4C3E6022E1DC

    2. Kafka单节点部署

    2.1. 下载

    http://kafka.apache.org

    选择合适版本即可,这里选择最新版本。Linux 环境。

    2.2. 解压

    Kafka 安装包 后缀为 .tgz , 解压即可。

    tar -zxvf kafka_package.tgz
    

    其中,kafka_package.tgz 是 Kafka 安装包名称。

    2.2.1. zookeeper 安装

    Kafka 依赖于 zookeeper(ZK) 支持,本文直接采用 Kafka 安装包自带的 zookeeper。

    也可以单独部署 zookeeper,使用方式一样。对于 生产环境,建议单独搭建 zookeeper。

    2.3. 配置

    2.3.1. zookeeper

    进入 Kafka 解压包根目录,config 文件夹下的即为 Kafka 提供的默认配置文件。

    此处我们修改下 zookeeper.propertieshost 信息。此处修改 host 配置,主要是为了避免在使用 Java 客户端连接解析为 localhost

    host.name=your_ip
    advertised.host.name=your_ip
    

    其中,your_ip 配置为服务器外网 IP。

    2.3.2. Kafka

    同样在 config 配置文件下修改 server.properties 文件即可。

    主要配置如下:

    # 监听
    listeners=PLAINTEXT://服务器内网IP:9092
    
    # 以下两项类似 zookeeper 配置
    advertised.listeners=PLAINTEXT://服务器外网IP:9092
    host.name=外网IP
    
    # zookeeper 连接信息
    zookeeper.connect=zookeeper服务器IP:2181
    

    2.4. 启动

    先启动 zookeeper,因为 Kafka 启动的时候会连接注册 zookeeper。

    2.4.1. zookeeper

    启动 zookeeper,在 Kafka 根目录执行

    ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
    

    上述方式,会占有 shell 客户端窗口,如果想后台启动,添加参数 daemon 即可

    ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
    

    2.4.2. kafka

    类似 zookeeper 启动。

    ./bin/kafka-server-start.sh ./config/server.properties
    

    后台启动:

    ./bin/kafka-server-start.sh -daemon ./config/server.properties
    

    此时 Kafka 单节点部署就已经完成了,通过 ps -ef | grep zookeeper, ps -ef | grep kafka 看到对应进程,证明启动成功。

    2.5. topic

    2.5.1. 创建 topic

    通过 Kafka 提供的脚本文件,即可创建。在 Kafka 根目录下执行:

    ./config/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topic_name
    

    1)指定 zk 为 localhost:2181
    2)副本因子为 1,即不需要副本
    3)partition 数量为 3
    4)topic 名称为 top_name

    2.5.2. 查看 topic

    ./config/kafka-topics.sh --list --zookeeper localhost:2181
    
    1. list 命令用于查看
      2)需要指定 zk

    2.6. 生产消费

    此处直接通过 Kafka 提供的简单客户端进行生产消费数据。

    2.6.1. 生产

    1、启动简单 producer

    ./config/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_name
    

    1) --broker-list 指定 Kafka 的地址及端口
    2)--topic 指定具体 topic_name

    2、 生产消息

    直接在 producer 窗口输入消息即可,消息是否发送成功,直接在 consumer 窗口即可查看。

    2.6.2. 消费

    1、启动简单 consumer

    ./config/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_name --from-beginning
    

    1)--bootstrap-server 指定 Kafka 地址及端口
    2)-- topic 指定 topic
    3)--from-beginning 表示指定从 offset 从头开始消费

    2、消费数据

    直接在 producer 窗口发送消息,然后切换至 consumer 窗口,查看是否成功消费消息。

    3. Java Client 生产消费

    此步基于 SpringBoot 进行搭建 demo 项目。 SpringBoot 版本为 2.x

    3.1. 新建 SpringBoot 项目

    直接通过 spring.starter 创建即可。完整项目: lambochen/demo/kafka

    3.1.1. 引入依赖

    Kafka 依赖:

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    

    3.2. Kafka 配置

    3.2.1. producer

    1、application.properties 配置

    kafka.producer.servers=kafka服务器IP:服务器端口号
    kafka.producer.retries=0
    kafka.producer.batch.size=4096
    kafka.producer.linger=1
    kafka.producer.buffer.memory=40960
    
    kafka.topic.default=topic名称
    

    2、ProducerFactory, KafkaTemplate 配置:

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG,linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        return props;
    }
    
    public ProducerFactory<String, MessageEntity> producerFactory(){
        return new DefaultKafkaProducerFactory<>(
                producerConfigs(),
                new StringSerializer(),
                new JsonSerializer<MessageEntity>());
    }
    
    @Bean("kafkaTemplate")
    public KafkaTemplate<String, MessageEntity> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    

    3.2.2. consumer

    1、application.properties 配置

    kafka.consumer.zookeeper.connect=ZK服务器端口:ZK端口
    kafka.consumer.servers=Kafka服务器IP:Kafka端口
    kafka.consumer.enable.auto.commit=true
    kafka.consumer.session.timeout=6000
    kafka.consumer.auto.commit.interval=100
    kafka.consumer.auto.offset.reset=latest
    kafka.consumer.topic=topic名称
    kafka.consumer.group.id=consumerGroup名称
    kafka.consumer.concurrency=10
    

    2、KafkaListenerContainerFactory 配置

    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MessageEntity>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MessageEntity> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }
    
    private ConsumerFactory<String, MessageEntity> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(MessageEntity.class)
        );
    }
    
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
    

    3.2.3. ProducerCallBack

    生产回调,主要用于生产者发送消息后的处理。此 demo 仅作日志记录。

    需要集成 ListenableFutureCallback ,并指定消息实体类型。

    public class ProducerCallback implements ListenableFutureCallback<SendResult<String, MessageEntity>> 
    

    其中, MessageEntity 为消息实体类型。

    3.3. 生产消息

    创建 ProducerRecord 消息,通过 KafkaTemplate 发送即可。

    @Autowired
    @Qualifier("kafkaTemplate")
    private KafkaTemplate<String, MessageEntity> kafkaTemplate;
    
    public void send(String topic, MessageEntity message) {
        kafkaTemplate.send(topic, message);
    }
    
    public void send(String topic, String key, MessageEntity message) {
        ProducerRecord<String, MessageEntity> record = new ProducerRecord<>(topic, key, message);
        long startTime = System.currentTimeMillis();
        ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(record);
        future.addCallback(new ProducerCallback(startTime, key, message));
    
    }
    

    3.4. 消费消息

    消费消息,通过 @KafkaConsumer 注解即可实现。

    @KafkaListener(topics = "${kafka.topic.default}", containerFactory = "kafkaListenerContainerFactory")
    public void consumer(MessageEntity message){
        log.info("consumer: " + gson.toJson(message));
    }
    

    3.5. 测试

    启动 SpringBoot 项目,通过提供的 controller 进行请求生产消息。

    查看日志,成功记录消息内容,即为生产、消费成功。

    到此为止,Kafka demo 应用已完成啦

    4. 踩坑记录

    4.1. 打印日志

    我在刚开始建好项目、配置 Kafka 后,启动项目失败,无日志输出,不好排查问题。

    设置日志 level, application.properties 文件配置:

    logging.level.root=debug
    

    4.2. kafka 启动内存不足

    kafka 启动 报错cannot allocate memory,即内存不足

    4.3. java client 连接失败

    按照本教程配置,已经避免了这个问题。

    【kafka】Java连接出现Connection refused: no further information

    5. 参考

    相关文章

      网友评论

          本文标题:【Kafka】Kafka入门手记

          本文链接:https://www.haomeiwen.com/subject/rnarwctx.html