美文网首页
kafka学习二

kafka学习二

作者: 穿山甲123 | 来源:发表于2016-10-20 16:08 被阅读0次

上一期介绍了kafka的基本信息和简单使用。这一期介绍kafka的详细信息。

Kafka的Producer

1、消息和数据的生产者,向 Kafka 的 topic 发布消息。
2、Producer将消息发布到指定的Topic中,Producer可以指定将此消息归属于哪个partition,如果不指定,kafka会基于"round-robin"的方式,将消息存放到partition中去.
3、异步发送,批量发送可以很有效的提高发送效率。Kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去。

Kafka的Consumer

1、消息和数据的消费者,订阅 topics 并处理其订阅的消息。
2、每个consumer属于一个consumer group,每个group中可以有多个consumer,发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费。
3、在 kafka中,我们可以认为一个group是一个"订阅"者,一个Topic中的每个partition只会被一个"订阅者"中的一个consumer消费,不过一个 consumer可以消费多个partitions中的消息。
4、consumer group(包含多个consumer)对一个topic进行消费,不同的consumer group之间独立订阅。

Kafka的Broker

1、Broker:缓存代理,Kafka 集群中的一台或多台服务器统称为 broker。
2、为了减少磁盘写入的次数,broker会将消息暂时buffer起来(segment),当消息的个数达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。
3、Broker不保存订阅者的状态,由订阅者自己保存(低版本kafka由zookeeper保存,新版本中broker保存订阅者的状态)。
4、kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
5、消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id)进行重新读取消费消息。

Kafka的Message

1、Message消息:是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。
2、Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。

Kafka的Partitions

1、kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存。
2、可以将一个topic切分多任意多个partitions,来消息保存/消费的效率。
3、越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力。

Kafka的持久化

1、kafka采用日志形式来持久化,添加数据时,直接追加到日志文件上面。
2、一个Topic可以认为是一类消息,每个topic将被分成多partition(区),每个partition在存储层面是append log文件。
3、Logs文件根据broker中的配置要求,保留一定时间后删除来释放磁盘空间。
4、为数据文件建索引:稀疏存储,每隔一定字节的数据建立一条索引。

索引示意图

Kafka的分布式架构


Kafka的分布式架构,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。

kafka-manager集群管理器

kafka-manager是yahoo的开源项目,通过Kafka Manager用户能够更容易地发现集群中哪些主题或者分区分布不均匀,同时能够管理多个集群,能够更容易地检查集群的状态,能够创建主题,执行首选的副本选择,能够基于集群当前的状态生成分区分配,并基于生成的分配执行分区的重分配,此外,Kafka Manager还是一个非常好的可以快速查看集群状态的工具。项目地址:https://github.com/yahoo/kafka-manager

Kafka集群配置

KafkaUtil文件:

package com.lqq.demo2;

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaUtil {
    public final static String topic = "Kaffka_demo2";
    public final static String bootstrap_servers = "localhost:9092,localhost:9093,localhost:9094";
    
    public final static String group_id = "group_demo2";
    public final static String key_serializer = "org.apache.kafka.common.serialization.StringSerializer";
    public final static String value_serializer = "org.apache.kafka.common.serialization.StringSerializer";
    public final static String key_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
    public final static String value_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
    
    private static KafkaProducer<String, String> kp;
    private static KafkaConsumer<String, String> kc;

    public static KafkaProducer<String, String> getProducer() {
        if (kp == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrap_servers);
            props.put("acks", "all");
            props.put("client.id", "demo2_producer");
            props.put("retries", 1);
            props.put("batch.size", 16384);
            props.put("key.serializer", key_serializer);
            props.put("value.serializer", value_serializer);
            kp = new KafkaProducer<String, String>(props);
        }
        return kp;
    }

    public static KafkaConsumer<String, String> getConsumer() {
        if (kc == null) {
            Properties props = new Properties();

            props.put("bootstrap.servers", bootstrap_servers);
            props.put("group.id", group_id);
            props.put("client.id", "demo2_consumer");
            props.put("heartbeat.interval.ms", "200");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", key_deserializer);
            props.put("value.deserializer", value_deserializer);
            kc = new KafkaConsumer<String, String>(props);
        }

        return kc;
    }
}

生产者类:

package com.lqq.demo2;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KProducer{
    private static AtomicInteger msgNo = new AtomicInteger(1);
    
    
    public static class ProducerThread implements Runnable{
        Producer<String, String> producer=KafkaUtil.getProducer();
        String topic;
        public ProducerThread(String topic){
            this.topic=topic;
        }
        @Override
        public void run() {
            while (true) {
                int no=msgNo.getAndIncrement();
                String data = new String("hello kafka message " + no);
                String key = String.valueOf(no);
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, data);
                producer.send(record, new SendCallback());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService producerPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            producerPool.execute(new ProducerThread(KafkaUtil.topic));
        }
        producerPool.shutdown();

    }
}

KConsumer类:

package com.lqq.demo2;

import java.util.Arrays;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;


public class KConsumer{
    private final String topic;
    private final Consumer<String, String> consumer=KafkaUtil.getConsumer();
    
    public KConsumer(String topic) {
        this.topic = topic;
    }
    
    public void consume() {
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Consumer  record  offset=" + record.offset() + "  key=" + record.key() + " value="
                        + record.value());
            }
        }
    }
    public void close(){
        consumer.close();
    }
    public static void main(String[] args) {
        KConsumer consumer=new KConsumer(KafkaUtil.topic);
        consumer.consume();
        consumer.close();
    }
}

相关文章

  • kafka学习系列

    Kafka学习总结(一)——Kafka简介 Kafka学习总结(二)——Kafka设计原理 Kafka学习总结(三...

  • kafka学习二

    上一期介绍了kafka的基本信息和简单使用。这一期介绍kafka的详细信息。 Kafka的Producer 1、消...

  • Kafka相关文章索引(2)

    基本常识 kafka主要配置 Kafka配置说明 Kafka学习整理四(Producer配置) Kafka学习整理...

  • Kafka学习笔记(二) :初探Kafka

    看完上一篇,相信大家对消息系统以及Kafka的整体构成都有了初步了解,学习一个东西最好的办法,就是去使用它,今天就...

  • Kafka学习(二)-------- 什么是Kafka

    通过Kafka的快速入门 https://www.cnblogs.com/tree1123/p/11150927....

  • Kafka学习笔记(二)

    Kafka环境搭建 准备工作 Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群...

  • kafka学习笔记(二)

    一、基本概念 kafka中每条记录包含一个key,一个value和一个时间戳。 Topics 每个Topic,ka...

  • kafka集群的搭建

    一、背景 最近在学习 kafka,此处记录一下 mac上 搭建 kafka集群的步骤。 二、安装软件 由于 kaf...

  • Kafka-interview-questions

    一 Kafka架构 Kafka架构图示 二 Kafka压测   Kafka官方自带压力测试脚本(kafka-con...

  • python3读写kafka

    消费kafka数据,方式一 消费kafka数据,方式二 将消息写入kafka

网友评论

      本文标题:kafka学习二

      本文链接:https://www.haomeiwen.com/subject/cpqyyttx.html