OkHttp是主流的网络请求框架,Android网络请求基本的项目封装也是有Rxjava+Retrofit+Okhttp进行封装,面对Kotlin语法可能也有的同学使用Coroutine+Retrofit+Okhttp进行封装 这篇文章并非将封装 而是对OkHttp源码性进行阅读 对OkHttp进行一步的了解,并且学习里面一些里面的设计思想。
源码是最好的老师!
本文基于okhttp:4.2.2
okhttp的基本使用:
//////////////////////////////////////////////
//下面代码中调用到的协程相关信息
class OkhttpActivity : AppCompatActivity(){
private val job = Job()
private val ioScope = CoroutineScope(Dispatchers.IO + job)
lateinit var handler:CoroutineExceptionHandler
companion object{
val TAG = "OkhttpActivity"
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_okhttp)
handler = CoroutineExceptionHandler{_,exception ->
run {
Log.d(TAG, "exception" + exception.message)
}
}
}
同步请求方式
//创建OkHttpClient对象,这里使用的是Builder设计模式的创建方式
var client = OkHttpClient.Builder()
.connectTimeout(5, TimeUnit.SECONDS)
.build()
//创建Request对象
var request = Request.Builder().url("https://www.wanandroid.com/article/list/0/json").build()
ioScope.launch(handler) {
//同步请求
var response = client.newCall(request).execute()
}
以上的操作是基本的实例代码 :
步骤如下:
- 创建
OkHttpClient
- 创建请求对象
Request
同步请求直接写成一句了 - ,实际上是先
调newCall()
方法返回一个Call
对象 - 调用
execute()
方法,最终根据返回Respone对象
源码分析
- 首先分析:
OkHttpClient
,从OkHttpClient.Builder()
open class OkHttpClient internal constructor(builder: Builder) :
Cloneable, Call.Factory, WebSocket.Factory {
//关键2
constructor() : this(Builder())
//...省略其他代码
class Builder constructor() {
internal var dispatcher: Dispatcher = Dispatcher()
internal var connectionPool: ConnectionPool = ConnectionPool()
internal val interceptors: MutableList<Interceptor> = mutableListOf()
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
internal var retryOnConnectionFailure = true
internal var authenticator: Authenticator = Authenticator.NONE
fun dispatcher(dispatcher: Dispatcher) = apply {
this.dispatcher = dispatcher
}
//关键3
fun connectTimeout(timeout: Long, unit: TimeUnit) = apply {
connectTimeout = checkDuration("timeout", timeout, unit)
}
//... 省略其他代码
//关键1
fun build(): OkHttpClient = OkHttpClient(this)
}
}
上面可以看到
Builder
是OkHttpClient
中的内部类,内部类中的build()
将自己作为参数传入OkHttpClient(this)
调用了constructor() : this(Builder())
,这里面的代码中,将所有设置方法返回自己本身this
使用的是设计模式的建造者模式
。
(可能有些同学没怎么看懂dispatcher()
这个方法返回Builder
本身,这个可以看一下kotlin的apply
这些函数)
- 第二步分析建请求对象
Request
class Request internal constructor(
@get:JvmName("url") val url: HttpUrl,
...
) {
fun newBuilder(): Builder = Builder(this)
open class Builder {
internal var url: HttpUrl? = null
internal var method: String
internal var headers: Headers.Builder
internal var body: RequestBody? = null
/** A mutable map of tags, or an immutable empty map if we don't have any. */
internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()
//无参数构造方法
constructor() {
this.method = "GET"
this.headers = Headers.Builder()
}
//带Request参数的构造方法
internal constructor(request: Request) {
this.url = request.url
this.method = request.method
this.body = request.body
this.tags = if (request.tags.isEmpty()) {
mutableMapOf()
} else {
request.tags.toMutableMap()
}
this.headers = request.headers.newBuilder()
}
}
其实跟
OkHttpClient
类似,Reuqest
这个类也有个内部类Builder
,同样是建造者
模式的一个应用,其中可以看到构造方法中this.method = "GET"
代表默认是为GET的请求的,且有个带参的构造函数constructor(request: Request)
在Reuqest
中被newBuilder()
方法调用。(这个方法一般用于重新设置新的请求头的需求,例如 拦截器~)
- 接下来 分析
newCall()
OkHttpClient{
//...省略其他代码
override fun newCall(request: Request): Call {
return RealCall.newRealCall(this, request, forWebSocket = false)
}
}
/////////////////////////////////////////////////
//RealCall 类 我们进去newRealCall()方法查看一下
internal class RealCall private constructor(
val client: OkHttpClient,
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
companion object {
fun newRealCall(
client: OkHttpClient,
originalRequest: Request,
forWebSocket: Boolean
): RealCall {
// Safely publish the Call instance to the EventListener.
return RealCall(client, originalRequest, forWebSocket).apply {
transmitter = Transmitter(client, this)
}
}
}
}
调用
newCall()
方法,可以看到实例化了RealCall
并且transmitter = Transmitter(client, this)
实例化了RealCall
中的transmitter
,并返回了RealCall
本身
- 接下来看
execute
//RealCall类
override fun execute(): Response {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
transmitter.timeoutEnter()
// 关键1
transmitter.callStart()
try {
// 关键2
client.dispatcher.executed(this)
// 关键3
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
///////////////////////////////////
//关键1 :callStart()的调用
// Transmitter类:
fun callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
eventListener.callStart(call)
}
////////////////////////////////////
// 关键2 executed()的调用:
class Dispatcher constructor() {
// ....省略其他代码
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
}
return executorServiceOrNull!!
}
private val runningSyncCalls = ArrayDeque<RealCall>()
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
}
/////////////////////////////////////////////////////////////////////////////////////
// 关键3 getResponseWithInterceptorChain()
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
//关键代码5
val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
var calledNoMoreExchanges = false
try {
//关键代码4 真正处理拦截器的地方 并返回response 数据
val response = chain.proceed(originalRequest)
if (transmitter.isCanceled) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null)
}
}
}
///////////////////////////////////////////////////////////////////////
// 关键代码4 真正处理拦截器的地方
RealInterceptorChain类
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
if (index >= interceptors.size) throw AssertionError()
calls++
// If we already have a stream, confirm that the incoming request will use it.
check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
check(this.exchange == null || calls <= 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
// Call the next interceptor in the chain.
// 关键代码5
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
// Confirm that the next interceptor made its required call to chain.proceed().
check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
关键代码1:
transmitter.callStart()
,中最终调用了eventListener.callStart(call)
设置监听回调。关键代码2:
client.dispatcher.executed(this)
做请求一次的判断跟标记,如果!executed == true
,则抛出异常,false
则是标记为true``client.dispatcher.executed(this),Dispatcher
中调用executed
将其添加到runningSyncCalls
数组双端队列中关键代码3
getResponseWithInterceptorChain()
这个是最终获取到数据Response
的调用,方法中设置的各种拦截器,包括cookie
cache
等,默认cookie
为CookieJar.NO_COOKIES
关键代码4 真正处理代码拦截器的地方,这里使用
责任链
的设计模式,在// Call the next interceptor in the chain.// 关键代码5
这个地方一直调用链表的下个拦截器的intercept()
方法,递归的形式.关键代码5 我们可以看到,最新放入的拦截器最先处理, 优先我们设置的拦截器
异步请求方式
创建OkHttpClient对象
var client = OkHttpClient.Builder()
.connectTimeout(5, TimeUnit.SECONDS)
.build()
创建Request对象
var request = Request.Builder().url("https://www.wanandroid.com/article/list/0/json").build()
异步请求
ioScope.launch(handler) {
client.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
Log.d(TAG, "onFailure" + e.message)
}
override fun onResponse(call: Call, response: Response) {
Log.d(TAG, "onResponse" + response.message)
}
})
}
以上的操作是基本的实例代码 :
步骤如下:
- 创建
OkHttpClient
- 创建请求对象
Request
- 调
newCall()
方法返回一个Call
对象 - 调用
enqueue()
方法,并添加内部实现类Callback
,实现回调的两个方法
源码分析
- 直接从
RealCall
的enqueue()
入手
override fun enqueue(responseCallback: Callback) {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
transmitter.callStart()
// 关键代码1
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
这里有两点要跟
enqueue()
方法跟AsyncCall()类
,先看下面AsyncCall()
类的
/////////////////////////////////////////////
// 先分析一下AsyncCall
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
@Volatile private var callsPerHost = AtomicInteger(0)
fun callsPerHost(): AtomicInteger = callsPerHost
fun executeOn(executorService: ExecutorService) {
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
....
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
transmitter.timeoutEnter()
try {
// 关键代码1.1
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} finally {
client.dispatcher.finished(this)
}
}
}
}
AsyncCall
实现了Runnable
接口,实现run()
方法 线程池跑的时候就是跑run
里面的逻辑。这里调用了getResponseWithInterceptorChain()
这里上面有讲到是去进行真正网络请求的地方。然后将获取回来的结果通过传进来的responseCallback
,回调回去。
讲完
client.dispatcher.enqueue(AsyncCall(responseCallback))
的AsyncCall()
我们来讲讲enqueue()
这个方法
/////////////////////////////////////////////
class Dispatcher constructor() {
// ...省略其它代码
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call) // 关键代码1
// 关键代码 2
if (!call.get().forWebSocket) {
val existingCall = findExistingCallWithHost(call.host())
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// 关键代码3
promoteAndExecute()
}
}
//////////////////////////////////////////////////////////////////////////////////////
// 关键代码3
private fun promoteAndExecute(): Boolean {
assert(!Thread.holdsLock(this))
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
// 关键代码4
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
// 关键代码5
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
// 关键代码 6
asyncCall.executeOn(executorService)
}
return isRunning
}
关键代码1 调用
Dispatcher
类中的enqueue
,把call
放入readyAsyncCalls
这里补充一点关于这几个Call的数组对列分别记录什么:
// 用于记录准备好要运行的异步Call
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
//用于记录异步运行的Call,包括已经取消但是还没有结束的调用
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
//用于记录同步运行的Call,包括已经取消但是还没有结束的调用
private val runningSyncCalls = ArrayDeque<RealCall>()
关键代码2 将其他的Call里面的
callsPerHost
的值传给它 (这里是为了后面做判断主机的而做的操作,主机数超过5时暂时不放到运行数组队列中)关键代码3 4
promoteAndExecute
这个比较重点,从准备好的readyAsyncCalls
轮训获取出每个AsyncCalls
,如果RunningAsyncCalls
的总数大于this.maxRequestsPerHost
(64)则停止轮训,或者asyncCall.callsPerHost().get()
的值超过this.maxRequests
(5)则忽略这次的逻辑。
其逻辑主要就是将准备态的ReadyAsyncCalls
的AsyncCall
添加到runningAsyncCalls
中,并从ReadyAsyncCalls
删除。关键代码5 轮训使用线程池(下面有代码展示)运行
readyAsyncCalls
里面的符合目前逻辑要求的asyncCall
进行运行。
//线程池
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
}
return executorServiceOrNull!!
}
跟着 走一下关键代码6的逻辑
asyncCall.executeOn(executorService)
fun executeOn(executorService: ExecutorService) {
assert(!Thread.holdsLock(client.dispatcher))
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
transmitter.noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
这里调用了
executorService.execute(this)
至此 便调用到了AsyncCall
的run
方法,这里可以看一下 上面源码的// 先分析一下AsyncCall
这块调用
总结:
OkHttp
通过建造者
模式创建OkHttpClient
、Request
和Response
,在通过创建Call
发起同步异步请求,Dispatcher
作为中转点,做Call
的请求队列的维护为主,使用线程池对Call
的请求队列进行调度运行,最后通过各种拦截器(责任链模式
) 最终请求网络返回数据信息
网友评论