美文网首页
Android OkHttp3流程分析(1)

Android OkHttp3流程分析(1)

作者: Bfmall | 来源:发表于2023-04-11 10:51 被阅读0次

    一、基本概念

    首先从使用出发,其次再结合源码来分析OkHttp3的内部实现的,建议大家下载 OkHttp 源码跟着本文,过一遍源码。首先来看一下OkHttp3的请求代码。

    同步请求方式代码:
        //同步请求
        private void testSyncRequest() {
            OkHttpClient okHttpClient = new OkHttpClient();
    
            //指定url,get()代表get请求方式
            Request request = new Request.Builder()
                    .url("https://www.baidu.com/img/bd_logo1.png").get().build();
            Call call = okHttpClient.newCall(request);
            new Thread() {
                @Override
                public void run() {
                    try {
                        //调用execute,同步请求(必须放在子线程中,否则报错android.os.NetworkOnMainThreadException)
                        Response response = call.execute();
                        String result = response.body().toString();
                        Log.i(TAG, "testSyncRequest==>result="+result);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }.start();
    
        }
    
    异步请求方式代码:
        //异步请求
        private void testAsyncRequest() {
            OkHttpClient okHttpClient = new OkHttpClient();
    
            //指定url,get()代表get请求方式
            Request request = new Request.Builder()
                    .url("https://www.baidu.com/img/bd_logo1.png").get().build();
            Call call = okHttpClient.newCall(request);
            //调用enqueue异步请求方式
            call.enqueue(new Callback() {
                @Override
                public void onFailure(Call call, IOException e) {
                    Log.i(TAG, "testAsyncRequest==>onFailure...err="+e.getMessage());
                }
    
                @Override
                public void onResponse(Call call, Response response) throws IOException {
                    Log.i(TAG, "testAsyncRequest==>result="+response.body().toString());
                }
            });
        }
    

    二、OkHttp3的执行流程

    1.创建OkHttpClient对象。OkHttpClient为网络请求执行的一个中心,它会管理连接池,缓存,SocketFactory,代理,各种超时时间,DNS,请求执行结果的分发等许多内容。
    2.创建Request对象。Request用于描述一个HTTP请求,比如请求的方法是GET还是POST,请求的URL,请求的header,请求的body,请求的缓存策略等。
    3.创建Call对象。Call是一次HTTP请求的Task,它会执行网络请求以获得响应。OkHttp中的网络请求执行Call既可以同步进行,也可以异步进行。调用call.execute()将直接执行网络请求,阻塞直到获得响应。而调用call.enqueue()传入回调,则会将Call放入一个异步执行队列,由ExecutorService在后台执行。
    4.执行网络请求并获取响应。

    上面的代码中涉及到几个常用的类:Request、Response和Call。下面就这几个类做详细的介绍。

    Request

    每一个HTTP请求包含一个URL、一个方法(GET或POST或其他)、一些HTTP头,请求还可能包含一个特定内容类型的数据类的主体部分。

    Response

    响应是对请求的回复,包含状态码、HTTP头和主体部分。

    Call

    OkHttp使用Call抽象出一个满足请求的模型,尽管中间可能会有多个请求或响应。执行Call有两种方式,同步或异步。

    那么首先来看一下OkHttpClient的源码实现:

    public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
    
      public OkHttpClient() {
        this(new Builder());
      }
    
      OkHttpClient(Builder builder) {
         //分发器,后面会提到
        this.dispatcher = builder.dispatcher;
        this.proxy = builder.proxy;
        this.protocols = builder.protocols;
        this.connectionSpecs = builder.connectionSpecs;
        this.interceptors = Util.immutableList(builder.interceptors);
        this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
        this.eventListenerFactory = builder.eventListenerFactory;
        this.proxySelector = builder.proxySelector;
        this.cookieJar = builder.cookieJar;
        this.cache = builder.cache;
        this.internalCache = builder.internalCache;
        this.socketFactory = builder.socketFactory;
    
        boolean isTLS = false;
        for (ConnectionSpec spec : connectionSpecs) {
          isTLS = isTLS || spec.isTls();
        }
    
        if (builder.sslSocketFactory != null || !isTLS) {
          this.sslSocketFactory = builder.sslSocketFactory;
          this.certificateChainCleaner = builder.certificateChainCleaner;
        } else {
          X509TrustManager trustManager = Util.platformTrustManager();
          this.sslSocketFactory = newSslSocketFactory(trustManager);
          this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
        }
    
        if (sslSocketFactory != null) {
          Platform.get().configureSslSocketFactory(sslSocketFactory);
        }
    
        this.hostnameVerifier = builder.hostnameVerifier;
        this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
            certificateChainCleaner);
        this.proxyAuthenticator = builder.proxyAuthenticator;
        this.authenticator = builder.authenticator;
        this.connectionPool = builder.connectionPool;
        this.dns = builder.dns;
        this.followSslRedirects = builder.followSslRedirects;
        this.followRedirects = builder.followRedirects;
        this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
        this.callTimeout = builder.callTimeout;
        this.connectTimeout = builder.connectTimeout;
        this.readTimeout = builder.readTimeout;
        this.writeTimeout = builder.writeTimeout;
        this.pingInterval = builder.pingInterval;
    
        if (interceptors.contains(null)) {
          throw new IllegalStateException("Null interceptor: " + interceptors);
        }
        if (networkInterceptors.contains(null)) {
          throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
        }
      }
    
    
    }
    

    然后使用okHttpClient发起异步请求。例如:

    okHttpClient.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
    
        }
    
        @Override
        public void onResponse(Call call, Response response) throws IOException {
    
        }
    });
    

    那接下来我们在看下Request。例如:

    Request request = new Request.Builder().url("url").build();
    

    该段代码主要实现初始化建造者模式和请求对象,并且用URL替换Web套接字URL。其源码如下:

    
    public final class Request {
        public static class Builder {
            public Builder() {
              this.method = "GET";
              this.headers = new Headers.Builder();
            }
    
            Builder(Request request) {
              this.url = request.url;
              this.method = request.method;
              this.body = request.body;
              this.tags = request.tags.isEmpty()
                  ? Collections.<Class<?>, Object>emptyMap()
                  : new LinkedHashMap<>(request.tags);
              this.headers = request.headers.newBuilder();
            }
    
            public Builder url(String url) {
              if (url == null) throw new NullPointerException("url == null");
    
              // Silently replace web socket URLs with HTTP URLs.
              if (url.regionMatches(true, 0, "ws:", 0, 3)) {
                url = "http:" + url.substring(3);
              } else if (url.regionMatches(true, 0, "wss:", 0, 4)) {
                url = "https:" + url.substring(4);
              }
    
              return url(HttpUrl.get(url));
            }
    
            public Builder url(HttpUrl url) {
              if (url == null) throw new NullPointerException("url == null");
              this.url = url;
              return this;
            }
    
            ......
    
    
            public Request build() {
              if (url == null) throw new IllegalStateException("url == null");
              return new Request(this);
            }
        }
    }
    

    okHttpClient调用newCall()方法分析:

    //OkHttpClient.java
    @Override
    public Call newCall(Request request) {
        return RealCall.newRealCall(this, request, false /* for web socket */);
    }
    

    看下RealCall源码:

    //RealCall.java
    //RealCall是接口Call的实现类
    final class RealCall implements Call {
    
        static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
            // Safely publish the Call instance to the EventListener.
                //创建RealCall的实例
            RealCall call = new RealCall(client, originalRequest, forWebSocket);
            call.eventListener = client.eventListenerFactory().create(call);
            return call;
        }
    
            //异步请求方法
        @Override
        public void enqueue(Callback responseCallback) {
            synchronized (this) {
                  //executed 用来标记请求是否已经执行了, 如果已经执行的过程中,再次请求抛异常
              if (executed) throw new IllegalStateException("Already Executed");
              executed = true;
            }
            captureCallStackTrace();
            eventListener.callStart(this);
                //最后调用分发器dispatcher的enqueue方法,参数是异步AsyncCall对象
            client.dispatcher().enqueue(new AsyncCall(responseCallback));
        }
    
            @Override
            public RealCall clone() {
               return RealCall.newRealCall(client, originalRequest, forWebSocket);
            }
    }
    

    由上面的代码可以得出:
    1.检查这个 call 是否已经被执行了,每个 call 只能被执行一次,如果想要一个完全一样的 call,可以利用 call#clone方法进行克隆。
    2.利用 client.dispatcher().enqueue(this) 来进行实际执行,dispatcher 是刚才看到的OkHttpClient.Builder 的成员之一。

    AsyncCall是RealCall的一个内部类并且继承NamedRunnable。

    final class AsyncCall extends NamedRunnable {
        private final Callback responseCallback;
    
        AsyncCall(Callback responseCallback) {
          super("OkHttp %s", redactedUrl());
          this.responseCallback = responseCallback;
        }
    
        ......
    
        //实现execute方法
        @Override protected void execute() {
          boolean signalledCallback = false;
          timeout.enter();
          try {
            //调用getResponseWithInterceptorChain方法获取相应
            Response response = getResponseWithInterceptorChain();
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              //调用失败
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              //调用成功
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            e = timeoutExit(e);
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              eventListener.callFailed(RealCall.this, e);
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            //请求完成
            client.dispatcher().finished(this);
          }
        }
    }
    

    而NamedRunnable又实现了Runnable接口,来看代码:

    public abstract class NamedRunnable implements Runnable {
      protected final String name;
    
      public NamedRunnable(String format, Object... args) {
        this.name = Util.format(format, args);
      }
    
      @Override public final void run() {
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName(name);
        try {
          execute();
        } finally {
          Thread.currentThread().setName(oldName);
        }
      }
    
      protected abstract void execute();
    }
    

    可以看到NamedRunnable实现了Runnbale接口并且是个抽象类,其抽象方法是execute(),该方法是在run方法中被调用的,这也就意味着NamedRunnable是一个任务,并且其子类应该实现execute方法。上面已经看到了AsyncCall的实现方法execute(),首先是调用getResponseWithInterceptorChain()方法获取响应,然后获取成功后,就调用回调的onReponse方法,如果失败,就调用回调的onFailure方法,并调用Dispatcher的finished方法。

    Dispatcher线程池介绍

    那还看一下Dispatcher类的相关代码:

    public final class Dispatcher {
      /** 最大并发请求数为64 */
      private int maxRequests = 64;
      /** 每个主机最大请求数为5 */
      private int maxRequestsPerHost = 5;
      private @Nullable Runnable idleCallback;
    
      /** 线程池 */
      /** Executes calls. Created lazily. */
      private @Nullable ExecutorService executorService;
    
      /** 准备执行的请求队列 */
      /** Ready async calls in the order they'll be run. */
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    
     /** 正在执行的异步请求队列,包含已经取消但未执行完的请求 */
      /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    
      /** 正在执行的同步请求队列,包含已经取消单未执行完的请求 */
      /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    
      public Dispatcher(ExecutorService executorService) {
        this.executorService = executorService;
      }
    
      public Dispatcher() {
      }
    
      public synchronized ExecutorService executorService() {
        //构造线程池
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(
              0, //corePoolSize:最小并发线程数,如果是0的话,空闲一段时间后所有线程将全部被销毁
              Integer.MAX_VALUE,//maximumPoolSize:最大线程数,当任务进来时可以扩充的线程最大值,当大于了这个值就会根据丢弃处理机制来处理
              60, //当线程数大于corePoolSize时,多余的空闲线程的最大存活时间
              TimeUnit.SECONDS,//单位秒
              new SynchronousQueue<Runnable>(), //工作队列,先进先出
              Util.threadFactory("OkHttp Dispatcher", false));//单个线程的工厂
        }
        return executorService;
      }
    
    }
    

    可以看出,在Okhttp中,构建了一个核心为[0, Integer.MAX_VALUE]的线程池,它不保留任何最小线程数,随时创建更多的线程数,当线程空闲时只能活60秒,它使用了一个不存储元素的阻塞工作队列,一个叫做”OkHttp Dispatcher”的线程工厂。也就是说,在实际运行中,当收到10个并发请求时,线程池会创建十个线程,当工作完成后,线程池会在60s后相继关闭所有线程。

    关于参数SynchronousQueue:
    我们知道SynchronousQueue这个队列是压根没有容器的,然后就找空闲线程啊,一找发现,没有空闲线程,那就重开一个线程来跑这个runnable。
    这么写有一个好处,就是假如我们的某个runnable1是一个需要很久才能返回结果,说极端点就是直接一个死循环,永远执行不完,这时候呢下一个runnable2进来了,发现空闲线程没有,核心线程数为0,队列也是一个没容器的玩意儿,那怎么办?就直接new Thread出一个线程自己跑自己的。我们反过来说,如果说我们的队列是有容器的,假如队列的容量为1,runnable1一开始在队列中,然后被线程执行,换句话说,现在队列为空,但是线程已经在跑runnable1的死循环了,你再来一个runnable2也是只能塞到队列中,默默地看runnable1在线程中疯狂死循环,永远也等不到线程空闲的时候。唯一能拯救runnable1的只有再来一个runnable3,当runnable3进入线程池后,发现runnable1在疯狂死循环,runnable2就像小鸟一样被强制囚禁在笼子里,观看runnable1的疯狂死循环,runnable3既进不了笼子,也没有核心线程给他跑,于是线程池就给它(runnable3)新建了一个线程3,等runnable3跑完了以后,线程3就处于空闲,那线程池能让这个线程3闲着吗?当然不行,那线程池就去找啊,找啊,发现runnable2还在队列中没有被执行,于是就吧runnable2交给了线程3来执行,这个时候runnable2才终于被执行了。

    接着看下执行方法enqueue():

    void enqueue(AsyncCall call) {
        synchronized (this) {
          readyAsyncCalls.add(call);
        }
        promoteAndExecute();
      }
    
      private boolean promoteAndExecute() {
        assert (!Thread.holdsLock(this));
    
        List<AsyncCall> executableCalls = new ArrayList<>();
        boolean isRunning;
        synchronized (this) {
          for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
            AsyncCall asyncCall = i.next();
            //如果正在执行的队列已满,跳出循环
            if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
            //如果同一主机请求数到达最大值,遍历下一个
            if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
    
            i.remove();
            //加入执行队列
            executableCalls.add(asyncCall);
            //加入正在执行的队列中
            runningAsyncCalls.add(asyncCall);
          }
          isRunning = runningCallsCount() > 0;
        }
        //遍历,执行
        for (int i = 0, size = executableCalls.size(); i < size; i++) {
          AsyncCall asyncCall = executableCalls.get(i);
          //调用executeOn方法
          asyncCall.executeOn(executorService());
        }
    
        return isRunning;
      }
    

    看下AsyncCall的executeOn方法:

    void executeOn(ExecutorService executorService) {
          assert (!Thread.holdsLock(client.dispatcher()));
          boolean success = false;
          try {
            //调用线程池执行,实际上就是执行NamedRunnable的run方法
            executorService.execute(this);
            success = true;
          } catch (RejectedExecutionException e) {
            InterruptedIOException ioException = new InterruptedIOException("executor rejected");
            ioException.initCause(e);
            eventListener.callFailed(RealCall.this, ioException);
            responseCallback.onFailure(RealCall.this, ioException);
          } finally {
            if (!success) {
              client.dispatcher().finished(this); // This call is no longer running!
            }
          }
        }
    

    上面已经描述AsyncCall的继承关系,就是调用NamedRunnable的run方法

    ->class AsyncCall extends NamedRunnable
    ->class NamedRunnable implements Runnable
    

    看下代码:

    //抽象类NamedRunnable 
    public abstract class NamedRunnable implements Runnable {
      
      @Override public final void run() {
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName(name);
        try {
          //调用抽象方法execute
          execute();
        } finally {
          Thread.currentThread().setName(oldName);
        }
      }
    
      protected abstract void execute();
    }
    
    //子类AsyncCall 
    final class AsyncCall extends NamedRunnable {
      //调用execute
      @Override protected void execute() {
          boolean signalledCallback = false;
          timeout.enter();
          try {
            //此方法是各种拦截器
            Response response = getResponseWithInterceptorChain();
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            e = timeoutExit(e);
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              eventListener.callFailed(RealCall.this, e);
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            client.dispatcher().finished(this);
          }
        }
    }
    

    从上述源码分析,如果当前还能执行一个并发请求,则加入 runningAsyncCalls ,立即执行,否则加入 readyAsyncCalls 队列。由此,可以得出Dispatcher的以下作用:
    1.调度线程池Disptcher实现了高并发,低阻塞的实现;
    2.采用Deque作为缓存,先进先出的顺序执行;
    3.任务在try/finally中调用了finished函数,控制任务队列的执行顺序,而不是采用锁,减少了编码复杂性提高性能。

    getResponseWithInterceptorChain方法是okhttp3的各种拦截器,后面文章分析。


    参考:
    https://blog.51cto.com/u_13657808/5658285

    相关文章

      网友评论

          本文标题:Android OkHttp3流程分析(1)

          本文链接:https://www.haomeiwen.com/subject/aolwddtx.html