美文网首页
kafka小记

kafka小记

作者: 刘单纯 | 来源:发表于2020-03-11 15:04 被阅读0次

    1.基本概念

    • broker
      kafka由一台或多台机器组成,每一台机器都是一个broker

    • topic
      每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

    • partition
      Parition是物理上的概念,每个Topic包含一个或多个Partition.

    • Segment
      partition物理上由多个segment组成

    • offset
      每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.

    • producer
      负责发布消息到Kafka broker

    • consumer
      消息消费者,向Kafka broker读取消息的客户端。

    • Consumer Group
      每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

    2. kafka拓扑结构

    3.小结

    1. 每个topic对应一个或多个partition,每个partion都是一个单独的文件夹
    2. 消费者消费完消息之后并不会真的从物理上删除这条数据,这条数据依旧会被保留,删除的时间根据配置文件决定
    3. 使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息

    4.常用命令

    # kafka集群启动
    nohup /home/kafka/bin/kafka-server-start.sh -daemon /home/kafka/config/server.properties 1>/export/logs/kafka/stdout.log 2>/export/logs/kafka/stderr.log &
    
    # 关闭kafka集群
    sudo ./kafka-server-stop.sh
    
    # 创建topic
    bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 3 --partitions 3 --topic test
    
    # 查看topic信息
    ./kafka-topics.sh --zookeeper emr-header-1:2181 --topic test --describe
    
    # 获取topic的最大位移
    ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list emr-header-1:9092 --topic test2 --time -1
    
    # 生产者
    ./kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test
    
    # 消费者
    ./kafka-console-consumer.sh --zookeeper emr-header-1:2181 --topic test --from-beginning
    
    
    # 删除topic
    # 法一
    ./bin/kafka-topics.sh  --delete --zookeeper emr-header-1:2181  --topic test
    
    # 法二
    # 登录zookeeper客户端
    /usr/lib/zookeeper-current/bin/zkCli.sh
    # 找到topic所在的目录
    ls /brokers/topics/
    ls /admin/delete_topics/
    # 删除topic
    rmr /brokers/topics/名称
    rmr /admin/delete_topics/名称
    删除log存储位置对应的partition
    

    5.python操作Kafka

    生产者

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import json
    from kafka import KafkaProducer
    
    def test_producer():
        producer = KafkaProducer(bootstrap_servers='localhost:9092',
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'), acks='all', api_version=(0, 10))
        for i in range(10):
            producer.send('test2', str(i), partition=i % 3)
            producer.flush()
    
        producer.close()
        print 'ok'
    
    
    if __name__ == '__main__':
        test_producer()
    
    

    消费者

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from kafka import KafkaConsumer
    
    def test_consumer():
        consumer = KafkaConsumer('test2', bootstrap_servers=['localhost:9092'], group_id='test_group', api_version=(0, 10))
        while True:
            for message in consumer:
                print message.value
    
            import time
            time.sleep(1)
    
    
    if __name__ == '__main__':
        test_consumer()
    

    6.优秀的文章

    【美团】Kafka文件存储机制那些事

    【infoq】Kafka背景及架构介绍

    相关文章

      网友评论

          本文标题:kafka小记

          本文链接:https://www.haomeiwen.com/subject/jyfuiftx.html