美文网首页
OkHttp源码解析 -- 同步异步处理

OkHttp源码解析 -- 同步异步处理

作者: PuHJ | 来源:发表于2018-03-29 15:42 被阅读19次

前言:

使用OkHttp,执行网络请求时会有异步还是同步处理。先说下异步和同步的区别,同步并不是指在UI线程中执行,简单的一句话就可说明,同步是会阻塞当前线程,异步是指重新开了一个线程,并在开辟的线程中执行,执行完成后再回调到原始线程。
那么OkHttp是怎么完成异步和同步的处理的了?接下来的来一步一步的解析。

1、同步请求

// 调用代码
Response response = call.execute();

call的实现类是RealCall,这是一个准备进行网络请求的执行类,他不能被执行两次

// 同步的请求方法
@Override public Response execute() throws IOException {
    // 代码块 executed参数就是表示这个RealCall是否执行过了,验证了它不能执行两次
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
   // 设置一些接口回调,比如callStart,这个回调的注册在OkHttpClient的构建中
    eventListener.callStart(this);
    try {
      // 调用OkHttpClient的调度器插入到http请求队列中
      client.dispatcher().executed(this);
      // 开始执行网络请求的责任链来开始真正的网络请求,得到Response响应后就可以返回了
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      // 错误的回调
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      // 因为是插入到队列中,所以执行完成后就进行善后处理了
      client.dispatcher().finished(this);
    }
  }

在执行execute方法过程中,遇到了三个方向:
1、client.dispatcher().executed(this);
2、 Response result = getResponseWithInterceptorChain()// 网路请求责任链后面一章分析
3、 client.dispatcher().finished(this);

client.dispatcher()是什么东西,什么时候遇到过?
构建OkHttpClient的时候就对dispatcher赋值了,对于一个OkHttpClient来说,Dispatcher也就是一个

  dispatcher = new Dispatcher();

看上述第一个方法
Dispatcher执行execute方法很简单,就是讲该RealCall对象入队,不会有其它的处理

  /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  // 先进先出的队形,以及执行方式
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

继续看第三个方法:

  // 从队列拿出来
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

// 参数依次是插入的同步队列,该RealCall,和时候需要重新整理队列的标志对于同步请求不需要
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    // 安全加锁
    synchronized (this) {
      // 移除
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      // 整理队列,异步的时候才需要,同步不需要
      if (promoteCalls) promoteCalls();
      // 更新运行的RealCall的数量,包括同步和异步
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
    // 最后一个Http请求完成后,并且设置了idleCallback 回调
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

关于同步的请求很简单,简单来说就是插入队列,进行Http请求,请求回调后清除队列中的请求,返回响应体,就结束了。当然http请求并没分析,只解析了整体的调度过程。

2、异步请求

异步请求的调度方式,传入一个接口实例

call.enqueue(new Callback() {
                @Override
                public void onFailure(Call call, IOException e) {

                }

                @Override
                public void onResponse(Call call, Response response) throws IOException {
                    
                }
            });

同样的,我们来看下RealCall中的enqueue方法
what?比同步的还少?不存在的,这里看的少,只不过被封装起来了

  @Override public void enqueue(Callback responseCallback) {
    // 同样的同一个RealCall不能执行两次,可以通过clone()方法获取新的对象
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    // 直接交给Dispatcher管理了,尽然撒手不管了
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

首先看下Dispatcher中的enqueue方法

 //  这个call是个Runnable,我们将传入的Callback封装在AsyncCall里面
synchronized void enqueue(AsyncCall call) {
    // 是否可以执行?条件就是是否到了最大的运行数量。是否到了同一个主机的最大运行数量
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      // 加入到运行队列中
      runningAsyncCalls.add(call);
      // 通过线程池来执行Runnable
      executorService().execute(call);
    } else {
     // 条件不满足,那就只能等待了,放在异步等待队列中,同步的话就不存在,直接放行
      readyAsyncCalls.add(call);
    }
  }

为了解释清楚,需要了解Dispatcher的成员变量,不了解还玩个啥。

首先Dispatcher这个类是final,不可以继承重写的

  // http最大的请求
  private int maxRequests = 64;
  // 每个主机的最大http请求
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  /** Executes calls. Created lazily. */
  // 懒加载创建线程池
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  // 准备的异步队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  // 正在运行的异步队列(包含了已经取消但是还没执行完成的请求)
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  // 正在运行的同步队列(提醒,这里面保存的RealCall实例,异步的保存的是Runnable)(包含了已经取消但是还没执行完成的请求
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

// synchronized 同步
public synchronized ExecutorService executorService() {
    if (executorService == null) {
      /**
        * 核心线程为0,为了不消耗资源,因为核心线程会一直执行下去
        * 最大线程数Integer.MAX_VALUE,其实达不到,maxRequests 
        *   60秒之后没有利用就销毁了
        *  线程等待队列
        * 线程工厂
        **/
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

回过头看看怎么对传入的Callback封装的,AsyncCall就是一个Runnable,只有新开辟一个线程才叫异步,不然怎么可能叫异步了
为了简化代码量,直截取了Runnable中的run方法中的核心代码 <该成功失败的回调都是在子线程中>

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        // 网络请求
        Response response = getResponseWithInterceptorChain();
       // 如果取消了 那就直接返回错误
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
         // 回调成功
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          // 遇到了IO的异常也回调错误
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
       // 最后清除
        client.dispatcher().finished(this);
      }
    }

废话不多说,直接看看 client.dispatcher().finished(this);这个会和同步的有什么区别了?这个this代表当前Runnable
贴出代码

  /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    // 运行的异步队列,Runnable,需要重新调整队列
    finished(runningAsyncCalls, call, true);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      // 重点分析这个方法,他解决了什么时候需要出发从准备队列放到执行队列中,并触发执行的契机。最好的契机就是有一个执行完了,才可能会需要处理,这就是promoteCalls的意义
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

  // 对队列的处理
  private void promoteCalls() {
    // 基本条件不满足 返回
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
    // 对准备的异步队列进行遍历
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
      
     // 满足的条件(该主机的http请求个数没有达到最大值maxRequestsPerHost = 5)
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        // 从准备队列删除,并插入到运行队列中,并放在线程池中执行
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

总结:到此我们就解析完了异步和同步的调度过程了。看下面盗用的调度过程


调度过程

其实Dispatcher这个类的处理过程就是生产者和消费者模式。保存了准备队列和执行队列。
同时对数据进行操作的时候都是加上了synchronized关键字。
内部通过维护线程池处理减少资源的浪费,更加高效。

相关文章

网友评论

      本文标题:OkHttp源码解析 -- 同步异步处理

      本文链接:https://www.haomeiwen.com/subject/dgdmcftx.html