美文网首页
kafka札記

kafka札記

作者: 文子轩 | 来源:发表于2018-07-31 00:09 被阅读27次

一.基本概念

  • Broker
    Kafka集群包含一个或多个服务器,这种服务器被称为broker
  • Topic
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
  • Partition
    parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件
  • Porducer
    负责发布消息到Kafka broker
  • Consumer
    消费消息。每个consumer属于一个特定的consumer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。

二.Kafka架构及组件原理

image.png

如上图所示,一个典型的kafka集群中包含若干producer,若干broker,若干consumer group,以及一个Zookeeper)集群。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。

  • Push && Pull

    Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从broker pull消息。
    push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,

  • Topic && Partition

      image.png

Topic在逻辑上可以被认为是一个queue。每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。


image.png
  • 自定义Partition
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class HashPartitioner implements Partitioner {

public HashPartitioner(VerifiableProperties verifiableProperties) {}

@Override
public int partition(Object key, int numPartitions) {
  if ((key instanceof Integer)) {
      return Math.abs(Integer.parseInt(key.toString())) % numPartitions;
  }
return Math.abs(key.hashCode() % numPartitions);
  }
}

定义Partition分区,利用哈希算法把key相同的数据分到同一个分区中

三.Kafka的HA

  • 背景

在没有Replication的情况下,一旦某机器宕机或者某个Broker停止工作则会造成整个系统的可用性降低。随着集群规模的增加

  • Kafka HA设计解析

Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。
  Kafka分配Replica的算法如下:

将所有Broker(假设共n个Broker)和待分配的Partition排序
将第i个Partition分配到第(i mod n)个Broker上
将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker

  • Propagate消息
image.png
  • 选举Leader

Kafka在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。

四.Kafka的Consumer

  • Consumer Group

Consumer将从某个Partition读取的最后一条消息的offset存于Zookeeper中开始同时支持将offset存于Zookeeper,这个offset基于客户程序提供给Kafka的名字来保存,这个名字被称为Consumer Group。Consumer Group是整个Kafka集群全局的,每一个 Consumer实例都属于一个Consumer Group,若不指定则属于默认的Group

image.png

1.消息被消费后,并不会被删除,知识相应的offset加一

  1. 对于每条消息,在同一个Consumer Group里只会被一个Consumer消费

3.不通Consumer Group可消费同一条消息

五.KFKKA操作

step1:下載Kafka

tar -xzf kafka_2.9.2-0.8.1.1.tgz
cd kafka_2.9.2-0.8.1.1

step2:啓動服務

Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。

bin/zookeeper-server-start.sh config/zookeeper.properties &

step3:創建topic

创建一个叫做“test”的topic,它只有一个分区,一个副本。

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

step4:發送消息

Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。

运行producer并在控制台中输一些消息,这些消息将被发送到服务端:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

step5啓動consumer

Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一个命令行consumer可以读取消息并输出到标准输出:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

step6:搭建一個或者多個broker集羣

刚才只是启动了单个broker,现在启动有3个broker组成的集群,这些broker节点也都是在本机上的:
首先为每个节点编写配置文件:

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

在拷貝出的新文件中添加以下參數

config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

把這兩個boker啓動

bin/kafka-server-start.sh config/server-1.properties &
...
bin/kafka-server-start.sh config/server-2.properties &

创建一个拥有3个副本的topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

现在我们搭建了一个集群,怎么知道每个节点的信息呢?运行“"describe topics”命令就可以了:
顯示分區和Topic情況

三.基於spark搭建的kakfa開發環境

  • 版本信息
    Spark Streaming 2.2.1与 Kafka 0.8.2.1或更高版本兼容。

  • 操作sparkstreming
    分别在Master、Worker1、Worker2节点启动Kafka集群。

root@master:~# nohup/usr/local/kafka_2.11-0.8.2.1/bin/kafka-server-start.sh /usr/local/kafka_2.11-

0.8.2.1/config/server.properties &

[1] 3359



root@worker1:~# nohup/usr/local/kafka_2.11-0.8.2.1/bin/kafka-server-start.sh/usr/local/kafka_2.11-0.8.2.1/config/server.properties &

[2] 2861



root@worker2:~# nohup/usr/local/kafka_2.11-0.8.2.1/bin/kafka-server-start.sh/usr/local/kafka_2.11-0.8.2.1/config/server.properties &

[1] 2820



使用Jps命令查看。

root@master:~# jps

3280 QuorumPeerMain

3412 Jps

3359 Kafka

root@worker1:~# jps

2861 Kafka
 
2910 Jps

2799 QuorumPeerMain

root@worker2:~# jps

2757 QuorumPeerMain

2853 Jps
  • 創建topic

root@master:/usr/local/kafka_2.11-0.8.2.1/bin#kafka-topics.sh --create --zookeeper

192.168.189.1:2181,192.168.189.2:2181,192.168.189.3:2181 --replication-factor 2 --partitions 4 --topickafka_test

查看topic

root@master:/usr/local/kafka_2.11-0.8.2.1/bin#kafka-topics.sh --list --zookeeper

  • 操作sparksteaming
/usr/local/spark-2.2.1-bin-hadoop2.6/bin/spark-submit  --master spark://192.168.189.1:7077 \

               --deploy-mode client \

               --driver-memory 1g \

                --driver-cores 1 \

               --total-executor-cores 3 \

               --executor-memory 1g \

               --jars /usr/local/kafka_2.11-0.8.2.1/libs/kafka-clients-0.8.2.1.jar \

              --class org.apache.spark.examples.streaming.KafkaWordCountProducer \
               /usr/local/streaming-examples-test/spark-examples_2.11-2.2.1.jar192.168.189.1:9092,192.168.189.2:9092,192.168.189.3:9092  \
                 kafka_test 20 10

在脚本start-producer.sh中需加上kafka-clients-0.8.2.1.jar的Jar包,否则会提示以下异常,找不到类KafkaProducer。

  • 启动start-producer.sh脚本,生产者向Kafka集群发送消息
root@master:/usr/local/streaming-examples-test#start-producer.sh

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in[jar:file:/usr/local/alluxio-1.7.0-hadoop-2.6/client/alluxio-1.7.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/usr/local/spark-2.2.1-bin-hadoop2.6/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Seehttp://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type[org.slf4j.impl.Log4jLoggerFactory]

INFO producer.ProducerConfig:ProducerConfig values:

   compression.type = none

   metric.reporters = []

   metadata.max.age.ms = 300000

   metadata.fetch.timeout.ms = 60000

    acks = 1

    batch.size= 16384

   reconnect.backoff.ms = 10

   bootstrap.servers = [192.168.189.1:9092, 192.168.189.2:9092,192.168.189.3:9092]

   receive.buffer.bytes = 32768

   retry.backoff.ms = 100

   buffer.memory = 33554432

    timeout.ms = 30000

   key.serializer = classorg.apache.kafka.common.serialization.StringSerializer

    retries = 0

   max.request.size = 1048576

   block.on.buffer.full = true

   value.serializer = class org.apache.kafka.common.serialization.StringSerializer

   metrics.sample.window.ms = 30000

   send.buffer.bytes = 131072

   max.in.flight.requests.per.connection = 5

   metrics.num.samples = 2

    linger.ms =0

    client.id =

赋予start-consumer.sh脚本执行权限。

  root@master:/usr/local/streaming-examples-test#chmod u+x start-consumer.sh

该脚本对应的KafkaWordCountProducer类的使用方法:

“Usage: KafkaWordCount<zkQuorum> <group> <topics> <numThreads>”

INFO scheduler.DAGScheduler:ResultStage 440 (print at

KafkaWordCount.scala:61) finished in 0.046 s
 
INFO scheduler.DAGScheduler: Job 117finished: print at KafkaWordCount.scala:61, took 0.067648 s

-------------------------------------------

Time: 1519454092000 ms

-------------------------------------------

(4,1352)

(8,1327)

(6,1461)

(0,1451)

(2,1493)

(7,1365)

(5,1405)

(9,1398)

(3,1428)

(1,1520)

INFO scheduler.JobScheduler: Finishedjob streaming job 1519454092000 ms.0 from job set of time 1519454092000 ms

NFOscheduler.JobScheduler: Total delay: 7.238 s for time 1519454092000 ms(execution: 0.290 s)

相关文章

  • kafka札記

    一.基本概念 BrokerKafka集群包含一个或多个服务器,这种服务器被称为broker Topic每条发布到K...

  • 札記

    没有耐心,脾气不稳定,其实是一般人之通病。所以做人必将训练 自我定力功夫放于第一位。人都是一般人,自己也不...

  • 札記

    清心自省 勤行思考 正揖事业 敬爱和諧 勤于实践 勤于力行 —— 人生不易, 而人易执迷, ...

  • 札記

    方法正确,所获良多。不正确,則不得,或有害。凡事皆有一定方法,不得方法,不知要点,焉能有所获兮—— ...

  • 札記

    乾乾自强 自証健康 天之行健 人以品香

  • 札記打卡——讀通札記(1866)

    讀文白對照本《讀通鑒論》之《卷15》,感:1、札記打卡。通覽歷史,俯瞰古今,萬般皆下品 ,惟有讀書高。竊以為非。讀...

  • 札記二

    矛盾乃宇宙现象.人生若象处于机关.熙熙攘攘.苦辣酸甜.错落离合. 人生如斯烦恼.如此疲劳.似...

  • 札記三

    平常心.普通生活.工作好容乐趣其Φ. 吾乃无信仰者.乃信真理故.真理广博.化不以一端.故无以...

  • 札記一

    三式一止定.不动如如.总是明足成就于心知. 抛却天机.唯論真心.自古人生浩荡.干戈习...

  • 札記九

    踵人权.乐国安.立于无产.而有无限.踵炁质.行大道.立于先天而无极.斯人理想境界不过如此. ...

网友评论

      本文标题:kafka札記

      本文链接:https://www.haomeiwen.com/subject/hgoqvftx.html