twitter_spider.py
import json
import time
import datetime
import langid
import schedule
from selenium import webdriver
from lxml import etree
import getpass
from static_func import (pubilished_time, norm_date, get_logdata, replaces, get_date,
                         write_logdata, old_data_path, write_crawl_time, isdir_exists)

class Twitter(object):
    # timing: seconds to wait after each page load; url_p: site root; num: tweets saved;
    # data_urls: urls already written this run; _n / m_n: counters used to stop early.
    timing, url_p, num, data_urls, _n, m_n = 7, 'https://twitter.com', 0, [], 0, 0

    def __init__(self):
        co = webdriver.ChromeOptions()
        co.add_argument(
            'user-agent=Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36')
        co.add_argument(f'referer={self.url_p}')
        # co.add_argument('--headless')
        co.add_argument('--disable-gpu')
        co.add_argument("--start-maximized")
        co.add_argument('blink-settings=imagesEnabled=false')  # skip image loading to speed up rendering
        co.add_experimental_option("excludeSwitches", ["enable-automation"])
        co.add_experimental_option('useAutomationExtension', False)
        self.driver = webdriver.Chrome(options=co)
        # Hide navigator.webdriver so the page is less likely to detect automation.
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
        })

    def get(self, url):
        self.driver.get(url)
        time.sleep(self.timing)

    def parse(self, tree, target, timed, log_data, json_path):
        # NOTE: the css-*/r-* class chains below are tied to a particular Twitter
        # front-end build and stop matching whenever the markup changes.
        details = tree.xpath('//article')
        for detail in details:
            item, post_details, _a = {}, {}, '//div[@class="css-1dbjc4n r-1d09ksm r-18u37iz r-1wbh5a2"]//a[@title]'
            press_time, data_url = ''.join(detail.xpath(f'.{_a}/time/@datetime')), ''.join(detail.xpath(f'.{_a}/@href'))
            if data_url and press_time:
                if 'http' not in data_url and '://' not in data_url:
                    data_url = self.url_p + data_url
                press_time = press_time[:19].replace('T', ' ')
                pt = norm_date(press_time)
                if pt >= timed:
                    if pt == timed:
                        # Log urls published on the boundary date so the next run can skip them.
                        with open(old_data_path(target), 'a') as old_file:
                            old_file.write(f'{datetime.date.today()} {data_url} \n')
                    if data_url not in log_data:
                        content = replaces(' '.join(detail.xpath(
                            './/div[contains(@class,"r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-bnwqim r-qvutc0")]//text()'))).replace(
                            '…', '')
                        language = langid.classify(content)[0]
                        if language not in ['en', 'ar']:
                            language = ''
                        citation_url = ''.join(detail.xpath(
                            './/a[@class="css-4rbku5 css-18t94o4 css-1dbjc4n r-1loqt21 r-18u37iz r-16y2uox r-1wtj0ep"]/@href|.//div[contains(@class,"r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-bnwqim r-qvutc0")]//a[@title]/@title'))
                        img_url = ''.join(detail.xpath(
                            './/div[@class="css-1dbjc4n r-9x6qib r-t23y2h r-1phboty r-rs99b7 r-156q2ks r-1udh08x"]//img[@class="css-9pa8cd"]/@src'))
                        video_obj = detail.xpath(
                            './/div[@class="css-1dbjc4n r-1p0dtai r-1loqt21 r-1d2f490 r-u8s1d r-zchlnj r-ipm5af"]')
                        video_url = ''
                        if video_obj:
                            video_url = data_url
                        # Quoted / embedded post, if this tweet contains one.
                        post_obj = detail.xpath(
                            './/div[@class="css-1dbjc4n r-9x6qib r-t23y2h r-rs99b7 r-1loqt21 r-dap0kf r-1ny4l3l r-1udh08x r-o7ynqc r-6416eg"]')
                        if post_obj:
                            publisher = ''.join(post_obj[0].xpath(
                                './/div[@class="css-901oao css-bfa6kz r-1re7ezh r-18u37iz r-1qd0xha r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-qvutc0"]/span/text()'))
                            publish_time = ''.join(post_obj[0].xpath('.//time/@datetime'))[:19].replace('T', ' ')
                            publish_content = replaces(' '.join(post_obj[0].xpath(
                                './/div[@class="css-901oao r-hkyrab r-1qd0xha r-a023e6 r-16dba41 r-ad9z0x r-1g94qm0 r-bcqeeo r-bnwqim r-qvutc0"]//text()')).replace(
                                '…', ''))
                            publish_img_url = ''.join(post_obj[0].xpath(
                                './/div[@class="css-1dbjc4n r-1p0dtai r-1mlwlqe r-1d2f490 r-11wrixw r-1udh08x r-u8s1d r-zchlnj r-ipm5af r-417010"]//img/@src'))
                            post_details['post'], post_details['publish_time'], post_details['publish_content'], \
                                post_details['publish_img_url'] = publisher, publish_time, publish_content, publish_img_url
                        if data_url not in self.data_urls:
                            item['username'], item['press_time'], item['data_url'], item['content'], item[
                                'citation_url'], item['img_url'], item['video_url'], item['post_details'], item[
                                'language'] = \
                                target, press_time, data_url, content, citation_url, img_url, video_url, post_details, language
                            with open(json_path, 'a', encoding="utf-8") as write_json:
                                write_json.write(json.dumps(item, ensure_ascii=False) + '\n')
                            self.data_urls.append(data_url)
                            self.num += 1
                            print(item)
                            print(f'collected {self.num} item(s)\n')
                else:
                    self._n += 1
                    print(f'{self._n}: already crawled, publish time {pt} < {timed}')
                    if self._n > 20:
                        break
            else:
                print(f'missing data_url ({data_url}) or press_time ({press_time})')

    def scroll_page(self, url, s_date, target, timed, log_data, json_path):
        self.get(url)
        print('scrolling started')
        break_num = 0
        for i in range(1, 35):
            print(f'scroll count: {i}')
            source = self.driver.page_source
            tree = etree.HTML(source)
            # "No results" banner: nothing was posted in this month's range.
            if tree.xpath(
                    '//div[@class="css-901oao r-1re7ezh r-1qd0xha r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-q4m81j r-ey96lj r-qvutc0"]'):
                print(f'no content for the month of {s_date}, moving to the next range\n')
                self.m_n += 1
                break
            if self.m_n > 0:
                break
            self.parse(tree, target, timed, log_data, json_path)  # parse the tweets in the loaded page
            self.driver.execute_script(f"window.scrollBy(0,{i * 1000})")
            # Heuristic end-of-timeline marker; give up on this range after it shows up more than three times.
            if tree.xpath('//div[@class="css-1dbjc4n r-1bxhq7s"]'):
                break_num += 1
                print(f'break_num: {break_num}')
                if break_num > 3:
                    print('no new content after more than three scrolls, moving to the next range')
                    break
            time.sleep(5)
            if self._n > 20:
                break
        print('scrolling finished\n')

    def run(self):
        with open('./twitter_name_ar.txt', 'r') as _ar:
            targets = _ar.readlines()
        with open('./twitter_name_en.txt', 'r') as _en:
            en = _en.readlines()
        targets.extend(en)
        for target in targets:  # loop over the target accounts
            target, today = target.replace('\n', ''), datetime.date.today()
            json_path = f'C:/Users/{getpass.getuser()}/Desktop/ljd/json_file/Twitter/twitter_{today}.json'
            isdir_exists(json_path)
            print(f'target name: {target}')
            timed, log_data = pubilished_time(target), ''.join(get_logdata(target))  # load per-target state from local log files
            self.get(f'{self.url_p}/{target}')
            try:
                # "加入" is the "Joined <date>" label on the zh-CN Twitter UI.
                _d = self.driver.find_element_by_xpath('//span[contains(text()," 加入")]').text
            except Exception:
                _d = None
            if _d:
                print(f'Twitter account created: {_d}')
                for s_date, e_date in get_date(_d):  # month-by-month [start, end] date ranges
                    print(f'crawling the range {s_date} to {e_date}')
                    url = f'{self.url_p}/search?f=live&q=(from%3A{target})%20until%3A{e_date}%20since%3A{s_date}%20-filter%3Areplies&src=typed_query'
                    print(f'target url: {url}')
                    try:
                        self.scroll_page(url, s_date, target, timed, log_data, json_path)  # scroll the results page and collect tweets
                    except Exception as e:
                        print(e)
                    if self._n > 20:
                        self._n = 0
                        break
                    if self.m_n > 0:
                        self.m_n = 0
                        break
                write_crawl_time(timed, today, target)  # record the latest crawl date for this target
                write_logdata(today, target)
        self.driver.close()
        print(f'task finished {datetime.datetime.today()}')


def run_spider():
    T = Twitter()
    T.run()


"""
schedule.every(10).seconds.do(job)              # run every 10 seconds
schedule.every(10).minutes.do(job)              # run every 10 minutes
schedule.every().hour.do(job)                   # run every hour
schedule.every().day.at("10:30").do(job)        # run every day at 10:30
schedule.every().monday.do(job)                 # run every Monday
schedule.every().wednesday.at("13:15").do(job)  # run every Wednesday at 13:15
"""


if __name__ == '__main__':
    n = 0
    if n == 0:
        """run all targets immediately"""
        run_spider()
    elif n == 1:
        """run on a schedule"""
        start_time = '00:00'
        print(f'started at: {datetime.datetime.today()}\n{__file__.split("/")[-1]}\tauto-crawl every Monday at {start_time}\n')
        schedule.every().monday.at(start_time).do(run_spider)
        while True:
            schedule.run_pending()  # run any scheduled jobs that are due
            time.sleep(1)
static_func.py
import datetime
import logging
import os
import dateparser
from dateutil.relativedelta import relativedelta
# Directory containing this script (the source path is split on Windows-style backslashes).
BASE_DIR = '/'.join(__file__.split('\\')[:-1])
# Per-target log files: the latest crawl date and the urls already collected.
newest_date_path = lambda _t: f'{BASE_DIR}/log/newest_date/{_t}.log'
old_data_path = lambda _t: f'{BASE_DIR}/log/old_data/{_t}.log'


def str_to_date(date):
    """Parse a 'YYYY-MM-DD' string into a datetime.date."""
    return datetime.datetime.strptime(date, '%Y-%m-%d').date()


def norm_date(date):
    """Parse a date string in any supported language and normalise it to a date (YYYY-MM-DD)."""
    return dateparser.parse(date).date()
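# Illustrative examples of norm_date (the inputs are made up; actual parsing
# depends on the installed dateparser version and its locale data):
#   norm_date('2020-05-10 12:30:00')  -> datetime.date(2020, 5, 10)
#   norm_date('10 مايو 2020')          -> datetime.date(2020, 5, 10)   # Arabic month name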


def isdir_exists(filepath):
    """Make sure the file and its parent directory exist, creating them if needed."""
    if not os.path.exists(filepath):
        dirpath = '/'.join(filepath.split('/')[:-1])
        if not os.path.exists(dirpath):
            os.makedirs(dirpath)
        with open(filepath, 'w', encoding='utf-8'):
            logging.info(f'file created: {filepath}')


def pubilished_time(target):
    path = newest_date_path(target)
    isdir_exists(path)
    with open(path, 'r', encoding='utf-8') as file:
        date = file.read()
    if date:
        date = str_to_date(date)
    else:
        date = str_to_date('1000-01-01')
    return date
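# The newest_date log holds a single ISO date string (e.g. "2023-07-15",
# illustrative), written by write_crawl_time. An empty file yields the
# 1000-01-01 sentinel, which makes every post count as new on the first crawl.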


def write_crawl_time(timed, today, target):
    with open(newest_date_path(target), 'w', encoding='utf-8') as date_file:
        if timed == str_to_date('1000-01-01'):
            # First full crawl for this target: record tomorrow's date instead of today's.
            today = today + relativedelta(days=+1)
        date_file.write(str(today))
    logging.info(f'updated crawl date written: {today}')


def get_logdata(target):
    path = old_data_path(target)
    isdir_exists(path)
    with open(path, 'r', encoding='utf-8') as log_file:
        log_file = log_file.readlines()
    if not log_file:
        log_file = ['no log data yet']
    # Do not join the lines here: write_logdata needs the per-line list so it
    # can drop stale entries by date.
    return log_file
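# Each line the spider appends to the old_data log has the form
# "<crawl date> <tweet url> \n" (see Twitter.parse), e.g.
# "2023-07-15 https://twitter.com/sfrantzman/status/1 \n" -- values illustrative.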


def write_logdata(today, target):
    """Keep only the urls logged for today and tomorrow, so the file stays small."""
    tomorrow = today + relativedelta(days=+1)
    old_data = get_logdata(target)
    with open(old_data_path(target), 'w', encoding="utf-8") as o:
        for data in old_data:
            if data:
                if str(today) in data:
                    o.write(data)
                if str(tomorrow) in data:
                    o.write(data)


def replaces(strs):
    """Strip ellipses, line breaks, and stray whitespace characters from scraped text."""
    try:
        s = ''.join(strs).replace('...', '').replace('\r', '').replace('\n', '').replace('\t', '').strip()
        # Also drop common invisible space variants (non-breaking, thin, zero-width,
        # ideographic) that scraped Twitter text can contain.
        for ch in ('\xa0', '\u2002', '\u2009', '\u200b', '\u3000'):
            s = s.replace(ch, '')
        return s
    except Exception:
        return ''


def get_date(_d):
    dates = []
    # First day of the month in which the account was created ("加入" is the "Joined" label).
    create_time = norm_date(str(norm_date(_d.replace('加入', '')))[:7] + '-01')
    today = datetime.date.today()
    while True:
        # First day of the current month
        now_first = dateparser.parse(str(today)[:7] + '-01')
        # First day of the next month
        next_first = now_first + relativedelta(months=1)
        # Last day of the current month
        now_last = next_first - relativedelta(days=1)
        today = today - relativedelta(months=1)
        dates.append([str(now_first)[:10], str(now_last)[:10]])
        if today <= create_time:
            break
    return dates
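# get_date walks backwards from the current month to the account's creation
# month and returns [first_day, last_day] string pairs, newest range first, e.g.
# [['2021-06-01', '2021-06-30'], ['2021-05-01', '2021-05-31'], ...] (illustrative).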


if __name__ == '__main__':
    print(get_date('2016-05-10'))
twitter_name_ar.txt
Sara_M_Fahad
h_albogami
mohammedal_saud
twitter_name_en.txt
sfrantzman
Mr_Alshammeri
Ben_winkley
FirasMaksad