Kafka Environment Setup and Operation


By muffinfeng | Published 2018-09-28 17:12
    Understanding Kafka

    First, download ZooKeeper (the cdh5.7.0 build), extract it, and configure the environment variables.

    In the conf directory, copy zoo_sample.cfg and rename the copy to zoo.cfg.

    Change the dataDir value, because the default location under /tmp is cleared on reboot.
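    For reference, a minimal zoo.cfg after this edit might look like the following (the data directory path is my own choice; any location that survives reboots works):

    ```properties
    # zoo.cfg -- keep ZooKeeper data out of /tmp so it survives reboots
    tickTime=2000
    dataDir=/home/hadoop/app/tmp/zookeeper
    clientPort=2181
    ```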

    Start ZooKeeper from the bin directory:

    ./zkServer.sh start

    Next, download Kafka (version 0.9.0.0), extract it, and configure the environment variables.

    Then edit the configuration file server.properties:

    log.dirs must be changed, again because /tmp is cleared on reboot.

    host.name must be set to hadoop000 (your own hostname).

    zookeeper.connect must be set to hadoop000:2181.

    Remember to add the host's IP mapping to /etc/hosts first (sudo vi /etc/hosts)!
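    Putting the three single-broker edits together, server.properties ends up with lines like these (the log directory is the path this article uses later; adjust to your own layout):

    ```properties
    # server.properties -- single-broker setup
    log.dirs=/home/hadoop/app/tmp/kafka-logs
    host.name=hadoop000
    zookeeper.connect=hadoop000:2181
    ```

    And the matching /etc/hosts entry (assuming the VM's IP from later in this article):

    ```
    192.168.8.51  hadoop000
    ```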

    Start Kafka: kafka-server-start.sh $KAFKA_HOME/config/server.properties

    Create a topic: kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic

    --replication-factor is the number of replicas, --partitions is the number of partitions, and hello_topic is a user-chosen topic name.

    List all topics: kafka-topics.sh --list --zookeeper hadoop000:2181

    Send messages (console producer): kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic

    Consume messages (console consumer): kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic

    Next, a single-node, multi-broker deployment:

    Make three copies of server.properties...

    Changes in server-1.properties:

        log.dirs=/home/hadoop/app/tmp/kafka-logs-1

        listeners=PLAINTEXT://:9093

        broker.id=1

    Changes in server-2.properties:

        log.dirs=/home/hadoop/app/tmp/kafka-logs-2

        listeners=PLAINTEXT://:9094

        broker.id=2

    Changes in server-3.properties:

        log.dirs=/home/hadoop/app/tmp/kafka-logs-3

        listeners=PLAINTEXT://:9095

        broker.id=3
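    The three override files differ only in three values. As a sanity check, here is a small stdlib-only Java sketch (the BrokerConfigs class is illustrative, not part of Kafka) that renders all three, which makes it harder to ship the classic copy-paste mistake of two brokers sharing a broker.id or a log directory:

    ```java
    // Illustrative helper: renders the three per-broker override files
    // shown above, keeping broker.id, listener port, and log directory
    // in sync. The base path follows this article's layout.
    public class BrokerConfigs {

        // server-N.properties: broker.id N, port 9092+N, log dir suffix -N
        static String render(int id) {
            return String.join("\n",
                "broker.id=" + id,
                "listeners=PLAINTEXT://:" + (9092 + id),
                "log.dirs=/home/hadoop/app/tmp/kafka-logs-" + id) + "\n";
        }

        public static void main(String[] args) {
            for (int id = 1; id <= 3; id++) {
                System.out.println("# server-" + id + ".properties");
                System.out.print(render(id));
            }
        }
    }
    ```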

    Start the three Kafka brokers:

    kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &

    kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &

    kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

    Create a topic:

    kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

    Send messages (console producer):

    kafka-console-producer.sh --broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 --topic my-replicated-topic

    Consume messages:

    kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic my-replicated-topic

    Next, calling the Kafka API from IDEA:

    Create a constants class:

    // Common Kafka configuration constants
    public class KafkaProperties {

        public static final String ZK = "192.168.8.51";

        public static final String TOPIC = "hello_topic";

        public static final String BROKER_LIST = "192.168.8.51:9092";
    }

    Main code of the KafkaProducer class (which extends Thread):

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class KafkaProducer extends Thread {

        private final String topic;
        private final Producer<Integer, String> producer;

        public KafkaProducer(String topic) {
            this.topic = topic;
            Properties properties = new Properties();
            properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
            properties.put("serializer.class", "kafka.serializer.StringEncoder");
            properties.put("request.required.acks", "1");
            producer = new Producer<>(new ProducerConfig(properties));
        }

        @Override
        public void run() {
            int messageNo = 1;
            while (true) {
                String message = "message_" + messageNo;
                producer.send(new KeyedMessage<>(topic, message));
                System.out.println("Sent: " + message);
                messageNo++;
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
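    The send loop above can be tried without a running cluster. Here is a stdlib-only sketch of the same producer/consumer flow, with an in-memory blocking queue standing in for the Kafka broker (MockBroker and its methods are illustrative, not Kafka API):

    ```java
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // An in-memory stand-in for the broker: a producer thread pushes
    // numbered messages, and the main thread plays the console consumer.
    public class MockBroker {

        private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);

        // Plays the role of producer.send(new KeyedMessage<>(topic, message))
        public void send(String message) {
            try {
                queue.put(message);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        // Plays the role of the console consumer reading one message
        public String poll() {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            MockBroker broker = new MockBroker();
            Thread producer = new Thread(() -> {
                for (int no = 1; no <= 3; no++) {
                    String message = "message_" + no;
                    broker.send(message);
                    System.out.println("Sent: " + message);
                }
            });
            producer.start();
            for (int i = 0; i < 3; i++) {
                System.out.println("Received: " + broker.poll());
            }
            producer.join();
        }
    }
    ```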

    In the virtual machine, start ZooKeeper and Kafka first, then run the KafkaProducer thread.

    Result: my first run actually failed, with kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

    At first I suspected a network problem, but Windows could ping the virtual machine fine.

    After searching online, I found the fix: add the line advertised.listeners=PLAINTEXT://192.168.8.51:9092 to server.properties, so that the broker advertises an address the external client can actually reach.

    Integrating Flume with Kafka:

    First start ZooKeeper and Kafka:

    ./zkServer.sh start

    kafka-server-start.sh $KAFKA_HOME/config/server.properties

    Under $FLUME_HOME/conf, copy avro-memory-logger.conf to avro-memory-kafka.conf and change its sink section as follows:

    avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink

    avro-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092

    avro-memory-kafka.sinks.kafka-sink.topic = hello_topic

    avro-memory-kafka.sinks.kafka-sink.batchSize = 5

    avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
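    For context, the sink lines above sit inside a full agent definition. A complete avro-memory-kafka.conf might look like this (the avro source's bind address and port 44444 are assumptions; only the kafka-sink lines come from this article):

    ```properties
    # avro source -> memory channel -> Kafka sink
    avro-memory-kafka.sources = avro-source
    avro-memory-kafka.sinks = kafka-sink
    avro-memory-kafka.channels = memory-channel

    avro-memory-kafka.sources.avro-source.type = avro
    avro-memory-kafka.sources.avro-source.bind = hadoop000
    avro-memory-kafka.sources.avro-source.port = 44444

    avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
    avro-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
    avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
    avro-memory-kafka.sinks.kafka-sink.batchSize = 5
    avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

    avro-memory-kafka.channels.memory-channel.type = memory

    avro-memory-kafka.sources.avro-source.channels = memory-channel
    avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
    ```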

    Start the second Flume agent:

    flume-ng agent \

    --name avro-memory-kafka \

    --conf $FLUME_HOME/conf \

    --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf \

    -Dflume.root.logger=INFO,console

    Then start the first Flume agent:

    flume-ng agent \

    --name exec-memory-avro \

    --conf $FLUME_HOME/conf \

    --conf-file $FLUME_HOME/conf/exec-memory-avro.conf \

    -Dflume.root.logger=INFO,console

    Start a console consumer to verify that messages come through:

    kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic


    Original post: https://www.haomeiwen.com/subject/rnbsoftx.html