Start the Docker container that runs the Kafka services. For how to set Kafka up, see the previous article: https://www.jianshu.com/p/daea2db9cceb
ljpMacBookPro:~ liangjiapeng$ docker run --rm -it \
> -p 2181:2181 -p 3030:3030 -p 8081:8081 \
> -p 8082:8082 -p 8083:8083 -p 9092:9092 \
> -e ADV_HOST=127.0.0.1 \
> landoop/fast-data-dev
Setting advertised host to 127.0.0.1.
Operating system RAM available is 3455 MiB, which is less than the lowest
recommended of 4096 MiB. Your system performance may be seriously impacted.
Starting services.
This is Landoop’s fast-data-dev. Kafka 1.1.1-L0 (Landoop's Kafka Distribution).
You may visit http://127.0.0.1:3030 in about a minute.
2018-12-09 15:16:01,639 INFO Included extra file "/etc/supervisord.d/01-zookeeper.conf" during parsing
2018-12-09 15:16:01,639 INFO Included extra file "/etc/supervisord.d/02-broker.conf" during parsing
2018-12-09 15:16:01,639 INFO Included extra file "/etc/supervisord.d/03-schema-registry.conf" during parsing
2018-12-09 15:16:01,640 INFO Included extra file "/etc/supervisord.d/04-rest-proxy.conf" during parsing
2018-12-09 15:16:01,640 INFO Included extra file "/etc/supervisord.d/05-connect-distributed.conf" during parsing
2018-12-09 15:16:01,640 INFO Included extra file "/etc/supervisord.d/06-caddy.conf" during parsing
2018-12-09 15:16:01,640 INFO Included extra file "/etc/supervisord.d/07-smoke-tests.conf" during parsing
2018-12-09 15:16:01,640 INFO Included extra file "/etc/supervisord.d/08-logs-to-kafka.conf" during parsing
2018-12-09 15:16:01,640 INFO Included extra file "/etc/supervisord.d/99-supervisord-sample-data.conf" during parsing
2018-12-09 15:16:01,640 INFO Set uid to user 0 succeeded
2018-12-09 15:16:01,658 INFO RPC interface 'supervisor' initialized
2018-12-09 15:16:01,658 CRIT Server 'unix_http_server' running without any HTTP authentication checking
2018-12-09 15:16:01,659 INFO supervisord started with pid 6
2018-12-09 15:16:02,664 INFO spawned: 'sample-data' with pid 164
2018-12-09 15:16:02,668 INFO spawned: 'zookeeper' with pid 165
2018-12-09 15:16:02,673 INFO spawned: 'caddy' with pid 166
2018-12-09 15:16:02,677 INFO spawned: 'broker' with pid 168
2018-12-09 15:16:02,686 INFO spawned: 'smoke-tests' with pid 169
2018-12-09 15:16:02,689 INFO spawned: 'connect-distributed' with pid 170
2018-12-09 15:16:02,693 INFO spawned: 'logs-to-kafka' with pid 171
2018-12-09 15:16:02,715 INFO spawned: 'schema-registry' with pid 177
2018-12-09 15:16:02,750 INFO spawned: 'rest-proxy' with pid 184
2018-12-09 15:16:03,767 INFO success: sample-data entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2018-12-09 15:16:03,767 INFO success: zookeeper entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2018-12-09 15:16:03,767 INFO success: caddy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2018-12-09 15:16:03,768 INFO success: broker entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2018-12-09 15:16:03,768 INFO success: smoke-tests entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2018-12-09 15:16:03,769 INFO success: connect-distributed entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2018-12-09 15:16:03,769 INFO success: logs-to-kafka entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2018-12-09 15:16:03,770 INFO success: schema-registry entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2018-12-09 15:16:03,770 INFO success: rest-proxy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2018-12-09 15:21:04,693 INFO exited: logs-to-kafka (exit status 0; expected)
2018-12-09 15:21:45,545 INFO exited: sample-data (exit status 0; expected)
2018-12-09 15:22:41,400 INFO exited: smoke-tests (exit status 2; expected)
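At this point the web UI mentioned in the log, http://127.0.0.1:3030, should be reachable in a browser. To double-check from the host that the container is up and the port mappings took effect (the exact output depends on your machine):
ljpMacBookPro:~ liangjiapeng$ docker ps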
Now start another container to use as a tools shell:
ljpMacBookPro:~ liangjiapeng$ docker run --rm -it --net=host landoop/fast-data-dev bash
root@fast-data-dev / $
Topics
Create a topic:
root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic first_topic --partitions 3 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "first_topic".
List the topics:
root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --list
__consumer_offsets
_schemas
backblaze_smart
connect-configs
connect-offsets
connect-statuses
coyote-test-avro
coyote-test-binary
coyote-test-json
first_topic
logs_broker
nyc_yellow_taxi_trip_data
reddit_posts
sea_vessel_position_reports
telecom_italia_data
telecom_italia_grid
There are quite a few topics; most of them were generated by the fast-data-dev system we are using. We only need to pay attention to the topic we created ourselves.
Describe a topic:
root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic first_topic
Topic:first_topic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: first_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: first_topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: first_topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
In the describe output, Leader is the broker currently serving reads and writes for that partition, Replicas lists the brokers holding a copy, and Isr is the set of in-sync replicas; with a single broker, all three are broker 0.
Delete a topic:
root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --delete --topic first_topic
Topic first_topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Beginner questions (I recommend reading this blog post first; it clears everything up, and since someone has already written it well, I'll just link it: https://www.cnblogs.com/likehua/p/3999538.html)
Q: Why does creating a topic point at ZooKeeper's port rather than Kafka's?
A: Kafka keeps its cluster metadata (brokers, topics and their configuration) in ZooKeeper, so creating a topic talks to ZooKeeper.
Q: What are partitions?
A: Partitions are the subdivisions of a topic. A topic has one or more partitions, so data can be written and read in parallel, and a partitioning strategy (such as a message key) can control which partition a given message is stored in.
Q: What is replication-factor?
A: The replication factor is the number of copies of the topic's data kept across the Kafka cluster. On our single-broker setup, anything above 1 will fail, as the sketch below shows.
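To make the ZooKeeper answer concrete, here is a quick sketch run in the tools container. zookeeper-shell ships with Kafka and can list the topic metadata Kafka registers in ZooKeeper, and asking for more replicas than there are brokers is rejected (replicated_topic is just a throwaway name for this demonstration, and the exact error wording varies by version):
root@fast-data-dev / $ zookeeper-shell 127.0.0.1:2181 ls /brokers/topics
# prints the topics registered in ZooKeeper, the same list that kafka-topics --list shows
root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic replicated_topic --partitions 3 --replication-factor 2
# fails with an InvalidReplicationFactorException: replication factor 2 is larger than the 1 available broker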
Now that topic creation is clear, let's look at producing and consuming data.
Create another topic:
root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic my_topic --partitions 3 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "my_topic".
First, the most basic write:
root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic my_topic
>111
Open another terminal and we can read it back:
root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning
111
Beginner questions
Q: What is kafka-console-producer?
A: It is Kafka's command-line producer client; the producer is the part of Kafka used to write data in.
Q: What is kafka-console-consumer?
A: The command-line consumer client; the consumer is the part of Kafka used to read data out.
Q: What is broker-list?
A: The Kafka broker node(s) to connect to.
Q: What is bootstrap-server?
A: It also points at the Kafka brokers; in versions before 0.10, the consumer pointed at ZooKeeper instead.
Q: What does from-beginning do?
A: By default Kafka's consumer does not read historical data; it starts reading from the end of the log. In our example above, if we first start a producer and write one message, and only then start a consumer without the from-beginning flag, that consumer will not see the message produced before it started. A quick demonstration follows.
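As a quick check (a hypothetical session, assuming my_topic still holds the 111 written earlier), start a consumer without the flag and note that nothing is printed until the producer sends something new:
root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic
# no output: the consumer starts at the end of the log and waits for new messages; the existing 111 is skipped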
Reading from a specific partition of a topic
Write a few more messages in the producer terminal:
root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic my_topic
>111
>222
>333
>444
>555
>666
Read the data in partition 2:
root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning --partition 2
111
444
We only got 2 of the messages back. The reason is that, by default, messages are spread across the topic's different partitions, and the data in different partitions is independent of one another.
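The same check against the other partitions accounts for the rest. In this run, partition 0 held 333 and 666 (it appears again below), which leaves 222 and 555 in partition 1:
root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning --partition 1
222
555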
How do we direct messages to a single partition? Use a key.
Write some keyed messages (a key can be any string; for this demo we just use 1):
root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic my_topic --property "parse.key=true" --property "key.separator=:"
>1:k1
>1:k2
>1:k3
>1:k4
>1:k5
Looking at the result, all of these messages landed in partition 0: messages with the same key always hash to the same partition. Exactly which partition a given key maps to, we have to find by checking the partitions one by one.
root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning --partition 0
333
666
k1
k2
k3
k4
k5
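To see the keys alongside the values, the console consumer can print them using its standard formatter properties print.key and key.separator (messages that were written without a key show a null key); the output should look roughly like this:
root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning --partition 0 --property print.key=true --property key.separator=:
null:333
null:666
1:k1
1:k2
1:k3
1:k4
1:k5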
Consumer groups
When we start consumers to read data, we will find that if several consumers consume the same topic at once, every one of them receives each message the producer writes; and if a new consumer joins with --from-beginning, it consumes all the existing data over again.
On a single machine that may be fine, but across multiple machines it means the same data is consumed repeatedly. Consumer groups avoid this.
With a consumer group, consumers pass a group parameter when consuming. Among consumers sharing the same group, once any one machine has consumed a message from the producer, the other consumers will no longer consume that message; a machine that joins the group later is subject to the same rule.
Because the group's consumption of the topic is recorded, a machine that joins at any point will not read data that other machines in the group have already consumed.
root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --consumer-property group.id=my-group-1 --from-beginning
The first time this command runs it creates the group my-group-1, and with --from-beginning it consumes the existing data from the start. For machines that run the command after that, --from-beginning no longer has any practical effect: the group already exists and already has a consumption record.
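That consumption record is a set of committed offsets, and it can be inspected with the kafka-consumer-groups tool that ships with Kafka. --describe prints one row per partition with the group's committed offset, the log-end offset, and the lag between them (a sketch; fast-data-dev's demo services create groups of their own, so --list will show more than just ours):
root@fast-data-dev / $ kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list
my-group-1
# plus any groups created by the demo services
root@fast-data-dev / $ kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group my-group-1
# one row per partition of my_topic, with CURRENT-OFFSET, LOG-END-OFFSET and LAG columns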