美文网首页k8s容器容器框架
kafka(二)Kafka快速入门

kafka(二)Kafka快速入门

作者: 万事万物 | 来源:发表于2021-06-05 14:09 被阅读0次

    集群部署

    1. 配置 server.properties
    #broker的全局唯一编号,不能重复
    broker.id=0
    #删除topic功能使能,当前版本此配置默认为true,已从配置文件移除
    delete.topic.enable=true
    #kafka运行日志存放的路径
    log.dirs=/opt/module/kafka/logs
    #配置连接Zookeeper集群地址
    zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
    

    其他服务器一样配置

    1. 启动集群
    bin/kafka-server-start.sh -daemon config/server.properties
    

    其他服务器一样。

    Kafka 命令行操作

    topic 操作

    脚本
    kafka]$ bin\kafka-topics.sh
    命令选项

    选项 描述
    --alter 更改分区数,副本分配,和/或主题的配置。
    --at-min-isr-partitions 如果在描述主题时设置,则仅显示 isr 计数为的分区等于配置的最小值。 不是支持 --zookeeper 选项。
    --bootstrap-server <String: server to connect to> 必需:要连接的 Kafka 服务器。 如果提供此项,则不需要直接的 Zookeeper 连接。
    --command-config <String: command config property file> 包含要传递给管理客户端的配置的属性文件。 这仅与 --bootstrap-server 选项一起用于描述和更改代理配置。
    --config <String: name=value>
    --create 创建一个新的topic
    --delete 删除一个topic
    --delete-config <String: name> 要为现有主题删除的主题配置覆盖(请参阅 --config 选项下的配置列表)。 不支持 --bootstrap-server 选项。
    --describe 列出给定主题的详细信息。
    --disable-rack-aware 禁用机架感知副本分配
    --exclude-internal 运行 list 或 describe 命令时排除内部主题。 默认会列出内部主题
    --force 禁止控制台提示
    --help 打印帮助信息。
    --if-exists 如果在更改或删除或描述主题时设置,则该操作仅在主题存在时执行。 不支持 --bootstrap-server 选项。
    --if-not-exists 如果在创建主题时设置,则只有在主题不存在时才会执行操作。 不支持 --bootstrap- 服务器选项。
    --list 列出所有可用的topic。
    --partitions <Integer: # of partitions> 设置topic 分区数
    --replication-factor <Integer:replication factor> 指定topic的副本数
    --topic <String: topic> 指定topic 名称
    --topics-with-overrides 如果在描述主题时设置,则仅显示已覆盖配置的主题
    --unavailable-partitions 如果在描述主题时设置,则只显示其领导者不可用的分区
    --under-min-isr-partitions 如果在描述主题时设置,则仅显示 isr 计数小于配置的最小值的分区。 不支持 --zookeeper 选项。
    --under-replicated-partitions 如果在描述主题时设置,则仅显示在复制分区下
    --version 展示Kafka版本
    --zookeeper <String: hosts> 已弃用,zookeeper 连接的连接字符串,格式为 host:port。 可以提供多个主机以允许故障转移。

    案例

    1. 创建一个 topic
      语法:kafka-topics.sh --create --zookeeper <host>:<port> --if-not-exists --replication-factor <副本数> --partitions <分区数> --topic <副本名称>
    bin]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --if-not-exists --replication-factor 3 --partitions 3 --topic test
    #输出结果
    Created topic test.
    
    1. 查看当前服务器中的所有 topic
      语法: kafka-topics.sh --zookeeper <host>:<port> --list
    bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --list
    # 输出结果
    __consumer_offsets
    abc
    test
    
    1. 删除一个topic
      语法:kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test
      需要server.properties中设置delete.topic.enable=true否则只是标记删除。
    bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test
    # 输出结果
    Topic test is marked for deletion. # 并不会马上删除,而是先对该topic做一个标记,后面再进行删除
    #需要在 配置中设置 delete.topic.enable=true ,否则不会进行删除
    Note: This will have no impact if delete.topic.enable is not set to true.
    
    1. 查看 topic 详情
      语法:--describe
    [atguigu@hadoop102 bin]$ kafka-topics.sh  --describe --bootstrap-server hadoop102:9092 --topic abc
    #  topic  abc 详细信息
    Topic: abc  PartitionCount: 1   ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: abc  Partition: 0    Leader: 1   Replicas: 2,0,1 Isr: 1,2,0
    
    参数 描述
    Topic topic名称
    PartitionCount 分区数
    ReplicationFactor 定义的分区数
    Configs 配置
    Partition 当前分区位置
    Leader 当前那个broker为Leader
    Replicas 副本位置
    Isr lsr同步队列

    producer 操作

    脚本
    kafka]$ bin\kafka-console-producer.sh
    命令选项

    选项 描述
    --batch-size <Integer: size> 如果消息不是同步发送的,则要在单个批次中发送的消息数。 (默认值:200)
    --broker-list <String: broker-list> 链接Kafka,必需:采用 HOST1:PORT1,HOST2:PORT2 形式的代理列表字符串。
    --compression-codec [String: compression-codec] 支持的压缩方式'none', 'gzip', 'snappy', 'lz4', or 'zstd'. 默认 'gzip'
    --help 打印帮助信息
    --line-reader <String: reader_class> 用于从标准输入读取行的类的类名。默认情况下,每行都作为单独的消息读取。 (默认:kafka.tools.ConsoleProducer$LineMessageReader)
    --max-block-ms <Long: > 生产者发送的最大时间(默认:60000)
    --max-memory-bytes <Long: > 缓冲大小,以字节为单位 (默认:33554432)
    --max-partition-memory-bytes <Long: The buffer size allocated for a memory in bytes per partition> 合并数据的最小数 (默认: 16384)
    --message-send-max-retries <Integer> 退休数,默认为3
    --metadata-expiry-ms <Long:> 强制刷新数据条数默认为300000,元数据以毫秒为单位的过期间隔时间段
    --producer-property <String> 传递用户定义的Producer_Prop的机制
    --producer.config <String: config file> 指定配置文件。 请注意, [producer-property] 优先于此配置。
    --property <String: prop> 一种将用户定义的属性以 key=value 的形式传递给消息阅读器的机制。 这允许对用户定义的消息阅读器进行自定义配置。
    --request-required-acks <String:> 设置ack(确认收到)的三种模式(0,1,-1),默认为1
    --request-timeout-ms <Integer:> 设置ack 的超时时间(单位毫秒)默认为 1500
    --retry-backoff-ms <Integer> 等待选举时间,默认为100)
    --socket-buffer-size <Integer: size> 设置 tcp RECV 大小(默认: 102400)
    --sync 设置为同步的
    --timeout <Integer: timeout_ms> 如果设置和生产者运行异步模式,这给一条消息的最长时间是否有足够的队列等待批处理大小。该值以ms为单位。(默认:1000)
    --topic <String: topic> 生产的消息发送给定的主题
    --version 显示Kafka版本
    1. 发送消息
      语法:kafka-console-producer.sh --broker-list <kafkaIP1>:<端口> <kafkaIP2>:<端口> --topic <topic名称>
    bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 hadoop103:9092 --topic abc
    #输出
    >hello
    

    hadoop102 接收 topic abc 消息

    [admin@hadoop102 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic abc
    #接收生产者推送的消息
    hello
    

    hadoop103 接收 topic abc 消息

    [admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
    #接收生产者推送的消息
    hello
    

    consumer操作

    脚本
    kafka]$ bin/kafka-console-consumer.sh
    命令选项

    选项 描述
    --bootstrap-server <String: server to connect to> 需:要连接的服务器。
    --consumer-property <String: consumer_prop> 一种将用户定义的属性以 key=value 的形式传递给消费者的机制。
    --consumer.config <String: config file> consumer配置属性文件。 请注意, [consumer-property] 优先于此配置。
    --enable-systest-events 记录消费者的消息及生命周期,用于系统测试
    --formatter <String: class> 用于格式化 kafka 消息以供显示的类的名称。 (默认:kafka.tools.DefaultMessageFormatter)
    --from-beginning 如果消费者还没有一个既定的偏移量来消费,那么从日志中出现的最早的消息而不是最新的消息开始。
    --group <String: consumer group id> 消费者的消费者组ID。
    --help 打印帮助信息
    --isolation-level <String> 设置为 read_committed 以过滤掉未提交的事务消息。 设置为 read_uncommitted 以读取所有消息。 (默认值:read_uncommitted)
    --key-deserializer <String: deserializer for key> 设置 密钥的解串器
    --max-messages <Integer: num_messages> 退出前消费的最大消息数。 如果未设置,则消耗是连续的。
    --offset <String: consume offset> 要消耗的偏移量 id(非负数),或 'earliest' 表示从开始,或 'latest' 表示从结束(默认值:latest)
    --partition <Integer: partition> 要消费的分区。 除非指定了“--offset”,否则消耗从分区的末尾开始。
    --property <String: prop> 初始化消息格式化程序的属性
    --skip-message-on-error 如果在处理消息时出现错误,请跳过而不是暂停。
    --timeout-ms <Integer: timeout_ms> 如果指定,则在指定的时间间隔内没有可供消费的消息时退出。要消费的主题 ID。
    --value-deserializer <String: deserializer for values> 值的解串器
    --version 显示Kafka版本
    --whitelist <String: whitelist> 指定要包含以供使用的主题白名单的正则表达式。

    案例

    1. 消费消息
      语法:kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名称>
    [admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
    #接收生产者推送的消息
    hello
    
    1. 消费所有的消息
      语法:--from-beginning
    [admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc --from-beginning
    #接收生产者推送的消息
    sh
    nihao
    发哦那旮
    ka
    niha
    hdalfajkl
    你好
    股东大法师
    hello
    python
    hello
    haoh
    hello
    hello
    hflahfla
    flajklfja
    flajla
    afadf
    

    相关文章

      网友评论

        本文标题:kafka(二)Kafka快速入门

        本文链接:https://www.haomeiwen.com/subject/myqgsltx.html