美文网首页
OkDownload源码分析

OkDownload源码分析

作者: Drc15H | 来源:发表于2018-12-14 16:52 被阅读0次

    OkDownload是一款多线程断点续传下载引擎,它的功能完整,性能高,可配置性高,可以注入自定义组件来修改下载策略、替换网络请求框架等等,而且在项目中已有成熟应用(英语流利说),是一个很不错的开源下载框架。

    项目地址:https://github.com/lingochamp/okdownload

    OkDownload的简单使用

    OkDownload的使用非常简单:
    1.引入该开源库:
    implementation 'com.liulishuo.okdownload:okhttp:1.0.5' (提供okhttp连接,ps:如果使用的话,需要引入okhttp网络请求库)
    implementation 'com.liulishuo.okdownload:okdownload:1.0.5' (下载核心库)
    implementation 'com.liulishuo.okdownload:sqlite:1.0.5' (存储断点信息的数据库)
    2.开始一个任务:

    task = new DownloadTask.Builder(url, parentFile)
             .setFilename(filename)
             // 下载进度回调的间隔时间(毫秒)
             .setMinIntervalMillisCallbackProcess(30)
             // 任务过去已完成是否要重新下载
             .setPassIfAlreadyCompleted(false)
             .build();
    //异步执行任务
    task.enqueue(listener);
    // 取消任务
    task.cancel();
    // 同步执行任务
    task.execute(listener);
    

    当然也可以同时异步执行多个任务

    DownloadTask.enqueue(tasks, listener);
    

    3.任务队列的构建、开始和停止

    DownloadContext.Builder builder = new DownloadContext.QueueSet()
            .setParentPathFile(parentFile)
            .setMinIntervalMillisCallbackProcess(150)
            .commit();
    builder.bind(url1);
    builder.bind(url2).addTag(key, value);
    builder.bind(url3).setTag(tag);
    builder.setListener(contextListener);
    
    DownloadTask task = new DownloadTask.Builder(url4, parentFile)
            .setPriority(10).build();
    builder.bindSetTask(task);
    
    DownloadContext context = builder.build();
    
    context.startOnParallel(listener);
    
    // stop
    context.stop();
    

    4.获取任务状态

    Status status = StatusUtil.getStatus(task)
    
    status = StatusUtil.getStatus(url, parentPath, null);
    status = StatusUtil.getStatus(url, parentPath, filename);
    
    boolean isCompleted = StatusUtil.isCompleted(task);
    isCompleted = StatusUtil.isCompleted(url, parentPath, null);
    isCompleted = StatusUtil.isCompleted(url, parentPath, filename);
    
    Status completedOrUnknown = StatusUtil.isCompletedOrUnknown(task);
    

    5.获取断点信息

    // 注意:任务完成后,断点信息将会被删除 
    BreakpointInfo info = OkDownload.with().breakpointStore().get(id);
    info = StatusUtil.getCurrentInfo(url, parentPath, null);
    info = StatusUtil.getCurrentInfo(url, parentPath, filename);
    // 断点信息将被缓存在任务对象中,即使任务已经完成了
    info = task.getInfo();
    

    6.设置任务监听
    可以为任务设置五种不同类型的监听器,同时,也可以给任务和监听器建立1对1、1对多、多对1、多对多的关联。



    给一个任务设置多种监听:

    DownloadListener listener1 = new DownloadListener1();
    DownloadListener listener2 = new DownloadListener2();
    
    DownloadListener combinedListener = new DownloadListenerBunch.Builder()
                       .append(listener1)
                       .append(listener2)
                       .build();
    
    DownloadTask task = new DownloadTask.build(url, file).build();
    task.enqueue(combinedListener);
    

    为多个任务动态设置监听:

    UnifiedListenerManager manager = new UnifiedListenerManager();
    DownloadListener listener1 = new DownloadListener1();
    DownloadListener listener2 = new DownloadListener2();
    DownloadListener listener3 = new DownloadListener3();
    DownloadListener listener4 = new DownloadListener4();
    
    DownloadTask task = new DownloadTask.build(url, file).build();
    manager.attachListener(task, listener1);
    manager.attachListener(task, listener2);
    manager.detachListener(task, listener2);
    
    // 当一个任务结束时,这个任务的所有监听器都被移除
    manager.addAutoRemoveListenersWhenTaskEnd(task.getId());
    
    // enqueue task to start.
    manager.enqueueTaskWithUnifiedListener(task, listener3);
    manager.attachListener(task, listener4);
    

    下面我们来分析一下这个下载框架的源码:

    OkDownload

    首先看一下OkDownload这个类,这个类定义了所有的下载策略,我们可以自定义一些下载策略,可以通过OkDownload的Builder构造自定义的一个OkDownload实例,再通过OkDownload.setSingletonInstance进行设置:

    OkDownload.Builder builder = new OkDownload.Builder(context)
            .downloadStore(downloadStore) //断点信息存储的位置,默认是SQLite数据库 
            .callbackDispatcher(callbackDispatcher) //监听回调分发器,默认在主线程回调 
            .downloadDispatcher(downloadDispatcher) //下载管理机制,最大下载任务数、同步异步执行下载任务的处理
            .connectionFactory(connectionFactory) //选择网络请求框架,默认是OkHttp 
            .outputStreamFactory(outputStreamFactory) //构建文件输出流DownloadOutputStream,是否支持随机位置写入
            .downloadStrategy(downloadStrategy) //下载策略,文件分为几个线程下载
            .processFileStrategy(processFileStrategy) //多文件写文件的方式,默认是根据每个线程写文件的不同位置,支持同时写入。 
            .monitor(monitor); //下载状态监听 
    OkDownload.setSingletonInstance(builder.build());
    

    DownloadTask

    DownloadTask下载任务类,可通过它的Builder来构造一个下载任务,我们看它是如何执行的:

    public void execute(DownloadListener listener) {
            this.listener = listener;
            OkDownload.with().downloadDispatcher().execute(this);
        }
    
    public void enqueue(DownloadListener listener) {
            this.listener = listener;
            OkDownload.with().downloadDispatcher().enqueue(this);
        }
    

    可以看到都是通过downloadDispatcher来执行下载任务的,默认的downloadDispatcher是一个DownloadDispatcher实例,我们以同步执行一个下载任务为例,看它是如何下载的:

        public void execute(DownloadTask task) {
            Util.d(TAG, "execute: " + task);
            final DownloadCall call;
    
            synchronized (this) {
                if (inspectCompleted(task)) return;
                if (inspectForConflict(task)) return;
    
                call = DownloadCall.create(task, false, store);
                runningSyncCalls.add(call);
            }
    
            syncRunCall(call);
        }
    
    void syncRunCall(DownloadCall call) {
            call.run();
        }
    

    在execute方法里将一个DownloadTask实例又封装为了一个DownloadCall对象,然后在syncRunCall方法里执行了DownloadCall对象的run方法。通过看DownloadCall源码可以知道该类继承自NamedRunnable,而NamedRunnable实现了Runnable,在run方法里调用了execute方法。(enqueue执行任务最终则是调用 getExecutorService().execute(call);来异步执行的)



    那我们看一下DownloadCall这个类。

    DownloadCall

    先看一下DownloadCall是如何实现execute方法的,该方法比较长,首先执行的是inspectTaskStart:



    先看一下这个store是什么:




    通过看OkDownload这个类的源码可以知道,DownloadCall的store是调用BreakpointStoreOnSQLite的createRemitSelf方法生成的一个实例:



    可以看到是RemitStoreOnSQLite的一个实例,其主要用来保存任务及断点信息至本地数据库。RemitStoreOnSQLite里持有BreakpointStoreOnSQLite对象,BreakpointStoreOnSQLite里面包含了BreakpointSQLiteHelper(用于操作数据)和BreakpointStoreOnCache(用于做数据操作之前的数据缓存)。

    @Override public void syncCacheToDB(int id) throws IOException {
            sqLiteHelper.removeInfo(id);
    
            final BreakpointInfo info = sqliteCache.get(id);
            if (info == null || info.getFilename() == null || info.getTotalOffset() <= 0) return;
    
            sqLiteHelper.insert(info);
        }
    

    最终会调用上述syncCacheToDB方法,先删除数据库中的任务信息,若缓存(创建BreakpointStoreOnCache对象时,会调用loadToCache方法将数据库中所有任务信息进行缓存)

    this.onCache = new BreakpointStoreOnCache(helper.loadToCache(),
                    helper.loadResponseFilenameToMap());
    

    中有该任务,则检查任务信息是否合法,若合法则再次将该任务及断点信息保存在本地数据库中。
    inspectTaskStart方法结束后,会进入一个do-while循环,首先做一些下载前的准备工作:

            do {
                //1.判断当前任务的下载链接长度是否大于0,否则就抛出异常;
                if (task.getUrl().length() <= 0) {
                    this.cache = new DownloadCache.PreError(
                            new IOException("unexpected url: " + task.getUrl()));
                    break;
                }
    
                if (canceled) break;
    
                //2.从缓存中获取任务的断点信息,若没有断点信息,则创建断点信息并保存至数据库;
                @NonNull final BreakpointInfo info;
                try {
                    BreakpointInfo infoOnStore = store.get(task.getId());
                    if (infoOnStore == null) {
                        info = store.createAndInsert(task);
                    } else {
                        info = infoOnStore;
                    }
                    setInfoToTask(info);
                } catch (IOException e) {
                    this.cache = new DownloadCache.PreError(e);
                    break;
                }
                if (canceled) break;
    
                // 3.创建带缓存的下载输出流;
                @NonNull final DownloadCache cache = createCache(info);
                this.cache = cache;
    
                // 4.访问下载链接判断断点信息是否合理;
                final BreakpointRemoteCheck remoteCheck = createRemoteCheck(info);
                try {
                    remoteCheck.check();
                } catch (IOException e) {
                    cache.catchException(e);
                    break;
                }
    
                //5.确定文件路径后等待文件锁释放;
                fileStrategy.getFileLock().waitForRelease(task.getFile().getAbsolutePath());
    
                // 6. 判断缓存中是否有相同的任务,若有则复用缓存中的任务的分块信息;
                OkDownload.with().downloadStrategy()
                        .inspectAnotherSameInfo(task, info, remoteCheck.getInstanceLength());
    
                try {
                    //7.检查断点信息是否是可恢复的,若不可恢复,则根据文件大小进行分块,重新下载,否则继续进行下一步;
                    if (remoteCheck.isResumable()) {
                        // 8.判断断点信息是否是脏数据(文件存在且断点信息正确且下载链接支持断点续传);
                        final BreakpointLocalCheck localCheck = createLocalCheck(info,
                                remoteCheck.getInstanceLength());
                        localCheck.check();
                        // 9.若是脏数据则根据文件大小进行分块,重新开始下载,否则从断点位置开始下载;
                        if (localCheck.isDirty()) {
                            Util.d(TAG, "breakpoint invalid: download from beginning because of "
                                    + "local check is dirty " + task.getId() + " " + localCheck);
                            fileStrategy.discardProcess(task);
                            assembleBlockAndCallbackFromBeginning(info, remoteCheck,
                                    localCheck.getCauseOrThrow());
                        } else {
                            okDownload.callbackDispatcher().dispatch()
                                    .downloadFromBreakpoint(task, info);
                        }
                    } else {
                        Util.d(TAG, "breakpoint invalid: download from beginning because of "
                                + "remote check not resumable " + task.getId() + " " + remoteCheck);
                        fileStrategy.discardProcess(task);
                        assembleBlockAndCallbackFromBeginning(info, remoteCheck,
                                remoteCheck.getCauseOrThrow());
                    }
                } catch (IOException e) {
                    cache.setUnknownError(e);
                    break;
                }
    
                // 10. 开始下载
                start(cache, info);
    
                if (canceled) break;
    
                // 11. 错误重试机制
                if (cache.isPreconditionFailed()
                        && retryCount++ < MAX_COUNT_RETRY_FOR_PRECONDITION_FAILED) {
                    store.remove(task.getId());
                    retry = true;
                } else {
                    retry = false;
                }
            } while (retry);
    

    1.判断当前任务的下载链接长度是否大于0,否则就抛出异常;2.从缓存中获取任务的断点信息,若没有断点信息,则创建断点信息并保存至数据库;3.创建带缓存的下载输出流;4.访问下载链接判断断点信息是否合理;5.确定文件路径后等待文件锁释放; 6. 判断缓存中是否有相同的任务,若有则复用缓存中的任务的分块信息;7.检查断点信息是否是可恢复的,若不可恢复,则根据文件大小进行分块,重新下载,否则继续进行下一步;8.判断断点信息是否是脏数据(文件存在且断点信息正确且下载链接支持断点续传);9.若是脏数据则根据文件大小进行分块,重新开始下载,否则从断点位置开始下载;10.开始下载。

    文件分成多少块进行下载由DownloadStrategy决定的:

        // 1 connection: [0, 1MB)
        private static final long ONE_CONNECTION_UPPER_LIMIT = 1024 * 1024; // 1MiB
        // 2 connection: [1MB, 5MB)
        private static final long TWO_CONNECTION_UPPER_LIMIT = 5 * 1024 * 1024; // 5MiB
        // 3 connection: [5MB, 50MB)
        private static final long THREE_CONNECTION_UPPER_LIMIT = 50 * 1024 * 1024; // 50MiB
        // 4 connection: [50MB, 100MB)
        private static final long FOUR_CONNECTION_UPPER_LIMIT = 100 * 1024 * 1024; // 100MiB
    
        public ResumeAvailableResponseCheck resumeAvailableResponseCheck(
                DownloadConnection.Connected connected,
                int blockIndex,
                BreakpointInfo info) {
            return new ResumeAvailableResponseCheck(connected, blockIndex, info);
        }
    
        public int determineBlockCount(@NonNull DownloadTask task, long totalLength) {
            if (task.getSetConnectionCount() != null) return task.getSetConnectionCount();
    
            if (totalLength < ONE_CONNECTION_UPPER_LIMIT) {
                return 1;
            }
    
            if (totalLength < TWO_CONNECTION_UPPER_LIMIT) {
                return 2;
            }
    
            if (totalLength < THREE_CONNECTION_UPPER_LIMIT) {
                return 3;
            }
    
            if (totalLength < FOUR_CONNECTION_UPPER_LIMIT) {
                return 4;
            }
    
            return 5;
        }
    

    文件大小在0-1MB、1-5MB、5-50MB、50-100MB、100MB以上时分别开启1、2、3、4、5个线程进行下载。

    我们重点看一下下载部分的源码,也就是start(cache,info)方法:

    void start(final DownloadCache cache, BreakpointInfo info) throws InterruptedException {
            final int blockCount = info.getBlockCount();
            final List<DownloadChain> blockChainList = new ArrayList<>(info.getBlockCount());
            for (int i = 0; i < blockCount; i++) {
                final BlockInfo blockInfo = info.getBlock(i);
                if (Util.isCorrectFull(blockInfo.getCurrentOffset(), blockInfo.getContentLength())) {
                    continue;
                }
    
                Util.resetBlockIfDirty(blockInfo);
                blockChainList.add(DownloadChain.createChain(i, task, info, cache, store));
            }
    
            if (canceled) {
                return;
            }
    
            startBlocks(blockChainList);
        }
    

    可以看到它是分块下载的,每一个分块都是一个DownloadChain实例,DownloadChain实现了Runnable接口,继续看startBlocks方法:



    对于每一个分块任务,都调用了submitChain方法,由一个线程池去处理每一个DownloadChain分块,核心代码就在这里:

        void start() throws IOException {
            final CallbackDispatcher dispatcher = OkDownload.with().callbackDispatcher();
            // 处理请求拦截链
            final RetryInterceptor retryInterceptor = new RetryInterceptor();
            final BreakpointInterceptor breakpointInterceptor = new BreakpointInterceptor();
            connectInterceptorList.add(retryInterceptor);
            connectInterceptorList.add(breakpointInterceptor);
            connectInterceptorList.add(new RedirectInterceptor());
            connectInterceptorList.add(new HeaderInterceptor());
            connectInterceptorList.add(new CallServerInterceptor());
    
            connectIndex = 0;
            final DownloadConnection.Connected connected = processConnect();
            if (cache.isInterrupt()) {
                throw InterruptException.SIGNAL;
            }
    
            dispatcher.dispatch().fetchStart(task, blockIndex, getResponseContentLength());
            // 获取数据拦截链
            final FetchDataInterceptor fetchDataInterceptor =
                    new FetchDataInterceptor(blockIndex, connected.getInputStream(),
                            getOutputStream(), task);
            fetchInterceptorList.add(retryInterceptor);
            fetchInterceptorList.add(breakpointInterceptor);
            fetchInterceptorList.add(fetchDataInterceptor);
    
            fetchIndex = 0;
            final long totalFetchedBytes = processFetch();
            dispatcher.dispatch().fetchEnd(task, blockIndex, totalFetchedBytes);
        }
    

    可以看到它主要使用责任链模式进行了两个链式调用:处理请求拦截链和获取数据拦截链。
    处理请求拦截链包含了RetryInterceptor重试拦截器、BreakpointInterceptor断点拦截器、RedirectInterceptor重定向拦截器、HeaderInterceptor头部信息处理拦截器、CallServerInterceptor请求拦截器,该链式调用过程会逐个调用拦截器的interceptConnect方法:

    public class RetryInterceptor implements Interceptor.Connect, Interceptor.Fetch {
    
        @NonNull @Override
        public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
            final DownloadCache cache = chain.getCache();
    
            // 如果产生了RetryException,则重新执行该链式调用
            while (true) {
                try {
                    if (cache.isInterrupt()) {
                        throw InterruptException.SIGNAL;
                    }
                    return chain.processConnect();
                } catch (IOException e) {
                    if (e instanceof RetryException) {
                        chain.resetConnectForRetry();
                        continue;
                    }
    
                    chain.getCache().catchException(e);
                    throw e;
                }
            }
        }
    ......
    }
    
    public class BreakpointInterceptor implements Interceptor.Connect, Interceptor.Fetch {
    
        private static final String TAG = "BreakpointInterceptor";
    
        @NonNull @Override
        public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
            final DownloadConnection.Connected connected = chain.processConnect();
            final BreakpointInfo info = chain.getInfo();
    
            if (chain.getCache().isInterrupt()) {
                throw InterruptException.SIGNAL;
            }
    
            if (info.getBlockCount() == 1 && !info.isChunked()) {
                // 当只有一个线程进行下载文件时,如果断点信息中保存的文件长度和服务端返回的文件长度不一致,则以服务端返回的为准重新进行下载
                final long blockInstanceLength = getExactContentLengthRangeFrom0(connected);
                final long infoInstanceLength = info.getTotalLength();
                if (blockInstanceLength > 0 && blockInstanceLength != infoInstanceLength) {
                    Util.d(TAG, "SingleBlock special check: the response instance-length["
                            + blockInstanceLength + "] isn't equal to the instance length from trial-"
                            + "connection[" + infoInstanceLength + "]");
                    final BlockInfo blockInfo = info.getBlock(0);
                    boolean isFromBreakpoint = blockInfo.getRangeLeft() != 0;
    
                    final BlockInfo newBlockInfo = new BlockInfo(0, blockInstanceLength);
                    info.resetBlockInfos();
                    info.addBlock(newBlockInfo);
    
                    if (isFromBreakpoint) {
                        final String msg = "Discard breakpoint because of on this special case, we have"
                                + " to download from beginning";
                        Util.w(TAG, msg);
                        throw new RetryException(msg);
                    }
                    OkDownload.with().callbackDispatcher().dispatch()
                            .downloadFromBeginning(chain.getTask(), info, CONTENT_LENGTH_CHANGED);
                }
            }
    
            // update for connected.
            final DownloadStore store = chain.getDownloadStore();
            try {
                if (!store.update(info)) {
                    throw new IOException("Update store failed!");
                }
            } catch (Exception e) {
                throw new IOException("Update store failed!", e);
            }
    
            return connected;
        }
    ......
    }
    
    public class RedirectInterceptor implements Interceptor.Connect {
    
        //最大重定向次数
        static final int MAX_REDIRECT_TIMES = 10;
    
        private static final int HTTP_TEMPORARY_REDIRECT = 307;
      
        private static final int HTTP_PERMANENT_REDIRECT = 308;
    
        @NonNull @Override
        public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
            int redirectCount = 0;
    
            String url;
            DownloadConnection connection;
            while (true) {
    
                if (chain.getCache().isInterrupt()) {
                    throw InterruptException.SIGNAL;
                }
    
                final DownloadConnection.Connected connected = chain.processConnect();
                final int code = connected.getResponseCode();
    
                if (!isRedirect(code)) {
                    return connected;
                }
                //若需要重定向,则根据返回的新的url重新进行网络请求
                if (++redirectCount >= MAX_REDIRECT_TIMES) {
                    throw new ProtocolException("Too many redirect requests: " + redirectCount);
                }
    
                url = connected.getResponseHeaderField("Location");
                if (url == null) {
                    throw new ProtocolException(
                            "Response code is " + code + " but can't find Location field");
                }
    
                chain.releaseConnection();
    
                connection = OkDownload.with().connectionFactory().create(url);
                chain.setConnection(connection);
                chain.setRedirectLocation(url);
    
            }
        }
    
        private static boolean isRedirect(int code) {
            return code == HttpURLConnection.HTTP_MOVED_PERM
                    || code == HttpURLConnection.HTTP_MOVED_TEMP
                    || code == HttpURLConnection.HTTP_SEE_OTHER
                    || code == HttpURLConnection.HTTP_MULT_CHOICE
                    || code == HTTP_TEMPORARY_REDIRECT
                    || code == HTTP_PERMANENT_REDIRECT;
        }
    }
    
    public class HeaderInterceptor implements Interceptor.Connect {
        private static final String TAG = "HeaderInterceptor";
    
        @NonNull @Override
        public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
            final BreakpointInfo info = chain.getInfo();
            final DownloadConnection connection = chain.getConnectionOrCreate();
            final DownloadTask task = chain.getTask();
    
            // 添加User-Agent字段
            final Map<String, List<String>> userHeader = task.getHeaderMapFields();
            if (userHeader != null) Util.addUserRequestHeaderField(userHeader, connection);
            if (userHeader == null || !userHeader.containsKey(USER_AGENT)) {
                Util.addDefaultUserAgent(connection);
            }
    
            //添加Range字段
            final int blockIndex = chain.getBlockIndex();
            final BlockInfo blockInfo = info.getBlock(blockIndex);
            if (blockInfo == null) {
                throw new IOException("No block-info found on " + blockIndex);
            }
    
            String range = "bytes=" + blockInfo.getRangeLeft() + "-";
            range += blockInfo.getRangeRight();
    
            connection.addHeader(RANGE, range);
            Util.d(TAG, "AssembleHeaderRange (" + task.getId() + ") block(" + blockIndex + ") "
                    + "downloadFrom(" + blockInfo.getRangeLeft() + ") currentOffset("
                    + blockInfo.getCurrentOffset() + ")");
    
            // 如果有Etag信息,则添加If-Match字段
            final String etag = info.getEtag();
            if (!Util.isEmpty(etag)) {
                connection.addHeader(IF_MATCH, etag);
            }
    
            if (chain.getCache().isInterrupt()) {
                throw InterruptException.SIGNAL;
            }
    
            OkDownload.with().callbackDispatcher().dispatch()
                    .connectStart(task, blockIndex, connection.getRequestProperties());
    
            DownloadConnection.Connected connected = chain.processConnect();
    
            Map<String, List<String>> responseHeaderFields = connected.getResponseHeaderFields();
            if (responseHeaderFields == null) responseHeaderFields = new HashMap<>();
    
            OkDownload.with().callbackDispatcher().dispatch().connectEnd(task, blockIndex,
                    connected.getResponseCode(), responseHeaderFields);
            if (chain.getCache().isInterrupt()) {
                throw InterruptException.SIGNAL;
            }
    
            // 检查Etag字段是否一致
            final DownloadStrategy strategy = OkDownload.with().downloadStrategy();
            final DownloadStrategy.ResumeAvailableResponseCheck responseCheck =
                    strategy.resumeAvailableResponseCheck(connected, blockIndex, info);
            responseCheck.inspect();
           
            //获取Content-Length、Content-Range字段信息
            final long contentLength;
            final String contentLengthField = connected.getResponseHeaderField(CONTENT_LENGTH);
            if (contentLengthField == null || contentLengthField.length() == 0) {
                final String contentRangeField = connected.getResponseHeaderField(CONTENT_RANGE);
                contentLength = Util.parseContentLengthFromContentRange(contentRangeField);
            } else {
                contentLength = Util.parseContentLength(contentLengthField);
            }
    
            chain.setResponseContentLength(contentLength);
            return connected;
        }
    }
    
    public class CallServerInterceptor implements Interceptor.Connect {
        @NonNull @Override
        public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
            OkDownload.with().downloadStrategy().inspectNetworkOnWifi(chain.getTask());
            OkDownload.with().downloadStrategy().inspectNetworkAvailable();
            \\进行网络请求,获得响应
            return chain.getConnectionOrCreate().execute();
        }
    }
    

    获取数据拦截链包含了RetryInterceptor重试拦截器、BreakpointInterceptor断点拦截器、RedirectInterceptor重定向拦截器、HeaderInterceptor头部信息处理拦截器、FetchDataInterceptor获取数据拦截器,该链式调用过程会逐个调用拦截器的interceptFetch方法:

    public class RetryInterceptor implements Interceptor.Connect, Interceptor.Fetch {
    
    ......
    
        @Override
        public long interceptFetch(DownloadChain chain) throws IOException {
            try {
                return chain.processFetch();
            } catch (IOException e) {
                chain.getCache().catchException(e);
                throw e;
            }
        }
    }
    
    
    public class BreakpointInterceptor implements Interceptor.Connect, Interceptor.Fetch {
    
    ......
    
        @Override
        public long interceptFetch(DownloadChain chain) throws IOException {
            final long contentLength = chain.getResponseContentLength();
            final int blockIndex = chain.getBlockIndex();
            final boolean isNotChunked = contentLength != CHUNKED_CONTENT_LENGTH;
    
            long fetchLength = 0;
            long processFetchLength;
    
            final MultiPointOutputStream outputStream = chain.getOutputStream();
    
            try {
                while (true) {
                    //循环调用FetchDataInterceptor拦截器读写文件
                    processFetchLength = chain.loopFetch();
                    if (processFetchLength == -1) {
                        break;
                    }
    
                    fetchLength += processFetchLength;
                }
            } finally {
                chain.flushNoCallbackIncreaseBytes();
                if (!chain.getCache().isUserCanceled()) outputStream.done(blockIndex);
            }
    
            if (isNotChunked) {
                outputStream.inspectComplete(blockIndex);
    
                if (fetchLength != contentLength) {
                    throw new IOException("Fetch-length isn't equal to the response content-length, "
                            + fetchLength + "!= " + contentLength);
                }
            }
    
            return fetchLength;
        }
    
    ......
    
    
    public class FetchDataInterceptor implements Interceptor.Fetch {
    
        private final InputStream inputStream;
    
        private final byte[] readBuffer;
        private final MultiPointOutputStream outputStream;
        private final int blockIndex;
        private final DownloadTask task;
        private final CallbackDispatcher dispatcher;
    
        public FetchDataInterceptor(int blockIndex,
                                    @NonNull InputStream inputStream,
                                    @NonNull MultiPointOutputStream outputStream,
                                    DownloadTask task) {
            this.blockIndex = blockIndex;
            this.inputStream = inputStream;
            this.readBuffer = new byte[task.getReadBufferSize()];
            this.outputStream = outputStream;
    
            this.task = task;
            this.dispatcher = OkDownload.with().callbackDispatcher();
        }
    
        @Override
        public long interceptFetch(DownloadChain chain) throws IOException {
            if (chain.getCache().isInterrupt()) {
                throw InterruptException.SIGNAL;
            }
    
            OkDownload.with().downloadStrategy().inspectNetworkOnWifi(chain.getTask());
            // 读取数据
            int fetchLength = inputStream.read(readBuffer);
            if (fetchLength == -1) {
                return fetchLength;
            }
    
            //写文件
            outputStream.write(blockIndex, readBuffer, fetchLength);
    
            // 判断是否回调下载进度
            chain.increaseCallbackBytes(fetchLength);
            if (this.dispatcher.isFetchProcessMoment(task)) {
                chain.flushNoCallbackIncreaseBytes();
            }
    
            return fetchLength;
        }
    }
    
    

    每一个DownloadChain都完成后,最终会调用inspectTaskEnd方法,从数据库中删除该任务,并回调通知任务完成。这样,一个完整的下载任务就完成了。总体流程如下:

    OkDownload的优势在于:
    1.OkDownload内部使用的网络请求框架默认为OkHttp,OkHttp底层使用的IO库为Okio,相较于原生Java IO流,它更加简便高效。
    2.使用了数据库缓存+内存缓存的二级缓存模式,操作效率更高。
    3.功能更完善,除了多线程断点续传外,还提供了暂停功能,多种回调监听功能,多任务管理功能等。
    4.更加可靠:下载前有多重检查机制来判断重新下载还是从断点处下载;每次从断点续传时,都会对比响应信息跟之前是否一致;对重定向做了处理;有错误重试机制。
    5.可配置性高,可以注入自定义组件。

    相关文章

      网友评论

          本文标题:OkDownload源码分析

          本文链接:https://www.haomeiwen.com/subject/hzhicqtx.html