美文网首页
python3 多线程编程实战: http多线程下载器的编写

python3 多线程编程实战: http多线程下载器的编写

作者: mudssky | 来源:发表于2019-06-08 21:18 被阅读0次

    python3 多线程编程实战: http多线程下载器的编写
    说到多线程的应用,这种并发下载的情况显然比较适合。也是日常生活中使用会比较广泛的一个应用。
    当我们编写爬虫下载一些比较大的资源的时候,比如说视频。很多情况下使用多线程都能极大提升下载速度。

    001.range字段

    http分片下载的核心在于header中的Range字段。当我们请求文件的时候,得到的http响应中会有Content-Length字段,结合这两个字段我们就可以对文件进行分段下载了。

    我们在ipython shell交互环境中用requests进行测试
    请求ngnix服务器上的一个资源

    import requests
    response = requests.head('http://47.97.164.148:3457/1.mp4')
    print(response.headers)
    
    {'Server': 'nginx', 'Date': 'Sat, 08 Jun 2019 12:41:02 GMT', 'Content-Type': 'video/mp4', 'Content-Length': '413634200', 'Last-Modified': 'Thu, 06 Jun 2019 17:50:18 GMT', 'Connection': 'close', 'ETag': '"5cf9525a-18a78e98"', 'Accept-Ranges': 'bytes'}
    

    我们只要在请求头中包含range字段,就可以请求指定区间的数据
    比如 Range:bytes=0-1048576
    使用httpie工具,进行测试,我们发现返回的文件大小 为 1048577,正好是0-1048576这个区间的文件大小

    PS C:\Users\mudssky\Desktop> http localhost:9999/45678.mp4  Range:bytes=0-1048576
    HTTP/1.1 206 Partial Content
    Connection: close
    Content-Length: 1048577
    Content-Range: bytes 0-1048576/1869150326
    Content-Type: text/plain
    Date: Sat, 08 Jun 2019 08:24:16 GMT
    ETag: "5cf10651-6f68f876"
    Last-Modified: Fri, 31 May 2019 10:47:45 GMT
    Server: nginx/1.14.1
    
    
    
    +-----------------------------------------+
    | NOTE: binary data not shown in terminal |
    +-----------------------------------------+
    

    002.文件指定位置写入seek

    关于下载过程的临时文件,我想到以下几种方案:

    1. 直接在内存中下载完后,再保存到硬盘。 缺点:如果文件太大,内存不够用怎么办
    2. 分段下载,每段一个临时文件,下载完成后合并,文件名可以记录段的id。缺点,下载完成后再合并一次,磁盘传输*2,耗电耗硬盘。这种一般常见于视频网站那种m3u8 ts下载,分段下载,然后用ffmpeg合并。
    3. 下载前创建一个相同大小的文件,往里面写数据。
    4. 3的基础上,用内存做缓冲区,内存中的数据到一定大小(而不是一下载完就写),再往磁盘里写数据,迅雷下载就是这样的。

    这里我选择方法三,因为没准备搞多复杂,先实现一个基本功能,以后有需要再加功能。

    创建文件用wb二进制写入,truncate可以截取一定的文件大小

    self.file = open(self.path, 'wb')
    self.file.truncate(self.file_size)
    
    

    写入部分的程序,用seek来定位

     with self.lock:
                self.file.seek(part_dict['start'])
                self.file.write(response.content)
    
    

    003.断点续传

    暂时先不支持(太懒了),应该需要有一个文件记录下载完的分块,这样中途退出可以载入这个文件判断那些分块没有下载,下载那些分块就可以。
    而且有些服务器的文件其实不支持断点续传和多线程。

    004.整体程序逻辑

    整个下载类继承了threading.Thread,这样下载器本身可以作为一个线程启动.
    可以设置下载使用的线程数和下载分块的大小,所以说我想到要用队列来实现
    首先按照分块大小分配下载任务,然后无论有多少个线程,只要从队列中拿任务下载就行了。

    除了下载线程之外,我们还要显示进度条。
    所以再开一个单独进程显示进度信息


    最终效果我参照了youtube-dl的下载信息,十分简洁易懂,测试下载一个1.78gb的视频文件,下载完毕后可以正常播放

    [downloading]: 99.52% of 1782.56mb speed:160.0 mb/s ETA:0.05s
     100% of 1782.56mb in 16s
    

    还有最后一步,对比下载文件和源文件的md5
    我们可以用powershell的命令计算md5,发现最终的md5值是一致的。这个demo算是成功了。

    
    Get-FileHash -Algorithm md5 .\45678.mp4
    

    下面是程序源码:

    import threading
    import requests
    import logging
    import queue
    import time
    import os
    from requests.adapters import HTTPAdapter
    
    class MulThreadDownload(threading.Thread):
        download_thread_num = 8
        # 文件分段大小,1024*1024即1mb大小
        def __init__(self,download_url,path,filename='',download_thread_num=0,part_size=1024*1024):
            threading.Thread.__init__(self)
            self.download_url = download_url
            self.path=path
            self.file_name=filename
            self.download_thread_num=download_thread_num
            self.part_size=part_size
            self.file=None
            self.threads=[]
            self.lock=threading.Lock()
            # 共用一个session,减少tcp请求的次数
            self.session=requests.session()
            # 使用requests自带的失败重试解决方案
            self.session.mount('http://', HTTPAdapter(max_retries=5))
            self.session.mount('https://', HTTPAdapter(max_retries=5))
            self.file_size=-1
            self.downloaded_size=0
            self.taskQ=queue.Queue()
            self.mbsize=-1
        def download_thread(self,threadid):
            # 当下载任务队列为空时,线程就会退出,停止执行
            while not self.taskQ.empty():
                part_dict=self.taskQ.get(block=True,timeout=None)
                headers={'Range':'bytes={0}-{1}'.format(part_dict['start'],part_dict['end'])}
                # response=requests.get(url=self.download_url,stream=True,headers=headers)
                # 因为分段自动是小文件,所以没必要用慢速下载,直接载入内存就行了
                response=self.session.get(url=self.download_url,headers=headers)
                # with self.lock:
                #     self.file.seek(part_dict['start'])
                #     self.file.write(response.content)
                with self.lock:
                    self.file.seek(part_dict['start'])
                    self.file.write(response.content)
                    self.downloaded_size+=part_dict['end']-part_dict['start']
                logging.debug(str(threadid)+' download succeed: '+str(part_dict))
                # for chunk in response.iter_content(chu)
        def analysis_filename(self):
            # 从url地址中获取文件名
            filename = self.download_url.split('/')[-1]
            logging.debug('analysis filename form url,got{0},from{1}'.format(filename,self.download_url))
            return filename
        def progress_bar_thread(self):
            start_time=int(time.time())
            sleep_time=0.1
            former_size=0
            while self.downloaded_size!=self.file_size:
                # 计算下载速度
                speed = (self.downloaded_size-former_size)*(1/sleep_time)
                speedstr=self.speed_str(speed)
                former_size=self.downloaded_size
    
                # 计算剩余时间
                remaining_size=self.file_size-self.downloaded_size
                if speed>0:
                    remaining_seconds=round(remaining_size/speed,2)
                    eta = self.ETA_str(remaining_seconds)
                else:
                    eta='???'
                # 计算下载百分比
                percentage=self.downloaded_size/self.file_size*100
    
                print('\r[downloading]: {:.2f}% of {}mb speed:{} ETA:{}'.format(percentage,self.mbsize,speedstr,eta),end='')
                time.sleep(0.1)
            # 因为一直不换行,所以下载完要换行
            end_time =int(time.time())
            print('\n 100% of {}mb in {}'.format(self.mbsize,self.ETA_str(end_time-start_time)))
        def speed_str(self,speedbytes):
            if speedbytes>1024*1024:
                return str(round(speedbytes/1024/1024,2))+' mb/s'
            elif speedbytes>1024:
                return str(round(speedbytes/1024,2))+' kb/s'
            else:
                return str(speedbytes)+' b/s'
        def ETA_str(self,seconds):
            if seconds>60*60:
                hour=seconds//3600
                min=(seconds-hour*3600)//60
                second=(seconds-hour*3600)%60
                str='{0}h{1}m{2}s'.format(hour,min,second)
            elif seconds>60:
                min = seconds // 60
                second = seconds % 60
                str = '{0}m{1}s'.format(min, second)
            else:
                str='{0}s'.format(seconds)
            return str
    
        def run(self):
            # 1.从url中获取文件信息,为线程分配下载资源做准备
            # 从url提取文件名
            logging.info('url:'+self.download_url)
            # 从文件响应头获取content-length。以及Accept-Ranges字段为分配下载做准备
            response_head = requests.head(self.download_url)
            if self.file_name == '':
                self.file_name = self.analysis_filename()
            # if not response_head.headers.has_key('Accept-Ranges'):
            if 'Accept-Ranges' not in response_head.headers.keys() :
                logging.fatal("不支持断点续传,不支持多线程下载")
            self.file_size = int(response_head.headers['Content-Length'])
            # 计算文件大小mb值
            self.mbsize=round(self.file_size / 1024 / 1024, 2)
            # 获取文件大小后,创建相同大小的文件
            if self.path=='':
                filepath=self.file_name
            else:
                filepath=os.path.join(self.path,self.file_name)
            self.file = open(filepath, 'wb')
            self.file.truncate(self.file_size)
            # 获得文件大小后划分下载任务。按照part_size进行划分
            # 最终分块任务数为,比如说文件大小为1gb,分块1mb,那么就要分成1024份,如果1gb多一点点,那么1025份
            part_num = self.file_size//self.part_size+1
            # 发送的请求头带上这一条就可以请求指定区间的数据 Range: bytes = 0 - 1048576
    
            # 创建下载队列,把Range的值字符串进行拼接
            # 这里存在一个小问题,请求0-1024*1024的资源,实际上返回的是1024*1024+1大小的资源,但是在用seek写的过程中,因为多出来的一字节会不断覆盖掉,所以没有问题。
            # 从代码可读性考虑上看,这样其实也好。我们先测试一下这样是否可行再改进
    
            for num in range(part_num):
                start= num*self.part_size
                end=(num+1)*self.part_size
                if num==part_num-1:
                    end=self.file_size
                # rangestr='bytes={0}-{1}'.format(num*self.part_size,endSize)
                part_dict={
                        'partnum':part_num,
                        'start':start,
                           'end':end
                }
                self.taskQ.put(part_dict)
                logging.debug(str(part_dict))
                # print(rangestr)
            # 开启进度条线程
            threading.Thread(target=self.progress_bar_thread,args=()).start()
            for i in range(self.download_thread_num):
                t=threading.Thread(target=self.download_thread,args=(i,))
                self.threads.append(t)
                t.start()
            for t in self.threads:
                t.join()
    
            # 全部线程运行结束,说明文件下载完成
            self.file.close()
    
    
    def main():
        # logging.basicConfig(format="%(asctime)s:%(levelname)s:%(message)s",level=logging.DEBUG)
        logging.basicConfig(format="%(asctime)s:%(levelname)s:%(message)s",level=logging.INFO)
        mul=MulThreadDownload(download_url=r'http://localhost:9999/45678.mp4',filename='1451.mp4',download_thread_num=2)
        mul.start()
    if __name__ == '__main__':
    #     main()
    

    相关文章

      网友评论

          本文标题:python3 多线程编程实战: http多线程下载器的编写

          本文链接:https://www.haomeiwen.com/subject/gldwxctx.html