解刨OkHttp框架

作者: Jack921 | 来源:发表于2018-05-22 15:14 被阅读108次

    继AsyncTask,又把手术刀指向OkHttp,有时候解析源码会上瘾。因为源码里包含的东西仿佛就是组成计算机世界的砖头,水分,只要有这些东西,就可以保罗万物,无招胜有招。又说多了,开始吧

    首先okhttp的依赖是:

    compile 'com.squareup.okhttp3:okhttp:3.8.1'
    

    我就是根据这里的源码进行解析的。

    再来也很简单,就是最简单的OkHttp的同步和异步网络访问:

    OkHttpClient client = new OkHttpClient();
    
     //同步网络访问
    public String Synch(String url) throws IOException {
        Request request = new Request.Builder().url(url).build();
        Response response = client.newCall(request).execute();
        return response.body().string();
    }
    
    //异步网络访问
    public void Async(String url) throws IOException {
        Request request = new Request.Builder().url(url).build();
        client.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {}
            @Override
            public void onResponse(Call call, Response response) throws IOException {
                Log.e("response",response.body().string());
            }
        });
    }
    

    OkHttpClient

    我们首先是定义OkHttpClient,通过看源码如下:

    public OkHttpClient() {
        this(new Builder());
    }
    
     public static final class Builder {
        Dispatcher dispatcher;
        @Nullable Proxy proxy;
        List<Protocol> protocols;
        List<ConnectionSpec> connectionSpecs;
        final List<Interceptor> interceptors = new ArrayList<>();
        final List<Interceptor> networkInterceptors = new ArrayList<>();
        EventListener.Factory eventListenerFactory;
        ProxySelector proxySelector;
        CookieJar cookieJar;
        @Nullable Cache cache;
        @Nullable InternalCache internalCache;
        SocketFactory socketFactory;
        @Nullable SSLSocketFactory sslSocketFactory;
        @Nullable CertificateChainCleaner certificateChainCleaner;
        HostnameVerifier hostnameVerifier;
        CertificatePinner certificatePinner;
        Authenticator proxyAuthenticator;
        Authenticator authenticator;
        ConnectionPool connectionPool;
        Dns dns;
        boolean followSslRedirects;
        boolean followRedirects;
        boolean retryOnConnectionFailure;
        int connectTimeout;
        int readTimeout;
        int writeTimeout;
        int pingInterval;
    
        public Builder() {
          dispatcher = new Dispatcher();
          protocols = DEFAULT_PROTOCOLS;
          connectionSpecs = DEFAULT_CONNECTION_SPECS;
          eventListenerFactory = EventListener.factory(EventListener.NONE);
          proxySelector = ProxySelector.getDefault();
          cookieJar = CookieJar.NO_COOKIES;
          socketFactory = SocketFactory.getDefault();
          hostnameVerifier = OkHostnameVerifier.INSTANCE;
          certificatePinner = CertificatePinner.DEFAULT;
          proxyAuthenticator = Authenticator.NONE;
          authenticator = Authenticator.NONE;
          connectionPool = new ConnectionPool();
          dns = Dns.SYSTEM;
          followSslRedirects = true;
          followRedirects = true;
          retryOnConnectionFailure = true;
          connectTimeout = 10_000;
          readTimeout = 10_000;
          writeTimeout = 10_000;
          pingInterval = 0;
        }
         
     }
    
    OkHttpClient(Builder builder) {
        this.dispatcher = builder.dispatcher;
        this.proxy = builder.proxy;
        this.protocols = builder.protocols;
        this.connectionSpecs = builder.connectionSpecs;
        this.interceptors = Util.immutableList(builder.interceptors);
        this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
        this.eventListenerFactory = builder.eventListenerFactory;
        this.proxySelector = builder.proxySelector;
        this.cookieJar = builder.cookieJar;
        this.cache = builder.cache;
        this.internalCache = builder.internalCache;
        this.socketFactory = builder.socketFactory;
    
        boolean isTLS = false;
        for (ConnectionSpec spec : connectionSpecs) {
          isTLS = isTLS || spec.isTls();
        }
    
        if (builder.sslSocketFactory != null || !isTLS) {
          this.sslSocketFactory = builder.sslSocketFactory;
          this.certificateChainCleaner = builder.certificateChainCleaner;
        } else {
          X509TrustManager trustManager = systemDefaultTrustManager();
          this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
          this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
        }
    
        this.hostnameVerifier = builder.hostnameVerifier;
        this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
            certificateChainCleaner);
        this.proxyAuthenticator = builder.proxyAuthenticator;
        this.authenticator = builder.authenticator;
        this.connectionPool = builder.connectionPool;
        this.dns = builder.dns;
        this.followSslRedirects = builder.followSslRedirects;
        this.followRedirects = builder.followRedirects;
        this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
        this.connectTimeout = builder.connectTimeout;
        this.readTimeout = builder.readTimeout;
        this.writeTimeout = builder.writeTimeout;
        this.pingInterval = builder.pingInterval;
    }
    
    

    看出OkHttpClient主要是进行各种参数的初始化,通过builder对象进行初始化再赋值给OkHttpClient。第二种也可以用建造者模式,初始化OkHttpClient,如下:

    OkHttpClient okHttpClient=new OkHttpClient.Builder().build();
    

    注意事项:OkHttpClient强烈建议全局单例使用,因为每一个OkHttpClient都有自己单独的连接池和线程池,复用连接池和线程池能够减少延迟、节省内存。

    Request

    每一个HTTP请求包含一个URL、一个方法(GET或POST或其他)、一些HTTP头。请求还可能包含一个特定内容类型的数据类的主体部分。而

    Request request = new Request.Builder().url(url).build();
    

    和OkHttpClient一样,只是做这些东西的初始化。因为很简单,所以这就不像上面列源码细讲了。

    同步网络访问

    Request request = new Request.Builder().url(url).build();
    Response response = client.newCall(request).execute();
    response.body().string();
    

    接着我们拿同步网络进行下去,OkHttpClient调用了newCall();代码如下:

     @Override public Call newCall(Request request) {
        return new RealCall(this, request, false /* for web socket */);
     }
    

    然后调用了RealCall的execute()方法,方法源码如下:

    @Override 
    public Response execute() throws IOException {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed= = true;
        }
        captureCallStackTrace();
        try {
          client.dispatcher().executed(this);
          Response result = getResponseWithInterceptorChain();
          if (result == null) throw new IOException("Canceled");
          return result;
        } finally {
          client.dispatcher().finished(this);
        }
    }
    

    首先executed=ture,确保每一个call对象只能使用一次原则,然后就调用了captureCallStackTrace(),源码如下:

    private void captureCallStackTrace() {
        Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
        retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
    }
    
    public Object getStackTraceForCloseable(String closer) {
        if (logger.isLoggable(Level.FINE)) {
          return new Throwable(closer); // These are expensive to allocate.
        }
        return null;
    }
    
    public final class RetryAndFollowUpInterceptor implements Interceptor {
      public void setCallStackTrace(Object callStackTrace) {
        this.callStackTrace = callStackTrace;
      }
    }
    
    

    可以看出captureCallStackTrace什么也没有做,只是把一个对象传进retryAndFollowUpInterceptor,
    其中他这个的作用就像他名字一样,就是一个堆栈跟踪,捕获了这个请求的StackTrace。

    接着 client.dispatcher().executed(this); 对应的源码如下:

    public final class Dispatcher { 
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();    
      synchronized void executed(RealCall call) {
        runningSyncCalls.add(call);
      }
    }
    

    他这也是把请求放在一个双向队列里。也没做什么操作。
    然后就是重点了,可以说整个网络请求获取数据都是靠 Response result = getResponseWithInterceptorChain();这句代码,Response装载了所有的访问数据,而getResponseWithInterceptorChain()做了什么? 源码如下:

    Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        //添加开发者应用层自定义的Interceptor
        interceptors.addAll(client.interceptors());
        //这个Interceptor是处理请求失败的重试,重定向    
        interceptors.add(retryAndFollowUpInterceptor);
        //这个Interceptor工作是添加一些请求的头部或其他信息
        //并对返回的Response做一些友好的处理(有一些信息你可能并不需要)
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        //这个Interceptor的职责是判断缓存是否存在,读取缓存,更新缓存等等
        interceptors.add(new CacheInterceptor(client.internalCache()));
        //这个Interceptor的职责是建立客户端和服务器的连接
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
          //添加开发者自定义的网络层拦截器
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
        //一个包裹这request的chain
        Interceptor.Chain chain = new RealInterceptorChain(
            interceptors, null, null, null, 0, originalRequest);
        //把chain传递到第一个Interceptor手中
        return chain.proceed(originalRequest);
    }
    

    看上面的代码,不断的添加各种拦截器,最后就创建RealInterceptorChain然后调用proceed(),先看一下RealInterceptorChain类,

    public final class RealInterceptorChain implements Interceptor.Chain {
      private final List<Interceptor> interceptors;
      private final StreamAllocation streamAllocation;
      private final HttpCodec httpCodec;
      private final RealConnection connection;
      private final int index;
      private final Request request;
      private int calls;
    
      public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
          HttpCodec httpCodec, RealConnection connection, int index, Request request) {
        this.interceptors = interceptors;
        this.connection = connection;
        this.streamAllocation = streamAllocation;
        this.httpCodec = httpCodec;
        this.index = index;
        this.request = request;
      }
    
      @Override public Connection connection() {
        return connection;
      }
    
      public StreamAllocation streamAllocation() {
        return streamAllocation;
      }
    
      public HttpCodec httpStream() {
        return httpCodec;
      }
    
      @Override public Request request() {
        return request;
      }
    
      @Override public Response proceed(Request request) throws IOException {
        return proceed(request, streamAllocation, httpCodec, connection);
      }
    
      public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
          RealConnection connection) throws IOException {
        if (index >= interceptors.size()) throw new AssertionError();
    
        calls++;
    
        // If we already have a stream, confirm that the incoming request will use it.
        if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must retain the same host and port");
        }
    
        // If we already have a stream, confirm that this is the only call to chain.proceed().
        if (this.httpCodec != null && calls > 1) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must call proceed() exactly once");
        }
    
        // Call the next interceptor in the chain.
        RealInterceptorChain next = new RealInterceptorChain(
            interceptors, streamAllocation, httpCodec, connection, index + 1, request);
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
    
        // Confirm that the next interceptor made its required call to chain.proceed().
        if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
          throw new IllegalStateException("network interceptor " + interceptor
              + " must call proceed() exactly once");
        }
    
        // Confirm that the intercepted response isn't null.
        if (response == null) {
          throw new NullPointerException("interceptor " + interceptor + " returned null");
        }
    
        return response;
      }
    }
    
    

    从proceed()看创建RealInterceptorChain对象时候httpCodec直接赋予了null,所以略过判断,然后调用了这个interceptor.intercept(next);方法,就是执行前面添加的拦截器的intercept()方法; 而每一个拦截器的intercept()又会调用下一个拦截器的proceed(),下一个拦截器的proceed()又调用这个拦截器的intercept(),由此类推一个一个往下调。最后一个返回结果,在一层一层向上返回.

    如下面的关键代码,每个拦截器都有对应的代码一步步的调下一个拦截器。

    public interface Interceptor {
      Response intercept(Chain chain) throws IOException;
    
      interface Chain {
        Request request();
    
        Response proceed(Request request) throws IOException;
    
        @Nullable Connection connection();
      }
    }
    
    RealInterceptorChain implements Interceptor.Chain{
        public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
          RealConnection connection) throws IOException {
            RealInterceptorChain next = new RealInterceptorChain(
                interceptors, streamAllocation, httpCodec, connection, index + 1, request);
            Interceptor interceptor = interceptors.get(index);
            Response response = interceptor.intercept(next);
          }
    }
    
    public final class RetryAndFollowUpInterceptor implements Interceptor {
        @Override 
        public Response intercept(Chain chain) throws IOException {
            Response response = ((RealInterceptorChain) chain).proceed(request,streamAllocation,null,null);
        }
    }
    
    public final class BridgeInterceptor implements Interceptor {
        @Override 
        public Response intercept(Chain chain) throws IOException {
            Request.Builder requestBuilder = userRequest.newBuilder();
            Response networkResponse = chain.proceed(requestBuilder.build());
        }
    }
    
    public final class CacheInterceptor implements Interceptor {
        @Override 
        public Response intercept(Chain chain) throws IOException {
        Response networkResponse = null;
        networkResponse = chain.proceed(networkRequest);
        }
    }
    

    用别人的一张图就是这样:


    流程图.PNG

    各个拦截器形成拦截器链,OkHttp的这种拦截器链采用的是责任链模式,这样的好处是将请求的发送和处理分开,并且可以动态添加中间的处理方实现对请求的处理、短路等操作。

    最后的client.dispatcher().finished(this);源码如下:

    private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
    
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
    }
    
    private void promoteCalls() {
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
    
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
    
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
    }
    
    

    最后通过calls.remove(call),移走了对了,因为readyAsyncCalls,readyAsyncCalls都为空,所以promoteCalls()不会触发。
    起始上面最核心的就是拦截器的责任链模式。是值得我们学习的。

    异步网络访问

    因为很多和同步是一样的,所以就讲关键代码:

    client.dispatcher().enqueue(new AsyncCall(responseCallback));
    

    dispatcher()方法的源码是

     public Dispatcher dispatcher() {
        return dispatcher;
      }
    

    没什么讲的,接着就是说重要的enqueue(),

    private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    private int maxRequestsPerHost = 5;
    private int maxRequests = 64;
    
    synchronized void enqueue(AsyncCall call) {
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
        }
    }
    
    private int runningCallsForHost(AsyncCall call) {
        int result = 0;
        for (AsyncCall c : runningAsyncCalls) {
          if (c.host().equals(call.host())) result++;
        }
        return result;
    }
    
    

    但异步请求数量小于65并且请求访问的域名小于5,就会添加到runningAsyncCalls队列中,然后executorService线程池去运行,否则就添加到readyAsyncCalls等待队列中,executorService具体是什么线程池呢,看如下源码:

    private ExecutorService executorService;
      
    public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
    }
      
    

    这是一个阀值为Integer.MAX_VALUE,不保留任何核心线程,用多少创多少,最多只能存活60秒,他这样设计成不设上限的线程,以保证I/O任务中高阻塞低占用的过程,不会长时间卡在阻塞上。
    接着我们传进executorService里的AsyncCall,源码如下:

    final class AsyncCall extends NamedRunnable {
        private final Callback responseCallback;
    
        AsyncCall(Callback responseCallback) {
          super("OkHttp %s", redactedUrl());
          this.responseCallback = responseCallback;
        }
    
        String host() {
          return originalRequest.url().host();
        }
    
        Request request() {
          return originalRequest;
        }
    
        RealCall get() {
          return RealCall.this;
        }
    
        @Override protected void execute() {
          boolean signalledCallback = false;
          try {
            Response response = getResponseWithInterceptorChain();
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            client.dispatcher().finished(this);
          }
        }
      }
      
    public abstract class NamedRunnable implements Runnable {
      protected final String name;
    
      public NamedRunnable(String format, Object... args) {
        this.name = Util.format(format, args);
      }
    
      @Override public final void run() {
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName(name);
        try {
          execute();
        } finally {
          Thread.currentThread().setName(oldName);
        }
      }
    
      protected abstract void execute();
    }
      
    

    AsyncCall继承了Runnable,就像我之前解析AsyncTask一样,先线程池先执行NamedRunnable的run()方法,中途再执行AsyncCall的execute()方法,而整个网络访问还是像我们同步访问那样, Response response = getResponseWithInterceptorChain();通过责任获取访问然后再接口回调,获取服务器返回的数据。
    最后又是执行client.dispatcher().finished(this);先执行calls.remove(call)删除call,当异步的缓存队列readyAsyncCalls有缓存请求时且满足条件时,就会执行promoteCalls()方法里的代码,就是在readyAsyncCalls取出一个call,并把这个call放入runningAsyncCalls,然后执行execute.

    相关文章

      网友评论

        本文标题:解刨OkHttp框架

        本文链接:https://www.haomeiwen.com/subject/mevqjftx.html