使用exchange收取邮件(代码如下)
from exchangelib import Credentials, Account, Configuration, Version, DELEGATE
from exchangelib import EWSDateTime
import requests.adapters
from subprocess import PIPE, Popen
import pathlib, time
import shutil
import datetime
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class MyAccount():
def __init__(self, server, email_address, password):
self.email_address = email_address
cred = Credentials(username=email_address, password=password)
try:
requests.get(url='https://%s/EWS/Exchange.asmx' % server, timeout=5)
config = Configuration(server=server, credentials=cred)
self.account = Account(primary_smtp_address=email_address, autodiscover=False,
access_type=DELEGATE, credentials=cred, config=config)
except requests.exceptions.ConnectTimeout:
self.account = Account(primary_smtp_address=email_address, autodiscover=True,
access_type=DELEGATE, credentials=cred)
print('邮箱连接成功')
def receive(self, start, end, root='./email', user_id='lxooo'):
print('正在获取邮件...')
root_p = pathlib.Path(root)
if not root_p.exists():
root_p.mkdir()
mail_path = pathlib.Path('{}/{}'.format(root, self.email_address))
if not mail_path.exists():
mail_path.mkdir()
account = self.account
folders = {
'inbox': account.inbox,
'sent': account.sent,
'junk': account.junk,
'outbox': account.outbox,
'trash': account.trash,
'contacts': account.contacts
}
start = account.default_timezone.localize(
EWSDateTime.fromtimestamp(start))
end = account.default_timezone.localize(
EWSDateTime.fromtimestamp(end))
for key, box in folders.items():
path = mail_path.joinpath(key)
if not path.exists():
path.mkdir()
for msg in box.filter(datetime_received__range=(start, end)):
if msg.mime_content:
p = path.joinpath(str(msg.subject) + '{}.eml'.format(str(time.time())))
else:
p = path.joinpath(str(msg.subject) + '{}.html'.format(str(time.time())))
with open(p, 'w') as f:
if msg.mime_content:
f.write(msg.mime_content)
else:
f.write(msg.body)
base_name = root_p.joinpath(self.email_address + '--'+str(time.time()))
zip_path = shutil.make_archive(base_name=base_name, format='zip', root_dir=mail_path)
#shutil.rmtree(mail_path)
cmd = 'gpg -r {} -e {}'.format(user_id,zip_path)
p = Popen(cmd, shell=True, stdout=PIPE)
p.communicate()
p.wait()
if __name__ == '__main__':
start = (datetime.datetime.now()-datetime.timedelta(days=50)).timestamp()
end = datetime.datetime.now().timestamp()
my_account = MyAccount('smtp.outlook.com', "", '')
my_account.receive(start, end)
使用pop3收取邮件(代码如下)
import datetime
import email
import poplib
import random
import shutil
import time
from email.header import decode_header
from email.parser import Parser
from pathlib import Path
import gnupg
def dcode(str):
h = email.header.Header(str)
dh = email.header.decode_header(h)
return dh[0][0]
def pop3_get_email(email_address, pwd, pop3_server):
path = Path('./pop_email')
if not path.exists():
path.mkdir()
task_dir = path.joinpath("pop3" + "_" + "123")
if not task_dir.exists():
task_dir.mkdir()
store_path = task_dir.joinpath(email_address)
if not store_path.exists():
store_path.mkdir()
server = poplib.POP3_SSL(pop3_server, port=995)
server.user(email_address)
server.pass_(pwd)
folder_info = server.list()[1]
folder = len(folder_info)
percent = 0
for i in range(0, folder):
index = folder_info[i].decode().split(' ')[0]
mail = server.retr(index)[1]
try:
msg_content = b'\r\n'.join(mail).decode('utf-8')
except:
try:
msg_content = b'\r\n'.join(mail).decode('gbk')
except:
try:
msg_content = b'\r\n'.join(mail).decode('big5')
except:
folder -= 1
continue
# time_scope = msg_content.split('\n')
msg = Parser().parsestr(msg_content)
date = msg['date']
date = date.split("+")[0].split("-")[0].strip()
try:
utcdatetime = datetime.datetime.strptime(date, '%a, %d %b %Y %H:%M:%S')
except:
utcdatetime = datetime.datetime.strptime(date, '%d %b %Y %H:%M:%S')
percent += 1
pro = (percent / folder) * 100
print(pro)
if '2019-09-22' <= str(utcdatetime) <= '2019-10-11':
msg = Parser().parsestr(msg_content)
value = msg.get('Subject', '')
subject = decode_header(value)
if isinstance(subject[0][0], bytes):
if subject[0][1]:
subject = subject[0][0].decode(subject[0][1], errors='ignore')
else:
subject = subject[0][0].decode('utf-8', errors='ignore')
else:
subject = subject[0][0]
date = datetime.datetime.now()
subject = str(subject.replace('/', '').replace('\\', ''))
filename = subject[:100] + '----' + str(date) + '.eml'
try:
filepath = store_path.joinpath(filename)
with open(filepath, 'a') as f:
f.write(msg_content)
except:
filename = str(random.randint(1, 1000000)) + '----' + str(date) + '.eml'
filepath = store_path.joinpath(filename)
with open(filepath, 'a') as f:
f.write(msg_content)
elif '2019-09-22' > str(utcdatetime):
continue
elif str(utcdatetime) > '2019-10-11':
break
if __name__ == "__main__":
pop3_get_email('账号', '密码', 'pop.163.com')
使用imap收取邮件(代码如下)
import shutil
import time
from pathlib import Path
import imaplib
import datetime
import email
import random
from email.header import decode_header
def analysis_server(mail_address,imap=True,pop3=False):
email_add = mail_address.split('@')[1].split('.')[0]
server = {
'gmail': {'pop':{'server':'pop.gmail.com', 'port':995},
'smtp': {'server':'smtp.gmail.com', 'port':465},
'imap': {'server':'imap.gmail.com', 'port':993},},
'outlook': {'pop': {'server': 'outlook.office365.com', 'port': 995},
'smtp': {'server': 'smtp.office365.com', 'port': 587},
'imap': {'server': 'outlook.office365.com', 'port': 993}, },
'yahoo': {'pop': {'server': 'pop.mail.yahoo.com', 'port': 995},
'smtp': {'server': 'smtp.mail.yahoo.com', 'port': 465},
'imap': {'server': 'imap.mail.yahoo.com', 'port': 993}, },
'163': {'pop': {'server': 'pop.163.com', 'port': 995},
'smtp': {'server': 'smtp.163.com', 'port': 465},
'imap': {'server': 'imap.163.com', 'port': 993}, },
'126': {'pop': {'server': 'pop.126.com', 'port': 995},
'smtp': {'server': 'smtp.126.com', 'port': 465},
'imap': {'server': 'imap.126.com', 'port': 993}, },
'yeah': {'pop': {'server': 'pop.yeah.net', 'port': 995},
'smtp': {'server': 'smtp.yeah.net', 'port': 465},
'imap': {'server': 'imap.yeah.net', 'port': 993}, },
'qq': {'pop':{'server':'pop.qq.com', 'port':995},
'smtp': {'server':'smtp.qq.com', 'port':465},
'imap': {'server':'imap.qq.com', 'port':993},},
'sina': {'pop': {'server': 'pop.sina.com', 'port': 995},
'smtp': {'server': 'smtp.sina.com', 'port': 465},
'imap': {'server': 'imap.sina.com', 'port': 993}, },
'sohu': {'pop': {'server': 'pop.sohu.com', 'port': 995},
'smtp': {'server': 'smtp.sohu.com', 'port': 465},
'imap': {'server': 'imap.sohu.com', 'port': 993}, },
'139': {'pop': {'server': 'pop.139.com', 'port': 995},
'smtp': {'server': 'smtp.139.com', 'port': 465},
'imap': {'server': 'imap.139.com', 'port': 993}, },
}
email_server = server[email_add]['imap']
return email_server
def receive_imap(since, before, root_path, email_address,password,task_id):
#ed = db.session.query(EmailDownload).filter_by(email_download_id=task_id)
server = analysis_server(email_address)
path = Path(root_path)
if not path.exists():
path.mkdir()
task_dir = path.joinpath(task_id)
if not task_dir.exists():
task_dir.mkdir()
store_path = task_dir.joinpath(email_address + '--' + str(time.time()))
if not store_path.exists():
store_path.mkdir()
print(server['server'], server['port'])
imap_conn = imaplib.IMAP4_SSL(server['server'], server['port'])
# imap_conn = imaplib.IMAP4_SSL(server)#'imap.{}.com'.format(type)
imap_conn.login(email_address, password)
print('登录成功')
folder_info = imap_conn.list()[1]
total_folders = {}
for i in range(0, len(folder_info)):
selectFolder = folder_info[i].decode().split(" \"/\"")[1]
selectFolder = selectFolder.replace(" \"", "")
selectFolder = selectFolder.replace("\"", "")
folderName = folder_info[i].decode().split(" \"/\"")[0]
folderName = folderName.replace("(", "")
folderName = folderName.replace(")", "")
folderName = folderName.replace("(", "")
folderName = folderName.replace("\\HasNoChildren", "")
folderName = folderName.replace("\\HasChildren", "")
folderName = folderName.replace("\\", "")
folderName = folderName.replace(" ", "")
if folderName == "":
folderName = selectFolder
if folderName == "Noselect":
continue
total_folders.setdefault(folderName.strip(), selectFolder.strip())
if 'All' in total_folders:
del total_folders['All']
sincedata = 'since "' + since + '"'
beforedata = 'before "' + before + '"'
totalMailList = []
CurrentEmailCount = 0
folder_count = len(total_folders)
rate = 0
percent = 0
counts = 0
for index, folder in enumerate(total_folders):
base = index / folder_count
try:
status, count = imap_conn.select(total_folders[folder])
if status == 'OK':
status, currentMailList = imap_conn.search(None, sincedata, beforedata)
if currentMailList and currentMailList[0] == '':
continue
folder_t = folder.replace('/', '').replace('\\', '').replace(' ','')
folderpath = store_path.joinpath(folder_t)
if not folderpath.exists():
folderpath.mkdir()
currentMailList = currentMailList[0].split()
list_count = len(currentMailList)
counts += list_count
if list_count > 0:
for idx, mail_num in enumerate(currentMailList):
percent += 1
rate = base + (idx+1) / list_count / folder_count
try:
status, data = imap_conn.fetch(mail_num, '(UID BODY.PEEK[])')
except:
continue
date = datetime.datetime.now()
status, subject_str = imap_conn.fetch(mail_num, '(BODY.PEEK[HEADER.FIELDS (subject)])')
subject_msg = email.message_from_bytes(subject_str[0][1]).get('subject')
subject = decode_header(subject_msg)
if isinstance(subject[0][0], bytes):
if subject[0][1]:
try:
subject = subject[0][0].decode(subject[0][1], errors='ignore')
except:
continue
else:
subject = subject[0][0].decode('utf-8', errors='ignore')
else:
subject = subject[0][0]
subject = str(subject.replace('/', '').replace('\\', ''))
filename = subject[:100] + '----' + str(date) + '.eml'
try:
filepath = folderpath.joinpath(filename)
with open(filepath, 'ab') as f:
f.write(data[0][1])
except:
filename = str(random.randint(1, 1000000)) + '----' + str(date) + '.eml'
filepath = folderpath.joinpath(filename)
with open(filepath, 'ab') as f:
f.write(data[0][1])
CurrentEmailCount += 1
pro = (percent / counts) * 100
print(pro)
else:
rate = base + 1/folder_count
except:
rate = base + 1 / folder_count
continue
print(percent , counts)
base_name = store_path.joinpath(email_address + '--' + str(time.time()))
zip_path = shutil.make_archive(base_name=base_name, format='zip', root_dir=store_path)
if __name__ == '__main__':
since = '02-Nov-2016'
# since = '01-May-2019'
before = '24-Oct-2019'
root_path = './data/email'
email_address = '账号'
password = '密码'
task_id = 'lxooo'
receive_imap(
since=since,
before=before,
root_path=root_path,
email_address=email_address,
password=password,
task_id=task_id,
)
网友评论