美文网首页Laboratory
Kafka实践(一)

Kafka实践(一)

作者: MisterCH | 来源:发表于2017-03-02 14:20 被阅读198次

    昨天在测试环境搭建了一套zookeeper+kafka(各一台)的机器,开始进行kafka的实践之旅。昨天下班前一直都出现无法发送无法接收的问题,今天终于搞定了。

    zookeeper的安装

    直接从官网下载bin包后,解压即可

    tar -zxvf zookeeper-3.4.9.tar.gz
    

    需要修改的配置有:

    1. 把conf目录下的zoo_sample.cfg改名为zoo.cfg(并修改dataDir)
    2. 修改bin目录下的zkEnv.sh脚本中的ZOO_LOG_DIR和ZOO_LOG4J_PROP

    启动zookeeper

    bin/zkServer.sh start
    

    Kafka的安装

    由于只使用了一个broker,所以直接解压包

    tar -zxvf kafka_2.11-0.10.2.0.tgz
    

    需要修改的配置为config/server.properties文件,主要修改的有log.dirs和listeners。

    listeners=PLAINTEXT://localhost:9092
    

    这里有个坑,server.properties中一定要配置host.name或者listeners,不然会出现无法收发消息的现象
    然后启动即可

    bin/kafka-server-start.sh config/server.properties &
    

    客户端

    安装完以后需要写生产者的消费者了,直接用最简单的方法来写。

    Producer

    package producer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    public class Producer {
      public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers","122.20.109.68:9092");
        props.put("acks","1");
        props.put("retries","0");
        props.put("batch.size","16384");
    // props.put("linger.ms","1");
    // props.put("buffer.memory","33554432");
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    //生产者的建立
        KafkaProducer producer = new KafkaProducer<>(props);
    
        for (int i=0;i<100;i++) {
          System.out.println("seding message "+i);
          ProducerRecord record = new ProducerRecord("testTopic",String.valueOf(i),"this is message"+i);
          producer.send(record, new Callback() {
            public void onCompletion (RecordMetadata metadata, Exception e) {
              if (null != e) {
                e.printStackTrace();
              } else {
                System.out.println(metadata.offset());
              }
            }
          });
        }
        Thread.sleep(100000);
        producer.close();
      }
    } 
    

    这里有个坑,如果我直接用producer.send(ProducerRecord)方法,发完100条以后producer.close(),会导致Kafka无法收到消息,怀疑是异步发送导致的,需要真的发送到Kafka以后才能停止Producer,所以我在后面sleep了一下,加上以后就可以正常发送了。
    使用callback是异步发送,此外还能使用同步发送,直接在send方法后加上一个get方法就会直接阻塞直到broker返回消息已收到。

    producer.send(record).get();
    

    Producer的properties有几个常用配置:

    • bootstrap.servers:Kafka集群连接串,可以由多个host:port组成
    • acks:broker消息确认的模式,有三种:
      0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认
      1:由Leader确认,Leader接收到消息后会立即返回确认信息
      all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息
      我们可以根据消息的重要程度,设置不同的确认模式。默认为1
    • retries:发送失败时Producer端的重试次数,默认为0
    • batch.size:当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都DuLi发送。默认为16384字节
    • linger.ms:发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,配置linger.ms能够让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,达到节省网络资源的目的。默认为0
    • key.serializer/value.serializer:消息key/value的序列器Class,根据key和value的类型决定
    • buffer.memory:消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)

    Consumer

    package consumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    
    public class Consumer {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers","122.20.109.68:9092");
        props.put("group.id","test");
        props.put("enable.auto.commit","true");
        props.put("auto.commit.interval.ms","1000");
        props.put("session.timeout.ms","30000");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("testTopic"));
        while(true) {
          ConsumerRecords records = consumer.poll(1000);
          for (ConsumerRecord record: records) {
            System.out.println("offset "+record.offset()+" Message: "+record.value());
          }
        }
      }
    }
    

    Consumer的Properties的常用配置有:

    • bootstrap.servers/key.deserializer/value.deserializer:和Producer端的含义一样,不再赘述
    • fetch.min.bytes:每次最小拉取的消息大小(byte)。Consumer会等待消息积累到一定尺寸后进行批量拉取。默认为1,代表有一条就拉一条
    • max.partition.fetch.bytes:每次从单个分区中拉取的消息最大尺寸(byte),默认为1M
    • group.id:Consumer的group id,同一个group下的多个Consumer不会拉取到重复的消息,不同group下的Consumer则会保证拉取到每一条消息。注意,同一个group下的consumer数量不能超过分区数。
    • enable.auto.commit:是否自动提交已拉取消息的offset。提交offset即视为该消息已经成功被消费,该组下的Consumer无法再拉取到该消息(除非手动修改offset)。默认为true
    • auto.commit.interval.ms:自动提交offset的间隔毫秒数,默认5000。

    参考:http://www.cnblogs.com/edison2012/p/5774207.html

    相关文章

      网友评论

        本文标题:Kafka实践(一)

        本文链接:https://www.haomeiwen.com/subject/bughgttx.html