缘来如此 2019-12-04
这段时间看了看工作室的工具库的下载组件,发现其存在一些问题:
1.下载核心逻辑有 bug,在暂停下载或下载失败等情况时有概率无法顺利完成下载。
2.虽然原来的设计是采用多线程断点续传的设计,但打了一下日志发现其实下载任务都是在同一个线程下串行执行,并没有起到加快下载速度的作用。
考虑到原来的代码并不复杂,因此对这部分下载组件进行了重写。这里记录一下里面的多线程断点续传功能的实现。
请查看完整的PDF版
(更多完整项目下载。未完待续。源码。图文知识后续上传github。)
可以点击关于我联系我获取完整PDF
(VX:mm14525201314)
首先我们谈一谈,多线程下载的意义。
在日常的场景下,网络中不可能只有下载方与服务器之间这样一条连接,为了避免在这样的场景下的网络拥塞,TCP 协议通过调节窗口的大小来避免出现拥塞,但这个窗口的大小可能没办法达到我们预期的效果:充分利用我们的带宽。因此我们可以采用多个 TCP 连接的形式来提高我们带宽的利用率,从而加快下载速度。
打个比喻就是我们要从一个水缸中用抽水机通过水管抽水,由于管子的直径等等的限制,我们单条管子无法完全利用我们的抽水机的抽水动力。因此我们就将这些抽水的任务分成了多份,分摊到多个管子上,这样就可以更充分的利用我们的抽水机动力,从而提高抽水的速度。
因此,我们使用多线程下载的主要意义就是——提高下载速度。
前面提到了我们主要的目的是将一个总的下载任务分摊到多个子任务中,比如假设我们用 5 个线程下载这个文件,那么我们就可以对一个长度为 N 的任务进行如下图的均分:
但真实场景下往往 N 都不是刚好为 5 的倍数的,因此对于最后一个任务还需要加上剩余的任务量,也就是 N/5+N%5。
上面的任务分配我们已经了解了,看起来很理想,但有一个问题,我们如何实现向服务器只请求这个文件的某一段而不是全部呢?
我们可以通过在请求头中加入 Range 字段来指定请求的范围,从而实现指定某一段的数据。
如:RANGE bytes=10000-19999
就指定了 10000-19999 这段字节的数据
所以我们的核心思想就是通过它拿到文件对应字节段的 InputStream,然后对它读取并写入文件。
下面再讲讲文件写入问题,由于我们是多线程下载,因此文件并不是每次都是从前往后一个个字节写入的,随时可能在文件的任何一个地方写入数据。因此我们需要能够在文件的指定位置写入数据。这里我们用到了RandomAccessFile
来实现这个功能。
RandomAccessFile
是一个随机访问文件类,同时整合了 FileOutputStream
和 FileInputStream
,支持从文件的任何字节处读写数据。通过它我们就可以在文件的任何字节处写入数据。
接下来简单讲讲我们这里是如何使用 RandomAccessFile
的。我们对于每个子任务来说都有一个开始和结束的位置。每个任务都可以通过 RandomAccessFile::seek
跳转到文件的对应字节位置,然后从该位置开始读取 InputStream
并写入。
这样,就实现了不同线程对文件的随机写入。
由于我们在真正开始下载之前,我们需要先将任务分配到各个线程,因此我们需要先了解到文件的大小。
为了获取到文件的大小,我们用到 Response Headers
中的 Content-Length
字段。
如下图所示,可以看到,打开该下载请求的链接后,Response Headers
中包含了我们需要的 Content-Length
,也就是该文件的大小,单位是字节。
对于多个子任务,我们如何实现它们的断点续传呢?
其实原理很简单,只需要保证每个子任务的下载进度能够被即时地记录即可。这样继续下载时只需要读取这些下载记录,从上次下载结束的位置开始下载即可。
它的实现有很多方式,只要能做到数据持久化即可。这里我使用的是数据库来实现。
这样,我们的子任务需要拥有一些必要的信息
completedSize
:当前下载完成大小taskSize
:子任务总大小startPos
:子任务开始位置currentPos
:子任务进行到的位置endPos
:子任务结束位置通过这些信息,我们就能够记录子任务的下载进度从而恢复我们之前的下载,实现断点续传。
下面我们用代码来实现这样一个多线程下载功能。
首先,我们定义一下下载中的各个状态:
public class DownloadStatus { public static final int IDLE = 233; // 空闲,默认状态 public static final int COMPLETED = 234; // 完成 public static final int DOWNLOADING = 235; // 下载中 public static final int PAUSE = 236; // 暂停 public static final int ERROR = 237; // 出错 }
可以看到,这里定义了如上的五种状态。
这里需要用到如数据库及 HTTP 请求的功能,我们这里定义其接口如下,具体实现各位可以根据需要自己实现:
public interface DownloadDbHelper { /** * 从数据库中删除子任务记录 * @param task 子任务记录 */ void delete(SubDownloadTask task); /** * 向数据库中插入子任务记录 * @param task 子任务记录 */ void insert(SubDownloadTask task); /** * 在数据库中更新子任务记录 * @param task 子任务记录 */ void update(SubDownloadTask task); /** * 获取所有指定Task下的子任务记录 * @param taskTag Task的Tag * @return 子任务记录 */ List<SubDownloadTask> queryByTaskTag(String taskTag); }
public interface DownloadHttpHelper { /** * 获取文件总长度 * @param url 下载url * @param callback 获取文件长度CallBack */ void getTotalSize(String url, NetCallback<Long> callback); /** * 获取InputStream * @param url 下载url * @param start 开始位置 * @param end 结束位置 * @param callback 获取字节流的CallBack */ void getStreamByRange(String url, long start, long end, NetCallback<InputStream> callback); }
我们先从上到下,从子任务开始实现。在我的设计中,它具有如下的成员变量:
@Entity public class SubDownloadTask implements Runnable { public static final int BUFFER_SIZE = 1024 * 1024; private static final String TAG = SubDownloadTask.class.getSimpleName(); @Id private Long id; private String url; // 文件下载的 url private String taskTag; // 父任务的 Tag private long taskSize; // 子任务大小 private long completedSize; // 子任务完成大小 private long startPos; // 开始位置 private long currentPos; // 当前位置 private long endPos; // 结束位置 private volatile int status; // 当前下载状态 @Transient private SubDownloadListener listener; // 子任务下载监听,主要用于提示父任务 @Transient private File saveFile; // 要保存到的文件 ... }
由于这里的数据库的操作是用 GreenDao
实现,因此这里有一些相关注解,各位可以忽略。
InputStream
获取可以看到,子任务是一个 Runnable,我们可以通过其 run 方法开始下载,这样就可以通过如 ExecutorService 来开启多个线程执行子任务。
我们看到其 run 方法:
@Override public void run() { status = DownloadStatus.DOWNLOADING; DownloadManager.getInstance() .getHttpHelper() .getStreamByRange(url, currentPos, endPos, new NetCallback<InputStream>() { @Override public void onResult(InputStream inputStream) { listener.onSubStart(); writeFile(inputStream); } @Override public void onError(String message) { listener.onSubError("文件流获取失败"); status = DownloadStatus.ERROR; } }); }
可以看到,我们获取了其从 currentPos
到 endPos
端的字节流,通过其 Response Body 拿到了它的 InputStream
,然后调用了 writeFile(InputStream)
方法进行文件的写入。
文件写入
接下来看到 writeFile
方法:
private void writeFile(InputStream in) { try { RandomAccessFile file = new RandomAccessFile(saveFile, "rwd"); // 通过 saveFile 建立RandomAccessFile file.seek(currentPos); // 跳转到对应位置 byte[] buffer = new byte[BUFFER_SIZE]; while (true) { // 循环读取 InputStream,直到暂停或读取结束 if (status != DownloadStatus.DOWNLOADING) { // 状态不为 DOWNLOADING,停止下载 break; } int offset = in.read(buffer, 0, BUFFER_SIZE); if (offset == -1) { // 读取不到数据,说明读取结束 break; } // 将读取到的数据写入文件 file.write(buffer, 0, offset); // 下载数据并在数据库中更新 currentPos += offset; completedSize += offset; DownloadManager.getInstance() .getDbHelper() .update(this); // 通知父任务下载进度 listener.onSubDownloading(offset); } if(status == DownloadStatus.DOWNLOADING) { // 下载完成 status = DownloadStatus.COMPLETED; // 通知父任务下载完成 listener.onSubComplete(completedSize); } file.close(); in.close(); } catch (IOException e) { e.printStackTrace(); listener.onSubError("文件下载失败"); status = DownloadStatus.ERROR; resetTask(); } }
具体流程可以看代码中的注释。可以看到,子任务实际上就是循环读取 InputStream
,并写入文件,同时将下载进度同步到数据库。
父任务也就是我们具体的下载任务,我们同样先看到成员变量:
public class DownloadTask implements SubDownloadListener { private static final String TAG = DownloadTask.class.getSimpleName(); private String tag; // 下载任务的 Tag,用于区分不同下载任务 private String url; // 下载 url private String savePath; // 保存路径 private String fileName; // 保存文件名 private DownloadListener listener; // 下载监听 private long completeSize; // 下载完成大小 private long totalSize; // 下载任务总大小 private int status; // 当前下载进度 private int threadNum; // 线程数(由外部设置的每个任务的下载线程数) private File file; // 保存文件 private List<SubDownloadTask> subTasks; // 子任务列表 private ExecutorService mExecutorService; // 线程池,用于执行子任务 ... }
对于一个下载任务,可以通过 download 方法开始执行:
public void download() { listener.onStart(); subTasks = querySubTasks(); status = DownloadStatus.DOWNLOADING; if (subTasks.isEmpty()) { // 是新任务 downloadNewTask(); } else if (subTasks.size() == threadNum) { // 不是新任务 downloadExistTask(); } else { // 不是新任务,但下载线程数有误 listener.onError("断点数据有误"); resetTask(); } }
可以看到,我们先将子任务列表从数据库中读取出来。
downloadNewTask
方法。downloadExistTask
方法。我们先看到 downloadNewTask
方法:
DownloadManager.getInstance() .getHttpHelper() .getTotalSize(url, new NetCallback<Long>() { @Override public void onResult(Long total) { completeSize = 0L; totalSize = total; initSubTasks(); startAsyncDownload(); } @Override public void onError(String message) { error("获取文件长度失败"); } });
可以看到,获取到总长度后,通过调用 initSubTasks
方法,对子任务列表进行了初始化(计算子任务长度等),然后调用了 startAsyncDownload
方法后通过 ExecutorService
运行子任务进入子任务进行下载。
我们看到 initSubTasks
方法:
private void initSubTasks() { long averageSize = totalSize / threadNum; for (int taskIndex = 0; taskIndex < threadNum; taskIndex++) { long taskSize = averageSize; if (taskIndex == threadNum - 1) { // 最后一个任务,则 size 还需要加入剩余量 taskSize += totalSize % threadNum; } long start = 0L; int index = taskIndex; while (index > 0) { start += subTasks.get(index - 1).getTaskSize(); index--; } long end = start + taskSize - 1; // 注意这里 SubDownloadTask subTask = new SubDownloadTask(); subTask.setUrl(url); subTask.setStatus(DownloadStatus.IDLE); subTask.setTaskTag(tag); subTask.setCompletedSize(0); subTask.setTaskSize(taskSize); subTask.setStartPos(start); subTask.setCurrentPos(start); subTask.setEndPos(end); subTask.setSaveFile(file); subTask.setListener(this); DownloadManager.getInstance() .getDbHelper() .insert(subTask); subTasks.add(subTask); } }
可以看到就是计算每个任务的大小及开始及结束点的位置,这里要注意的是 endPos 需要 -1,否则各个任务的下载位置会重叠,并且最后一个任务会多下载一个字节导致如文件损坏等影响。具体原因就是比如一个大小为 500 的文件,则应当是 0-499 而不是 0-500。
接下来我们看看 downloadExistTask
方法:
private void downloadExistTask() { // 不是新任务,且下载线程数无误,计算已下载大小 completeSize = countCompleteSize(); totalSize = countTotalSize(); startAsyncDownload(); }
这里其实很简单,遍历子任务列表计算已下载量及总任务量,并调用 startAsyncDownload 开始多线程下载。
具体执行子任务我们可以看到 startAsyncDownload
方法:
private void startAsyncDownload() { for (SubDownloadTask subTask : subTasks) { if (subTask.getCompletedSize() < subTask.getTaskSize()) { // 只下载没有下载结束的子任务 mExecutorService.execute(subTask); } } }
可以看到,这里其实只是通过 ExecutorService 执行对应子任务(Runnable)而已。
我们接下来看到 pause 方法:
public void pause() { stopAsyncDownload(); status = DownloadStatus.PAUSE; listener.onPause(); }
可以看到,这里只是调用了 stopAsyncDownload
方法停止子任务。
看到 stopAsyncDownload
方法:
private void stopAsyncDownload() { for (SubDownloadTask subTask : subTasks) { if (subTask.getStatus() != DownloadStatus.COMPLETED) { // 下载完成的不再取消 subTask.cancel(); } } }
可以看到,调用了子任务的 cancel
方法。
继续看到子任务的 cancel
方法:
void cancel() { status = DownloadStatus.PAUSE; listener.onSubCancel(); }
这里很简单,仅仅是将下载状态设置为了 PAUSE,这样在写入文件的下一次 while 循环时便会中止循环从而结束 Runnable
的执行。
看到 cancel
方法:
public void cancel() { stopAsyncDownload(); resetTask(); listener.onCancel(); }
可以看到和暂停的逻辑差不多,只是在暂停后还需要对子任务重置从而使得下次下载从头开始。
前面提到,外部可以通过 DownloadListener
监听下载的进度,下面是 DownloadListener
接口的定义:
public interface DownloadListener { default void onStart() {} default void onDownloading(long progress, long total) {} default void onPause() {} default void onCancel() {} default void onComplete() {} default void onError(String message) {} }
我们实时的下载进度其实是在子任务的保存文件过程中才能体现出来的,同样,子任务的下载失败也需要通知到 DownloadListener
,这是怎么做到的呢?
前面提到了,我们还定义了一个 SubDownloadListener
,其监听者就是子任务的父任务。通过监听我们可以将子任务状态反馈到父任务,父任务再根据具体情况反馈数据给 DownloadListener
。
public interface SubDownloadListener { void onSubStart(); void onSubDownloading(int offset); void onSubCancel(); void onSubComplete(long completeSize); void onSubError(String message); }
比如之前看到,每次下载失败我们都会调用 onSubError
,每次读取 offset 的数据都会调用 onSubDownload(offset)
,每个任务下载失败都会调用 onSubComplete(completeSize)
。这样,我们子任务的下载状态就成功返回给了上层。
我们接着看看上层是如何处理的:
@Override public void onSubStart() {} @Override public void onSubDownloading(int offset) { synchronized (this) { completeSize = completeSize + offset; listener.onDownloading(completeSize, totalSize); } } @Override public void onSubCancel() {} @Override public void onSubComplete(long completeSize) { checkComplete(); } @Override public void onSubError(String message) { error(message); }
可以看到,每次下载到一段数据,它都会把数据量返回上来,此时 completeSize
就加上了对应的 offset,然后再将新的 completeSize
通知给监听者,这样就实现了下载进度的监听。这里之所以加锁是因为会有多个线程(子任务线程)对 completeSize
进行操作,加锁保证线程安全。
而每次有子任务完成,它都会调用 checkComplete
方法检查是否下载完成,若每个子任务都下载完成,则说明任务下载完成,然后通知监听者。
同样的,每次子任务出现错误,都会通知监听者出现错误,并做一些错误情况下的处理。
到这里,这篇文章就结束了,我们成功实现了多线程断点续传下载功能。基于这个原理,我们可以做一些上层的封装实现一个文件下载框架。
请查看完整的PDF版
(更多完整项目下载。未完待续。源码。图文知识后续上传github。)
可以点击关于我联系我获取完整PDF
(VX:mm14525201314)