美文网首页
Android |《看完不忘系列》之okhttp

Android |《看完不忘系列》之okhttp

作者: 哈利迪ei | 来源:发表于2020-08-06 10:34 被阅读0次

    嗨,我是哈利迪~《看完不忘系列》将以从树干到细枝的思路分析一些技术框架,本文将对开源项目okhttp网络库进行介绍。

    本文约3800字,阅读大约10分钟。如个别大图模糊,可前往个人站点阅读。

    概览

    源码基于3.14.9,即java版本的最新版

    首先上职责图,各个类的名字基本可以见名知意了,就不翻译了,直接起飞~

    image

    树干

    我们先看一趟飞行的大体流程,

    image

    好了,进入代码环节,引入依赖,

    implementation 'com.squareup.okhttp3:okhttp:3.14.9'
    

    简单使用(只分析异步请求,同步请求类似),

    class OkhttpActivity extends AppCompatActivity {
        //创建机场,通常是单例
        OkHttpClient mClient = new OkHttpClient();
    
        void onCreate(Bundle savedInstanceState) {
            String url = "xxx";
            //构建者模式创建Request请求,设置url(飞去哪里)
            Request request = new Request.Builder().url(url).build();
            //知道目的地后,创建Call会话(本次航班)
            Call call = mClient.newCall(request);
            //异步请求入队(飞机进入就绪跑道)
            call.enqueue(new Callback() {
                @Override
                public void onFailure(Call call, IOException e) {
                    //本次航班失败 - -
                }
    
                @Override
                public void onResponse(Call call, Response response) throws IOException {
                    //抵达目的地!
                    //body只能取一次,Response就会关闭,所以要用临时变量接收
                    String result = response.body().string();
                    //回调在子线程,要操作UI的话需切回主线程
                    runOnUiThread(() -> {
                        mBinding.tv.setText(result);
                    });
                }
            });
        }
    }
    

    OkHttpClientRequest使用构建者模式创建即可,当然,如果OkHttpClient不需要进行配置,直接new就行。知道了起点和终点,就可以创建航班Call了,

    //OkHttpClient.java
    Call newCall(Request request) {
        return RealCall.newRealCall(this, request, false);
    }
    
    //RealCall.java
    RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        RealCall call = new RealCall(client, originalRequest, forWebSocket);
        //Transmitter意为发射器,功能挺杂的,就先叫他机长吧
        call.transmitter = new Transmitter(client, call);
        return call;
    }
    

    可见Call的实例是RealCall,航班创建好后,进入就绪跑道,

    //RealCall.java
    void enqueue(Callback responseCallback) {
        //机长回调eventListener,实时汇报航班状态,先忽略
        transmitter.callStart();
        //用AsyncCall封装Callback,由机场调度中心dispatcher安排进入就绪跑道
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
    }
    

    AsyncCall就是一个Runnable,run方法里调了execute方法,

    //AsyncCall.java
    void execute() {
        try {
            //得到Response,抵达目的地
            Response response = getResponseWithInterceptorChain();
            //成功(一般response.isSuccessful()才是真正意义上的成功)
            responseCallback.onResponse(RealCall.this, response);
        } catch (IOException e) {
            //失败
            responseCallback.onFailure(RealCall.this, e);
        } catch (Throwable t) {
            cancel();
            IOException canceledException = new IOException("canceled due to " + t);
            canceledException.addSuppressed(t);
            //失败
            responseCallback.onFailure(RealCall.this, canceledException);
            throw t;
        } finally {
            //结束航班,callsPerHost减1,runningAsyncCalls移除AsyncCall
            client.dispatcher().finished(this);
        }
    }
    

    AsyncCall里有一个原子计数器,

    //目前每个主机(域名)有多少个会话call
    volatile AtomicInteger callsPerHost = new AtomicInteger(0);
    

    Dispatcher里有两个默认max值,

    int maxRequests = 64;  //最多同时请求数为64
    int maxRequestsPerHost = 5;  //每个主机最多同时请求数为5
    

    什么意思呢?可以这么理解,机场的调度中心,限制了同时最多起飞的航班为64班;飞往同一个城市的航班,同时最多只能有5班,为什么做城市限制?跟连接池的复用有关,后面会讲。下面我们以上海为例,

    看下enqueue方法做了啥,

    //Dispatcher.java
    enqueue(AsyncCall call) {
        synchronized (this) {
            //飞机进入就绪跑道
            readyAsyncCalls.add(call);
            if (!call.get().forWebSocket) {
                //查找飞往上海的AsyncCall
                AsyncCall existingCall = findExistingCallWithHost(call.host());
                //复用上海的计数器callsPerHost,用于统计同一城市的航班
                if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
            }
        }
        //飞机进入起飞跑道
        promoteAndExecute();
    }
    

    跟进promoteAndExecute,

    //Dispatcher.java
    boolean promoteAndExecute() {
        //收集可以执行的AsyncCall
        List<AsyncCall> executableCalls = new ArrayList<>();
        boolean isRunning;
        synchronized (this) {
            for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
                AsyncCall asyncCall = i.next();
                //64个起飞跑道被占满,跳出
                if (runningAsyncCalls.size() >= maxRequests) break;
                //飞往上海的航班达到5个,留在就绪跑道就行,跳过
                if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue;
                //离开就绪跑道
                i.remove();
                //上海航班计数器+1
                asyncCall.callsPerHost().incrementAndGet();
                //把AsyncCall存起来
                executableCalls.add(asyncCall);
                //进入起飞跑道
                runningAsyncCalls.add(asyncCall);
            }
            isRunning = runningCallsCount() > 0;
        }
        //把可以执行的AsyncCall,统统起飞
        for (int i = 0, size = executableCalls.size(); i < size; i++) {
            AsyncCall asyncCall = executableCalls.get(i);
            asyncCall.executeOn(executorService());
        }
        return isRunning;
    }
    

    其中executorService()返回了一个线程池,

    //Dispatcher.java
    synchronized ExecutorService executorService() {
        if (executorService == null) {
            executorService =
                new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
                                       60, TimeUnit.SECONDS,
                                       new SynchronousQueue<>(), 
                                       Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
    }
    

    核心线程数为0,空闲了60秒后,所有线程会被清空;最大线程数无限制,其实还好,已经有调度中心Dispatcher会限制请求数了。

    继续跟进executeOn方法,

    //AsyncCall.java
    void executeOn(ExecutorService executorService) {
        boolean success = false;
        try {
            //线程池运行Runnable,执行run,调用前面提到的AsyncCall.execute
            executorService.execute(this);
            success = true;
        } catch (RejectedExecutionException e) {
            InterruptedIOException ioException = new InterruptedIOException("executor rejected");
            ioException.initCause(e);
            transmitter.noMoreExchanges(ioException);
            //失败回调
            responseCallback.onFailure(RealCall.this, ioException);
        } finally {
            if (!success) {
                //结束航班
                client.dispatcher().finished(this);
            }
        }
    }
    

    可见,回调都在子线程里完成,所以Activity里要切回主线程才能操作UI。至此,核心流程就结束了。

    image

    细枝

    拦截器链

    前边得到Response的地方,调了getResponseWithInterceptorChain,进去看看,

    //RealCall.java
    Response getResponseWithInterceptorChain() throws IOException {
        List<Interceptor> interceptors = new ArrayList<>();
        //添加自定义拦截器
        interceptors.addAll(client.interceptors());
        //添加默认拦截器
        interceptors.add(new RetryAndFollowUpInterceptor(client));
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
            //添加自定义网络拦截器(在ConnectInterceptor后面,此时网络连接已准备好)
            interceptors.addAll(client.networkInterceptors());
        }
        //添加默认拦截器,共4+1=5个
        interceptors.add(new CallServerInterceptor(forWebSocket));
        //创建拦截器链
        Interceptor.Chain chain =
            new RealInterceptorChain(interceptors, transmitter, null, 0,
                                     originalRequest, this, client.connectTimeoutMillis(),
                                     client.readTimeoutMillis(), client.writeTimeoutMillis());
        //放行
        Response response = chain.proceed(originalRequest);
        return response;
    }
    

    拦截器链基于责任链模式,即不同的拦截器有不同的职责,链上的拦截器会按顺序挨个处理,在Request发出之前,Response返回之前,插入一些定制逻辑,这样可以方便的扩展需求。当然责任链模式也有不足,就是只要一个环节阻塞住了,就会拖慢整体运行(效率);同时链条越长,产生的中间对象就越多(内存)。

    image

    我们先跟proceed方法,

    //RealInterceptorChain.java
    Response proceed(Request request, Transmitter transmitter,Exchange exchange)
        throws IOException {
        //传入index + 1,可以访问下一个拦截器
        RealInterceptorChain next = 
            new RealInterceptorChain(interceptors, transmitter, exchange,
                                     index + 1, request, call, connectTimeout, 
                                     readTimeout, writeTimeout);
        Interceptor interceptor = interceptors.get(index);
        //执行第一个拦截器,同时传入next
        Response response = interceptor.intercept(next);
        //等所有拦截器处理完,就能返回Response了
        return response;
    }
    

    下面简要分析下各个拦截器的功能。

    一、RetryAndFollowUpInterceptor

    负责重试和重定向。

    //最大重试次数
    static final int MAX_FOLLOW_UPS = 20;
    
    Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Transmitter transmitter = realChain.transmitter();
        int followUpCount = 0;
        while (true) {
            //机长为Request准备一个连接
            //主机、端口、协议都相同时,连接可复用
            transmitter.prepareToConnect(request);
            //放行,让后面的拦截器执行
            Response response = realChain.proceed(request, transmitter, null);
            //后面的拦截器执行完了,拿到Response,解析看下是否需要重试或重定向,需要则返回新的Request
            Request followUp = followUpRequest(response, route);
            if (followUp == null) {
                //新的Request为空,直接返回response
                return response;
            }
            RequestBody followUpBody = followUp.body();
            if (followUpBody != null && followUpBody.isOneShot()) {
                //如果RequestBody有值且只许被调用一次,直接返回response
                return response;
            }
            if (++followUpCount > MAX_FOLLOW_UPS) {
                //重试次数上限,结束
                throw new ProtocolException("Too many follow-up requests: " + followUpCount);
            }
            //将新的请求赋值给request,继续循环
            request = followUp;
        }
    }
    

    其中followUpRequest方法会根据Response不同的响应码做相应的处理,就不跟了。

    二、BridgeInterceptor

    桥接,负责把应用请求转换成网络请求,把网络响应转换成应用响应,说白了就是处理一些网络需要的header,简化应用层逻辑。

    Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        Request.Builder requestBuilder = userRequest.newBuilder();
        RequestBody body = userRequest.body();
        if (body != null) {
            requestBuilder.header("Content-Type", contentType.toString());
            //处理Content-Length、Transfer-Encoding
            //...
        }
        //处理Host、Connection、Accept-Encoding、Cookie、User-Agent、
        //...
        //放行,把处理好的新请求往下传递,得到Response
        Response networkResponse = chain.proceed(requestBuilder.build());
        Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);
        //处理新Response的Content-Encoding、Content-Length、Content-Type、gzip
        //返回新Response
        return responseBuilder.build();
    }
    

    这里需要注意的一点是,在服务器支持gzip压缩的前提下,客户端不设置Accept-Encoding=gzip的话,okhttp会自动帮我们开启gzip和解压数据,如果客户端自己开启了gzip,就需要自己解压服务器返回的数据了。

    三、CacheInterceptor

    负责管理缓存,使用okio读写缓存。

    InternalCache cache;
    
    Response intercept(Chain chain) throws IOException {
        //获取候选缓存
        Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            : null;
        //创建缓存策略
        CacheStrategy strategy = 
            new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
        //网络请求
        Request networkRequest = strategy.networkRequest;
        //缓存Response
        Response cacheResponse = strategy.cacheResponse;
        //如果网络请求和缓存Response都为空
        if (networkRequest == null && cacheResponse == null) {
            //返回一个504的Response
            return new Response.Builder().code(504).xxx.build();
        }
        //如果不使用网络,直接返回缓存
        if (networkRequest == null) {
            return cacheResponse.newBuilder()
                .cacheResponse(stripBody(cacheResponse)).build();
        }
        //放行,往后走
        Response networkResponse = chain.proceed(networkRequest);
        if (cacheResponse != null) {
            //获取到缓存响应码304,即缓存可用
            if (networkResponse.code() == HTTP_NOT_MODIFIED) {
                Response response = cacheResponse.newBuilder().xxx.build();
                //更新缓存,返回
                cache.update(cacheResponse, response);
                return response;
            }
        }
        //获取网络Response
        Response response = networkResponse.newBuilder().xxx.build();
        //写入缓存,返回
        cache.put(response);
        return response;
    }
    

    关于缓存策略CacheStrategy会在缓存章节展开。

    四、ConnectInterceptor

    负责创建连接Connection

    Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        Transmitter transmitter = realChain.transmitter();
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        //机长创建一个交换器Exchange
        Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
        //放行,给下一个拦截器
        return realChain.proceed(request, transmitter, exchange);
    }
    

    newExchange方法会在连接池章节展开。

    五、CallServerInterceptor

    负责写请求和读响应。

    Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Exchange exchange = realChain.exchange();
        Request request = realChain.request();
        //写请求头
        exchange.writeRequestHeaders(request);
        Response.Builder responseBuilder = null;
        //处理请求体body...
        //读取响应头
        responseBuilder = exchange.readResponseHeaders(false);
        //构建响应
        Response response = responseBuilder
            .request(request)
            .handshake(exchange.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
        //读取响应体
        response = response.newBuilder()
            .body(exchange.openResponseBody(response))
            .build();
        return response;
    }
    

    缓存

    缓存的实现是基于请求和响应的header来做的。CacheStrategy即缓存策略,CacheInterceptor拦截器会根据他拿到网络请求networkRequest、缓存响应cacheResponse,从而决定是使用网络还是缓存。

    //CacheStrategy.java
    //内部类工厂,生产CacheStrategy
    static class Factory {
        //一些字段:servedDate、lastModified、expires、etag...
        Factory(long nowMillis, Request request, Response cacheResponse) {
            this.nowMillis = nowMillis;
            this.request = request;
            this.cacheResponse = cacheResponse;
            if (cacheResponse != null) {
                //解析cacheResponse,把参数赋值给自己的成员变量
                this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
                //...
                Headers headers = cacheResponse.headers();
                for (int i = 0, size = headers.size(); i < size; i++) {
                    String fieldName = headers.name(i);
                    String value = headers.value(i);
                    if ("Date".equalsIgnoreCase(fieldName)) {
                        servedDate = HttpDate.parse(value);
                        servedDateString = value;
                    } else if (xxx){
                        //...
                    }
                }
            }
        }
    
        CacheStrategy get() {
            CacheStrategy candidate = getCandidate();
            if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
                //返回策略,交给拦截器
                return new CacheStrategy(null, null);
            }
            return candidate;
        }
    
        CacheStrategy getCandidate() {
            //根据header字段,得到各种策略,交给拦截器...
            return new CacheStrategy(xxx);
        }
    }
    

    getCandidate里面就是根据header字段得到各种策略,然后交给拦截器处理,感兴趣的读者自行阅读啦。

    那么缓存是如何写入磁盘的呢?跟进InternalCache接口,他的实现在Cache类里,

    //Cache.java
    InternalCache internalCache = new InternalCache() {
        @Override public Response get(Request request) throws IOException {
            return Cache.this.get(request);//读取
        }
    
        @Override public CacheRequest put(Response response) throws IOException {
            return Cache.this.put(response);//写入
        }
    
        //...
    };
    
    Response get(Request request) {
        String key = key(request.url()); //键
        DiskLruCache.Snapshot snapshot; //缓存快照
        Entry entry;
        snapshot = cache.get(key); //cache是okhttp的DiskLruCache
        if (snapshot == null) {
            return null; //没缓存,直接返回
        }
        //快照得到输入流,用于创建缓存条目
        entry = new Entry(snapshot.getSource(ENTRY_METADATA));
        //得到响应
        Response response = entry.response(snapshot);
        return response;
    }
    
    CacheRequest put(Response response) {
        String requestMethod = response.request().method();
        if (!requestMethod.equals("GET")) {
            //不是get请求,不缓存
            return null;
        }
        //封装成日志条目
        Entry entry = new Entry(response);
        DiskLruCache.Editor editor = null;
        editor = cache.edit(key(response.request().url()));
        //写入缓存
        entry.writeTo(editor);
        return new CacheRequestImpl(editor);
    }
    

    okhttp的DiskLruCache,就是根据最近最少使用算法,来管理磁盘缓存,他和Glide里的DiskLruCache有几份相似,比如日志处理都一样,内部都有一个线程池来清理磁盘,不过okhttp有用到okio。感兴趣的读者可以留意下okhttp3.internal.cache.DiskLruCachecom.bumptech.glide.disklrucache.DiskLruCache

    image

    注:缓存默认是关闭的,需要自行开启:

    new OkHttpClient.Builder()
        .cache(new Cache(new File(MyApp.APP.getCacheDir(), "okhttp_cache"), //路径
                         50L * 1024L * 1024L)) //大小
        .build();
    

    连接池

    还记得Transmitter吗,前面我们叫他机长,他是应用和网络之间的桥梁,管理着连接、请求、响应和流。在拦截器章节知道:

    RetryAndFollowUpInterceptor里调了transmitter.prepareToConnect;准备一个连接

    ConnectInterceptor里调了transmitter.newExchange;创建一个交换器

    这里补充几个概念:

    Connection,实现为RealConnection:连接,抽象概念,内部维护了Socket

    ConnectionPool,持有RealConnectionPool:连接池,管理连接的复用

    Exchange:交换器(管理请求和响应、持有ExchangeCodec)

    ExchangeCodec:编解码器,用于编码请求,解码响应,实现有Http1ExchangeCodec和Http2ExchangeCodec

    HTTP 1.1:引入keep-alive机制,支持连接保活,可以多个请求复用一个连接,但请求是串行的

    HTTP 2.0:支持多路复用,一个连接的多个请求可以并行

    先看RealConnectionPool

    //RealConnectionPool.java
    //线程池,用于清理过期的连接。一个连接池最多运行一个线程
    Executor executor =
        new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,
                               new SynchronousQueue<>(), 
                               Util.threadFactory("OkHttp ConnectionPool", true));
    //每个ip地址的最大空闲连接数,为5个
    int maxIdleConnections;
    //空闲连接存活时间,为5分钟
    long keepAliveDurationNs;
    //连接队列
    Deque<RealConnection> connections = new ArrayDeque<>();
    
    //获取连接
    boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
                                               List<Route> routes, boolean requireMultiplexed) {
        for (RealConnection connection : connections) {
            //要求多路复用,跳过不支持多路复用的连接
            if (requireMultiplexed && !connection.isMultiplexed()) continue;
            //不合条件,跳过
            if (!connection.isEligible(address, routes)) continue;
            //给机长分配一个连接
            transmitter.acquireConnectionNoEvents(connection);
            return true;
        }
        return false;
    }
    
    //移除连接,executor运行cleanupRunnable,调用了该方法
    long cleanup(long now) {
        //查找移除的连接,或下一次移除的时间
        synchronized (this) {
            for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
                //...
                if (idleDurationNs > longestIdleDurationNs) {
                    longestIdleDurationNs = idleDurationNs;
                    longestIdleConnection = connection;
                }
            }
            if (longestIdleDurationNs >= this.keepAliveDurationNs
                || idleConnectionCount > this.maxIdleConnections) {
                //移除连接
                connections.remove(longestIdleConnection);
            }
        }
        //关闭Socket
        closeQuietly(longestIdleConnection.socket());
    }
    

    RealConnection代码有点多,知道他内部维护了Socket就行了。

    前面提到过,同一主机的同时请求数被限制成maxRequestsPerHost = 5 ,为什么这么做?同主机的请求可以共用一个连接,所以大概是为了限流?比如同时飞往上海的航班如果不限数量,会把上海机场挤爆?有知道答案的小伙伴留下评论呀~

    小结

    okhhttp具有以下优势:

    • 使用简单,拦截器链的设计方便扩展
    • 请求失败能自动重连和尝试主机的其他ip、能重定向
    • 可以自动处理gzip
    • 本地缓存可以避免重复请求
    • 同主机的请求可以共享一个Socket,socket由Connection维护,ConnectionPool管理Connection的复用,避免频繁地创建和销毁连接
    image

    尾声

    还是那句话,该系列旨在摸清技术的整体实现思路,okhhttp里还有很多精彩细节,如cookie、route、dns、tls等处理,本文没有提到,大家还是要对着源码学习呀。哈迪在看源码过程还发现了很多不懂的地方,比如各种协议和标准,这也是个补充网络知识的好机会,一起飞~

    系列文章:

    参考资料


    image

    相关文章

      网友评论

          本文标题:Android |《看完不忘系列》之okhttp

          本文链接:https://www.haomeiwen.com/subject/rtbmrktx.html