美文网首页
docker kafka 入门实践

docker kafka 入门实践

作者: 10xjzheng | 来源:发表于2019-01-31 17:43 被阅读3次

    1 前置条件

    Linux-centos7 && docker && docker compose

    2. 镜像

    3. docker-compose.yml示例

    version: '2'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        ports:
          - "2181"
      kafka:
        image: wurstmeister/kafka
        ports:
          - "9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: 172.18.55.63 # 此ip是我自己的内网ID
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
    

    备注:亦可参照https://hub.docker.com/r/wurstmeister/kafka/站点的信息

    3.启动

    在docker-compose.yml所在的目录执行以下命令:

    docker-compose up -d
    

    运行:

    docker ps -a
    

    查看docker的容器信息:

    CONTAINER ID        IMAGE                                     COMMAND                  CREATED             STATUS              PORTS                                                                        NAMES
    ffa65c7bce19        wurstmeister/kafka                        "start-kafka.sh"         6 days ago          Up 6 days           0.0.0.0:32768->9092/tcp                                                      kafka_kafka_1
    6d9cd607fc78        wurstmeister/zookeeper                    "/bin/sh -c '/usr/sb…"   6 days ago          Up 6 days           22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32769->2181/tcp                          kafka_zookeeper_1
    
    

    4. 测试

    • 进入容器
    docker exec -it kafka_kafka_1 /bin/bash
    
    • 创建topic
    $KAFKA_HOME/bin/kafka-topics.sh --create --topic xiaojun --zookeeper kafka_zookeeper_1:2181 --replication-factor 1 --partitions 1
    
    • 查看topic信息
    $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper kafka_zookeeper_1 --topic xiaojun
    

    输出:

    Topic:xiaojun   PartitionCount:1    ReplicationFactor:1 Configs:
        Topic: xiaojun  Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
    
    • 发布消息[发布完按ctrl+C退出]
    $KAFKA_HOME/bin/kafka-console-producer.sh --topic=xiaojun --broker-list kafka_kafka_1:9092
    

    我发布了三条信息:

    bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
    Today I am very upset
    Tomorrow is another day
    Can you help me
    
    • 消费信息
      1 从头消费
    $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
    

    输出:

    bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
    Today I am very upset
    Tomorrow is another day
    Can you help me
    

    2 按offset消费

    $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --partition 0 --offset 2 --topic xiaojun
    

    输出:

    bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --partition 0 --offset 2 --topic xiaojun
    Can you help me
    
    • 删除topic
    $KAFKA_HOME/bin/kafka-topics.sh --delete --topic test  --zookeeper kafka_zookeeper_1:2181
    

    5. PHP实践

    1. Producer:
    <?php
    
    $rk = new RdKafka\Producer();
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers("127.0.0.1");
    
    $topic = $rk->newTopic("test");
    
    for ($i = 0; $i < 10; $i++) {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
        $rk->poll(0);
    }
    
    while ($rk->getOutQLen() > 0) {
        $rk->poll(50);
    }
    
    ?>
    
    2. Consumer:
    <?php
    
    $conf = new RdKafka\Conf();
    
    // Set a rebalance callback to log partition assignments (optional)
    $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;
    
             case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                 echo "Revoke: ";
                 var_dump($partitions);
                 $kafka->assign(NULL);
                 break;
    
             default:
                throw new \Exception($err);
        }
    });
    
    // Configure the group.id. All consumer with the same group.id will consume
    // different partitions.
    $conf->set('group.id', 'myConsumerGroup');
    
    // Initial list of Kafka brokers
    $conf->set('metadata.broker.list', '127.0.0.1');
    
    $topicConf = new RdKafka\TopicConf();
    
    // Set where to start consuming messages when there is no initial offset in
    // offset store or the desired offset is out of range.
    // 'smallest': start from the beginning
    $topicConf->set('auto.offset.reset', 'smallest');
    
    // Set the configuration to use for subscribed/assigned topics
    $conf->setDefaultTopicConf($topicConf);
    
    $consumer = new RdKafka\KafkaConsumer($conf);
    
    // Subscribe to topic 'test'
    $consumer->subscribe(['test']);
    
    echo "Waiting for partition assignment... (make take some time when\n";
    echo "quickly re-joining the group after leaving it.)\n";
    
    while (true) {
        $message = $consumer->consume(120*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }
    
    ?>
    

    6 最后简要说一下kafka快的原理

    6.1 先说几个概念

    • producer:生产者
    • consumer:消费者
    • topic:主题,可以理解为队列的名称
    • broker:集群的一个实例,可以是指定的服务器IP+端口
    • group:消费者可以是一个群组

    6.2 原理

    参考:
    kafka效率
    为什么Kafka那么快
    总结:

    写入优化

    读写数据的效率瓶颈主要在于磁盘IO,所谓磁盘IO,需要经历机械磁盘的磁臂移动(寻道时间)和旋转延迟(寻址时间),如果是随机IO,那么这个时间耗费就更大了,但是呢,如果是顺序IO,则可以大大加快读写操作的效率。

    以上说的简单的优化产生了数量级的加速。 批处理导致更大的网络数据包,更大的顺序磁盘操作,连续的内存块等,这些都允许Kafka将随机消息写入的突发流转换成流向消费者的线性写入。

    当然,即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以Kafka的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。

    Kafka还适用了Memory Mapped Files进行写入优化。

    Memory Mapped Files(后面简称mmap)也被翻译成内存映射文件,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。

    通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。
    使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销(调用文件的read会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)也有一个很明显的缺陷——不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到mmap之后就立即flush然后再返回Producer叫同步(sync);写入mmap之后立即返回Producer不调用flush叫异步(async)。

    同时,Kafka采用由生产者,经纪人和消费者共享的标准化二进制消息格式(样数据块就可以在它们之间自由传输,无需转换)来避免无效率是字节复制。

    读取优化

    另外一个很重要的优化是:
    通常来说,数据从文件传输到socket的公共数据路径是:

      1. 操作系统将数据从磁盘读入到内核空间的页缓存
      1. 应用程序将数据从内核空间读入到用户空间缓存中
      1. 应用程序将数据写回到内核空间到socket缓存中
      1. 操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出

    这样做明显是低效的,这里有四次拷贝,两次系统调用。如果使用sendfile,再次拷贝可以被避免:允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的。

    以上就是所谓的sendfile和zero copy。

    相关文章

      网友评论

          本文标题:docker kafka 入门实践

          本文链接:https://www.haomeiwen.com/subject/lmxlsqtx.html