序号 | 作用 |
---|---|
1 | 获取参数类型 |
2 | 控制台打印 |
3 | 遍历数组 |
4 | 定义变量并引用 |
5 | 定义方法 |
6 | 获取时间 |
7 | 连接数据库查询、添加 |
8 | 爬虫 BeautifulSoup库 |
9 | 三元表达式 |
10 | 获取uuid |
11 | 字符串替换,去空等 |
12 | 定时触发器 |
1) 获取参数类型 |
type(参数名) |
print(type(param1))
2)控制台打印 |
print(打印内容) |
print("This is a test")
3) 遍历数组 |
for i in range(0,len(a)) |
for i in range(0,len(a)):
print(a[i].get("href"))
4) 定义变量并引用 |
声明引用变量,直接引用变量|
A定义变量host,port。注意变量类型,注意“()”
host = ("127.0.0.1")#url
port = (3306)#端口号
#封装到类里,方便继承
class LoggerConfig():
logger_name = 'python_api'
logger_level = 'DEBUG'
logger_file = 'log_my.txt'
B引用变量A
from reptile import Config
this_host=Config.host
this_port=Config.port
5) 定义方法 |
声明引用方法,直接引用方法|
A定义变量test,test1。**注意参数类型
def test(): print("This is a test")
def test1(name): print("This is a test by name:"+name)
B引用方法A
from methodSet import test, test1
test()
test1("123")
6) 获取时间 |
import time
date = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
import datetime
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
7) 连接数据库查询、添加 |
import pymysql
from reptile import Config
#连接串
sql_getTimes_baidu_hot_main = "select id from baidu_hot_main where address=%s and `describe` =%s"
sql_insert_baidu_hot_main = '''insert into baidu_hot_main(id,address,`position`,times,header,`date`,`describe`) values(%s,%s,%s,%s,%s,%s,%s)'''
sql_insert_baidu_hot_content_batch = '''insert into baidu_hot_content (id,main_id,top,title,href,content,type,img) values(%s,%s,%s,%s,%s,%s,%s,%s)'''
#查询当前访问次数
def getTimes(address,describe):
try:
#连接数据库
db= pymysql.connect(
host=Config.host,#url
port=Config.port,#端口号
user=Config.user,#数据库用户
password=Config.password,#数据库密码
database=Config.database#要连接的数据库名称
)
cursor = db.cursor()
#执行sql语句
sum = cursor.execute(sql_getTimes_baidu_hot_main, (address,describe))
print('describe='+describe+';;;;address='+address)
#提交事务
db.commit()
print('查询次数成功')
except Exception as e:
print(e)
#如果出现异常,回滚
db.rollback()
print('查询次数失败')
finally:
#关闭数据库连接
db.close()
return sum
#插入表
def setMain(id,address,position,times,header,describe,date):
try:
#连接数据库
db= pymysql.connect(
host=Config.host,#url
port=Config.port,#端口号
user=Config.user,#数据库用户
password=Config.password,#数据库密码
database=Config.database#要连接的数据库名称
)
cursor = db.cursor()
#执行sql语句
cursor.execute(sql_insert_baidu_hot_main, (id,address,position,times,header,date,describe))
#提交事务
db.commit()
print('插入成功')
except Exception as e:
print(e)
#如果出现异常,回滚
db.rollback()
print('插入失败')
finally:
#关闭数据库连接
db.close()
return 1
#批量插入表
def setBatchContent(dataList):
#连接数据库
db= pymysql.connect(
host=Config.host,#url
port=Config.port,#端口号
user=Config.user,#数据库用户
password=Config.password,#数据库密码
database=Config.database#要连接的数据库名称
)
#连接串
cursor = db.cursor()
try:
#执行sql语句
cursor.executemany(sql_insert_baidu_hot_content_batch,dataList)
#提交事务
db.commit()
print('插入成功')
except Exception as e:
print(e)
#如果出现异常,回滚
db.rollback()
print('插入失败')
finally:
#关闭数据库连接
db.close()
return 1
数据表字段词与python方法名冲突需要转译+“``”
8) 爬虫 BeautifulSoup库 |
使用爬虫爬取网页指定内容汇总 |
#引入库:
import requests
from bs4 import BeautifulSoup
#爬虫方法
#配置网页信息2
url1 = 'https://top.baidu.com/board?tab=realtime'
headers1={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'}
cookies1 = ''
params1=""
contentA1 = "#sanRoot > main > div.container.right-container_2EFJr > div > div:nth-child(2) > div "
response = requests.get(url,headers=headers)
response.encoding = 'utf-8'
#定义选择器
soup = BeautifulSoup(response.text, 'html.parser')
a = soup.select(contentA)
for i in range(0,len(a)):
print(a[i])
爬取每条打印值如下:
<div class="category-wrap_iQLoo horizontal_1eKyQ">
<a class="img-wrapper_29V76" href="https://www.baidu.com/s?wd=%E9%AB%98%E6%A0%A1%E5%AD%A6%E7%94%9F%E6%AF%95%E4%B8%9A13%E5%B9%B4%E5%90%8E%E8%A2%AB%E6%92%A4%E9%94%80%E6%AF%95%E4%B8%9A%E8%AF%81&sa=fyb_news&rsv_dl=fyb_news" target="_blank">
<div class="index_1Ew5p c-index-bg25"> 25 </div> <img alt="" src="https://fyb-1.cdn.bcebos.com/hotboard_img/580a41001f1fbe34f2ce116bafa56f4c?x-bce-process=image/resize,m_fill,w_256,h_170"/>
<div class="border_3WfEn"></div>
</a>
<div class="trend_2RttY hide-icon"> <div class="img-wrap_JPOmE trend-icon_1Z3Cd">
<img src="//fyb-pc-static.cdn.bcebos.com/static/asset/icon-same_886375f242bd1538af21a9721f16b170.png"/> </div>
<div class="hot-index_1Bl1a"> 2584879 </div> <div class="text_1lUwZ"> 热搜指数 </div>
</div>
<img class="line_3-bzA" src="//fyb-pc-static.cdn.bcebos.com/static/asset/line-bg@2x_95cb5a089159c6d5a959a596d460d60a.png"/>
<div class="content_1YWBm">
<a class="title_dIF3B" href="https://www.baidu.com/s?wd=%E9%AB%98%E6%A0%A1%E5%AD%A6%E7%94%9F%E6%AF%95%E4%B8%9A13%E5%B9%B4%E5%90%8E%E8%A2%AB%E6%92%A4%E9%94%80%E6%AF%95%E4%B8%9A%E8%AF%81&sa=fyb_news&rsv_dl=fyb_news" target="_blank">
<div class="c-single-text-ellipsis"> 高校学生毕业13年后被撤销毕业证 </div> <div class="c-text hot-tag_1G080"> </div>
</a>
<!--s-frag-->
<div class="hot-desc_1m_jR small_Uvkd3 ellipsis_DupbZ">
近日,西安工业大学发布一则《关于撤销陈华良毕业证书的公告》。公告显示,毕业13年的陈华良因违反相关规定,非法取得学籍被撤...
<a class="look-more_3oNWC" href="https://www.baidu.com/s?wd=%E9%AB%98%E6%A0%A1%E5%AD%A6%E7%94%9F%E6%AF%95%E4%B8%9A13%E5%B9%B4%E5%90%8E%E8%A2%AB%E6%92%A4%E9%94%80%E6%AF%95%E4%B8%9A%E8%AF%81&sa=fyb_news&rsv_dl=fyb_news" target="_blank">查看更多></a>
</div>
<div class="hot-desc_1m_jR large_nSuFU"> 近日,西安工业大学发布一则《关于撤销陈华良毕业证书的公告》。公告显示,毕业13年的陈华良因违反相关规定,非法取得学籍被撤销毕业证。
<a class="look-more_3oNWC" href="https://www.baidu.com/s?wd=%E9%AB%98%E6%A0%A1%E5%AD%A6%E7%94%9F%E6%AF%95%E4%B8%9A13%E5%B9%B4%E5%90%8E%E8%A2%AB%E6%92%A4%E9%94%80%E6%AF%95%E4%B8%9A%E8%AF%81&sa=fyb_news&rsv_dl=fyb_news" target="_blank">查看更多></a>
</div> <!--/s-frag--> </div> </div>
汇总方法:
#获取当前列的a标签下的href
a[i].find("a").get("href")
#获取当前列的a标签下的div文本内容
a[i].find("a").find("div").text
#获取当前列的a标签下的img的图片链接
a[i].find("a").find("img").get("src")
#获取当前列所有的div标签内容
a[i].find_all('div')
#获取当前列所有的div标签内容的第1条
a[i].find_all('div')[0]
#获取当前列class为hot-index_1Bl1a的div标签内容
a[i].find(name="div",attrs={"class":"hot-index_1Bl1a"})
#获取当前列id为this_id的div标签内容
a[i].find(name="div",attrs={"id":"this_id"})
9) 三元表达式 |
("C1","C2")[boolean] |
top = ("0",index)[len(index) !=0 ]
10) 获取uuid |
uuid.uuid1() |uuid.uuid2()|uuid.uuid3()|uuid.uuid4()
import uuid
id2 = uuid.uuid1()
11) 字符串替换,去空等 |
.replace("",'')|.strip()|
#替换“查看更多>”为空
xxx.replace("查看更多>",'')
#首尾去空格
xxx.strip()
12) 定时触发器 |
sched定时模块|APScheduler定时框架|
sched定时模块:
启动后,间隔指定时间再次执行。
import time
import sched
from reptile.controller.WebHandleController import runRepitle
from datetime import date, datetime
def time_printer():
print("定时任务启动==="+datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 注意 sched 模块不是循环的,一次调度被执行后就 Over 了,如果想再执行,请再次 enter
loop_monitor()
def loop_monitor():
s = sched.scheduler(time.time, time.sleep) # 生成调度器
'''
schedule.enter(delay, priority, action, arguments)
其中:
delay:延迟执行任务的时间,为0表示立即执行任务
priority:执行任务的优先级,0为最大,1234依次降低
action:执行任务的函数名,这里是printTime
arguments:执行任务的函数参数,格式为 (arg1,arg2,...,) 最后一定要有逗号,没有参数就是()
'''
s.enter(60*60*1, 0, time_printer,())
s.run()
# 启动位置
if __name__ == "__main__":
loop_monitor()
APScheduler定时框架:
APScheduler是一个 Python 定时任务框架,提供了基于日期、固定时间间隔以及 crontab 类型的任务,并且可以持久化任务、并以 daemon 方式运行应用。
安装:
$ pip install apscheduler
APScheduler 四个组件分别为:触发器(trigger),作业存储(job store),执行器(executor),调度器(scheduler)。
- run_date: 在某天执行任务
- timezone: 在某段时间执行任务
- interval: 固定时间间隔触发:
weeks: 每隔几周执行一次 | weeks=0
days: 每隔几天执行一次 | days=0
hours: 每隔几小时执行一次 | hours=0
minutes: 每隔几分执行一次 | minutes=0
seconds: 每隔几秒执行一次 | seconds=0
start_date: 最早执行时间 | start_date=None
end_date: 最晚执行时间 | end_date=None
timezone: 执行时间区间 | timezone=Nonecron: 在特定时间周期性地触发:
year: 4位数字
month: 月 (1-12)
day: 天 (1-31)
week: 标准周 (1-53)
day_of_week: 周中某天 (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour: 小时 (0-23)
minute:分钟 (0-59)
second: 秒 (0-59)
start_date: 最早执行时间
end_date: 最晚执行时间
timezone: 执行时间区间
image.png
具体实现:
from datetime import date, datetime
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job(text):
print(text)
sched = BlockingScheduler()
# 1-1指定时间点执行一次
# sched.add_job(my_job, 'date', run_date=datetime(2022, 7, 27, 13, 58, 00), args=['text'])
# 1-2指定时间点执行一次
# sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text'])
# 2直接执行一次
# sched.add_job(my_job, args=['text'])
# 3从启动开始,每两小时执行一次
# sched.add_job(my_job, 'interval', hours=2)
# 4从2022-07-27 09:30:00到2022-08-15 12:00:00每4h执行一次
sched.add_job(my_job, 'interval', hours=2, start_date='2022-07-27 09:51:00', end_date='2022-08-15 12:00:00',args=[datetime.now().strftime("%Y-%m-%d %H:%M:%S")+'爬取开始...'])
# 5of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
#sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 6Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00
#sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
sched.start()
网友评论