美文网首页我爱编程
kafka搭建及测试

kafka搭建及测试

作者: gb_QA_log | 来源:发表于2018-03-09 15:02 被阅读0次

    kafka

    介绍

    Kafka的github官网

    搭建

    配置zookeeper 2181

    zookeeper的配置要求参考这处文档

    搭建主要参考这篇教程

    配置kafka 9092

    搭建主要参考这篇教程及官方tutorial

    搭建出现的error及解决

    • zookeeper 的 bin 下面的日志 zookeeper.out
      出现Connection refused。很麻烦,可能是防火墙,也可能是端口占用,也可能是hosts文件
      参考:
      http://rayfuxk.iteye.com/blog/2279596
      https://blog.csdn.net/cflys/article/details/76598413 https://blog.csdn.net/newjueqi/article/details/38518917
      https://blog.csdn.net/caimengyuan/article/details/72875621

    • 出现warning

      查了资料发现没有配置下面这句listeners = PLAINTEXT://node1:9092

    • 出现error

      查了资料发现这是因为配置文件中的PLAINTEXT跟你请求的内容不同。举例来说,我在配置文件里配置的listeners=PLAINTEXT://10.127.96.151:9092,但是我想测试的时候请求的是./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --from-beginning

      正确的应该是./kafka-console-consumer.sh --bootstrap-server 10.127.96.151:9092 --topic topic1 --from-beginning ,或者都更改node14

    • topics创建出错
      为主题生产/消费消息时,用node14,node15都可以,但是node16时候报test1=LEADER_NOT_AVAILABLE错误,此时Leader: 14 Replicas: 16,14,15 Isr: 14,15
      但是改成node14 node15 却没有问题,原因肯定在node16上
      查配置文件,发现所有node的zoo.cfg中 server把node16写成ndoe16,改正。

    • 仍然接不上node16
      把所有zookeeper和kafka关闭,先启动所有zookeeper发现正常,再启动kafka的node16,查看主题没问题,producer发现有问题,根据error信息,是接不上node16,再启动node14,查看主题没问题,producer出现error是no route to host。检查sudo firewall-cmd --state 果然node16防火墙还在,而node14 node15都关了

      关闭防火墙
      sudo systemctl stop firewalld.service
      sudo systemctl disable firewalld.service
      参考这里


    运行

    启动

    • 启动zookeeper:所有节点
    ./zkServer.sh start
    ./zkServer.sh status
    ./zkServer.sh restart
    
    • 启动kafka:所有节点
    nohup ./kafka-server-start.sh -daemon ../config/server.properties >/dev/null 2>&1 &
    
    ./kafka-server-stop.sh
    
    • jps:jps命令可以查看当前java进程

    单机启动

    • 启动zookeeper:单机
    ./bin/zookeeper-server-start.sh config/zookeeper.properties
    
    • 启动kafka:所以节点
    ./bin/kafka-server-start.sh config/server.properties
    

    基本操作

    备注:

    • bootstrap.servers只是用于客户端启动的时候有一个可以热启动的一个连接者,一旦启动完毕客户端就应该可以得知当前集群的所有节点的信息,日后集群扩展的时候客户端也能够自动实时的得到新节点的信息,即使bootstrap.servers里面的挂掉了也应该是能正常运行的,除非节点挂掉后客户端也重启了。
    • --broker-list 是--bootstrap-server的旧版本
    • 新建主题:创建的时候replication不能超过broker,partition无所谓
    ./kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 3 --partitions 3 --topic test
    
    • 查看主题
    ./kafka-topics.sh --list --zookeeper node1:2181
    
    • 主题详情
    ./kafka-topics.sh --describe --zookeeper node1:2181 --topic test
    
    • 创建发布者
    ./kafka-console-producer.sh --broker-list node1:9092 --topic test
    
    • 创建消费者
    ./kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test --from-beginning
    
    • 删除主题:删除topic需要另外配置,可以参考这篇帖子
    ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --delete --topic "test"
    

    以上操作都运行成功,则搭建完成了。


    自带shell脚本测试

    配置kafka /config/server.properties 文件
    本地 KAFKA_HEAP_OPT="-Xmx2G"

    sogon KAFKA_HEAP_OPT="-Xmx30G"

    producer

    • 根据这封邮件,得知kafka 0.9之后,不支持thread的配置,而需要multi process来运行。但是也有coder在contribute了。
    • 在AWS测试,出现存储不够的问题,重新买机器,8g变为16g,再跑。可以df -h查看目前存储情况。要考虑50000000*128/1024/1024/1024 = 5.96 G每次测试产生的数据量。

    consumer

    待补充


    java client

    java client代码可以参考:

    java client project出现的error及解决

    • maven配置java client project以为更方便,没想到它的maven缺少了个log4j的jar包,运行出错又没有说是这个包的问题。在国外论坛看到别人说创建java project把包一个个放进去可以用,才发现有warning提示这个log4j,然后也放到libs文件夹,再设置properties中的Java Build Path的Libraries即可,。目前还不清楚maven的解决方案。

    • terminal运行出错。正确命令如下:

      在项目目录:

      javac -cp "libs/*" src/com/exam/main/Main.java -d bin/

      在项目目录/bin:

      java -cp ".:../libs/*" com/exam/main/Main

      java执行.class需要搜索.即本地目录。而linux下用:分开。java需要在package的目录,相对路径才能正确


    maven 项目

    maven错误发现是1.0.0包本身有问题,在pom.xml更改为1.0.1后问题解决。

    创建 project

    • 创建project,根据该文档修改pom.xml,添加如下依赖。
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.1</version>
    </dependency>
    
    • 右键项目把jre system library改为9.0.1,
    • 右键properties-java compiler改compiler compliance level为9。*
    • 在src/main/java下创建com.exam.main包,创建Main.java,写入如下测试代码即可。
    package com.exam.main;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class Main {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for(int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
            producer.close();
        }
    }
    
    • 测试代码时,需要运行zookeeper、kafka,可同时打开一个consumer进行观察。

    相关文章

      网友评论

        本文标题:kafka搭建及测试

        本文链接:https://www.haomeiwen.com/subject/epgpfftx.html