1 前置条件
Linux-centos7 && docker && docker compose
2. 镜像
3. docker-compose.yml示例
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 172.18.55.63 # 此ip是我自己的内网ID
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
备注:亦可参照https://hub.docker.com/r/wurstmeister/kafka/站点的信息
3.启动
在docker-compose.yml所在的目录执行以下命令:
docker-compose up -d
运行:
docker ps -a
查看docker的容器信息:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
ffa65c7bce19 wurstmeister/kafka "start-kafka.sh" 6 days ago Up 6 days 0.0.0.0:32768->9092/tcp kafka_kafka_1
6d9cd607fc78 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 6 days ago Up 6 days 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32769->2181/tcp kafka_zookeeper_1
4. 测试
- 进入容器
docker exec -it kafka_kafka_1 /bin/bash
- 创建topic
$KAFKA_HOME/bin/kafka-topics.sh --create --topic xiaojun --zookeeper kafka_zookeeper_1:2181 --replication-factor 1 --partitions 1
- 查看topic信息
$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper kafka_zookeeper_1 --topic xiaojun
输出:
Topic:xiaojun PartitionCount:1 ReplicationFactor:1 Configs:
Topic: xiaojun Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
- 发布消息[发布完按ctrl+C退出]
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=xiaojun --broker-list kafka_kafka_1:9092
我发布了三条信息:
bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
Today I am very upset
Tomorrow is another day
Can you help me
- 消费信息
1 从头消费
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
输出:
bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
Today I am very upset
Tomorrow is another day
Can you help me
2 按offset消费
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --partition 0 --offset 2 --topic xiaojun
输出:
bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --partition 0 --offset 2 --topic xiaojun
Can you help me
- 删除topic
$KAFKA_HOME/bin/kafka-topics.sh --delete --topic test --zookeeper kafka_zookeeper_1:2181
5. PHP实践
- 使用扩展:php-rdkafka
- 文档:https://arnaud-lb.github.io/php-rdkafka/phpdoc/index.html
- docker php-kafka示例:https://github.com/10xjzheng/php-kafka
- PHP Manual Examples :
1. Producer:
<?php
$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");
$topic = $rk->newTopic("test");
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$rk->poll(0);
}
while ($rk->getOutQLen() > 0) {
$rk->poll(50);
}
?>
2. Consumer:
<?php
$conf = new RdKafka\Conf();
// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
var_dump($partitions);
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
}
});
// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup');
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.1');
$topicConf = new RdKafka\TopicConf();
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);
$consumer = new RdKafka\KafkaConsumer($conf);
// Subscribe to topic 'test'
$consumer->subscribe(['test']);
echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";
while (true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
?>
6 最后简要说一下kafka快的原理
6.1 先说几个概念
- producer:生产者
- consumer:消费者
- topic:主题,可以理解为队列的名称
- broker:集群的一个实例,可以是指定的服务器IP+端口
- group:消费者可以是一个群组
6.2 原理
参考:
kafka效率
为什么Kafka那么快
总结:
写入优化
读写数据的效率瓶颈主要在于磁盘IO,所谓磁盘IO,需要经历机械磁盘的磁臂移动(寻道时间)和旋转延迟(寻址时间),如果是随机IO,那么这个时间耗费就更大了,但是呢,如果是顺序IO,则可以大大加快读写操作的效率。
以上说的简单的优化产生了数量级的加速。 批处理导致更大的网络数据包,更大的顺序磁盘操作,连续的内存块等,这些都允许Kafka将随机消息写入的突发流转换成流向消费者的线性写入。
当然,即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以Kafka的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。
Kafka还适用了Memory Mapped Files进行写入优化。
Memory Mapped Files(后面简称mmap)也被翻译成内存映射文件,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。
通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。
使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销(调用文件的read会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)也有一个很明显的缺陷——不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到mmap之后就立即flush然后再返回Producer叫同步(sync);写入mmap之后立即返回Producer不调用flush叫异步(async)。
同时,Kafka采用由生产者,经纪人和消费者共享的标准化二进制消息格式(样数据块就可以在它们之间自由传输,无需转换)来避免无效率是字节复制。
读取优化
另外一个很重要的优化是:
通常来说,数据从文件传输到socket的公共数据路径是:
- 操作系统将数据从磁盘读入到内核空间的页缓存
- 应用程序将数据从内核空间读入到用户空间缓存中
- 应用程序将数据写回到内核空间到socket缓存中
- 操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出
这样做明显是低效的,这里有四次拷贝,两次系统调用。如果使用sendfile,再次拷贝可以被避免:允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的。
以上就是所谓的sendfile和zero copy。
网友评论