Introduction to Kafka
Advantages of Kafka
Kafka is a distributed, publish/subscribe-based messaging system. Its main design goals are:
1. Message persistence with O(1) time complexity, maintaining constant-time access performance even for terabytes of data and beyond
2. High throughput: a single broker can handle on the order of 100K messages per second even on inexpensive commodity hardware
3. Message partitioning across Kafka servers and distributed consumption, while guaranteeing message ordering within each partition
4. Support for both offline (batch) and real-time data processing
Why we chose Kafka as the message middleware
1. Chat history will grow large over time, and persisting it directly to the database would put considerable pressure on the database. A Kafka cluster persists the messages and acts as a buffer, reducing the load on the backend servers.
2. Kafka is easy to scale out: if the cluster becomes a bottleneck later, separate Kafka clusters can be set up for different business workloads, which gives a lot of flexibility for future expansion.
3. It reduces coupling between systems: multiple systems can communicate with each other through Kafka.
Kafka concepts
1. Broker
A Kafka cluster consists of one or more servers; each server is called a broker.
2. Topic
Every message published to a Kafka cluster has a category, called a topic. (Physically, messages of different topics are stored separately. Logically, a topic's messages may live on one or more brokers, but producers and consumers only need to specify the topic; they never need to care where the data is physically stored.)
3. Partition
A partition is a physical concept: each topic consists of one or more partitions, and the partition count can be specified when the topic is created. Each partition corresponds to a directory on disk that stores that partition's data and index files.
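As a rough illustration of how a keyed message lands in a partition: the producer hashes the key and takes it modulo the partition count, so the same key always goes to the same partition and per-key ordering is preserved. This sketch uses a CRC32-based hash for simplicity; Kafka's default partitioner actually uses murmur2 on the key bytes, so real placements differ, but the principle is the same.

```python
import zlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition index.

    Illustration only: a stable CRC32 hash modulo the partition count.
    Kafka's default partitioner uses murmur2, but the idea is identical:
    a deterministic hash keeps all messages for one key in one partition.
    """
    return zlib.crc32(key) % num_partitions

# The msgTopic created later in this document has 2 partitions.
p = pick_partition(b"user-42", 2)
print(p)  # same key -> same partition, every time
```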
The relationship between partitions and replicas is illustrated below:
(figure: partitions and their replicas distributed across brokers)
4. Producer
Publishes messages to Kafka brokers.
5. Consumer
Consumes messages. Every consumer belongs to a specific consumer group (a group name can be assigned per consumer; without one, the consumer falls into the default group). With the high-level consumer API, a given message on a topic is consumed by only one consumer within a consumer group, but multiple consumer groups can each consume that same message.
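The delivery rule above (every group sees every message, but only one consumer inside each group receives a given message) can be sketched as a toy simulation. The group and consumer names here are purely hypothetical, and round-robin stands in for Kafka's real partition-assignment logic:

```python
from collections import defaultdict
from itertools import cycle

def deliver(messages, groups):
    """Simulate Kafka's consumer-group semantics: each group receives
    every message, but within a group each message goes to exactly one
    consumer (round-robin here, for simplicity)."""
    received = defaultdict(list)          # consumer name -> messages
    for group_name, consumers in groups.items():
        rr = cycle(consumers)
        for msg in messages:
            received[next(rr)].append(msg)
    return received

msgs = ["m1", "m2", "m3", "m4"]
groups = {"chat-service": ["c1", "c2"], "archiver": ["a1"]}
out = deliver(msgs, groups)
print(out["a1"])  # the single-consumer group still gets every message
```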
Kafka architecture
1. Basic Kafka architecture
(figure: basic Kafka architecture)
2. Internal design of a broker
(figure: broker internals)
The role of ZooKeeper in Kafka
Kafka registers its metadata in ZooKeeper, which acts as the coordination service for the cluster. The figure below shows what ZooKeeper provides to Kafka.
(figure: ZooKeeper's role in a Kafka deployment)
We run ZooKeeper on 3 machines so that quorum-based leader election works: if one of the ZooKeeper servers goes down, the cluster as a whole keeps functioning.
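The fault-tolerance claim follows from ZooKeeper's majority quorum: an ensemble of n servers keeps working as long as a strict majority is up. A quick check of the arithmetic:

```python
def quorum(n: int) -> int:
    """Minimum number of live servers for a ZooKeeper ensemble of n."""
    return n // 2 + 1

def tolerated_failures(n: int) -> int:
    """How many servers can die before the ensemble loses quorum."""
    return n - quorum(n)

# With the 3 servers used here, one failure is survivable. Note that
# 4 servers tolerate no more failures than 3 do, which is why
# odd-sized ensembles are the norm.
print(tolerated_failures(3), tolerated_failures(4), tolerated_failures(5))
```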
System architecture diagram (drawn with dia)
(figure: overall system architecture)
Setting up the ZooKeeper cluster
Three ZooKeeper servers are currently set up in the test environment:
118.144.xxx.148
118.144.xxx.151
118.144.xxx.154
Download the ZooKeeper release into /opt/
Unpack it and rename the directory to zookeeper
Create the configuration file zoo.cfg under /opt/zookeeper/conf
Note: first create the directories data and dataLog under the zookeeper directory, to hold the data files and log files respectively
In each server's data directory create a myid file containing a single number: 1 on 118.144.xxx.148, 2 on 118.144.xxx.151, and 3 on 118.144.xxx.154
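The myid step can be scripted. This sketch writes the id files into a temporary directory rather than /opt/zookeeper/data, since the real paths depend on the installation; the id-to-host mapping mirrors the masked addresses above:

```python
import os
import tempfile

# server id -> host, mirroring the masked addresses in this document
SERVER_IDS = {1: "118.144.xxx.148", 2: "118.144.xxx.151", 3: "118.144.xxx.154"}

def write_myid(data_dir: str, server_id: int) -> str:
    """Create <data_dir>/myid holding just the numeric server id.
    The id must match the corresponding server.N entry in zoo.cfg."""
    os.makedirs(data_dir, exist_ok=True)
    path = os.path.join(data_dir, "myid")
    with open(path, "w") as f:
        f.write(f"{server_id}\n")
    return path

# Demo run against a throwaway directory; on a real server the
# data_dir would be /opt/zookeeper/data on each machine.
base = tempfile.mkdtemp()
for sid in SERVER_IDS:
    write_myid(os.path.join(base, f"zk{sid}", "data"), sid)
```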
Configuring zoo.cfg
Add the following configuration:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage; the actual dataDir for this
# installation is set further below.
#dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
server.1=118.144.xxx.148:2888:3888
server.2=118.144.xxx.151:2888:3888
server.3=118.144.xxx.154:2888:3888
dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/dataLog
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
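The tick-based settings above translate into wall-clock timeouts: initLimit and syncLimit are expressed in ticks, and a tick is tickTime milliseconds. A quick sanity check of what the configured values mean:

```python
tick_time_ms = 2000   # tickTime from zoo.cfg
init_limit = 10       # initLimit, in ticks
sync_limit = 5        # syncLimit, in ticks

# A follower has initLimit ticks to connect to and sync with the leader...
init_timeout_ms = init_limit * tick_time_ms
# ...and afterwards may lag the leader by at most syncLimit ticks.
sync_timeout_ms = sync_limit * tick_time_ms

print(init_timeout_ms, sync_timeout_ms)  # 20000 10000
```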
Verifying the ZooKeeper installation
Apply the same configuration on all three machines: 118.144.xxx.148, 118.144.xxx.151, and 118.144.xxx.154
Then start the service:
Start command:
bin/zkServer.sh start
This starts ZooKeeper; its status can then be checked with:
bin/zkServer.sh status
The status looks like this:
(screenshot: zkServer.sh status output on the three servers)
Checking the status on each of the three servers shows that ZooKeeper is up and that a leader election has taken place. After the election:
118.144.xxx.154 is the leader
118.144.xxx.151 is a follower
118.144.xxx.148 is a follower
This completes the ZooKeeper cluster setup; see the official documentation for details on the individual parameters.
Setting up the Kafka cluster
First download the Kafka package. The version we are running is:
(figure: the chosen Kafka version)
We did not pick the newest release: the latest version may contain unknown bugs that affect stability, while the version chosen here is relatively mature.
Download Kafka into /opt/kafka and unpack it
Setting the configuration file
On each of the three machines, edit server.properties under /opt/kafka/config
The configuration file is as follows:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
listeners=PLAINTEXT://118.144.xxx.148:9092
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=118.144.xxx.148:2181,118.144.xxx.151:2181,118.144.xxx.154:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
That is the Kafka configuration file. The following line points the brokers at the ZooKeeper ensemble:
zookeeper.connect=118.144.xxx.148:2181,118.144.xxx.151:2181,118.144.xxx.154:2181
The following two settings control how Kafka deletes old messages:
1. Segments older than seven days are deleted. 2. A size-based limit of 1 GiB can also trigger deletion, but note that log.retention.bytes is commented out, so only the time-based rule is active as configured.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
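The two thresholds are easy to verify: 168 hours is exactly 7 days, and 1073741824 bytes is exactly 1 GiB:

```python
retention_hours = 168          # log.retention.hours
retention_bytes = 1073741824   # log.retention.bytes

print(retention_hours / 24)      # 7.0 -> seven days
print(retention_bytes == 2**30)  # True -> exactly 1 GiB
```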
The default partition count and the log directory location are also set:
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
Then start Kafka on each machine. To start Kafka:
In the /opt/kafka directory run:
bin/kafka-server-start.sh -daemon config/server.properties
Run the command above on each of the three machines
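Two values in server.properties must differ on each machine: broker.id (a unique integer per broker) and listeners (bound to that machine's address). A small helper can derive the per-host overrides; this is only a sketch, with the masked IPs copied from above and everything else assumed to stay shared:

```python
# The three broker hosts, masked exactly as in this document
HOSTS = ["118.144.xxx.148", "118.144.xxx.151", "118.144.xxx.154"]

def per_broker_overrides(hosts):
    """Return the per-machine server.properties lines: a unique
    broker.id per host plus a listeners line bound to that host."""
    return {
        host: [
            f"broker.id={i}",
            f"listeners=PLAINTEXT://{host}:9092",
        ]
        for i, host in enumerate(hosts, start=1)
    }

for host, lines in per_broker_overrides(HOSTS).items():
    print(host, lines)
```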
Verifying the Kafka installation
1. Create the private-chat topic msgTopic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic msgTopic --partitions 2 --replication-factor 2
The parameters above specify the partition count and the replication factor
2. Create the group-chat topic groupMsgTopic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic groupMsgTopic --partitions 2 --replication-factor 2
3. Start a console producer to test producing
bin/kafka-console-producer.sh --broker-list 118.144.xxx.148:9092 --topic msgTopic
4. Start a console consumer to test consuming (note that --bootstrap-server takes a broker address on port 9092, not a ZooKeeper address)
bin/kafka-console-consumer.sh --bootstrap-server 118.144.xxx.151:9092 --topic groupMsgTopic --from-beginning
5. Inspect the topic metadata
bin/kafka-topics.sh --describe --zookeeper 118.144.xxx.148:2181 --topic msgTopic
That is the whole Kafka cluster configuration process
The detailed output of the describe command looks like this:
(screenshot: kafka-topics.sh --describe output for msgTopic)
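One constraint worth checking before running the create commands above: the replication factor cannot exceed the number of live brokers (here 3), because each replica must live on a distinct broker, and topic creation fails otherwise. A sketch of the check:

```python
def can_create_topic(replication_factor: int, num_brokers: int) -> bool:
    """Kafka refuses to create a topic whose replication factor
    exceeds the broker count, since each replica of a partition
    must be placed on a different broker."""
    return 1 <= replication_factor <= num_brokers

# The topics above use --replication-factor 2 on a 3-broker cluster.
print(can_create_topic(2, 3))  # True
print(can_create_topic(4, 3))  # False: only 3 brokers available
```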