美文网首页
爬虫(多线程+多进程)

爬虫(多线程+多进程)

作者: Ji_uu | 来源:发表于2017-09-14 17:39 被阅读0次

    来自崔庆才的个人博客
    ps :使用多线程时在目录切换的问题上存在问题,可以给线程加一个锁

    下载图片特别慢,解决办法是使用多进程(为什么不说多线程?因为GIL的存在导致Python的多线程有点坑!!)

    今天来做一个多进程的爬虫(其实可以做一个超简化版的分布式爬虫)

    在多进程中,进程之间是不能相互通信的。这里就出现了一个 问题,多进程怎么知道哪些需要爬取,哪些已经爬取了? 这就涉及到了队列!!比如tornado中的queue模块(更为健壮的队列,请考虑celery这一类的专用消息传递工具)

    思路:
    每一个进程需要知道哪些URL爬取过了,哪些URL需要爬取。给URL设置两种状态:
    outstanding:等待爬取的URL
    complete:爬取完成的URL
    processing:正在进行的URL

    所有初始的URL状态都是outstanding;开始爬取的时候状态改为:processing;爬取完成时的状态:complete;失败的URL重置为outstanding;我们设置一个计时参数,当超过这个参数时,状态重置为outstanding

    开整了!

    首先需要一个datatime(这个模块比内置的time模块好用一点,安装方式 pip install datetime)还有pymongo

    下面是队列代码:

    from datetime import datetime, timedelta
    from pymongo import MongoClient, errors
    
    class MogoQueue():
    
        OUTSTANDING = 1 ##初始状态
        PROCESSING = 2 ##正在下载状态
        COMPLETE = 3 ##下载完成状态
    
        def __init__(self, db, collection, timeout=300):##初始mongodb连接
            self.client = MongoClient()
            self.Client = self.client[db]
            self.db = self.Client[collection]
            self.timeout = timeout
    
        def __bool__(self):
            """
            这个函数,我的理解是如果下面的表达为真,则整个类为真
            至于有什么用,后面我会注明的(如果我的理解有误,请指点出来谢谢,我也是Python新手)
            $ne的意思是不匹配
            """
            record = self.db.find_one(
                {'status': {'$ne': self.COMPLETE}}
            )
            return True if record else False
    
        def push(self, url, title): ##这个函数用来添加新的URL进队列
            try:
                self.db.insert({'_id': url, 'status': self.OUTSTANDING, '主题': title})
                print(url, '插入队列成功')
            except errors.DuplicateKeyError as e:  ##报错则代表已经存在于队列之中了
                print(url, '已经存在于队列中了')
                pass
        def push_imgurl(self, title, url):
            try:
                self.db.insert({'_id': title, 'statue': self.OUTSTANDING, 'url': url})
                print('图片地址插入成功')
            except errors.DuplicateKeyError as e:
                print('地址已经存在了')
                pass
    
        def pop(self):
            """
            这个函数会查询队列中的所有状态为OUTSTANDING的值,
            更改状态,(query后面是查询)(update后面是更新)
            并返回_id(就是我们的URL),MongDB好使吧,^_^
            如果没有OUTSTANDING的值则调用repair()函数重置所有超时的状态为OUTSTANDING,
            $set是设置的意思,和MySQL的set语法一个意思
            """
            record = self.db.find_and_modify(
                query={'status': self.OUTSTANDING},
                update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
            )
            if record:
                return record['_id']
            else:
                self.repair()
                raise KeyError
    
        def pop_title(self, url):
            record = self.db.find_one({'_id': url})
            return record['主题']
    
        def peek(self):
            """这个函数是取出状态为 OUTSTANDING的文档并返回_id(URL)"""
            record = self.db.find_one({'status': self.OUTSTANDING})
            if record:
                return record['_id']
    
        def complete(self, url):
            """这个函数是更新已完成的URL完成"""
            self.db.update({'_id': url}, {'$set': {'status': self.COMPLETE}})
    
        def repair(self):
            """这个函数是重置状态$lt是比较"""
            record = self.db.find_and_modify(
               query={
                   'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                   'status': {'$ne': self.COMPLETE}
               },
                update={'$set': {'status': self.OUTSTANDING}}
            )
            if record:
                print('重置URL状态', record['_id'])
    
        def clear(self):
            """这个函数只有第一次才调用、后续不要调用、因为这是删库啊!"""
            self.db.drop()
    

    队列做好了,下面是获取所有页面的代码:

    from Download import request
    from mongodb_queue import MogoQueue
    from bs4 import BeautifulSoup
    
    
    spider_queue = MogoQueue('meinvxiezhenji', 'crawl_queue')
    def start(url):
        response = request.get(url, 3)
        Soup = BeautifulSoup(response.text, 'lxml')
        all_a = Soup.find('div', class_='all').find_all('a')
        for a in all_a:
            title = a.get_text()
            url = a['href']
            spider_queue.push(url, title)
        """上面这个调用就是把URL写入MongoDB的队列了"""
    
    if __name__ == "__main__":
        start('http://www.mzitu.com/all')
    
    """这一段儿就不解释了哦!超级简单的"""
    

    下面是 多进程+多线程 的代码:

    import os
    import time
    import threading
    import multiprocessing
    from mongodb_queue import MogoQueue
    from Download import request
    from bs4 import BeautifulSoup
    
    SLEEP_TIME = 1
    
    def mzitu_crawler(max_threads=10):
        crawl_queue = MogoQueue('meinvxiezhenji', 'crawl_queue') ##这个是我们获取URL的队列
        ##img_queue = MogoQueue('meinvxiezhenji', 'img_queue')
        def pageurl_crawler():
            while True:
                try:
                    url = crawl_queue.pop()
                    print(url)
                except KeyError:
                    print('队列没有数据')
                    break
                else:
                    img_urls = []
                    req = request.get(url, 3).text
                    title = crawl_queue.pop_title(url)
                    mkdir(title)
                    os.chdir('D:\mzitu\\' + title)
                    max_span = BeautifulSoup(req, 'lxml').find('div', class_='pagenavi').find_all('span')[-2].get_text()
                    for page in range(1, int(max_span) + 1):
                        page_url = url + '/' + str(page)
                        img_url = BeautifulSoup(request.get(page_url, 3).text, 'lxml').find('div', class_='main-image').find('img')['src']
                        img_urls.append(img_url)
                        save(img_url)
                    crawl_queue.complete(url) ##设置为完成状态
                    ##img_queue.push_imgurl(title, img_urls)
                    ##print('插入数据库成功')
    
        def save(img_url):
            name = img_url[-9:-4]
            print(u'开始保存:', img_url)
            img = request.get(img_url, 3)
            f = open(name + '.jpg', 'ab')
            f.write(img.content)
            f.close()
    
        def mkdir(path):
            path = path.strip()
            isExists = os.path.exists(os.path.join("D:\mzitu", path))
            if not isExists:
                print(u'建了一个名字叫做', path, u'的文件夹!')
                os.makedirs(os.path.join("D:\mzitu", path))
                return True
            else:
                print(u'名字叫做', path, u'的文件夹已经存在了!')
                return False
    
        threads = []
        while threads or crawl_queue:
            """
            这儿crawl_queue用上了,就是我们__bool__函数的作用,为真则代表我们MongoDB队列里面还有数据
            threads 或者 crawl_queue为真都代表我们还没下载完成,程序就会继续执行
            """
            for thread in threads:
                if not thread.is_alive(): ##is_alive是判断是否为空,不是空则在队列中删掉
                    threads.remove(thread)
            while len(threads) < max_threads or crawl_queue.peek(): ##线程池中的线程少于max_threads 或者 crawl_qeue时
                thread = threading.Thread(target=pageurl_crawler) ##创建线程
                thread.setDaemon(True) ##设置守护线程
                thread.start() ##启动线程
                threads.append(thread) ##添加进线程队列
            time.sleep(SLEEP_TIME)
    
    def process_crawler():
        process = []
        num_cpus = multiprocessing.cpu_count()
        print('将会启动进程数为:', num_cpus)
        for i in range(num_cpus):
            p = multiprocessing.Process(target=mzitu_crawler) ##创建进程
            p.start() ##启动进程
            process.append(p) ##添加进进程队列
        for p in process:
            p.join() ##等待进程队列里面的进程结束
    
    if __name__ == "__main__":
        process_crawler()
    

    一个多进程 多线程的爬虫完成了,(其实可以设置一下mongodb,然后调整一下连接配置,在多台机器上跑!!就是超级简化的分布式爬虫了,虽然简陋,,)

    测试了一下八分钟下载了100套图

    相关文章

      网友评论

          本文标题:爬虫(多线程+多进程)

          本文链接:https://www.haomeiwen.com/subject/rqbdsxtx.html