import re
import os
import json
import time
import execjs
import random
import hashlib
import datetime
import requests
import pymysql
from .public_method import PublicMethod as pm
from bs4 import BeautifulSoup
from urllib.parse import quote
from elasticsearch import Elasticsearch
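
# Zhihu search crawler: signs API requests with the anti-crawler x-zse-96 header,
# walks search results -> questions -> answers -> comments -> replies, writes rows
# to MySQL (crawler.zhihu_content / crawler.zhihu_comments), indexes documents into
# Elasticsearch, and deduplicates against the es_zhihu_check index.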
class ZhiHu:
    def __init__(self, es, index, kws, pages=5, time_sleep=6):
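        """
        :param es: Elasticsearch client used for indexing and dedup checks
        :param index: target ES index for crawled documents
        :param kws: iterable of search keywords
        :param pages: page limit (stored but unused in this version)
        :param time_sleep: delay setting (stored but unused in this version)
        """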
self.es = es
self.kws = kws
self.pages = int(pages)
self.index = index
self.time_sleep = time_sleep
def start(self):
word_list = self.kws
for item in set(word_list):
self.search(item)
def search(self, def_keyword, max_page=-1, task_name=''):
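        """Search Zhihu for def_keyword and crawl every question found in the results."""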
def_log_name = 'zhihu' + str(datetime.datetime.now())[0:19].replace(' ', '').replace(':', '').replace('-', '')
try:
            self.write_log(def_log_name, 'ZhiHu crawling content related to "{}"'.format(def_keyword), 1)
keyword = quote(def_keyword)
user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
referer = 'https://www.zhihu.com/search?type=content&q=' + keyword
x_zse_93 = '101_3_2.0'
cookie = 'd_c0="AJAQjrv_3ROPTkO_VUROUBPm9N8nvbosWQs=|1634095612"; z_c0="2|1:0|10:1638760457|4:z_c0|92:Mi4xdmV5RkJBQUFBQUFBa0JDT3VfX2RFeVlBQUFCZ0FsVk5DYzZhWWdDc0t5UFEtampwVnFqRVBkUmlmRmREMlFZSlBR|448bb1636cfe026f2ac7e5b891194ae8dcc3b5f0e0676f6fbb8267d77270dd3b"'
            cookie_extract = re.findall(r'd_c0="(.*?)";', cookie)[0]  # extract the d_c0 value from the cookie
            self.write_log(def_log_name, f'cookie_extract:{cookie_extract}')
offset = 0
lc_idx = 0
break_num = 0
            while True:
path = '/api/v4/search_v3?t=general&q={keyword}&correction=1&offset={offset}&limit=20&filter_fields=&lc_idx={lc_idx}&show_all_topics=0&search_source=Normal'
url = path.format(keyword=keyword, offset=offset, lc_idx=lc_idx)
full_url = 'https://www.zhihu.com' + url
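                # Build the x-zse-96 anti-crawler signature: MD5 the string
                # "<x-zse-93>+<api path>+"<d_c0 cookie value>"", then feed the digest
                # through Zhihu's obfuscated JS routine in decrypt.js and prefix "2.0_".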
                sign_source = "+".join([x_zse_93, url, '"{}"'.format(cookie_extract)])
                fmd5 = hashlib.new('md5', sign_source.encode()).hexdigest()  # MD5 digest of the signature source
                with open('E:/project/君智项目/crawler/zhang/decrypt.js', 'r') as js_file:
                    ctx1 = execjs.compile(js_file.read(), cwd=r'E:/ProgramFiles/nodejs/node_modules')
                encrypt_str = ctx1.call('b', fmd5)  # Zhihu's JS encryption routine
                x_zse_96 = "2.0_%s" % encrypt_str
self.write_log(def_log_name, f'x_zse_96:{x_zse_96}')
headers = {
'User-Agent': user_agent,
"referer": referer,
'Cookie': cookie,
"x-zse-93": x_zse_93,
"x-zse-96": x_zse_96
}
search_web_data = requests.get(full_url, headers=headers)
time.sleep(random.random() + 1)
status_code = search_web_data.status_code
if status_code == 200:
search_json_data = json.loads(search_web_data.text)
search_data = search_json_data.get('data')
if search_data:
for item in search_data:
item_type = item.get('type')
if item_type == 'search_result':
item_object = item.get('object')
item_question = item_object.get('question')
if item_question:
                                    question_id = item_question.get('id')  # question ID
                                    question_url = 'https://www.zhihu.com/question/' + str(question_id)
                                    self.get_all(question_url, max_page, def_keyword, def_log_name, task_name)
                    else:
                        self.write_log(def_log_name, 'No data returned.')
                        break_num += 1
                        if break_num >= 3:
                            return
                else:
                    self.write_log(def_log_name, f'Request failed, status_code:{status_code}')
                    break_num += 1
                    if break_num >= 3:
                        return
                offset += 20
                lc_idx += 20
                if max_page != -1 and offset >= max_page:
                    self.write_log(def_log_name, f'Exceeded the page limit ({max_page}); stopping.')
                    return
except Exception as e:
self.write_log(def_log_name, e)
    def get_web_data(self, def_log_name, def_url):
        """
        Fetch a URL and parse the response with BeautifulSoup.
        :param def_log_name: log file name
        :param def_url: URL to fetch
        :return: BeautifulSoup object, or False on failure
        """
time.sleep(random.uniform(2, 4))
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/94.0.4606.81 Safari/537.36'}
try:
def_web_data = requests.get(def_url, headers=headers)
def_web_data.encoding = 'utf-8'
def_soup = BeautifulSoup(def_web_data.text, 'html.parser')
return def_soup
except Exception as e:
            self.write_log(def_log_name, 'Request or parsing failed.')
self.write_log(def_log_name, e)
return False
    def exec_sql(self, def_log_name, sql, sql_values):
        """Execute a parameterized SQL statement; returns 1 on success, 0 on failure."""
        mysql_config = {'host': '172.16.163.5',
                        'port': 3306,
                        'user': 'root',
                        'password': '123456',
                        'database': 'crawler',
                        'charset': 'utf8'}
        conn = pymysql.connect(**mysql_config)
        cursor = conn.cursor()
        try:
            cursor.executemany(sql, sql_values)
            conn.commit()
            return 1
        except Exception as e:
            conn.rollback()
            self.write_log(def_log_name, "Error executing MySQL %s: %s" % (sql, e))
            return 0
        finally:
            cursor.close()
            conn.close()
def write_log(self, def_log_name, def_content, line_feed=0):
log_dir_path = os.path.dirname(__file__) + '/../logs'
path = f"{log_dir_path}/{def_log_name}.log"
        log_content = '[' + str(datetime.datetime.now())[0:19] + '] ' + str(def_content) + '\n'  # log entry
        with open(path, "a", encoding="utf-8") as f:
            if line_feed == 1:  # prepend a blank line before the entry
                print()
                f.write('\n')
            print(log_content, end='')
            f.write(log_content)
    def judge_es(self, key, value):
        """Check whether value exists in ES: 0 = absent, 1 = already grabbed, 2 = present but not grabbed."""
key_dict = {
1: "u_id",
2: "item_id",
3: "comments_id"
}
grab_dict = {
1: "u_is_grab",
2: "i_is_grab",
3: "c_is_grab"
}
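        # key selects the ID/flag field pair: 1 = user, 2 = answer item, 3 = comment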
select_body = {
"query": {
"term": {
key_dict.get(key): value
}
}
}
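        # term query = exact match on the stored composite key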
data = self.es.search(index="es_zhihu_check", body=select_body)['hits']['hits']
        if data:  # present: check whether it was actually grabbed
            is_grab = data[0].get('_source').get(grab_dict.get(key))
            if is_grab:
                return 1
            else:
                return 2
else:
return 0
    def insert_es_single(self, key, value):
        """Mark value as grabbed in the es_zhihu_check dedup index."""
key_dict = {
1: "u_id",
2: "item_id",
3: "comments_id"
}
grab_dict = {
1: "u_is_grab",
2: "i_is_grab",
3: "c_is_grab"
}
body = {
key_dict.get(key): value,
            grab_dict.get(key): 1,  # grab flag: 0 = not yet crawled, 1 = crawled
}
self.es.index(index="es_zhihu_check", body=body)
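    # judge_es + insert_es_single together give at-most-once crawling per composite
    # key: check before processing, mark only after a successful MySQL insert.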
def get_all(self, def_url, max_page=-1, keyword='', log_name='', task_name=''):
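        """Crawl a question's answers page by page, persisting each answer and its comments."""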
if log_name == '':
def_log_name = 'zhihu' + str(datetime.datetime.now())[0:19].replace(' ', '').replace(':', '').replace('-', '')
else:
def_log_name = log_name
try:
            self.write_log(def_log_name, f'ZhiHu crawling link: {def_url}', 1)
            q_id_list = re.findall(r'www.zhihu.com/question/(\d+)', def_url)
            if q_id_list:
                question_id = q_id_list[0]  # question ID
            else:
                self.write_log(def_log_name, 'Malformed link or question_id not found!')
                return False
            title_soup = self.get_web_data(def_log_name, def_url)
            if not title_soup:  # request/parse failed; already logged
                return False
            title = title_soup.select('h1[class="QuestionHeader-title"]')[0].text  # question title
            page = 0
            platform_name = '知乎网'  # channel name
while True:
question_url_base = 'https://www.zhihu.com/api/v4/questions/{question_id}/answers?include=data%5B%2A%5D.is_normal%2Cadmin_closed_comment%2Creward_info%2Cis_collapsed%2Cannotation_action%2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cattachment%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelevant_info%2Cquestion%2Cexcerpt%2Cis_labeled%2Cpaid_info%2Cpaid_info_content%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cis_recognized%3Bdata%5B%2A%5D.mark_infos%5B%2A%5D.url%3Bdata%5B%2A%5D.author.follower_count%2Cvip_info%2Cbadge%5B%2A%5D.topics%3Bdata%5B%2A%5D.settings.table_of_content.enabled&limit=5&offset={page}&platform=desktop&sort_by=default'
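                # answers API: the url-encoded `include` parameter lists the answer fields
                # to return; limit=5 with offset={page} drives pagination (page += 5 below)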
question_url = question_url_base.format(question_id=question_id, page=page)
                answer_soup = self.get_web_data(def_log_name, question_url)
                if answer_soup:
                    answer_list = json.loads(answer_soup.text)['data']
                else:
                    answer_list = []
                if len(answer_list) != 0:
                    for item in answer_list:
                        question_id = item.get('question').get('id')  # question ID
                        answer_id = item['id']  # answer ID, primary key
                        es_unique_id1 = str(question_id) + '/' + str(answer_id)  # legacy (v1) key format
                        judge_result1 = self.judge_es(2, es_unique_id1)
                        if judge_result1 != 0:  # skip if already present
                            self.write_log(def_log_name, '{} already crawled (v1 key).'.format(es_unique_id1))
continue
                        es_unique_id = str(question_id) + '/' + str(answer_id) + '/' + task_name
                        judge_result = self.judge_es(2, es_unique_id)
                        if judge_result != 0:  # skip if already present
                            self.write_log(def_log_name, '{} already crawled.'.format(es_unique_id))
                            continue
                        author_name = item['author']['name']  # answerer nickname
                        author_describe = item['author']['headline']  # answerer headline
                        author_gender_raw = item['author']['gender']  # answerer gender
                        if author_gender_raw == 1:
                            author_gender = '男'  # male
                        elif author_gender_raw == 0:
                            author_gender = '女'  # female
                        else:
                            author_gender = '未知'  # unknown
                        answer_url = 'https://www.zhihu.com/answer/{}'.format(answer_id)  # answer link
                        answer_content = item.get('excerpt')  # answer content (excerpt)
                        answer_comment = item['comment_count']  # answer comment count
                        answer_vote = item['voteup_count']  # answer upvote count
                        answer_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(item.get('created_time')))  # answer time
                        create_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))  # crawl time
                        emotion = pm.get_emotion(answer_content[0:50])  # sentiment on the first 50 chars of the answer text, not the (integer) comment count
                        # insert the answer into MySQL
                        answer_values = list()
                        sql = "insert into crawler.zhihu_content(task_name, platform_name, keyword, question_id, answer_id, author_name, author_describe, author_gender, answer_url, answer_content, answer_comment, answer_vote, answer_time, create_time) values(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"
                        answer_values.append((task_name, platform_name, keyword, question_id, answer_id, author_name, author_describe, author_gender, answer_url, answer_content, answer_comment, answer_vote, answer_time, create_time))
                        result = self.exec_sql(def_log_name, sql, answer_values)
                        if result == 1:  # insert succeeded
                            self.insert_es_single(2, es_unique_id)
                            self.write_log(def_log_name, 'MySQL insert succeeded: {}'.format(answer_values))
                            self.content_es(answer_comment, answer_content, answer_time, answer_url, answer_vote, author_name, create_time, es_unique_id, platform_name, title, emotion)  # index the answer into ES
                            self.get_comments(def_log_name, question_id, answer_id, platform_name, answer_content, es_unique_id, task_name)  # fetch the answer's comments
                    page = page + 5
                    if max_page != -1 and page >= max_page:
                        self.write_log(def_log_name, f'Finished: reached the limit of {max_page} pages.')
                        break
                else:
                    self.write_log(def_log_name, '----- finished')
                    break
except Exception as e:
self.write_log(def_log_name, e)
    def content_es(self, answer_comment, answer_content, answer_time, answer_url, answer_vote, author_name, create_time, es_unique_id, platform_name, title, emotion):
        """Index an answer document into ES."""
esHour = re.findall(r'\d{4}-\d{2}-\d{2} (\d{2}):\d{2}:\d{2}', str(answer_time))
esDate = re.findall(r'(\d{4}-\d{2}-\d{2})', str(answer_time))[0]
body = {
            'esHour': esHour[0] if esHour else '00',  # hour component of the answer time
            'create_time': round(time.mktime(time.strptime(create_time, '%Y-%m-%d %H:%M:%S'))),  # crawl timestamp
            'topSource': '站外资源',
            'sysId': 1,
            'hotWord': '',
            'source': platform_name,
            'title': title,  # question title
            'url': answer_url,  # answer link
            'content': answer_content,  # answer content
            'esDate': esDate,  # date component of the answer time
            'createTime': round(time.mktime(time.strptime(str(answer_time), '%Y-%m-%d %H:%M:%S'))),  # answer timestamp
            'viewed': 0,  # view/repost count
            'comments': answer_comment,  # comment count of the answer
            'id': es_unique_id,  # primary key
            'makeSummary': (answer_content[0:200] + '...') if len(answer_content) >= 200 else answer_content,  # summary
            'parentSource': '社媒',  # e-commerce / social media / video platform
            'dataType': '文章',  # article (vs. comment)
            'emotion': emotion,  # 1 = positive, 2 = neutral, 3 = negative
            'usefulCount': answer_vote,  # upvote count
            'create_by': author_name  # answerer nickname
}
self.es.index(index=self.index, body=body)
    def get_sub_comments(self, def_log_name, task_name, question_id, answer_id, def_comments_id, platform_name, answer_content, parent_id, answer_url):
        """Fetch the replies to a top-level comment (second-level comments)."""
        sub_comments_offset = 0
        while True:
sub_comments_url = 'https://www.zhihu.com/api/v4/comments/{comments_id}/child_comments?limit=20&offset={sub_comments_offset}'.format(comments_id=def_comments_id, sub_comments_offset=sub_comments_offset)
try:
sub_comments_soup = self.get_web_data(def_log_name, sub_comments_url)
sub_comments_json_data = json.loads(sub_comments_soup.text)
sub_comments_data = sub_comments_json_data['data']
except Exception as e:
                self.write_log(def_log_name, f'Failed to fetch second-level comments: {e}')
sub_comments_data = {}
if sub_comments_data:
for sub_item in sub_comments_data:
                sub_comments_id = sub_item['id']  # comment ID; forms a composite key with the question and answer IDs
                es_unique_id1 = str(question_id) + '/' + str(answer_id) + '/' + str(sub_comments_id)  # legacy (v1) key format
                judge_result1 = self.judge_es(3, es_unique_id1)
                if judge_result1 != 0:  # skip if already present
                    self.write_log(def_log_name, '{} comment already crawled (v1 key).'.format(es_unique_id1))
                    continue
                es_unique_id = str(question_id) + '/' + str(answer_id) + '/' + str(sub_comments_id) + '/' + task_name  # comment primary key
                judge_result = self.judge_es(3, es_unique_id)
                if judge_result != 0:  # skip if already present
                    self.write_log(def_log_name, '{} comment already crawled.'.format(es_unique_id))
                    continue
                sub_comments_content = sub_item['content']  # second-level comment content
                sub_author_name = sub_item['reply_to_author']['member']['name']  # commenter nickname
                sub_author_describe = sub_item['reply_to_author']['member']['headline']  # commenter headline
                sub_author_gender = sub_item['reply_to_author']['member']['gender']  # commenter gender
                sub_vote_count = sub_item['vote_count']  # second-level comment upvote count
                child_comment_count = sub_item.get('child_comment_count')  # second-level comments have no replies of their own, so this should come back empty
                comment_rank = 2  # comment level: 1 = top-level comment, 2 = reply
                sub_comments_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(sub_item['created_time']))  # comment time
                sub_create_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))  # crawl time
                emotion = pm.get_emotion(sub_comments_content[0:50])
                # insert the second-level comment into MySQL
sub_comments_values = list()
sub_comments_sql = "insert into crawler.zhihu_comments(task_name, question_id, answer_id, comments_id, comments_content, author_name, author_describe, author_gender, vote_count, child_comment_count, comment_rank, comments_time, create_time) values(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"
sub_comments_values.append(
(task_name, question_id, answer_id, sub_comments_id, sub_comments_content, sub_author_name, sub_author_describe, sub_author_gender, sub_vote_count, child_comment_count, comment_rank, sub_comments_time,
sub_create_time))
result = self.exec_sql(def_log_name, sub_comments_sql, sub_comments_values)
                if result == 1:  # insert succeeded
                    self.insert_es_single(3, es_unique_id)
                    self.write_log(def_log_name, 'Comment insert succeeded: {}'.format(sub_comments_values))
                    self.sub_comments_es(answer_content, answer_url, child_comment_count, es_unique_id, parent_id, platform_name, sub_author_name, sub_comments_content, sub_comments_time, sub_create_time, sub_vote_count, emotion)  # index the second-level comment into ES
            else:
                self.write_log(def_log_name, 'Finished crawling replies for this comment.')
                break
sub_comments_offset += 20
    def sub_comments_es(self, answer_content, answer_url, child_comment_count, es_unique_id, parent_id, platform_name, sub_author_name, sub_comments_content, sub_comments_time, sub_create_time, sub_vote_count, emotion):
        """Index a second-level comment into ES."""
esHour = re.findall(r'\d{4}-\d{2}-\d{2} (\d{2}):\d{2}:\d{2}', str(sub_comments_time))
esDate = re.findall(r'(\d{4}-\d{2}-\d{2})', str(sub_comments_time))[0]
body = {
            'esHour': esHour[0] if esHour else '00',  # hour component of the comment time
            'create_time': round(time.mktime(time.strptime(sub_create_time, '%Y-%m-%d %H:%M:%S'))),  # crawl timestamp
            'topSource': '站外资源',
            'sysId': 1,
            'hotWord': '',
            'source': platform_name,
            'title': (answer_content[0:50] + '...') if len(answer_content) >= 50 else answer_content,  # truncated answer content
            'url': answer_url,  # answer link
            'content': sub_comments_content,  # second-level comment content
            'esDate': esDate,  # date component of the comment time
            'createTime': round(time.mktime(time.strptime(str(sub_comments_time), '%Y-%m-%d %H:%M:%S'))),  # comment timestamp
            'viewed': 0,  # view/repost count
            'comments': child_comment_count,  # reply count under this comment
            'id': es_unique_id,  # primary key
            'makeSummary': (sub_comments_content[0:200] + '...') if len(sub_comments_content) >= 200 else sub_comments_content,  # summary
            'parentSource': '社媒',  # e-commerce / social media / video platform
            'dataType': '评论',  # comment (vs. article)
            'emotion': emotion,  # 1 = positive, 2 = neutral, 3 = negative
            'usefulCount': sub_vote_count,  # upvote count
            'create_by': sub_author_name,  # commenter nickname
            'parentId': parent_id  # the answer this comment belongs to
}
self.es.index(index=self.index, body=body)
def get_comments(self, def_log_name, question_id, answer_id, platform_name, answer_content, parent_id, task_name=''):
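        """Fetch an answer's top-level comments page by page, recursing into their replies."""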
answer_url = 'https://www.zhihu.com/question/{question_id}/answer/{answer_id}'.format(question_id=question_id, answer_id=answer_id)
comments_offset = 0
        while True:
comments_url = 'https://www.zhihu.com/api/v4/answers/{answer_id}/root_comments?order=normal&limit=20&offset={comments_offset}&status=open'.format(answer_id=answer_id, comments_offset=comments_offset)
comments_soup = self.get_web_data(def_log_name, comments_url)
try:
comments_json_data = json.loads(comments_soup.text)
comments_data = comments_json_data['data']
except Exception as e:
self.write_log(def_log_name, e)
comments_data = []
if comments_data:
for item in comments_data:
                comments_id = item['id']  # comment ID; forms a composite key with the question ID
                es_unique_id1 = str(question_id) + '/' + str(answer_id) + '/' + str(comments_id)  # legacy (v1) key format
                judge_result1 = self.judge_es(3, es_unique_id1)
                if judge_result1 != 0:  # skip if already present
                    self.write_log(def_log_name, '{} comment already crawled (v1 key).'.format(es_unique_id1))
                    continue
                es_unique_id = str(question_id) + '/' + str(answer_id) + '/' + str(comments_id) + '/' + task_name  # comment primary key
                judge_result = self.judge_es(3, es_unique_id)
                if judge_result != 0:  # skip if already present
                    self.write_log(def_log_name, '{} comment already crawled.'.format(es_unique_id))
                    continue
                comments_content = item['content']  # top-level comment content
                author_name = item['author']['member']['name']  # commenter nickname
                author_describe = item['author']['member']['headline']  # commenter headline
                author_gender_raw = item['author']['member']['gender']  # commenter gender
                if author_gender_raw == 1:
                    author_gender = '男'  # male
                elif author_gender_raw == 0:
                    author_gender = '女'  # female
                else:
                    author_gender = '未知'  # unknown
                vote_count = item['vote_count']  # top-level comment upvote count
                child_comment_count = item['child_comment_count']  # number of replies (second-level comments)
                comment_rank = 1  # comment level: 1 = top-level comment, 2 = reply
                comments_time_raw = item['created_time']
                try:
                    comments_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(comments_time_raw)))  # comment time
                except Exception as e:
                    self.write_log(def_log_name, e)
                    comments_time = ''
                create_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))  # crawl time
                emotion = pm.get_emotion(comments_content[0:50])
                # insert the top-level comment into MySQL
                comments_values = list()
                comments_sql = "insert into crawler.zhihu_comments(task_name, question_id, answer_id, comments_id, comments_content, author_name, author_describe, author_gender, vote_count, child_comment_count, comment_rank, comments_time, create_time) values(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"
                comments_values.append((task_name, question_id, answer_id, comments_id, comments_content, author_name, author_describe, author_gender, vote_count, child_comment_count, comment_rank, comments_time, create_time))
                result = self.exec_sql(def_log_name, comments_sql, comments_values)
                if result == 1:  # insert succeeded
                    self.insert_es_single(3, es_unique_id)
                    self.write_log(def_log_name, 'Comment insert succeeded: {}'.format(comments_values))
                    self.comments_es(answer_content, answer_url, author_name, child_comment_count, comments_content, comments_time, create_time, es_unique_id, parent_id, platform_name, vote_count, emotion)  # index the top-level comment into ES
                    self.get_sub_comments(def_log_name, task_name, question_id, answer_id, comments_id, platform_name, answer_content, parent_id, answer_url)  # fetch this comment's replies
            else:
                self.write_log(def_log_name, f'Finished crawling comments for this answer: {answer_url}')
                break
            comments_offset += 20
            self.write_log(def_log_name, f'comments_offset:{comments_offset}')
    def comments_es(self, answer_content, answer_url, author_name, child_comment_count, comments_content, comments_time, create_time, es_unique_id, parent_id, platform_name, vote_count, emotion):
        """Index a top-level comment into ES."""
esHour = re.findall(r'\d{4}-\d{2}-\d{2} (\d{2}):\d{2}:\d{2}', str(comments_time))
esDate = re.findall(r'(\d{4}-\d{2}-\d{2})', str(comments_time))[0]
body = {
            'esHour': esHour[0] if esHour else '00',  # hour component of the comment time
            'create_time': round(time.mktime(time.strptime(create_time, '%Y-%m-%d %H:%M:%S'))),  # crawl timestamp
            'topSource': '站外资源',
            'sysId': 1,
            'hotWord': '',
            'source': platform_name,
            'title': (answer_content[0:50] + '...') if len(answer_content) >= 50 else answer_content,  # truncated answer content
            'url': answer_url,  # answer link
            'content': comments_content,  # top-level comment content
            'esDate': esDate,  # date component of the comment time
            'createTime': round(time.mktime(time.strptime(str(comments_time), '%Y-%m-%d %H:%M:%S'))),  # comment timestamp
            'viewed': 0,  # view/repost count
            'comments': child_comment_count,  # reply count under this comment
            'id': es_unique_id,  # primary key
            'makeSummary': (comments_content[0:200] + '...') if len(comments_content) >= 200 else comments_content,  # summary
            'parentSource': '社媒',  # e-commerce / social media / video platform
            'dataType': '评论',  # comment (vs. article)
            'emotion': emotion,  # 1 = positive, 2 = neutral, 3 = negative
            'usefulCount': vote_count,  # upvote count
            'create_by': author_name,  # commenter nickname
            'parentId': parent_id  # the answer this comment belongs to
}
self.es.index(index=self.index, body=body)
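
if __name__ == '__main__':
    # Usage sketch -- the host, index name, and keyword below are illustrative
    # placeholders, not values taken from this project. Because of the relative
    # import of PublicMethod, run this as a module: python -m <package>.<module>
    es_client = Elasticsearch(['127.0.0.1'])  # assumption: a reachable ES node
    spider = ZhiHu(es_client, index='crawler_zhihu', kws=['知乎'])  # hypothetical index/keyword
    spider.start()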