from queue import Empty, Queue

from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import getPage
class Commend(object):
    """Entry point: registers every spider with a CrawlerProcess and runs
    the reactor until all crawls have finished."""

    def run(self):
        """Build the process, queue each spider path, then block in start()."""
        process = CrawlerProcess()
        spider_paths = ['twisted_demo.chouti.ChoutiSpider']
        for path in spider_paths:
            process.crawl(path)
        process.start()
class CrawlerProcess(object):
    """Collects one Deferred per crawl and runs the reactor until every
    registered crawl completes."""

    def __init__(self):
        # Deferreds of crawls still in flight.
        self._active = set()

    def crawl(self, spider_cls_path):
        """Start a Crawler for *spider_cls_path* and track its Deferred."""
        deferred = Crawler().crawl(spider_cls_path)
        self._active.add(deferred)

    def start(self):
        """Run the reactor; stop it once all tracked crawls have fired."""
        done = defer.DeferredList(self._active)
        done.addBoth(lambda _: reactor.stop())
        reactor.run()
class Crawler(object):
    """Wires one spider to one ExecutionEngine and runs the crawl."""

    @defer.inlineCallbacks
    def crawl(self, spider_cls_path):
        """Instantiate the spider from its dotted path, open it on a fresh
        engine, and wait on the engine's close Deferred."""
        spider = self._create_spider(spider_cls_path)
        engine = self._create_engine()
        yield engine.open_spider(iter(spider.start_requests()))
        yield engine.start()

    def _create_engine(self):
        # One engine per crawl keeps scheduler state isolated per spider.
        return ExecutionEngine()

    def _create_spider(self, spider_cls_path):
        """Import 'pkg.module.ClassName' and return a new instance of it."""
        import importlib
        module_path, class_name = spider_cls_path.rsplit('.', maxsplit=1)
        module = importlib.import_module(module_path)
        spider_cls = getattr(module, class_name)
        return spider_cls()
class ExecutionEngine(object):
    """Drives the crawl: pulls requests from the scheduler, downloads up to
    ``self.max`` pages concurrently, and feeds any requests yielded by the
    spider callbacks back into the scheduler until everything is drained."""

    def __init__(self):
        self.scheduler = None   # set in open_spider()
        self.max = 5            # max concurrent downloads
        self.crawling = []      # requests currently in flight
        self._close = None      # Deferred fired when the crawl finishes

    @defer.inlineCallbacks
    def open_spider(self, start_requests):
        """Seed the scheduler from the spider's start-request iterator and
        schedule the first download pass on the next reactor tick."""
        self.scheduler = Scheduler()
        yield self.scheduler.open()
        while True:
            try:
                req = next(start_requests)
            except StopIteration:
                break
            self.scheduler.enqueue_request(req)
        # BUGFIX: pass the callable itself. The original called
        # self._next_request() here, running it immediately and handing its
        # None return value to callLater (a TypeError when the timer fires).
        reactor.callLater(0, self._next_request)

    @defer.inlineCallbacks
    def start(self):
        """Return a Deferred that fires only when the crawl is complete."""
        self._close = defer.Deferred()
        yield self._close

    def _next_request(self):
        # BUGFIX: size is a method — the original compared the bound method
        # to 0 (always False), so the close Deferred could never fire here.
        if self.scheduler.size() == 0 and len(self.crawling) == 0:
            self._close.callback(None)
            return
        # Use the configured limit instead of a hard-coded 5.
        while len(self.crawling) < self.max:
            req = self.scheduler.next_request()
            if not req:
                return
            self.crawling.append(req)
            print('scheduler.size: %s crawling: %s' % (self.scheduler.size(), len(self.crawling)))
            d = getPage(req.url.encode())
            d.addCallback(self.get_response_callback, req)
            # Re-enter the scheduling loop after each download completes.
            d.addCallback(lambda _: reactor.callLater(0, self._next_request))

    def get_response_callback(self, content, request):
        """Handle a downloaded page: wrap it in HttpResponse, invoke the
        request's callback, and enqueue any new requests it generates."""
        self.crawling.remove(request)
        response = HttpResponse(content, request)
        result = request.callback(response)
        import types
        if isinstance(result, types.GeneratorType):
            for req in iter(result):
                self.scheduler.enqueue_request(req)
class Scheduler(object):
    """In-memory FIFO request queue backing the ExecutionEngine."""

    def __init__(self):
        self.q = Queue()

    def open(self):
        # Hook for asynchronous setup; nothing needed for an in-memory queue.
        pass

    def enqueue_request(self, request):
        """Append *request* to the back of the queue."""
        self.q.put(request)

    def next_request(self):
        """Pop and return the next request, or None when the queue is empty.

        Catches only queue.Empty — the original bare ``except Exception``
        would have silently swallowed unrelated errors as well.
        """
        try:
            return self.q.get_nowait()
        except Empty:
            return None

    def size(self):
        """Number of requests currently waiting in the queue."""
        return self.q.qsize()
class Request(object):
    """A URL to download plus the spider callback that will receive the
    resulting HttpResponse."""

    def __init__(self, url, callback):
        # Stored verbatim; the engine encodes the URL just before download.
        self.url = url
        self.callback = callback
class HttpResponse(object):
    """Wraps the raw downloaded bytes together with the originating request,
    exposing the URL and the decoded body as convenience attributes."""

    def __init__(self, content, request):
        self.content = content
        self.request = request
        # Mirror the request URL and expose the body decoded as text.
        self.url = request.url
        self.text = content.decode()
if __name__ == '__main__':
    # Run the demo crawl; blocks inside the reactor until all crawls finish.
    Commend().run()
# 网友评论 ("reader comments" — residue scraped from the source web page;
# commented out because a bare identifier here raises NameError at runtime)