1,启动入口
1)bin/kafka-server-start.sh config/server-1.properties &
image.png
2)启动类Kafka.scala的main方法。
解析配置文件
构造KafkaServerStartable对象
3)初始化组件
创建zk客户端
,cluster_id和brokerId处理
,启动定时任务线程池
image.png
4)创建并启动socket服务器,处理客户端socket连接。Create and start the socket server acceptor threads
image.png
2,配置文件简介
1) image.png
2)brokerStates
image.png
3,SocketServer重点类
1)RequestChannel由ArrayBlockingQueue的requestQueue和一个ConcurrentHashMap的processors构成。
image.png
2)socketServer.startUp时,会创建Acceptor和processors
createAcceptorAndProcessors
通过listeners=PLAINTEXT://:9091
创建Acceptor
通过num.network.threads=3
创建3个Processor
把processor加入SocketServer.RequestChannel
中requestChannel.addProcessor(processor)
SocketServer的processors
放入Processor。 image.png
3)将processors加入到Acceptor中,启动多个kafka-network-thread
线程。
image.png
image.png
4,kafka通信机制
1)SocketServer处理过程
image.png
1个Acceptor负责接受客户端请求,N个Processor线程负责读写数据,M个Handler来处理业务逻辑。在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求。
网友评论