# -*- coding: utf-8 -*-
import os
import logging
from time import time
from boto3.session import Session
from multiprocessing import Pool
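# Connection and logging settings. The key pair, endpoint, and log path below
# are placeholders; substitute your own values (ideally loaded from
# environment variables rather than hardcoded in the script).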
AWS_ACCESS_KEY_ID = '9E2B670F8631B13D'
AWS_SECRET_ACCESS_KEY = 'f8f6988f2fbf54b42b9ed3c74098c1194c29a651a'
ENDPOINT_URL = 'http://xxxxx.com'
LOGGING_FILE = '/Users/xxxx/Desktop/example.log'
logging.basicConfig(filename=LOGGING_FILE, level=logging.INFO)
class S3UploadFile:
    '''
    Constructor parameters:
    bucket_name:
        Name of the target bucket.
    upload_file_dir:
        Path of the local directory whose files will be uploaded.
    bucket_dir:
        Prefix of the local path that is stripped off when building the
        object key.
        Example:
            upload_file_dir = '/root/images/aaa'
            bucket_dir = 'root/'  # the trailing '/' here is required
            The directory structure created in the bucket is 'images/aaa'.
    processes_number:
        Default: 10.
        Number of worker processes to start. All files are divided into
        groups according to the process count; e.g. 115 files with 10
        processes are divided into 10 groups.
    '''
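    # Concretely, with 115 files and processes_number=10, split_tasks()
    # below deals the paths out round-robin into 10 groups: five groups of
    # 12 files and five groups of 11.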
    def __init__(self, upload_file_dir, bucket_name, bucket_dir, processes_number=10):
        self.aws_access_key_id = AWS_ACCESS_KEY_ID
        self.aws_secret_access_key = AWS_SECRET_ACCESS_KEY
        self.endpoint_url = ENDPOINT_URL
        self.processes_number = processes_number
        self.bucket_name = bucket_name
        self.upload_file_dir = upload_file_dir
        self.bucket_dir = bucket_dir
        self.all_obj_path = []  # every file path found under upload_file_dir
        self.listdir(self.upload_file_dir)
    def client_connection(self):
        # Each worker process builds its own session and client; boto3
        # clients should not be shared across processes.
        session = Session(aws_access_key_id=self.aws_access_key_id,
                          aws_secret_access_key=self.aws_secret_access_key)
        return session.client('s3', endpoint_url=self.endpoint_url)
    def listdir(self, upload_file_dir):
        # Recursively collect the path of every file under upload_file_dir.
        for file in os.listdir(upload_file_dir):
            file_path = os.path.join(upload_file_dir, file)
            if os.path.isdir(file_path):
                self.listdir(file_path)
            else:
                self.all_obj_path.append(file_path)
    def upload_file(self, file_paths):
        s3_client = self.client_connection()
        pid_start_time = time()
        pid = os.getpid()
        logging.info('PID: %s; All Files: %s' % (pid, file_paths))
        for file in file_paths:
            start_time = time()
            # Everything after bucket_dir in the local path becomes the object key.
            file_name = file.split(self.bucket_dir)[1]
            try:
                s3_client.upload_file(file, self.bucket_name, file_name,
                                      ExtraArgs={'ACL': 'public-read'})
                logging.info('PID: %s File upload success: %s; time: %.2f' %
                             (pid, file_name, time() - start_time))
            except Exception as error:
                logging.error('PID: %s File upload failed: %s; error: %s' %
                              (pid, file, error))
        logging.info('Upload End PID: %s; Total Time: %.2f' %
                     (pid, time() - pid_start_time))
    def split_tasks(self):
        # Deal the collected file paths round-robin into one group per process.
        obj_total = len(self.all_obj_path)
        if obj_total < self.processes_number:
            self.processes_number = obj_total
        divide_list = [[] for i in range(self.processes_number)]
        for i in range(obj_total):
            divide_list[i % self.processes_number].append(self.all_obj_path[i])
        return divide_list
    def run(self):
        po = Pool(self.processes_number)
        task_list = self.split_tasks()
        # Each worker process receives one group of file paths.
        po.map(self.upload_file, task_list)
        po.close()
        po.join()
        print("Finish uploading!")
if __name__ == '__main__':
    upload_file_dir = '/Users/xxxx/Desktop/xxxx'
    bucket_name = 'xxxxx'
    bucket_dir = 'Desktop/'
    s3_upload = S3UploadFile(upload_file_dir, bucket_name, bucket_dir)
    s3_upload.run()
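
# A minimal sketch of how object keys are derived under the settings above:
# with bucket_dir = 'Desktop/', a local file such as
# '/Users/xxxx/Desktop/xxxx/images/a.jpg' is split on 'Desktop/' and stored
# in the bucket under the key 'xxxx/images/a.jpg'.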