美文网首页
okhttp3学习笔记

okhttp3学习笔记

作者: 阿丹_90 | 来源:发表于2020-01-15 18:14 被阅读0次

    源码版本:com.squareup.okhttp3:okhttp:4.3.0

    1 okhttp简单应用

    通过okhttp发起异步post请求示例:

    // 创建OkHttpClient单例
    val okHttpClient = OkHttpClient()
    
    /* 媒体类型 用于描述http请求或响应主体的内容类型。
    * 官方注释:一种[RFC 2045][rfc_2045]媒体类型,适合描述http请求或响应体的内容类型。
    * 默认charset=utf-8。*/
    val contentType: MediaType = "application/json; charset=utf-8".toMediaType()
    // json格式请求参数
    val body: RequestBody = "{\"userId\": \"001\", \"password\": \"123\"}".toRequestBody(contentType)
    // 新建post请求
    val post: Request = Request.Builder()
                    .url("")
                    .post(body)
                    .build()
    
    // 异步发起,回调形式返回结果
    okHttpClient.newCall(post).enqueue(object : Callback {
                override fun onFailure(call: Call, e: IOException) {
                }
    
                @Throws(IOException::class)
                override fun onResponse(call: Call, response: Response) {
                    val request = call.request()
                    val requestBody = request.body
                    val responseBody = response.body
                }
            })
    

    可以看出用okhttp发送请求需要3步

    1. 创建一个单例OkHttpClient对象
    2. 创建原始请求Request对象
    3. 用OkHttpClient对象把Request对象包装成直接可以用Call,并排队执行。

    1.1 OkHttpClient

    关于OkHttpClient的官方注释:OkHttpClient是http请求(Call)的工厂,可以用来发送http请求和接受响应。只创建一个OkHttpClient实例并将其用于所有HTTP调用时,OkHttp的性能最佳。因为每个OkHttpClient实例都拥有自己的连接池和线程池。重用连接和线程可以减少延迟并节省内存。相反,为每个请求创建一个OkHttpClient实例会浪费空闲池的资源。
    OkHttpClient可以直接通过构造方法创建也可以通过OkHttpClient.Builder创建。

        // 通过OkHttpClient.Builder创建OkHttpClient实例
        val okHttpClient = OkHttpClient.Builder()
                .eventListener(object : EventListener() {
                    override fun callStart(call: Call) {
                        Log.d("network", "发起请求${call.request().url}")
                    }
                })
                .build()
    
        // 通过无参构造创建OkHttpClient实例
        val okHttpClient = OkHttpClient()
    
        // OkHttpClient.Builder的build()方法
        fun build(): OkHttpClient = OkHttpClient(this)
    
    open class OkHttpClient internal constructor(
      builder: Builder
    ) : Cloneable, Call.Factory, WebSocket.Factory {
        // OkHttpClient无参构造
        constructor() : this(Builder())
        // 省略其他代码
        ...
    }
    

    查看源码可以看出两种方法其实是一种,但如果想自定义OkHttpClient属性则需通过OkHttpClient.Builder。我们来看一下OkHttpClient.Builder的属性和默认值,这些最后会赋值给OkHttpClient。

        // 调度器
        internal var dispatcher: Dispatcher = Dispatcher()
        // 连接池 底层用ArrayDeque
        internal var connectionPool: ConnectionPool = ConnectionPool()
        // 用户责任链/拦截器链 ArrayList
        internal val interceptors: MutableList<Interceptor> = mutableListOf()
        // 网络责任链/拦截器链 ArrayList
        internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
        // EventListener工厂
        internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
        // 连接失败是否重试
        internal var retryOnConnectionFailure = true
        // 身份验证
        internal var authenticator: Authenticator = Authenticator.NONE
        internal var followRedirects = true
        internal var followSslRedirects = true
        internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
        internal var cache: Cache? = null
        internal var dns: Dns = Dns.SYSTEM
        internal var proxy: Proxy? = null
        internal var proxySelector: ProxySelector? = null
        internal var proxyAuthenticator: Authenticator = Authenticator.NONE
        internal var socketFactory: SocketFactory = SocketFactory.getDefault()
        internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
        internal var x509TrustManagerOrNull: X509TrustManager? = null
        internal var connectionSpecs: List<ConnectionSpec> = OkHttpClient.DEFAULT_CONNECTION_SPECS
        internal var protocols: List<Protocol> = OkHttpClient.DEFAULT_PROTOCOLS
        internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
        internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
        internal var certificateChainCleaner: CertificateChainCleaner? = null
        internal var callTimeout = 0
        // 链接超时时间
        internal var connectTimeout = 10_000
        // 读超时时间
        internal var readTimeout = 10_000
        // 写超时时间
        internal var writeTimeout = 10_000
        internal var pingInterval = 0
    

    1.2 Request

    Request只能通过Request.Builder创建,默认get请求。可自定义请求方式、url、header、tag等。Request不能被OkHttpClient直接使用,需要包装成Call。

    Call是interface,我们来看一下Call定义了哪些属性和方法。

    interface Call : Cloneable {
           // 返回原始Request对象
           fun request(): Request
           // 同步执行请求,直接返回响应
           @Throws(IOException::class)
           fun execute(): Response
           // 异步执行请求,把请求加入队列等待执行,通过Callback返回响应
           fun enqueue(responseCallback: Callback)
           // 请求可取消,已经执行过的不可以取消
           fun cancel()
           // 是否执行了 execute或enqueue
           fun isExecuted(): Boolean
           fun isCanceled(): Boolean
           // 返回跨越整个调用的Timeout,包括解析DNS、连接、写入请求体、服务器处理和读取响应体过程。如果调用需要重定向或重试,所有操作都必须在一个超时周期内完成。
           fun timeout(): Timeout
           // 复制当前Call,因为一个Call只可以发起一次,想重复发起相同相求需复制一个新的Call
           public override fun clone(): Call
           // Call工厂interface
           interface Factory {
                  fun newCall(request: Request): Call
           }
    }
    

    RealCall是Call的实现类,原始请求request先包装成RealCall,后续还会包装成RealCall的内部类AsnyCall,这里先不说。OkHttpClient实现了Call.Factory,所以前边说OkHttpClient是http请求(Call)的工厂。

    RealCall对象是通过okHttpClient.newCall(post)创建的。

    override fun newCall(request: Request): Call {
           return RealCall.newRealCall(this, request,forWebSocket = false)
    }
    

    RealCall构造方法私有,只能通过RealCall的newRealCall(client: OkHttpClient,originalRequest: Request,forWebSocket: Boolean)构建实例。

    companion object {
           fun newRealCall(
                  client: OkHttpClient,
                  originalRequest: Request,
                  forWebSocket: Boolean
           ): RealCall {
           // Safely publish the Call instance to the EventListener.
           return RealCall(client, originalRequest, forWebSocket).apply {
                  transmitter = Transmitter(client,this)
                  }
           }
    }
    

    1.3 发送异步请求

    平时应用比较多的是异步请求,接下来我们看一下okHttp异步请求的执行过程。RealCall的enqueue方法:

    override fun enqueue(responseCallback: Callback) {
           // 加锁用标记位判断当前对象是否以执行,执行过抛异常"Already Executed",没执行过则执行。
           synchronized(this){
                  check(!executed){ "Already Executed" }
                  executed =true
           }
           // 记录堆栈跟踪,告诉eventListener请求开始
           transmitter.callStart()
           // 通过dispatcher进行排队
           client.dispatcher.enqueue(AsyncCall(responseCallback))
    }
    

    2 Dispatcher和AsyncCall

    Dispatcher 调度器,用于决定何时执行异步请求。可在创建OkHttpClient时自定义,一般没必要自定义,okhttp有很好的默认实现。说到Dispatcher就必须提到AsyncCall,AsyncCall是RealCall的一个内部类,是为了配合dispatcher进行线程调度的Runnable。
    我们先看一下Dispatcher的部分源码。

      // 并发执行的请求的最大数目
      @get:Synchronized var maxRequests = 64
        set(maxRequests) {
          require(maxRequests >= 1) { "max < 1: $maxRequests" }
          synchronized(this) {
            field = maxRequests
          }
          promoteAndExecute()
        }
    
      // 同一host请求并发执行的请求的最大数目。WebSocket 不受此限制
      @get:Synchronized var maxRequestsPerHost = 5
        set(maxRequestsPerHost) {
          require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
          synchronized(this) {
            field = maxRequestsPerHost
          }
          promoteAndExecute()
        }
    
      // 线程池
      private var executorServiceOrNull: ExecutorService? = null
      // 线程池懒汉式初始化
      @get:Synchronized
      @get:JvmName("executorService") val executorService: ExecutorService
        get() {
          if (executorServiceOrNull == null) {
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
          }
          return executorServiceOrNull!!
        }
    
      // 准备执行的AsyncCall列表
      private val readyAsyncCalls = ArrayDeque<AsyncCall>()
      // 正在执行的AsyncCall列表
      private val runningAsyncCalls = ArrayDeque<AsyncCall>()
    
      // 请求入队
      internal fun enqueue(call: AsyncCall) {
        synchronized(this) {
          // 把call加入准备执行的AsyncCall列表
          readyAsyncCalls.add(call)
    
          // 相同host的call共享同一AtomicInteger 
          // 这里我个人认为把 readyAsyncCalls.add(call)放到if语句后更好
          if (!call.get().forWebSocket) {
            // 在执行中call列表和准备执行call列表寻找相同host的call
            val existingCall = findExistingCallWithHost(call.host())
            // 存在相同host的call,则把找到的call的AtomicInteger赋值给当前call
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
          }
        }
        // 推动执行
        promoteAndExecute()
      }
    
      /**
       * 在执行中call列表和准备执行call列表寻找相同host的call
       */
      private fun findExistingCallWithHost(host: String): AsyncCall? {
        for (existingCall in runningAsyncCalls) {
          if (existingCall.host() == host) return existingCall
        }
        /* 如果执行到这里肯定能找到相同host的call的,
        因为当前call已经被加入到readyAsyncCalls,
        当遍历到最后一个call就是当前call本身,
        肯定符合existingCall.host() == host,然后返回的call实际就是当前call。*/
        for (existingCall in readyAsyncCalls) {
          if (existingCall.host() == host) return existingCall
        }
        return null
      }
    
      /**
       * 筛选合适的待执行列表的call到执行中列表。
       * 并在线程池executorService中执行
       */
      private fun promoteAndExecute(): Boolean {
        assert(!Thread.holdsLock(this))
    
        // 可拓展列表,用于存放待执行列表中合适移动到执行中列表的call
        val executableCalls = mutableListOf<AsyncCall>()
        // 当前是否有请求正在执行
        val isRunning: Boolean
        synchronized(this) {
          val i = readyAsyncCalls.iterator()
          // 遍历待执行call列表
          while (i.hasNext()) {
            val asyncCall = i.next()
            // 如果当前正在执行的call数量超过了调度器允许的最大值,则终止操作
            if (runningAsyncCalls.size >= this.maxRequests) break
            // 如果与当前call相同host的call数量超过调度器允许的同一个host的最大值,则跳过当前call,继续判断下一个call是否合适。
            if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue 
    
            // 当前call合适
            // 移出待执行列表
            i.remove()
            // 同host请求计数加1
            asyncCall.callsPerHost().incrementAndGet()
            // 把当前call添加到可执行列表,当循环完成此列表中包含此次筛选的所有合适的call
            executableCalls.add(asyncCall)
            // 把当前call添加到执行中列表,此时此列表中包含之前正在执行的call和此次筛选出的合适执行但未执行的call
            runningAsyncCalls.add(asyncCall)
          }
          // 判断是否有正在执行的(包含合适执行但未执行的)请求
          isRunning = runningCallsCount() > 0
        }
    
        // 把此次循环筛选出来的call放到线程池中执行,for结束runningAsyncCalls中的所有call都处于执行中状态
        for (i in 0 until executableCalls.size) {
          val asyncCall = executableCalls[i]
          asyncCall.executeOn(executorService)
        }
    
        return isRunning
      }
    

    这部分代码比较长,但是只做了一件事:把请求放到队列中合理调度执行。程序的入口是enqueue(call: AsyncCall)方法,操作一共分3步

    1. 把call放到待执行列表。
    2. 相同host的call共用同一AtomicInteger。AtomicInteger用于统计当前host正在执行的请求数量。
    3. 筛选合适的待执行call,从待执行列表取出放到执行中列表,然后在线程池中执行。

    我们在看一下AsyncCall的源码

      internal inner class AsyncCall(
        private val responseCallback: Callback
      ) : Runnable {
        // 多线程操作可见,且操作保证原子性
        @Volatile private var callsPerHost = AtomicInteger(0)
        fun callsPerHost(): AtomicInteger = callsPerHost
        fun reuseCallsPerHostFrom(other: AsyncCall) {
          this.callsPerHost = other.callsPerHost
        }
        // 原始请求的host
        fun host(): String = originalRequest.url.host
        // 原始请求
        fun request(): Request = originalRequest
        // 当前RealCall对象,持有此内部类的外部类对象
        fun get(): RealCall = this@RealCall
    
        // 执行当前AsnyCall
        fun executeOn(executorService: ExecutorService) {
          assert(!Thread.holdsLock(client.dispatcher))
          var success = false
          try {
            // 把当前call(Runnable对象)加入dispatcher的线程池中执行
            executorService.execute(this)
            success = true
          } catch (e: RejectedExecutionException) {
            val ioException = InterruptedIOException("executor rejected")
            ioException.initCause(e)
            // 执行结束,尝试释放connection,并通知eventListener
            transmitter.noMoreExchanges(ioException)
            // 失败回调,把当前call和异常传回
            responseCallback.onFailure(this@RealCall, ioException)
          } finally {
            if (!success) {
              // 执行结束,操作调度器,在call列表删除当前call;如队列有等待call则唤醒执行;如调度器空闲则通知调度器空闲监听器。
              client.dispatcher.finished(this) // This call is no longer running!
            }
          }
        }
    
        // 当前Runnable的run方法
        override fun run() {
          // 当前是在子线程
          threadName("OkHttp ${redactedUrl()}") {
            var signalledCallback = false
            // 开始计时
            transmitter.timeoutEnter()
            try {
              // 真正的网络请求操作,response是网络请求结果
              val response = getResponseWithInterceptorChain()
              signalledCallback = true
              // 回调 请求成功
              responseCallback.onResponse(this@RealCall, response)
            } catch (e: IOException) {
              if (signalledCallback) {
                // Do not signal the callback twice!
                Platform.get().log("Callback failure for ${toLoggableString()}", INFO, e)
              } else {
                // 回调 请求失败
                responseCallback.onFailure(this@RealCall, e)
              }
            } catch (t: Throwable) {
              // 停止计时 即调用:transmitter.cancel() 
              cancel()
              if (!signalledCallback) {
                val canceledException = IOException("canceled due to $t")
                canceledException.addSuppressed(t)
                // 回调 请求失败
                responseCallback.onFailure(this@RealCall, canceledException)
              }
              throw t
            } finally {
              // 执行结束,操作调度器,在call列表删除当前call;如队列有等待call则唤醒执行;如调度器空闲则通知调度器空闲监听器。
              client.dispatcher.finished(this)
            }
          }
        }
      }
    

    Asnycall的代码非常简单,只有2个重点方法:
    fun executeOn(executorService: ExecutorService) 和
    Runnable的抽象方法override fun run()。
    executeOn(executorService: ExecutorService)是提供给dispatcher当请求排队到可执行队列中时操作请求执行的入口,整个方法除了异常处理和回调处理,只有一句executorService.execute(this),executorService是从dispatcher持有的所有call共享的线程池,okhttp通过此线程池实现线程的回收利用。
    run()方法就是网络请求的真正入口,即:val response = getResponseWithInterceptorChain()获取网络请求结果,然后通过回调把结果传给调用者。至此,okhttp的线程调度完成,Dispatcher和Asnycall的任务完成。

    3 责任链

    接下来我们看一下Asnycall的run()里用来获取网络响应的getResponseWithInterceptorChain(),它拉开了okhttp的一大亮点的序幕:责任链模式。

      @Throws(IOException::class)
      fun getResponseWithInterceptorChain(): Response {
        // Build a full stack of interceptors. 需要留意任务链中拦截器的顺序
        val interceptors = mutableListOf<Interceptor>()
        // 用户自定义拦截器,最先执行的拦截器
        interceptors += client.interceptors
        // 失败重试和重定向拦截器,如果请求取消则可能抛出IOException
        interceptors += RetryAndFollowUpInterceptor(client)
        // 桥接拦截器
        interceptors += BridgeInterceptor(client.cookieJar)
        // 缓存拦截器
        interceptors += CacheInterceptor(client.cache)
        // 链接拦截器,用于打开目标服务器的链接
        interceptors += ConnectInterceptor
        // 用户自定义的网络拦截器
        if (!forWebSocket) {
          interceptors += client.networkInterceptors
        }
        // 真正对服务器进行网络调用的拦截器,任务链最后一个拦截器
        interceptors += CallServerInterceptor(forWebSocket)
    
        // 创建任务链,注意第四个参数index的值是0
        val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
            client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
    
        var calledNoMoreExchanges = false
        try {
          // 执行任务链下一个
          val response = chain.proceed(originalRequest)
          // 以下都是异常处理
          if (transmitter.isCanceled) {
            response.closeQuietly()
            throw IOException("Canceled")
          }
          return response
        } catch (e: IOException) {
          calledNoMoreExchanges = true
          throw transmitter.noMoreExchanges(e) as Throwable
        } finally {
          if (!calledNoMoreExchanges) {
            transmitter.noMoreExchanges(null)
          }
        }
      }
    

    任务链的执行入口是val response = chain.proceed(originalRequest),我们点进去看一下,这里确实不太好理解,所以源码中有很多注释。

    override fun proceed(request: Request): Response {
        return proceed(request, transmitter, exchange)
      }
    
      @Throws(IOException::class)
      fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
        if (index >= interceptors.size) throw AssertionError()
    
        calls++
    
        // If we already have a stream, confirm that the incoming request will use it.
        check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
          "network interceptor ${interceptors[index - 1]} must retain the same host and port"
        }
    
        // If we already have a stream, confirm that this is the only call to chain.proceed().
        check(this.exchange == null || calls <= 1) {
          "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
        }
    
        // Call the next interceptor in the chain. 注意这里第四个参数index+1了
        val next = RealInterceptorChain(interceptors, transmitter, exchange,
            index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
        // 取出当前index对应的interceptor
        val interceptor = interceptors[index]
    
        // 当前拦截器执行拦截操作
        @Suppress("USELESS_ELVIS")
        val response = interceptor.intercept(next) ?: throw NullPointerException("interceptor $interceptor returned null")
    
        // Confirm that the next interceptor made its required call to chain.proceed().
        check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {"network interceptor $interceptor must call proceed() exactly once"}
    
        check(response.body != null) { "interceptor $interceptor returned a response with no body" }
    
        return response
      }
    

    这里的重点同样只有一行val response = interceptor.intercept(next) ?: throw NullPointerException( "interceptor $interceptor returned null"),我们点开看一下intercept(next) ,这是接口Interceptor的方法,我们随便找一个实现类看,这里用的是简单好理解的LoggingInterceptor。

    private static class LoggingInterceptor implements Interceptor {
        @Override
        public Response intercept(Chain chain) throws IOException {
                // 操作request
                long t1 = System.nanoTime();
                Request request = chain.request();
                logger.info(String.format("Sending request %s on %s%n%s",
                        request.url(), chain.connection(), request.headers()));
    
                // 执行任务链下一个
                Response response = chain.proceed(request);
    
                // 操作response
                long t2 = System.nanoTime();
                logger.info(String.format("Received response for %s in %.1fms%n%s",
                        request.url(), (t2 - t1) / 1e6d, response.headers()));
                return response;
            }
        }
    

    拦截器的 intercept(Chain chain)方法基本分为3部分

    1. 处理request;
    2. 执行任务链下一个,获取response;
    3. 处理response,并返回给上级调用者。

    责任链的工作流程


    未命名文件-6.png

    相关文章

      网友评论

          本文标题:okhttp3学习笔记

          本文链接:https://www.haomeiwen.com/subject/seztzctx.html