Building a Big Data Platform (6): Kafka Cluster Setup

Author: Mr_ZhaiDK | Published 2018-02-28 14:54

I. Prerequisites:

(1) The Kafka service depends on ZooKeeper, so a ZooKeeper cluster must already be running.

(2) We use the stable Kafka release kafka_2.11-1.0.0.tgz.

(3) Kafka can be installed on master first and then copied to the slave machines with scp.
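Before going further, it is worth confirming that ZooKeeper actually answers. A minimal check, assuming nc is installed and a ZooKeeper node is listening on master:2181:

#Send ZooKeeper's "are you ok" command; a healthy node replies "imok"
echo ruok | nc master 2181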

II. Configure Kafka:

First, note that in production the directory layout should be planned up front, so that projects remain easy to find as their number grows.

  1. Directories all live under /home. First, create the kafka project directory:
cd /home

#Create the project directory
mkdir kafka
cd /home/kafka

#Create the Kafka message directory, which will hold Kafka's message logs
mkdir kafkalogs
  2. Place kafka_2.11-1.0.0.tgz under /home/kafka and extract it; the archive can be deleted once extraction is done:
#Extract
tar -zxvf kafka_2.11-1.0.0.tgz

#The archive can be deleted afterwards
rm -f kafka_2.11-1.0.0.tgz
  3. Edit the configuration file

(1) Enter the config directory:

cd /home/kafka/kafka_2.11-1.0.0/config/

The only file we need to touch here is server.properties. Looking around the directory, you will notice many other files, including ZooKeeper configuration files: Kafka ships with an embedded ZooKeeper that could be used to start the cluster, but a standalone ZooKeeper cluster is recommended.
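For completeness: if you only want to try Kafka out on a single machine, the bundled ZooKeeper can be started with the script below (run from the bin directory). For the cluster in this series we stick with the standalone ZooKeeper set up earlier.

#Only for single-machine testing: start the ZooKeeper that ships with Kafka
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties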

(2) Edit server.properties:

broker.id, host.name, and listeners must differ on every server; remember to change them after the scp. (host.name is deprecated in Kafka 1.0; listeners is the setting that actually matters.)

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://master:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092


# The number of threads that the server uses for receiving requests from the network and sending responses
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# A comma separated list of directories under which to store log files
log.dirs=/home/kafka/kafkalogs


# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.


# The maximum size of a message that the broker will accept
message.max.bytes=5242880

# The maximum number of bytes fetched per partition by replicas; should be at least message.max.bytes
replica.fetch.max.bytes=5242880

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The default replication factor for automatically created topics
default.replication.factor=2

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# Disable the log cleaner (used for log compaction); time/size-based deletion still applies
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string: a comma separated host:port list. An optional chroot path
# acts as the root directory for all kafka znodes. Adjust the hosts to your own ensemble.
zookeeper.connect=master:2181,slave1:2181,slave2:2181
############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
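After editing, a quick grep makes it easy to double-check the values that matter; a small sketch:

cd /home/kafka/kafka_2.11-1.0.0/config
#Print the settings that must be unique per broker, plus the shared ones
grep -E '^(broker\.id|listeners|log\.dirs|zookeeper\.connect)=' server.properties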

(3) Copy the configured kafka directory to every slave node:

scp -r /home/kafka/ root@slave1:/home/
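If there are several slaves, a loop saves typing (a sketch, assuming the slaves are named slave1 and slave2):

for h in slave1 slave2; do
  scp -r /home/kafka/ root@${h}:/home/
done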

(4) On every slave node, edit server.properties and change broker.id, host.name, and listeners to values unique to that node, as in the sketch below.
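This step can also be scripted. For example, on slave1 (broker id 1) something like the following would work; a sketch, so adjust the ids and hostnames to your cluster:

cd /home/kafka/kafka_2.11-1.0.0/config
#Give this broker a unique id and advertise its own hostname
sed -i 's/^broker.id=0/broker.id=1/' server.properties
sed -i 's#^listeners=PLAINTEXT://master:9092#listeners=PLAINTEXT://slave1:9092#' server.properties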

III. Start Kafka:

  1. Start the Kafka cluster with the following commands:
#Start Kafka in the background (this must be done on every machine)
#Enter Kafka's bin directory
cd /home/kafka/kafka_2.11-1.0.0/bin
#Start the service as a daemon
./kafka-server-start.sh -daemon ../config/server.properties
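If passwordless ssh is set up between the nodes, as is usual for this kind of cluster, all brokers can be started from master in one go; a sketch:

for h in master slave1 slave2; do
  ssh root@${h} "/home/kafka/kafka_2.11-1.0.0/bin/kafka-server-start.sh -daemon /home/kafka/kafka_2.11-1.0.0/config/server.properties"
done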
  2. Run jps to check whether the service has started.
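jps lists Java processes by their main class, so each broker should show up as "Kafka". A one-liner check:

jps | grep Kafka && echo "Kafka broker is running"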

  3. Create a topic to verify that everything works

(1) Create a topic:

cd /home/kafka/kafka_2.11-1.0.0/bin
./kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 1 --topic test1

#Explanation
--replication-factor 3   #keep 3 replicas
--partitions 1           #create 1 partition
--topic test1            #the topic is named test1

(2) Start a producer on one of the servers; here we use slave1:

#Start a console producer
./kafka-console-producer.sh --broker-list slave1:9092 --topic test1

(3) Start a consumer on another server; here we use master:

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning
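Note that in Kafka 1.0 the ZooKeeper-based console consumer is deprecated; the equivalent command that talks to the brokers directly is:

./kafka-console-consumer.sh --bootstrap-server master:9092 --topic test1 --from-beginning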

(4) Test

Type a few messages at the producer and check that the consumer receives them.

If it does, the Kafka cluster setup is complete!
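The same round trip can be scripted instead of typed by hand; a sketch, run from the bin directory:

#Produce a single test message non-interactively
echo "hello kafka" | ./kafka-console-producer.sh --broker-list slave1:9092 --topic test1

#Read it back; exit after one message or after 10 seconds of silence
./kafka-console-consumer.sh --zookeeper master:2181 --topic test1 --from-beginning --max-messages 1 --timeout-ms 10000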

IV. Additional notes:

Most commands are covered in the official Kafka documentation.

  1. List topics
#This prints every topic we have created
./kafka-topics.sh --list --zookeeper localhost:2181
  2. Check a topic's status
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
#Sample output:
Topic:test1 PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: test1    Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2
#1 partition, replication factor 3; partition 0's leader is broker 0,
#its replicas are on brokers 0, 1 and 2, and all three are in sync (Isr)
  3. Once the above is done, you can log into ZooKeeper with its client to inspect the znode tree
#Enter ZooKeeper with the client
./zkCli.sh -server 127.0.0.1:2181  #normally -server is not needed; we pass it because we changed the port

#One important znode: the broker's registration
[zk: 127.0.0.1:2181(CONNECTED) 1] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://master:9092"],"jmx_port":-1,"host":"master","timestamp":"1519797571108","port":9092,"version":4}
cZxid = 0x700000055
ctime = Wed Feb 28 13:59:30 CST 2018
mZxid = 0x700000055
mtime = Wed Feb 28 13:59:30 CST 2018
pZxid = 0x700000055
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x361dafd39070000
dataLength = 182
numChildren = 0

#Another useful one: inspecting a partition
[zk: 127.0.0.1:2181(CONNECTED) 3] get /brokers/topics/test1/partitions/0  
null
cZxid = 0x700000066
ctime = Wed Feb 28 14:33:35 CST 2018
mZxid = 0x700000066
mtime = Wed Feb 28 14:33:35 CST 2018
pZxid = 0x700000067
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
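It is also worth listing the registered broker ids; with all three brokers up, you should see one ephemeral child per broker, something like:

#List the brokers currently registered in ZooKeeper
[zk: 127.0.0.1:2181(CONNECTED) 4] ls /brokers/ids
[0, 1, 2]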
