Multipart S3 Upload/Download with Python Multithreading

Author: bloody | Published 2016-02-01 12:17

    For large objects, S3 provides multipart upload/download APIs. Building on these, you can go further and implement parallel multi-threaded transfers or resumable transfers.

    This implementation uses Amazon's boto library
    https://pypi.python.org/pypi/boto

    and the filechunkio library
    https://pypi.python.org/pypi/filechunkio/

    1. Multipart upload

    To upload a large file in parts, first split it into chunks, then upload each chunk through the store's multipart API; the object store merges all the chunks into a single object on the backend.
    The example below uses FileChunkIO to read the file chunk by chunk:

    chunksize = 4096 * 1024
    chunkcnt = int(math.ceil(filesize * 1.0 / chunksize))
    mp = bucket.initiate_multipart_upload("object-1")  # create the multipart upload
    for i in range(0, chunkcnt):
        offset = chunksize * i
        length = min(chunksize, filesize - offset)
        fp = FileChunkIO("/path/to/file", 'r', offset=offset, bytes=length)  # one chunk of the file
        mp.upload_part_from_file(fp, part_num=i + 1)  # upload this chunk
    mp.complete_upload()
    

    After all parts are uploaded, you must finish the multipart upload with complete_upload() (or abort it with cancel_upload()) so the resources held by the multipart upload are released.
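
    The offset arithmetic above can be factored into a standalone helper; this is a sketch (split_chunks is our own name for illustration, not part of boto):

```python
import math

def split_chunks(filesize, chunksize):
    """Return (part_num, offset, length) tuples covering filesize bytes.

    part_num starts at 1, matching S3's multipart part numbering.
    The final chunk is shorter when filesize is not a multiple of chunksize.
    """
    chunkcnt = int(math.ceil(filesize * 1.0 / chunksize))
    chunks = []
    for i in range(chunkcnt):
        offset = chunksize * i
        length = min(chunksize, filesize - offset)
        chunks.append((i + 1, offset, length))
    return chunks

# a 10 MiB file in 4 MiB chunks yields two full chunks and one 2 MiB tail
print(split_chunks(10 * 1024 * 1024, 4 * 1024 * 1024))
```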

    2. Multipart download

    To download in parts, compute each chunk's start and end offsets within the file, then issue an HTTP GET request with a Range header to fetch that chunk.
    For example:

    chunksize = 4096 * 1024
    chunkcnt = int(math.ceil(filesize * 1.0 / chunksize))
    fp = open("/path/to/output", "wb")
    for i in range(0, chunkcnt):
        offset = chunksize * i
        length = min(chunksize, filesize - offset)
        # Range byte positions are inclusive on both ends, hence the -1
        resp = conn.make_request("GET", bucket.name, filename, headers={"Range": "bytes=%d-%d" % (offset, offset + length - 1)})
        data = resp.read(length)
        if data == "":
            break
        fp.write(data)
    fp.close()
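
    Since HTTP Range positions are inclusive on both ends, the last byte of a chunk starting at offset with length bytes is offset + length - 1. A small helper (range_header is our own name for illustration, not a boto API) keeps that arithmetic in one place:

```python
def range_header(offset, length):
    """Format an HTTP Range header value for a chunk.

    The end position is inclusive, so a 4 MiB chunk starting at
    byte 0 covers bytes 0 through 4194303, not 4194304.
    """
    return "bytes=%d-%d" % (offset, offset + length - 1)

print(range_header(0, 4194304))
print(range_header(8388608, 2097152))
```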
    

    3. Full multi-threaded implementation

    import math
    import os
    from os import path
    import boto
    import boto.s3.connection
    from filechunkio import FileChunkIO
    import threading
    import Queue
    import time
    
    class Chunk:
        def __init__(self, num, offset, length):
            self.num = num
            self.offset = offset
            self.len = length
    
    chunksize = 8 << 20
    
    def init_queue(filesize):
        chunkcnt = int(math.ceil(filesize*1.0/chunksize))
        q = Queue.Queue(maxsize = chunkcnt)
        for i in range(0,chunkcnt):
            offset = chunksize*i
            len = min(chunksize, filesize-offset)
            c = Chunk(i+1, offset, len)
            q.put(c)
        return q
    
    def upload_chunk(filepath, mp, q, id):
        while True:
            try:
                # non-blocking get: checking q.empty() first would race
                # with other workers taking the last item
                chunk = q.get_nowait()
            except Queue.Empty:
                break
            fp = FileChunkIO(filepath, 'r', offset=chunk.offset, bytes=chunk.len)
            mp.upload_part_from_file(fp, part_num=chunk.num)
            fp.close()
            q.task_done()
    
    def upload_file_multipart(filepath, keyname, bucket, threadcnt=8):
        filesize = os.stat(filepath).st_size
        mp = bucket.initiate_multipart_upload(keyname)
        q = init_queue(filesize)
        for i in range(0, threadcnt):
            t = threading.Thread(target=upload_chunk, args=(filepath, mp, q, i))
            t.setDaemon(True)
            t.start()
        q.join()
        mp.complete_upload()
    
    def download_chunk(filepath, bucket, key, q, id):
        while True:
            try:
                chunk = q.get_nowait()  # non-blocking, see upload_chunk
            except Queue.Empty:
                break
            offset = chunk.offset
            length = chunk.len
            # Range byte positions are inclusive on both ends, hence the -1
            resp = bucket.connection.make_request("GET", bucket.name, key.name, headers={"Range": "bytes=%d-%d" % (offset, offset + length - 1)})
            data = resp.read(length)
            fp = FileChunkIO(filepath, 'r+', offset=offset, bytes=length)
            fp.write(data)
            fp.close()
            q.task_done()
    
    def download_file_multipart(key, bucket, filepath, threadcnt=8):
        if type(key) == str:
            key=bucket.get_key(key)
        filesize=key.size
        if os.path.exists(filepath):
            os.remove(filepath)
        open(filepath, 'w').close()  # create an empty file; os.mknod is Linux-only and may need privileges
        q = init_queue(filesize)
        for i in range(0, threadcnt):
            t = threading.Thread(target=download_chunk, args=(filepath, bucket, key, q, i))
            t.setDaemon(True)
            t.start()
        q.join()
    
    access_key = "test"
    secret_key = "123456"
    host = "*****"
    
    filepath = "/search/2G.file"
    keyname = "2G.file"
    
    threadcnt = 8
    
    conn = boto.connect_s3(
        aws_access_key_id = access_key,
        aws_secret_access_key = secret_key,
        host = host,
        is_secure=False,
        calling_format = boto.s3.connection.OrdinaryCallingFormat(),
        )
    
    bucket = conn.get_bucket("test")
    
    time1= time.time()
    upload_file_multipart(filepath, keyname, bucket, threadcnt)
    time2= time.time()
    print "upload %s with %d threads use %d seconds" % (keyname, threadcnt, time2-time1)
    
    key = bucket.get_key(keyname)
    
    download_filepath = path.join(".", keyname)
    time1= time.time()
    download_file_multipart(key, bucket, download_filepath, threadcnt)
    time2= time.time()
    print "download %s with %d threads use %d seconds" % (keyname, threadcnt, time2-time1)
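
    One subtlety worth calling out: draining a shared queue by checking q.empty() and then calling q.get() is racy — another worker can take the last item between the two calls, leaving get() blocked forever. A non-blocking get_nowait() avoids the window entirely. A self-contained Python 3 sketch of the pattern (Python 3 renames the Queue module to queue):

```python
import queue
import threading

def drain(q, results, lock):
    # get_nowait() either returns an item or raises queue.Empty;
    # there is no window where the queue looks non-empty but get() blocks
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            break
        with lock:
            results.append(item)
        q.task_done()

q = queue.Queue()
for i in range(100):
    q.put(i)

results, lock = [], threading.Lock()
threads = [threading.Thread(target=drain, args=(q, results, lock)) for _ in range(8)]
for t in threads:
    t.start()
q.join()       # blocks until every item has been task_done()'d
for t in threads:
    t.join()

print(len(results))  # 100
```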
    
