前言
最近在用pyspider实现项目的爬虫系统。由于我们需要自定义爬虫结果的存储方式,所以需要重写pyspider自身的ResultWorker。MyResultWorker就是对其on_result(xxx)方法进行的重写。
自定义ResultWorker,重写on_result方法:
模块:CustomResultWorker.py
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# author: hzliwenhao
# Created on 2017-11-12
import mysql.connector
import time
import logging
from mysql.connector import errorcode
from pyspider.result import ResultWorker
logger = logging.getLogger("MyResultWorker")
class MyResultWorker(ResultWorker):
    """Result worker that writes each crawl result to a custom MySQL table.

    Overrides ``on_result`` so that results go through ``MySQLDB.insert``.
    """

    def on_result(self, task, result):
        """Store one result row and log it when the insert succeeds.

        :param task: task dict; ``task['url']`` is always read, and
            ``task['project']`` / ``task['taskid']`` are read for the log
            line after a successful insert.
        :param result: the crawl result; only used in the log line.
        :raises KeyError: if a required key is missing from ``task``.
        """
        # Part of the inserted data is hard-coded to make testing easier.
        db = MySQLDB()
        try:
            inserted = db.insert(
                app_id=1, instance_name='test_name', url=task['url'])
        finally:
            # A new connection is opened for every result; close it
            # explicitly so connections do not pile up on the server.
            if db.connection is not None:
                db.connection.close()

        if inserted:
            logger.info('result %s:%s %s -> %.30r',
                        task['project'], task['taskid'], task['url'], result)
class MySQLDB:
    """Small MySQL helper that inserts rows into the ``result_info`` table.

    Connection settings are class attributes. Creating an instance connects
    immediately when ``is_connect`` is true.
    """

    # NOTE(review): credentials are hard-coded here; consider loading them
    # from configuration or the environment before real deployment.
    username = 'appops'
    password = 'root123'
    database = 'resultdb2'
    host = '127.0.0.1'
    port = 13308
    connection = None   # set by connect() when the connection succeeds
    is_connect = True   # when true, __init__ calls connect()
    placeholder = '%s'  # not referenced by the methods of this class

    def __init__(self):
        """Connect to the database if ``is_connect`` is true."""
        if self.is_connect:
            self.connect()

    @staticmethod
    def escape(string):
        """Return ``string`` wrapped in backticks.

        Backticks already present in ``string`` are not escaped.
        """
        return '`%s`' % string

    def connect(self):
        """Open a connection using the class-level settings.

        :return: ``True`` on success (the connection is stored in
            ``self.connection``), ``False`` if the connector raised an error.
        """
        connect_kvs = {
            'user': self.username,
            'password': self.password,
            'host': self.host,
            'port': self.port,
            'database': self.database,
        }
        try:
            self.connection = mysql.connector.connect(**connect_kvs)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("The credentials you provided are not correct.")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("The database you provided does not exist.")
            else:
                # Format into one string: under Python 2, print with two
                # comma-separated arguments in parentheses prints a tuple.
                print("Something went wrong: {}".format(err))
            return False
        return True

    def insert(self, **values):
        """Insert one row into ``result_info``.

        :param values: must contain ``app_id``, ``instance_name`` and
            ``url``; ``insert_time`` is filled in with the current time in
            milliseconds.
        :return: ``True`` if the row was committed, ``False`` if there is no
            connection or the connector reported an error.
        :raises KeyError: if a required key is missing from ``values``.
        """
        if self.connection is None:
            print("Please connect first")
            return False

        sql = "insert into result_info " \
              "(app_id, instance_name, url, insert_time) " \
              "values " \
              "(%s,%s,%s,%s)"
        params = (
            values['app_id'],
            values['instance_name'],
            values['url'],
            int(round(time.time() * 1000)),  # epoch time in milliseconds
        )

        cursor = None
        try:
            # cursor() can itself raise a connector error, so it is created
            # inside the try block.
            cursor = self.connection.cursor()
            cursor.execute(sql, params)
            self.connection.commit()
            return True
        except mysql.connector.Error as err:
            print("An error occurred: {}".format(err))
            return False
        finally:
            # Always release the cursor, on success and on failure.
            if cursor is not None:
                cursor.close()
if __name__ == '__main__':
    # Manual smoke test: connect and insert one hard-coded row.
    import sys

    if sys.version_info[0] == 2:
        # Python 2 only: reload(sys) restores setdefaultencoding, which is
        # removed from the module at startup. Neither name is available on
        # Python 3, so they are skipped there.
        reload(sys)  # noqa: F821
        sys.setdefaultencoding('gbk')

    db = MySQLDB()
    db.insert(app_id=123, instance_name='test_name', url='url')
使用方法
方法1 - 命令行指定
pyspider result_worker --result-cls=CustomResultWorker.MyResultWorker  # 模块名.类名
方法2
pyspider -c config.json result_worker
- 通过 -c config.json 指定配置文件(注意:JSON 本身不支持注释,下面示例中的 // 注释仅用于说明,实际使用时需要删除):
{
"taskdb":"mysql+taskdb://appops:root123@localhost:3306/taskdb", // mysql
"projectdb":"mysql+projectdb://appops:root123@localhost:3306/projectdb", // mysql
"resultdb":"mysql+resultdb://appops:root123@localhost:3306/resultdb", // mysql
"message_queue": "redis://localhost:6379/0", // redis
"webui":{ // 界面账户
"username": "appops",
"password": "root123",
"need-auth": true
},
"scheduler":{
"delete-time": 3600 // 删除项目的时间
},
"result_worker":{
"result_cls": "CustomResultWorker.MyResultWorker"
}
}
网友评论