美文网首页
OkHttp源码分析(一)

OkHttp源码分析(一)

作者: 代码我写的怎么 | 来源:发表于2022-12-30 14:56 被阅读0次

文章中源码的OkHttp版本为4.10.0

implementation 'com.squareup.okhttp3:okhttp:4.10.0'

1.简单使用

  • okHttp的简单使用代码如下:
//创建OkHttpClient对象
val client = OkHttpClient().newBuilder().build()

//创建Request对象
val request = Request.Builder()
    .url("https://wanandroid.com/wxarticle/list/408/1/json")    //添加请求的url地址
    .build()                                                    //返回一个Request对象

//发起请求
fun request() {
    val response = client
        .newCall(request)                                           //创建一个Call对象
        .enqueue(object : Callback {                                //调用enqueue方法执行异步请求
            override fun onFailure(call: Call, e: IOException) {
                TODO("Not yet implemented")
            }

            override fun onResponse(call: Call, response: Response) {
                TODO("Not yet implemented")
            }
        })
    }
  • 工作流程有四步:
    • 创建OkHttpClient对象
    • 创建Request对象
    • 创建Call对象
    • 开始发起请求,enqueue为异步请求,execute为同步请求

2.OkHttpClient对象是如何创建的

val client = OkHttpClient().newBuilder().build()

OkHttpClient对象的创建是一个典型的建造者模式,先看一下newBuilder方法做了什么,源码如下:

//创建了一个Builder对象
open fun newBuilder(): Builder = Builder(this)
class Builder constructor() {
    //调度器
    internal var dispatcher: Dispatcher = Dispatcher()
    //拦截器集合
    internal val interceptors: MutableList<Interceptor> = mutableListOf()
    //网络拦截器集合
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()

    ...

}

newBuilder中创建了一个Builder对象,Builder对象的构造函数中定义了很多的变量,这里只保留了3个重要的。

下面看一下build方法做了什么

//这个this就是上面创建的Builder对象
fun build(): OkHttpClient = OkHttpClient(this)

okHttpClient源码如下

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {

  @get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher

  @get:JvmName("interceptors") val interceptors: List<Interceptor> =
      builder.interceptors.toImmutableList()

  @get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> =
      builder.networkInterceptors.toImmutableList()

    ...

}

通过OkHttpClient对象的源码可以得知,Builder创建的调度器、拦截器最终都会交给OkHttpClient,这是建造者模式的特定。

3.Request对象是如何创建的

val request = Request.Builder()
    .url("https://wanandroid.com/wxarticle/list/408/1/json")    //添加请求的url地址
    .build()                                                    //返回一个Request对象
open class Builder {
    //请求地址
    internal var url: HttpUrl? = null
    //请求方法
    internal var method: String
    //请求头
    internal var headers: Headers.Builder
    //请求体
    internal var body: RequestBody? = null

    ...
}

4.创建Call对象

val call = client.newCall(request)            
override fun newCall(request: Request): Call {
    //newRealCall中传递了三个参数,第一个参数是OkHttpClient本身,第二个参数request,
    //第三个不用关注
    return RealCall.newRealCall(this, request, forWebSocket = false)
}

接着我们先来看一下RealCall是什么

class RealCall(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
    ...
}

从源码可知RealCallCall的子类,那么Call又是什么呢,往下看

//调用是准备好要执行的请求。也可以取消调用。
//由于该对象表示单个请求/响应对(流),因此不能执行两次。
interface Call : Cloneable {
    //返回发起此调用的原始请求。
    fun request(): Request
    @Throws(IOException::class)

    //同步请求,立即调用请求,并阻塞,直到响应可以处理或出现错误。
    fun execute(): Response

    //异步请求,接受回调参数
    fun enqueue(responseCallback: Callback)

    //取消请求
    fun cancel()

    //如果此调用已被执行或进入队列,则返回true。多次执行调用是错误的。
    fun isExecuted(): Boolean

    //是否是取消状态
    fun isCanceled(): Boolean

    //超时时间,
    fun timeout(): Timeout

    //创建一个与此调用相同的新调用,即使该调用已经进入队列或执行,该调用也可以被加入队列或执行。
    public override fun clone(): Call

    fun interface Factory {
        fun newCall(request: Request): Call
    }
}

5.发起请求

  • 以异步请求为例进行分析
call.enqueue(object : Callback {
    override fun onFailure(call: Call, e: IOException) {
        println("onFailure:$e")
    }

    override fun onResponse(call: Call, response: Response) {
        println("onResponse:${response.body.toString()}")
    }
})

RealCallCall的子类所以enqueue的具体实现是在RealCall

override fun enqueue(responseCallback: Callback) {
     //检查是否进行了二次请求
     check(executed.compareAndSet(false, true)) { "Already Executed" }

    //请求后立即调用,相当于监听请求的开始事件
    callStart()
    //将请求交给调度器来决定什么时候开始请求
    client.dispatcher.enqueue(AsyncCall(responseCallback))
}

private fun callStart() {
    this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
    eventListener.callStart(this)
}
  • 疑问:client.dispatcher.enqueue是如何决定什么时候开始请求的

    • 已知client就是OkHttpClient
    • dispatcher是调度器,先来看一下它的源码
class Dispatcher constructor() {
    //并发执行的最大请求数。上面的请求队列在内存中,等待正在运行的调用完成。
    //如果在调用这个函数时有超过maxRequests的请求在运行,那么这些请求将保持在运行状态。
    @get:Synchronized var maxRequests = 64
    set(maxRequests) {
        require(maxRequests >= 1) { "max < 1: $maxRequests" }
        synchronized(this) {
            field = maxRequests
        }
        promoteAndExecute()
    }

    //每台主机可并发执行的最大请求数。这限制了URL主机名的请求。
    //注意,对单个IP地址的并发请求仍然可能超过此限制:
    //多个主机名可能共享一个IP地址或通过同一个HTTP代理路由。
    @get:Synchronized var maxRequestsPerHost = 5
    set(maxRequestsPerHost) {
        require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
        synchronized(this) {
            field = maxRequestsPerHost
        }
        promoteAndExecute()
    }

    //线程安全的单例模式,线程池的获取用于线程调度。
    @get:Synchronized
    @get:JvmName("executorService") val executorService: ExecutorService
    get() {
        if (executorServiceOrNull == null) {
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                                                       SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
        }
        return executorServiceOrNull!!
    }

    //定义准备发送的队列
    private val readyAsyncCalls = ArrayDeque<AsyncCall>()

    //定义异步发送队列
    private val runningAsyncCalls = ArrayDeque<AsyncCall>()

    //定义同步发送队列
    private val runningSyncCalls = ArrayDeque<RealCall>()

    ...
}
    • 再来看一下dispatcher.enqueue的源码
internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
        //将call对象添加到准备发送的队列,这个call对象来自AsyncCall,稍后再讲
        readyAsyncCalls.add(call)

        //修改AsyncCall,使其共享对同一主机的现有运行调用的AtomicInteger。
        if (!call.get().forWebSocket) {
            //共享一个已经存在的正在运行调用的AtomicInteger
            val existingCall = findExistingCallWithHost(call.host())
            //统计发送数量
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
        }
    }
    //准备发送请求
    promoteAndExecute()
}
//将符合条件的call从readyAsyncCalls(准备发送的队列)添加到runningAsyncCalls(异步发送队列)中
//并在服务器上执行它们
//不能在同步时调用,因为执行调用可以调用用户代码
//如果调度程序当前正在运行,则为true。
private fun promoteAndExecute(): Boolean {
    assert(!Thread.holdsLock(this))

    //收集所有需要执行的请求
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
        val i = readyAsyncCalls.iterator()
        //遍历准备发送的队列
        while (i.hasNext()) {
            val asyncCall = i.next()

            //判断已经发送的请求是大于等于最大请求个数64,是则跳出循环
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            //判断并发请求个数是否大于等于最大并发个数5,如果是则跳出循环
            if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.

            //从准备队列中删除
            i.remove()
            //计数+1
            asyncCall.callsPerHost().incrementAndGet()
            executableCalls.add(asyncCall)
            //将对象添加到异步发送队列中
            runningAsyncCalls.add(asyncCall)
        }
        isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
        val asyncCall = executableCalls[i]
        //提交任务到线程池
        asyncCall.executeOn(executorService)
    }

    return isRunning
}
    • 最终请求是通过AsyncCallexecuteOn发送出去的,AsyncCall是什么
internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
    ...
}
    • 接收了一个回调,继承了Runnable
fun executeOn(executorService: ExecutorService) {
    client.dispatcher.assertThreadDoesntHoldLock()

    //暂时定义为未执行成功
    var success = false
    try {
        //使用线程执行自身
        executorService.execute(this)
        //执行成功
        success = true
    } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        //发送失败
        responseCallback.onFailure(this@RealCall, ioException)
    } finally {
        if (!success) {
            //发送结束
            client.dispatcher.finished(this) // 停止运行
        }
    }
}
    • AsyncCall将自己加入到线程池,然后线程池开启线程执行自己的run方法,那么AsyncCall加入了一个怎样的线程池呢?
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
    if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                                                   SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
    }
    return executorServiceOrNull!!
}
    • 这里定义了一个缓存线程池,具有缓存功能且如果一个请求执行完毕后在60s内再次发起就会复用刚才那个线程,提高了性能。
    • 交给线程池后,线程池会开启线程执行AsyncCall的run方法
override fun run() {
    threadName("OkHttp ${redactedUrl()}") {
        //定义响应标志位,用于表示请求是否成功
        var signalledCallback = false
        timeout.enter()
        try {
            //发送请求并得到结果
            val response = getResponseWithInterceptorChain()
            //过程未出错
            signalledCallback = true
            //回调结果
            responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
            if (signalledCallback) {
                // 不要两次发出回调信号!
                Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
            } else {
                //失败回调
                responseCallback.onFailure(this@RealCall, e)
            }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
            client.dispatcher.finished(this)
        }
    }
}

在请求得到结果后最后会调用finish表示完成,这里的finish又做了什么呢?

/**
 * 由[AsyncCall.run]用于表示完成。
 */
internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
}

/** 
 * 由[Call.execute]用于表示完成。
 */
internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
}

private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
        //将完成的任务从队列中删除
        if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
            idleCallback = this.idleCallback
    }

    //用于将等待队列中的请求移入异步队列,并交由线程池执行
    val isRunning = promoteAndExecute()

    //如果没有请求需要执行,回调闲置callback
    if (!isRunning && idleCallback != null) {
        idleCallback.run()
    }
}
  • 流程图如下

作者:无糖可乐爱好者
链接:https://juejin.cn/post/7182552726641836087

相关文章

网友评论

      本文标题:OkHttp源码分析(一)

      本文链接:https://www.haomeiwen.com/subject/cswxcdtx.html