Spot the Difference

By 是东东 | Published 2021-12-18 00:41

twitter_spider.py

import json
import time
import datetime
import langid
import schedule
from selenium import webdriver
from lxml import etree
import getpass
from static_func import published_time, norm_date, get_logdata, replaces, get_date, write_logdata, old_data_path, \
    write_crawl_time, isdir_exists
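# Twitter crawler: drives a Chrome browser through Selenium, searches each target account
# month by month (from account creation up to today), and appends new tweets as JSON lines.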


class Twitter(object):
    # shared defaults: page-load wait (s), base URL, saved-item count, seen URLs, stale-item count, empty-month count
    timing, url_p, num, data_urls, _n, m_n = 7, 'https://twitter.com', 0, [], 0, 0

    def __init__(self):
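        """Configure Chrome: custom user agent and referer, images and GPU disabled, and the
        navigator.webdriver flag hidden so the automated browser is harder to detect."""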
        co = webdriver.ChromeOptions()
        co.add_argument(
            'user-agent=Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36')
        co.add_argument(f'referer={self.url_p}')
        # co.add_argument('--headless')
        co.add_argument('--disable-gpu')
        co.add_argument("--start-maximized")
        co.add_argument('blink-settings=imagesEnabled=false')
        co.add_experimental_option("excludeSwitches", ["enable-automation"])
        co.add_experimental_option('useAutomationExtension', False)
        self.driver = webdriver.Chrome(options=co)
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
        })

    def get(self, url):
        self.driver.get(url)
        time.sleep(self.timing)

    def parse(self, tree, target, timed, log_data, json_path):
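        """Extract tweets from the rendered page (publish time, URL, text, citation/image/video
        links and any quoted post) and append each new item as one JSON line to json_path.
        Items older than `timed` or already listed in `log_data` are skipped."""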
        details = tree.xpath('//article')
        for detail in details:
            item, post_details, _a = {}, {}, '//div[@class="css-1dbjc4n r-1d09ksm r-18u37iz r-1wbh5a2"]//a[@title]'
            press_time, data_url = ''.join(detail.xpath(f'.{_a}/time/@datetime')), ''.join(detail.xpath(f'.{_a}/@href'))
            if data_url and press_time:
                if not data_url.startswith('http'):
                    data_url = self.url_p + data_url
                press_time = press_time[:19].replace('T', ' ')
                pt = norm_date(press_time)
                if pt >= timed:
                    if pt == timed:
                        # record URLs published exactly on the cutoff date, for de-duplication on the next run
                        with open(old_data_path(target), 'a') as old_file:
                            old_file.write(f'{datetime.date.today()} {data_url} \n')
                    if data_url not in log_data:
                        content = replaces(' '.join(detail.xpath(
                            './/div[contains(@class,"r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-bnwqim r-qvutc0")]//text()'))).replace(
                            '…', '')
                        language = langid.classify(content)[0]
                        if language not in ['en', 'ar']:
                            language = ''
                        citation_url = ''.join(detail.xpath(
                            './/a[@class="css-4rbku5 css-18t94o4 css-1dbjc4n r-1loqt21 r-18u37iz r-16y2uox r-1wtj0ep"]/@href|.//div[contains(@class,"r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-bnwqim r-qvutc0")]//a[@title]/@title'))
                        img_url = ''.join(detail.xpath(
                            './/div[@class="css-1dbjc4n r-9x6qib r-t23y2h r-1phboty r-rs99b7 r-156q2ks r-1udh08x"]//img[@class="css-9pa8cd"]/@src'))
                        video_obj = detail.xpath(
                            './/div[@class="css-1dbjc4n r-1p0dtai r-1loqt21 r-1d2f490 r-u8s1d r-zchlnj r-ipm5af"]')
                        video_url = ''
                        if video_obj:
                            video_url = data_url
                        post_obj = detail.xpath(
                            './/div[@class="css-1dbjc4n r-9x6qib r-t23y2h r-rs99b7 r-1loqt21 r-dap0kf r-1ny4l3l r-1udh08x r-o7ynqc r-6416eg"]')
                        if post_obj:
                            publisher = ''.join(post_obj[0].xpath(
                                './/div[@class="css-901oao css-bfa6kz r-1re7ezh r-18u37iz r-1qd0xha r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-qvutc0"]/span/text()'))
                            publish_time = ''.join(post_obj[0].xpath('.//time/@datetime'))[:19].replace('T', ' ')
                            publish_content = replaces(' '.join(post_obj[0].xpath(
                                './/div[@class="css-901oao r-hkyrab r-1qd0xha r-a023e6 r-16dba41 r-ad9z0x r-1g94qm0 r-bcqeeo r-bnwqim r-qvutc0"]//text()')).replace(
                                '…', ''))
                            publish_img_url = ''.join(post_obj[0].xpath(
                                './/div[@class="css-1dbjc4n r-1p0dtai r-1mlwlqe r-1d2f490 r-11wrixw r-1udh08x r-u8s1d r-zchlnj r-ipm5af r-417010"]//img/@src'))
                            post_details['post'] = publisher
                            post_details['publish_time'] = publish_time
                            post_details['publish_content'] = publish_content
                            post_details['publish_img_url'] = publish_img_url
                        if data_url not in self.data_urls:
                            item = {
                                'username': target, 'press_time': press_time, 'data_url': data_url,
                                'content': content, 'citation_url': citation_url, 'img_url': img_url,
                                'video_url': video_url, 'post_details': post_details, 'language': language,
                            }
                            with open(json_path, 'a', encoding="utf-8") as write_json:
                                write_json.write(json.dumps(item, ensure_ascii=False) + '\n')
                            self.data_urls.append(data_url)
                            self.num += 1
                            print(item)
                            print(f'fetched {self.num} items\n')
                else:
                    self._n += 1
                    print(f'{self._n} items already crawled; publish time {pt} < {timed}')
                    if self._n > 20:
                        break
            else:
                print(f'missing data_url ({data_url}) or press_time ({press_time})')

    def scroll_page(self, url, s_date, target, timed, log_data, json_path):
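        """Open the search URL and scroll up to 34 times, parsing the page after every scroll.
        Stops early when the page reports no results for the month, when the end-of-feed
        marker shows up more than three times, or when too many stale items have been seen."""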
        self.get(url)
        print('start scrolling')
        break_num = 0
        for i in range(1, 35):
            print(f'scroll count: {i}')
            source = self.driver.page_source
            tree = etree.HTML(source)
            if tree.xpath(
                    '//div[@class="css-901oao r-1re7ezh r-1qd0xha r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-q4m81j r-ey96lj r-qvutc0"]'):
                print(f'no content for the month of {s_date}, moving on to the next month\n')
                self.m_n += 1
                break
            if self.m_n > 0:
                break
            self.parse(tree, target, timed, log_data, json_path)  # parse the data
            self.driver.execute_script(f"window.scrollBy(0, {i * 1000})")
            if tree.xpath('//div[@class="css-1dbjc4n r-1bxhq7s"]'):
                break_num += 1
                print(f'break_num: {break_num}')
                if break_num > 3:
                    print('no content more than three times, moving on to the next month')
                    break
            time.sleep(5)
            if self._n > 20:
                break
        print('scrolling finished\n')

    def run(self):
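        """Read the target usernames from the Arabic and English name files, then for each target
        look up the account creation date, iterate over the month ranges from get_date(),
        scroll/parse each search range, and finally update the per-target date and URL logs."""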
        with open('./twitter_name_ar.txt', 'r') as _ar:
            targets = _ar.readlines()
        with open('./twitter_name_en.txt', 'r') as _en:
            en = _en.readlines()
        targets.extend(en)
        for target in targets:  # iterate over targets
            target, today = target.replace('\n', ''), datetime.date.today()
            json_path = f'C:/Users/{getpass.getuser()}/Desktop/ljd/json_file/Twitter/twitter_{today}.json'
            isdir_exists(json_path)
            print(f'target name: {target}')
            timed, log_data = published_time(target), ''.join(get_logdata(target))  # read local log data
            self.get(f'{self.url_p}/{target}')
            try:
                # the "Joined <date>" label (加入 = "Joined" in the Chinese UI locale)
                _d = self.driver.find_element_by_xpath('//span[contains(text()," 加入")]').text
            except Exception:
                _d = None
            if _d:
                print(f'Twitter account creation time: {_d}')
                for s_date, e_date in get_date(_d):  # get the start and end dates
                    print(f'currently crawling the range {s_date} to {e_date}')
                    url = f'{self.url_p}/search?f=live&q=(from%3A{target})%20until%3A{e_date}%20since%3A{s_date}%20-filter%3Areplies&src=typed_query'
                    print(f'target URL: {url}')
                    try:
                        self.scroll_page(url, s_date, target, timed, log_data, json_path)  # scroll and collect data
                    except Exception as e:
                        print(f'scroll_page error: {e}')
                    if self._n > 20:
                        self._n = 0
                        break
                    if self.m_n > 0:
                        self.m_n = 0
                        break
                write_crawl_time(timed, today, target)  # record the latest crawl date
                write_logdata(today, target)
        self.driver.quit()
        print(f'task finished {datetime.datetime.today()}')


def run_spider():
    T = Twitter()
    T.run()


"""
schedule.every(10).seconds.do(job) # 每10秒执行一次
schedule.every(10).minutes.do(job) # 每10分钟执行一次
schedule.every().hour.do(job) # 每小时执行一次
schedule.every().day.at("10:30").do(job) # 每天十点半执行
schedule.every().monday.do(job) # 每周一执行
schedule.every().wednesday.at("13:15").do(job) # 每周三13点15执行
"""

if __name__ == '__main__':
    n = 0
    if n == 0:
        """运行所有"""
        run_spider()
    elif n == 1:
        """定时"""
        start_time = '00:00'
        print(f'开启时间:{datetime.datetime.today()}\n{__file__.split("/")[-1]}\t每周一下午{start_time}  自动抓取\n')
        schedule.every().monday.at(start_time).do(run_spider)
        while True:
            schedule.run_pending()  # run all pending jobs
            time.sleep(1)

static_func.py

import datetime
import logging
import os

import dateparser
from dateutil.relativedelta import relativedelta

BASE_DIR = os.path.dirname(os.path.abspath(__file__)).replace('\\', '/')
# per-target log file paths: the latest crawl date, and the URLs already crawled
newest_date_path = lambda _t: f'{BASE_DIR}/log/newest_date/{_t}.log'
old_data_path = lambda _t: f'{BASE_DIR}/log/old_data/{_t}.log'


def str_to_date(date):
    """字符串日期格式化为日期类型"""
    return datetime.datetime.strptime(date, '%Y-%m-%d').date()


def norm_date(date):
    """转换各国语言日期,并格式化日期:xxxx-xx-xx"""
    return dateparser.parse(date).date()


def isdir_exists(filepath):
    """判断文件及文件夹是否存在"""
    if not os.path.exists(filepath):
        dirpath = '/'.join(filepath.split('/')[:-1])
        if not os.path.exists(dirpath):
            os.makedirs(dirpath)
        with open(filepath, 'w', encoding='utf-8'):
            logging.info(f'file created: {filepath}')


def published_time(target):
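    """Return the last recorded crawl date for the target, or 1000-01-01 if none has been saved yet."""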
    path = newest_date_path(target)
    isdir_exists(path)
    with open(path, 'r', encoding='utf-8') as file:
        date = file.read()
    if date:
        date = str_to_date(date)
    else:
        date = str_to_date('1000-01-01')
    return date


def write_crawl_time(timed, today, target):
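    """Record the latest crawl date for the target; on a first-ever crawl, write tomorrow's date."""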
    with open(newest_date_path(target), 'w', encoding='utf-8') as date_file:
        if timed == str_to_date('1000-01-01'):
            today = today + relativedelta(days=+1)
        date_file.write(str(today))
        logging.info(f'updated date written: {today}')


def get_logdata(target):
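    """Return the per-target URL log as a list of lines (a placeholder list when the log is empty)."""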
    path = old_data_path(target)
    isdir_exists(path)
    with open(path, 'r', encoding='utf-8') as log_file:
        log_file = log_file.readlines()
    if not log_file:
        log_file = ['no log data yet']
    # do not join into a single string: write_logdata reads the list line by line and drops stale entries by date
    return log_file


def write_logdata(today, target):
    """写入最近一次的当天及明天抓取的url, 减少文件大小"""
    tomorrow = today + relativedelta(days=+1)
    old_data = get_logdata(target)
    with open(old_data_path(target), 'w', encoding="utf-8") as o:
        for data in old_data:
            if data:
                if str(today) in data:
                    o.write(data)
                if str(tomorrow) in data:
                    o.write(data)


def replaces(strs):
    """
    替换所有的特殊符号
    """
    try:
        s = ''.join(strs).replace('...', '').replace('\r', '').replace('\n', '').replace('\t', '').strip()\
            .replace('  ', '').replace('  ', '').replace('  ', '').replace('  ', '').replace('  ', '')
        return s
    except Exception:
        return ''


def get_date(_d):
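    """Build [month_start, month_end] date-string pairs, newest month first, going back from the
    current month to the month the account was created (parsed from the 'Joined' label text)."""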
    dates = []
    # strip the '加入' ("Joined") label and snap the creation date to the first day of its month
    create_time = norm_date(str(norm_date(_d.replace('加入', '')))[:7] + '-01')
    today = datetime.date.today()
    while True:
        # first day of this month
        now_first = dateparser.parse(str(today)[:7] + '-01')
        # first day of next month
        next_first = now_first + relativedelta(months=1)
        # last day of this month
        now_last = next_first - relativedelta(days=1)
        today = today - relativedelta(months=1)
        dates.append([str(now_first)[:10], str(now_last)[:10]])
        if today <= create_time:
            break
    return dates


if __name__ == '__main__':
    print(get_date('2016-05-10'))

twitter_name_ar.txt

Sara_M_Fahad
h_albogami
mohammedal_saud

twitter_name_en.txt

sfrantzman
Mr_Alshammeri
Ben_winkley
FirasMaksad
