美文网首页
Okhttp主流程源码浅析(2)

Okhttp主流程源码浅析(2)

作者: wenou | 来源:发表于2018-07-04 18:08 被阅读24次

    上一篇Okhttp主流程源码浅析(1)分析到任务调度方面,接着把剩下的主流程分析.

    当一个任务被执行起来,会调用getResponseWithInterceptorChain()来获取到响应结果 response

    //TODO: 责任链模式,拦截器链  执行请求
    //TODO: 拿到回调结果
    Response response = getResponseWithInterceptorChain();
    

    getResponseWithInterceptorChain()源码:

     Response getResponseWithInterceptorChain() throws IOException {
            // Build a full stack of interceptors.
            //TODO 责任链 
            List<Interceptor> interceptors = new ArrayList<>();
            interceptors.addAll(client.interceptors());
            //TODO 处理重试与重定向
            interceptors.add(retryAndFollowUpInterceptor);
            //TODO 处理 配置请求头等信息
            interceptors.add(new BridgeInterceptor(client.cookieJar()));
            //TODO 处理 缓存配置 根据条件(存在响应缓存并被设置为不变的或者响应在有效期内)返回缓存响应
            //TODO 设置请求头
            interceptors.add(new CacheInterceptor(client.internalCache()));
            //TODO 连接服务器
            interceptors.add(new ConnectInterceptor(client));
            if (!forWebSocket) {
                interceptors.addAll(client.networkInterceptors());
            }
            //TODO 执行流操作(写出请求体、获得响应数据)
            //TODO 进行http请求报文的封装与请求报文的解析
            interceptors.add(new CallServerInterceptor(forWebSocket));
    
            Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
                    originalRequest, this, eventListener, client.connectTimeoutMillis(),
                    client.readTimeoutMillis(), client.writeTimeoutMillis());
    
            return chain.proceed(originalRequest);
        }
    

    创建List<Interceptor> interceptors = new ArrayList<>()集合来保存拦截器,平时我们自己给OkHttpClient添加的拦截器也是在这里处理.

    例如添加log 拦截器:

    //添加log拦截器
    HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
    interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
    OkHttpClient.Builder builder = new OkHttpClient.Builder()
                  .connectTimeout(10, TimeUnit.SECONDS)
                  .readTimeout(15, TimeUnit.SECONDS)
                  .writeTimeout(15, TimeUnit.SECONDS)
                  .retryOnConnectionFailure(true)
                  .addInterceptor(interceptor); 
    

    这些拦截器保存在OkHttpClient实例里面,到这里会调用interceptors.addAll(client.interceptors())把我们的拦截器先添加到集合里面,然后再添加一些默认的拦截器来处理请求与响应

    责任链的拦截器集合内容如下:

    1.自定义拦截器
    2.retryAndFollowUpInterceptor: 重试与重定向拦截器
    3.BridgeInterceptor: 处理配置请求头等信息
    4.CacheInterceptor: 处理缓存信息
    5.ConnectInterceptor: 连接服务器
    6.CallServerInterceptor: http报文封装与解析,与服务器执行流操作

    责任链调用顺序图:

    责任链调用顺序图:

    调用顺序:
    当发起一个请求,首先会进入到自定义拦截器里面,然后再依次进入okhttp默认的各个拦截器里面,最终连接到服务器,进行流的读写, 拿到服务器返回的响应结果,再反过来,依次的回调到调用者.

    在上面的getResponseWithInterceptorChain()方法里面,把拦截器添加完成之后,会执行如下代码:

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
                    originalRequest, this, eventListener, client.connectTimeoutMillis(),
                    client.readTimeoutMillis(), client.writeTimeoutMillis());
    
            return chain.proceed(originalRequest);
    

    创建RealInterceptorChain realInterceptorChain = new RealInterceptorChain,然后开始执行责任链,调用chain.proceed(originalRequest)把请求传入进去,看下RealInterceptorChain的proceed方法

     @Override public Response proceed(Request request) throws IOException {
        return proceed(request, streamAllocation, httpCodec, connection);
      }
    public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
          RealConnection connection) throws IOException {
        //省略代码 
        ...  
        //TODO: 创建新的拦截链,链中的拦截器集合index+1
        RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
            writeTimeout);
        //TODO: 执行当前的拦截器 默认是:retryAndFollowUpInterceptor
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
        //省略代码 
        ...  
        return response;
      }
    

    删除了部分判断的代码,这里看重点,在proceed()里面看到,会再创建一个新的对象RealInterceptorChain next = new RealInterceptorChain(...)并且把拦截器集合通过构造方法传入,还有一些其他信息.

    注意: 这里的会把 index + 1再传入进去,下面会用到

    然后再从拦截器集合里面拿到一个拦截器interceptors.get(index),并且去执行这个拦截器的方法拿到响应结果Response response = interceptor.intercept(next);

    回到上面看下调用顺序图,如果我们有给okhttpClient加入自定义拦截器,这里就先调用自定义拦截器,否则就开始执行第一个默认的拦截器retryAndFollowUpInterceptor

    看下retryAndFollowUpInterceptor.intercept(next)方法

     @Override
        public Response intercept(Chain chain) throws IOException {
            Request request = chain.request();
            RealInterceptorChain realChain = (RealInterceptorChain) chain;
            Call call = realChain.call();
            EventListener eventListener = realChain.eventListener();
            //TODO 核心 协调连接、请求/响应以及复用
            StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
                    createAddress(request.url()), call, eventListener, callStackTrace);
            this.streamAllocation = streamAllocation;
    
            int followUpCount = 0;
            Response priorResponse = null;
            while (true) {
                if (canceled) {
                    streamAllocation.release();
                    throw new IOException("Canceled");
                }
    
                Response response;
                boolean releaseConnection = true;
                try {
                    //TODO 执行责任链 实际上就是下一个拦截器
                    response = realChain.proceed(request, streamAllocation, null, null);
                    releaseConnection = false;
                } catch (RouteException e) {
                } catch (IOException e) {
                } finally {
                }
                ...删除部分代码
                return response;
          }
      }
    

    这里会StreamAllocation streamAllocation = new StreamAllocation(...)创建一个StreamAllocation对象,并且把连接池,和请求地址信息,call等等通过构造传入,这里只关心主流程,StreamAllocation的作用不说了

    然后开了个while (true) {}循环,在里面会调用response = realChain.proceed(request, streamAllocation, null, null); 继续执行责任链的下一个拦截器,这个realChain就是上面retryAndFollowUpInterceptor.intercept(next)传入的这个next,就是上面提到的那个RealInterceptorChain对象,注意这个时候的index = 1然后继续执行proceed()方法

    这里又回到了前面RealInterceptorChain.proceed()

     @Override public Response proceed(Request request) throws IOException {
        return proceed(request, streamAllocation, httpCodec, connection);
      }
    public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
          RealConnection connection) throws IOException {
        //省略代码 
        ...  
        //TODO: 创建新的拦截链,链中的拦截器集合index+1
        RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
            writeTimeout);
        //TODO: 执行当前的拦截器 默认是:retryAndFollowUpInterceptor
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
        //省略代码 
        ...  
        return response;
      }
    

    这里的代码有点绕,第一次看有点晕头转向的感觉~~

    没错,这里又回到上面的地方,不过有两点不同了
    1.第一次进来的时候index = 0,streamAllocation = null
    2.这一次index = 1,streamAllocation刚刚被 new 出来

    然后再次创建一个新的RealInterceptorChain对象,把信息传入构造.

    注意: 这个时候 index = 1, 所以传入next 里面的 index = 1+1

     RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
            writeTimeout);
    

    然后再调用Interceptor interceptor = interceptors.get(index);从拦截器里头取下一个拦截器,因为index=1,这个时候取出的是BridgeInterceptor拦截器(处理配置请求头等信息),执行Response response = interceptor.intercept(next);

    看下BridgeInterceptor拦截器的intercept()方法

    @Override
        public Response intercept(Chain chain) throws IOException {
            Request userRequest = chain.request();
            Request.Builder requestBuilder = userRequest.newBuilder();
    
            RequestBody body = userRequest.body();
            if (body != null) {
                MediaType contentType = body.contentType();
                if (contentType != null) {
                    requestBuilder.header("Content-Type", contentType.toString());
                }
    
                long contentLength = body.contentLength();
                if (contentLength != -1) {
                    requestBuilder.header("Content-Length", Long.toString(contentLength));
                    requestBuilder.removeHeader("Transfer-Encoding");
                } else {
                    requestBuilder.header("Transfer-Encoding", "chunked");
                    requestBuilder.removeHeader("Content-Length");
                }
            }
    
            if (userRequest.header("Host") == null) {
                requestBuilder.header("Host", hostHeader(userRequest.url(), false));
            }
    
            if (userRequest.header("Connection") == null) {
                requestBuilder.header("Connection", "Keep-Alive");
            }
    
            // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
            // the transfer stream.
            boolean transparentGzip = false;
            if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
                transparentGzip = true;
                requestBuilder.header("Accept-Encoding", "gzip");
            }
    
            List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
            if (!cookies.isEmpty()) {
                requestBuilder.header("Cookie", cookieHeader(cookies));
            }
    
            if (userRequest.header("User-Agent") == null) {
                requestBuilder.header("User-Agent", Version.userAgent());
            }
            //TODO 执行下一个拦截器
            Response networkResponse = chain.proceed(requestBuilder.build());
    
            HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    
            Response.Builder responseBuilder = networkResponse.newBuilder()
                    .request(userRequest);
    
            if (transparentGzip
                    && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
                    && HttpHeaders.hasBody(networkResponse)) {
                GzipSource responseBody = new GzipSource(networkResponse.body().source());
                Headers strippedHeaders = networkResponse.headers().newBuilder()
                        .removeAll("Content-Encoding")
                        .removeAll("Content-Length")
                        .build();
                responseBuilder.headers(strippedHeaders);
                String contentType = networkResponse.header("Content-Type");
                responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
            }
    
            return responseBuilder.build();
        }
    

    代码挺多,但是逻辑也清晰,就是为我们的请求,按照http协议添加一些请求头信息,为后面使用 socket 写出数据做准备

    配置一些请求头信息 ,例如:

    1.Content-Type 请求中的媒体类型信息
    2.Content-Length 请求体body的长度
    3.Host 请求域名
    4.Connection: "Keep-Alive" 是否保持长连接
    5.Cookie 等等....

    关于http的内容,大伙可以上网去搜一下对应的文章了解一下..

    然后调用Response networkResponse = chain.proceed(requestBuilder.build());跟上面一样,还是会回到RealInterceptorChain.proceed()方法里面,再次创建一个新的RealInterceptorChain对象,并且把 index+1,然后继续执行一下个拦截的intercept(next)方法,这里不贴出相同的源码了.然后会执行到CacheInterceptor拦截器

    看一下CacheInterceptor.intercept(next)

     @Override public Response intercept(Chain chain) throws IOException {
        //TODO request对应缓存的Response
        Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            : null;
        //TODO 执行响应缓存策略
        long now = System.currentTimeMillis();
        CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
        Request networkRequest = strategy.networkRequest;
        Response cacheResponse = strategy.cacheResponse;
    
        if (cache != null) {
          cache.trackResponse(strategy);
        }
        //TODO 缓存无效 关闭资源
        if (cacheCandidate != null && cacheResponse == null) {
          closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
        }
    
        // If we're forbidden from using the network and the cache is insufficient, fail.
        //TODO 如果禁止使用网络,并且没有缓存,就返回失败
        if (networkRequest == null && cacheResponse == null) {
          return new Response.Builder()
              .request(chain.request())
              .protocol(Protocol.HTTP_1_1)
              .code(504)
              .message("Unsatisfiable Request (only-if-cached)")
              .body(Util.EMPTY_RESPONSE)
              .sentRequestAtMillis(-1L)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();
        }
    
        // If we don't need the network, we're done.
        //TODO 不使用网络请求直接返回响应
        if (networkRequest == null) {
          return cacheResponse.newBuilder()
              .cacheResponse(stripBody(cacheResponse))
              .build();
        }
        //TODO 执行下一个拦截器
        Response networkResponse = null;
        try {
          networkResponse = chain.proceed(networkRequest);
        } finally {
          // If we're crashing on I/O or otherwise, don't leak the cache body.
          if (networkResponse == null && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
          }
        }
    
        // If we have a cache response too, then we're doing a conditional get.
        //TODO 如果存在缓存 更新
        if (cacheResponse != null) {
          //TODO 304响应码 自从上次请求后,请求需要响应的内容未发生改变
          if (networkResponse.code() == HTTP_NOT_MODIFIED) {
            Response response = cacheResponse.newBuilder()
                .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
                .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
                .cacheResponse(stripBody(cacheResponse))
                .networkResponse(stripBody(networkResponse))
                .build();
            networkResponse.body().close();
    
            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache.trackConditionalCacheHit();
            cache.update(cacheResponse, response);
            return response;
          } else {
            closeQuietly(cacheResponse.body());
          }
        }
        //TODO 缓存Response
        Response response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
    
        if (cache != null) {
          if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
            // Offer this request to the cache.
            CacheRequest cacheRequest = cache.put(response);
            return cacheWritingResponse(cacheRequest, response);
          }
    
          if (HttpMethod.invalidatesCache(networkRequest.method())) {
            try {
              cache.remove(networkRequest);
            } catch (IOException ignored) {
              // The cache cannot be written.
            }
          }
        }
    
        return response;
      }
    

    这个拦截器是处理缓存相关,具体细节这里不管,只分析主流程...

    不过从注释可以清晰的看出一些大概的逻辑
    1.从缓存里面取出响应结果Response cacheCandidate
    2.判断缓存是否失效,如果失效就关闭资源
    3.判断如果禁止使用网络,并且没有缓存,就返回失败
    等等....

    然后又继续调用networkResponse = chain.proceed(networkRequest);执行下一个拦截器ConnectInterceptor

    ConnectInterceptor.intercept(Chain chain)

     @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        StreamAllocation streamAllocation = realChain.streamAllocation();
    
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        //TODO 连接服务器/复用socket
        HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
        RealConnection connection = streamAllocation.connection();
    
        //TODO 找到复用的 socket或者创建新的 socket,并且连接服务器,但是这里还没有把请求写出去
        //TODO 然后执行下一个拦截器
        return realChain.proceed(request, streamAllocation, httpCodec, connection);
      }
    

    这个拦截器负责连接服务器,会调用streamAllocation.newStream

     public HttpCodec newStream(
                OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
            int connectTimeout = chain.connectTimeoutMillis();
            int readTimeout = chain.readTimeoutMillis();
            int writeTimeout = chain.writeTimeoutMillis();
            int pingIntervalMillis = client.pingIntervalMillis();
            boolean connectionRetryEnabled = client.retryOnConnectionFailure();
    
            try {
                RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
                        writeTimeout, pingIntervalMillis, connectionRetryEnabled,
                        doExtensiveHealthChecks);
                //TODO HttpCodec 处理解析请求与响应的工具类
                HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
    
                synchronized (connectionPool) {
                    codec = resultCodec;
                    return resultCodec;
                }
            } catch (IOException e) {
                throw new RouteException(e);
            }
        }
    

    newStream()方法里面会调用findHealthyConnection()去寻找一个健康的可用的连接来进行socket连接

    有关sockter连接部分还请大伙去查看对应的资料,这里只需要知道:

    1.TPC/IP协议是传输层协议,主要规范数据如何在网络中传输
    2.Socket则是对TCP/IP协议的封装和应用(程序员层面上)
    3.Http 是应用层协议,解决如何包装数据,然后通过Socket发送到服务器
    4.这里的RealConnection类是对Socket的封装,里面持有Socket的引用
    5.这里的HttpCodec是处理解析请求与响应的工具类

    回到上面,通过调用streamAllocation.newStream(...)连接之后,拿到一个HttpCodec,然后继续执行下一个拦截器,也就是最后的拦截器CallServerInterceptor,通过socket和服务器进行I/O流读写操作

    CallServerInterceptor.intercept()

    @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        HttpCodec httpCodec = realChain.httpStream();
        StreamAllocation streamAllocation = realChain.streamAllocation();
        RealConnection connection = (RealConnection) realChain.connection();
        Request request = realChain.request();
        ...
        //TODO 把请求通过 socket 以http的协议格式,通过I/O流的形式写出去
        httpCodec.writeRequestHeaders(request);
        realChain.eventListener().requestHeadersEnd(realChain.call(), request);
        
        ...
    
        //TODO 请求写出完成
        httpCodec.finishRequest();
    
        if (responseBuilder == null) {
          realChain.eventListener().responseHeadersStart(realChain.call());
          responseBuilder = httpCodec.readResponseHeaders(false);
        }
    
        //TODO 处理响应
        Response response = responseBuilder
            .request(request)
            .handshake(streamAllocation.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
    
        ...
        return response;
      }
    

    删除了很多代码.具体实现先不去看,如果大伙想知道,可以去搜索对应方面的具体细节文章...

    这里了解到:
    1.在CallServerInterceptor这个拦截器,实现Socket与服务器进行TCP连接,然后进行I/O数据流的通信.
    2.发送请求, Socket是以http协议的格式,把请求拆分,请求行,请求头,请求体等等,一行一行写出去的
    3.接受响应, Socket也是以http协议的格式,把响应结果一行一行的读出来,然后再解析

    Socket通信例子:

    例如 : 用浏览器模拟 GET 去请求获取天气的接口,然后拿到结果


    image.png
    image.png

    怎样用Socket来实现请求呢?

    java的jdk为我们提供了具体的实现Socket,直接创建使用就行

    static void doHttp() throws Exception {
            //创建一个Socket ,传入 域名和端口,http默认是80
            Socket socket = new Socket("restapi.amap.com", 80);
            //接受数据的输入流
            final BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            //发送数据 输出流
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            new Thread() {
                @Override
                public void run() {
                    while (true) {
                        String line = null;
                        try {
                            while ((line = br.readLine()) != null) {
                                System.out.println("recv :" + line);
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
            bw.write("GET /v3/weather/weatherInfo?city=%E9%95%BF%E6%B2%99&key=13cb58f5884f9749287abbead9c658f2 HTTP/1.1\r\n");
            bw.write("Host: restapi.amap.com\r\n\r\n");
            bw.flush();
        }
    

    1.创建一个Socket ,传入域名和端口,http默认是80
    2.通过Socket 拿到接受数据的输入流和发送数据的输出流
    3.按http协议规范,把请求写出去bw.write("GET /v3/weathe....bw.write("Host: restapi.amap.com\r\n\r\n");
    4.接受服务器响应结果

    打印结果:

    recv :HTTP/1.1 200 OK
    recv :Server: Tengine
    recv :Date: Thu, 05 Jul 2018 09:47:14 GMT
    recv :Content-Type: application/json;charset=UTF-8
    recv :Content-Length: 253
    recv :Connection: close
    recv :X-Powered-By: ring/1.0.0
    recv :gsid: 011178122113153078403472700229732459707
    recv :sc: 0.010
    recv :Access-Control-Allow-Origin: *
    recv :Access-Control-Allow-Methods: *
    recv :Access-Control-Allow-Headers: DNT,X-CustomHeader,Keep-Alive,User-Agent,X-Requested-With,
    If-Modified-Since,Cache-Control,Content-Type,key,x-biz,x-info,platinfo,encr,enginever,gzipped,poiid
    recv :
    recv :{"status":"1","count":"1","info":"OK","infocode":"10000","lives":
    [{"province":"广东","city":"深圳市","adcode":"440300","weather":
    "云","temperature":"29","winddirection":"南","windpower":"5","humidity":"77",
    "reporttime":"2018-07-05 17:00:00"}]}
    

    响应也是一行一行读写出来

    最后一个拦截器拿到服务器响应结果,然后再依次往上面的拦截器返回response,这个流程就不多说了,到此一次通信就完成...

    小结:

    okhttp采用责任链模式,每一个拦截器负责具体的每一块的功能,降低每个功能模块的耦合度,让整体框架更加灵活,我们可以轻松的加入自己自定义的拦截器.

    okhttp维护着一个连接池,能做到相同Host的socket复用

    相关文章

      网友评论

          本文标题:Okhttp主流程源码浅析(2)

          本文链接:https://www.haomeiwen.com/subject/mnwsuftx.html