https://www.cnblogs.com/Zzbj/p/16373953.html
接收端（服务端）：接收各分片，并在全部分片上传完成后合并为完整文件
@app.route('/upload/accept', methods=['GET', 'POST'])
def index():
    """Receive one uploaded chunk and store it under ``./upload``.

    Called once for every chunk that is uploaded. On POST the request is
    expected to carry the chunk bytes in the ``file`` part together with
    two form fields:

    * ``task_id`` -- identifier shared by all chunks of the same file.
    * ``chunk``   -- index of this chunk among all chunks (defaults to 0).

    The chunk is saved as ``./upload/<task_id><chunk>``, the same naming
    scheme the ``/upload/complete`` handler reads back.

    Returns:
        The rendered ``./index.html`` page, or a ``(message, 400)`` response
        when ``task_id`` or ``chunk`` is missing or malformed.
    """
    if request.method == 'POST':
        upload_file = request.files['file']
        task = request.form.get('task_id')  # unique identifier of the file
        chunk = request.form.get('chunk', 0)  # index of this chunk

        # Both values come from the client and end up in a filesystem path,
        # so refuse anything that could escape the upload directory.
        if not task or '/' in task or '\\' in task:
            return 'invalid task_id', 400
        try:
            chunk_index = int(chunk)
        except (TypeError, ValueError):
            return 'invalid chunk', 400
        if chunk_index < 0:
            return 'invalid chunk', 400

        # Normalising the index through int() guarantees the stored name
        # matches the '%s%d' pattern used when the chunks are merged.
        filename = '%s%d' % (task, chunk_index)
        upload_file.save('./upload/%s' % filename)  # persist the chunk locally
    return rt('./index.html')
@app.route('/upload/complete', methods=['GET'])
def upload_success():
    """Merge every stored chunk of one upload into the final file.

    Called once after all chunks have been uploaded. Reads two query
    parameters:

    * ``filename`` -- name of the file to create inside ``./upload``.
    * ``task_id``  -- identifier shared by all chunks of the upload.

    Chunks ``./upload/<task_id>0``, ``./upload/<task_id>1``, ... are appended
    to the target file in order and each one is deleted once it has been
    copied. The first index whose chunk file does not exist ends the merge;
    if no chunk exists at all, an empty target file is left behind.

    Returns:
        The rendered ``./index.html`` page, or a ``(message, 400)`` response
        when ``filename`` or ``task_id`` is missing or unsafe.

    Raises:
        OSError: if a chunk cannot be read or removed, or the target file
            cannot be written. Only a *missing* chunk is treated as the
            normal end of the sequence.
    """
    target_filename = request.args.get('filename')  # name of the uploaded file
    task = request.args.get('task_id')  # unique identifier of the file

    # Both values are client-controlled and are used to build paths, so
    # keep them from pointing outside the upload directory.
    if not task or '/' in task or '\\' in task:
        return 'invalid task_id', 400
    safe_name = os.path.basename((target_filename or '').replace('\\', '/'))
    if safe_name in ('', '.', '..'):
        return 'invalid filename', 400

    chunk = 0  # index of the chunk being merged
    with open('./upload/%s' % safe_name, 'wb') as target_file:  # create the new file
        while True:
            chunk_path = './upload/%s%d' % (task, chunk)
            # Only the open() is guarded: a missing chunk means "done",
            # while read/write failures must surface instead of silently
            # producing a truncated file.
            try:
                source_file = open(chunk_path, 'rb')  # open chunks in order
            except FileNotFoundError:
                break
            with source_file:
                target_file.write(source_file.read())
            os.remove(chunk_path)  # delete the merged chunk to save space
            chunk += 1
    return rt('./index.html')
发送端（客户端）：将本地文件切片后逐片上传
https://www.shuzhiduo.com/A/WpdKNvgA5V/
import requests
from requests_toolbelt import MultipartEncoder
import os
import math
import hashlib
def get_md5(path, chunk_size=1024 * 1024):
    """Return the hexadecimal MD5 digest of the file at *path*.

    The file is read in fixed-size binary blocks, so memory use is bounded
    by ``chunk_size`` regardless of the file's size or content. (Iterating a
    binary file line by line gives no such bound: a file without newline
    bytes is returned as one single "line".)

    Args:
        path: Path of the file to hash.
        chunk_size: Number of bytes read per block; must be positive.

    Returns:
        The digest as a 32-character lowercase hex string.

    Raises:
        ValueError: if ``chunk_size`` is not positive.
        OSError: if the file cannot be opened or read.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    digest = hashlib.md5()
    with open(path, 'rb') as f:
        # iter(callable, sentinel) keeps calling read() until it returns b''.
        for block in iter(lambda: f.read(chunk_size), b''):
            digest.update(block)
    return digest.hexdigest()
def upload_slice_file(url, file_path, chunk_size=1024 * 1024 * 2):
    """Upload the file at *file_path* to *url* in fixed-size chunks.

    Each chunk is sent as its own multipart POST request carrying the
    fields ``filename``, ``totalSize``, ``currentChunk`` (1-based),
    ``totalChunk``, ``md5`` (digest of the whole file) and ``file`` (the
    chunk bytes). Chunks are sent sequentially; an empty file results in
    no request at all.

    Args:
        url: Endpoint that receives the chunks.
        file_path: Path of the local file; both ``\\`` and ``/`` are
            accepted as directory separators.
        chunk_size: Size of one chunk in bytes (2 MiB by default).

    Raises:
        ValueError: if ``chunk_size`` is not positive.
        OSError: if the file cannot be stat'ed, opened or read.
        AssertionError: if the server answers a chunk with a status other
            than 200.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    # Take the last path component for either separator style, so a POSIX
    # path does not end up being sent whole as the file name.
    filename = file_path.replace("\\", "/").rsplit("/", 1)[-1]
    total_size = os.path.getsize(file_path)
    # Integer ceiling division; avoids float rounding on very large files.
    total_chunk = (total_size + chunk_size - 1) // chunk_size
    if total_chunk == 0:
        return

    # The whole-file digest is identical for every chunk: compute it once
    # instead of re-reading the entire file on each iteration.
    file_md5 = get_md5(file_path)

    base_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
        "Accept": "application/json",
        "Accept-Encoding": "gzip, deflate",
        "Connection": "keep-alive",
    }

    with open(file_path, 'rb') as f:
        for current_chunk in range(1, total_chunk + 1):
            start = (current_chunk - 1) * chunk_size
            f.seek(start)
            # Never read past the size measured above.
            file_chunk_data = f.read(min(chunk_size, total_size - start))

            data = MultipartEncoder(
                fields={
                    "filename": filename,
                    "totalSize": str(total_size),
                    "currentChunk": str(current_chunk),
                    "totalChunk": str(total_chunk),
                    "md5": file_md5,
                    "file": (filename, file_chunk_data, 'application/octet-stream'),
                }
            )
            # The multipart boundary differs per encoder, so Content-Type
            # has to be set per request.
            headers = dict(base_headers)
            headers["Content-Type"] = data.content_type

            with requests.post(url, headers=headers, data=data) as response:
                # Raised explicitly (rather than via an `assert` statement)
                # so the check is still performed when Python runs with -O;
                # the exception type is kept for existing callers.
                if response.status_code != 200:
                    raise AssertionError(
                        "chunk %d/%d upload failed with HTTP status %s"
                        % (current_chunk, total_chunk, response.status_code)
                    )
网友评论