美文网首页
Okhttp主线流程源码学习笔记

Okhttp主线流程源码学习笔记

作者: Chenyangqi | 来源:发表于2020-10-26 14:33 被阅读0次

本次学习Okhttp源码是对以下这个版本,从开始请求到请求结束回调请求结果的一这个主要流程

 implementation 'com.squareup.okhttp3:okhttp:3.10.0'

所涉及的核心类为

  • OkhttpClient
  • Request
  • Response
  • Call
  • Callback
Okhttp简单使用如下:
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
        Request request = new Request.Builder()
                .url("http://baidu.com")
                .get()
                .build();
        Call call = okHttpClient.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                Log.e("test", "fail ---");
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                Log.e("test", "success-->" + response.body());
            }
        });

发送请求的主线流程

call.enqueue(Callback responseCallback),call是一个接口它的实现类是RealCall

final class RealCall implements Call {

    ...
  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
  ...

}

以上这段代码表示不能同时执行相同的请求,否则会抛出"Already Executed"的异常,然后执行到client.dispatcher().enqueue(new AsyncCall(responseCallback)),跟进Dispatcher中

public final class Dispatcher {
  //等待队列的容量
  private int maxRequests = 64;
  //运行队列的容量
  private int maxRequestsPerHost = 5;
  //请求的缓存队列
  private @Nullable ExecutorService executorService;
  //等待队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  //异步请求的运行队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  //同步请求的运行队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  //初始化缓存队列
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

    //异步请求
  synchronized void enqueue(AsyncCall call) {
    //如果对同一个host地址请求的的运行队列此时长度小于5,则把请求添加进运行队列,立即通过线程池执行,如果不满足条线就先添加到等待队列里
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
}

Dispatcher类中定义了等待队列和同步请求和异步请求的队列,以及等待列表容量64,运行队列长度5,也就是说如果添加一个请求此时运行队列里面不足5个就直接把这个请求放进运行队列直接执行,如果此时运行队列里面有五个则先放在等待队列中,等运行队列里面执行完少于5个的时候再从等待队列里面取去放入运行队列执行请求
其中线程池创建过程如下

 executorService = new ThreadPoolExecutor(0, 
                    Integer.MAX_VALUE, 
                    60,
                    TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(), 
                    Util.threadFactory("OkHttp Dispatcher", false));
  
 public class Util{
     public static ThreadFactory threadFactory(final String name, final boolean daemon) {
    return new ThreadFactory() {
      @Override public Thread newThread(Runnable runnable) {
        Thread result = new Thread(runnable, name);
        result.setDaemon(daemon);
        return result;
      }
    };
 } 

创建了一个核心线程为0,无限容量的子线程,存活时间60s的线程池,并且为线程指定了名字,thread.setDaemon(daemon)设置线程为守护线程,SynchronousQueue是一个没有容量的队列,这里配合核心线程为0,线程容量Int.Max实现了一个接收到请求就直接分配线程(复用或者新创建线程)执行的过程。
再分析线程池执行executorService().execute(call),其中的call是new AsyncCall(responseCallback)创建的,AsyncCalls继承自NamedRunnable,是RealCall的内部类。NamedRunnable实现了Runnable接口

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();

AsyncCall继承NamedRunnable,并且实现了抽象方法execute

final class RealCall implements Call {

    final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    //实现父类定义的抽象类,当父类被线程池执行的时候,这个类也会执行
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        //责任链模式实现的各类拦截器
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }
}

通过以上过程可以知道,当Dispatcher.enqueue()执行线程池executorService().execute(call)会触发RealCall的内部类AsyncCall执行复写父类抽象的execute方法

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

通过策略模式逐层让拦截器处理response,如果请求失败则回调responseCallback.onFailure(RealCall.this, new IOException("Canceled")),如果请求成功则回调responseCallback.onResponse(RealCall.this, response)。

如上,Http请求的主线流程就完成了。

Okhttp中用到的设计模式

构建者模式

OkHttpClient和Request的创建是构造者模式创建的,构造者模式比较常见不多解释。

OkHttpClient okHttpClient = new OkHttpClient.Builder()
                .build();
 Request request = new Request.Builder()
           .url("http://baidu.com")
           .get()
           .build();

责任链模式

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

责任链模式:为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。这种类型的设计模式属于行为型模式。
在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。

举个栗子:
员工李华需要请假,向所在部门主管(Manager1)申请,部门主管(Manager1)同意转给上一级主管(Manager2)审批,Manager2同意审批再转给人事主管(Manager3)审批
step 1:创建执行事务的接口IBaseTask.java

interface IBaseTask {
    /**
     * @param task      事务内容
     * @param iBaseTask 下一个任务节点
     */
    public void doAction(String content, IBaseTask iBaseTask);
}

事特批:创建三个责任链成员,也就是三个经理Manager

class Manager1 implements IBaseTask {

    @Override
    public void doAction(String content, IBaseTask iBaseTask) {
        if (content.equals("请假")) {
            System.out.println("Manager1 同意请假 并转交给下一个Manager审批...");
            iBaseTask.doAction(content, iBaseTask);
        } else {

        }
    }
}

class Manager2 implements IBaseTask{
    @Override
    public void doAction(String content, IBaseTask iBaseTask) {
        if (content.equals("请假")) {
            System.out.println("Manager2 同意请假 并转交给下一个Manager审批...");
            iBaseTask.doAction(content, iBaseTask);
        }
    }
}

class Manager3 implements IBaseTask {
    @Override
    public void doAction(String content, IBaseTask iBaseTask) {
        if (content.equals("请假")) {
            System.out.println("Manager3 同意请假 流程结束");
            return;
        }
    }
}

step3:通过TaskManager实现责任链实现

class TaskManager implements IBaseTask {
    private List<IBaseTask> taskList;
    private int index = 0;

    public TaskManager() {
        this.taskList = new ArrayList<>();
    }

    public void addTask(IBaseTask task) {
        if (taskList == null || taskList.isEmpty()) {
            this.taskList = new ArrayList<>();
        }
        taskList.add(task);
    }

    @Override
    public void doAction(String content, IBaseTask iBaseTask) {
        if (taskList.isEmpty()) {
            System.out.println("taskList is empty!");
            return;
        }
        if (index < 0 || index >= taskList.size()) {
            System.out.println("index out of bundle!");
            return;
        }
        IBaseTask task = taskList.get(index);
        index++;
        task.doAction(content, iBaseTask);
    }
}

测试结果

public static void main(String[] args) {
        TaskManager taskManager = new TaskManager();
        taskManager.addTask(new Manager1());
        taskManager.addTask(new Manager2());
        taskManager.addTask(new Manager3());
        taskManager.doAction("请假", taskManager);
    }

执行结果如下

Manager1 同意请假 并转交给下一个Manager审批...
Manager2 同意请假 并转交给下一个Manager审批...
Manager3 同意请假 流程结束
Class transformation time: 0.008817664s for 99 classes or 8.817664E-5s per class

Okhttp的拦截器也是类似如上的这种责任链模式实现。

相关文章

网友评论

      本文标题:Okhttp主线流程源码学习笔记

      本文链接:https://www.haomeiwen.com/subject/ksyamktx.html