美文网首页Android
ConnectInterceptor

ConnectInterceptor

作者: 大佬的上半生 | 来源:发表于2019-10-24 17:21 被阅读0次

    ConnectInterceptor 是负责建立网络连接的拦截器。

    /**
     * Interceptor that obtains an exchange from the call's transmitter and then hands the request,
     * transmitter and exchange to the rest of the chain.
     */
    object ConnectInterceptor : Interceptor {

      /**
       * Creates a new exchange for the chain's request and proceeds with it.
       *
       * @param chain the current chain; it is cast to [RealInterceptorChain].
       * @return the response produced by the remainder of the chain.
       */
      @Throws(IOException::class)
      override fun intercept(chain: Interceptor.Chain): Response {
        val interceptorChain = chain as RealInterceptorChain
        val pendingRequest = interceptorChain.request()
        val callTransmitter = interceptorChain.transmitter()

        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        // Every method other than GET asks for the extensive health checks.
        val isGet = pendingRequest.method == "GET"
        val freshExchange = callTransmitter.newExchange(chain, !isGet)

        return interceptorChain.proceed(pendingRequest, callTransmitter, freshExchange)
      }
    }
    
    拦截
    intercept(chain: Interceptor.Chain)
    
    分析transmitter()

    Transmitter 负责创建连接对象、释放连接资源,以及处理请求的取消与超时等。

     val transmitter = realChain.transmitter()
    
    /**
     * Per-call bridge to the connection layer. It owns the call's timeout, tracks the call's current
     * [RealConnection] and [Exchange], and releases them once they are no longer needed.
     *
     * @param client supplies the connection pool, event listener factory, timeout and address settings.
     * @param call the call this transmitter works for; passed to every event-listener callback.
     */
    class Transmitter(
      private val client: OkHttpClient,
      private val call: Call
    ) {
      // Connection pool taken from the client; it is also the monitor used by the synchronized blocks below.
      private val connectionPool: RealConnectionPool = client.connectionPool.delegate
      // Listener that receives this call's events (callStart, connectionReleased, callEnd, ...).
      private val eventListener: EventListener = client.eventListenerFactory.create(call)
      // Call timeout: cancels this transmitter when it fires. Its duration is client.callTimeoutMillis.
      private val timeout = object : AsyncTimeout() {
        override fun timedOut() {
          cancel()
        }
      }.apply {
        timeout(client.callTimeoutMillis.toLong(), MILLISECONDS)
      }

      // Set in callStart() and attached to every TransmitterReference created for this transmitter.
      private var callStackTrace: Any? = null

      private var request: Request? = null
      private var exchangeFinder: ExchangeFinder? = null

      // Guarded by connectionPool.
      var connection: RealConnection? = null
      private var exchange: Exchange? = null
      private var exchangeRequestDone = false
      private var exchangeResponseDone = false
      private var canceled = false
      private var timeoutEarlyExit = false
      private var noMoreExchanges = false

      /** True once [cancel] has been called; read while holding the pool lock. */
      val isCanceled: Boolean
        get() {
          synchronized(connectionPool) {
            return canceled
          }
        }

      /** Returns the call timeout object. */
      fun timeout(): Timeout = timeout

      /** Starts the call timeout. */
      fun timeoutEnter() {
        timeout.enter()
      }

      /**
       * Stops applying the timeout before the call is entirely complete. This is used for WebSockets
       * and duplex calls where the timeout only applies to the initial setup.
       *
       * @throws IllegalStateException if it has already been called.
       */
      fun timeoutEarlyExit() {
        check(!timeoutEarlyExit)
        timeoutEarlyExit = true
        timeout.exit()
      }

      /**
       * Exits the timeout unless [timeoutEarlyExit] already did so.
       *
       * @param cause the failure seen so far, or null.
       * @return [cause] unchanged when the timeout was exited early or `timeout.exit()` returns false;
       *     otherwise an [InterruptedIOException] with message "timeout" whose cause is [cause].
       */
      private fun <E : IOException?> timeoutExit(cause: E): E {
        if (timeoutEarlyExit) return cause
        if (!timeout.exit()) return cause

        val e = InterruptedIOException("timeout")
        if (cause != null) e.initCause(cause)
        @Suppress("UNCHECKED_CAST") // E is either IOException or IOException?
        return e as E
      }

      /** Captures the stack trace token for this call and notifies the listener that the call started. */
      fun callStart() {
        this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
        eventListener.callStart(call)
      }

      /**
       * Prepares to create an exchange to carry [request].
       *
       * If a request was prepared earlier, its URL can reuse the connection for the new URL, and the
       * finder still has a route to try, nothing changes. Otherwise any previous finder's connection
       * is released and a new [ExchangeFinder] is created for the new request's address.
       *
       * @throws IllegalStateException if an exchange is still open when the finder must be replaced.
       */
      fun prepareToConnect(request: Request) {
        if (this.request != null) {
          if (this.request!!.url.canReuseConnectionFor(request.url) && exchangeFinder!!.hasRouteToTry()) {
            return // Already ready.
          }
          check(exchange == null)

          if (exchangeFinder != null) {
            maybeReleaseConnection(null, true)
            exchangeFinder = null
          }
        }

        this.request = request
        this.exchangeFinder = ExchangeFinder(
            this, connectionPool, createAddress(request.url), call, eventListener)
      }

      /** Builds the [Address] for [url] from the client's configuration. */
      private fun createAddress(url: HttpUrl): Address {
        var sslSocketFactory: SSLSocketFactory? = null
        var hostnameVerifier: HostnameVerifier? = null
        var certificatePinner: CertificatePinner? = null
        // The TLS-related settings are only filled in for HTTPS URLs; otherwise they stay null.
        if (url.isHttps) {
          sslSocketFactory = client.sslSocketFactory
          hostnameVerifier = client.hostnameVerifier
          certificatePinner = client.certificatePinner
        }

        return Address(url.host, url.port, client.dns, client.socketFactory,
            sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator,
            client.proxy, client.protocols, client.connectionSpecs, client.proxySelector)
      }

      /**
       * Returns a new exchange to carry a new request and response.
       *
       * @throws IllegalStateException if no more exchanges are allowed ("released") or the previous
       *     exchange is still open.
       */
      internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
        synchronized(connectionPool) {
          check(!noMoreExchanges) { "released" }
          check(exchange == null) {
            "cannot make a new request because the previous response is still open: " +
                "please call response.close()"
          }
        }
        // Look up a connection and its codec. This runs outside the pool lock.
        val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
        val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)

        synchronized(connectionPool) {
          this.exchange = result
          this.exchangeRequestDone = false
          this.exchangeResponseDone = false
          return result
        }
      }

      /**
       * Records [connection] as this transmitter's connection and registers a weak reference to this
       * transmitter on it. No listener events are sent. The caller must hold the pool lock.
       *
       * @throws IllegalStateException if a connection is already assigned.
       */
      fun acquireConnectionNoEvents(connection: RealConnection) {
        assert(Thread.holdsLock(connectionPool))

        check(this.connection == null)
        this.connection = connection
        connection.transmitters.add(TransmitterReference(this, callStackTrace))
      }

      /**
       * Removes this transmitter from the connection's list of allocations. Returns a socket that the
       * caller should close, or null. The caller must hold the pool lock.
       */
      fun releaseConnectionNoEvents(): Socket? {
        assert(Thread.holdsLock(connectionPool))

        val index = connection!!.transmitters.indexOfFirst { it.get() == this@Transmitter }
        check(index != -1)

        val released = this.connection
        released!!.transmitters.removeAt(index)
        this.connection = null

        // When this was the last transmitter on the connection, mark it idle and ask the pool;
        // a true answer means the socket is handed back to the caller for closing.
        if (released.transmitters.isEmpty()) {
          released.idleAtNanos = System.nanoTime()
          if (connectionPool.connectionBecameIdle(released)) {
            return released.socket()
          }
        }

        return null
      }

      /** Drops the current exchange after a failure; requires that more exchanges are still allowed. */
      fun exchangeDoneDueToException() {
        synchronized(connectionPool) {
          check(!noMoreExchanges)
          exchange = null
        }
      }

      /**
       * Releases resources held with the request or response of [exchange].
       *
       * @param exchange the exchange reporting completion; ignored if it is no longer the current one.
       * @param requestDone true when the request side has finished.
       * @param responseDone true when the response side has finished.
       * @param e the failure seen so far, or null.
       * @return [e], or the value returned by [maybeReleaseConnection] when this call is the one that
       *     completes both sides of the exchange.
       */
      internal fun <E : IOException?> exchangeMessageDone(
        exchange: Exchange,
        requestDone: Boolean,
        responseDone: Boolean,
        e: E
      ): E {
        var result = e
        var exchangeDone = false
        synchronized(connectionPool) {
          if (exchange != this.exchange) {
            return result // This exchange was detached violently!
          }
          // `changed` ensures the completion work below runs only on the call that flips a flag.
          var changed = false
          if (requestDone) {
            if (!exchangeRequestDone) changed = true
            this.exchangeRequestDone = true
          }
          if (responseDone) {
            if (!exchangeResponseDone) changed = true
            this.exchangeResponseDone = true
          }
          if (exchangeRequestDone && exchangeResponseDone && changed) {
            exchangeDone = true
            this.exchange!!.connection()!!.successCount++
            this.exchange = null
          }
        }
        if (exchangeDone) {
          result = maybeReleaseConnection(result, false)
        }
        return result
      }

      /** Signals that the call expects no more exchanges, then releases the connection if possible. */
      fun noMoreExchanges(e: IOException?): IOException? {
        synchronized(connectionPool) {
          noMoreExchanges = true
        }
        return maybeReleaseConnection(e, false)
      }

      /**
       * Releases the connection if it is no longer needed, and finishes the call when no more
       * exchanges are expected and none is open.
       *
       * @param e the failure seen so far, or null.
       * @param force true to release the connection even if more exchanges are expected for the call.
       * @return [e], or the result of [timeoutExit] when the call has ended.
       * @throws IllegalStateException if [force] is true while an exchange is still open.
       */
      private fun <E : IOException?> maybeReleaseConnection(e: E, force: Boolean): E {
        var result = e
        val socket: Socket?
        var releasedConnection: Connection?
        val callEnd: Boolean
        synchronized(connectionPool) {
          check(!force || exchange == null) { "cannot release connection while it is in use" }
          releasedConnection = this.connection
          socket = if (this.connection != null && exchange == null && (force || noMoreExchanges)) {
            releaseConnectionNoEvents()
          } else {
            null
          }
          // If the connection is still assigned, nothing was released, so no event is sent below.
          if (this.connection != null) releasedConnection = null
          callEnd = noMoreExchanges && exchange == null
        }
        // Socket closing and listener callbacks happen outside the pool lock.
        socket?.closeQuietly()

        if (releasedConnection != null) {
          eventListener.connectionReleased(call, releasedConnection!!)
        }

        if (callEnd) {
          val callFailed = result != null
          result = timeoutExit(result)
          if (callFailed) {
            eventListener.callFailed(call, result!!)
          } else {
            eventListener.callEnd(call)
          }
        }
        return result
      }

      /** Returns true when the exchange finder reports a stream failure and still has a route to try. */
      fun canRetry(): Boolean {
        return exchangeFinder!!.hasStreamFailure() && exchangeFinder!!.hasRouteToTry()
      }

      /** Returns true while an exchange is open. */
      fun hasExchange(): Boolean {
        synchronized(connectionPool) {
          return exchange != null
        }
      }

      /**
       * Marks the call canceled, then cancels the current exchange if there is one. With no exchange,
       * it cancels the finder's connecting connection or, failing that, the current connection.
       */
      fun cancel() {
        val exchangeToCancel: Exchange?
        val connectionToCancel: RealConnection?
        synchronized(connectionPool) {
          canceled = true
          exchangeToCancel = exchange
          connectionToCancel = exchangeFinder?.connectingConnection() ?: connection
        }
        exchangeToCancel?.cancel() ?: connectionToCancel?.cancel()
      }

      /** Weak reference to a [Transmitter], stored in a connection's `transmitters` list. */
      internal class TransmitterReference(
        referent: Transmitter,
        // The value captured by Transmitter.callStart().
        // NOTE(review): presumably used to report where a leaked connection came from; confirm.
        val callStackTrace: Any?
      ) : WeakReference<Transmitter>(referent)
    }
    
    建立连接newExchange(chain, doExtensiveHealthChecks)
      /**
       * Returns a new exchange to carry one request and its response.
       *
       * @param chain the interceptor chain, forwarded to the exchange finder.
       * @param doExtensiveHealthChecks forwarded to the exchange finder.
       * @throws IllegalStateException if no more exchanges are allowed ("released") or the previous
       *     exchange is still open.
       */
      internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
        // Validate state under the pool lock before doing any connection work.
        synchronized(connectionPool) {
          check(!noMoreExchanges) { "released" }
          check(exchange == null) {
            "cannot make a new request because the previous response is still open: please call response.close()"
          }
        }

        // Finding a connection runs outside the pool lock.
        val exchangeCodec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
        val freshExchange = Exchange(this, call, eventListener, exchangeFinder!!, exchangeCodec)

        // Publish the exchange and reset its completion flags under the lock.
        return synchronized(connectionPool) {
          exchange = freshExchange
          exchangeRequestDone = false
          exchangeResponseDone = false
          freshExchange
        }
      }
    

    find() 获取 ExchangeCodec

    /**
     * Finds a healthy connection for a new exchange and returns a codec created on it.
     *
     * A failure is recorded with `trackFailure()` before it propagates: a [RouteException] is
     * rethrown unchanged, and an [IOException] is wrapped in a [RouteException].
     *
     * @param client supplies the ping interval and the retry-on-connection-failure setting.
     * @param chain supplies the connect, read and write timeouts.
     * @param doExtensiveHealthChecks forwarded to the health check of a candidate connection.
     */
    fun find(
      client: OkHttpClient,
      chain: Interceptor.Chain,
      doExtensiveHealthChecks: Boolean
    ): ExchangeCodec {
      // Read the settings up front, outside the try block.
      val connectTimeoutMs = chain.connectTimeoutMillis()
      val readTimeoutMs = chain.readTimeoutMillis()
      val writeTimeoutMs = chain.writeTimeoutMillis()
      val pingIntervalMs = client.pingIntervalMillis
      val retryOnFailure = client.retryOnConnectionFailure

      return try {
        findHealthyConnection(
            connectTimeout = connectTimeoutMs,
            readTimeout = readTimeoutMs,
            writeTimeout = writeTimeoutMs,
            pingIntervalMillis = pingIntervalMs,
            connectionRetryEnabled = retryOnFailure,
            doExtensiveHealthChecks = doExtensiveHealthChecks
        ).newCodec(client, chain)
      } catch (routeFailure: RouteException) {
        trackFailure()
        throw routeFailure
      } catch (ioFailure: IOException) {
        trackFailure()
        throw RouteException(ioFailure)
      }
    }
    

    获取一个健康的连接

     /**
      * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
      * until a healthy connection is found.
      *
      * A connection whose `successCount` is 0 is returned without the health check.
      */
     private fun findHealthyConnection(
       connectTimeout: Int,
       readTimeout: Int,
       writeTimeout: Int,
       pingIntervalMillis: Int,
       connectionRetryEnabled: Boolean,
       doExtensiveHealthChecks: Boolean
     ): RealConnection {
       while (true) {
         val pick = findConnection(
             connectTimeout = connectTimeout,
             readTimeout = readTimeout,
             writeTimeout = writeTimeout,
             pingIntervalMillis = pingIntervalMillis,
             connectionRetryEnabled = connectionRetryEnabled
         )

         // If this is a brand new connection, we can skip the extensive health checks.
         val neverUsed = synchronized(connectionPool) { pick.successCount == 0 }
         if (neverUsed) return pick

         // Do a (potentially slow) check to confirm that the pooled connection is still good.
         if (pick.isHealthy(doExtensiveHealthChecks)) return pick

         // It isn't: stop it from taking new exchanges and start over.
         pick.noNewExchanges()
       }
     }
    
    
    // Returns a connection to host a new exchange. This prefers the transmitter's existing connection
    // if it exists, then a connection from the pool, and finally builds a new connection.
    //
    // Throws IOException("Canceled") if the transmitter is canceled at either of the two checks below.
    // Socket closing, listener callbacks and the connect() call all happen outside the pool lock.
      @Throws(IOException::class)
      private fun findConnection(
        connectTimeout: Int,
        readTimeout: Int,
        writeTimeout: Int,
        pingIntervalMillis: Int,
        connectionRetryEnabled: Boolean
      ): RealConnection {
        var foundPooledConnection = false
        var result: RealConnection? = null
        var selectedRoute: Route? = null
        var releasedConnection: RealConnection?
        val toClose: Socket?
        synchronized(connectionPool) {
          if (transmitter.isCanceled) throw IOException("Canceled")
          hasStreamFailure = false // This is a fresh attempt.
          // Attempt to use the connection already allocated to the transmitter.
          releasedConnection = transmitter.connection
          // If that connection no longer accepts new exchanges, release it. The socket returned
          // (if any) is closed after leaving the lock.
          toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
            transmitter.releaseConnectionNoEvents()
          } else {
            null
          }
    
          if (transmitter.connection != null) {
            // We had an already-allocated connection and it is still usable.
            result = transmitter.connection
            releasedConnection = null
          }
    
          if (result == null) {
            // Attempt to get a connection from the pool.
            if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
              foundPooledConnection = true
              result = transmitter.connection
            } else if (nextRouteToTry != null) {
              selectedRoute = nextRouteToTry
              nextRouteToTry = null
            } else if (retryCurrentRoute()) {
              selectedRoute = transmitter.connection!!.route()
            }
          }
        }
        toClose?.closeQuietly()
    
        if (releasedConnection != null) {
          eventListener.connectionReleased(call, releasedConnection!!)
        }
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result!!)
        }
        if (result != null) {
          // If we found an already-allocated or pooled connection, we're done.
          return result!!
        }
    
        // If no route was selected above and the current selection is missing or exhausted,
        // get the next route selection.
        var newRouteSelection = false
        if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
          newRouteSelection = true
          routeSelection = routeSelector.next()
        }
    
        var routes: List<Route>? = null
        synchronized(connectionPool) {
          if (transmitter.isCanceled) throw IOException("Canceled")
    
          if (newRouteSelection) {
            // Now that we have a set of IP addresses, make another attempt at getting a connection
            // from the pool, this time passing the candidate routes.
            routes = routeSelection!!.routes
            if (connectionPool.transmitterAcquirePooledConnection(
                    address, transmitter, routes, false)) {
              foundPooledConnection = true
              result = transmitter.connection
            }
          }
    
          if (!foundPooledConnection) {
            if (selectedRoute == null) {
              selectedRoute = routeSelection!!.next()
            }
    
      
            // Create a connection and assign it to this allocation immediately. This makes it
            // possible for an asynchronous cancel() to interrupt the handshake we're about to do.
            result = RealConnection(connectionPool, selectedRoute!!)
            connectingConnection = result
          }
        }
    
        // If we found a pooled connection on the second attempt, we're done.
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result!!)
          return result!!
        }
    
        // Do TCP + TLS handshakes. This is a blocking operation.
        result!!.connect(
            connectTimeout,
            readTimeout,
            writeTimeout,
            pingIntervalMillis,
            connectionRetryEnabled,
            call,
            eventListener
        )
        connectionPool.routeDatabase.connected(result!!.route())
    
        var socket: Socket? = null
        synchronized(connectionPool) {
          connectingConnection = null
          // Last attempt at connection coalescing, which only occurs if we attempted multiple
          // concurrent connections to the same host.
          if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
            // We lost the race: close the connection we created and return the pooled connection.
            result!!.noNewExchanges = true
            socket = result!!.socket()
            result = transmitter.connection
    
            // It's possible for us to obtain a coalesced connection that is immediately unhealthy.
            // In that case we will retry the route we just successfully connected with.
            nextRouteToTry = selectedRoute
          } else {
            connectionPool.put(result!!)
            transmitter.acquireConnectionNoEvents(result!!)
          }
        }
        socket?.closeQuietly()
    
        eventListener.connectionAcquired(call, result!!)
        return result!!
      }
    

    相关文章

      网友评论

        本文标题:ConnectInterceptor

        本文链接:https://www.haomeiwen.com/subject/wtosyctx.html