美文网首页
OkHttp源码解析 -- 同步异步处理

OkHttp源码解析 -- 同步异步处理

作者: PuHJ | 来源:发表于2018-03-29 15:42 被阅读19次

    前言:

    使用OkHttp,执行网络请求时会有异步还是同步处理。先说下异步和同步的区别,同步并不是指在UI线程中执行,简单的一句话就可说明,同步是会阻塞当前线程,异步是指重新开了一个线程,并在开辟的线程中执行,执行完成后再回调到原始线程。
    那么OkHttp是怎么完成异步和同步的处理的了?接下来的来一步一步的解析。

    1、同步请求

    // 调用代码
    Response response = call.execute();
    

    call的实现类是RealCall,这是一个准备进行网络请求的执行类,他不能被执行两次

    // 同步的请求方法
    @Override public Response execute() throws IOException {
        // 代码块 executed参数就是表示这个RealCall是否执行过了,验证了它不能执行两次
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
       // 设置一些接口回调,比如callStart,这个回调的注册在OkHttpClient的构建中
        eventListener.callStart(this);
        try {
          // 调用OkHttpClient的调度器插入到http请求队列中
          client.dispatcher().executed(this);
          // 开始执行网络请求的责任链来开始真正的网络请求,得到Response响应后就可以返回了
          Response result = getResponseWithInterceptorChain();
          if (result == null) throw new IOException("Canceled");
          return result;
        } catch (IOException e) {
          // 错误的回调
          eventListener.callFailed(this, e);
          throw e;
        } finally {
          // 因为是插入到队列中,所以执行完成后就进行善后处理了
          client.dispatcher().finished(this);
        }
      }
    

    在执行execute方法过程中,遇到了三个方向:
    1、client.dispatcher().executed(this);
    2、 Response result = getResponseWithInterceptorChain()// 网路请求责任链后面一章分析
    3、 client.dispatcher().finished(this);

    client.dispatcher()是什么东西,什么时候遇到过?
    构建OkHttpClient的时候就对dispatcher赋值了,对于一个OkHttpClient来说,Dispatcher也就是一个

      dispatcher = new Dispatcher();
    

    看上述第一个方法
    Dispatcher执行execute方法很简单,就是讲该RealCall对象入队,不会有其它的处理

      /** Used by {@code Call#execute} to signal it is in-flight. */
      synchronized void executed(RealCall call) {
        runningSyncCalls.add(call);
      }
    
    /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
      // 先进先出的队形,以及执行方式
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    

    继续看第三个方法:

      // 从队列拿出来
      void finished(RealCall call) {
        finished(runningSyncCalls, call, false);
      }
    
    // 参数依次是插入的同步队列,该RealCall,和时候需要重新整理队列的标志对于同步请求不需要
    private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        // 安全加锁
        synchronized (this) {
          // 移除
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          // 整理队列,异步的时候才需要,同步不需要
          if (promoteCalls) promoteCalls();
          // 更新运行的RealCall的数量,包括同步和异步
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
        // 最后一个Http请求完成后,并且设置了idleCallback 回调
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
    

    关于同步的请求很简单,简单来说就是插入队列,进行Http请求,请求回调后清除队列中的请求,返回响应体,就结束了。当然http请求并没分析,只解析了整体的调度过程。

    2、异步请求

    异步请求的调度方式,传入一个接口实例

    call.enqueue(new Callback() {
                    @Override
                    public void onFailure(Call call, IOException e) {
    
                    }
    
                    @Override
                    public void onResponse(Call call, Response response) throws IOException {
                        
                    }
                });
    

    同样的,我们来看下RealCall中的enqueue方法
    what?比同步的还少?不存在的,这里看的少,只不过被封装起来了

      @Override public void enqueue(Callback responseCallback) {
        // 同样的同一个RealCall不能执行两次,可以通过clone()方法获取新的对象
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
        eventListener.callStart(this);
        // 直接交给Dispatcher管理了,尽然撒手不管了
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
      }
    

    首先看下Dispatcher中的enqueue方法

     //  这个call是个Runnable,我们将传入的Callback封装在AsyncCall里面
    synchronized void enqueue(AsyncCall call) {
        // 是否可以执行?条件就是是否到了最大的运行数量。是否到了同一个主机的最大运行数量
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          // 加入到运行队列中
          runningAsyncCalls.add(call);
          // 通过线程池来执行Runnable
          executorService().execute(call);
        } else {
         // 条件不满足,那就只能等待了,放在异步等待队列中,同步的话就不存在,直接放行
          readyAsyncCalls.add(call);
        }
      }
    

    为了解释清楚,需要了解Dispatcher的成员变量,不了解还玩个啥。

    首先Dispatcher这个类是final,不可以继承重写的
    
      // http最大的请求
      private int maxRequests = 64;
      // 每个主机的最大http请求
      private int maxRequestsPerHost = 5;
      private @Nullable Runnable idleCallback;
    
      /** Executes calls. Created lazily. */
      // 懒加载创建线程池
      private @Nullable ExecutorService executorService;
    
      /** Ready async calls in the order they'll be run. */
      // 准备的异步队列
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    
      /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
      // 正在运行的异步队列(包含了已经取消但是还没执行完成的请求)
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    
      /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
      // 正在运行的同步队列(提醒,这里面保存的RealCall实例,异步的保存的是Runnable)(包含了已经取消但是还没执行完成的请求
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    
    // synchronized 同步
    public synchronized ExecutorService executorService() {
        if (executorService == null) {
          /**
            * 核心线程为0,为了不消耗资源,因为核心线程会一直执行下去
            * 最大线程数Integer.MAX_VALUE,其实达不到,maxRequests 
            *   60秒之后没有利用就销毁了
            *  线程等待队列
            * 线程工厂
            **/
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
    

    回过头看看怎么对传入的Callback封装的,AsyncCall就是一个Runnable,只有新开辟一个线程才叫异步,不然怎么可能叫异步了
    为了简化代码量,直截取了Runnable中的run方法中的核心代码 <该成功失败的回调都是在子线程中>

    @Override protected void execute() {
          boolean signalledCallback = false;
          try {
            // 网络请求
            Response response = getResponseWithInterceptorChain();
           // 如果取消了 那就直接返回错误
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
             // 回调成功
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              eventListener.callFailed(RealCall.this, e);
              // 遇到了IO的异常也回调错误
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
           // 最后清除
            client.dispatcher().finished(this);
          }
        }
    

    废话不多说,直接看看 client.dispatcher().finished(this);这个会和同步的有什么区别了?这个this代表当前Runnable
    贴出代码

      /** Used by {@code AsyncCall#run} to signal completion. */
      void finished(AsyncCall call) {
        // 运行的异步队列,Runnable,需要重新调整队列
        finished(runningAsyncCalls, call, true);
      }
    
      private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          // 重点分析这个方法,他解决了什么时候需要出发从准备队列放到执行队列中,并触发执行的契机。最好的契机就是有一个执行完了,才可能会需要处理,这就是promoteCalls的意义
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
    
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
    
      // 对队列的处理
      private void promoteCalls() {
        // 基本条件不满足 返回
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
        
        // 对准备的异步队列进行遍历
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
          
         // 满足的条件(该主机的http请求个数没有达到最大值maxRequestsPerHost = 5)
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            // 从准备队列删除,并插入到运行队列中,并放在线程池中执行
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
    
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }
    

    总结:到此我们就解析完了异步和同步的调度过程了。看下面盗用的调度过程


    调度过程

    其实Dispatcher这个类的处理过程就是生产者和消费者模式。保存了准备队列和执行队列。
    同时对数据进行操作的时候都是加上了synchronized关键字。
    内部通过维护线程池处理减少资源的浪费,更加高效。

    相关文章

      网友评论

          本文标题:OkHttp源码解析 -- 同步异步处理

          本文链接:https://www.haomeiwen.com/subject/dgdmcftx.html