
kafka learning note

Author: 简为2016 | Posted 2020-03-06 16:45

    2020/3/6

    Kafka Producer API

    1. producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback);
      (1) ProducerRecord - asynchronously sends a message to the topic
      (2) callback - the callback invoked once the server has acknowledged the record
    2. public void send(KeyedMessage<k,v> message)
      -sends the data to a single topic, partitioned by key, using either the sync or async producer
    3. public void send(List<KeyedMessage<k,v>> messages)
      -sends data to multiple topics
    4. Setting ProducerConfig
    Properties props = new Properties();
    props.put("producer.type", "async");   // asynchronous instead of synchronous
    ProducerConfig config = new ProducerConfig(props);
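
    The KeyedMessage send() overloads in items 2-3 and the ProducerConfig snippet above belong to the legacy (pre-0.9) Scala producer client. A minimal sketch of that older API, assuming a broker at localhost:9092 and a hypothetical topic "my-topic", could look like this:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    //Hypothetical example class, not part of the original note
    public class LegacyAsyncProducer {
       public static void main(String[] args) {
          Properties props = new Properties();
          props.put("metadata.broker.list", "localhost:9092"); //legacy equivalent of bootstrap.servers
          props.put("serializer.class", "kafka.serializer.StringEncoder");
          props.put("producer.type", "async");                 //send asynchronously
          ProducerConfig config = new ProducerConfig(props);
          
          Producer<String, String> producer = new Producer<String, String>(config);
          //sends to a single topic, partitioned by key
          producer.send(new KeyedMessage<String, String>("my-topic", "key-1", "value-1"));
          producer.close();
       }
    }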
    
    1. public void flush()
      -blocks until all previously sent records have completed
    2. public Map metrics()
      -returns the producer's internal metrics
    3. public List<PartitionInfo> partitionsFor(String topic)
      -gets partition metadata for a specific topic
    4. Producer configuration properties
    • client.id
    • producer.type
      -async/sync
    • acks
      -controls the criteria under which producer requests are considered complete
    • bootstrap.servers
      -the list of brokers used for bootstrapping
    • linger.ms
      -setting linger.ms to a higher value reduces the number of requests by batching more records together
    • retries
      -how many times the producer automatically retries a failed send
    • key.serializer
    • value.serializer
    • batch.size
    • buffer.memory
    1. ProducerRecord constructors (see the sketch below)
    • public ProducerRecord(String topic, int partition, K key, V value)
    • public ProducerRecord(String topic, K key, V value)
      -no partition
    • public ProducerRecord(String topic, V value)
      -neither partition nor key
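
    A minimal sketch (not from the original note; the class name and topic "my-topic" are placeholders) tying the three ProducerRecord constructors together with an asynchronous send plus callback and flush():

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    //Hypothetical example class, not part of the original note
    public class RecordVariantsProducer {
       public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          Producer<String, String> producer = new KafkaProducer<String, String>(props);
          
          //topic, partition, key, value - partition chosen explicitly
          producer.send(new ProducerRecord<String, String>("my-topic", 0, "k1", "v1"));
          //topic, key, value - partition derived from the key
          producer.send(new ProducerRecord<String, String>("my-topic", "k2", "v2"));
          //topic, value - neither partition nor key
          producer.send(new ProducerRecord<String, String>("my-topic", "v3"));
          
          //asynchronous send with a callback invoked once the server acknowledges the record
          producer.send(new ProducerRecord<String, String>("my-topic", "k4", "v4"), new Callback() {
             public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                   e.printStackTrace();
                } else {
                   System.out.println("stored in partition " + metadata.partition() + " at offset " + metadata.offset());
                }
             }
          });
          
          producer.flush();   //block until all buffered records have been sent
          producer.close();
       }
    }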

    SimpleProducer application

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    //Create a Java class named "SimpleProducer"
    public class SimpleProducer {
       
       public static void main(String[] args) throws Exception{
          
          // Check arguments length value
          if(args.length == 0){
             System.out.println("Enter topic name");
             return;
          }      
          //Assign topicName to string variable
          String topicName = args[0].toString();      
          // create instance for properties to access producer configs   
          Properties props = new Properties();      
          //Assign localhost id
          props.put("bootstrap.servers", “localhost:9092"); 
          //use your host:port from config/server.properties      
          
          //Set acknowledgements for producer requests.      
          props.put("acks", “all");
          
          //If the request fails, the producer can automatically retry (retries=0 disables retries here)
          props.put("retries", 0);
          
          //Specify buffer size in config
          props.put("batch.size", 16384);
          
          //Wait up to linger.ms before sending, to batch records and reduce the number of requests   
          props.put("linger.ms", 1);
          
          //The buffer.memory controls the total amount of memory available to the producer for buffering.   
          props.put("buffer.memory", 33554432);
          
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
             
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          
          Producer<String, String> producer = new KafkaProducer
             <String, String>(props);
                
          for(int i = 0; i < 10; i++) {
             producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
          }
          System.out.println("Message sent successfully");
          producer.close();
       }
    }
    
    • compile
      • javac -cp "$KAFKA_HOME/libs/*" SimpleProducer.java
    • run
      • java -cp "$KAFKA_HOME/libs/*:." SimpleProducer <topic-name>

    Kafka Consumer API

    1. public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
    2. Consumer API methods
    • public java.util.Set<TopicPartition> assignment()
      -get the set of partitions currently assigned to this consumer
    • public java.util.Set<java.lang.String> subscription()
      -get the set of topics the consumer is currently subscribed to
    • public void subscribe(java.util.List<java.lang.String> topics, ConsumerRebalanceListener listener)
      -subscribe to the given list of topics to get dynamically assigned partitions
    • public void unsubscribe()
      -cancel the current subscription
    • public void subscribe(java.util.List<java.lang.String> topics)
      -if the list of topics is empty, this behaves the same as unsubscribe()
    • public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener)
      -subscribe to all topics matching the given pattern
    • public void assign(java.util.List<TopicPartition> partitions)
      -manually assign the given partitions to the consumer (see the sketch after this list)
    • poll()
      -fetch data for the subscribed topics or assigned partitions
    • public void commitSync()
      -commit the offsets returned by the last poll
    • public void seek(TopicPartition partition, long offset)
      -the consumer will use the given offset on the next poll
    • public void resume()
      -resume partitions previously paused with pause()
    • public void wakeup()
      -wake up the consumer, for example to break out of a blocking poll from another thread
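
    The SimpleConsumer example below uses subscribe() with auto-commit; a minimal sketch (class name and topic are placeholders, not from the original note) of the manual style using assign(), seek() and commitSync() could look like this:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    
    //Hypothetical example class, not part of the original note
    public class ManualOffsetConsumer {
       public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "test");
          props.put("enable.auto.commit", "false");   //offsets are committed manually below
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
          
          //assign() takes partitions directly instead of subscribing to a topic
          TopicPartition partition = new TopicPartition("my-topic", 0);
          consumer.assign(Arrays.asList(partition));
          consumer.seek(partition, 0L);               //the next poll starts from offset 0
          
          for (int i = 0; i < 10; i++) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s\n", record.offset(), record.value());
             }
             consumer.commitSync();                   //commit the offsets returned by the last poll
          }
          consumer.close();
       }
    }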

    ConsumerRecord API

    • public ConsumerRecord(String topic, int partition, long offset, K key, V value)
      -a single record received from the Kafka cluster
    • public ConsumerRecords(java.util.Map<TopicPartition,java.util.List<ConsumerRecord<K,V>>> records)
      -a container holding the records fetched for each partition (see the fragment below)
      • public int count() - the number of records across all topics
      • public Set<TopicPartition> partitions() - the set of partitions that have data in this record set
      • public Iterator<ConsumerRecord<K,V>> iterator()
      • public List<ConsumerRecord<K,V>> records(TopicPartition partition) - the records for the given partition
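
    As an illustration of the accessors above, a short fragment (assuming a KafkaConsumer<String, String> named consumer, configured and subscribed as in the SimpleConsumer example below, plus an import of org.apache.kafka.common.TopicPartition) that walks the fetched records partition by partition:

    ConsumerRecords<String, String> records = consumer.poll(100);
    System.out.println("fetched " + records.count() + " records");
    for (TopicPartition partition : records.partitions()) {
       for (ConsumerRecord<String, String> record : records.records(partition)) {
          System.out.printf("%s-%d offset = %d, value = %s\n",
             partition.topic(), partition.partition(), record.offset(), record.value());
       }
    }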

    ConsumerClient API

    • bootstrap.servers (broker list)
    • group.id
    • enable.auto.commit
    • auto.commit.interval.ms
    • session.timeout.ms

    SimpleConsumer

    import java.util.Properties;
    import java.util.Arrays;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    public class SimpleConsumer {
       public static void main(String[] args) throws Exception {
          if(args.length == 0){
             System.out.println("Enter topic name");
             return;
          }
          //Kafka consumer configuration settings
          String topicName = args[0].toString();
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "test");
          props.put("enable.auto.commit", "true");
          props.put("auto.commit.interval.ms", "1000");
          props.put("session.timeout.ms", "30000");
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props);
          //Kafka Consumer subscribes list of topics here.
          consumer.subscribe(Arrays.asList(topicName));
          //print the topic name
          System.out.println("Subscribed to topic " + topicName);
          while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records) {
                // print the offset, key and value for each consumer record
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
             }
          }
       }
    }
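
    The consumer can be compiled and run the same way as the producer above (assuming the client jars live under $KAFKA_HOME/libs):

    • compile
      • javac -cp "$KAFKA_HOME/libs/*" SimpleConsumer.java
    • run
      • java -cp "$KAFKA_HOME/libs/*:." SimpleConsumer <topic-name>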
    
