基于OkHttp的Http指标监控

作者: 山鱿鱼说 | 来源:发表于2019-09-26 16:52 被阅读0次

    Http请求过程

    image

    指标数据

    1.入队到请求结束耗时
    2.dns查询耗时
    3.socket connect耗时
    4.tls连接的耗时
    5.请求发送耗时
    6.响应传输耗时
    7.首包耗时
    8.响应解析耗时
    9.Http错误,区分业务错误和请求错误

    采集到以上指标,结合数据可视化的工具,可以对Http个阶段的耗时和错误分布有直观的感受,同时对优化业务Http请求提供数据支持。

    如何获取指标

    获取指标数据首先需要找到产生指标数据的关键代码,然后插入收集代码即可。
    如果业务中使用的框架没有源码或者不重新打包源码的情况下,如何插入代码?
    这个就需要使用到能够实现AOP的工具,在前面分享的Monitor中的提供注解和配置文件的方式,在指定函数中插入相关代码的功能。这样的实现方式也可以使监控代码和业务代码分离。

    OkHttp框架

    OkHttp是Android上最常用的Http请求框架,OkHttp的最新版本已经升级到4.0.x,实现也全部由java替换到了Kotlin,API的一些使用也会有些不同。由于4.x的设备不是默认支持TLSV1.2版本,OkHttp 3.13.x以上的版本需要在Android 5.0+(API level 21+)和Java 1.8的环境开发,不过OkHttp也为了支持4.x设备单独创立了3.12.x分支,本文中使用的OkHttp版本为3.12.3版本。

    OkHttp整体流程

    先引用个别人画流程图(原图来自

    image

    请求过程分析

    1.创建 OkHttpClient

    new OkHttpClient()中会调用new OkHttpClient.Builder()方法,Builder()会设置一些的默认值。OkHttpClient()会把OkHttpClient.Builder()产生的对象中字段复制到对应OkHttpClient对象的字段上,其中sslSocketFactory如果没有在Builder中设置,OkHttp会获取系统默认的sslSocketFactory。

    public OkHttpClient.Builder(){
      /**
       *异步请求的分发器,其中使用 不限制线程数,存活时间为60s的线程池来执行异步请求
      * 默认限制同时执行的异步请求不操过64个,每个host的同时执行的异步请求不操过5个
        *超过限制新的请求需要等待。
        * */
       dispatcher = new Dispatcher();
      //支持的协议类型 Http1.0/1.1/2,QUIC 
       protocols = DEFAULT_PROTOCOLS;
       //支持TLS和ClearText
       connectionSpecs = DEFAULT_CONNECTION_SPECS;
       //请求事件通知器,大部分指标数据都可以度过EventListener来获取
       eventListenerFactory = EventListener.factory(EventListener.NONE);
      //系统级代理服务器
       proxySelector = ProxySelector.getDefault();
       if(proxySelector == null){
         proxySelector = new NullProxySelector();
       }
       //默认不使用Cookie
       cookieJar = CookieJar.NO_COOKIES;
      //socket工厂
       socketFactory = SocketFactory.getDefault();
      //用于https主机名验证
       hostnameVerifier = OkHostnameVerifier.INSTANCE;
      //用于约束哪些证书是可信的,可以用来证书固定
       certificatePinner = CertificatePinner.DEFAULT;
      //实现HTTP协议的信息认证
       proxyAuthenticator = Authenticator.NONE;
       //实现HTTP协议的信息认证
       authenticator = Authenticator.NONE;
       /**
        *实现多路复用机制连接池,最多保持5个空闲连接
       *每个空闲连接最多保持五分钟
        * */
       connectionPool = new ConnectionPool();
      //dns解析,默认使用系统InetAddress.getAllByName(hostname)
       dns = Dns.SYSTEM;
      //支持ssl重定向
       followSslRedirects = true;
      //支持重定向
       followRedirects = true;
      //连接失败是否重试
       retryOnConnectionFailure = true;
      //请求超时时间,0为不超时
       callTimeout = 0;
      //连接超时时间
       connectTimeout = 10_000;
      //socket读超时时间
       readTimeout = 10_000;
      //socket写超时时间
       writeTimeout = 10_000;
      //websocket 心跳间隔
       pingInterval = 0;
    }
    
    //获取默认的sslSocketFactory
    X509TrustManager trustManager = Util.platformTrustManager();
    this.sslSocketFactory = newSslSocketFactory(trustManager);
    this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
    

    2.Request执行过程

    从上面的流程图可以看出,不管同步请求还是异步请求,最终都会调用到 RealCall.getResponseWithInterceptorChain(),getResponseWithInterceptorChain() 再调用RealInterceptorChain.proceed(Request request)方法发起最终请求,下面我们来分析一下这两个方法的具体代码。

     Response RealCall.getResponseWithInterceptorChain() throws IOException {
     //组装所有的Interceptors
      List<Interceptor> interceptors = new ArrayList<>();
      //业务自定的Interceptors,通过OkHttpClient.Builid.addInterceptor添加
      interceptors.addAll(client.interceptors());
     //其他功能interceptors
      interceptors.add(retryAndFollowUpInterceptor);
      interceptors.add(new BridgeInterceptor(client.cookieJar()));
      interceptors.add(new CacheInterceptor(client.internalCache()));
      interceptors.add(new ConnectInterceptor(client));
     //如果不是forWebSocket请求,添加通过OkHttpClient.Builid.addNetworkInterceptor添加的Interceptor
      if (!forWebSocket) {
        interceptors.addAll([client.networkInterceptors](http://client.networkinterceptors/)());
      }
     //真正发起网络请求的Interceptor
      interceptors.add(new CallServerInterceptor(forWebSocket));
     //创建RealInterceptorChain
      Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
          originalRequest, this, eventListener, client.connectTimeoutMillis(),
          client.readTimeoutMillis(), client.writeTimeoutMillis());
      //开始执行请求
      Response response = chain.proceed(originalRequest);
     //如果请求被取消
      if (retryAndFollowUpInterceptor.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    }
    

    接下来再看RealInterceptorChain.proceed(Request request)代码

    public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, 
        RealConnection connection) throws IOException { 
      /*
        *index代表当前应该执行的Interceptor在Interceptor列表中的位置,如果超过 
        *Interceptor列表size,报错
       *在RealCall.getResponseWithInterceptorChain()第一次调用proceed方法时传递的index值为0
      */
      if (index >= interceptors.size()) throw new AssertionError(); 
     //执行次数+1
      calls++; 
     //httpCodec时在ConnectInterceptor创建的,会对应一个socket连接
      if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) { 
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) 
            + " must retain the same host and port"); 
      } 
      // If we already have a stream, confirm that this is the only call to chain.proceed(). 
      if (this.httpCodec != null && calls > 1) { 
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) 
            + " must call proceed() exactly once"); 
      } 
      //创建新的RealInterceptorChain,通过改变index的值来实现调用Interceptor列表下一个位置的Interceptor
      RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec, 
          connection, index + 1, request, call, eventListener, connectTimeout, readTimeout, 
          writeTimeout); 
      //获取当前index位置的Interceptor
      Interceptor interceptor = interceptors.get(index); 
     //执行当前位置的interceptor,同时传递新创建的RealInterceptorChain,新的RealInterceptorChain中的index值是当前Interceptor列表中下一个位置
      //interceptor.intercept中会调用新的RealInterceptorChain的proceed方法,实现向后调用
      Response response = interceptor.intercept(next); 
      // 如果当前RealInterceptorChain的httpCodec不为空,确保下一个位置的Interceptor只被调用一次,httpCodec是在ConnectInterceptor中被赋值
      if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) { 
        throw new IllegalStateException("network interceptor " + interceptor 
            + " must call proceed() exactly once"); 
      } 
      // response为空报错
      if (response == null) { 
        throw new NullPointerException("interceptor " + interceptor + " returned null"); 
      } 
     // response.body为空报错
      if (response.body() == null) { 
        throw new IllegalStateException( 
            "interceptor " + interceptor + " returned a response with no body"); 
      } 
      //返回response
      return response; 
    }
    

    从代码可以看出 Interceptor 是 OkHttp 最核心的功能类,Interceptor 把实际的网络请求、缓存、透明压缩等功能都统一了起来,每一个功能都实现为一个Interceptor,它们最终组成一个了Interceptor.Chain的责任链,其中每个 Interceptor 都可能完成 Request到 Response 转变的任务,循着责任链让每个 Interceptor 自行决定能否完成任务以及怎么完成任务,完成网络请求这件事也从 RealCall 类中剥离了出来,简化了各自的责任和逻辑,代码变得优雅。
    这些Interceptor最为关键的两个Interceptor是ConnectInterceptor和CallServerInterceptor,ConnectInterceptor的主要功能是在连接池里找到可复用连接,如果没有,就创建新的socket,进行tls握手,将socket用Okio进行包裹,创建HttpCodec。CallServerInterceptor使用HttpCodec进行相关协议的传输和解析。下面对ConnectInterceptor中findConnect过程和CallServerInterceptor请求过程做一个分析。

    3.ConnectInterceptor findConnection过程分析

    在ConnectInterceptor中,会为本次请求创建可用的RealConnection,首先会从连接池中找到能够复用的连接,如果没有就创建新的socket,然后使用RealConnection创建HttpCodec。创建RealConnection的方法调用链路为StreamAllocation.newStream()-> StreamAllocation.findHealthyConnection()->StreamAllocation.findConnection(),findConnection()是创建连接的主要代码。

    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
        boolean foundPooledConnection = false;
        //最终找到的connection
        RealConnection result = null;
        Route selectedRoute = null;
        //需要释放的connection
        Connection releasedConnection;
        //需要关闭的socket
        Socket toClose;
        synchronized (connectionPool) {
            if (released) throw new IllegalStateException("released");
            if (codec != null) throw new IllegalStateException("codec != null");
            if (canceled) throw new IOException("Canceled");
            /**
             *如果遇到重定向到同一个地址的情况下,在RetryAndFollowUpInterceptor中会使用已经分配的StreamAllocation
             *进行重定向请求,这个时候的connection不为空,但是这个connection不一定时有效的连接。
             * **/
            releasedConnection = this.connection;
            /**
             *如果已经存在RealConnection,但是不能用来创建新的Stream
             *就设置this.connection=null,同时返回要关闭的socket
             *当前请求第一次执行时releaseIfNoNewStreams()不进行任何操作
             * */
            toClose = releaseIfNoNewStreams();
            /**
             *
             * 这时候this.connection不为空
             * 表示原来的connection可以用来创建新的Stream
             * 当前请求第一次执行时this.connection=null
             *
             * */
            if (this.connection != null) {
                // We had an already-allocated connection and it's good.
                result = this.connection;
                releasedConnection = null;
            }
            /**
             * reportedAcquired会在新建socket或者从连接池
             * 获取到有效RealConnection时赋值为true
             * 当前请求第一次执行时reportedAcquired=fasle
             * */
            if (!reportedAcquired) {
                // If the connection was never reported acquired, don't report it as released!
                releasedConnection = null;
            }
            /**
             * 如果还没找到目标RealConnection
             * 尝试从连接池中获取
             * */
            if (result == null) {
                /**
                 * 对于非http/2协议,如果已经存在不超过过RealConnection复用的最大值且协议,证书都一致
                 * 这个RealConnection可以用来复用
                 * 如果从连接池中获取RealConnection,会调用
                 * streamAllocation.acquire()设置connection为新值
                 * */
                Internal.instance.get(connectionPool, address, this, null);
                /**
                 * connection != null表示从连接池获取到合适的
                 * RealConnection,设置foundPooledConnection = true;
                 * */
                if (connection != null) {
                    foundPooledConnection = true;
                    result = connection;
                } else {
                    selectedRoute = route;
                }
            }
        }
        /**
         * close当前的需要关闭的socket
         * */
        closeQuietly(toClose);
        /**
         * 如果当前的RealConnection需要释放,调用eventListener
         * */
        if (releasedConnection != null) {
            eventListener.connectionReleased(call, releasedConnection);
        }
        /**
         * 如果从连接池获取到RealConnection,调用eventListener
         * */
        if (foundPooledConnection) {
            eventListener.connectionAcquired(call, result);
        }
        /**
         * 当前RealConnection可以继续使用或者从连接池中找到合适的RealConnection
         * 返回这个RealConnection
         * */
        if (result != null) {
            // If we found an already-allocated or pooled connection, we're done.
            return result;
        }
        // If we need a route selection, make one. This is a blocking operation.
        /**
         * routeSelector在StreamAllocation构造方法中被创建
         * 连接池重新寻找
         * 请求第一次执行时routeSelector.next()会进行域名解析工作
         * */
        boolean newRouteSelection = false;
        if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
            newRouteSelection = true;
            routeSelection = routeSelector.next();
        }
        synchronized (connectionPool) {
            if (canceled) throw new IOException("Canceled");
            if (newRouteSelection) {
                // Now that we have a set of IP addresses, make another attempt at getting a connection from
                // the pool. This could match due to connection coalescing.
                List<Route> routes = routeSelection.getAll();
                for (int i = 0, size = routes.size(); i < size; i++) {
                    Route route = routes.get(i);
                    Internal.instance.get(connectionPool, address, this, route);
                    if (connection != null) {
                        foundPooledConnection = true;
                        result = connection;
                        this.route = route;
                        break;
                    }
                }
            }
            if (!foundPooledConnection) {
                if (selectedRoute == null) {
                    selectedRoute = routeSelection.next();
                }
                // Create a connection and assign it to this allocation immediately. This makes it possible
                // for an asynchronous cancel() to interrupt the handshake we're about to do.
                route = selectedRoute;
                refusedStreamCount = 0;
                result = new RealConnection(connectionPool, selectedRoute);
                acquire(result, false);
            }
        }
        // If we found a pooled connection on the 2nd time around, we're done.
        /**
         * 如果在连接池重新找到合适的RealConnection,返回
         * */
        if (foundPooledConnection) {
            eventListener.connectionAcquired(call, result);
            return result;
        }
        /**
         * 如果还没有找到,就需要创建新的RealConnect
         * 生成新的socket,建立Tls,并加入ConnectPool
         * */
        // Do TCP + TLS handshakes. This is a blocking operation.
        result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
                connectionRetryEnabled, call, eventListener);
        routeDatabase().connected(result.route());
        Socket socket = null;
        synchronized (connectionPool) {
            reportedAcquired = true;
            // Pool the connection.
            Internal.instance.put(connectionPool, result);
            // If another multiplexed connection to the same address was created concurrently, then
            // release this connection and acquire that one.
            if (result.isMultiplexed()) {
                socket = Internal.instance.deduplicate(connectionPool, address, this);
                result = connection;
            }
        }
        closeQuietly(socket);
        eventListener.connectionAcquired(call, result);
        return result;
    }
    

    4.CallServerInterceptor程分析

    public Response intercept(Chain chain) throws IOException {
      RealInterceptorChain realChain = (RealInterceptorChain) chain;
     //Http解析器,在ConncetInterceptor中创建
      HttpCodec httpCodec = realChain.httpStream();
      StreamAllocation streamAllocation = realChain.streamAllocation();
      /**
       *请求的使用的连接,在ConncetInterceptor中产生
       *连接可能是从ConnectPool中选择或者重新创建出来
      **/
      RealConnection connection = (RealConnection) realChain.connection();
      Request request = realChain.request();
      long sentRequestMillis = System.currentTimeMillis();
      realChain.eventListener().requestHeadersStart(realChain.call());
      /**
       * 通过httpCodec中用Okio包裹的socket写请求头
      **/
      httpCodec.writeRequestHeaders(request);
      realChain.eventListener().requestHeadersEnd(realChain.call(), request);
      Response.Builder responseBuilder = null;
      /**
       * 如果请求有请求body,发送body
       **/
      if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
     request body.
     /**
       * 请求头是Expect: 100-continue,先读响应头
     *    httpCodec.readResponseHeaders方法读取到状态码100时, 会返回null
    **/
        if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
          httpCodec.flushRequest();
          realChain.eventListener().responseHeadersStart(realChain.call());
          responseBuilder = httpCodec.readResponseHeaders(true);
        }
     /**
       * 如果正常发送请求body部分
       *  请求头有Expect: 100-continue,但是服务器没有返回状态码100,且不是Http/2协议,关闭当前连接
    **/
        if (responseBuilder == null) {
          realChain.eventListener().requestBodyStart(realChain.call());
          long contentLength = request.body().contentLength();
          CountingSink requestBodyOut =
              new CountingSink(httpCodec.createRequestBody(request, contentLength));
          BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
          request.body().writeTo(bufferedRequestBody);
          bufferedRequestBody.close();
          realChain.eventListener()
              .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
        } else if (!connection.isMultiplexed()) {
          streamAllocation.noNewStreams();
        }
      }
      httpCodec.finishRequest();
     /**
       * 正常情况下,读响应headers
    **/
      if (responseBuilder == null) {
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(false);
      }
      Response response = responseBuilder
          .request(request)
          .handshake(streamAllocation.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
      int code = response.code();
      if (code == 100) {
        responseBuilder = httpCodec.readResponseHeaders(false);
        response = responseBuilder
                .request(request)
                .handshake(streamAllocation.connection().handshake())
                .sentRequestAtMillis(sentRequestMillis)
                .receivedResponseAtMillis(System.currentTimeMillis())
                .build();
        code = response.code();
      }
     /**
       * 读响应headers结束
    **/
      realChain.eventListener()
              .responseHeadersEnd(realChain.call(), response);
    /**
       * 如果是forWebSocket,且 code == 101返回空响应
    *   其他返回 RealResponseBody对象
    **/
      if (forWebSocket && code == 101) {
        // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
        response = response.newBuilder()
            .body(Util.EMPTY_RESPONSE)
            .build();
      } else {
       /**
      *将
      *无响应内容
      * chunk内容,
      * 存在contenlength内容
      * 不存在contenlength内容
      *  的响应body包裹成 RealResponseBody对象
     *
    **/
        response = response.newBuilder()
            .body(httpCodec.openResponseBody(response))
            .build();
      }
      /**
      * 服务端关闭要求连接
    **/
      if ("close".equalsIgnoreCase(response.request().header("Connection"))
          || "close".equalsIgnoreCase(response.header("Connection"))) {
        streamAllocation.noNewStreams();
      }
       /**
        * code是204/205,contentLength()还大于0,抛出协议错误
       **/
      if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
        throw new ProtocolException(
            "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
      }
      return response;
    }
    

    CallServerInterceptor执行完成后返回的是一个只读取了响应headers,但是还没有读取body的Response,OkHttp网络请求部分的代码到此就结束了,后续的parseReponse都在更上层的框架中,比如Retrofit就是在OkHttpCall.parseReponse()方法中调用serviceMethod.toResponse(catchingBody)中调用GsonConvter或者其他Convertor来进行处理。

    获取指标具体实现

    对于Http 请求耗时,异常,数据大小,状态码 的获取,直接使用前面实现的MAOP,拦截OkHttpClient.Builder的build方法加入统计Interceptor ,DNSLookUp 耗时,连接耗时,ssl耗时,通过设置EventListener.Factory,可以直接收集。解析耗时需要拦截上层框架的parseReponse方法进行收集。
    首包时间需要拦截OkHttp读请求数据的方法来实现,OKHttpClient 最终调用CallServerInterceptor,关键代码就是读取readResponseHeaders的时机。


    image

    MAOP 实现

    使用前面提供的MAOP功能,在AOP配置文件中加入,拦截OkHttpClient的builder方法和Http1Codec的readHeaderLine方法和okhttp3.internal.http2.Http2Stream的takeResponseHeaders方法的配置

    image image

    在拦截OkHttpClient的Builder的build()方法中加入统计Interceptor和EventListenerFactory

    image

    首包的时间通过:认为第一次读响应头返回时为首包时间,拦截okhttp3.internal.http1.Http1Code.readHeaderLine的方法和okhttp3.internal.http2.Http2Stream.takeResponseHeaders计算首包时间


    image

    Retrofit parse耗时收集

    AOP配置文件中加入对retrofit2.OKHttp.parseResponse拦截的配置


    20_32_45__09_01_2019.jpg

    Method回掉中处理相关的数据


    20_35_17__09_01_2019.jpg

    综上,这个方案基本能实现Http基本指标的获取,但是有些细节还需完善,可以微信关注知识星球共同交流


    相关文章

      网友评论

        本文标题:基于OkHttp的Http指标监控

        本文链接:https://www.haomeiwen.com/subject/zitxyctx.html