美文网首页
OKHttp的应用与源码解析

OKHttp的应用与源码解析

作者: 月影路西法 | 来源:发表于2019-10-12 16:37 被阅读0次

OKHttp介绍

okhttp github地址
OkHttp是一个优秀的网络请求框架
谷歌官方在6.0以后在android sdk已经移除了httpClient,加入我们okHttp.
okHttp支持SPDY(是谷歌开发的基于TCP的应用层协议,用于最小化网络延迟,提升网络速度,优化用户的网络使用体验. SPDY并不是一种替代http的协议,只是对http的一种增强.)允许连接在一主机的所有请求分享一个socket.如果SPDY不可用.会使用连接池来减少请求延迟.利用响应缓存来避免重复的网络请求.即便是网络出现问题时,okhttp依然起作用.它将从常见的链接问题当中回复.如果你的服务器有多个IP地址,当地一个失败时,okhttp会自动尝试连接其他的地址.这对于IPV4和IPV6以及寄宿在多个数据中心的服务而言,是非常有必要的,所以okhttp的稳定性可以说是非常棒的

okhttp流程图.png

OkHttp的使用

OkHttpClient初始化

OkHttpClient okHttpClient=new OkHttpClient();

调用此方法会自动生成一下请求的参数,下面为OkHttp自动生成的一些参数

//OkHttpClient的默认构造方法
  public OkHttpClient() {
    this(new Builder());
  }
      
     public Builder() {
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      if (proxySelector == null) {
        proxySelector = new NullProxySelector();
      }
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      callTimeout = 0;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }

这大概是一个最简单的一个例子了,在new OkHttpClient()内部使用构造器模式初始化了一些配置信息:支持协议、任务分发器(其内部包含一个线程池,执行异步请求)、连接池(其内部包含一个线程池,维护connection)、连接/读/写超时时长等信息。

如果需要使用其他的参数,则需要调用OkHttpClient.Builder,下方为我自己写的okHttpClient初始化方法

        OkHttpClient.Builder builder = new OkHttpClient.Builder().connectTimeout(30000L, TimeUnit.MILLISECONDS)
                .readTimeout(30000L, TimeUnit.MILLISECONDS);

                for(Interceptor interceptor : interceptors){
                    builder.addInterceptor(interceptor);//我再这里添加了拦截器
                }
//                .addInterceptor(new LoggingInterceptor("OkHttpClient"))
                builder.hostnameVerifier(new HostnameVerifier() {
                    @Override
                    public boolean verify(String hostname, SSLSession session) {
                        return true;
                    }
                });
        if (false) {
            HttpsUtils.SSLParams sslParams = HttpsUtils.getSslSocketFactory(null, null, null);
            builder.sslSocketFactory(sslParams.sSLSocketFactory, sslParams.trustManager);
        } else {
            SSLContext sslContext = null;
            try {
                sslContext = SSLContext.getInstance("TLS");
                try {
                    sslContext.init(null, null, null);
                } catch (KeyManagementException e) {
                    e.printStackTrace();
                }
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
            }

            SSLSocketFactory socketFactory = new Tls12SocketFactory(sslContext.getSocketFactory());
            builder.sslSocketFactory(socketFactory, new HttpsUtils.UnSafeTrustManager());
        }
        OkHttpClient okHttpClient = builder
                .build();

RequestBody

在调用 put post delete方法的时候回需要传入RequestBody这个参数
FormBody是RequestBody的实现类,用于表单方式的请求

FormBody

OkHttpClient client = new OkHttpClient();
        //创建表单请求参数
        FormBody.Builder builder = new FormBody.Builder();
        builder.add("name", "zhangsan");
        builder.add("age", "18");
        FormBody formBody = builder.build();
        Request request = new Request.Builder()
                .url(url)
                .post(formBody)
                .build();
        client.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }
            @Override
            public void onResponse(Call call, Response response) throws IOException {
            }
        });

RequestBody.create(...)

RequestBody 是一个抽象类,我们不能直接使用它,但是可以通过调用它的静态create方法来获取一个RequestBody对象,该方法会创建并返回一个 RequestBody 的匿名内部类实例
查看一下 RequestBody 类,发现它有这样几个 create 方法。


RequestBody.create.png

其中前三个方法最终调用的都是第四个方法,所以我们可以具体看一下最后两个方法的具体实现

  /** Returns a new request body that transmits {@code content}. */
  public static RequestBody create(final @Nullable MediaType contentType, final byte[] content,
      final int offset, final int byteCount) {
    if (content == null) throw new NullPointerException("content == null");
    Util.checkOffsetAndCount(content.length, offset, byteCount);
    return new RequestBody() {
      @Override public @Nullable MediaType contentType() {
        return contentType;
      }

      @Override public long contentLength() {
        return byteCount;
      }

      @Override public void writeTo(BufferedSink sink) throws IOException {
        sink.write(content, offset, byteCount);
      }
    };
  }

  /** Returns a new request body that transmits the content of {@code file}. */
  public static RequestBody create(final @Nullable MediaType contentType, final File file) {
    if (file == null) throw new NullPointerException("file == null");

    return new RequestBody() {
      @Override public @Nullable MediaType contentType() {
        return contentType;
      }

      @Override public long contentLength() {
        return file.length();
      }

      @Override public void writeTo(BufferedSink sink) throws IOException {
        Source source = null;
        try {
          source = Okio.source(file);
          sink.writeAll(source);
        } finally {
          Util.closeQuietly(source);
        }
      }
    };
  }

这里多解释一下一些参数
Content-Type(MediaType),即是Internet Media Type,互联网媒体类型;也叫做MIME类型,在Http协议消息头中,使用Content-Type来表示具体请求中的媒体类型信息。用于定义网络文件的类型和网页的编码,决定文件接收方将以什么形式、什么编码读取这个文件。常见的媒体格式类型有:

text/html:HTML格式
text/pain:纯文本格式
image/jpeg:jpg图片格式
application/json:JSON数据格式
application/octet-stream:二进制流数据(如常见的文件下载)
application/x-www-form-urlencoded:form表单encType属性的默认格式,表单数据将以key/value的形式发送到服务端
multipart/form-data:表单上传文件的格式
使用 create 方法可以用来用于上传 String 和 File 对象,具体实现如下:
上传JSON字符串:

MultipartBody

MultipartBody是针对多文件和键值对同时上传,使用如下

MultipartBody.Builder builder = new MultipartBody.Builder().setType(MultipartBody.FORM);
appParms(builder, params);

    private void appParms(MultipartBody.Builder builder, Map<String, Object> params) {
        if (params != null && !params.isEmpty()) {
            for (String key : params.keySet()) {
                builder.addFormDataPart(key, params.get(key) + "");
                Object value = params.get(key);
                if (value instanceof File) {
                    //处理文件 -- >>object file
                    File file = (File) value;
                    builder.addFormDataPart(key, file.getName(), RequestBody.create(MediaType.parse(guessMineType(file.getAbsolutePath())), file));

                } else if (value instanceof List) {
                    List<File> listFiles = (List<File>) value;
                    for (int i = 0; i < listFiles.size(); i++) {
                        File file = listFiles.get(i);
                        builder.addFormDataPart(key + i, file.getName(), RequestBody.create(MediaType.parse(guessMineType(file.getAbsolutePath())), file));
                    }
                }
            }
        }
    }

附上MediaType的类型

  /**
   * The "mixed" subtype of "multipart" is intended for use when the body parts are independent and
   * need to be bundled in a particular order. Any "multipart" subtypes that an implementation does
   * not recognize must be treated as being of subtype "mixed".
   */
  public static final MediaType MIXED = MediaType.get("multipart/mixed");

  /**
   * The "multipart/alternative" type is syntactically identical to "multipart/mixed", but the
   * semantics are different. In particular, each of the body parts is an "alternative" version of
   * the same information.
   */
  public static final MediaType ALTERNATIVE = MediaType.get("multipart/alternative");

  /**
   * This type is syntactically identical to "multipart/mixed", but the semantics are different. In
   * particular, in a digest, the default {@code Content-Type} value for a body part is changed from
   * "text/plain" to "message/rfc822".
   */
  public static final MediaType DIGEST = MediaType.get("multipart/digest");

  /**
   * This type is syntactically identical to "multipart/mixed", but the semantics are different. In
   * particular, in a parallel entity, the order of body parts is not significant.
   */
  public static final MediaType PARALLEL = MediaType.get("multipart/parallel");

  /**
   * The media-type multipart/form-data follows the rules of all multipart MIME data streams as
   * outlined in RFC 2046. In forms, there are a series of fields to be supplied by the user who
   * fills out the form. Each field has a name. Within a given form, the names are unique.
   */
  public static final MediaType FORM = MediaType.get("multipart/form-data");

Request

Request为建造者模式,最简单的初始化为下方代码,里面需要传入请求的信息

Request request = new Request.Builder()
                .get()//使用get请求
                .url("https:www.baidu.com")//请求地址为百度
                .build();//建造者模式生成request对象


Call对象

Call call = client.newCall(request);

//添加callBack回调函数

//同步调用,返回Response,会抛出IO异常
Response response = call.execute();

//异步调用,并设置回调函数
call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        Toast.makeText(OkHttpActivity.this, "get failed", Toast.LENGTH_SHORT).show();
    }

    @Override
    public void onResponse(Call call, final Response response) throws IOException {
        final String res = response.body().string();

    }
});

在这里需要注意的是,CallBack的回调函数是在子线程中,需要调用Handle或者runOnUiThread()进行UI线程的更新

         mHandle.post(new Runnable() {
              @Override
              public void run() {
                 Log.i(TAG, "error");
                 //执行成功方法
                 callBack.onSuccess(response.code(),result);
              }
           });

        runOnUiThread(new Runnable() {
            @Override
            public void run() {
                contentTv.setText(res);
            }
        });

以上就是OkHttp的基本用法

OkHttp拦截器Interceptor的使用

在调用OkHttpClient.Build的时候可以设置拦截器

mOkHttpClient = new OkHttpClient().newBuilder()
                .connectTimeout(REQUEST_TIME, TimeUnit.SECONDS)
                .readTimeout(REQUEST_TIME, TimeUnit.SECONDS)
                .writeTimeout(REQUEST_TIME, TimeUnit.SECONDS)
                .addInterceptor(new LoggerInterceptor())//这里就是添加连接器的地方
                .build();

拦截器可以自己定义 只要实现Interceptor 就可以了

public class LoggerInterceptor implements Interceptor {
      ...
}

拦截器是一个强有力的机制,能够监控,重写以及重试(请求的)调用。

以下是官网提供的一个Interceptor例子,只用来打印拦截到的Response的相关日志:

class LoggingInterceptor implements Interceptor {
    String tag = "MainActivityXXXX";

    @Override
    public Response intercept(Interceptor.Chain chain) throws IOException {
        Request request = chain.request();

        long t1 = System.nanoTime();
        Log.d(tag, String.format("Sending request %s on %s%n%s",
                request.url(), chain.connection(), request.headers()));

        Response response = chain.proceed(request);

        long t2 = System.nanoTime();
        Log.d(tag, String.format("Received response for %s in %.1fms%n%s",
                response.request().url(), (t2 - t1) / 1e6d, response.headers()));

        return response;
    }
}

在Okhttp源码中 再执行enqueue方法后,都会在最后调用RealCall类中getResponseWithInterceptorChain(),在这个方法中会调用到拦截器

final class RealCall implements Call {
...
  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }
}
getResponseWithInterceptorChain方法的执行流程.png

从 getResponseWithInterceptorChain 函数我们可以看到,Interceptor.Chain 的分布依次是:


image.png

1.在配置 OkHttpClient 时设置的 interceptors;
2.负责失败重试以及重定向的 RetryAndFollowUpInterceptor;
3.负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应的 BridgeInterceptor;
4.负责读取缓存直接返回、更新缓存的 CacheInterceptor;
5.负责和服务器建立连接的 ConnectInterceptor;
6.配置 OkHttpClient 时设置的 networkInterceptors;
7.负责向服务器发送请求数据、从服务器读取响应数据的CallServerInterceptor。

实际上,拦截器是责任链模式的最佳应用(如同事件分发机制),每个拦截器可以自己拦截处理,或者交给下一个拦截器,让每个 Interceptor 自行决定能否完成任务以及怎么完成任务。
其实 Interceptor 的设计也是一种分层的思想,每个 Interceptor 就是一层。为什么要套这么多层呢?分层的思想在 TCP/IP 协议中就体现得淋漓尽致,分层简化了每一层的逻辑,每层只需要关注自己的责任(单一原则思想也在此体现),而各层之间通过约定的接口/协议进行合作(面向接口编程思想),共同完成复杂的任务。

OkHttp的源码解析

我们在看一下OkHttpClient的默认构造方法

  public OkHttpClient() {
    this(new Builder());
  }

    public Builder() {
      dispatcher = new Dispatcher();//任务调度器
      protocols = DEFAULT_PROTOCOLS;//支持的协议
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      if (proxySelector == null) {
        proxySelector = new NullProxySelector();
      }
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      callTimeout = 0;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }

第一行创建了一个Dispatcher任务分派器,它定义了三个双向任务队列,两个异步队列:准备执行的请求队列 readyAsyncCalls、正在运行的请求队列 runningAsyncCalls;一个正在运行的同步请求队列 runningSyncCalls。
还有一个线程池 executorService ,这个线程池跟Android中的CachedThreadPool非常类似,这种类型的线程池,适用于大量的耗时较短的异步任务

public final class Dispatcher {
  private int maxRequests = 64;//最大请求数量
  private int maxRequestsPerHost = 5;//每台主机最大的请求数量
  private @Nullable Runnable idleCallback;

  /** Executes calls. Created lazily. */
  private @Nullable ExecutorService executorService;//线程池

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();//准备执行的请求队列

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();// 正在运行的请求队列

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();//正在运行的同步请求队列

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }
 /** 这个线程池没有核心线程,线程数量没有限制,空闲60s就会回收*/
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

其他的是一些连接超时之类的参数
记下来我们来看Request

public final class Request {
  final HttpUrl url;
  final String method;
  final Headers headers;
  final @Nullable RequestBody body;
  final Map<Class<?>, Object> tags;

  private volatile @Nullable CacheControl cacheControl; // Lazily initialized.

  Request(Builder builder) {
    this.url = builder.url;//访问的URL
    this.method = builder.method; //访问URL的方法
    this.headers = builder.headers.build();//请求的消息头
    this.body = builder.body;//请求的方法体
    this.tags = Util.immutableMap(builder.tags);//tag
  }
...
  public static class Builder {
    @Nullable HttpUrl url;
    String method;
    Headers.Builder headers;
    @Nullable RequestBody body;

    /** A mutable map of tags, or an immutable empty map if we don't have any. */
    Map<Class<?>, Object> tags = Collections.emptyMap();

    public Builder() {
      this.method = "GET";
      this.headers = new Headers.Builder();
    }

    Builder(Request request) {
      this.url = request.url;
      this.method = request.method;
      this.body = request.body;
      this.tags = request.tags.isEmpty()
          ? Collections.<Class<?>, Object>emptyMap()
          : new LinkedHashMap<>(request.tags);
      this.headers = request.headers.newBuilder();
    }
..
    public Request build() {
      if (url == null) throw new IllegalStateException("url == null");
      return new Request(this);
    }

Request类是典型的构造者模式,里面需要的信息有URL,Method,body等的,最后调用build方法,生成Request对象
接下来我们看newCall这个方法

Call call=mOkHttpClient.newCall(request)

  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }

  private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
    this.timeout = new AsyncTimeout() {
      @Override protected void timedOut() {
        cancel();
      }
    };
    this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
  }

  public EventListener.Factory eventListenerFactory() {
    return eventListenerFactory;
  }

通过以上代码我们可以看到,将Request类传入近RealCall,生成RealCall对象,并且创造了一个RetryAndFollowUpInterceptor拦截器,用于处理请求错误和重定向等,这是 Okhttp 框架的精髓 interceptor chain 中的一环,默认情况下也是第一个拦截器,除非调用 OkHttpClient.Builder#addInterceptor(Interceptor) 来添加全局的拦截器。关于拦截器链的顺序参见 RealCall#getResponseWithInterceptorChain() 方法。
接下来我们查看RealCallCall的enqueue方法

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {//在这里只能执行一次
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }


  void enqueue(AsyncCall call) {
    synchronized (this) {
      readyAsyncCalls.add(call);
    }
    promoteAndExecute();
  }


  private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

可以看到,一个 Call 只能执行一次,否则会抛异常,这里创建了一个 AsyncCall 并将Callback传入,将AsyncCall,加入到请求队列readyAsyncCalls中, 接着再交给任务分发器 Dispatcher 来进一步处理。然后在Dispatcher#enqueue()里将AsyncCall加入到临时变量List<AsyncCall> executableCalls中然后执行asyncCall.executeOn(executorService());方法

    /**
     * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
   
*尝试将此异步调用排队到{@code executorservice}。这将试图清理
*如果执行器已被关闭,则报告调用失败。
  */
final class RealCall implements Call {
...
  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }
...

    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));//断言线程是否有被锁住
      boolean success = false;
      try {
        executorService.execute(this);//调用NamedRunnable 
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        eventListener.callFailed(RealCall.this, ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

//AsyncCall 中的Runnable
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}
...
//最终会调用execute()方法
    @Override protected void execute() {
      boolean signalledCallback = false;
      timeout.enter();
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {//请求失败了
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {//成功了 返回response
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        e = timeoutExit(e);
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

在executeOn(ExecutorService executorService)方法中,通过executorService.execute(this)调用了NamedRunnable 的run方法,再run方法中调用了execute

void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
}

  private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
//从正在执行的队列中将其移除
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();//推动下一个任务的执行
  //如果没有正在执行的任务,且idleCallback不为null,则回调通知空闲了
    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

  private boolean promoteAndExecute() {//推动下一个任务的执行
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

在getResponseWithInterceptorChain() 方法执行完成之后,会将这次请求移除队列,并推动下一个请求的进行,如果队列是空的,则通知空闲调用idleCallback.run();
我们回过头来接着看getResponseWithInterceptorChain()

  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();//这是一个List,是有序的
    interceptors.addAll(client.interceptors());//首先添加的是用户添加的全局拦截器
    interceptors.add(retryAndFollowUpInterceptor);//错误、重定向拦截器
    interceptors.add(new BridgeInterceptor(client.cookieJar()));//桥接拦截器,桥接应用层与网络层,添加必要的头、
    interceptors.add(new CacheInterceptor(client.internalCache())); //缓存处理,Last-Modified、ETag、DiskLruCache等
    interceptors.add(new ConnectInterceptor(client));//连接拦截器
    if (!forWebSocket) {/从这就知道,通过okHttpClient.Builder#addNetworkInterceptor()传进来的拦截器只对非网页的请求生效
      interceptors.addAll(client.networkInterceptors());
    }
//真正访问服务器的拦截器
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

可以看这块重点就是 interceptors 这个集合,首先将前面的 client.interceptors() 全部加入其中,还有在创建 RealCall时的 retryAndFollowUpInterceptor加入其中,接着还创建并添加了BridgeInterceptor、CacheInterceptor、ConnectInterceptor、CallServerInterceptor,最后通过RealInterceptorChain#proceed(Request)来执行整个 interceptor chain,可见把这个拦截器链搞清楚,整体流程也就明朗了。最后在调用chain.proceed(originalRequest)进行请求

public final class RealInterceptorChain implements Interceptor.Chain {
  private final List<Interceptor> interceptors;
  private final StreamAllocation streamAllocation;
  private final HttpCodec httpCodec;
  private final RealConnection connection;
  private final int index;
  private final Request request;
  private final Call call;
  private final EventListener eventListener;
  private final int connectTimeout;
  private final int readTimeout;
  private final int writeTimeout;
  private int calls;

  public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
      HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
      EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
    this.interceptors = interceptors;
    this.connection = connection;
    this.streamAllocation = streamAllocation;
    this.httpCodec = httpCodec;
    this.index = index;
    this.request = request;
    this.call = call;
    this.eventListener = eventListener;
    this.connectTimeout = connectTimeout;
    this.readTimeout = readTimeout;
    this.writeTimeout = writeTimeout;
  }
...
  @Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
  }
  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();
    calls++;
    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }
    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);//这里是重点

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

从这段实现可以看出,是按照添加到 interceptors 集合的顺序,逐个往下调用拦截器的intercept()方法,所以在前面的拦截器会先被调用。这个例子中自然就是 RetryAndFollowUpInterceptor 了。

public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();
    //创建一个StreamAllocation
    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    this.streamAllocation = streamAllocation;

    //统计重定向次数,不能大于20
    int followUpCount = 0; 
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
        //调用下一个interceptor的来获得响应内容
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }
    
     //重定向处理    
      Request followUp = followUpRequest(response, streamAllocation.route());

      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }

      closeQuietly(response.body());

      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }

      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(followUp.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;
      priorResponse = response;
    }
}

这个拦截器就如同它的名字retry and followUp,主要负责错误处理和重定向等问题,比如路由错误、IO异常等。

接下来就到了BridgeInterceptor#intercept(),在这个拦截器中,添加了必要请求头信息,gzip处理等。

public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();
    
    //从这开始给请求添加了一些请求头信息
    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
}

这个拦截器处理请求信息、cookie、gzip等
接着往下是 CacheInterceptor

public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    Response networkResponse = null;
    try {
      //调用下一个拦截器进行网络请求    
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
}

这个拦截器主要工作是做做缓存处理,如果有有缓存并且缓存可用,那就使用缓存,否则进行调用下一个拦截器 ConnectionInterceptor 进行网络请求,并将响应内容缓存。

public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    
    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();
    
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

这个拦截器主要是打开一个到目标服务器的 connection 并调用下一个拦截器 CallServerInterceptor,这是拦截器链最后一个拦截器,它向服务器发起真正的网络请求。

public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();

long sentRequestMillis = System.currentTimeMillis();

realChain.eventListener().requestHeadersStart(realChain.call());
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);

Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
  // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
  // Continue" response before transmitting the request body. If we don't get that, return
  // what we did get (such as a 4xx response) without ever transmitting the request body.
  if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
    httpCodec.flushRequest();
    realChain.eventListener().responseHeadersStart(realChain.call());
    responseBuilder = httpCodec.readResponseHeaders(true);
  }

  if (responseBuilder == null) {
    // Write the request body if the "Expect: 100-continue" expectation was met.
    realChain.eventListener().requestBodyStart(realChain.call());
    long contentLength = request.body().contentLength();
    CountingSink requestBodyOut =
        new CountingSink(httpCodec.createRequestBody(request, contentLength));
    BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

    request.body().writeTo(bufferedRequestBody);
    bufferedRequestBody.close();
    realChain.eventListener()
        .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
  } else if (!connection.isMultiplexed()) {
    // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
    // from being reused. Otherwise we're still obligated to transmit the request body to
    // leave the connection in a consistent state.
    streamAllocation.noNewStreams();
  }
}

httpCodec.finishRequest();

if (responseBuilder == null) {
  realChain.eventListener().responseHeadersStart(realChain.call());
  responseBuilder = httpCodec.readResponseHeaders(false);
}

Response response = responseBuilder
    .request(request)
    .handshake(streamAllocation.connection().handshake())
    .sentRequestAtMillis(sentRequestMillis)
    .receivedResponseAtMillis(System.currentTimeMillis())
    .build();

int code = response.code();
if (code == 100) {
  // server sent a 100-continue even though we did not request one.
  // try again to read the actual response
  responseBuilder = httpCodec.readResponseHeaders(false);

  response = responseBuilder
          .request(request)
          .handshake(streamAllocation.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();

  code = response.code();
}

realChain.eventListener()
        .responseHeadersEnd(realChain.call(), response);

if (forWebSocket && code == 101) {
  // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
  response = response.newBuilder()
      .body(Util.EMPTY_RESPONSE)
      .build();
} else {
  response = response.newBuilder()
      .body(httpCodec.openResponseBody(response))
      .build();
}

if ("close".equalsIgnoreCase(response.request().header("Connection"))
    || "close".equalsIgnoreCase(response.header("Connection"))) {
  streamAllocation.noNewStreams();
}

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
  throw new ProtocolException(
      "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}

return response;

}


相关文章

网友评论

      本文标题:OKHttp的应用与源码解析

      本文链接:https://www.haomeiwen.com/subject/btjopctx.html