美文网首页
尚硅谷大数据技术之Kafka

尚硅谷大数据技术之Kafka

作者: 尚硅谷教育 | 来源:发表于2018-12-10 09:32 被阅读6次

    4.2.4 自定义分区生产者
    0)需求:将所有数据存储到topic的第0号分区上
    1)定义一个类实现Partitioner接口,重写里面的方法(过时API)
    package com.atguigu.kafka;
    import java.util.Map;
    import kafka.producer.Partitioner;

    /**
     * Partitioner for the legacy (deprecated) producer API that routes every
     * message to partition 0, regardless of key or partition count.
     *
     * NOTE(review): the legacy producer is reported to create partitioners
     * reflectively with a VerifiableProperties constructor argument; confirm
     * that the no-arg constructor is sufficient for the Kafka version in use.
     */
    public class CustomPartitioner implements Partitioner {

        /** The single partition this partitioner ever selects. */
        private static final int TARGET_PARTITION = 0;

        /** Creates the partitioner; it keeps no state. */
        public CustomPartitioner() {
        }

        /**
         * Chooses the partition for a message.
         *
         * @param key           message key (ignored)
         * @param numPartitions partition count of the topic (ignored)
         * @return always {@code 0}
         */
        @Override
        public int partition(Object key, int numPartitions) {
            return TARGET_PARTITION;
        }
    }
    2)自定义分区(新API)
    package com.atguigu.kafka;
    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    /**
     * Partitioner for the new Java producer API that sends every record to
     * partition 0, regardless of topic, key or value.
     */
    public class CustomPartitioner implements Partitioner {

        /** The single partition this partitioner ever selects. */
        private static final int TARGET_PARTITION = 0;

        /**
         * Chooses the partition for a record; all arguments are ignored.
         *
         * @return always {@code 0}
         */
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            return TARGET_PARTITION;
        }

        /** No configuration is read. */
        @Override
        public void configure(Map<String, ?> configs) {
            // intentionally empty
        }

        /** Nothing to release. */
        @Override
        public void close() {
            // intentionally empty
        }
    }
    3)在代码中调用
    package com.atguigu.kafka;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /**
     * Sends one record to topic "first" through a producer that is configured
     * to use {@code com.atguigu.kafka.CustomPartitioner}.
     */
    public class PartitionerProducer {

        /**
         * Builds the producer configuration, sends a single record and closes
         * the producer.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            Properties props = new Properties();
            // Host name and port of the Kafka broker
            props.put("bootstrap.servers", "hadoop103:9092");
            // Wait for the acknowledgement of all replicas
            props.put("acks", "all");
            // Number of retries for a failed send
            props.put("retries", 0);
            // Batch size
            props.put("batch.size", 16384);
            // Delay added before a request is sent
            props.put("linger.ms", 1);
            // Size of the send buffer memory
            props.put("buffer.memory", 33554432);
            // Key serializer
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Value serializer
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Custom partitioner
            props.put("partitioner.class", "com.atguigu.kafka.CustomPartitioner");

            // try-with-resources closes the producer even when send() throws,
            // so its network and buffer resources are never leaked.
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("first", "1", "atguigu"));
            }
        }
    }
    4)测试
    (1)在hadoop102上监控/opt/module/kafka/logs/目录下first主题3个分区的log日志动态变化情况
    [atguigu@hadoop102 first-0]$ tail -f 00000000000000000000.log
    [atguigu@hadoop102 first-1]$ tail -f 00000000000000000000.log
    [atguigu@hadoop102 first-2]$ tail -f 00000000000000000000.log
    (2)发现数据都存储到指定的分区了。
    4.3 Kafka消费者Java API
    4.3.1 高级API
    0)在控制台创建发送者
    [atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first

    hello world
    1)创建消费者(过时API)
    package com.atguigu.kafka.consume;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    /**
     * High-level consumer built on the deprecated ZooKeeper-based API: reads
     * topic "first" with a single stream and prints every message payload.
     */
    public class CustomConsumer {

        /**
         * Connects through ZooKeeper, opens one stream for topic "first" and
         * prints each message until the stream ends or an error occurs.
         *
         * @param args unused
         */
        @SuppressWarnings("deprecation")
        public static void main(String[] args) {
            Properties properties = new Properties();

            properties.put("zookeeper.connect", "hadoop102:2181");
            properties.put("group.id", "g1");
            properties.put("zookeeper.session.timeout.ms", "500");
            properties.put("zookeeper.sync.time.ms", "250");
            properties.put("auto.commit.interval.ms", "1000");

            // Create the consumer connector
            ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
            try {
                // Ask for exactly one stream (thread) for topic "first"
                Map<String, Integer> topicCount = new HashMap<>();
                topicCount.put("first", 1);

                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);
                KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);
                ConsumerIterator<byte[], byte[]> it = stream.iterator();

                while (it.hasNext()) {
                    // Decode explicitly as UTF-8 instead of the platform default
                    // charset, so output does not depend on the host's locale.
                    System.out.println(new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8));
                }
            } finally {
                // Release the connector when the loop ends or throws
                consumer.shutdown();
            }
        }
    }
    2)官方提供案例(自动维护消费情况)(新API)
    package com.atguigu.kafka.consume;
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    /**
     * Consumer built on the new Java API with automatic offset commits:
     * subscribes to topics "first", "second" and "third" and prints every
     * record it receives.
     */
    public class CustomNewConsumer {

        /**
         * Configures the consumer, subscribes and polls forever, printing the
         * offset, key and value of each record.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            Properties props = new Properties();
            // Address of the Kafka service; not every broker has to be listed
            props.put("bootstrap.servers", "hadoop102:9092");
            // Consumer group
            props.put("group.id", "test");
            // Commit offsets automatically
            props.put("enable.auto.commit", "true");
            // Interval between automatic offset commits
            props.put("auto.commit.interval.ms", "1000");
            // Key deserializer class
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Value deserializer class
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // try-with-resources closes the consumer if poll() or the record
            // handling throws; the loop itself never ends normally.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Topics to subscribe to; several can be subscribed at once
                consumer.subscribe(Arrays.asList("first", "second", "third"));

                while (true) {
                    // Fetch records, waiting at most 100 ms
                    ConsumerRecords<String, String> records = consumer.poll(100);

                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    }
                }
            }
        }
    }
    4.3.2 低级API
    实现使用低级API读取指定topic,指定partition,指定offset的数据。
    1)消费者使用低级API 的主要步骤:
    步骤 主要工作
    1 根据指定的分区从主题元数据中找到主副本
    2 获取分区最新的消费进度
    3 从主副本拉取分区的消息
    4 识别主副本的变化,重试
    2)方法描述:
    findLeader() 客户端向种子节点发送主题元数据请求,将副本集加入备用节点
    getLastOffset() 消费者客户端发送偏移量请求,获取分区最近的偏移量
    run() 消费者低级API拉取消息的主要方法
    findNewLeader() 当分区的主副本节点发生故障,客户端将要找出新的主副本
    3)代码:
    package com.atguigu;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.cluster.BrokerEndPoint;
    import kafka.common.ErrorMapping;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    /**
     * Low-level (SimpleConsumer) example: reads a bounded number of messages
     * from one partition of one topic, starting at the earliest offset, and
     * looks up a new leader when a fetch fails.
     */
    public class SimpleExample {
        /** Hosts of the partition's replicas; refreshed by findLeader, used by findNewLeader. */
        private final List<String> m_replicaBrokers = new ArrayList<>();

        /** Creates an example instance with an empty replica list. */
        public SimpleExample() {
        }

        /**
         * Runs the example against topic "test1", partition 0, reading at most
         * three messages through the three seed brokers on port 9092.
         *
         * @param args unused
         */
        public static void main(String args[]) {
            SimpleExample example = new SimpleExample();
            // Maximum number of messages to read
            long maxReads = 3L;
            // Topic to read from
            String topic = "test1";
            // Partition to read from
            int partition = 0;
            // IP addresses of the seed brokers
            List<String> seeds = new ArrayList<>();
            seeds.add("192.168.9.102");
            seeds.add("192.168.9.103");
            seeds.add("192.168.9.104");
            // Broker port
            int port = 9092;
            try {
                example.run(maxReads, topic, partition, seeds, port);
            } catch (Exception e) {
                System.out.println("Oops:" + e);
                e.printStackTrace();
            }
        }

        /**
         * Fetches and prints up to {@code a_maxReads} messages of the given
         * topic partition, starting from the earliest available offset.
         *
         * @param a_maxReads    maximum number of messages to print
         * @param a_topic       topic to read
         * @param a_partition   partition to read
         * @param a_seedBrokers broker hosts used to look up the partition leader
         * @param a_port        broker port
         * @throws Exception if no new leader can be found after a fetch error,
         *                   or if a broker call fails
         */
        public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
            // Look up the metadata of the requested topic partition
            PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                System.out.println("Can't find metadata for Topic and Partition. Exiting");
                return;
            }
            if (metadata.leader() == null) {
                System.out.println("Can't find Leader for Topic and Partition. Exiting");
                return;
            }
            String leadBroker = metadata.leader().host();
            String clientName = "Client_" + a_topic + "_" + a_partition;

            SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            try {
                long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
                int numErrors = 0;
                while (a_maxReads > 0) {
                    // The consumer is dropped after a fetch error; reconnect to the current leader
                    if (consumer == null) {
                        consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
                    }
                    FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
                    FetchResponse fetchResponse = consumer.fetch(req);

                    if (fetchResponse.hasError()) {
                        numErrors++;
                        // Something went wrong!
                        short code = fetchResponse.errorCode(a_topic, a_partition);
                        System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                        if (numErrors > 5) {
                            break;
                        }
                        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                            // We asked for an invalid offset. For the simple case
                            // ask for the latest offset to reset.
                            readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                            continue;
                        }
                        // Any other error: drop the connection and look for a new leader
                        consumer.close();
                        consumer = null;
                        leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                        continue;
                    }
                    numErrors = 0;

                    long numRead = 0;
                    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                        // Stop as soon as the quota is used up, even when the
                        // fetched message set holds more messages.
                        if (a_maxReads <= 0) {
                            break;
                        }
                        long currentOffset = messageAndOffset.offset();
                        if (currentOffset < readOffset) {
                            System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                            continue;
                        }
                        readOffset = messageAndOffset.nextOffset();
                        ByteBuffer payload = messageAndOffset.message().payload();

                        // remaining() is the number of readable bytes whatever the buffer position is
                        byte[] bytes = new byte[payload.remaining()];
                        payload.get(bytes);
                        System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                        numRead++;
                        a_maxReads--;
                    }

                    if (numRead == 0) {
                        // Nothing new yet: wait a moment before polling again
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException ie) {
                            // Keep the interrupt visible to the caller and stop reading
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            } finally {
                // Close the connection on every exit path, including exceptions
                if (consumer != null) {
                    consumer.close();
                }
            }
        }

        /**
         * Asks the broker for one offset of the partition relative to
         * {@code whichTime}.
         *
         * @param consumer   connection to the partition leader
         * @param topic      topic name
         * @param partition  partition id
         * @param whichTime  time selector passed to the offset request (the callers
         *                   in this class pass EarliestTime() or LatestTime())
         * @param clientName client id sent with the request
         * @return the first offset returned by the broker, or 0 when the
         *         response reports an error or contains no offset
         */
        public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
            requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
            OffsetResponse response = consumer.getOffsetsBefore(request);

            if (response.hasError()) {
                System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
                return 0;
            }
            long[] offsets = response.offsets(topic, partition);
            if (offsets == null || offsets.length == 0) {
                // Same fallback as the error case instead of an ArrayIndexOutOfBoundsException
                System.out.println("No offset returned by the Broker for [" + topic + ", " + partition + "]");
                return 0;
            }
            return offsets[0];
        }

        /**
         * Looks for the leader of the partition after a fetch error, trying up
         * to three times with a one second pause between attempts.
         *
         * @param a_oldLeader host of the leader that just failed
         * @param a_topic     topic name
         * @param a_partition partition id
         * @param a_port      broker port
         * @return host of the leader to use from now on
         * @throws Exception if no leader is found within three attempts, or if
         *                   the thread is interrupted while waiting
         */
        private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
            for (int i = 0; i < 3; i++) {
                PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
                if (metadata != null && metadata.leader() != null) {
                    // On the first attempt an unchanged leader is not accepted:
                    // give ZooKeeper a second to recover. On later attempts
                    // assume the broker recovered before failover, or that it
                    // was a non-Broker issue.
                    boolean unchangedOnFirstTry = a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0;
                    if (!unchangedOnFirstTry) {
                        return metadata.leader().host();
                    }
                }
                Thread.sleep(1000);
            }
            System.out.println("Unable to find new leader after Broker failure. Exiting");
            throw new Exception("Unable to find new leader after Broker failure. Exiting");
        }

        /**
         * Queries the given brokers one after another for the metadata of the
         * topic and returns the entry of the requested partition. On success
         * the replica host list of this instance is replaced with the replicas
         * of that partition.
         *
         * @param a_seedBrokers broker hosts to ask
         * @param a_port        broker port
         * @param a_topic       topic name
         * @param a_partition   partition id
         * @return metadata of the partition, or {@code null} if no broker
         *         returned it
         */
        private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
            PartitionMetadata returnMetaData = null;
            loop:
            for (String seed : a_seedBrokers) {
                SimpleConsumer consumer = null;
                try {
                    consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                    List<String> topics = Collections.singletonList(a_topic);
                    TopicMetadataRequest req = new TopicMetadataRequest(topics);
                    kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                    List<TopicMetadata> metaData = resp.topicsMetadata();
                    for (TopicMetadata item : metaData) {
                        for (PartitionMetadata part : item.partitionsMetadata()) {
                            if (part.partitionId() == a_partition) {
                                returnMetaData = part;
                                // Found it: stop asking further brokers
                                break loop;
                            }
                        }
                    }
                } catch (Exception e) {
                    // A failing seed broker is reported and the next one is tried
                    System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
                } finally {
                    if (consumer != null) {
                        consumer.close();
                    }
                }
            }
            if (returnMetaData != null) {
                // Remember the replicas so that findNewLeader can ask them later
                m_replicaBrokers.clear();
                for (BrokerEndPoint replica : returnMetaData.replicas()) {
                    m_replicaBrokers.add(replica.host());
                }
            }
            return returnMetaData;
        }
    }

    本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。

    相关文章

      网友评论

          本文标题:尚硅谷大数据技术之Kafka

          本文链接:https://www.haomeiwen.com/subject/uqzlhqtx.html