Apache Kafka: Deployment and Startup

Author: 挂机的啊洋zzZ | Published 2019-01-21 23:27

    Having covered the Kafka basics, let's move on to deployment and startup.


    Environment preparation

    Kafka is written in Scala and runs on the JVM, so a JDK must be installed before Kafka itself.
    JDK 1.8 or later is the best choice.

    Installing the JDK

    See:
    Detailed steps for installing the JDK on Linux CentOS 7
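    Alternatively, if you would rather use the OpenJDK packages from the CentOS repositories than the Oracle JDK tarball (the session logs later in this post show an Oracle JDK under /usr/local/java), a minimal sketch:

    # install OpenJDK 8 from the CentOS repositories (alternative to the Oracle tarball)
    yum install -y java-1.8.0-openjdk-devel
    # confirm the JDK is on the PATH
    java -version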


    Kafka depends on ZooKeeper, so ZooKeeper has to be installed first.

    Installing ZooKeeper

    Download the ZooKeeper tarball:

    [root@node-100 local]# mkdir zookeeper
    [root@node-100 local]# cd zookeeper/
    [root@node-100 zookeeper]# wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
    

    Extract:

    [root@node-100 zookeeper]# tar -zxvf zookeeper-3.4.12.tar.gz
    

    Enter the extracted directory and prepare the configuration file:

    [root@node-100 zookeeper]# ls
    zookeeper-3.4.12
    [root@node-100 zookeeper]# cd zookeeper-3.4.12/
    [root@node-100 zookeeper-3.4.12]# ls
    bin        conf     dist-maven  ivysettings.xml  lib          NOTICE.txt  README_packaging.txt  src                   zookeeper-3.4.12.jar.asc  zookeeper-3.4.12.jar.sha1
    build.xml  contrib  docs        ivy.xml          LICENSE.txt  README.md   recipes               zookeeper-3.4.12.jar  zookeeper-3.4.12.jar.md5
    [root@node-100 zookeeper-3.4.12]# cd conf
    [root@node-100 conf]# ls
    configuration.xsl  log4j.properties  zoo_sample.cfg
    [root@node-100 conf]# cp zoo_sample.cfg zoo.cfg.bak
    [root@node-100 conf]# mv zoo_sample.cfg zoo.cfg
    [root@node-100 conf]# ls
    configuration.xsl  log4j.properties  zoo.cfg  zoo.cfg.bak
    [root@node-100 conf]# 
    

    Edit zoo.cfg to set the data directory and client port:

    [root@node-100 conf]# vim zoo.cfg
    

    Change the following. Note that zoo.cfg is parsed as Java properties, so comments must sit on their own lines; a trailing "# ..." after a value would become part of the value. Also, despite the directory name used here, dataDir is ZooKeeper's data directory (snapshots and, unless dataLogDir is set, transaction logs):

    # data directory
    dataDir=/usr/local/zookeeper/zookeeper-3.4.12/logs
    # client port
    clientPort=2181
    
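    ZooKeeper 3.4.x normally creates the dataDir on first start, but creating it up front avoids permission surprises; a small sketch matching the path above:

    # create the data directory referenced by zoo.cfg
    mkdir -p /usr/local/zookeeper/zookeeper-3.4.12/logs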

    Start the server:

    [root@node-100 zookeeper-3.4.12]# bin/zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    [root@node-100 zookeeper-3.4.12]# 
    
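    To confirm the server really is up before attaching a client, zkServer.sh also has a status subcommand, and the stat four-letter command works over the client port (assuming netcat is installed and four-letter-word commands are enabled, the default in 3.4.x):

    # report the server's mode (standalone in this single-node setup)
    bin/zkServer.sh status
    # or query the running server directly over the client port
    echo stat | nc 127.0.0.1 2181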

    Start the client:

    [root@node-100 zookeeper-3.4.12]# bin/zkCli.sh -server 192.168.5.100:2181
    Connecting to 192.168.5.100:2181
    2019-01-03 23:15:32,779 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.12-e5259e437540f349646870ea94dc2658c4e44b3b, built on 03/27/2018 03:55 GMT
    2019-01-03 23:15:32,782 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=node-100
    2019-01-03 23:15:32,782 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.8.0_191
    2019-01-03 23:15:32,783 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
    2019-01-03 23:15:32,783 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/local/java/jdk1.8.0_191/jre
    2019-01-03 23:15:32,784 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/usr/local/zookeeper/zookeeper-3.4.12/bin/../build/classes:/usr/local/zookeeper/zookeeper-3.4.12/bin/../build/lib/*.jar:/usr/local/zookeeper/zookeeper-3.4.12/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/local/zookeeper/zookeeper-3.4.12/bin/../lib/slf4j-api-1.7.25.jar:/usr/local/zookeeper/zookeeper-3.4.12/bin/../lib/netty-3.10.6.Final.jar:/usr/local/zookeeper/zookeeper-3.4.12/bin/../lib/log4j-1.2.17.jar:/usr/local/zookeeper/zookeeper-3.4.12/bin/../lib/jline-0.9.94.jar:/usr/local/zookeeper/zookeeper-3.4.12/bin/../lib/audience-annotations-0.5.0.jar:/usr/local/zookeeper/zookeeper-3.4.12/bin/../zookeeper-3.4.12.jar:/usr/local/zookeeper/zookeeper-3.4.12/bin/../src/java/lib/*.jar:/usr/local/zookeeper/zookeeper-3.4.12/bin/../conf:.:/usr/local/java/jdk1.8.0_191/lib:/usr/local/java/jdk1.8.0_191/jre/lib:.:/usr/local/java/jdk1.8.0_191/lib:/usr/local/java/jdk1.8.0_191/jre/lib:.:/usr/local/java/jdk1.8.0_191/lib:/usr/local/java/jdk1.8.0_191/jre/lib:
    2019-01-03 23:15:32,784 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
    2019-01-03 23:15:32,784 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
    2019-01-03 23:15:32,784 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
    2019-01-03 23:15:32,784 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
    2019-01-03 23:15:32,784 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
    2019-01-03 23:15:32,784 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=3.10.0-327.el7.x86_64
    2019-01-03 23:15:32,784 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=root
    2019-01-03 23:15:32,784 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/root
    2019-01-03 23:15:32,785 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/usr/local/zookeeper/zookeeper-3.4.12
    2019-01-03 23:15:32,786 [myid:] - INFO  [main:ZooKeeper@441] - Initiating client connection, connectString=192.168.5.100:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@69d0a921
    Welcome to ZooKeeper!
    JLine support is enabled
    2019-01-03 23:15:32,879 [myid:] - INFO  [main-SendThread(192.168.5.100:2181):ClientCnxn$SendThread@1028] - Opening socket connection to server 192.168.5.100/192.168.5.100:2181. Will not attempt to authenticate using SASL (unknown error)
    2019-01-03 23:15:32,967 [myid:] - INFO  [main-SendThread(192.168.5.100:2181):ClientCnxn$SendThread@878] - Socket connection established to 192.168.5.100/192.168.5.100:2181, initiating session
    2019-01-03 23:15:33,020 [myid:] - INFO  [main-SendThread(192.168.5.100:2181):ClientCnxn$SendThread@1302] - Session establishment complete on server 192.168.5.100/192.168.5.100:2181, sessionid = 0x1000030aeca0000, negotiated timeout = 30000
    
    WATCHER::
    
    WatchedEvent state:SyncConnected type:None path:null
    [zk: 192.168.5.100:2181(CONNECTED) 0] 
    

    List the root znode:

    [zk: 192.168.5.100:2181(CONNECTED) 0] ls /
    [zookeeper]
    [zk: 192.168.5.100:2181(CONNECTED) 1] 
    
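    As a quick smoke test of the session, you can create, read back, and remove a throwaway znode (the /demo path is just an example):

    create /demo hello
    get /demo
    delete /demo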

    Deploying Kafka
    Step 1: download the package

    Create a kafka directory:

    [root@node-100 local]# cd /usr/local
    [root@node-100 local]# mkdir kafka
    

    Download the package kafka_2.12-2.1.0.tgz (the latest version at the time of writing; for real production use it is safer to pick an earlier stable release, e.g. the 1.1.0 release):

    cd kafka/
    wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz
    tar -xvf kafka_2.12-2.1.0.tgz
    cd kafka_2.12-2.1.0/
    
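    As a side note, downloads from the mirrors can be verified against the checksum Apache publishes for each release (a sketch; the .sha512 URL follows the usual Apache archive layout, and the two values are compared by eye):

    wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz.sha512
    sha512sum kafka_2.12-2.1.0.tgz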
    Step 2: start the service

    Edit the configuration file server.properties:

    [root@node-100 kafka_2.12-2.1.0]# cd config/
    [root@node-100 config]# ls
    connect-console-sink.properties    connect-file-sink.properties    connect-standalone.properties  producer.properties     trogdor.conf
    connect-console-source.properties  connect-file-source.properties  consumer.properties            server.properties       zookeeper.properties
    connect-distributed.properties     connect-log4j.properties        log4j.properties               tools-log4j.properties
    [root@node-100 config]# vim server.properties 
    

    server.properties:

    ############################# Server Basics #############################
    # The unique identifier of this broker within the cluster; must be a non-negative integer.
    # If the server's IP address changes but broker.id stays the same, consumers are unaffected.
    broker.id=0

    # Port the broker listens on
    port=9092

    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    # Maximum number of threads the broker uses for network traffic; usually there is no need to change it
    num.network.threads=3

    # The number of threads that the server uses for processing requests, which may include disk I/O
    # Number of threads the broker uses for disk I/O; should be at least the number of disks
    num.io.threads=8

    # The send buffer (SO_SNDBUF) used by the socket server
    # Send buffer of the socket server; the SO_SNDBUF tuning parameter
    socket.send.buffer.bytes=102400

    # The receive buffer (SO_RCVBUF) used by the socket server
    # Receive buffer of the socket server; the SO_RCVBUF tuning parameter
    socket.receive.buffer.bytes=102400

    # The maximum size of a request that the socket server will accept (protection against OOM)
    # Maximum size of a socket request, protecting the server against OOM;
    # message.max.bytes must be smaller than socket.request.max.bytes and can be overridden per topic at creation time
    socket.request.max.bytes=104857600
    
    ############################# Log Basics #############################
    # A comma separated list of directories under which to store log files
    # Where Kafka stores its data; multiple directories are comma separated, e.g. /data/kafka-logs-1,/data/kafka-logs-2
    log.dirs=/usr/local/kafka/kafka_2.12-2.1.0/data/kafka-logs

    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    # Default number of partitions per topic, used when none is specified at creation time;
    # a partition count given at topic creation overrides it
    num.partitions=1

    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    # Number of threads per data directory used for log recovery
    num.recovery.threads.per.data.dir=1

    ############################# Internal Topic Settings  #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability, such as 3.
    # Replication factor for the group-metadata internal topics;
    # for anything beyond development and testing, use a value greater than 1 (such as 3) to ensure availability
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    ############################# Log Flush Policy #############################

    # The number of messages to accept before forcing a flush of data to disk
    # Number of messages to accumulate before the log is fsynced to disk. Disk I/O is slow
    # but necessary for durability, so this setting is a trade-off between reliability and
    # performance: too large a value makes each fsync take long (blocking I/O); too small a
    # value causes frequent fsyncs and adds latency to client requests. Messages that have
    # not yet been fsynced are lost if the physical server fails.
    #log.flush.interval.messages=10000

    # The maximum amount of time a message can sit in a log before we force a flush
    # Controlling flushes by message count alone is not enough; this sets the maximum
    # interval between fsyncs. Even if the message threshold is never reached, a flush is
    # triggered once this much time has passed since the last sync.
    #log.flush.interval.ms=1000

    ############################# Log Retention Policy #############################

    # The minimum age of a log file to be eligible for deletion due to age
    # How long each log file is kept before deletion; the default applies to all topics.
    # log.retention.minutes and log.retention.bytes both govern deletion: whichever limit
    # is exceeded first triggers it. Can be overridden per topic.
    log.retention.hours=168

    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    # Maximum amount of data retained per partition of each topic; note this is a per-partition
    # limit, so this value times the number of partitions is the total kept per topic.
    # If both log.retention.hours and log.retention.bytes are set, exceeding either limit
    # causes a segment file to be deleted. Can be overridden per topic.
    #log.retention.bytes=1073741824

    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    # A partition's log is split across many segment files in its directory; this is the
    # maximum size of each file. When a segment reaches this size a new one is created.
    # Can be overridden per topic.
    log.segment.bytes=1073741824

    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    # Interval at which log segments are checked against the retention policies to decide
    # whether they can be deleted; 300000 ms = 5 minutes
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    # ZooKeeper connection string in hostname:port form, where host and port belong to a
    # ZooKeeper server. To tolerate the loss of a single ZooKeeper machine, list several
    # servers separated by commas: hostname1:port1,hostname2:port2,hostname3:port3
    # An optional chroot path, under which Kafka stores its znodes, can be appended:
    # hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
    zookeeper.connect=192.168.5.100:2181

    # Timeout in ms for connecting to zookeeper
    # Maximum time the client waits when establishing a connection to ZooKeeper
    zookeeper.connection.timeout.ms=6000
    
    ############################# Group Coordinator Settings #############################
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
    
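    Stripped of comments, only a handful of lines in this single-node setup differ from or matter beyond the shipped defaults; the effective configuration boils down to:

    broker.id=0
    log.dirs=/usr/local/kafka/kafka_2.12-2.1.0/data/kafka-logs
    zookeeper.connect=192.168.5.100:2181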

    For more configuration details, see:

    https://yq.aliyun.com/ziliao/417941
    https://www.cnblogs.com/fillPv/p/5953852.html
    

    Now start Kafka:

    [root@node-100 kafka_2.12-2.1.0]# ls
    bin  config  data  libs  LICENSE  NOTICE  site-docs
    [root@node-100 kafka_2.12-2.1.0]# bin/kafka-server-start.sh -daemon config/server.properties 
    [root@node-100 kafka_2.12-2.1.0]# 
    
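    With -daemon there is no console output, so verify the broker through the JVM process list and the server log (jps ships with the JDK; the "started" line below is what Kafka 2.x prints, though the exact wording may differ by version):

    # the broker shows up as a "Kafka" JVM process
    jps
    # look for "[KafkaServer id=0] started (kafka.server.KafkaServer)"
    tail -n 20 logs/server.log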

    Tips:

    Startup script syntax: kafka-server-start.sh [-daemon] server.properties
    The path to server.properties is a mandatory argument;
    -daemon runs the broker as a background process, otherwise it stops as soon as the SSH client disconnects.
    (Note: at startup Kafka uses the IP address associated with the Linux hostname,
    so map the hostname to the machine's IP in the local hosts file, via vim /etc/hosts.)
    
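    An example of that mapping, using the hostname and IP visible in the prompts and the ZooKeeper address above:

    # /etc/hosts
    192.168.5.100   node-100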

    Now go back into the ZooKeeper directory and use the ZooKeeper client to inspect ZooKeeper's directory tree:

    [zk: localhost:2181(CONNECTED) 1] ls /
    [cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
    [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
    [0]
    [zk: localhost:2181(CONNECTED) 3] 
    
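    The znode under /brokers/ids holds this broker's registration data; reading it from the same zkCli session shows the endpoint other clients will be handed:

    # the returned JSON includes the broker's endpoints,
    # e.g. "endpoints":["PLAINTEXT://node-100:9092"]
    get /brokers/ids/0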

    OK, startup succeeded.


    It's getting late, so this is only the first half; topic creation, sending and receiving messages, unicast and multicast consumption, and cluster setup will follow in the next post.
    If you spot a mistake, corrections are welcome :)
    If this helped, feel free to give it a like :)
