美文网首页我的Python自学之路Python爬虫人生苦短,我学Python
简单分布式爬虫——第二弹:masterSpider的实现

简单分布式爬虫——第二弹:masterSpider的实现

作者: 布利啾啾的布利多 | 来源:发表于2018-01-30 20:55 被阅读152次

上一讲简单分布式爬虫——第一弹:了解分布式爬虫结构
我们讲过,masterSpider的作用是协调各节点spider之间的工作,包括任务分发、URL管理、结果回收等,那么这一讲,我们就来逐步实现masterSpider的各项功能。Ready,go!

首先,由于我们打造的是分布式爬虫,所以需要在多台主机之间进行通信,在这里我们通过python的multiprocessing模块来实现,该模块中有一个managers的子模块支持把多进程分布到多台机器上,主从机之间通过任务队列进行联系,为了实现爬虫功能,
1、我们需要设置几个队列,分别是:任务队列task_queue、结果队列result_queue、结果处理队列conn_queue、数据存储队列store_queue。
2、将上面创建的队列(主要是任务队列和结果队列)在网络上注册,以便其他主机能够在网络上发现他们,注册后获得网络队列,相当于本地队列的映像。
3、创建一个Basemanager的实例manager,绑定端口和验证口令。
4、启动manager实例,监管信息通道。
5、通过管理实例的方法获得网络访问的Queue对象,即把网络队列实体化为可用的本地队列。
5、创建任务到“本地队列”,自动上传至网络队列,分配给网络上的其他主机进行处理。
下面实现这一过程:

# -*- coding: utf-8 -*-

def start_Manager(self, task_queue, result_queue):
        '''
        创建分布式爬虫管理器
        :param task_queue:   任务队列
        :param result_queue: result队列
        :return:             manager对象
        '''
        def _get_task():
            return task_queue
        def _get_result():
            return result_queue
        # 在网络上注册两个管理队列
        BaseManager.register('get_task_queue', callable=_get_task)
        BaseManager.register('get_result_queue', callable=_get_result)
        # 监听主机 端口,设置身份认证口令
        manager = BaseManager(address=('127.0.0.1', 10001), authkey='python'.encode('utf-8'))
        return manager

以上实现了分布式爬虫的控制管理器,接下来实现URL管理器,还是先明确一下URL管理器的任务:
URL管理器首先需要两个集合(使用集合是为了利用集合元素不重复的特性实现url自动去重):待爬取url集合和已爬取url集合。URL管理器需要完成添加新url到未爬取url集合、添加爬过的url到已爬取url集合、判断是否有未爬取url、取一个未爬取url、存储爬虫进度等。下面来实现:为方便管理,创建URL管理器类:

## UrlManager.py
# -*- coding: utf-8 -*-
try:
    import cPickle as pickle
except:
    import pickle
import hashlib
class UrlManager(object):
    '''
    url管理器:
        两个url集合(集合可以自动去重):
            未爬取url: new_urls
            已爬取url: old_urls
        七个接口:
            判断是否有待爬取url:    has_new_url()
            添加新url到集合:        add_new_url(url)、add_new_urls(urls)
            取出一个url:            get_new_url()
            获取未爬取url集合大小:  new_url_size()
            获取已爬取url集合大小:  old_url_size()
            进度保存:               save_procese(path, data)
            进度加载:               load_process(path)
    '''
    def __init__(self):
        self.new_urls = self.load_process('new_urls.txt')  # 未爬取URL集合
        self.old_urls = self.load_process('old_urls.txt')  # 已爬取URL集合
    def has_new_url(self):
        '''
        判断是否有待爬取的url
        :return: True or False
        '''
        return self.new_url_size() != 0
    def get_new_url(self):
        '''
        获取一个未爬取的url
        :return: 一个未爬取的url
        '''
        new_url = self.new_urls.pop()
        m = hashlib.md5()
        m.update(new_url.encode('utf-8'))
        # 为了减少存储量,存储已爬取的url的md5而不是url
        self.old_urls.add(m.hexdigest())
        return new_url
    def add_new_url(self, url):
        '''
        添加一个新url到未爬取集合
        :param url: 单个url
        :return:    无
        '''
        if url is None:
            return
        m = hashlib.md5()
        m.update(url.encode('utf-8'))
        url_md5 = m.hexdigest()
        if url not in self.new_urls and url_md5 not in self.old_urls:
            self.new_urls.add(url)
    def add_old_url(self, url):
        if url is None:
            return
        m = hashlib.md5()
        m.update(url.encode('utf-8'))
        url_md5 = m.hexdigest()
        if url_md5 not in self.old_urls:
            self.old_urls.add(url)
    def add_new_urls(self, urls):
        '''
        添加多个新url到未爬取的url集合
        :param urls:多个url
        :return:    无
        '''
        if urls is None or len(urls)==0:
            return
        for url in urls:
            self.add_new_url(url)
    def add_old_urls(self, urls):
        '''
        添加多个新url到未爬取的url集合
        :param urls:多个url
        :return:    无
        '''
        if urls is None or len(urls)==0:
            return
        for url in urls:
            self.add_old_url(url)
    def new_url_size(self):
        '''
        获取未爬取url集合大小
        :return: 未爬取url集合大小
        '''
        return len(self.new_urls)
    def old_url_size(self):
        '''
        获取已爬取url集合大小
        :return: 已爬取url集合大小
        '''
        return len(self.old_urls)
    def save_process(self, path, data):
        '''
        进度保存
        :param path:    保存文件路径
        :param data:    数据
        :return:
        '''
        print('[!]保存进度:%s' % path)
        with open(path, 'wb') as f:
            pickle.dump(data, f)    # 使用数据持久存储模块pickle
    def load_process(self, path):
        '''
        从本地文件加载爬虫进度
        :param path:    本地文件路径
        :return:
        '''
        print('[+]从文件加载进度:%s' % path)
        try:
            with open(path, 'rb') as f:
                data = pickle.load(f)
                return data
        except:
            print('[!]无进度文件,创建:%s' % path)
        return set()

完成上面两步之后,接下来就重点实现masterSpider的任务了:
1、url管理任务
2、结果分析
3、数据存储
每项任务我们单独开一个进程来负责。

# url管理任务
    def url_manager_process(self, task_queue, conn_queue, root_url):
        '''
        url管理器进程
        :param task_queue:   task队列
        :param conn_queue:   结果处理队列
        :param root_url:     起始url
        :return:
        '''
        url_manager = UrlManager()
        url_manager.add_new_url(root_url)
        while True:
            while url_manager.has_new_url():
                # 从url管理器获取新的url
                new_url = url_manager.get_new_url()
                # 将url分发下去
                url_queue.put(new_url)
                # 添加爬虫结束条件
                if url_manager.old_url_size() > 1000:
                    # 通知节点停止工作
                    url_queue.put('end')
                    print('控制节点发起停止命令')
                    # 关闭管理节点,存储爬虫状态
                    url_manager.save_process('new_urls.txt', url_manager.new_urls)
                    url_manager.save_process('old_urls.txt', url_manager.old_urls)
                    return

            # 添加新url到url管理器
            try:
                if not conn_queue.empty():
                    urls = conn_queue.get()
                    url_manager.add_new_urls(urls)
            except BaseException as e:
                # 休息一会儿
                time.sleep(0.1)

# 结果分析任务
    def result_process(self, result_queue, conn_queue, store_queue):
        '''
        # 结果分析进程
        :param result_queue:
        :param conn_queue:
        :param store_queue:
        :return:
        '''
        while True:
            try:
                if not result_queue.empty():
                    content = result_queue.get(True)
                    if content['new_urls'] == 'end':
                        print('结果分析进程收到结束命令')
                        store_queue.put('end')
                        return

                    conn_queue.put(content['new_urls'])
                    store_queue.put(content['data'])
                else:
                    # 休息一会儿
                    time.sleep(0.1)
            except BaseException as e:
                # 休息一会儿
                time.sleep(0.1)

# 数据存储任务
    def store_process(self, store_queue):
        '''
        # 结果存储进程
        :param store_queue:
        :return:
        '''
        output = DataOutPut()
        while True:
            if not store_queue.empty():
                data = store_queue.get()
                if data == 'end':
                    print('存储进程接到结束命令')
                    output.output_end(output.filepath)
                    return
                output.store_data(data)
            else:
                # 休息一会儿
                time.sleep(0.1)

为方便管理,我们可以将以上任务包括start_manager方法一起做成一个Manager管理类,于是就有:

## SpiderManager.py
# -*- coding: utf-8 -*-
from multiprocessing.managers import BaseManager
from multiprocessing import Process, Queue
from DataOutPut import DataOutPut
from UrlManager import UrlManager
import time

class SpiderManager(object):
    def start_Manager(self, task_queue, result_queue):
        pass #代码省略

    def url_manager_process(self, task_queue, conn_queue, root_url):
        pass #代码省略
    
    def result_process(self, result_queue, conn_queue, store_queue):
        pass #代码省略

    def store_process(self, store_queue):
        pass #代码省略

if __name__ == '__main__':
    # 初始化队列
    task_queue = Queue()
    result_queue = Queue()
    store_queue = Queue()
    conn_queue = Queue()

    # 创建分布式管理器
    node = NodeManager()
    manager = node.start_Manager(task_queue, result_queue)
    # 创建url管理进程、数据提取进程、数据存储进程
    url_manager_process = Process(target=node.url_manager_process, args=(task_queue, conn_queue, 'http://xxx',))
    result_process = Process(target=node.result_process, args=(result_queue, conn_queue, store_queue,))
    store_process = Process(target=node.store_process, args=(store_queue,))
    # 启动各个进程和分布式管理器
    url_manager_process.start()
    result_process.start()
    store_process.start()
    manager.get_server().serve_forever()

最后就是数据存储,可以存储到本地文本文件或者数据库中,这里将数据以.html格式进行存储:

## DataOutPut.py
# -*- coding: utf-8 -*-
import codecs
import time

class DataOutPut(object):
    '''
    数据存储器:
        内存存储: store_data(data)
        文件存储: output_data(path)
    '''

    def __init__(self):
        self.filepath = 'baike_%s.html' % (time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime()))
        self.output_head(self.filepath)
        self.datas = []

    def store_data(self, data):
        if data is None:
            return
        self.datas.append(data)
        if len(self.datas)>10:
            self.output_html(self.filepath)

    def output_head(self, path):
        '''
        将HTML头写进去
        :return:
        '''
        fout=codecs.open(path,'w',encoding='utf-8')
        fout.write("<html>")
        fout.write("<body>")
        fout.write("<table>")
        fout.close()

    def output_html(self,path):
        '''
        将数据写入HTML文件中
        :param path: 文件路径
        :return:
        '''
        fout=codecs.open(path,'a',encoding='utf-8')
        for data in self.datas:
            fout.write("<tr>")
            fout.write("<td>%s</td>"%data['url'])
            fout.write("<td>%s</td>"%data['title'])
            fout.write("<td>%s</td>"%data['summary'])
            fout.write("</tr>")
        self.datas=[]
        fout.close()

    def output_end(self,path):
        '''
        输出HTML结束
        :param path: 文件存储路径
        :return:
        '''
        fout=codecs.open(path,'a',encoding='utf-8')
        fout.write("</table>")
        fout.write("</body>")
        fout.write("</html>")
        fout.close()

以上就是分布式爬虫——第二弹:masterSpider的实现的全部内容。
下一讲:分布式节点爬虫的实现

参考资料:《Python爬虫开发与项目实战》

相关文章

网友评论

    本文标题:简单分布式爬虫——第二弹:masterSpider的实现

    本文链接:https://www.haomeiwen.com/subject/dlqhzxtx.html