美文网首页Android随笔记
okhttp源码分析(-)

okhttp源码分析(-)

作者: 行走的老者 | 来源:发表于2018-07-21 11:39 被阅读9次

    基本使用

    GET请求

    private fun doGet() {
        val client = OkHttpClient()
        val request = Request.Builder()
                .url("http://square.github.io/okhttp/")
                .build()
        val call = client.newCall(request)
    
        //todo 1. 同步请求
    //        val response = call.execute()
        //获取响应结果
        //String
    //        val string = response.body()?.string()
        //byte
    //        val bytes = response.body()?.bytes()
        //IO
    //        val inputStream = response.body()?.byteStream()
    
        //todo 2. 异步请求
        call.enqueue(object : Callback {
            override fun onFailure(call: Call?, e: IOException?) {
                Log.i(TAG, "response onFailure $call: ${e?.message}")
            }
    
            override fun onResponse(call: Call?, response: Response?) {
                Log.i(TAG, "response onResponse $call: ${response?.body()?.string()}")
            }
        })
    }
    

    POST 请求

    private fun doPost() {
        val client = OkHttpClient()
        //表单形式
        val requestBody = FormBody.Builder()
                .add("code", "209")
                .addEncoded(URLEncoder.encode("姓名"), URLEncoder.encode("廖少"))
                .build()
        //JSON形式
    //        val requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"),
    //                "{\"code\":\"209\",\"name\":\"廖少\"}")
    
        //文件形式
    //        val requestBody = RequestBody.create(MediaType.parse("application/octet-stream"),
    //                File("xx/xxx.png"))
    
        val request = Request.Builder()
                .url("http://square.github.io/okhttp/")
                .post(requestBody)
                .build()
        val call = client.newCall(request)
    
        //todo 1. 同步请求
    //        val response = call.execute()
        //获取响应结果
        //String
    //        val string = response.body()?.string()
        //byte
    //        val bytes = response.body()?.bytes()
        //IO
    //        val inputStream = response.body()?.byteStream()
    
        //todo 2. 异步请求
        call.enqueue(object : Callback {
            override fun onFailure(call: Call?, e: IOException?) {
                Log.i(TAG, "response onFailure $call: ${e?.message}")
            }
    
            override fun onResponse(call: Call?, response: Response?) {
                Log.i(TAG, "response onResponse $call: ${response?.body()?.string()}")
            }
        })
    }
    

    内部请求分析

    1. val client = OkHttpClient()
    • 通过OkHttpClient的Builder的默认构造方法来初始化网络所需的各种成员:
    public OkHttpClient() {
        this(new Builder());
    }
    
    ......
    
    public Builder() {
        //线程池,线程调度器
        dispatcher = new Dispatcher();
        //协议,默认http/1.1与HTTP/2
        protocols = DEFAULT_PROTOCOLS;
        //连接规则
        connectionSpecs = DEFAULT_CONNECTION_SPECS;
        //发起dns请求等一系列监听
        eventListenerFactory = EventListener.factory(EventListener.NONE);
        //代理选择器
        proxySelector = ProxySelector.getDefault();
        //Cookie缓存相关
        cookieJar = CookieJar.NO_COOKIES;
        //创建socket工厂,单例模式
        socketFactory = SocketFactory.getDefault();
        //主机校验
        hostnameVerifier = OkHostnameVerifier.INSTANCE;
        //SSL证书相关
        certificatePinner = CertificatePinner.DEFAULT;
        //安全认证相关
        proxyAuthenticator = Authenticator.NONE;
        authenticator = Authenticator.NONE;
        //连接池,默认最大空闲链接5个,连接有限5分钟
        connectionPool = new ConnectionPool();
        dns = Dns.SYSTEM;
        //是否允许SSL重定向
        followSslRedirects = true;
        followRedirects = true;
        //失败是否重试
        retryOnConnectionFailure = true;
        //连接超时时长
        connectTimeout = 10_000;
        //读写超时时长
        readTimeout = 10_000;
        writeTimeout = 10_000;
        pingInterval = 0;
    }
    
    • 构造函数传入Builder赋值OkHttpClient成员变量:
    Builder(OkHttpClient okHttpClient) {
        this.dispatcher = okHttpClient.dispatcher;
        this.proxy = okHttpClient.proxy;
        this.protocols = okHttpClient.protocols;
        this.connectionSpecs = okHttpClient.connectionSpecs;
        this.interceptors.addAll(okHttpClient.interceptors);
        this.networkInterceptors.addAll(okHttpClient.networkInterceptors);
        this.eventListenerFactory = okHttpClient.eventListenerFactory;
        this.proxySelector = okHttpClient.proxySelector;
        this.cookieJar = okHttpClient.cookieJar;
        this.internalCache = okHttpClient.internalCache;
        this.cache = okHttpClient.cache;
        this.socketFactory = okHttpClient.socketFactory;
        this.sslSocketFactory = okHttpClient.sslSocketFactory;
        this.certificateChainCleaner = okHttpClient.certificateChainCleaner;
        this.hostnameVerifier = okHttpClient.hostnameVerifier;
        this.certificatePinner = okHttpClient.certificatePinner;
        this.proxyAuthenticator = okHttpClient.proxyAuthenticator;
        this.authenticator = okHttpClient.authenticator;
        this.connectionPool = okHttpClient.connectionPool;
        this.dns = okHttpClient.dns;
        this.followSslRedirects = okHttpClient.followSslRedirects;
        this.followRedirects = okHttpClient.followRedirects;
        this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure;
        this.connectTimeout = okHttpClient.connectTimeout;
        this.readTimeout = okHttpClient.readTimeout;
        this.writeTimeout = okHttpClient.writeTimeout;
        this.pingInterval = okHttpClient.pingInterval;
    }
    
    1. 创建Request对象
    • 通过Request的Builder默认构造方法,设置请求方式为GET与初始化请求头
    public Builder() {
      this.method = "GET";
      this.headers = new Headers.Builder();
    }
    
    • 通过Builder的url()方式设置请求链接,最后调用build()方法返回request对象
    public final class Request {
        final HttpUrl url;
        final String method;
        final Headers headers;
        final @Nullable RequestBody body;
        final Map<Class<?>, Object> tags;
        
        private volatile CacheControl cacheControl; // Lazily initialized.
      
        ......
      
        public Builder url(String url) {
          if (url == null) throw new NullPointerException("url == null");
        
          // Silently replace web socket URLs with HTTP URLs.
          if (url.regionMatches(true, 0, "ws:", 0, 3)) {
            url = "http:" + url.substring(3);
          } else if (url.regionMatches(true, 0, "wss:", 0, 4)) {
            url = "https:" + url.substring(4);
          }
        
          return url(HttpUrl.get(url));
        }
        ......
        public Request build() {
          if (url == null) throw new IllegalStateException("url == null");
          return new Request(this);
        }
        
        ......
        Request(Builder builder) {
            this.url = builder.url;
            this.method = builder.method;
            this.headers = builder.headers.build();
            this.body = builder.body;
            this.tags = Util.immutableMap(builder.tags);
        }
        ......
    }
    
    1. 将Request请求封装:val call = client.newCall(request)
    • 通过OkhttpClient的newCall()方法,构建一个RealCall对象
    @Override public Call newCall(Request request) {
        return RealCall.newRealCall(this, request, false /* for web socket */);
    }
    
    • 创建RealCall对象的同时还会创建一个RetryAndFollowUpInterceptor拦截器
    private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        this.client = client;
        this.originalRequest = originalRequest;
        this.forWebSocket = forWebSocket;
        this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
    }
    
    static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        // Safely publish the Call instance to the EventListener.
        RealCall call = new RealCall(client, originalRequest, forWebSocket);
        call.eventListener = client.eventListenerFactory().create(call);
        return call;
      }
    
    1. 执行同步请求:val response = call.execute()
    • 调用RealCall的execute()方法,该方法先检查该RealCall对象是否已经执行过该方法了,重复执行会抛出异常。
    @Override public Response execute() throws IOException {
        synchronized (this) {
        //检查是否已经执行过该请求了
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
        eventListener.callStart(this);
        try {
        //提交网络请求
          client.dispatcher().executed(this);
          Response result = getResponseWithInterceptorChain();
          if (result == null) throw new IOException("Canceled");
          return result;
        } catch (IOException e) {
          eventListener.callFailed(this, e);
          throw e;
        } finally {
          client.dispatcher().finished(this);
        }
    }
    
    • 获取Dispatcher,执行executed()方法
    /**正在执行的同步请求队列,包括取消的请求和还没有完成的请求*/
    private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    ......  
    //添加在的同步请求队列对尾
    synchronized void executed(RealCall call) {
        runningSyncCalls.add(call);
    }
    
    • 执行RellCall的getResponseWithInterceptorChain()方法
    Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        //初始化OkhttpClient的拦截器
        interceptors.addAll(client.interceptors());
        //重试和重定向的拦截器,最多20次
        interceptors.add(retryAndFollowUpInterceptor);
        //桥接转换拦截器:负责请求构建与响应
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        //缓存拦截器
        interceptors.add(new CacheInterceptor(client.internalCache()));
        //链接拦截器:负责socket的IO操作,这里使用了Okio提供的封装
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
            //初始化OkhttpClient的网络拦截器
            interceptors.addAll(client.networkInterceptors());
        }
        //服务器回调拦截器:向服务器发送请求,将请求header和body写入socket中,然后读取响应header和body,返回最后需要的响应数据.
        interceptors.add(new CallServerInterceptor(forWebSocket));
        
        //创建RealInterceptorChain对象
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
            originalRequest, this, eventListener, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());
        
        return chain.proceed(originalRequest);
    }
    

    RealInterceptorChain构造方法

    /**
    *@param index 用来标记执行到了Interceptors集合中哪一个拦截器的intercept()方法。
    */
    public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
      HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
      EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
        this.interceptors = interceptors;
        this.connection = connection;
        this.streamAllocation = streamAllocation;
        this.httpCodec = httpCodec;
        this.index = index;
        this.request = request;
        this.call = call;
        this.eventListener = eventListener;
        this.connectTimeout = connectTimeout;
        this.readTimeout = readTimeout;
        this.writeTimeout = writeTimeout;
    }
    
    • 执行RealInterceptorChain的proceed()方法
    public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
          RealConnection connection) throws IOException {
        if (index >= interceptors.size()) throw new AssertionError();
    
        calls++;
    
        // If we already have a stream, confirm that the incoming request will use it.
        if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must retain the same host and port");
        }
    
        // If we already have a stream, confirm that this is the only call to chain.proceed().
        if (this.httpCodec != null && calls > 1) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must call proceed() exactly once");
        }
    
        // Call the next interceptor in the chain.执行下一个拦截器
        RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
            writeTimeout);
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
    
        // Confirm that the next interceptor made its required call to chain.proceed().
        if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
          throw new IllegalStateException("network interceptor " + interceptor
              + " must call proceed() exactly once");
        }
    
        // Confirm that the intercepted response isn't null.
        if (response == null) {
          throw new NullPointerException("interceptor " + interceptor + " returned null");
        }
    
        if (response.body() == null) {
          throw new IllegalStateException(
              "interceptor " + interceptor + " returned a response with no body");
        }
    
        return response;
    }
    

    重点看下这三句代码:

    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    

    上面在proceed()方法里面有创建了新的RealInterceptorChain对象,且将index=index+1,而且RealInterceptorChain作为参数传给当前执行的第index个拦截器,而拦截器执行了intercept()方法,在这个方法里面有执行了RealInterceptorChain的proceed()方法,如此递归直到执行完所有的拦截器。

    • 下面重点来看下连接拦截器ConnectInterceptor
    public final class ConnectInterceptor implements Interceptor {
      public final OkHttpClient client;
    
      public ConnectInterceptor(OkHttpClient client) {
        this.client = client;
      }
    
      @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        StreamAllocation streamAllocation = realChain.streamAllocation();
    
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
        RealConnection connection = streamAllocation.connection();
    
        return realChain.proceed(request, streamAllocation, httpCodec, connection);
      }
    }
    

    具体的封装细节,后面单独留一个文章再叙述,现在先按流程走。最后执行CallServerInterceptor这个拦截器,通过将请求headers与body写入socket,向服务端发起请求,然后读取返回的header与body。

     @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        HttpCodec httpCodec = realChain.httpStream();
        StreamAllocation streamAllocation = realChain.streamAllocation();
        RealConnection connection = (RealConnection) realChain.connection();
        Request request = realChain.request();
    
        long sentRequestMillis = System.currentTimeMillis();
    
        //向socket写入header
        realChain.eventListener().requestHeadersStart(realChain.call());
        httpCodec.writeRequestHeaders(request);
        realChain.eventListener().requestHeadersEnd(realChain.call(), request);
    
        Response.Builder responseBuilder = null;
        if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
          // Continue" response before transmitting the request body. If we don't get that, return
          // what we did get (such as a 4xx response) without ever transmitting the request body.
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            httpCodec.flushRequest();
            realChain.eventListener().responseHeadersStart(realChain.call());
            responseBuilder = httpCodec.readResponseHeaders(true);
          }
    
          if (responseBuilder == null) {
          //向socket写入body
            // Write the request body if the "Expect: 100-continue" expectation was met.
            realChain.eventListener().requestBodyStart(realChain.call());
            long contentLength = request.body().contentLength();
            CountingSink requestBodyOut =
                new CountingSink(httpCodec.createRequestBody(request, contentLength));
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
    
            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
            realChain.eventListener()
                .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
          } else if (!connection.isMultiplexed()) {
            // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
            // from being reused. Otherwise we're still obligated to transmit the request body to
            // leave the connection in a consistent state.
            streamAllocation.noNewStreams();
          }
        }
    
        httpCodec.finishRequest();
    
        //读取服务端返回的header与body
        if (responseBuilder == null) {
          realChain.eventListener().responseHeadersStart(realChain.call());
          responseBuilder = httpCodec.readResponseHeaders(false);
        }
    
        Response response = responseBuilder
            .request(request)
            .handshake(streamAllocation.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
    
        int code = response.code();
        if (code == 100) {
          // server sent a 100-continue even though we did not request one.
          // try again to read the actual response
          responseBuilder = httpCodec.readResponseHeaders(false);
    
          response = responseBuilder
                  .request(request)
                  .handshake(streamAllocation.connection().handshake())
                  .sentRequestAtMillis(sentRequestMillis)
                  .receivedResponseAtMillis(System.currentTimeMillis())
                  .build();
    
          code = response.code();
        }
    
        realChain.eventListener()
                .responseHeadersEnd(realChain.call(), response);
    
        if (forWebSocket && code == 101) {
          // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
          response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
        } else {
          response = response.newBuilder()
              .body(httpCodec.openResponseBody(response))
              .build();
        }
    
        if ("close".equalsIgnoreCase(response.request().header("Connection"))
            || "close".equalsIgnoreCase(response.header("Connection"))) {
          streamAllocation.noNewStreams();
        }
    
        if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
          throw new ProtocolException(
              "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
        }
    
        return response;
      }
    
    • 执行结束后返回Response对象,不管执行成功或者失败都会回调到 client.dispatcher().finished(this);
    void finished(RealCall call) {
        finished(runningSyncCalls, call, false);
    }
    
    private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
            //从runningSyncCalls队列中移除当前请求的RealCall对象
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          //这里是异步的时候执行
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
        
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
    }
    
    1. 同步请求步骤总结
    • 通过Builder创建OkHttpClient与Request对象
    • 调用OkHttpClient的newCall方法创建RealCall对象,同时在RealCall构造方法里面创建了RetryAndFollowUpInterceptor拦截器
    • 调用RealCall的execute方法,在方法里面先判断该请求是否已经执行了,如果已经执行了则抛出IllegalStateException("Already Executed")异常
    • RealCall的execute方法里面调用了client.dispatcher()获取Dispatcher对象,在调用Dispatcher的executed方法,向正在运行的同步请求队列runningSyncCalls中添加
    • 调用RealCall的getResponseWithInterceptorChain方法,执行请求的拦截器链,结束后返回Response
    • 最后调用Dispatcher的finished方法,使该请求从runningSyncCalls队列中移除,请求结束,所以在整个同步请求过程中,Dispatcher扮演的角色主要是同步队列的添加与移除

    异步请求分析

    关于异步请求,其实前面创建OkhttpClient,Request,RealCall创建都是一样的,区别在于在RealCall里面执行的方法不一样,下面我们重点来看下RealCall的enqueue方法:

    @Override public void enqueue(Callback responseCallback) {
        synchronized (this) {
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        captureCallStackTrace();
        eventListener.callStart(this);
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
    }
    
    1. 同样也是先去判断是否已经执行了,然后调用Dispatcher对象的enqueue方法
    • 在看enqueue方法之前,我们先来看下AsyncCall是什么东西?
      final class AsyncCall extends NamedRunnable {
        private final Callback responseCallback;
    
        AsyncCall(Callback responseCallback) {
          super("OkHttp %s", redactedUrl());
          this.responseCallback = responseCallback;
        }
        ......
      }
      
      public abstract class NamedRunnable implements Runnable {
        ......
      }
    

    从上面我们可以看出AsyncCall其实就是Runnable,其实想想也是,毕竟后面是需要加入线程池中执行的。

    • 然后我们回来看enqueue方法
      synchronized void enqueue(AsyncCall call) {
      //判断当前运行的异步线程任务执行条件
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
        }
      }
    

    方法里面首先判断当前运行的异步队列大小是否超过了maxRequests(源码里面定义64)最大请求以及同一个域名下的请求是否超过了maxRequestsPerHost(固定值5)最大请求,如果都满足了这两个条件,则将请求任务加入runningAsyncCalls队列中,然后调用线程池执行该请求;如果不满足这两个条件,则将任务加入到准备执行异步请求队列readyAsyncCalls中等待调度执行。

    这里为什么要加入这两个条件呢?要弄明白这个我们先来看下Dispatcher中创建线程池的代码:

      public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
    

    可以看到它是单列模式创建的,而且corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,也就是说该线程池对于任务请求大小没有任何限制,可以无限添加,另外他们的空闲存活时长只有60秒,这样是为了最大提升OkHttp的请求吞吐且60s能及时清理掉线程池占用的内存,而这样就会导致一个问题,当大量请求过来的时候,就会无限的创建线程执行,这显然是不行了,会导致内存占用不断扩大,所以就在执行的时候添加了上面两个条件的限制,最大请求不超过64个,同一个域名下请求不超过5个,这个就很好的解决了并发的问题。

    1. 执行AsyncCall的execute()方法
      AsyncCall这个是继承自Runnable的,也就是说会执行run方法,到时AsyncCall没有run方法,而是execute方法,这是因为在NamedRunnable类中run方法调用了execute方法,所以最终是执行了execute方法:
        @Override protected void execute() {
          boolean signalledCallback = false;
          try {
            Response response = getResponseWithInterceptorChain();
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              eventListener.callFailed(RealCall.this, e);
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            client.dispatcher().finished(this);
          }
        }
    

    代码上可以看出,同样是调用了getResponseWithInterceptorChain()方法,即执行请求拦截器链,返回Response,另外根据判断调用相应的回调方法。

    1. 调用Dispatcher的finished方法,从正在异步队列runningAsyncCalls中移除当前请求RealCall对象
      /** Used by {@code AsyncCall#run} to signal completion. */
      void finished(AsyncCall call) {
        finished(runningAsyncCalls, call, true);
      }
    
    private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
    
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
    

    另外异步的时候调用的时候promoteCalls=true的,进而执行了promoteCalls()方法,下面我们看下这个方法:

      private void promoteCalls() {
      //条件判断
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
        
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
    
          if (runningCallsForHost(call) < maxRequestsPerHost) {
          //轮询从等待队列中获取且移除,另外将请求Call加入到正在运行队列runningAsyncCalls中,加入线程池执行
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
    
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }
    

    从这里可以看出是将任务取出执行,直到所有的任务结束,这即便是Dispatcher的任务调度功能。

    1. 异步请求步骤
    • 通过Builder创建OkHttpClient与Request对象
    • 调用OkHttpClient的newCall方法创建RealCall对象,同时在RealCall构造方法里面创建了RetryAndFollowUpInterceptor拦截器
    • 调用RealCall的enqueue方法,在方法里面先判断该请求是否已经执行了,如果已经执行了则抛出IllegalStateException("Already Executed")异常
    • RealCall的enqueue方法里面调用了client.dispatcher()获取Dispatcher对象,在调用Dispatcher的enqueue方法,且传入AsyncCall对象
    • 在enqueue方法里面判断正在请求的任务大小是否满足条件,满足条件则加入runningAsyncCalls队列,加入线程池执行,否则加入readyAsyncCalls队列中,等待Dispatcher的调度
    • 随后执行AsyncCall的execute方法,在execute方法里面调用getResponseWithInterceptorChain方法,执行请求的拦截器链,结束后返回Response
    • 最后调用Dispatcher的finished方法,使该请求从runningSyncCalls队列中移除,且通过轮询获取下一个任务,添加至runningAsyncCalls队列中且执行该请求,所以在整个异步请求过程中,Dispatcher扮演的角色主要是同步队列的添加与移除,以及对队列任务进行调度

    相关文章

      网友评论

        本文标题:okhttp源码分析(-)

        本文链接:https://www.haomeiwen.com/subject/rochmftx.html