美文网首页
SocketServer.scala

SocketServer.scala

作者: 上海马超23 | 来源:发表于2018-11-14 22:04 被阅读0次
/**
 * Entry point of the broker's network layer.
 *
 * For every configured listener endpoint it creates one Acceptor thread and
 * `numProcessorThreads` Processor threads, and it owns the RequestChannel through
 * which Processor threads and request-handler threads exchange requests/responses.
 *
 * @param config  broker configuration (listeners, thread counts, buffer sizes, quotas)
 * @param metrics metrics registry handed to the processors
 * @param time    clock abstraction handed to the processors
 */
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {
  // A host with several network interfaces can expose several listeners.
  // Each endpoint carries a host, a port and a protocol type; one Acceptor is created per endpoint.
  private val endpoints = config.listeners
  // Capacity of the shared request queue inside RequestChannel.
  private val maxQueuedRequests = config.queuedMaxRequests
  // Number of Processor threads created per endpoint.
  private val numProcessorThreads = config.numNetworkThreads
  // Total number of Processor threads across all endpoints.
  private val totalProcessorThreads = numProcessorThreads * endpoints.size
  // Maximum number of connections allowed from a single IP.
  private val maxConnectionsPerIp = config.maxConnectionsPerIp
  // Per-IP overrides of the limit above.
  private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
  // Channel used by Processor threads and handler threads to exchange data;
  // it is sized with one response queue per Processor.
  val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
  // All Processor threads, indexed by processor id.
  private val processors = new Array[Processor](totalProcessorThreads)
  // One Acceptor per endpoint.
  private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
  // Tracks the number of connections per IP. It is shared by every Acceptor,
  // so it is presumably synchronized internally (ConnectionQuotas is not shown here — verify).
  private var connectionQuotas: ConnectionQuotas = _

  // When a handler thread enqueues a response for processor `id`, wake that processor up.
  requestChannel.addResponseListener(id => processors(id).wakeup())

  /**
   * Starts the network layer: for each endpoint, creates its processors and its
   * acceptor, starts the acceptor thread and blocks until that acceptor reports
   * that its startup is complete.
   */
  def startup(): Unit = {
    this.synchronized {
      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
      // Socket send/receive buffer sizes passed to every acceptor.
      val sendBufferSize = config.socketSendBufferBytes
      val recvBufferSize = config.socketReceiveBufferBytes
      // Broker id is only used to name the processor threads.
      val brokerId = config.brokerId
      var processorBeginIndex = 0

      endpoints.values.foreach { endpoint =>
        val protocol = endpoint.protocolType
        val processorEndIndex = processorBeginIndex + numProcessorThreads

        // Each endpoint gets its own contiguous range of processor ids.
        // NOTE(review): `newProcessor` is not defined in this excerpt; it is assumed to be a
        // factory that builds a Processor for the given id, quotas and protocol.
        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, protocol)

        // One acceptor per endpoint, handing connections to this endpoint's processors only.
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
        // The calling thread only performs initialization; it waits here until the
        // acceptor thread has finished starting up, after which the acceptor does the work.
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
    }
  }
}

/**
 * Thread that listens on one endpoint, accepts incoming connections and hands
 * them to its processors in round-robin order.
 *
 * @param endPoint         endpoint (host, port, protocol type) to listen on
 * @param sendBufferSize   send buffer size applied to every accepted socket
 * @param recvBufferSize   receive buffer size (not used in the code visible here)
 * @param brokerId         broker id, used only to name the processor threads
 * @param processors       processors that receive the accepted connections
 * @param connectionQuotas per-IP connection counter, incremented on every accept
 */
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  import scala.util.control.NonFatal

  // The acceptor owns the listening channel and a selector used only for OP_ACCEPT.
  private val nioSelector = NSelector.open()
  // NOTE(review): `openServerSocket` is not defined in this excerpt.
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)

  // Constructing the acceptor also starts every processor thread associated with it.
  this.synchronized {
    processors.foreach { processor =>
      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
    }
  }

  /**
   * Accept loop: registers the server channel for OP_ACCEPT, signals startup
   * completion, then polls the selector every 500 ms until the thread is stopped.
   * The server channel and the selector are closed before shutdown is signalled.
   */
  def run(): Unit = {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val iter = nioSelector.selectedKeys().iterator()
          while (iter.hasNext && isRunning) {
            val key = iter.next()
            iter.remove()
            if (key.isAcceptable)
              accept(key, processors(currentProcessor))
            // Round-robin over the processors to spread connections evenly.
            currentProcessor = (currentProcessor + 1) % processors.length
          }
        }
      }
    } finally {
      // Release the listening socket and the selector so the port is freed on exit.
      closeQuietly("server socket", serverChannel)
      closeQuietly("selector", nioSelector)
      shutdownComplete()
    }
  }

  /**
   * Accepts the pending connection on `key`, counts it against the quota,
   * configures the socket and hands it to `processor`. If any step fails the
   * accepted channel is closed.
   */
  def accept(key: SelectionKey, processor: Processor): Unit = {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      socketChannel.socket().setSendBufferSize(sendBufferSize)

      processor.accept(socketChannel)
    } catch {
      case NonFatal(e) =>
        warn("Rejecting accepted connection; closing it", e)
        close(socketChannel)
    }
  }

  // Closes a resource, logging instead of propagating so later cleanup steps still run.
  private def closeQuietly(what: String, resource: java.io.Closeable): Unit =
    try resource.close()
    catch {
      case NonFatal(e) => error(s"Failed to close acceptor $what", e)
    }
}

  /**
   * Network thread that serves the connections handed to it by an Acceptor:
   * it registers new connections with its selector, forwards completed receives
   * to the RequestChannel and writes queued responses back to clients.
   *
   * @param id                   processor id; also the index of its response queue
   * @param time                 clock used to timestamp incoming requests
   * @param maxRequestSize       passed to the selector
   * @param requestChannel       channel shared with the request-handler threads
   * @param connectionQuotas     per-IP connection counter, decremented on disconnect
   * @param connectionsMaxIdleMs passed to the selector
   * @param protocol             security protocol of the endpoint this processor serves
   * @param channelConfigs       configuration passed to the channel builder
   * @param metrics              metrics registry passed to the selector
   */
  private[kafka] class Processor(val id: Int, time: Time, maxRequestSize: Int, requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas, connectionsMaxIdleMs: Long, protocol: SecurityProtocol,
                               channelConfigs: java.util.Map[String, _],
                               metrics: Metrics) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
    import scala.util.control.NonFatal

    // Newly accepted client channels waiting to be registered by this processor.
    private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
    // Responses handed to the selector whose send has not completed yet, keyed by connection id.
    private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
    // The Kafka selector used for all I/O of this processor.
    // NOTE(review): `metricTags` is not defined in this excerpt.
    private val selector = new KSelector( maxRequestSize, connectionsMaxIdleMs, metrics, time,
                            "socket-server", metricTags, false,
                            ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs, null, true))

    /** Queues a freshly accepted channel and wakes this processor up so it registers it. */
    def accept(socketChannel: SocketChannel): Unit = {
      newConnections.add(socketChannel)
      // NOTE(review): `wakeup` is not defined in this excerpt; per the original author's
      // comment it is implemented via the selector's wakeup.
      wakeup()
    }

    /**
     * Main loop. Each iteration registers new connections, hands queued responses
     * to the selector, polls, then processes completed receives, completed sends
     * and disconnects. A non-fatal failure in one iteration is logged and the
     * loop continues.
     */
    override def run(): Unit = {
      startupComplete()
      while (isRunning) {
        try {
          configureNewConnections() // register newly accepted channels
          processNewResponses()     // move queued responses to the selector
          // NOTE(review): `poll` is not defined in this excerpt; presumably it polls the selector.
          poll()
          processCompletedReceives() // forward fully read requests to the RequestChannel
          processCompletedSends()
          processDisconnected()
        } catch {
          case NonFatal(e) => error(s"Processor $id got uncaught exception.", e)
        }
      }
      shutdownComplete()
    }

    /** Drains `newConnections`, registering each channel with the selector under a fresh connection id. */
    private def configureNewConnections(): Unit = {
      while (!newConnections.isEmpty) {
        val channel = newConnections.poll()
        try {
          // The connection id is built from both ends of the accepted socket.
          val socket = channel.socket()
          val localHost = socket.getLocalAddress.getHostAddress
          val localPort = socket.getLocalPort
          val remoteHost = socket.getInetAddress.getHostAddress
          val remotePort = socket.getPort
          val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
          selector.register(connectionId, channel)
        } catch {
          case NonFatal(e) =>
            error(s"Processor $id failed to register a new connection; closing it", e)
            close(channel)
        }
      }
    }

    /** Drains this processor's response queue and performs the action each response asks for. */
    private def processNewResponses(): Unit = {
      var curr = requestChannel.receiveResponse(id)
      while (curr != null) {
        try {
          curr.responseAction match {
            case RequestChannel.NoOpAction =>
              // Nothing to send back: just start reading from this connection again.
              selector.unmute(curr.request.connectionId)
            case RequestChannel.SendAction =>
              sendResponse(curr)
            case RequestChannel.CloseConnectionAction =>
              close(selector, curr.request.connectionId)
          }
        } finally {
          // Always advance to the next response, even if handling this one failed.
          curr = requestChannel.receiveResponse(id)
        }
      }
    }

    /**
     * Hands a response to the selector for sending and records it as in flight.
     * If the selector no longer has a channel for the destination, the response
     * is dropped with a warning instead of being sent.
     */
    protected[network] def sendResponse(response: RequestChannel.Response): Unit = {
      val destination = response.responseSend.destination
      // Option(...) covers the case where the lookup yields null for a connection that is gone.
      Option(selector.channel(destination)) match {
        case None =>
          warn(s"Attempting to send response via channel for which there is no open connection, connection id $destination")
        case Some(_) =>
          selector.send(response.responseSend)
          inflightResponses += (response.request.connectionId -> response)
      }
    }

    /**
     * Turns every completed receive into a RequestChannel.Request, enqueues it
     * and mutes the connection. A connection whose receive cannot be processed
     * is closed.
     */
    private def processCompletedReceives(): Unit = {
      selector.completedReceives.asScala.foreach { receive =>
        try {
          val channel = selector.channel(receive.source)
          // Session carries the channel's principal and socket address.
          val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
          val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
          requestChannel.sendRequest(req)
          // Stop reading from this connection; it is unmuted once its response is handled.
          selector.mute(receive.source)
        } catch {
          case NonFatal(e) =>
            error(s"Closing socket for ${receive.source} because of error", e)
            close(selector, receive.source)
        }
      }
    }

    /** For each completed send, drops the in-flight entry and resumes reading from that connection. */
    private def processCompletedSends(): Unit = {
      selector.completedSends.asScala.foreach { send =>
        inflightResponses.remove(send.destination)
        selector.unmute(send.destination)
      }
    }

    /** For each disconnected connection, drops its in-flight entry and releases its quota slot. */
    private def processDisconnected(): Unit = {
      selector.disconnected.asScala.foreach { connectionId =>
        // NOTE(review): ConnectionId.fromString is not shown here; this assumes it yields a
        // host string accepted by InetAddress.getByName — confirm its return type.
        val remoteHost = ConnectionId.fromString(connectionId)
        inflightResponses.remove(connectionId)
        connectionQuotas.dec(InetAddress.getByName(remoteHost))
      }
    }
  }

/**
 * Base class of Acceptor and Processor; it only deals with startup and shutdown
 * coordination between the server thread and the thread that manages it.
 */
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
  // Released once the thread has finished starting up.
  private val startupLatch = new CountDownLatch(1)
  // Released once the thread has finished shutting down.
  private val shutdownLatch = new CountDownLatch(1)
  // Whether the thread should keep running; cleared by shutdown().
  private val alive = new AtomicBoolean(true)

  /** Blocks the caller until the thread has called startupComplete(). */
  def awaitStartup(): Unit = startupLatch.await()

  /**
   * Asks the thread to stop and blocks until it has called shutdownComplete().
   * The run loop notices the cleared flag the next time it checks isRunning.
   */
  def shutdown(): Unit = {
    alive.set(false)
    shutdownLatch.await()
  }

  /** Called by the thread itself once it is ready to serve. */
  protected def startupComplete(): Unit = startupLatch.countDown()

  /** Called by the thread itself once its run loop has exited. */
  protected def shutdownComplete(): Unit = shutdownLatch.countDown()

  /** True until shutdown() has been requested. */
  protected def isRunning: Boolean = alive.get
}

/**
 * Hand-off point between Processor threads and request-handler threads:
 * one bounded request queue shared by all processors, and one response queue
 * per processor.
 *
 * @param numProcessors number of processors, i.e. number of response queues
 * @param queueSize     capacity of the shared request queue
 */
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  // Callbacks invoked with the processor id whenever a response is enqueued;
  // SocketServer registers one that wakes the matching processor up.
  private var responseListeners: List[(Int) => Unit] = Nil
  // Every processor puts its requests here; the blocking queue makes this safe
  // across threads, and `queueSize` bounds the number of buffered requests.
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  // One response queue per processor, indexed by processor id.
  private val responseQueues =
    Array.fill[BlockingQueue[RequestChannel.Response]](numProcessors)(new LinkedBlockingQueue[RequestChannel.Response]())

  /** Enqueues a request for the handler threads, blocking while the queue is full. */
  def sendRequest(request: RequestChannel.Request): Unit =
    requestQueue.put(request)

  /**
   * Enqueues a response on the queue of the processor that owns the connection,
   * then notifies every listener with that processor's id so the processor can
   * pick the response up and return it to the client.
   */
  def sendResponse(response: RequestChannel.Response): Unit = {
    responseQueues(response.processor).put(response)
    responseListeners.foreach(onResponse => onResponse(response.processor))
  }

  /**
   * Returns the next response queued for `processor` without blocking,
   * or null when that queue is empty.
   */
  def receiveResponse(processor: Int): RequestChannel.Response =
    responseQueues(processor).poll()

  /** Registers a callback to be invoked with the processor id after each enqueued response. */
  def addResponseListener(onResponse: Int => Unit): Unit =
    responseListeners ::= onResponse
}

/**
 * A request read from a client connection, together with the timestamps
 * recorded as it moves through the broker.
 *
 * NOTE(review): other code in this file refers to this type as
 * `RequestChannel.Request`, and `Session` is unqualified here, so it presumably
 * belongs inside the RequestChannel companion object — confirm.
 *
 * @param processor        id of the processor that read the request
 * @param connectionId     id of the connection the request arrived on
 * @param session          principal and address of the client
 * @param buffer           raw request bytes; header and body are parsed from it on construction
 * @param startTimeMs      time at which the request was created
 * @param securityProtocol protocol of the endpoint the request arrived on
 */
case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
    // These timestamps are compared across threads, hence @volatile.
    @volatile var requestDequeueTimeMs = -1L
    @volatile var apiLocalCompleteTimeMs = -1L
    @volatile var responseCompleteTimeMs = -1L
    @volatile var responseDequeueTimeMs = -1L
    @volatile var apiRemoteCompleteTimeMs = -1L

    // Rewind first so the header is parsed from the start of the buffer.
    val header: RequestHeader = {
        buffer.rewind()
        RequestHeader.parse(buffer)
    }

    // An ApiVersions request with an unsupported version is replaced by an empty
    // ApiVersionsRequest instead of being parsed from the buffer; everything else
    // is parsed according to its api key and version.
    val body: AbstractRequest =
        if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion))
          new ApiVersionsRequest
        else
          AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
}

相关文章

网友评论

      本文标题:SocketServer.scala

      本文链接:https://www.haomeiwen.com/subject/simsfqtx.html