okhttp: Task Model

Author: oceanLong | Published: 2019-04-18 18:02

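    Introduction

    okhttp is one of the most widely used HTTP networking frameworks on Android, with an elegant structure and strong performance. Reading its source is a good way to learn how a networking library is architected. This post looks at the structure of okhttp's task queues to understand how it schedules tasks.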

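    Basic structure

    A network request is a typical producer/consumer model. Each request is added to a buffer queue and is consumed later, when capacity allows.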
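The producer/consumer idea can be sketched with plain JDK classes. This is only an illustration of the general pattern, not how okhttp implements it (okhttp's Dispatcher, shown later, uses ArrayDeque queues and an ExecutorService). The class name, the STOP marker and the request strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerSketch {
    // Hypothetical marker that tells the consumer thread to stop.
    private static final String STOP = "__stop__";

    /** Puts every request into a buffer queue, consumes them on a worker thread, returns the consumed order. */
    public static List<String> consumeAll(List<String> requests) {
        BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
        List<String> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String request = buffer.take(); // blocks until a request is available
                    if (request.equals(STOP)) return;
                    consumed.add(request);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (String request : requests) buffer.put(request); // producer side
            buffer.put(STOP);
            consumer.join(); // after join, the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(List.of("GET /a", "GET /b", "GET /c")));
    }
}
```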

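    Take an asynchronous request as an example:

    [Figure: basic architecture]

    OkHttpClient provides the method that produces a Call; Dispatcher manages, dispatches and executes AsyncCall.

    Next, let's walk through the code in detail.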

    Detailed code

    Basic usage

    Again taking an asynchronous task as the example, a typical okhttp call looks like this:

    String url = "http://www.baidu.com";
    OkHttpClient okHttpClient = new OkHttpClient();
    final Request request = new Request.Builder()
            .url(url)
            .get() // GET is the default, so this call is optional
            .build();
    Call call = okHttpClient.newCall(request);
    call.enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            Log.d(TAG, "onFailure: ");
        }
    
        @Override
        public void onResponse(Call call, Response response) throws IOException {
            Log.d(TAG, "onResponse: " + response.body().string());
        }
    });
    

    As we can see, the process has roughly two steps: create a new Call, then add that Call to the queue.

    Request

    Request is the class that wraps an HTTP request in okhttp. It holds the parameters of the HTTP protocol, such as Headers and RequestBody.

    class Request internal constructor(
      internal val url: HttpUrl,
      builder: Builder
    ) {
      internal val method: String = builder.method
      internal val headers: Headers = builder.headers.build()
      internal val body: RequestBody? = builder.body
      internal val tags: Map<Class<*>, Any> = Util.immutableMap(builder.tags)
    
      ......
    
    }
    

    OkHttpClient

    OkHttpClient is the central manager class of okhttp. It holds almost all of okhttp's abstract modules and configuration options.

    
    open class OkHttpClient internal constructor(
      builder: Builder
    ) : Cloneable, Call.Factory, WebSocket.Factory {
      private val dispatcher: Dispatcher = builder.dispatcher
      private val proxy: Proxy? = builder.proxy
      private val protocols: List<Protocol> = builder.protocols
      private val connectionSpecs: List<ConnectionSpec> = builder.connectionSpecs
      private val interceptors: List<Interceptor> =
          Util.immutableList(builder.interceptors)
      private val networkInterceptors: List<Interceptor> =
          Util.immutableList(builder.networkInterceptors)
      private val eventListenerFactory: EventListener.Factory = builder.eventListenerFactory
      private val proxySelector: ProxySelector = builder.proxySelector
      private val cookieJar: CookieJar = builder.cookieJar
      private val cache: Cache? = builder.cache
      private val socketFactory: SocketFactory = builder.socketFactory
      private val sslSocketFactory: SSLSocketFactory?
      private val hostnameVerifier: HostnameVerifier = builder.hostnameVerifier
      private val certificatePinner: CertificatePinner
      private val proxyAuthenticator: Authenticator = builder.proxyAuthenticator
      private val authenticator: Authenticator = builder.authenticator
      private val connectionPool: ConnectionPool = builder.connectionPool
      private val dns: Dns = builder.dns
      private val followSslRedirects: Boolean = builder.followSslRedirects
      private val followRedirects: Boolean = builder.followRedirects
      private val retryOnConnectionFailure: Boolean = builder.retryOnConnectionFailure
      private val callTimeout: Int = builder.callTimeout
      private val connectTimeout: Int = builder.connectTimeout
      private val readTimeout: Int = builder.readTimeout
      private val writeTimeout: Int = builder.writeTimeout
      private val pingInterval: Int = builder.pingInterval
      private val internalCache: InternalCache? = builder.internalCache
      private val certificateChainCleaner: CertificateChainCleaner?
      ......
    
      /** Prepares the [request] to be executed at some point in the future. */
      override fun newCall(request: Request): Call {
        return RealCall.newRealCall(this, request, false /* for web socket */)
      }
    
    }
    

    RealCall

    RealCall is the class that wraps a task in okhttp.

      companion object {
        fun newRealCall(
          client: OkHttpClient,
          originalRequest: Request,
          forWebSocket: Boolean
        ): RealCall {
          // Safely publish the Call instance to the EventListener.
          return RealCall(client, originalRequest, forWebSocket).apply {
            transmitter = Transmitter(client, this)
          }
        }
      }
    
    internal class RealCall private constructor(
      val client: OkHttpClient,
      /** The application's original request unadulterated by redirects or auth headers.  */
      val originalRequest: Request,
      val forWebSocket: Boolean
    ) : Call {
      /**
       * There is a cycle between the [Call] and [Transmitter] that makes this awkward.
       * This is set after immediately after creating the call instance.
       */
      private lateinit var transmitter: Transmitter
    
      // Guarded by this.
      var executed: Boolean = false
    
      @Synchronized override fun isExecuted(): Boolean = executed
    
      override fun isCanceled(): Boolean = transmitter.isCanceled
    
      override fun request(): Request = originalRequest
    
      override fun execute(): Response {
        synchronized(this) {
          check(!executed) { "Already Executed" }
          executed = true
        }
        transmitter.timeoutEnter()
        transmitter.callStart()
        try {
          client.dispatcher().executed(this)
          return getResponseWithInterceptorChain()
        } finally {
          client.dispatcher().finished(this)
        }
      }
    
      override fun enqueue(responseCallback: Callback) {
        synchronized(this) {
          check(!executed) { "Already Executed" }
          executed = true
        }
        transmitter.callStart()
        client.dispatcher().enqueue(AsyncCall(responseCallback))
      }
    
      override fun cancel() {
        transmitter.cancel()
      }
    ...
    }
    

    In the asynchronous example, we normally use enqueue to add our request to the request queue:
    client.dispatcher().enqueue(AsyncCall(responseCallback))
    This line creates a new AsyncCall and adds it to the queue of the OkHttpClient's dispatcher.
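In the RealCall listing above, both execute and enqueue first check and set the executed flag under a lock, so a call can be started only once. A minimal sketch of that guard follows; the class and method names are hypothetical, and the real class hands the call to the dispatcher at the point marked by the comment.

```java
public class OneShotCallSketch {
    private boolean executed; // guarded by this

    /** Marks the call as started; throws if it has already been started. */
    public void start() {
        synchronized (this) {
            // Kotlin's check(...) in the listing throws IllegalStateException as well.
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        // A real call would now be handed to the dispatcher.
    }

    public synchronized boolean isExecuted() {
        return executed;
    }

    public static void main(String[] args) {
        OneShotCallSketch call = new OneShotCallSketch();
        call.start();
        try {
            call.start(); // second attempt is rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A consequence for callers is that a Call object cannot be reused; a repeated request needs a new Call.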

    Let's first look at what AsyncCall wraps:

      internal inner class AsyncCall(
        private val responseCallback: Callback
      ) : NamedRunnable("OkHttp %s", redactedUrl()) {
        @Volatile private var callsPerHost = AtomicInteger(0)
    
        fun callsPerHost(): AtomicInteger = callsPerHost
    
        fun reuseCallsPerHostFrom(other: AsyncCall) {
          this.callsPerHost = other.callsPerHost
        }
    
        fun host(): String = originalRequest.url().host()
    
        fun request(): Request = originalRequest
    
        fun get(): RealCall = this@RealCall
    
        /**
         * Attempt to enqueue this async call on `executorService`. This will attempt to clean up
         * if the executor has been shut down by reporting the call as failed.
         */
        fun executeOn(executorService: ExecutorService) {
          assert(!Thread.holdsLock(client.dispatcher()))
          var success = false
          try {
            executorService.execute(this)
            success = true
          } catch (e: RejectedExecutionException) {
            val ioException = InterruptedIOException("executor rejected")
            ioException.initCause(e)
            transmitter.noMoreExchanges(ioException)
            responseCallback.onFailure(this@RealCall, ioException)
          } finally {
            if (!success) {
              client.dispatcher().finished(this) // This call is no longer running!
            }
          }
        }
    
        override fun execute() {
          var signalledCallback = false
          transmitter.timeoutEnter()
          try {
            val response = getResponseWithInterceptorChain()
            signalledCallback = true
            responseCallback.onResponse(this@RealCall, response)
          } catch (e: IOException) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
            } else {
              responseCallback.onFailure(this@RealCall, e)
            }
          } finally {
            client.dispatcher().finished(this)
          }
        }
      }
    
    

    As we can see, AsyncCall mainly provides two methods: executeOn and execute.

    executeOn

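    executeOn is AsyncCall's own method. Most of its body is exception handling. If everything goes well, it simply runs the current AsyncCall on the ExecutorService passed in. If the executor rejects the task, it reports the failure to the callback and tells the dispatcher that the call has finished.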
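The rejection path of executeOn can be reproduced with JDK classes alone. The sketch below assumes a hypothetical FailureCallback interface standing in for okhttp's Callback and a hypothetical finished hook standing in for client.dispatcher().finished(this). It forces the rejection by shutting the executor down first.

```java
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class ExecuteOnSketch {
    /** Hypothetical stand-in for okhttp's Callback, reduced to the failure path. */
    public interface FailureCallback {
        void onFailure(Exception e);
    }

    /** Returns true if the executor accepted the task; otherwise reports the failure and cleans up. */
    public static boolean executeOn(ExecutorService executor, Runnable task,
                                    FailureCallback callback, Runnable finished) {
        boolean success = false;
        try {
            executor.execute(task);
            success = true;
        } catch (RejectedExecutionException e) {
            InterruptedIOException ioException = new InterruptedIOException("executor rejected");
            ioException.initCause(e);
            callback.onFailure(ioException);
        } finally {
            if (!success) finished.run(); // the task is no longer counted as running
        }
        return success;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // force the rejection path
        boolean accepted = executeOn(executor,
                () -> System.out.println("running"),
                e -> System.out.println("failed: " + e.getMessage()),
                () -> System.out.println("cleaned up"));
        System.out.println("accepted = " + accepted);
    }
}
```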

    execute

    execute is the abstract method that NamedRunnable calls from its implementation of Runnable's run method.

    public abstract class NamedRunnable implements Runnable {
      protected final String name;
    
      public NamedRunnable(String format, Object... args) {
        this.name = Util.format(format, args);
      }
    
      @Override public final void run() {
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName(name);
        try {
          execute();
        } finally {
          Thread.currentThread().setName(oldName);
        }
      }
    
      protected abstract void execute();
    }
    

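    As we can see, run exists only to give the thread a name while the task executes, and it restores the old name afterwards.
    When an AsyncCall runs in the thread pool, its execute method is called. The core of execute is
    getResponseWithInterceptorChain. We won't expand on its implementation here, since it belongs to okhttp's networking side.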
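The thread-naming behaviour of NamedRunnable can be observed directly. The sketch below copies the shape of the listing but uses String.format in place of okhttp's Util.format (an assumption made to keep it self-contained). The observeNames helper and the URL are hypothetical.

```java
public class ThreadNamingSketch {
    /** Same shape as the NamedRunnable listing, with String.format instead of Util.format. */
    public abstract static class NamedRunnable implements Runnable {
        protected final String name;

        public NamedRunnable(String format, Object... args) {
            this.name = String.format(format, args);
        }

        @Override public final void run() {
            String oldName = Thread.currentThread().getName();
            Thread.currentThread().setName(name);
            try {
                execute();
            } finally {
                Thread.currentThread().setName(oldName); // always restore the original name
            }
        }

        protected abstract void execute();
    }

    /** Runs a task on the current thread; returns {name seen inside execute, name after run}. */
    public static String[] observeNames(String url) {
        String[] seen = new String[2];
        new NamedRunnable("OkHttp %s", url) {
            @Override protected void execute() {
                seen[0] = Thread.currentThread().getName();
            }
        }.run();
        seen[1] = Thread.currentThread().getName();
        return seen;
    }

    public static void main(String[] args) {
        String[] names = observeNames("http://example.com/...");
        System.out.println("during: " + names[0]);
        System.out.println("after:  " + names[1]);
    }
}
```

Naming the thread after the request URL makes thread dumps and log output easier to attribute to a particular call.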

    With all that groundwork in place, we can finally look at how okhttp's task queues are designed.

    Dispatcher

      private var idleCallback: Runnable? = null
    
      /** Executes calls. Created lazily.  */
      private var executorService: ExecutorService? = null
    
      /** Ready async calls in the order they'll be run.  */
      private val readyAsyncCalls = ArrayDeque<AsyncCall>()
    
      /** Running asynchronous calls. Includes canceled calls that haven't finished yet.  */
      private val runningAsyncCalls = ArrayDeque<AsyncCall>()
    
      /** Running synchronous calls. Includes canceled calls that haven't finished yet.  */
      private val runningSyncCalls = ArrayDeque<RealCall>()
    
      internal fun enqueue(call: AsyncCall) {
        synchronized(this) {
          readyAsyncCalls.add(call)
    
          // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
          // the same host.
          if (!call.get().forWebSocket) {
            val existingCall = findExistingCallWithHost(call.host())
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
          }
        }
        promoteAndExecute()
      }
    
    

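    This part is mainly the reuse logic for AsyncCall's callsPerHost, and the comment explains it fairly clearly. If findExistingCallWithHost finds an existing AsyncCall for the same host, the new call reuses that call's callsPerHost counter, so calls to one host share a single count. The counter is not incremented here; that happens in promoteAndExecute. Next, let's look at the implementation of promoteAndExecute: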
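The counter sharing can be illustrated in isolation. In this sketch the Call class, its fields and the host names are hypothetical, and the lookup runs before the new call is stored, which differs slightly from the listing (there the call is added to readyAsyncCalls first).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedHostCounterSketch {
    /** Hypothetical reduced call: just a host and a per-host counter reference. */
    public static final class Call {
        public final String host;
        public AtomicInteger callsPerHost = new AtomicInteger(0);

        Call(String host) {
            this.host = host;
        }
    }

    private final List<Call> known = new ArrayList<>();

    /** Registers a call; if a call to the same host already exists, both share one counter. */
    public synchronized Call enqueue(String host) {
        Call call = new Call(host);
        for (Call existing : known) {
            if (existing.host.equals(host)) {
                call.callsPerHost = existing.callsPerHost; // share, do not copy
                break;
            }
        }
        known.add(call);
        return call;
    }

    public static void main(String[] args) {
        SharedHostCounterSketch sketch = new SharedHostCounterSketch();
        Call first = sketch.enqueue("a.example");
        Call second = sketch.enqueue("a.example");
        first.callsPerHost.incrementAndGet();
        System.out.println(second.callsPerHost.get()); // both calls see the same count
    }
}
```

Sharing one AtomicInteger per host means the per-host limit can be checked by reading a single value instead of scanning the running queue.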

      private fun promoteAndExecute(): Boolean {
        assert(!Thread.holdsLock(this))
    
        val executableCalls = ArrayList<AsyncCall>()
        val isRunning: Boolean
        synchronized(this) {
          val i = readyAsyncCalls.iterator()
          while (i.hasNext()) {
            val asyncCall = i.next()
    
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
    
            i.remove()
            asyncCall.callsPerHost().incrementAndGet()
            executableCalls.add(asyncCall)
            runningAsyncCalls.add(asyncCall)
          }
          isRunning = runningCallsCount() > 0
        }
    
        for (i in 0 until executableCalls.size) {
          val asyncCall = executableCalls[i]
          asyncCall.executeOn(executorService())
        }
    
        return isRunning
      }
    

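    promoteAndExecute moves tasks from the readyAsyncCalls queue into the runningAsyncCalls queue, as long as neither the overall limit (maxRequests) nor the per-host limit (maxRequestsPerHost) is exceeded. For every call it moves, it then invokes executeOn outside the lock. Once executeOn has submitted the call to the executor, the execute method of AsyncCall, an inner class of RealCall, eventually runs.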
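The promotion loop can be tried out with a reduced model. In this sketch a call is represented only by its host name, and the per-host count lives in a map rather than in a shared AtomicInteger. Both are simplifications, and the class and method names are hypothetical. The real method also executes the promoted calls after leaving the lock, while this version only returns them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class PromoteSketch {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    private final Deque<String> ready = new ArrayDeque<>();   // hosts of waiting calls
    private final Deque<String> running = new ArrayDeque<>(); // hosts of running calls
    private final Map<String, Integer> runningPerHost = new HashMap<>();

    public PromoteSketch(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    public synchronized void enqueue(String host) {
        ready.add(host);
    }

    /** Moves eligible calls from ready to running and returns the promoted hosts in order. */
    public synchronized List<String> promote() {
        List<String> promoted = new ArrayList<>();
        Iterator<String> i = ready.iterator();
        while (i.hasNext()) {
            String host = i.next();
            if (running.size() >= maxRequests) break; // overall capacity reached
            if (runningPerHost.getOrDefault(host, 0) >= maxRequestsPerHost) continue; // host capacity reached

            i.remove();
            runningPerHost.merge(host, 1, Integer::sum);
            running.add(host);
            promoted.add(host);
        }
        return promoted;
    }

    /** Marks one running call to the host as finished, freeing a slot. */
    public synchronized void finished(String host) {
        if (running.remove(host)) { // removes the first matching entry only
            runningPerHost.merge(host, -1, Integer::sum);
        }
    }

    public synchronized int readyCount() {
        return ready.size();
    }

    public synchronized int runningCount() {
        return running.size();
    }
}
```

With limits of 3 overall and 2 per host, enqueuing hosts a, a, a, b, b promotes the first two a calls and the first b call. The third a is skipped with continue because its host is full, and the loop stops with break at the second b because the overall limit is reached.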

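    Summary

    That is the basic structure of requests and tasks in okhttp; it does not yet touch the actual network I/O. The core classes are RealCall and Dispatcher, which schedule tasks using ArrayDeque and AsyncCall.

    Corrections are welcome.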
