微博关键词监控器
2020年4月2日阴雨连绵的下午,被疫情所困无法返校开学的我百无聊赖,突然想起之前抖音上刷到一个霓虹国程序员做了一个分手灯,就是一个监控社交网络,有人说分手就会亮一下的灯,我感觉十分有趣,于是决定自己也做一个。但是用计算机控制灯比较麻烦,比较之下我选择了较为方便的手机短信,于是一个沙雕脚本诞生了。
这个沙雕脚本使用爬虫技术监控微博的动态,当搜索关键词有新结果时便会以短信的方式予以通知,而且可以自行定义通知间隔,可以说是非常人(没)性(有)化(用)。
下面我将分模块介绍一下这个只有一百余行的python脚本。
短信模块
短信我们使用https://www.twilio.com/提供的服务,他们家是有免费试用的,有钱的老爷们也可以使用其他供应商的服务,应该用起来都类似。首先在网站注册账号,然后绑定手机,将密钥信息填进API里就可以调用起来了,十分简洁方便。
# 配置
auth_token = '**********************'
account_sid = '***********************'
client = Client(account_sid, auth_token)
# 然后调用client.messages.create就可以了,下边有写成一个工具函数
主程序部分
import copy # 深拷贝
import time # 计时器
import requests # 众所周知这是网络通讯库,爬虫都爱他
import re # 正则表达式
import json # 这个包熟悉网络的同学应该也很熟悉,很多的通讯都采用了json的形式(虽然现在在逐步被淘汰),这个包便是构造和处理json的
from requests import get # http GET方法
from twilio.rest import Client # 发短信用的
def main():
"""
开启任务循环
:return: None
"""
last_pool = [[], []] # 由于微博的时间记录不是精确性的,而是“刚刚”、“1分钟前”这种,这个列表用来记录已经通知过的微博id,配合时间戳可以区分出新微博是否需要被通知
while True:
print("\nnew detect launched")
new_pool = renew(last_pool) # 主方法,刷新一次,通知新微博,并返回新微博id列表
last_pool = last_pool[1:] # 弹出时间比较久的微博,这部分已经可以用时间戳予以区分
last_pool.append(new_pool) # 将新的列表添加进已通知池
time.sleep(30) # 暂停30s
if __name__ == '__main__':
kw = input("Please input the keyword you wanna overwatch: ")
headers = {
'user-agent':
'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.57.2 (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2'
}
url = 'https://m.weibo.cn/api/container/getIndex' # m.weibo.cn是手机浏览器版本的微博,从这个来源比较容易爬,这个URL是搜索的接口
main() # 执行主函数
构建待访问池
def get_page():
# 先用requests构造请求,解析出关键词搜索出来的微博总页数
data_list = [] # @return,每次连接的具体参数
data = {
'containerid': '100103type=61&q={}'.format(kw), # 排序方式和搜索关键词
'page_type': 'searchall'} # 搜索方式
resp = requests.get(url=url, headers=headers, params=data)
total_page = resp.json()['data']['cardlistInfo']['total'] # 微博总数
# 一页有10条微博,用总数对10整除,余数为0则页码为总数/10,余数不为0则页码为(总数/10)+1
if total_page % 10 == 0:
page_num = int(total_page / 10)
else:
page_num = int(total_page / 10) + 1
# 页码为1,data为当前data,页码不为1,通过for循环构建每一页的data参数
if page_num == 1:
data_list.append(data)
return data_list
else:
for i in range(1, page_num + 1):
data['page'] = i
data_list.append(copy.deepcopy(data))
return data_list
工具函数
def check_time(id, last_pool, time): # 检查是否被通知过
if time != "刚刚": # 我们只通知最近发出的微博(30s间隔刷新),所以所有不是“刚刚”的微博都不需要通知
return False
for pool in last_pool: # 已经通知过的不用通知
if id in pool:
return False
return True
def sent_message(text, phone_number):
mes = client.messages.create(
from_='*********', # 填写在active number处获得的号码
body=text,
to=phone_number
)
print("msg sent: {}".format(mes.sid))
更新
def renew(last_pool):
data_list = get_page() # 接收data参数列表
pool = [] # 本次连接的已访问池
for data in data_list:
with get(url=url, headers=headers, params=data) as resp: # 携带参数发送请求
text = resp.text
text_dict = json.loads(text)['data']['cards'] # 微博页面上的数据是以卡片的形式组织的
parse_dict = {} # 解析出的数据放在这个字典里边
for card in text_dict:
if card['card_type'] == 9:
# 只有这个类型的卡片是记录微博数据的,其他的设计渲染、布局等不用关心的东西
scheme = card['scheme']
# 卡片中提取信息
if card['mblog']['isLongText'] is False:
text = card['mblog']['text']
text = re.sub(r'<.*?>|\n+', '', text)
else:
text = card['mblog']['longText']['longTextContent']
user = card['mblog']['user']['profile_url']
id = card['mblog']['id']
time = card['mblog']['created_at']
# 检查此条微博是否已经被通知
if not check_time(id, last_pool, time) or id in pool:
text = "{} new wblog published recently\n".format(len(pool))
print(text)
if len(pool) > 0:
sent_message(text, "+86***********") # 调用发信息方法
return pool
parse_dict['url'] = scheme
parse_dict['text'] = text
parse_dict['author'] = user
parse_dict['time'] = time
pool.append(id)
# print('new weibo detected:\n'
# 'author home page: {}\n'
# 'content: {}\n'
# 'time: {}\n'.format(parse_dict['author'], parse_dict['text'], parse_dict['time']))
text = "{} new wblog published recently\n".format(len(pool))
print(text)
if len(pool) > 0:
sent_message(text, "+86***********")
return pool
完整代码
import copy
import time
import requests
import re
import json
from requests import get
from twilio.rest import Client
auth_token = '**************************'
account_sid = '************************'
client = Client(account_sid, auth_token)
def get_page():
"""
先用requests构造请求,解析出关键词搜索出来的微博总页数
:return: 返回每次请求需要的data参数
"""
data_list = []
data = {
'containerid': '100103type=61&q={}'.format(kw),
'page_type': 'searchall'}
resp = requests.get(url=url, headers=headers, params=data)
total_page = resp.json()['data']['cardlistInfo']['total'] # 微博总数
# 一页有10条微博,用总数对10整除,余数为0则页码为总数/10,余数不为0则页码为(总数/10)+1
if total_page % 10 == 0:
page_num = int(total_page / 10)
else:
page_num = int(total_page / 10) + 1
# 页码为1,data为当前data,页码不为1,通过for循环构建每一页的data参数
if page_num == 1:
data_list.append(data)
return data_list
else:
for i in range(1, page_num + 1):
data['page'] = i
data_list.append(copy.deepcopy(data))
return data_list
def check_time(id, last_pool, time):
if time != "刚刚":
return False
for pool in last_pool:
if id in pool:
return False
return True
def sent_message(text, phone_number):
mes = client.messages.create(
from_='+**********', # 填写在active number处获得的号码
body=text,
to=phone_number
)
print("msg sent: {}".format(mes.sid))
def renew(last_pool):
data_list = get_page() # 接收data参数列表
pool = []
for data in data_list:
with get(url=url, headers=headers, params=data) as resp: # 携带参数发送请求
text = resp.text # await 等待知道获取完整数据
text_dict = json.loads(text)['data']['cards']
parse_dict = {}
for card in text_dict:
if card['card_type'] == 9:
scheme = card['scheme']
if card['mblog']['isLongText'] is False:
text = card['mblog']['text']
text = re.sub(r'<.*?>|\n+', '', text)
else:
text = card['mblog']['longText']['longTextContent']
user = card['mblog']['user']['profile_url']
id = card['mblog']['id']
time = card['mblog']['created_at']
if not check_time(id, last_pool, time) or id in pool:
text = "{} new wblog published recently\n".format(len(pool))
print(text)
if len(pool) > 0:
sent_message(text, "+86*********")
return pool
parse_dict['url'] = scheme
parse_dict['text'] = text
parse_dict['author'] = user
parse_dict['time'] = time
pool.append(id)
# print('new weibo detected:\n'
# 'author home page: {}\n'
# 'content: {}\n'
# 'time: {}\n'.format(parse_dict['author'], parse_dict['text'], parse_dict['time']))
text = "{} new wblog published recently\n".format(len(pool))
print(text)
if len(pool) > 0:
sent_message(text, "+86***********")
return pool
def main():
"""
开启任务循环
:return: None
"""
last_pool = [[], []]
while True:
print("\nnew detect launched")
new_pool = renew(last_pool)
last_pool = last_pool[1:]
last_pool.append(new_pool)
time.sleep(30)
if __name__ == '__main__':
kw = input("Please input the keyword you wanna overwatch: ")
headers = {
'user-agent':
'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.57.2 (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2'
}
url = 'https://m.weibo.cn/api/container/getIndex'
main()
网友评论