
Spot the Difference

Author: 是东东 | Published 2021-12-18 00:41

    twitter_spider.py

    import json
    import time
    import datetime
    import langid
    import schedule
    from selenium import webdriver
    from lxml import etree
    import getpass
    from static_func import pubilished_time, norm_date, get_logdata, replaces, get_date, write_logdata, old_data_path, \
        write_crawl_time, isdir_exists
    
    
    class Twitter(object):
        # defaults: page-load wait (s), base URL, saved-item count, seen URLs, stale counter, empty-month flag
        timing, url_p, num, data_urls, _n, m_n = 7, 'https://twitter.com', 0, [], 0, 0
    
        def __init__(self):
            co = webdriver.ChromeOptions()
            co.add_argument(
                'user-agent=Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36')
            co.add_argument(f'referer={self.url_p}')
            # co.add_argument('--headless')
            co.add_argument('--disable-gpu')
            co.add_argument("--start-maximized")
            co.add_argument('blink-settings=imagesEnabled=false')
            co.add_experimental_option("excludeSwitches", ["enable-automation"])
            co.add_experimental_option('useAutomationExtension', False)
            self.driver = webdriver.Chrome(options=co)
            self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
            })
    
        def get(self, url):
            self.driver.get(url)
            time.sleep(self.timing)
    
        def parse(self, tree, target, timed, log_data, json_path):
            details = tree.xpath('//article')
            for detail in details:
                item, post_details, _a = {}, {}, '//div[@class="css-1dbjc4n r-1d09ksm r-18u37iz r-1wbh5a2"]//a[@title]'
                press_time, data_url = ''.join(detail.xpath(f'.{_a}/time/@datetime')), ''.join(detail.xpath(f'.{_a}/@href'))
                if data_url and press_time:
                    if 'http' not in data_url and '://' not in data_url:
                        data_url = self.url_p + data_url
                    press_time = press_time[:19].replace('T', ' ')
                    pt = norm_date(press_time)
                    if pt >= timed:
                        if pt == timed:
                            with open(old_data_path(target), 'a') as _a:  # per-target old-data log
                                _a.write(f'{datetime.date.today()} {data_url} \n')
                        if data_url not in log_data:
                            content = replaces(' '.join(detail.xpath(
                                './/div[contains(@class,"r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-bnwqim r-qvutc0")]//text()'))).replace(
                                '…', '')
                            language = langid.classify(content)[0]
                            if language not in ['en', 'ar']:
                                language = ''
                            citation_url = ''.join(detail.xpath(
                                './/a[@class="css-4rbku5 css-18t94o4 css-1dbjc4n r-1loqt21 r-18u37iz r-16y2uox r-1wtj0ep"]/@href|.//div[contains(@class,"r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-bnwqim r-qvutc0")]//a[@title]/@title'))
                            img_url = ''.join(detail.xpath(
                                './/div[@class="css-1dbjc4n r-9x6qib r-t23y2h r-1phboty r-rs99b7 r-156q2ks r-1udh08x"]//img[@class="css-9pa8cd"]/@src'))
                            video_obj = detail.xpath(
                                './/div[@class="css-1dbjc4n r-1p0dtai r-1loqt21 r-1d2f490 r-u8s1d r-zchlnj r-ipm5af"]')
                            video_url = ''
                            if video_obj:
                                video_url = data_url
                            post_obj = detail.xpath(
                                './/div[@class="css-1dbjc4n r-9x6qib r-t23y2h r-rs99b7 r-1loqt21 r-dap0kf r-1ny4l3l r-1udh08x r-o7ynqc r-6416eg"]')
                            if post_obj:
                                publisher = ''.join(post_obj[0].xpath(
                                    './/div[@class="css-901oao css-bfa6kz r-1re7ezh r-18u37iz r-1qd0xha r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-qvutc0"]/span/text()'))
                                publish_time = ''.join(post_obj[0].xpath('.//time/@datetime'))[:19].replace('T', ' ')
                                publish_content = replaces(' '.join(post_obj[0].xpath(
                                    './/div[@class="css-901oao r-hkyrab r-1qd0xha r-a023e6 r-16dba41 r-ad9z0x r-1g94qm0 r-bcqeeo r-bnwqim r-qvutc0"]//text()')).replace(
                                    '…', ''))
                                publish_img_url = ''.join(post_obj[0].xpath(
                                    './/div[@class="css-1dbjc4n r-1p0dtai r-1mlwlqe r-1d2f490 r-11wrixw r-1udh08x r-u8s1d r-zchlnj r-ipm5af r-417010"]//img/@src'))
                                post_details = {'post': publisher, 'publish_time': publish_time,
                                                'publish_content': publish_content, 'publish_img_url': publish_img_url}
                            if data_url not in self.data_urls:
                                item = {'username': target, 'press_time': press_time, 'data_url': data_url,
                                        'content': content, 'citation_url': citation_url, 'img_url': img_url,
                                        'video_url': video_url, 'post_details': post_details, 'language': language}
                                with open(json_path, 'a', encoding="utf-8") as write_json:
                                    write_json.write(json.dumps(item, ensure_ascii=False) + '\n')
                                self.data_urls.append(data_url)
                                self.num += 1
                                print(item)
                                print(f'Collected {self.num} items\n')
                    else:
                        self._n += 1
                        print(f'{self._n}: data already crawled, publish time {pt} < {timed}')
                        if self._n > 20:
                            break
                else:
                    print(f'Missing data_url ({data_url}) or press_time ({press_time})')
    
        def scroll_page(self, url, s_date, target, timed, log_data, json_path):
            self.get(url)
            print('Scrolling started')
            break_num = 0
            for i in range(1, 35):
                print(f'Scroll count: {i}')
                source = self.driver.page_source
                tree = etree.HTML(source)
                if tree.xpath(
                        '//div[@class="css-901oao r-1re7ezh r-1qd0xha r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-q4m81j r-ey96lj r-qvutc0"]'):
                    print(f'No content for the month of {s_date}, crawling the next month\n')
                    self.m_n += 1
                    break
                if self.m_n > 0:
                    break
                self.parse(tree, target, timed, log_data, json_path)  # parse the data
                self.driver.execute_script(f"window.scrollBy(0,{str(i*1000)})")
                if tree.xpath('//div[@class="css-1dbjc4n r-1bxhq7s"]'):
                    break_num += 1
                    print(f'break_num: {break_num}')
                    if break_num > 3:
                        print('No content more than three times, crawling the next month')
                        break
                time.sleep(5)
                if self._n > 20:
                    break
            print('Scrolling finished\n')
    
        def run(self):
            with open('./twitter_name_ar.txt', 'r') as _ar:
                targets = _ar.readlines()
            with open('./twitter_name_en.txt', 'r') as _en:
                en = _en.readlines()
            targets.extend(en)
            for target in targets:  # iterate over targets
                target, today = target.replace('\n', ''), datetime.date.today()
                json_path = f'C:/Users/{getpass.getuser()}/Desktop/ljd/json_file/Twitter/twitter_{today}.json'
                isdir_exists(json_path)
                print(f'Target name: {target}')
                timed, log_data = pubilished_time(target), ''.join(get_logdata(target))  # read local log data for this target
                self.get(f'{self.url_p}/{target}')
                try:
                    _d = self.driver.find_element_by_xpath('//span[contains(text()," 加入")]').text
                except Exception:
                    _d = None
                if _d:
                    print(f'Twitter account created: {_d}')
                    for s_date, e_date in get_date(_d):  # get the start and end dates
                        print(f'Currently crawling the range {s_date} to {e_date}')
                        url = f'{self.url_p}/search?f=live&q=(from%3A{target})%20until%3A{e_date}%20since%3A{s_date}%20-filter%3Areplies&src=typed_query'
                        print(f'Target URL: {url}')
                        try:
                            self.scroll_page(url, s_date, target, timed, log_data, json_path)  # scroll and collect the data
                        except Exception as e:
                            print(f'scroll_page error: {e}')
                        if self._n > 20:
                            self._n = 0
                            break
                        if self.m_n > 0:
                            self.m_n = 0
                            break
                    write_crawl_time(timed, today, target)  # write the crawl start time
                    write_logdata(today, target)
            self.driver.quit()
            print(f'Task finished {datetime.datetime.today()}')
    
    
    def run_spider():
        T = Twitter()
        T.run()
    
    
    """
    schedule.every(10).seconds.do(job)  # run every 10 seconds
    schedule.every(10).minutes.do(job)  # run every 10 minutes
    schedule.every().hour.do(job)  # run every hour
    schedule.every().day.at("10:30").do(job)  # run every day at 10:30
    schedule.every().monday.do(job)  # run every Monday
    schedule.every().wednesday.at("13:15").do(job)  # run every Wednesday at 13:15
    """
    
    if __name__ == '__main__':
        n = 0
        if n == 0:
            """运行所有"""
            run_spider()
        elif n == 1:
            """定时"""
            start_time = '00:00'
            print(f'开启时间:{datetime.datetime.today()}\n{__file__.split("/")[-1]}\t每周一下午{start_time}  自动抓取\n')
            schedule.every().monday.at(start_time).do(run_spider)
            while True:
                schedule.run_pending()  # run all pending jobs
                time.sleep(1)
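
Each record the spider writes to json_path is one JSON object per line (JSON Lines). A minimal sketch for loading the output back, assuming the same desktop path that run() builds:

    import json
    import datetime
    import getpass

    # example path, matching the layout used in run()
    today = datetime.date.today()
    json_path = f'C:/Users/{getpass.getuser()}/Desktop/ljd/json_file/Twitter/twitter_{today}.json'

    records = []
    with open(json_path, 'r', encoding='utf-8') as f:
        for line in f:  # one tweet per line
            records.append(json.loads(line))

    # each record carries the keys written in parse():
    # username, press_time, data_url, content, citation_url,
    # img_url, video_url, post_details, language
    print(f'{len(records)} records loaded')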
    

    static_func.py

    import datetime
    import logging
    import os
    
    import dateparser
    from dateutil.relativedelta import relativedelta
    
    # directory containing this file (splits on Windows-style backslashes)
    BASE_DIR = '/'.join(__file__.split('\\')[:-1])
    # per-target log files: the latest crawl date and the already-crawled URLs
    newest_date_path = lambda _t: f'{BASE_DIR}/log/newest_date/{_t}.log'
    old_data_path = lambda _t: f'{BASE_DIR}/log/old_data/{_t}.log'
    
    
    def str_to_date(date):
        """字符串日期格式化为日期类型"""
        return datetime.datetime.strptime(date, '%Y-%m-%d').date()
    
    
    def norm_date(date):
        """转换各国语言日期,并格式化日期:xxxx-xx-xx"""
        return dateparser.parse(date).date()
    
    
    def isdir_exists(filepath):
        """判断文件及文件夹是否存在"""
        if not os.path.exists(filepath):
            dirpath = '/'.join(filepath.split('/')[:-1])
            if not os.path.exists(dirpath):
                os.makedirs(dirpath)
            with open(filepath, 'w', encoding='utf-8'):
                logging.info(f'File created: {filepath}')
    
    
    def pubilished_time(target):
        """Return the last crawl date for the target, or 1000-01-01 on the first run."""
        path = newest_date_path(target)
        isdir_exists(path)
        with open(path, 'r', encoding='utf-8') as file:
            date = file.read()
        if date:
            date = str_to_date(date)
        else:
            date = str_to_date('1000-01-01')
        return date
    
    
    def write_crawl_time(timed, today, target):
        """Record the date up to which the target has been crawled (tomorrow's date on the first run)."""
        with open(newest_date_path(target), 'w', encoding='utf-8') as json_file:
            if timed == str_to_date('1000-01-01'):
                today = today + relativedelta(days=+1)
            json_file.write(str(today))
            logging.info(f'Updated date written: {today}')
    
    
    def get_logdata(target):
        """Read the already-crawled URL log for the target."""
        path = old_data_path(target)
        isdir_exists(path)
        with open(path, 'r', encoding='utf-8') as log_file:
            log_file = log_file.readlines()
        if not log_file:
            log_file = ['no log data']  # placeholder so callers always get a non-empty list
        # do not join the lines: write_logdata reads the list line by line and filters stale entries by date
        return log_file
    
    
    def write_logdata(today, target):
        """写入最近一次的当天及明天抓取的url, 减少文件大小"""
        tomorrow = today + relativedelta(days=+1)
        old_data = get_logdata(target)
        with open(old_data_path(target), 'w', encoding="utf-8") as o:
            for data in old_data:
                if data:
                    if str(today) in data:
                        o.write(data)
                    if str(tomorrow) in data:
                        o.write(data)
    
    
    def replaces(strs):
        """
        替换所有的特殊符号
        """
        try:
            s = ''.join(strs).replace('...', '').replace('\r', '').replace('\n', '').replace('\t', '').strip()\
                .replace('  ', '').replace('  ', '').replace('  ', '').replace('  ', '').replace('  ', '')
            return s
        except Exception:
            return ''
    
    
    def get_date(_d):
        """Return [first_day, last_day] month windows from the current month back to the account's join month."""
        dates = []
        # strip the "加入" ("Joined") prefix and snap the join date to the first day of that month
        create_time = norm_date(str(norm_date(_d.replace('加入', '')))[:7] + '-01')
        today = datetime.date.today()
        while True:
            # first day of the current month
            now_first = dateparser.parse(str(today)[:7] + '-01')
            # first day of the next month
            next_first = now_first + relativedelta(months=1)
            # last day of the current month
            now_last = next_first - relativedelta(days=1)
            today = today - relativedelta(months=1)
            dates.append([str(now_first)[:10], str(now_last)[:10]])
            if today <= create_time:
                break
        return dates
    
    
    if __name__ == '__main__':
        print(get_date('2016-05-10'))
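
The helpers above maintain two small per-target log files under log/. A minimal sketch of how the spider uses them around one crawl (the handle below is just an example taken from twitter_name_en.txt):

    import datetime
    from static_func import pubilished_time, get_logdata, write_crawl_time, write_logdata

    target = 'sfrantzman'  # example handle
    today = datetime.date.today()

    # log/newest_date/<target>.log holds the date of the last completed crawl;
    # it is empty on the first run, so pubilished_time() falls back to 1000-01-01
    timed = pubilished_time(target)

    # log/old_data/<target>.log holds "<date> <tweet url>" lines written by parse(),
    # used to skip tweets that have already been saved
    log_data = ''.join(get_logdata(target))

    # ... crawl with timed and log_data ...

    write_crawl_time(timed, today, target)  # update the "crawled up to" date
    write_logdata(today, target)            # keep only today's and tomorrow's log lines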
    
    

    twitter_name_ar.txt

    Sara_M_Fahad
    h_albogami
    mohammedal_saud
    

    twitter_name_en.txt

    sfrantzman
    Mr_Alshammeri
    Ben_winkley
    FirasMaksad
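
For every handle in these two lists, run() builds one advanced-search URL per month window returned by get_date(). A sketch of the URL for an example handle and window:

    url_p = 'https://twitter.com'
    target, s_date, e_date = 'sfrantzman', '2021-11-01', '2021-11-30'  # example values
    url = (f'{url_p}/search?f=live&q=(from%3A{target})%20until%3A{e_date}'
           f'%20since%3A{s_date}%20-filter%3Areplies&src=typed_query')
    # decoded query: (from:sfrantzman) until:2021-11-30 since:2021-11-01 -filter:replies
    print(url)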
    
