美文网首页
kafka小记

kafka小记

作者: 刘单纯 | 来源:发表于2020-03-11 15:04 被阅读0次

1.基本概念

  • broker
    kafka由一台或多台机器组成,每一台机器都是一个broker

  • topic
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

  • partition
    Parition是物理上的概念,每个Topic包含一个或多个Partition.

  • Segment
    partition物理上由多个segment组成

  • offset
    每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.

  • producer
    负责发布消息到Kafka broker

  • consumer
    消息消费者,向Kafka broker读取消息的客户端。

  • Consumer Group
    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

2. kafka拓扑结构

3.小结

  1. 每个topic对应一个或多个partition,每个partion都是一个单独的文件夹
  2. 消费者消费完消息之后并不会真的从物理上删除这条数据,这条数据依旧会被保留,删除的时间根据配置文件决定
  3. 使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息

4.常用命令

# kafka集群启动
nohup /home/kafka/bin/kafka-server-start.sh -daemon /home/kafka/config/server.properties 1>/export/logs/kafka/stdout.log 2>/export/logs/kafka/stderr.log &

# 关闭kafka集群
sudo ./kafka-server-stop.sh

# 创建topic
bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 3 --partitions 3 --topic test

# 查看topic信息
./kafka-topics.sh --zookeeper emr-header-1:2181 --topic test --describe

# 获取topic的最大位移
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list emr-header-1:9092 --topic test2 --time -1

# 生产者
./kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test

# 消费者
./kafka-console-consumer.sh --zookeeper emr-header-1:2181 --topic test --from-beginning


# 删除topic
# 法一
./bin/kafka-topics.sh  --delete --zookeeper emr-header-1:2181  --topic test

# 法二
# 登录zookeeper客户端
/usr/lib/zookeeper-current/bin/zkCli.sh
# 找到topic所在的目录
ls /brokers/topics/
ls /admin/delete_topics/
# 删除topic
rmr /brokers/topics/名称
rmr /admin/delete_topics/名称
删除log存储位置对应的partition

5.python操作Kafka

生产者

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
from kafka import KafkaProducer

def test_producer():
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'), acks='all', api_version=(0, 10))
    for i in range(10):
        producer.send('test2', str(i), partition=i % 3)
        producer.flush()

    producer.close()
    print 'ok'


if __name__ == '__main__':
    test_producer()

消费者

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from kafka import KafkaConsumer

def test_consumer():
    consumer = KafkaConsumer('test2', bootstrap_servers=['localhost:9092'], group_id='test_group', api_version=(0, 10))
    while True:
        for message in consumer:
            print message.value

        import time
        time.sleep(1)


if __name__ == '__main__':
    test_consumer()

6.优秀的文章

【美团】Kafka文件存储机制那些事

【infoq】Kafka背景及架构介绍

相关文章

  • kafka小记

    1.基本概念 brokerkafka由一台或多台机器组成,每一台机器都是一个broker topic每条发布到Ka...

  • kafka小记

    参考 https://www.cnblogs.com/likehua/p/3999538.html学习记录 Kaf...

  • Kafka详细的设计和生态系统

    Kafka详细的设计和生态系统 Kafka生态系统 - Kafka核心,Kafka流,Kafka连接,Kafka ...

  • kafka全面认知

    什么是Kafka[#---kafka] Kafka的应用场景[#kafka-----] Kafka的架构[#kaf...

  • Kafka & NSQ

    Kafka & NSQ Kafka kafka struct kafka & consumer group 2ka...

  • kafka学习系列

    Kafka学习总结(一)——Kafka简介 Kafka学习总结(二)——Kafka设计原理 Kafka学习总结(三...

  • Kafka Producer源码

    Kafka Producer Kafka Producer 是 kafka 提供的与 Kafka Broker 连...

  • Kafka 详解一 简介

    目录 Kafka 是什么 Kafka 核心组 Kafka 整体架构以及解析 Kafka数据处理步骤 Kafka名词...

  • kafka详解

    目录 Kafka 是什么 Kafka 核心组 Kafka 整体架构以及解析 Kafka数据处理步骤 Kafka名词...

  • kafka配置KAFKA_LISTENERS和KAFKA_ADV

    kafka配置KAFKA_LISTENERS和KAFKA_ADVERTISED_LISTENERS 介绍kafka...

网友评论

      本文标题:kafka小记

      本文链接:https://www.haomeiwen.com/subject/jyfuiftx.html