-
开篇惯例,美女镇楼!!
文末有美女出处
前两天有个网友给我私信说想让我帮他实现一个自动批量定时点对点发送邮件的功能。闲聊之下才知道该网友的为何亟需上述功能,他的需求是这样的:
1.从excel
文件中读取5K个邮箱地址并向这些地址发送一封相同的邮件,但这些邮件每次只发20个;
2.这些邮件每次只能点对点单一发送,也就说,每个收到邮件的人只看见发件人的邮箱地址。
- 而一次添加多个收件人或抄送多人的方式会让每个收件人都能看到其他收件人的邮箱地址,显然这种方法是行不通的。 那么如何解决这个看似棘手的问题呢?接下来我们就利用
Python
大法结合Win32Com
模块来操作Excel
文件实现上述所需的功能。
@ 实现自动化操作Excel功能
- 目前
python
操作Excel
文件主要有两个模块:win32com
模块和xlrd+xlwt+xlutils
组合。其中win32
这个模块功能很强大,但是操作效率有点慢,而且这玩意只能在windows
系统上运行,可移植性差的。而另一种方式要同时需要三个模块组合,这个组合读取文件速度很快,但是不好操作excel
文件,复制的文件跟源文件的样式还有点区别。 - 通常大家使用的系统都是基于Windows的,所以这里就选择
win32com
模块操作excel
文件。接下来我们就写个类用来操作Excel
文件。这个操作类有如下功能:打开/关闭excel
文件、添加/复制/删除工作表、获取有效使用行列数...等等。
class handleExcel:
def __init__(self, fileName=None): # 打开文件或者新建文件(如果不存在的话)
...
def save(self, new_filename=None): # save excel file
'''保存文件'''
...
def close(self): # close excel file
'''关闭文件'''
...
def add_sheet(self, sheetname=None):
'''添加工作表'''
...
def copy_sheet(self, srcsheet, destsheet=None, before=None):
'''复制工作表'''
...
def delete_sheet(self, sheet):
'''删除工作表'''
...
def get_rows(self, sheet):
'''获取有效使用行数'''
...
def get_cols(self, sheet):
'''获取有效使用列数'''
...
def read_cell(self, sheet, row, col):
'''读单元格数据'''
...
def write_cell(self, sheet, row, col, value):
'''写单元格数据'''
...
def get_range(self, sheet, row1, col1, row2, col2):
'''获取某一区域的数据'''
...
def copy_range(self, sheet, row1, col1, row2, col2, tgt_row, tgt_col):
'''复制一块区域'''
...
def cut_range(self, sheet, row1, col1, row2, col2, tgt_row, tgt_col):
'''移动一块区域'''
...
def clear_range(self, sheet, row1, col1, row2, col2):
'''清除指定区域内容'''
...
def add_picture(self, sheet, pic_name, left, top, width, height):
'''添加图片'''
...
def del_row(self, sheet, row):
'''删除指定行'''
...
def del_col(self, sheet, col):
'''删除指定列'''
...
- 接下来我们在C盘中新建一个
mail.xlsx
的excel
文件,里面填写500条邮箱地址(不考虑地址是否有效)来对我们刚刚完成的excel
工具类进行测试。测试代码如下:
if __name__ == '__main__':
xls = handleExcel(r'C:\xxoo\mail.xlsx')
rows = xls.get_rows('Sheet1')
cols = xls.get_cols('Sheet1')
print(rows)
print(cols)
mail_list = []
for i in range(1, rows):
cell_value = xls.read_cell('Sheet1', i, 1)
mail_list.append(cell_value)
print(cell_value)
print(mail_list)
xls.close()
- 结果显示能正确读取mail.xlsx文件中的测试数据并打印到控制台。
完整参考代码如下:
#!/usr/bin/env python
# -*-coding:utf-8-*-
from win32com.client import Dispatch
import win32com.client
class handleExcel:
def __init__(self, fileName=None): # 打开文件或者新建文件(如果不存在的话)
self.xlApp = win32com.client.Dispatch('Excel.Application')
if fileName:
self.fileName = fileName
self.xlBook = self.xlApp.Workbooks.Open(fileName)
print('open {} succeed!'.format(fileName))
else:
self.xlBook = self.xlApp.Workbooks.Add()
self.fileName = ''
def save(self, new_filename=None): # save excel file
'''保存文件'''
if new_filename:
self.fileName = new_filename
self.xlBook.SaveAs(new_filename)
else:
self.xlBook.Save()
def close(self): # close excel file
'''关闭文件'''
self.xlBook.Close(SaveChanges=0)
del self.xlApp
def add_sheet(self, sheetname=None):
'''添加工作表'''
sheet = self.xlBook.Sheets.Add()
if sheetname:
sheet.Name = sheetname
def copy_sheet(self, srcsheet, destsheet=None, before=None): # copy sheet
'''复制工作表'''
sht = self.xlBook.WorkSheets(srcsheet)
if before is None: # 在指定工作表的后面插入新的工作表
sht.Copy(None, sht)
new_sheet = sht.Next
if destsheet is not None:
new_sheet.Name = destsheet
else: # 在指定工作表的前面插入新的工作表
sht.Copy(before, None)
if destsheet is not None:
index = before.Index - 1 # 所获新插入的工作表所在的位置(工作表的index从1开始)
self.xlBook.Sheets(index).Name = destsheet
def delete_sheet(self, sheet):
'''删除工作表'''
try:
sheet = self.xlBook.WorkSheets(sheet)
if sheet is not None:
sheet.Delete()
except Exception as e:
print(e)
def get_rows(self, sheet):
'''获取有效使用行数'''
try:
sht = self.xlBook.WorkSheets(sheet)
rows = sht.UsedRange.Rows.Count
return rows
except Exception as e:
print(e)
print("no sheet named {}".format(sheet))
def get_cols(self, sheet):
'''获取有效使用列数'''
try:
sht = self.xlBook.WorkSheets(sheet)
cols = sht.UsedRange.Columns.Count
return cols
except Exception as e:
print(e)
print("no sheet named {}".format(sheet))
def read_cell(self, sheet, row, col): # get data (row,col) from sheet
'''读单元格数据'''
try:
sht = self.xlBook.WorkSheets(sheet)
return sht.Cells(row, col).Value
except Exception as e:
print(e)
def write_cell(self, sheet, row, col, value): # set date (row ,col) value to sheet
'''写单元格数据'''
try:
sht = self.xlBook.WorkSheets(sheet)
sht.Cells(row, col).Value = value
except Exception as e:
print(e)
def get_range(self, sheet, row1, col1, row2, col2): # 获得一块区域的数据,返回为一个二维元组
'''获取某一区域的数据'''
try:
sht = self.xlBook.WorkSheets(sheet)
return sht.Range(sht.Cells(row1, col1), sht.Cells(row2, col2)).Value
except Exception as e:
print(e)
def copy_range(self, sheet, row1, col1, row2, col2, tgt_row, tgt_col):
'''复制一块区域'''
try:
sht = self.xlBook.WorkSheets(sheet)
src_range = sht.Range(sht.Cells(row1, col1), sht.Cells(row2, col2))
dest_range = sht.Range(sht.Cells(tgt_row, tgt_col),
sht.Cells(tgt_row + row2 - row1, tgt_col + col2 - col1))
src_range.Copy(dest_range)
except Exception as e:
print(e)
def cut_range(self, sheet, row1, col1, row2, col2, tgt_row, tgt_col):
'''移动一块区域'''
try:
sht = self.xlBook.WorkSheets(sheet)
src_range = sht.Range(sht.Cells(row1, col1), sht.Cells(row2, col2))
src_range.Cut(sht.Cells(tgt_row, tgt_col))
except Exception as e:
print(e)
def clear_range(self, sheet, row1, col1, row2, col2):
'''清除指定区域内容'''
try:
sht = self.xlBook.WorkSheets(sheet)
src_range = sht.Range(sht.Cells(row1, col1), sht.Cells(row2, col2))
src_range.Clear()
except Exception as e:
print(e)
def add_picture(self, sheet, pic_name, left, top, width, height): # add a picture to sheet
'''添加图片'''
try:
sht = self.xlBook.WorkSheets(sheet)
sht.Shapes.AddPicture(pic_name, 4, 3, left, top, width, height)
except Exception as e:
print(e)
def del_row(self, sheet, row):
'''删除指定行'''
if row < 1 or row > self.get_rows(sheet):
print('out of range!!')
return
del_row = self.xlBook.WorkSheets(sheet).Rows(row)
del_row.Delete()
def del_col(self, sheet, col):
'''删除指定列'''
if col < 1 or col > self.get_cols(sheet):
print('out of range!!')
return
del_col = self.xlBook.WorkSheets(sheet).Columns(col)
del_col.Delete()
if __name__ == '__main__':
xls = handleExcel(r'C:\xxoo\mail.xlsx')
rows = xls.get_rows('Sheet1')
cols = xls.get_cols('Sheet1')
print(rows)
print(cols)
mail_list = []
for i in range(1, rows):
cell_value = xls.read_cell('Sheet1', i, 1)
mail_list.append(cell_value)
print(cell_value)
print(mail_list)
xls.close()
- 完成了
excel
文件的操作后接下来就是如何从excel
文件中加载邮箱地址进行自动发送邮件操作了。这个工具类的强大之处在于当你需要重复的保存海量数据到excel文件中时可以节省大量不必要浪费的时间,比如当你需要将通过爬虫爬取到的海量网页信息保存到excel
文件中,完全可以通过这个工具类结合python实现数据的自动写入、读取和保存
@ 自动发送邮件功能
-
自动定时发送邮件功能请参见之前的一篇文章《Python3实现自动定时发送邮件功能》里面有介绍如何利用QQ邮箱自动给指定邮箱地址定时定点发送邮件。但之前的那篇文章中没有操作
爽excel
的功能,下面给出一个带有刚刚介绍到的excel
操作类的参考栗子,该栗子中是通过多线程实现同时向500个不同邮箱地址点对点单一发送相同的带有附件的邮件,500封邮件发完大概耗时几分钟,重点是你啥都不用干,只需在终端下执行python auto_send_email.py
这句命令就行了,你说爽不爽?!
-
要是换成人工操作那不得累死,500封相同内容,还只能一个一个的发,光复制邮件内容都已经让人手抽筋了。更何况那位网友说的5000封邮件,想想都可怕!!我就不傻逼逼的搞个5000个邮箱地址来测试下了,要是有谁蛋疼的狠可以试试哈!!
惊恐 -
好了,不瞎扯淡了,准备睡觉,建议大家早点睡觉,毕竟,狗命要紧!!!哈哈哈....
睡觉
-
附上栗子参考代码如下:
#!/usr/bin/env python
# -*-coding:utf-8-*-
import threading
from auto_send_mail.handle_excel import handleExcel
__author__ = 'SamWoo'
import mimetypes
import os
import smtplib
import time
from email import encoders
from email.header import Header
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
from os.path import getsize
class EmailManager:
def __init__(self, **kwargs):
'''
constructor
:param kwargs:Variable paramete
'''
self.kwargs = kwargs
self.smtp_server = 'smtp.qq.com'
self.MAX_FILE_SIZE = 10 * 1024 * 1024
def __get_cfg(self, key, throw=True):
'''
get the configuration file based on the key
:param key:
:param throw:
:return:
'''
cfg = self.kwargs.get(key)
if throw == True and (cfg is None or cfg == ''):
raise Exception("The configuration can't be empty", 'utf-8')
return cfg
def __init_cfg(self):
self.msg_from = self.__get_cfg('msg_from')
self.password = self.__get_cfg('password')
self.msg_to = ';'.join(self.__get_cfg('msg_to'))
self.msg_subject = self.__get_cfg('msg_subject')
self.msg_content = self.__get_cfg('msg_content')
self.msg_date = self.__get_cfg('msg_date')
# attachment
self.attach_file = self.__get_cfg('attach_file', throw=False)
def login_server(self):
'''
login server
:return:
'''
server = smtplib.SMTP_SSL(self.smtp_server, 465)
server.set_debuglevel(1)
server.login(self.msg_from, self.password)
return server
def get_main_msg(self):
'''
suject content
:return:
'''
msg = MIMEMultipart()
# message content
msg.attach(MIMEText(self.msg_content, 'plain', 'utf-8'))
msg['From'] = self._format_addr('Sam <%s>' % self.msg_from)
msg['To'] = self._format_addr('To <%s>' % self.msg_to)
msg['Subject'] = Header(self.msg_subject, 'utf-8')
msg['Date'] = self.msg_date
# attachment content
attach_file = self.get_attach_file()
if attach_file is not None:
msg.attach(attach_file)
return msg
def get_attach_file(self):
'''
generate mail attachment content
:return:
'''
if self.attach_file is not None and self.attach_file != '':
try:
if getsize(self.attach_file) > self.MAX_FILE_SIZE:
raise Exception('The attachment is too large and the upload failed!!')
with open(self.attach_file, 'rb') as file:
ctype, encoding = mimetypes.guess_type(self.attach_file)
if ctype is None or encoding is not None:
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
mime = MIMEBase(maintype, subtype)
mime.set_payload(file.read())
# set header
mime.add_header('Content-Disposition', 'attachment',
filename=os.path.basename(self.attach_file))
mime.add_header('Content-ID', '<0>')
mime.add_header('X-Attachment-Id', '0')
# set the attachment encoding rules
encoders.encode_base64(mime)
return mime
except Exception as e:
print('%s......' % e)
return None
else:
return None
def _format_addr(self, s):
name, addr = parseaddr(s)
return formataddr((Header(name, 'utf-8').encode(), addr))
def send(self):
try:
# initialize the configuration file
self.__init_cfg()
# log on to the SMTP server and verify authorization
server = self.login_server()
# mail content
msg = self.get_main_msg()
# send mail
server.sendmail(self.msg_from, self.msg_to, msg.as_string())
server.quit()
print("Send succeed!!")
except smtplib.SMTPException:
print("Error:Can't send this email!!")
def get_mail_address():
global mail_list, i
xls = handleExcel(r'C:\xxoo\mail.xlsx')
rows = xls.get_rows('Sheet1')
mail_list = []
for i in range(1, rows+1):
cell_value = xls.read_cell('Sheet1', i, 1)
mail_list.append(cell_value)
print(cell_value)
print(mail_list)
xls.close()
return mail_list
def send_email(mail_list):
global manager
mail_cfg = {'msg_from': 'xxxxx@qq.com',
'password': 'xxxxx',
'msg_to': ['xxxx@qq.com'],
'msg_subject': 'Python Auto Send Email Test',
'msg_content': 'Hi, boy! Just do it, Python!',
'attach_file': r'.\font.zip',
'msg_date': time.ctime()
}
for mail in mail_list:
mail_cfg['msg_to'] = [mail]
manager = EmailManager(**mail_cfg)
print('Now send email to {}'.format(mail_cfg['msg_to']))
manager.send()
if __name__ == "__main__":
mail_list = get_mail_address()
num = len(mail_list) // 4
list1 = mail_list[0:num]
list2 = mail_list[num:num * 2]
list3 = mail_list[num * 2:num * 3]
list4 = mail_list[num * 3:]
#开启4条线程分批发送500封相同邮件
threading.Thread(target=send_email, args=(list1,)).start()
threading.Thread(target=send_email, args=(list2,)).start()
threading.Thread(target=send_email, args=(list3,)).start()
threading.Thread(target=send_email, args=(list4,)).start()
# 1.every 5 min to send a email
# time_intvl = 5 * 60
# start_time = int(time.time())
# print(start_time)
# while True:
# end_time = int(time.time())
# cost_time = end_time - start_time
# # print(cost_time)
# if cost_time == time_intvl:
# manager.send()
# start_time = end_time
# print('regular send email....%s' % time.ctime(start_time))
# else:
# pass
# 2.every day 13:10:00 send a email to me
# regular_hour = 13
# regular_min = 10
# regular_sec = 00
# while True:
# current_time = time.localtime(time.time())
# # print(current_time.tm_min)
# if (current_time.tm_hour == regular_hour) \
# and (current_time.tm_min == regular_min) \
# and (current_time.tm_sec == regular_sec):
# manager.send()
# print('send a email at 13:10:00 every day....')
# else:
# pass
-
PS:镇楼美女的图片在哪找的?请参见《Python3+requests_html+selenium爬取妹子图》这篇文章,保证不让你失望,不客气哈,不过要注意营养,你懂的!
网友评论