美文网首页
分布式消息队列kafka

分布式消息队列kafka

作者: 机灵鬼鬼 | 来源:发表于2019-05-25 13:34 被阅读0次

    kafka三大特性

    第一、发布和订阅

    第二、实时的流处理

    第三、安全地存储流数据在集群节点上

    kafka的架构

    producers、consumers、brocker

    First a few concepts:

    Kafka is run as a cluster on one or more servers that can span multiple datacenters.

    解析:kafka是一个以集群形式运行在一个或多个机器上,跨多数据中心的服务。

    The Kafka cluster stores streams of records in categories called topics.

    解析:kafa集群使用topics把流数据记录存储时做分类。

    Each record consists of a key, a value, and a timestamp.

    解析:每一条记录包含一个key,一个value,和一个时间戳

    查看官方文档的QUICKSTART模块

    由于kafka依赖zookeeper,所以我们安装kafka之前要先安装zookeeper

    Zookeeper的下载地址,统一从cdh5那个地址(http://archive.cloudera.com/cdh5/cdh/5/)下载。

    解压zk

    配置环境变量~/.bash_profile

    配置zk的数据存储目录

    下载kafka

    1、单节点单broker的部署及使用

    解压kafka

    tar -zxvf kafka_2.11-2.2.0.tgz

    设置kafka的根目录和启动bin目录配置在环境变量里面

    PATH=$PATH:$HOME/bin

    export KAFKA_HOME=/usr/local/kafka_2.11-2.1.1

    export PATH=$KAFKA_HOME/bin:$PATH

    配置环境变量当前登录用户的私有环境变量~/.bash_profile

    配置kafka的配置文件server.properties

    $KAFKA_HOME/config/server.properties

    broker.id=0  //这个配置是broker的编号,且该编号唯一对应一个broker不能重复。

    # The address the socket server listens on. It will get the value returned from

    # java.net.InetAddress.getCanonicalHostName() if not configured

    listeners=PLAINTEXT://10.101.3.3:9092,如果不设置他会默认本机hostname

    log.dirs //存储kafka的日志

    zookeeper.connect //zk的地址

    启动zookeeper

    如果你已经有了zookeeper,那就跳过该步骤,如果没有那就直接使用kafka自己安装包里的zookeeper,启动命令

    bin/zookeeper-server-start.sh config/zookeeper.properties

    启动kafka

    后台方式: ./kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties &

    会话方式:./kafka-server-start.sh ${KAFKA_HOME}/config/server.properties

    查看是否启动 jps 命令

    查看kafka、zookeeper使用的配置文件   jps -m

    创建一个topic:指定ZK

    ./kafka-topics.sh --create --zookeeper 10.101.3.3:2181 --replication-factor 1 --partitions 1 --topic mytopic

    备注:1、创建topic的时候,需要使用--create命令 2、需要指定zk的地址--zookeeper 10.101.3.3:2181 3、指定副本系数--replication-factor 1 4、指定分区 --partitions 1  5、指定topic的名字--topic mytopic

    查看所有已经创建的topic

    bin/kafka-topics.sh --list --zookeeper 10.101.3.3:2181

    生产消息:指定Broker(localhost最好是ip或者域名,不要使用localhost)

    ./kafka-console-producer.sh --broker-list 10.101.3.3:9092 --topic mytopic

    备注:启动消息生产者,也就是启动一个broker,需要使用kafka-console-producer.sh命令指定broker的服务暴漏端口和对应的topic。如果你出现以下错误,就是因为ip或域名没有设置正确导致的

    消费消息:指定生产者关联的Broker

    ./kafka-console-consumer.sh --bootstrap-server 10.101.3.3:9092 --topic mytopic --from-beginning

    --from-beginning参数的解析

    这个参数就是告诉消费者服务,要全部监听所有生产者的消息。如果不带有此参数,那么该消费服务只会接受启动时间之后监听的消息,之前的消息是不会监听的。

    查看所有topic的设置详情

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 

    查看某一个topic的 设置详情

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic这里只查看名叫test的topic的设置信息

    关闭kafka的服务

    bin/kafka-server-stop.sh

    关闭前

    关闭后

    kafka服务消失

    2、单节点多Broker部署及应用

    首先看下图,是官网原图:意思就是把kafka的server.properties配置文件复制多份。

    我们就按照官网说的来,我部署三个节点

    三个broker的配置文件

    需要把server-1.properties、server-1.properties、server-1.properties的broker.id和日志目录和listeners配置分别改一下

    config/server-1.properties:

        broker.id=1

        listeners=PLAINTEXT://:9093

        log.dirs=/tmp/kafka-logs-1

    config/server-2.properties:

        broker.id=2

        listeners=PLAINTEXT://:9094

        log.dirs=/tmp/kafka-logs-2

    config/server-3.properties:

        broker.id=3

    listeners=PLAINTEXT://:9095

        log.dirs=/tmp/kafka-logs-3

    其他的两个同理设置

    启动多broker(后台方式启动)

    ./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &

    ./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &

    ./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

    启动后,可以看到3各kafka的服务。

    kafka服务对应的配置文件:jps -m可以查看到

    创建一个topic

    ./kafka-topics.sh --create --zookeeper 10.101.3.3:2181 --replication-factor 3 --partitions 1 --topic my_m_topic

    备注:这时候创建topic时,需要根据broker的节点数量,指定副本系数--replication-factor,这时候就应该为3了。

    查看所有已创建的topic

    bin/kafka-topics.sh --list --zookeeper localhost:2181

    查看我们多副本topic的详情

    ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my_m_topic

    其中Leader:代表主副本。Replicas:代表副本所在的broker机器编号1、2、3  Isr:代表存活的副本。

    生产消息:指定Broker(localhost最好是ip或者域名,不要使用localhost),这里的--broker-list后面需要多个broker节点ip

    ./kafka-console-producer.sh --broker-list 10.101.3.3:9093,10.101.3.3:9094,10.101.3.3:9095 --topic my_m_topic


    消费消息:指定生产者关联的zk

    ./kafka-console-consumer.sh --bootstrap-server 10.101.3.3:9093,10.101.3.3:9094,10.101.3.3:9095 --from-beginning --topic my_m_topic 

    删除topic

    server.properties 设置 delete.topic.enable=true

    如果没有设置 delete.topic.enable=true,则调用kafka 的delete命令无法真正将topic删除,而是显示(marked for deletion)

    调用命令删除topic:

    ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic my_m3_topic

    删除kafka存储目录(server.properties文件log.dirs配置,默认为"/data/kafka-logs")相关topic的数据目录。

    recovery-point-offset-checkpoint和replication-offset-checkpoint中的1变为0,删除topic对应的名字

    kafka的容错性

    如果kafka的多个节点中,我们杀掉任何一个节点,都不会影响消息的传输。如果我们干掉leader节点,他内部会把其他副节点转正,成为leader,也不会影响消息传输。

    相关文章

      网友评论

          本文标题:分布式消息队列kafka

          本文链接:https://www.haomeiwen.com/subject/gicnpqtx.html