美文网首页
OkHttp源码解析

OkHttp源码解析

作者: 沉迷学习_日渐发福 | 来源:发表于2018-11-08 21:29 被阅读0次

    okHttp概述

    上一篇文章讲到retrofit网络请求框架,其实retrofit内部并没有真正的实现网络请求,它内部将网络请求封装成了Call,并将网络请求转角给okhttp去执行。okhttp是square开源的一个网络库,将每一个网络请求都封装成一个call,然后利用call去执行网络请求。

    下面是官方给的例子:

    
    OkHttpClient client = new OkHttpClient();
    
    String run(String url) throws IOException {
    
      Request request = new Request.Builder()
    
          .url(url)
    
          .build();
    
      Response response = client.newCall(request).execute();
    
      return response.body().string();
    
    }
    
    

    okHttp源码阅读

    okhttp的使用就像上面的例子一样这么简单,我们看看其内部的实现

    调用流程

    构造Client

    首先用builder模式创建一个okhttpClient实例,里面有一个非常重要的成员dispatcher,顾名思义,主要是用来做okHttp的网络请求分发的。后续我们会再详细介绍。每一个client在创建时,都会直接new出一个dispatcher。

    
    public Builder() {
    
          dispatcher = new Dispatcher();
    
    
    
        }
    
    

    构造Request

    okHttp的网络请求表示为Request,里面包含了一个请求所需要的参数。

    
        //请求url
    
        HttpUrl url;
    
        //请求方式
    
        String method;
    
        //请求头
    
        Headers.Builder headers;
    
        //请求body
    
        RequestBody body;
    
        //请求tag
    
        Object tag;
    
    

    对于okHttp请求的直接触发单位并不是request,而是在request的基础上继续包装了一层,包装为Call。

    构造Call

    call是一个接口,供我们外部调用,主要有以下几个比较重要的方法

    
    //获取当次请求的Request
    
    Request request();
    
    //同步执行入口
    
    Response execute() throws IOException;
    
    //异步执行入口
    
    void enqueue(Callback responseCallback);
    
    //取消入口
    
    void cancel();
    
    

    我们实际使用的是其实现类RealCall,我们直接通过client提供的工厂方法,传入request,并返回RealCall。

    
    private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    
        this.client = client;
    
        this.originalRequest = originalRequest;
    
        this.forWebSocket = forWebSocket;
    
        this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
    
      }
    
    

    请求的执行

    同步请求 execute

    
    @Override public Response execute() throws IOException {
    
        synchronized (this) {
    
          if (executed) throw new IllegalStateException("Already Executed");
    
          executed = true;
    
        }
    
        captureCallStackTrace();
    
        try {
    
          //添加到dispatcher的同步队列中
    
          client.dispatcher().executed(this);
    
          //通过拦截器链执行请求
    
          Response result = getResponseWithInterceptorChain();
    
          if (result == null) throw new IOException("Canceled");
    
          return result;
    
        } finally {
    
            //从dispatcher正在运行队列中中移除
    
          client.dispatcher().finished(this);
    
        }
    
      }
    
    

    通过局部变量executed来标识一个call只能执行一次,需要用symchronized来保证多线程同步。

    然后通过dispatcher的请求处理,其实仅仅是将这个call添加到维护的正在运行的队列中,然后立马通过拦截器链发起请求。

    
    synchronized void executed(RealCall call) {
    
        runningSyncCalls.add(call);
    
      }
    
    

    后面通过拦截器链执行网络请求。

    异步请求

    异步请求和同步请求非常类似。

    
    @Override public void enqueue(Callback responseCallback) {
    
        synchronized (this) {
    
          if (executed) throw new IllegalStateException("Already Executed");
    
          executed = true;
    
        }
    
        captureCallStackTrace();
    
        eventListener.callStart(this);
    
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
    
      }
    
    

    enqueue仅仅调用了dispatcher的enqueue,需要将RealCall转化为AsyncCall,也就是异步请求Call,其实就是一个Runnable。

    
    final class AsyncCall extends NamedRunnable {
    
        private final Callback responseCallback;
    
        AsyncCall(Callback responseCallback) {
    
          super("OkHttp %s", redactedUrl());
    
          this.responseCallback = responseCallback;
    
        }
    
    

    Dispatcher的分发机制

    不管同步还是异步请求,okHttp最终都会将请求传递给dispatcher。

    dispatcher内部维护三个队列,一个线程池。

    
    private @Nullable ExecutorService executorService;
    
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    
    

    对于同步请求,会直接将请求添加到运行队列中,并马上执行请求,而异步请求可能需要先将请求添加到就绪异步队列中,等待dispatcher的调度唤起。

    
    synchronized void enqueue(AsyncCall call) {
    
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    
          runningAsyncCalls.add(call);
    
          executorService().execute(call);
    
        } else {
    
          readyAsyncCalls.add(call);
    
        }
    
      }
    
    

    如果当前正在运行的网络请求�数小于设置最大值,那么表示这个请求可以立马发起,则需要将这个call添加到运行的异步队列中,并且调用线程池执行这个请求。否则需要奖这个请求添加到等待队列中。当线程池执行时,其实是调用了这个runnable的run方法,

    
    @Override public final void run() {
    
        String oldName = Thread.currentThread().getName();
    
        Thread.currentThread().setName(name);
    
        try {
    
          execute();
    
        } finally {
    
          Thread.currentThread().setName(oldName);
    
        }
    
      }
    
    

    其实内部就是执行excute方法

    
      @Override protected void execute() {
    
          boolean signalledCallback = false;
    
          try {
    
            Response response = getResponseWithInterceptorChain(null);
    
            ....
    
            responseCallback.onFailure()
    
            responseCallback.onResponse()
    
            ....
    
          } finally {
    
            client.dispatcher().finished(this);
    
          }
    
        }
    
    

    在run方法执行了execute方法之后,同样是先调用了拦截器链发起网络请求,然后在网络请求回来之后,回调callback,所以,可见这个callBack是在子线程里面做的回调。最后同样是调用finished关闭这个请求。

    
    private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    
        int runningCallsCount;
    
        Runnable idleCallback;
    
        synchronized (this) {
    
        //从异步执行队列中移除
    
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    
          //唤起下一个call的调用
    
          if (promoteCalls) promoteCalls();
    
          //重新计算当前网络请求数
    
          runningCallsCount = runningCallsCount();
    
          idleCallback = this.idleCallback;
    
        }
    
        if (runningCallsCount == 0 && idleCallback != null) {
    
          idleCallback.run();
    
        }
    
      }
    
    

    如果是异步执行的call的finish操作,那么它还多了一个非常重要的步骤,通过promoteCalls来唤起下一个call请求发起。

    
    private void promoteCalls() {
    
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
    
          AsyncCall call = i.next();
    
          if (runningCallsForHost(call) < maxRequestsPerHost) {
    
            i.remove();
    
            runningAsyncCalls.add(call);
    
            executorService().execute(call);
    
          }
    
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    
        }
    
      }
    
    

    首先会判断当前正在请求数是否大于设置的最大请求,然后判断等待队列是否为空,在非空的情况下,从就绪队列中移除首个call,并开始执行。

    拦截器链->获取response

    上面的同步或者异步请求,最终都会调用到getResponseWithinterceptorChain这个方法上。

    
    Response getResponseWithInterceptorChain(DiskCacheListener diskCacheListener) throws IOException {
    
        // Build a full stack of interceptors.
    
        List<Interceptor> interceptors = new ArrayList<>();
    
        interceptors.addAll(client.interceptors());
    
        interceptors.add(retryAndFollowUpInterceptor);
    
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
    
        interceptors.add(new CacheInterceptor(client.internalCache(), diskCacheListener));
    
        interceptors.add(new ConnectInterceptor(client));
    
        if (!forWebSocket) {
    
          interceptors.addAll(client.networkInterceptors());
    
        }
    
        interceptors.add(new CallServerInterceptor(forWebSocket));
    
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
    
            originalRequest, this, eventListener, client.connectTimeoutMillis(),
    
            client.readTimeoutMillis(), client.writeTimeoutMillis());
    
        return chain.proceed(originalRequest);
    
      }
    
    

    首先会构造拦截器链,其实就是一个list,将一些需要的拦截器按序添加到list中,然后构造Chain,最终调用proceed开始执行。

    链式调用

    okHttp这个拦截器的遍历的实现其实是非常巧妙的。Chain有一个index表示当前执行的哪一个拦截器。

    
    //第一次执行
    
    Interceptor.Chain chain = new RealInterceptorChain(.., 0, ...);
    
        return chain.proceed(originalRequest);
    
    

    proceed方法

    
    // Call the next interceptor in the chain.
    
    RealInterceptorChain next = new RealInterceptorChain(index + 1);
    
    Interceptor interceptor = interceptors.get(index);
    
    Response response = interceptor.intercept(next);
    
    

    在proceed中首先会构造出下一个Chain,只需要把index+1即可,然后拿出当前的拦截器,并且执行其intercept方法。

    
    @Override public Response intercept(Chain chain) throws IOException {
    
    ....
    
    Response networkResponse = chain.proceed(requestBuilder.build());
    
    ....
    
    }
    
    

    在intercept中,再次调用了下一个链的proceed方法。依次执行,形成了一个链式调用。

    拦截器

    RetryAndFollow

    网络请求失败时的重连和重定向的拦截器
    

    Bridge

    桥接拦截器,主要是用来增加请求头,cookied, userAgent。
    

    Cache

    主要是负责okHttp的缓存。okHttp有一个缓存策略。
    
    
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    
        Request networkRequest = strategy.networkRequest;
    
        Response cacheResponse = strategy.cacheResponse;
    
    

    networkRequest表示当前请求,cacheResponse表示是否有该请求对应的缓存response。

    
    // If we're forbidden from using the network and the cache is insufficient, fail.
    
        if (networkRequest == null && cacheResponse == null) {
    
          return new Response.Builder()
    
              .request(chain.request())
    
              .protocol(Protocol.HTTP_1_1)
    
              .code(504)
    
              .message("Unsatisfiable Request (only-if-cached)")
    
              .body(Util.EMPTY_RESPONSE)
    
              .sentRequestAtMillis(-1L)
    
              .receivedResponseAtMillis(System.currentTimeMillis())
    
              .build();
    
        }
    
        // If we don't need the network, we're done.
    
        if (networkRequest == null) {
    
          return cacheResponse.newBuilder()
    
              .cacheResponse(stripBody(cacheResponse))
    
              .build();
    
        }
    
    

    如果当前发起的请求为null,而且没有缓存的response,那么直接返回错误。

    如果发起的请求不为null,而且存在缓存,那么直接用缓存,提前返回。否则正常发起网络请求。

    Connect

    ConnectIntercept主要用建立网络链接。

    
    public Response intercept(Chain chain) throws IOException {
    
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
    
        Request request = realChain.request();
    
        StreamAllocation streamAllocation = realChain.streamAllocation();
    
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
    
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
    
        HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    
        RealConnection connection = streamAllocation.connection();
    
        return realChain.proceed(request, streamAllocation, httpCodec, connection);
    
      }
    
    
    • 首先会从获取出链中拿出stramALlocation。StreamAlloction是okHttp用来辅助管理的一个类,主要是管理流、连接、和call之间的关系。是流和连接之间的component,它会为call请求去寻找连接并建立一个流。这个流是在第一个retryAndFollowIntercept中创建的,最后传递到connectIntercept。

    • 然后通过newStream方法找到一个合适的socket链接。并返回对应的socket操作封装类HttpCodeC,newStream会从连接池中去寻找合适的连接,如果没有可复用的连接,将会创建出新连接。

    
    RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
    
              writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
    
          HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
    
    

    CallServer

    CallServerIntercept是真正发起网络请求的类,通过上一个拦截器处理完后的HttpCodec进行网络的读写。内部封装了okio进行数据读写

    
    ...
    
    //写入请求头
    
    httpCodec.writeRequestHeaders(request);
    
    //通过okIo写入请求请求体
    
    request.body().writeTo(bufferedRequestBody);
    
    

    然后通过okio进行response读取

    
    if (responseBuilder == null) {
    
          realChain.eventListener().responseHeadersStart(realChain.call());
    
          responseBuilder = httpCodec.readResponseHeaders(false);
    
        }
    
        Response response = responseBuilder
    
            .request(request)
    
            .handshake(streamAllocation.connection().handshake())
    
            .sentRequestAtMillis(sentRequestMillis)
    
            .receivedResponseAtMillis(System.currentTimeMillis())
    
            .build();
    
    

    okHttp连接池的处理

    put

    当没有找到可以复用的连接时,需要创建新的连接,并将连接放入连接池中,

    
    void put(RealConnection connection) {
    
        assert (Thread.holdsLock(this));
    
        if (!cleanupRunning) {
    
          cleanupRunning = true;
    
          executor.execute(cleanupRunnable);
    
        }
    
        connections.add(connection);
    
      }
    
    

    每次添加新的连接,需要看是否需要清理连接池。这个处理操作是通过线程池来处理的。通过下面几个步骤来判断一个连接是否需要被回收

    • 计数,看一个connect是否被多个streamAllocation所引用。Connecttion内部存储了一个StreamAllocation的弱引用列表list,当一个网络请求结束时,将会关闭stremAllocation,并且回收
    
    for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
    
            RealConnection connection = i.next();
    
            // If the connection is in use, keep searching.
    
            if (pruneAndGetAllocationCount(connection, now) > 0) {
    
              inUseConnectionCount++;
    
              continue;
    
            }
    
            idleConnectionCount++;
    
            }
    
    
    • 如果空闲数大于最大空闲数,那么回收
    
    if (longestIdleDurationNs >= this.keepAliveDurationNs
    
              || idleConnectionCount > this.maxIdleConnections) {
    
            connections.remove(longestIdleConnection);
    
          }
    
    

    get连接

    
    Internal.instance.get(connectionPool, address, this, null);
    
    

    通过address来从连接池中获取需要复用的连接。

    
      @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    
        assert (Thread.holdsLock(this));
    
        for (RealConnection connection : connections) {
    
          if (connection.isEligible(address, route)) {
    
            streamAllocation.acquire(connection);
    
            return connection;
    
          }
    
        }
    
        return null;
    
      }
    
    

    首先会判断连接池中是否包含了这个请求地址和路由对应的连接,如果有,那么在这个connection中添加streamAllocation的引用并直接返回,

    socket

    socket连接的建立,是在StreamAllocation中建立的。当我们没找到可以复用的连接,那么将会创建一个新的连接。

    
    result = new RealConnection(connectionPool, selectedRoute);
    
    //为connect添加streamAllocation的引用
    
    acquire(result);
    
    

    在创建完connection之后,需要建立socket连接

    
      /** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
    
      private void connectSocket(int connectTimeout, int readTimeout, Call call,
    
          EventListener eventListener) throws IOException {
    
        Proxy proxy = route.proxy();
    
        Address address = route.address();
    
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
    
            ? address.socketFactory().createSocket()
    
            : new Socket(proxy);
    
        eventListener.connectStart(call, route.socketAddress(), proxy);
    
        rawSocket.setSoTimeout(readTimeout);
    
        try {
    
          Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    
        } catch (ConnectException e) {
    
          ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
    
          ce.initCause(e);
    
          throw ce;
    
        }
    
        // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
    
        // More details:
    
        // https://github.com/square/okhttp/issues/3245
    
        // https://android-review.googlesource.com/#/c/271775/
    
        try {
    
          source = Okio.buffer(Okio.source(rawSocket));
    
          sink = Okio.buffer(Okio.sink(rawSocket));
    
        } catch (NullPointerException npe) {
    
          if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
    
            throw new IOException(npe);
    
          }
    
        }
    
      }
    
    

    上面的就是socket连接的建立过程,大体可以分为以下几步:

    • 创建socket套接字:
    
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
    
            ? address.socketFactory().createSocket()
    
            : new Socket(proxy);
    
    
    • 连接远端Socket
    
    Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    
    
    • 读写流,okIo封装
    
    //read
    
    source = Okio.buffer(Okio.source(rawSocket));
    
    //write
    
    sink = Okio.buffer(Okio.sink(rawSocket));
    
    

    这个是客户端的socket创建,服务端的socket创建需要多一个监听

    总结

    其实okHttp的利用了java的职责链模式,每一个拦截器都有属于自己功能。下面看看一张自己绘制的okhttp访问的流程。

    [图片上传失败...(image-2de00c-1541683717380)]

    相关文章

      网友评论

          本文标题:OkHttp源码解析

          本文链接:https://www.haomeiwen.com/subject/kmegxqtx.html