美文网首页
okhttp源码理解

okhttp源码理解

作者: 杰奎琳子 | 来源:发表于2020-08-04 08:25 被阅读0次

框架优点:

支持HTTPS/HTTP2/WebSocket(服务器可主动推送消息)

内部维护任务队列线程池,友好支持并发访问

内部维护连接池,支持多路复用,减少连接创建开销

提供拦截器链,实现request和response的分层处理

Okio提供超时机制

Socket创建支持最佳路由

Interface-接口层:接口层接收用户的网络访问请求,发起实际的网络访问

OkHttpClient是OkHttp框架的用户面板,用户使用OkHttp进行各种设置,发起各种网络请求都是通过OkHttpClient完成的

Call描述一个实际的访问请求,用户的每一个网络请求都是一个Call实例。Call本身只是一个接口,定义了Call的接口方法,实际执行过程中,OkHttp会为每一个请求创建一个RealCall,每一个RealCall内部有一个AsyncCall

Dispatcher是OkHttp的调度器,其内部维护了一个线程池

Protocal-协议层:Protocol层负责处理协议逻辑,OkHttp支持Http1/Http2/WebSocket协议

Connection-连接层:在连接层中有一个连接池,统一管理所有的Socket连接,当用户新发起一个网络请求时,OkHttp会首先从连接池中查找是否有符合要求的连接,如果有则直接通过该连接发送网络请求;否则新创建一个连接。RealConnection描述一个物理Socket连接,连接池中维护多个RealConnection实例。由于Http/2支持多路复用,一个RealConnection可以支持多个网络访问请求,所以OkHttp又引入了StreamAllocation来描述一个实际的网络请求

Cache-缓存层:Cache层负责维护请求缓存,当用户的网络请求在本地已有符合要求的缓存时,OkHttp会直接从缓存中返回结果,从而节省网络开销

IO层:I/O层负责实际的数据读写。OkHttp的另一大有点就是其高效的I/O操作,这归因于其高效的I/O库Okio

Interceptor-拦截器层:拦截器层提供了一个类AOP接口,方便用户可以切入到各个层面对网络访问进行拦截并执行相关逻辑

简单使用+任务队列:

同步请求:RealCall表示被执行的请求,调用execute方法执行同步请求,最终会把RealCall请求放到Dispatcher的同步双端队列中顺序执行,执行结束后调用finish方法,把RealCall请求从Dispatcher中移除,无论是同步还是异步请求,最终都会调用getResponseWithInterceptorChain方法,在OkHttp中,拦截器链会依次完成相应功能

Call对象只能执行一次,否则会抛出异常

public String OkHttpGet() throws Exception {

  OkHttpClient client = new OkHttpClient();

  Request request = new Request.Builder().url(url).build();

  Response response = client.newCall(request).execute();

  return response.body().string();

}

@Override public Response execute() throws IOException {

    synchronized (this) {

      if (executed) throw new IllegalStateException("Already Executed");

      executed = true;

    }

    try {

      client.dispatcher().executed(this);

      Response result = getResponseWithInterceptorChain();

      if (result == null) throw new IOException("Canceled");

      return result;

    } catch (IOException e) {

      throw e;

    } finally {

      client.dispatcher().finished(this);

    }

  }

  synchronized void executed(RealCall call) {

    runningSyncCalls.add(call);

  }

异步请求:AsyncCall表示被执行的请求,调用enqueue方法执行异步请求,如果当前所有请求数(64)和单一host请求数(5)满足要求,则把AsyncCall放到Dispatcher的Running队列中顺序执行,否则放到waiting队列中,AsyncCall执行结束后调用finish方法,把当前AsyncCall从running队列中移除并执行线程调度

public void OkHttpSysGet() {

  String url = "http://wwww.baidu.com";

  OkHttpClient okHttpClient = new OkHttpClient();

  final Request request = new Request.Builder()

        .url(url)

        .get()//默认就是GET请求,可以不写

        .build();

  Call call = okHttpClient.newCall(request);

  call.enqueue(new Callback() {

    @Override

    public void onFailure(Call call, IOException e) {

      Log.d("okHttp", "onFailure: ");

    }

    @Override

    public void onResponse(Call call, Response response) throws IOException {

      Log.d("okHttp", "onResponse: " + response.body().string());

    }

  });

}

@Override public void enqueue(Callback responseCallback) {

  synchronized (this) {

    if (executed) throw new IllegalStateException("Already Executed");

    executed = true;

  }

  captureCallStackTrace();

  eventListener.callStart(this);

  client.dispatcher().enqueue(new AsyncCall(responseCallback));

}

synchronized void enqueue(AsyncCall call) {

  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {

    runningAsyncCalls.add(call);

    executorService().execute(call);

  } else {

    readyAsyncCalls.add(call);

  }

}

  final class AsyncCall extends NamedRunnable {

    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {

      super("OkHttp %s", redactedUrl());

      this.responseCallback = responseCallback;

    }

    @Override protected void execute() {

      boolean signalledCallback = false;

      try {

        Response response = getResponseWithInterceptorChain();

        if (retryAndFollowUpInterceptor.isCanceled()) {

          signalledCallback = true;

          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));

        } else {

          signalledCallback = true;

          responseCallback.onResponse(RealCall.this, response);

        }

      } catch (IOException e) {

      } finally {

        client.dispatcher().finished(this);

      }

    }

  }

Dispatcher调度器:维护请求状态、维护线程池

  /** Ready async calls in the order they'll be run. */

  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */

  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */

  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {

    int runningCallsCount;

    Runnable idleCallback;

    synchronized (this) {

      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");

      if (promoteCalls) promoteCalls();

      runningCallsCount = runningCallsCount();

      idleCallback = this.idleCallback;

    }

    if (runningCallsCount == 0 && idleCallback != null) {

      idleCallback.run();

    }

  }

  private void promoteCalls() {

    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.

    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {

      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {

        i.remove();

        runningAsyncCalls.add(call);

        executorService().execute(call);

      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.

    }

  }

线程池的作用:减少创建线程的开销、减少线程过少造成CPU闲置、减少线程过多时内存和线程切换的开销

创建线程池的参数:核心线程数、最大线程数、非核心线程存活时间、时间单位、工作队列、线程工厂

    /**

    * Creates an Executor that uses a single worker thread operating

    * off an unbounded queue. (Note however that if this single

    * thread terminates due to a failure during execution prior to

    * shutdown, a new one will take its place if needed to execute

    * subsequent tasks.)  Tasks are guaranteed to execute

    * sequentially, and no more than one task will be active at any

    * given time. Unlike the otherwise equivalent

    * {@code newFixedThreadPool(1)} the returned executor is

    * guaranteed not to be reconfigurable to use additional threads.

    *

    * @return the newly created single-threaded Executor

    */

    public static ExecutorService newSingleThreadExecutor() {

        return new FinalizableDelegatedExecutorService

            (new ThreadPoolExecutor(1, 1,

                                    0L, TimeUnit.MILLISECONDS,

                                    new LinkedBlockingQueue<Runnable>()));

    }

    /**

    * Creates a thread pool that reuses a fixed number of threads

    * operating off a shared unbounded queue.  At any point, at most

    * {@code nThreads} threads will be active processing tasks.

    * If additional tasks are submitted when all threads are active,

    * they will wait in the queue until a thread is available.

    * If any thread terminates due to a failure during execution

    * prior to shutdown, a new one will take its place if needed to

    * execute subsequent tasks.  The threads in the pool will exist

    * until it is explicitly {@link ExecutorService#shutdown shutdown}.

    *

    * @param nThreads the number of threads in the pool

    * @return the newly created thread pool

    * @throws IllegalArgumentException if {@code nThreads <= 0}

    */

    public static ExecutorService newFixedThreadPool(int nThreads) {

        return new ThreadPoolExecutor(nThreads, nThreads,

                                      0L, TimeUnit.MILLISECONDS,

                                      new LinkedBlockingQueue<Runnable>());

    }

    /**

    * Creates a thread pool that creates new threads as needed, but

    * will reuse previously constructed threads when they are

    * available.  These pools will typically improve the performance

    * of programs that execute many short-lived asynchronous tasks.

    * Calls to {@code execute} will reuse previously constructed

    * threads if available. If no existing thread is available, a new

    * thread will be created and added to the pool. Threads that have

    * not been used for sixty seconds are terminated and removed from

    * the cache. Thus, a pool that remains idle for long enough will

    * not consume any resources. Note that pools with similar

    * properties but different details (for example, timeout parameters)

    * may be created using {@link ThreadPoolExecutor} constructors.

    *

    * @return the newly created thread pool

    */

    public static ExecutorService newCachedThreadPool() {

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                      60L, TimeUnit.SECONDS,

                                      new SynchronousQueue<Runnable>());

    }

    /**

    * Creates a new {@code ThreadPoolExecutor} with the given initial

    * parameters.

    *

    * @param corePoolSize the number of threads to keep in the pool, even

    *        if they are idle, unless {@code allowCoreThreadTimeOut} is set

    * @param maximumPoolSize the maximum number of threads to allow in the

    *        pool

    * @param keepAliveTime when the number of threads is greater than

    *        the core, this is the maximum time that excess idle threads

    *        will wait for new tasks before terminating.

    * @param unit the time unit for the {@code keepAliveTime} argument

    * @param workQueue the queue to use for holding tasks before they are

    *        executed.  This queue will hold only the {@code Runnable}

    *        tasks submitted by the {@code execute} method.

    * @param threadFactory the factory to use when the executor

    *        creates a new thread

    * @param handler the handler to use when execution is blocked

    *        because the thread bounds and queue capacities are reached

    * @throws IllegalArgumentException if one of the following holds:<br>

    *        {@code corePoolSize < 0}<br>

    *        {@code keepAliveTime < 0}<br>

    *        {@code maximumPoolSize <= 0}<br>

    *        {@code maximumPoolSize < corePoolSize}

    * @throws NullPointerException if {@code workQueue}

    *        or {@code threadFactory} or {@code handler} is null

    */

    public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue,

                              ThreadFactory threadFactory,

                              RejectedExecutionHandler handler) {

        if (corePoolSize < 0 ||

            maximumPoolSize <= 0 ||

            maximumPoolSize < corePoolSize ||

            keepAliveTime < 0)

            throw new IllegalArgumentException();

        if (workQueue == null || threadFactory == null || handler == null)

            throw new NullPointerException();

        this.corePoolSize = corePoolSize;

        this.maximumPoolSize = maximumPoolSize;

        this.workQueue = workQueue;

        this.keepAliveTime = unit.toNanos(keepAliveTime);

        this.threadFactory = threadFactory;

        this.handler = handler;

    }

拦截器链:

首先看一下getResponseWithInterceptorChain的实现:

Response getResponseWithInterceptorChain() throws IOException {

    List<Interceptor> interceptors = new ArrayList<>();

    interceptors.addAll(client.interceptors());

    interceptors.add(retryAndFollowUpInterceptor);

    interceptors.add(new BridgeInterceptor(client.cookieJar()));

    interceptors.add(new CacheInterceptor(client.internalCache()));

    interceptors.add(new ConnectInterceptor(client));

    if (!forWebSocket) {

      interceptors.addAll(client.networkInterceptors());

    }

    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0, originalRequest);

    return chain.proceed(originalRequest);

}

逻辑大致分为两部分:

创建一系列拦截器,并将其放入一个拦截器数组中。这部分拦截器即包括用户自定义的拦截器也包括框架内部拦截器

创建一个拦截器链RealInterceptorChain,并执行拦截器链的proceed方法

接下来看RealInterceptorChain的实现逻辑:

public final class RealInterceptorChain implements Interceptor.Chain {

  private final List<Interceptor> interceptors;

  private final StreamAllocation streamAllocation;

  private final HttpCodec httpCodec;

  private final RealConnection connection;

  private final int index;

  private final Request request;

  private int calls;

  public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,

                HttpCodec httpCodec, RealConnection connection, int index, Request request) {

  @Override public Response proceed(Request request) throws IOException {

    return proceed(request, streamAllocation, httpCodec, connection);

  }

  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,

                RealConnection connection) throws IOException {

    ......

    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,

                                                connection, index + 1, request);

    Interceptor interceptor = interceptors.get(index);

    Response response = interceptor.intercept(next);

    ......

    return response;

  }

}

在proceed方法中的核心代码可以看到,proceed实际上也做了两件事:

创建下一个拦截链。传入index + 1使得下一个拦截器链只能从下一个拦截器开始访问

执行索引为index的intercept方法,并将下一个拦截器链传入该方法

接下来再看下第一个拦截器RetryAndFollowUpInterceptor的intercept方法:

public final class RetryAndFollowUpInterceptor implements Interceptor {

    @Override public Response intercept(Chain chain) throws IOException {

    Request request = chain.request();

    streamAllocation = new StreamAllocation(

        client.connectionPool(), createAddress(request.url()), callStackTrace);

    int followUpCount = 0;

    Response priorResponse = null;

    while (true) {

      if (canceled) {

        streamAllocation.release();

        throw new IOException("Canceled");

      }

      Response response = null;

      boolean releaseConnection = true;

      try {

      //执行下一个拦截器链的proceed方法

        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);

        releaseConnection = false;

      } catch (RouteException e) {

        if (!recover(e.getLastConnectException(), false, request)) {

          throw e.getLastConnectException();

        }

        releaseConnection = false;

        continue;

      } catch (IOException e) {

        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);

        if (!recover(e, requestSendStarted, request)) throw e;

        releaseConnection = false;

        continue;

      } finally {

        if (releaseConnection) {

          streamAllocation.streamFailed(null);

          streamAllocation.release();

        }

      }

      ...

      Request followUp = followUpRequest(response);

      closeQuietly(response.body());

      ...

    }

  }

}

一个拦截器的intercept方法所执行的逻辑大致分为三部分:

在发起请求前对request进行处理

调用下一个拦截器,获取response

对response进行处理,返回给上一个拦截器

Okhttp中除了用户自定义的拦截器外还有几个核心拦截器完成了网络访问的核心逻辑,按照先后顺序依次是:

RetryAndFollowUpInterceptor

BridgeInterceptor

CacheInterceptor

ConnectIntercetot

CallServerInterceptor

RetryAndFollowUpInterceptor负责两部分逻辑:

在网络请求失败后进行重试

当服务器返回当前请求需要进行重定向时直接发起新的请求,并在条件允许情况下复用当前连接

BridgeInterceptor主要负责以下几部分内容:

设置内容长度,内容编码

设置gzip压缩,并在接收到内容后进行解压。省去了应用层处理数据解压的麻烦

添加cookie

设置其他报头,如User-Agent,Host,Keep-alive等,其中Keep-Alive是实现多路复用的必要步骤

CacheInterceptor的职责很明确,就是负责Cache的管理

当网络请求有符合要求的Cache时直接返回Cache

当服务器返回内容有改变时更新当前cache

如果当前cache失效,删除

ConnectInterceptor为当前请求找到合适的连接,可能复用已有连接也可能是重新创建的连接,返回的连接由连接池负责决定。

CallServerInterceptor负责向服务器发起真正的访问请求,并在接收到服务器返回后读取响应返回

缓存策略:

如果缓存不符合要求,同时也没有网络,则直接返回504(网关错误)

如果有缓存,但是没有网络,则直接返回缓存结果

如果有网络,也有符合的缓存,则执行HTTP的缓存策略

缓存开关有:pragma、Expires、cache-control

缓存校验有:Last-Modified、eTag

连接复用:

在HTTP1.0中每次请求都需要重新建立连接,每个连接负责一个资源请求。创建一个TCP连接需要3次握手,而释放连接则需要2次或4次握手。重复的创建和释放连接极大地影响了网络效率,同时也增加了系统开销

为了有效地解决这一问题,HTTP/1.1提出了Keep-Alive机制,当一个HTTP请求的数据传输结束后,TCP连接不立即释放,如果此时有新的HTTP请求,且其请求的Host和上次请求相同,则可以直接复用未释放的连接,从而省去了创建和释放TCP连接的开销,减少了网络延时

HTTP/2主要解决了以下问题:

报头压缩:HTTP/2使用HPACK压缩格式压缩请求和响应报头数据,减少不必要流量开销

请求与响应复用:HTTP/2通过引入新的二进制分帧层实现了完整的请求和响应复用,客户端和服务器可以将HTTP消息分解为互不依赖的帧,然后交错发送,最后再在另一端将其重新组装

指定数据流优先级:将 HTTP 消息分解为很多独立的帧之后,我们就可以复用多个数据流中的帧,客户端和服务器交错发送和传输这些帧的顺序就成为关键的性能决定因素。为了做到这一点,HTTP/2 标准允许每个数据流都有一个关联的权重和依赖关系

OkHttp内部通过ConnectionPool来管理连接池

连接复用清理规则:

通过RealConnection中的StreamAllocation弱引用队列是否为0来判断当前RealConnection是否空闲

ConnectionPool有一个独立的线程cleanupRunnable来清理连接池,其触发时机有两个:当连接池新增连接时、当连接闲置时(connectionBecameIdle接口被调用时)

搜集所有的空闲连接并记录KeepAlive时间,如果空闲连接超过5个或者某个连接的KeepAlive时间大于五分钟,就移除KeepAlive时间最长的连接,立刻再次扫描

如果目前还空闲连接不多于5个,就返回最大KeepAlive的剩余的时间,供下次清理

如果(全部都是活跃的连接),就返回默认的keep-alive时间,也就是5分钟后再执行清理

Okio:不需要区分字节流和字符流、增加超时机制、Buffer使用segment链保存数据,可以避免内存抖动,数据拷贝只需操作链表

同步超时:在read和write方法前面判断是否超时,可以判断整体读写超时,无法阻止单次读写超时

Socket异步超时:将AsyncTimeout组成有序链表,并且开启一个线程来监控,到达超时时间则主动关闭连接

相关文章

网友评论

      本文标题:okhttp源码理解

      本文链接:https://www.haomeiwen.com/subject/vdszlktx.html