class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {
// 服务器配有多个网卡,可以配置多个ip,这样可以同时监听多个端口
// endpoint封装了host、port和网络协议
// 每个endpoint创建一个对应的Acceptor
private val endpoints = config.listeners
// 在RequestChannel.requestQueue中缓存的最大请求个数
private val maxQueuedRequests = config.queuedMaxRequests
// 每个endpoint的Processor线程个数
private val numProcessorThreads = config.numNetworkThreads
// Processor线程总数
private val totalProcessorThreads = numProcessorThreads * endpoints.size
// 每个ip上能创建的最大连接数
private val maxConnectionsPerIp = config.maxConnectionsPerIp
// 手动覆盖上面指定ip的最大连接数
private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
// Processor线程与Handler线程之间交换数据的队列
// 创建totalProcessorThreads个responseQueue队列
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
// Processor线程的集合
private val processors = new Array[Processor](totalProcessorThreads)
// Acceptor集合
private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
// 控制每个ip上的连接数,底层维护一张 ip->连接数 的map
// 存在多个Acceptor线程并发访问底层map的场景,需要synchronized同步
private var connectionQuotas: ConnectionQuotas = _
// 当Handler线程向某个responseQueue写入数据时,唤醒对应的Processor处理
requestChannel.addResponseListener(id => processors(id).wakeup())
// 服务端网络层启动入口
def startup() {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
// Socket的sendBuffer和recvBuffer大小
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
var processorBeginIndex = 0
// 遍历endPoints集合
endpoints.values.foreach { endpoint =>
val protocol = endpoint.protocolType
val processorEndIndex = processorBeginIndex + numProcessorThreads
// 每个endpoint创建numProcessThread个processor
for (i <- processorBeginIndex until processorEndIndex)
processors(i) = newProcessor(i, connectionQuotas, protocol)
// 每个endpoint创建一个acceptor
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
acceptors.put(endpoint, acceptor)
// 创建acceptor线程并启动
Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
// 主线程阻塞等待acceptor线程启动完成
// 主线程只负责启动的初始化工作,后面干活的还是acceptor线程
acceptor.awaitStartup()
processorBeginIndex = processorEndIndex
}
}
}
}
private[kafka] class Acceptor(val endPoint: EndPoint,
val sendBufferSize: Int,
val recvBufferSize: Int,
brokerId: Int,
processors: Array[Processor],
connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
// 服务端监听连接的channel和selector由Acceptor来创建
private val nioSelector = NSelector.open() // 这个selector专门处理OP_ACCEPT
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
// 为这个Acceptor创建并启动关联的Processor线程集合
// 启动Acceptor的同时就已经启动了Processor线程了
this.synchronized {
processors.foreach { processor =>
Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
}
}
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
var currentProcessor = 0
while (isRunning) {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
val key = iter.next
iter.remove()
if (key.isAcceptable)
accept(key, processors(currentProcessor))
// round robin 负载均衡分配 Processor
currentProcessor = (currentProcessor + 1) % processors.length
}
}
}
} finally {
shutdownComplete()
}
}
def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
try {
connectionQuotas.inc(socketChannel.socket().getInetAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
socketChannel.socket().setSendBufferSize(sendBufferSize)
processor.accept(socketChannel)
} catch {
close(socketChannel)
}
}
private[kafka] class Processor(val id: Int, time: Time, maxRequestSize: Int, requestChannel: RequestChannel,
connectionQuotas: ConnectionQuotas, connectionsMaxIdleMs: Long, protocol: SecurityProtocol,
channelConfigs: java.util.Map[String, _],
metrics: Metrics) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
// 保存此processor处理的新建客户端连入的SocketChannel
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
// 收到客户端的请求,还未发送响应的缓存
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
// 就是KafkaSelector
private val selector = new KSelector( maxRequestSize, connectionsMaxIdleMs, metrics, time,
"socket-server", metricTags, false,
ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs, null, true))
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
// 底层通过selector.wakeup实现,唤醒processor线程处理newConnections队列
wakeup()
}
override def run() {
startupComplete()
while (isRunning) {
try {
configureNewConnections() // 处理新的客户端请求注册OP_READ
processNewResponses() // 从队列里获取response放入待发送缓存
poll() // channel阻塞select等待发送响应
// selector的pollSelectionKeys会读取客户端的请求,这里把请求放入RequestChannel队列里
processCompletedReceives()
processCompletedSends()
processDisconnected()
} catch {}
}
shutdownComplete()
}
private def configureNewConnections() {
while (!newConnections.isEmpty) {
// 从newConnections队列里获取新连接的客户端channel
val channel = newConnections.poll()
try {
// 生成connectionId,注册OP_READ; 创建KafkaChannel,加入到selector.channels里
val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
selector.register(connectionId, channel)
} catch {
close(channel)
}
}
}
private def processNewResponses() {
// 获取该processor对应的responseQueue
var curr = requestChannel.receiveResponse(id)
while (curr != null) {
try {
curr.responseAction match {
case RequestChannel.NoOpAction =>
// 不需要发送响应,这个channel重新注册OP_READ
selector.unmute(curr.request.connectionId)
case RequestChannel.SendAction =>
sendResponse(curr)
case RequestChannel.CloseConnectionAction =>
// 关闭连接
close(selector, curr.request.connectionId)
}
} finally {
// 上一条处理失败,继续获取下一条
curr = requestChannel.receiveResponse(id)
}
}
}
protected[network] def sendResponse(response: RequestChannel.Response) {
val channel = selector.channel(response.responseSend.destination)
// channel注册OP_WRITE,response写入对应channel的send缓存待发送,
selector.send(response.responseSend)
// 加入inflightResspones队列
inflightResponses += (response.request.connectionId -> response)
}
private def processCompletedReceives() {
// 遍历selector.completedReceives队列
selector.completedReceives.asScala.foreach { receive =>
try {
val channel = selector.channel(receive.source)
// 创建KafkaChannel对应的session对象,和权限有关
val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
// 创建RequestChannel.Request请求对象
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
// 放入RequestChannel.requestQueue请求队列里
requestChannel.sendRequest(req)
// 取消关注OP_READ
selector.mute(receive.source)
} catch {
close(selector, receive.source)
}
}
}
private def processCompletedSends() {
selector.completedSends.asScala.foreach { send =>
// 发送完response后从在途响应队列里移除
val resp = inflightResponses.remove(send.destination)
// 添加关注OP_READ
selector.unmute(send.destination)
}
}
private def processDisconnected() {
selector.disconnected.asScala.foreach { connectionId =>
val remoteHost = ConnectionId.fromString(connectionId)
// 连接断开从在途响应队列里移除
inflightResponses.remove(connectionId)
// 管理连接数
connectionQuotas.dec(InetAddress.getByName(remoteHost))
}
}
}
// Acceptor和Processor的父类,主要是操作启动和关闭
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
// startup是否完成
private val startupLatch = new CountDownLatch(1)
// shutdown是否完成
private val shutdownLatch = new CountDownLatch(1)
// 线程是否存活,shutdown会置为false
private val alive = new AtomicBoolean(true)
}
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
// 在SocketServer里添加listener,监听response队列有响应就唤醒processor,返回response给client
private var responseListeners: List[(Int) => Unit] = Nil
// 所有Processor把请求都放在这个队列,保证线程安全
// queueSize:请求缓存最大个数
private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
// 每个Processor对应一个response队列
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors)
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
// 返回response到队列里,通过listener唤醒Processor取走响应返回给clinet
def sendResponse(response: RequestChannel.Response) {
responseQueues(response.processor).put(response)
for(onResponse <- responseListeners)
onResponse(response.processor)
}
}
case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
// 涉及到跨线程比较,所以声明volatile
@volatile var requestDequeueTimeMs = -1L
@volatile var apiLocalCompleteTimeMs = -1L
@volatile var responseCompleteTimeMs = -1L
@volatile var responseDequeueTimeMs = -1L
@volatile var apiRemoteCompleteTimeMs = -1L
val header: RequestHeader =
buffer.rewind
RequestHeader.parse(buffer)
val body: AbstractRequest =
if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion))
new ApiVersionsRequest
else
AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
}
网友评论