ConnectInterceptor负责网络连接过滤器
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val request = realChain.request()
val transmitter = realChain.transmitter()
// We need the network to satisfy this request. Possibly for validating a conditional GET.
val doExtensiveHealthChecks = request.method != "GET"
val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
return realChain.proceed(request, transmitter, exchange)
}
}
拦截
intercept(chain: Interceptor.Chain)
分析transmitter()
创建连接对象,释放连接资源,取消连接超时等
val transmitter = realChain.transmitter()
class Transmitter(
private val client: OkHttpClient,
private val call: Call
) {
//连接池
private val connectionPool: RealConnectionPool = client.connectionPool.delegate
//监听对象
private val eventListener: EventListener = client.eventListenerFactory.create(call)
private val timeout = object : AsyncTimeout() {
override fun timedOut() {
cancel()
}
}.apply {
timeout(client.callTimeoutMillis.toLong(), MILLISECONDS)
}
private var callStackTrace: Any? = null
private var request: Request? = null
private var exchangeFinder: ExchangeFinder? = null
// Guarded by connectionPool.
var connection: RealConnection? = null
private var exchange: Exchange? = null
private var exchangeRequestDone = false
private var exchangeResponseDone = false
private var canceled = false
private var timeoutEarlyExit = false
private var noMoreExchanges = false
val isCanceled: Boolean
get() {
synchronized(connectionPool) {
return canceled
}
}
fun timeout(): Timeout = timeout
fun timeoutEnter() {
timeout.enter()
}
//在呼叫完全完成之前停止应用超时。 这用于WebSockets
和双工调用,其中超时仅适用于初始设置。
fun timeoutEarlyExit() {
check(!timeoutEarlyExit)
timeoutEarlyExit = true
timeout.exit()
}
private fun <E : IOException?> timeoutExit(cause: E): E {
if (timeoutEarlyExit) return cause
if (!timeout.exit()) return cause
val e = InterruptedIOException("timeout")
if (cause != null) e.initCause(cause)
@Suppress("UNCHECKED_CAST") // E is either IOException or IOException?
return e as E
}
fun callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
eventListener.callStart(call)
}
//装备一个请求,有的话 则不需要新建立
fun prepareToConnect(request: Request) {
if (this.request != null) {
if (this.request!!.url.canReuseConnectionFor(request.url) && exchangeFinder!!.hasRouteToTry()) {
return // Already ready.
}
check(exchange == null)
if (exchangeFinder != null) {
maybeReleaseConnection(null, true)
exchangeFinder = null
}
}
this.request = request
this.exchangeFinder = ExchangeFinder(
this, connectionPool, createAddress(request.url), call, eventListener)
}
//创建Address
private fun createAddress(url: HttpUrl): Address {
var sslSocketFactory: SSLSocketFactory? = null
var hostnameVerifier: HostnameVerifier? = null
var certificatePinner: CertificatePinner? = null
//判断是否后Https
if (url.isHttps) {
sslSocketFactory = client.sslSocketFactory
hostnameVerifier = client.hostnameVerifier
certificatePinner = client.certificatePinner
}
return Address(url.host, url.port, client.dns, client.socketFactory,
sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator,
client.proxy, client.protocols, client.connectionSpecs, client.proxySelector)
}
//获取一个新的请求对,request,response
internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
synchronized(connectionPool) {
check(!noMoreExchanges) { "released" }
check(exchange == null) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
}
}
//查找连接
val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)
synchronized(connectionPool) {
this.exchange = result
this.exchangeRequestDone = false
this.exchangeResponseDone = false
return result
}
}
fun acquireConnectionNoEvents(connection: RealConnection) {
assert(Thread.holdsLock(connectionPool))
check(this.connection == null)
this.connection = connection
connection.transmitters.add(TransmitterReference(this, callStackTrace))
}
//从连接的分配列表中删除发射机。 返回调用者应该关闭的套接字
fun releaseConnectionNoEvents(): Socket? {
assert(Thread.holdsLock(connectionPool))
val index = connection!!.transmitters.indexOfFirst { it.get() == this@Transmitter }
check(index != -1)
val released = this.connection
released!!.transmitters.removeAt(index)
this.connection = null
if (released.transmitters.isEmpty()) {
released.idleAtNanos = System.nanoTime()
if (connectionPool.connectionBecameIdle(released)) {
return released.socket()
}
}
return null
}
fun exchangeDoneDueToException() {
synchronized(connectionPool) {
check(!noMoreExchanges)
exchange = null
}
}
//释放随请求或响应而持有的资源
internal fun <E : IOException?> exchangeMessageDone(
exchange: Exchange,
requestDone: Boolean,
responseDone: Boolean,
e: E
): E {
var result = e
var exchangeDone = false
synchronized(connectionPool) {
if (exchange != this.exchange) {
return result // This exchange was detached violently!
}
var changed = false
if (requestDone) {
if (!exchangeRequestDone) changed = true
this.exchangeRequestDone = true
}
if (responseDone) {
if (!exchangeResponseDone) changed = true
this.exchangeResponseDone = true
}
if (exchangeRequestDone && exchangeResponseDone && changed) {
exchangeDone = true
this.exchange!!.connection()!!.successCount++
this.exchange = null
}
}
if (exchangeDone) {
result = maybeReleaseConnection(result, false)
}
return result
}
fun noMoreExchanges(e: IOException?): IOException? {
synchronized(connectionPool) {
noMoreExchanges = true
}
return maybeReleaseConnection(e, false)
}
//释放连接
private fun <E : IOException?> maybeReleaseConnection(e: E, force: Boolean): E {
var result = e
val socket: Socket?
var releasedConnection: Connection?
val callEnd: Boolean
synchronized(connectionPool) {
check(!force || exchange == null) { "cannot release connection while it is in use" }
releasedConnection = this.connection
socket = if (this.connection != null && exchange == null && (force || noMoreExchanges)) {
releaseConnectionNoEvents()
} else {
null
}
if (this.connection != null) releasedConnection = null
callEnd = noMoreExchanges && exchange == null
}
socket?.closeQuietly()
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection!!)
}
if (callEnd) {
val callFailed = result != null
result = timeoutExit(result)
if (callFailed) {
eventListener.callFailed(call, result!!)
} else {
eventListener.callEnd(call)
}
}
return result
}
//取消重试
fun canRetry(): Boolean {
return exchangeFinder!!.hasStreamFailure() && exchangeFinder!!.hasRouteToTry()
}
fun hasExchange(): Boolean {
synchronized(connectionPool) {
return exchange != null
}
}
//取消连接
fun cancel() {
val exchangeToCancel: Exchange?
val connectionToCancel: RealConnection?
synchronized(connectionPool) {
canceled = true
exchangeToCancel = exchange
connectionToCancel = exchangeFinder?.connectingConnection() ?: connection
}
exchangeToCancel?.cancel() ?: connectionToCancel?.cancel()
}
internal class TransmitterReference(
referent: Transmitter,
//
val callStackTrace: Any?
) : WeakReference<Transmitter>(referent)
}
建立连接newExchange(chain, doExtensiveHealthChecks)
//获取一个新的请求对,request,reponse
internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
synchronized(connectionPool) {
check(!noMoreExchanges) { "released" }
check(exchange == null) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
}
}
val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)
synchronized(connectionPool) {
this.exchange = result
this.exchangeRequestDone = false
this.exchangeResponseDone = false
return result
}
}
find()获取HttpCoder
fun find(
client: OkHttpClient,
chain: Interceptor.Chain,
doExtensiveHealthChecks: Boolean
): ExchangeCodec {
//获取参数
val connectTimeout = chain.connectTimeoutMillis()
val readTimeout = chain.readTimeoutMillis()
val writeTimeout = chain.writeTimeoutMillis()
val pingIntervalMillis = client.pingIntervalMillis
val connectionRetryEnabled = client.retryOnConnectionFailure
try {
//获取一个健康的连接
val resultConnection = findHealthyConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled,
doExtensiveHealthChecks = doExtensiveHealthChecks
)
//返回结果
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure()
throw e
} catch (e: IOException) {
trackFailure()
throw RouteException(e)
}
}
获取一个健康的连接
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
//循环任务,知道获取到位置
while (true) {
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
//如果这是一个全新的连接,我们可以跳过广泛的健康检查。
synchronized(connectionPool) {
if (candidate.successCount == 0) {
return candidate
}
}
//执行(可能很慢)检查以确认池连接仍然良好。 如果它
// 不是,把它从池中取出然后重新开始。
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges()
continue
}
return candidate
}
}
//返回连接。 如果存在连接,则优先选择现有连接,
//如果不存在从连接池,建立一个新的连接。
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
if (transmitter.isCanceled) throw IOException("Canceled")
hasStreamFailure = false // This is a fresh attempt.
//连接
releasedConnection = transmitter.connection
//获取是否连接成功
toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
transmitter.releaseConnectionNoEvents()
} else {
null
}
if (transmitter.connection != null) {
// 已经分配了一个连接
result = transmitter.connection
releasedConnection = null
}
if (result == null) {
// Attempt to get a connection from the pool.
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true
result = transmitter.connection
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry
nextRouteToTry = null
} else if (retryCurrentRoute()) {
selectedRoute = transmitter.connection!!.route()
}
}
}
toClose?.closeQuietly()
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection!!)
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
}
if (result != null) {
// 如果我们找到已经分配或池化的连接,我们就完成了。
return result!!
}
//选择路由
var newRouteSelection = false
if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
newRouteSelection = true
routeSelection = routeSelector.next()
}
var routes: List<Route>? = null
synchronized(connectionPool) {
if (transmitter.isCanceled) throw IOException("Canceled")
if (newRouteSelection) {
// 拿到IP开始连接匹配
routes = routeSelection!!.routes
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true
result = transmitter.connection
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection!!.next()
}
//创建连接并立即将其分配给此分配。 这使它成为可能
// 对于异步取消()来中断我们即将要做的握手。
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
}
// 如果我们第二次发现了汇集连接,我们就完成了。
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
return result!!
}
// 做TCP + TLS握手。 这是一个阻止操作。
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
connectionPool.routeDatabase.connected(result!!.route())
var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
//上次尝试连接合并,只有在我们尝试多次时才会发生
//与同一主机的并发连接。
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// 关闭创建的连接,返回连接池的连接
result!!.noNewExchanges = true
socket = result!!.socket()
result = transmitter.connection
// 我们可以获得立即不健康的合并连接。 在
//在这种情况下,我们将重试我们刚刚成功连接的路线。
nextRouteToTry = selectedRoute
} else {
connectionPool.put(result!!)
transmitter.acquireConnectionNoEvents(result!!)
}
}
socket?.closeQuietly()
eventListener.connectionAcquired(call, result!!)
return result!!
}
网友评论