尚硅谷 Big Data Technology: Kafka

Author: 尚硅谷教育 | Published 2018-12-10 09:32

4.2.4 Producer with a Custom Partitioner
0) Requirement: write all data to partition 0 of the topic.
1) Define a class that implements the Partitioner interface and override its method (deprecated API):
package com.atguigu.kafka;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class CustomPartitioner implements Partitioner {

    // The old producer instantiates the partitioner reflectively and passes a
    // VerifiableProperties argument, so this constructor signature is required.
    public CustomPartitioner(VerifiableProperties props) {
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // Route every record to partition 0
        return 0;
    }

}
2) Custom partitioner (new API):
package com.atguigu.kafka;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Route every record to partition 0
        return 0;
    }

    @Override
    public void close() {
    }

}
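The requirement here forces every record to partition 0, but the same interface supports any routing rule. For illustration only, the sketch below (the HashKeyPartitioner name and hash scheme are assumptions, not part of the tutorial) routes by key hash, so records with the same key always land on the same partition:

package com.atguigu.kafka;
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical variant, not part of the requirement above
public class HashKeyPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        // Records without a key all go to partition 0
        if (keyBytes == null) {
            return 0;
        }
        // Mask off the sign bit so the modulo result is non-negative
        return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() {
    }

}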
3) Register and use it in the producer code:
package com.atguigu.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // Kafka broker host name and port
        props.put("bootstrap.servers", "hadoop103:9092");
        // Wait for acknowledgement from all replicas
        props.put("acks", "all");
        // Maximum number of retries for a failed send
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Time to wait for more records before sending a batch
        props.put("linger.ms", 1);
        // Total memory available for buffering records awaiting send
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Custom partitioner
        props.put("partitioner.class", "com.atguigu.kafka.CustomPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("first", "1", "atguigu"));

        producer.close();
    }

}
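Note that send() is asynchronous, so the code above gets no feedback on where the record landed. As a quick sanity check (a sketch, not part of the original listing; it assumes Java 8 lambdas), the send() call in main() can be replaced with the overload that takes a Callback:

    // Hedged sketch: replaces the producer.send(...) line in main() above
    producer.send(new ProducerRecord<String, String>("first", "1", "atguigu"),
            (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    // With CustomPartitioner registered, partition should print as 0
                    System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
                }
            });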
4) Test
(1) On hadoop102, watch the .log files of the three partitions of topic first under /opt/module/kafka/logs/ for changes:
[atguigu@hadoop102 first-0]$ tail -f 00000000000000000000.log
[atguigu@hadoop102 first-1]$ tail -f 00000000000000000000.log
[atguigu@hadoop102 first-2]$ tail -f 00000000000000000000.log
(2) Observe that all data is written to the specified partition.
4.3 Kafka Consumer Java API
4.3.1 High-Level API
0) Start a console producer:
[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
hello world
1) Create a consumer (deprecated API):
package com.atguigu.kafka.consume;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class CustomConsumer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.put("zookeeper.connect", "hadoop102:2181");
        properties.put("group.id", "g1");
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        properties.put("auto.commit.interval.ms", "1000");

        // Create the consumer connector
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

        // Request one stream for the topic "first"
        HashMap<String, Integer> topicCount = new HashMap<>();
        topicCount.put("first", 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);

        KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        // Block and print each message as it arrives
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }

}
2) The official example (offsets maintained automatically) (new API):
package com.atguigu.kafka.consume;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // Kafka broker address; not every broker in the cluster needs to be listed
        props.put("bootstrap.servers", "hadoop102:9092");
        // Consumer group id
        props.put("group.id", "test");
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        // Interval between automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        // Key deserializer class
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer class
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Topics to subscribe to; several topics may be subscribed at once
        consumer.subscribe(Arrays.asList("first", "second", "third"));

        while (true) {
            // Poll for records with a 100 ms timeout
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }

}
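With enable.auto.commit=true, offsets are committed on a timer whether or not the polled records have actually been processed. For at-least-once processing, offsets can be committed manually instead; a minimal sketch (assuming the same props as above, with auto commit turned off) follows:

    // Hedged variant of the loop above: manual offset commit
    props.put("enable.auto.commit", "false");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("first"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // Commit only after the whole batch is processed, so a crash
        // replays records rather than silently dropping them
        consumer.commitSync();
    }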
4.3.2 Low-Level API
Use the low-level API to read data from a specified topic, partition, and offset.
1) The main steps for a consumer using the low-level API:
Step 1: find the leader replica for the given partition from the topic metadata.
Step 2: fetch the partition's latest consumed offset.
Step 3: pull the partition's messages from the leader replica.
Step 4: detect leader changes and retry.
2) Method descriptions:
findLeader(): the client sends a topic metadata request to the seed brokers and records the replica set as backup brokers.
getLastOffset(): the consumer client sends an offset request to fetch the partition's most recent offset.
run(): the main method that pulls messages through the low-level consumer API.
findNewLeader(): when the partition's leader broker fails, the client locates the new leader.
3) Code:
package com.atguigu;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {
private List<String> m_replicaBrokers = new ArrayList<>();

public SimpleExample() {
    m_replicaBrokers = new ArrayList<>();
}

public static void main(String args[]) {
    SimpleExample example = new SimpleExample();
    // Maximum number of messages to read
    long maxReads = Long.parseLong("3");
    // Topic to subscribe to
    String topic = "test1";
    // Partition to read from
    int partition = Integer.parseInt("0");
    // Seed broker IP addresses
    List<String> seeds = new ArrayList<>();
    seeds.add("192.168.9.102");
    seeds.add("192.168.9.103");
    seeds.add("192.168.9.104");
    // Broker port
    int port = Integer.parseInt("9092");
    try {
        example.run(maxReads, topic, partition, seeds, port);
    } catch (Exception e) {
        System.out.println("Oops:" + e);
        e.printStackTrace();
    }
}

public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
    // Fetch metadata for the given topic partition
    PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    if (metadata == null) {
        System.out.println("Can't find metadata for Topic and Partition. Exiting");
        return;
    }
    if (metadata.leader() == null) {
        System.out.println("Can't find Leader for Topic and Partition. Exiting");
        return;
    }
    String leadBroker = metadata.leader().host();
    String clientName = "Client_" + a_topic + "_" + a_partition;

    SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
    int numErrors = 0;
    while (a_maxReads > 0) {
        if (consumer == null) {
            consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        }
        FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
        FetchResponse fetchResponse = consumer.fetch(req);

        if (fetchResponse.hasError()) {
            numErrors++;
            // Something went wrong!
            short code = fetchResponse.errorCode(a_topic, a_partition);
            System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
            if (numErrors > 5)
                break;
            if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                // We asked for an invalid offset. For simple case ask for
                // the last element to reset
                readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                continue;
            }
            consumer.close();
            consumer = null;
            leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
            continue;
        }
        numErrors = 0;

        long numRead = 0;
        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
            long currentOffset = messageAndOffset.offset();
            if (currentOffset < readOffset) {
                System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                continue;
            }
            readOffset = messageAndOffset.nextOffset();
            ByteBuffer payload = messageAndOffset.message().payload();

            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
            numRead++;
            a_maxReads--;
        }

        if (numRead == 0) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
            }
        }
    }
    if (consumer != null)
        consumer.close();
}

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}


private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    for (int i = 0; i < 3; i++) {
        boolean goToSleep = false;
        PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            goToSleep = true;
        } else if (metadata.leader() == null) {
            goToSleep = true;
        } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
            // first time through if the leader hasn't changed give
            // ZooKeeper a second to recover
            // second time, assume the broker did recover before failover,
            // or it was a non-Broker issue
            //
            goToSleep = true;
        } else {
            return metadata.leader().host();
        }
        if (goToSleep) {
                Thread.sleep(1000);
        }
    }
    System.out.println("Unable to find new leader after Broker failure. Exiting");
    throw new Exception("Unable to find new leader after Broker failure. Exiting");
}

private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
    PartitionMetadata returnMetaData = null;
    loop:
    for (String seed : a_seedBrokers) {
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
            List<String> topics = Collections.singletonList(a_topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

            List<TopicMetadata> metaData = resp.topicsMetadata();
            for (TopicMetadata item : metaData) {
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    if (part.partitionId() == a_partition) {
                        returnMetaData = part;
                        break loop;
                    }
                }
            }
        } catch (Exception e) {
            System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
        } finally {
            if (consumer != null)
                consumer.close();
        }
    }
    if (returnMetaData != null) {
        m_replicaBrokers.clear();
        for (BrokerEndPoint replica : returnMetaData.replicas()) {
            m_replicaBrokers.add(replica.host());
        }
    }
    return returnMetaData;
}

}

This tutorial is produced by the 尚硅谷教育 Big Data Research Institute. Please credit the source when reposting, and follow the 尚硅谷 WeChat official account (atguigu) to learn more.
