KAFKA Common Operations for Beginners

Author: 鸟它鸟 | Published 2018-12-10 01:03

    Start the Docker container that runs the Kafka services. For how to set up this Kafka environment, see the previous article: https://www.jianshu.com/p/daea2db9cceb

    ljpMacBookPro:~ liangjiapeng$ docker run --rm -it \
    > -p 2181:2181 -p 3030:3030 -p 8081:8081 \
    > -p 8082:8082 -p 8083:8083 -p 9092:9092 \
    > -e ADV_HOST=127.0.0.1 \
    > landoop/fast-data-dev
    Setting advertised host to 127.0.0.1.
    Operating system RAM available is 3455 MiB, which is less than the lowest
    recommended of 4096 MiB. Your system performance may be seriously impacted.
    Starting services.
    This is Landoop’s fast-data-dev. Kafka 1.1.1-L0 (Landoop's Kafka Distribution).
    You may visit http://127.0.0.1:3030 in about a minute.
    2018-12-09 15:16:01,639 INFO Included extra file "/etc/supervisord.d/01-zookeeper.conf" during parsing
    2018-12-09 15:16:01,639 INFO Included extra file "/etc/supervisord.d/02-broker.conf" during parsing
    2018-12-09 15:16:01,639 INFO Included extra file "/etc/supervisord.d/03-schema-registry.conf" during parsing
    2018-12-09 15:16:01,640 INFO Included extra file "/etc/supervisord.d/04-rest-proxy.conf" during parsing
    2018-12-09 15:16:01,640 INFO Included extra file "/etc/supervisord.d/05-connect-distributed.conf" during parsing
    2018-12-09 15:16:01,640 INFO Included extra file "/etc/supervisord.d/06-caddy.conf" during parsing
    2018-12-09 15:16:01,640 INFO Included extra file "/etc/supervisord.d/07-smoke-tests.conf" during parsing
    2018-12-09 15:16:01,640 INFO Included extra file "/etc/supervisord.d/08-logs-to-kafka.conf" during parsing
    2018-12-09 15:16:01,640 INFO Included extra file "/etc/supervisord.d/99-supervisord-sample-data.conf" during parsing
    2018-12-09 15:16:01,640 INFO Set uid to user 0 succeeded
    2018-12-09 15:16:01,658 INFO RPC interface 'supervisor' initialized
    2018-12-09 15:16:01,658 CRIT Server 'unix_http_server' running without any HTTP authentication checking
    2018-12-09 15:16:01,659 INFO supervisord started with pid 6
    2018-12-09 15:16:02,664 INFO spawned: 'sample-data' with pid 164
    2018-12-09 15:16:02,668 INFO spawned: 'zookeeper' with pid 165
    2018-12-09 15:16:02,673 INFO spawned: 'caddy' with pid 166
    2018-12-09 15:16:02,677 INFO spawned: 'broker' with pid 168
    2018-12-09 15:16:02,686 INFO spawned: 'smoke-tests' with pid 169
    2018-12-09 15:16:02,689 INFO spawned: 'connect-distributed' with pid 170
    2018-12-09 15:16:02,693 INFO spawned: 'logs-to-kafka' with pid 171
    2018-12-09 15:16:02,715 INFO spawned: 'schema-registry' with pid 177
    2018-12-09 15:16:02,750 INFO spawned: 'rest-proxy' with pid 184
    2018-12-09 15:16:03,767 INFO success: sample-data entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
    2018-12-09 15:16:03,767 INFO success: zookeeper entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
    2018-12-09 15:16:03,767 INFO success: caddy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
    2018-12-09 15:16:03,768 INFO success: broker entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
    2018-12-09 15:16:03,768 INFO success: smoke-tests entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
    2018-12-09 15:16:03,769 INFO success: connect-distributed entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
    2018-12-09 15:16:03,769 INFO success: logs-to-kafka entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
    2018-12-09 15:16:03,770 INFO success: schema-registry entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
    2018-12-09 15:16:03,770 INFO success: rest-proxy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
    2018-12-09 15:21:04,693 INFO exited: logs-to-kafka (exit status 0; expected)
    2018-12-09 15:21:45,545 INFO exited: sample-data (exit status 0; expected)
    2018-12-09 15:22:41,400 INFO exited: smoke-tests (exit status 2; expected)
    

    Start another Docker container to use as a command-line client:

    ljpMacBookPro:~ liangjiapeng$ docker run --rm -it --net=host landoop/fast-data-dev bash
    root@fast-data-dev / $
    

    Topics

    Create a topic

    root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic first_topic --partitions 3 --replication-factor 1
    WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
    Created topic "first_topic".
    

    List the topics

    root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --list
    __consumer_offsets
    _schemas
    backblaze_smart
    connect-configs
    connect-offsets
    connect-statuses
    coyote-test-avro
    coyote-test-binary
    coyote-test-json
    first_topic
    logs_broker
    nyc_yellow_taxi_trip_data
    reddit_posts
    sea_vessel_position_reports
    telecom_italia_data
    telecom_italia_grid
    

    There are quite a few topics here; most of them are generated by the fast-data-dev image itself. We only need to pay attention to the topic we created.

    Describe a topic

    root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic first_topic
    Topic:first_topic   PartitionCount:3    ReplicationFactor:1 Configs:
        Topic: first_topic  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
        Topic: first_topic  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
        Topic: first_topic  Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    

    Delete a topic

    root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --delete --topic first_topic
    Topic first_topic is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
    
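    A quick way to confirm the deletion (my addition, not from the original article) is to list the topics again; since Kafka 1.0, delete.topic.enable defaults to true, so in this image first_topic should shortly disappear from the list:

    root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --list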

    Beginner questions (this blog post is worth reading for background; it explains these points well: https://www.cnblogs.com/likehua/p/3999538.html)
    Q: Why does creating a topic point at ZooKeeper's port rather than Kafka's port?
    A: Kafka keeps its configuration and metadata in ZooKeeper, so creating a topic requires the ZooKeeper address. A small illustration follows below.
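
    As an illustration (my addition, not from the original article), you can peek at the topic metadata that Kafka stores in ZooKeeper using the zookeeper-shell tool that ships with Kafka; the exact znode layout may differ between versions:

    root@fast-data-dev / $ zookeeper-shell 127.0.0.1:2181 ls /brokers/topics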

    Q: What are partitions?
    A: Partitions are the subdivisions of a topic. A topic has several partitions, which can be written to and read from in parallel; you can also control which partition a record is stored in through a partitioning strategy (for example by key, as we do later).

    Q: What is replication-factor?
    A: The replication factor: how many copies of the topic's data are kept across the Kafka cluster. A small example follows below.
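
    For example (my addition, not from the original article; replicated_topic is just a hypothetical name): the replication factor cannot exceed the number of brokers, so on this single-broker dev setup a request like the following would be rejected with an error saying the replication factor is larger than the number of available brokers:

    # expected to fail here: 2 replicas cannot be placed on a single broker
    root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic replicated_topic --partitions 3 --replication-factor 2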

    Now that we have covered creating topics, let's look at producing and consuming data.

    Create another topic

    root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic my_topic --partitions 3 --replication-factor 1
    WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
    Created topic "my_topic".
    

    First, the most basic write operation: producing a message

    root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic my_topic
    >111
    

    Open another terminal and we can read it back

    root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning
    111
    

    More beginner questions
    Q: What is kafka-console-producer?
    A: The kafka-console-* commands are Kafka's command-line client tools; the producer is the part of Kafka used to write data in.

    Q: What is kafka-console-consumer?
    A: The consumer is the part of Kafka used to read (consume) data back out.

    Q: What is --broker-list?
    A: The Kafka broker node(s) to connect to.

    Q: What is --bootstrap-server?
    A: It also points at the Kafka broker(s); before Kafka 0.10 the console consumer pointed at ZooKeeper instead.

    Q: What is --from-beginning?
    A: By default a consumer does not read historical data; it starts reading from the end of the log. In the example above, if we first start a producer and write a message, and then start a consumer without --from-beginning, that consumer will not receive the messages produced before it started. A quick check is sketched below.
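
    To see this default behaviour for yourself (my addition, not from the original article), start a consumer without --from-beginning; it will sit idle and only print messages that the producer sends after it has started:

    root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic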

    Test reading from a specific partition of the topic

    Write a few more messages in the producer terminal

    root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic my_topic
    >111
    >222
    >333
    >444
    >555
    >666
    

    Read the data in partition 2

    root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning --partition 2
    111
    444
    

    We only got 2 messages back. The reason is that, by default, messages are spread across the topic's different partitions, and the data in different partitions is independent of each other. The rest of the messages are sitting in the other partitions, as sketched below.
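
    To find the remaining messages (my addition, not from the original article), read the other two partitions one at a time; since keyless messages are spread round-robin style across partitions, the remaining messages should turn up in partitions 0 and 1:

    root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning --partition 0
    root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning --partition 1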

    How do we make the data go to one specific partition? Use a key.
    Write some data with a key (the key can be any string; for this demo we simply use 1):

    root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic my_topic --property "parse.key=true" --property "key.separator=:"
    >1:k1
    >1:k2
    >1:k3
    >1:k4
    >1:k5
    

    Looking at the result, we find all of these messages in partition 0: records with the same key always land in the same partition. Exactly which partition that is, though, we have to discover by checking the partitions one by one.

    root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning --partition 0
    333
    666
    k1
    k2
    k3
    k4
    k5
    
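    A handy trick (my addition, not from the original article) is to ask the console consumer to print the keys too, so the keyed records can be told apart from the keyless ones (which show a null key):

    root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning --partition 0 --property print.key=true --property key.separator=: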

    Consumer groups

    If we start several consumers on the same topic, we will find that every one of them receives each message the producer writes; and if a new consumer joins with --from-beginning, it re-consumes all of the data from the start.

    That may be fine on a single machine, but with multiple machines it means messages are processed more than once. Consumer groups are the way to avoid this duplicate consumption.

    Once a consumer group exists, consumers specify the group when they consume. When several consumers share the same group, as soon as any one of them has consumed a message, the others will not consume that message again; a consumer that joins later and specifies the same group follows the same rule. Because the group's consumption progress on the topic is recorded (as committed offsets), a machine joining at any later point will not re-read data that other machines in the group have already consumed.

    kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic --consumer-property group.id=my-group-1  --from-beginning
    

    The first time this command runs it creates the group my-group-1, and because of --from-beginning it consumes the data from the start. For machines that run the same command afterwards, --from-beginning no longer has any practical effect: the group already exists and its consumption offsets have already been recorded.
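
    To inspect a group's progress (my addition, not from the original article), Kafka ships a kafka-consumer-groups tool that lists the groups and shows each partition's committed offset and lag:

    root@fast-data-dev / $ kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list
    root@fast-data-dev / $ kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group my-group-1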
