import json
# import gevent
# from gevent.queue import Queue
# from gevent import monkey
from util.mysql_conn import MysqlConn
from setting import logger_root, MYSQL_NEW_YQ_CONFIG, YQ_ES
import os
import requests
import signal
import time
from elasticsearch import Elasticsearch
# monkey.patch_all()
class Task:
    """Copy company names from ``names_type`` into ``yq_annotated_company_keywords``.

    Rows are read from MySQL in batches of 30 in ascending ``id`` order. For
    each row an eid is requested from an HTTP service, and the company name
    is inserted into the target table unless a row with the same name is
    already there. The id of the last handled source row is kept in a
    checkpoint file so that a restarted job resumes after it.
    """

    # Checkpoint file holding the id of the last handled source row.
    LAST_ID_FILE = "./data/table1_last_id.txt"
    # Endpoint that is POSTed a JSON body of the form {"name": <company name>}.
    EID_URL = "http://d6064.intsig.net/yq/tool/fetch_eid"
    # Default number of seconds to wait for the eid service.
    EID_TIMEOUT = 10
    # Seconds to sleep after a failed or empty source query.
    RETRY_DELAY = 3

    def __init__(self):
        """Load the checkpoint (if any) and open the MySQL connection.

        The process is terminated with ``os._exit(-1)`` when the connection
        cannot be created.
        """
        self.last_id = 0
        self.terminated_flag = False
        if os.path.exists(self.LAST_ID_FILE):
            with open(self.LAST_ID_FILE, "r", encoding="utf-8", newline="\n") as f:
                saved = f.readline().strip()
            # An empty checkpoint file is treated like a missing one instead
            # of crashing on int("").
            if saved:
                self.last_id = int(saved)
        try:
            self.mysql_client = MysqlConn(logger_root, MYSQL_NEW_YQ_CONFIG)
        except Exception as e:
            logger_root.error("mysql connect is error {}".format(e))
            os._exit(-1)

    def start(self):
        """Run the copy loop until SIGTERM/SIGINT arrives or an error occurs.

        The checkpoint is written on every way out of the loop.
        """
        signal.signal(signal.SIGTERM, self._term_handler)
        signal.signal(signal.SIGINT, self._term_handler)
        try:
            while not self.terminated_flag:
                data_list = self.query_mysql()
                if data_list is None:
                    logger_root.error("mysql select error")
                    time.sleep(self.RETRY_DELAY)
                    continue
                if not data_list:
                    logger_root.warning("mysql select get list is []")
                    time.sleep(self.RETRY_DELAY)
                    continue
                for data in data_list:
                    if self.terminated_flag:
                        return
                    self._process_row(data)
                    # Advance the cursor only once the row has been handled,
                    # so a row that raised part-way is retried after a
                    # restart instead of being skipped.
                    self.last_id = data["id"]
        except Exception as e:
            # Record why the job stops instead of swallowing the error.
            logger_root.error("job stopped by unexpected error {!r}".format(e))
        finally:
            self.terminated_flag = True
            self.save_file()

    def _process_row(self, data):
        """Resolve the eid for one source row and store the result."""
        comp_name = data["comp_names"]
        name_type = data["type"]
        # Source types 2 and 3 are stored with full = 1, everything else 0.
        full = 1 if int(name_type) in (2, 3) else 0
        eid_dict = self.get_eid(comp_name)
        eid = ""
        # A missing or malformed answer is stored with an empty eid, the
        # same way a non-200 answer is.
        if isinstance(eid_dict, dict):
            eid = eid_dict.get("eid", "")
        logger_root.info("get comp_name {} type {} eid{}".format(comp_name, name_type, eid))
        self.write_mysql(comp_name, full, eid)

    def write_mysql(self, comp_name, full, eid):
        """Insert one company row unless ``comp_name`` is already stored.

        Failures are logged and the method returns without raising.
        """
        sql_str = "select id from yq_annotated_company_keywords where comp_name = %s"
        sql_params = (comp_name,)
        res = self.mysql_client.execute_query(sql_str, sql_params)
        if res is None:
            # This is the existence check failing, not the insert.
            logger_root.error("select mysql error with sql:{}".format(sql_str % sql_params))
            return
        if res:
            logger_root.info("data at mysql already exists,continue insert")
            return
        sql_str = "insert into yq_annotated_company_keywords(comp_name,full,eid) values(%s,%s,%s)"
        sql_params = (comp_name, full, eid)
        res, err = self.mysql_client.execute_write(sql_str=sql_str, sql_params=sql_params)
        if not res:
            logger_root.error(
                "insert mysql error with sql:{} err:{}".format(sql_str % sql_params, err)
            )
            return
        logger_root.info("insert mysql success with comp_name:{}".format(comp_name))

    def query_mysql(self):
        """Return the next batch of source rows with ``id`` above ``last_id``.

        The result of ``execute_query`` is returned unchanged; ``start``
        treats ``None`` as a failed query.
        """
        # ORDER BY is required for id-based paging: without it the row order
        # is not guaranteed and the last row need not carry the highest id.
        sql_str = "select id,comp_names,type from names_type where id > %s order by id limit 30"
        sql_params = (self.last_id,)
        return self.mysql_client.execute_query(sql_str, sql_params)

    def get_eid(self, comp_name, timeout=None):
        """Ask the eid service about ``comp_name``.

        Args:
            comp_name: company name sent as ``{"name": comp_name}``.
            timeout: seconds to wait for the service; ``EID_TIMEOUT`` when
                omitted.

        Returns:
            The decoded JSON response, or ``None`` when the status is not 200
            or the body is not valid JSON. Network errors raised by
            ``requests`` are not caught here.
        """
        if timeout is None:
            timeout = self.EID_TIMEOUT
        payload = json.dumps({"name": comp_name}, ensure_ascii=False).encode("utf-8")
        # Without a timeout a stalled connection would block the job forever.
        res = requests.post(self.EID_URL, data=payload, timeout=timeout)
        if res.status_code != 200:
            logger_root.warning("eid get status {}".format(res.status_code))
            return None
        try:
            return json.loads(res.text)
        except ValueError as e:
            logger_root.warning("eid response is not json {}".format(e))
            return None

    def _term_handler(self, signal_num, frame):
        """Signal handler: request a stop and write the checkpoint."""
        logger_root.info("job stop with {}".format(signal_num))
        self.terminated_flag = True
        self.save_file()

    def save_file(self):
        """Write ``last_id`` to the checkpoint file, creating its directory."""
        directory = os.path.dirname(self.LAST_ID_FILE)
        if directory:
            # On a first run the data directory may not exist yet.
            os.makedirs(directory, exist_ok=True)
        with open(self.LAST_ID_FILE, "w", encoding="utf-8", newline="\n") as f:
            f.write(str(self.last_id))
if __name__ == "__main__":
    # Script entry point: build the job and run it until it stops.
    Task().start()
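# 网友评论  (stray "reader comments" heading, apparently left over from the web page this file was copied from; not part of the program)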