美文网首页Kafka
Hadoop生态架构之kafka

Hadoop生态架构之kafka

作者: 勇于自信 | 来源:发表于2020-01-11 16:11 被阅读0次

    1.kafka基本理论知识

    1、定位:分布式的消息队列系统,同时提供数据分布式缓存功能(默认7天)
    2、消息持久化到磁盘,达到O(1)访问速度,预读和后写,对磁盘的顺序访问(比内存访问还要快)
    3、Storm(分布式的实时计算框架)
    Kafka目标成为队列平台
    4、基本组件:
    Broker:每一台机器是一个Broker
    Producer:日志消息生产者,主要写数据
    Consumer:日志消息消费者,主要读数据
    Topic:是虚拟概念,不同的consumer去指定的topic去读数据,不同producer可以往不同的topic去写
    Partition:是实际概念,文件夹,是在Topic的基础上做了进一步分层
    5、Partition功能:负载均衡,需要保证消息的顺序性
    顺序性的保证:订阅消息是从头往后读取的,写消息是尾部追加,所以整体消息是顺序的
    如果有多个partiton存在,可能会出现顺序不一致的情况,原因:每个Partition相互独立
    6、Topic:逻辑概念
    一个或多个Partition组成一个Topic
    7、Partition以文件夹的形式存在
    8、Partition有两部分组成:
    (1)index log:(存储索引信息,快速定位segment文件)
    (2)message log:(真实数据的所在)
    9、HDFS多副本的方式来完成数据高可用
    如果设置一个Topic,假设这个Topic有5个Partition,3个replication
    Kafka分配replication的算法:
    假设:
    将第i个Partition分配到(i % N)个Broker上
    将第i个Partition的第j个replication分配到( (i+j) % N)个Broker上
    虽然Partition里面有多个replication
    如果里面有M个replication,其中有一个是Leader,其他M-1个follower
    10、zookeeper包系统的可用性,zk中会保存一些meta信息(topic)
    11、物理上,不同的topic的消息肯定是分开存储的
    12、偏移量——offset:用来定位数据读取的位置
    13、kafka内部最基本的消息单位——message
    14、传输最大消息message的size不能超过1M,可以通过配置来修改
    15、Consumer Group
    16、传输效率:zero-copy
    0拷贝:减少Kernel和User模式上下文的切换
    直接把disk上的data传输给socket,而不是通过应用程序来传输
    17、Kafka的消息是无状态的,消费者必须自己维护已消费的状态信息(offset)
    减轻Kafka的实现难度
    18、Kafka内部有一个时间策略:SLA——消息保留策略(消息超过一定时间后,会自动删除)
    19、交付保证:
    at least once:至少一次(会有重复、但不丢失)
    at most once:最多发送一次(不重复、但可能丢失)
    exactly once:只有一次(最理想),目前不支持,只能靠客户端维护
    20、Kafka集群里面,topic内部由多个partition(包含多个replication),达到高可用的目的:
    日志副本:保证可靠性
    角色:主、从
    ISR:是一个集合,只有在集合中的follower,才有机会被选为leader
    如何让leader知道follower是否成功接收数据(心跳,ack)
    如果心跳正常,代表节点活着
    21、怎么算“活着”
    (1)心跳
    (2)如果follower能够紧随leader的更新,不至于被落的太远
    如果一旦挂掉,从ISR集合把该节点删除掉

    2.实践

    1.kafka基本操作命令

    前提:需要把zookeeper提前启动好
    一、单机版
    1、启动进程:
    ]# ./bin/kafka-server-start.sh config/server.properties
    2、查看topic列表:
    ]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
    3、创建topic:
    ]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test
    4、查看topic描述:
    ]# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic newyear_test
    5、producer发送数据:
    ]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic newyear_test
    6、consumer接收数据:
    ]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic newyear_test --from-beginning
    7、删除topic:
    ]# ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic newyear_test
    二、集群版
    在slave1和slave2上的broker.id一定设置不同
    分别在slave1和slave2上开启进程:
    ./bin/kafka-server-start.sh config/server.properties

    创建topic:
    ]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test

    2.自主写producer、consumer

    1、实现一个consumer group
    首先在不同的终端分别开启consumer,保证groupid一致
    ]# python consumer_kafka.py

    执行一次producer:
    ]# python producer_kafka.py

    2、指定partition发送数据
    ]# python producer_kafka_2.py

    3、指定partition读出数据
    ]# python consumer_kafka_2.py
    consumer_kafka.py:

    from kafka import KafkaConsumer
    def main():
        consumer = KafkaConsumer(b"newyear_many_test", group_id=b"my_group_id",
                                 bootstrap_servers=["master:9092", "slave1:9092", "slave2:9092"],
                auto_offset_reset='earliest')
                #auto_offset_reset='latest')
        for message in consumer:
            # This will wait and print messages as they become available
            print(message)
    
    
    if __name__ == "__main__":
        main()
    

    producer_kafka.py:

    import time
    
    from kafka import SimpleProducer, KafkaClient
    from kafka.common import LeaderNotAvailableError
    
    def print_response(response=None):
        if response:
            print('Error: {0}'.format(response[0].error))
            print('Offset: {0}'.format(response[0].offset))
    
    
    def main():
        kafka = KafkaClient("localhost:9092")
        producer = SimpleProducer(kafka)
    
        topic = b'newyear_many_test'
        msg = b'Hello World from Badou!'
    
        try:
            print_response(producer.send_messages(topic, msg))
        except LeaderNotAvailableError:
            time.sleep(1)
            print_response(producer.send_messages(topic, msg))
    
        kafka.close()
    
    if __name__ == "__main__":
        main()
    

    consumer_kafka_2.py:

    #encoding=utf8
    from kafka import KafkaConsumer
    from kafka import TopicPartition
    from kafka.structs import OffsetAndMetadata
    from kafka.structs import TopicPartition
    
    def main():
        consumer = KafkaConsumer('newyear_many_test', bootstrap_servers=['master:9092'])
    
        print consumer.partitions_for_topic("newyear_many_test")
        print consumer.topics()  #获取主题列表
        print consumer.subscription()  #获取当前消费者订阅的主题
        print consumer.assignment()  #获取当前消费者topic、分区信息
        print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
    
        consumer.seek(TopicPartition(topic=u'newyear_many_test', partition=2), 5)  #重置偏移量
        for message in consumer:
            print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                message.offset, message.key,
                message.value))
    
    
    if __name__ == "__main__":
        main()
    

    producer_kafka_2.py:

    from kafka import KafkaProducer
    
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    
    # # block until all pending messages are sent
    # for _ in range(10):
    #     producer.send('test_m_brokers', b'are you ok!!!')
    #
    # producer.flush()
    
    
    # key for hashed partitioning
    producer.send('newyear_many_test', key=b'1', value=b'aaa')
    producer.flush()
    
    3.kafka打通flume

    1.新建./conf/kafka_test/flume_kafka.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -f /home/dong/flume_test/1.log
    
    #a1.sources.r1.type = http
    #a1.sources.r1.host = master
    #a1.sources.r1.port = 52020
    
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
    a1.sources.r1.interceptors.i1.headerName = key
    a1.sources.r1.interceptors.i1.preserveExisting = false
    
    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.brokerList  = master:9092
    #a1.sinks.k1.topic = badou_flume_kafka_test
    #a1.sinks.k1.topic = badou_storm_kafka_test
    a1.sinks.k1.topic = topic_1013
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    2.启动flume:
    ]# flume-ng agent -c conf -f ./conf/kafka_test/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console
    启动成功,如下图:


    3.测试:
    1.1flume监控产生的数据:
    ]# for i in seq 1 100; do echo '====> '$i >> 1.log ; done
    1.2kafka消费数据:
    ]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_1013 --from-beginning
    消费结果如下图:


    至此,消费成功!
    补充,另外一种消费方式可以使用下面命令来产生测试数据:
    ]# curl -X POST -d '[{"headers":{"flume":"flume is very easy!"}, "body":"111"}]' http://master:52020

    相关文章

      网友评论

        本文标题:Hadoop生态架构之kafka

        本文链接:https://www.haomeiwen.com/subject/ecqbactx.html