美文网首页不务正业
微博关键词监控器

微博关键词监控器

作者: 要薯条不要辣翅 | 来源:发表于2020-04-02 17:35 被阅读0次

    微博关键词监控器

    2020年4月2日阴雨连绵的下午,被疫情所困无法返校开学的我百无聊赖,突然想起之前抖音上刷到一个霓虹国程序员做了一个分手灯,就是一个监控社交网络,有人说分手就会亮一下的灯,我感觉十分有趣,于是决定自己也做一个。但是用计算机控制灯比较麻烦,比较之下我选择了较为方便的手机短信,于是一个沙雕脚本诞生了。

    这个沙雕脚本使用爬虫技术监控微博的动态,当搜索关键词有新结果时便会以短信的方式予以通知,而且可以自行定义通知间隔,可以说是非常人(没)性(有)化(用)。

    下面我将分模块介绍一下这个只有一百余行的python脚本。

    短信模块

    短信我们使用https://www.twilio.com/提供的服务,他们家是有免费试用的,有钱的老爷们也可以使用其他供应商的服务,应该用起来都类似。首先在网站注册账号,然后绑定手机,将密钥信息填进API里就可以调用起来了,十分简洁方便。

    # 配置
    auth_token = '**********************'
    account_sid = '***********************'
    client = Client(account_sid, auth_token)
    
    # 然后调用client.messages.create就可以了,下边有写成一个工具函数
    

    主程序部分

    import copy # 深拷贝
    import time # 计时器
    import requests # 众所周知这是网络通讯库,爬虫都爱他
    import re # 正则表达式
    import json # 这个包熟悉网络的同学应该也很熟悉,很多的通讯都采用了json的形式(虽然现在在逐步被淘汰),这个包便是构造和处理json的
    from requests import get # http GET方法
    from twilio.rest import Client # 发短信用的
    
    def main():
        """
        开启任务循环
        :return: None
        """
        last_pool = [[], []] # 由于微博的时间记录不是精确性的,而是“刚刚”、“1分钟前”这种,这个列表用来记录已经通知过的微博id,配合时间戳可以区分出新微博是否需要被通知
        while True:
            print("\nnew detect launched")
            new_pool = renew(last_pool) # 主方法,刷新一次,通知新微博,并返回新微博id列表
            last_pool = last_pool[1:] # 弹出时间比较久的微博,这部分已经可以用时间戳予以区分
            last_pool.append(new_pool) # 将新的列表添加进已通知池
            time.sleep(30) # 暂停30s
    
    
    if __name__ == '__main__':
        kw = input("Please input the keyword you wanna overwatch: ")
        headers = {
            'user-agent':
                'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.57.2 (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2'
        }
        url = 'https://m.weibo.cn/api/container/getIndex' # m.weibo.cn是手机浏览器版本的微博,从这个来源比较容易爬,这个URL是搜索的接口
        main() # 执行主函数
    

    构建待访问池

    def get_page():
        # 先用requests构造请求,解析出关键词搜索出来的微博总页数
        data_list = [] # @return,每次连接的具体参数
        data = {
            'containerid': '100103type=61&q={}'.format(kw), # 排序方式和搜索关键词
            'page_type': 'searchall'} # 搜索方式
        resp = requests.get(url=url, headers=headers, params=data)
        total_page = resp.json()['data']['cardlistInfo']['total']  # 微博总数
        # 一页有10条微博,用总数对10整除,余数为0则页码为总数/10,余数不为0则页码为(总数/10)+1
        if total_page % 10 == 0:
            page_num = int(total_page / 10)
        else:
            page_num = int(total_page / 10) + 1
        # 页码为1,data为当前data,页码不为1,通过for循环构建每一页的data参数
        if page_num == 1:
            data_list.append(data)
            return data_list
        else:
            for i in range(1, page_num + 1):
                data['page'] = i
                data_list.append(copy.deepcopy(data))
            return data_list
    

    工具函数

    def check_time(id, last_pool, time): # 检查是否被通知过
        if time != "刚刚": # 我们只通知最近发出的微博(30s间隔刷新),所以所有不是“刚刚”的微博都不需要通知
            return False
        for pool in last_pool: # 已经通知过的不用通知
            if id in pool:
                return False
        return True
    
    
    def sent_message(text, phone_number):
        mes = client.messages.create(
            from_='*********',  # 填写在active number处获得的号码
            body=text,
            to=phone_number
        )
        print("msg sent: {}".format(mes.sid))
    

    更新

    def renew(last_pool):
        data_list = get_page()  # 接收data参数列表
        pool = [] # 本次连接的已访问池
        for data in data_list:
            with get(url=url, headers=headers, params=data) as resp:  # 携带参数发送请求
                text = resp.text
                text_dict = json.loads(text)['data']['cards'] # 微博页面上的数据是以卡片的形式组织的
                parse_dict = {} # 解析出的数据放在这个字典里边
                for card in text_dict:
                    if card['card_type'] == 9: 
                      # 只有这个类型的卡片是记录微博数据的,其他的设计渲染、布局等不用关心的东西
                        scheme = card['scheme']
                        # 卡片中提取信息
                        if card['mblog']['isLongText'] is False:
                            text = card['mblog']['text']
                            text = re.sub(r'<.*?>|\n+', '', text)
                        else:
                            text = card['mblog']['longText']['longTextContent']
                        user = card['mblog']['user']['profile_url']
                        id = card['mblog']['id']
                        time = card['mblog']['created_at']
                        # 检查此条微博是否已经被通知
                        if not check_time(id, last_pool, time) or id in pool:
                            text = "{} new wblog published recently\n".format(len(pool))
                            print(text)
                            if len(pool) > 0:
                                sent_message(text, "+86***********") # 调用发信息方法
                            return pool
                        parse_dict['url'] = scheme
                        parse_dict['text'] = text
                        parse_dict['author'] = user
                        parse_dict['time'] = time
                        pool.append(id)
                # print('new weibo detected:\n'
                #       'author home page: {}\n'
                #       'content: {}\n'
                #       'time: {}\n'.format(parse_dict['author'], parse_dict['text'], parse_dict['time']))
        text = "{} new wblog published recently\n".format(len(pool))
        print(text)
        if len(pool) > 0:
            sent_message(text, "+86***********")
        return pool
    

    完整代码

    import copy
    import time
    import requests
    import re
    import json
    from requests import get
    from twilio.rest import Client
    
    auth_token = '**************************'
    account_sid = '************************'
    client = Client(account_sid, auth_token)
    
    
    def get_page():
        """
        先用requests构造请求,解析出关键词搜索出来的微博总页数
        :return: 返回每次请求需要的data参数
        """
        data_list = []
        data = {
            'containerid': '100103type=61&q={}'.format(kw),
            'page_type': 'searchall'}
        resp = requests.get(url=url, headers=headers, params=data)
        total_page = resp.json()['data']['cardlistInfo']['total']  # 微博总数
        # 一页有10条微博,用总数对10整除,余数为0则页码为总数/10,余数不为0则页码为(总数/10)+1
        if total_page % 10 == 0:
            page_num = int(total_page / 10)
        else:
            page_num = int(total_page / 10) + 1
        # 页码为1,data为当前data,页码不为1,通过for循环构建每一页的data参数
        if page_num == 1:
            data_list.append(data)
            return data_list
        else:
            for i in range(1, page_num + 1):
                data['page'] = i
                data_list.append(copy.deepcopy(data))
            return data_list
    
    
    def check_time(id, last_pool, time):
        if time != "刚刚":
            return False
        for pool in last_pool:
            if id in pool:
                return False
        return True
    
    
    def sent_message(text, phone_number):
        mes = client.messages.create(
            from_='+**********',  # 填写在active number处获得的号码
            body=text,
            to=phone_number
        )
        print("msg sent: {}".format(mes.sid))
    
    
    def renew(last_pool):
        data_list = get_page()  # 接收data参数列表
        pool = []
        for data in data_list:
            with get(url=url, headers=headers, params=data) as resp:  # 携带参数发送请求
                text = resp.text  # await 等待知道获取完整数据
                text_dict = json.loads(text)['data']['cards']
                parse_dict = {}
                for card in text_dict:
                    if card['card_type'] == 9:
                        scheme = card['scheme']
                        if card['mblog']['isLongText'] is False:
                            text = card['mblog']['text']
                            text = re.sub(r'<.*?>|\n+', '', text)
                        else:
                            text = card['mblog']['longText']['longTextContent']
                        user = card['mblog']['user']['profile_url']
                        id = card['mblog']['id']
                        time = card['mblog']['created_at']
                        if not check_time(id, last_pool, time) or id in pool:
                            text = "{} new wblog published recently\n".format(len(pool))
                            print(text)
                            if len(pool) > 0:
                                sent_message(text, "+86*********")
                            return pool
                        parse_dict['url'] = scheme
                        parse_dict['text'] = text
                        parse_dict['author'] = user
                        parse_dict['time'] = time
                        pool.append(id)
                # print('new weibo detected:\n'
                #       'author home page: {}\n'
                #       'content: {}\n'
                #       'time: {}\n'.format(parse_dict['author'], parse_dict['text'], parse_dict['time']))
        text = "{} new wblog published recently\n".format(len(pool))
        print(text)
        if len(pool) > 0:
            sent_message(text, "+86***********")
        return pool
    
    
    def main():
        """
        开启任务循环
        :return: None
        """
        last_pool = [[], []]
        while True:
            print("\nnew detect launched")
            new_pool = renew(last_pool)
            last_pool = last_pool[1:]
            last_pool.append(new_pool)
            time.sleep(30)
    
    
    if __name__ == '__main__':
        kw = input("Please input the keyword you wanna overwatch: ")
        headers = {
            'user-agent':
                'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.57.2 (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2'
        }
        url = 'https://m.weibo.cn/api/container/getIndex'
        main()
    

    相关文章

      网友评论

        本文标题:微博关键词监控器

        本文链接:https://www.haomeiwen.com/subject/jatfphtx.html