美文网首页
python网络爬虫:多任务-进程、线程

python网络爬虫:多任务-进程、线程

作者: changzj | 来源:发表于2018-12-30 18:09 被阅读0次

    一 、实现多任务的方式

    多线程
    多进程
    协程
    多线程+多进程

    并行,并发

    并行:同时发起同时执行,(4核,4个任务)
    并发:同时发起,单个执行
    在python语言中,并不能真正意义上实现多线程,
    因为cpython解释器有一个全局GIL解释器锁,来保证同一时刻只有一个线程在执行

    线程

    线程:是cpu执行的基本单元,占用资源少,并且线程和线程间的资源是共享的,线程依赖进程存在的,
    多线程一般适用于I/O密集型操作,线程的执行时无序的

    • 线程的创建使用
    from threading import Thread
    import threading,time
    
    data = []
    
    def download_image(url,num):
        '''
        下载图片
        :param url:
        :param num:
        :return:
        '''
        global data
        time.sleep(2)
        print(url, num)
        data.append(num)
    def read_data():
        global data
        for i in data:
            print(i)
    if __name__ == '__main__':
    
        # 获取当前线程的名称 threading.currentThread().name
        print('主线程开启',threading.currentThread().name)
        # 创建一个子线程
        '''
        target=None,  线程要执行的目标函数
        name=None, 创建线程时,指定线程的名称
        args=(), 为目标函数传参数,(tuple元组类型)
        
        '''
    
        thread_sub1 = Thread(target=download_image,name='下载线程',args=('http://d.hiphotos.baidu.com/image/pic/item/9825bc315c6034a84d0cf125c6134954082376a3.jpg',1))
        thread_sub2 = Thread(target=read_data,name='读取线程',)
        # 是否开启守护进程(在开启线程之前设置)
        # daemon:Flase,在主线程结束时,会检测子线程任务是否结束,
        # 如果子线程中任务没有结束,则会让子线程正常结束任务
        # daemon:True ,在主线程结束时,会检测子线程任务是否结束,
        # 如果子线程任务没有结束,则会让子线程跟随主线程一起结束
        thread_sub1.daemon = True
        # 启动线程
        thread_sub1.start()
    
        # join():阻塞,等待子线程中的人物执行完毕后,再回到主线程中继续执行
        thread_sub1.join()
        # 开启线程
        thread_sub2.start()
        thread_sub2.join()
        print('主线程结束', threading.currentThread().name)
    

    队列

    Queue(队列对象) Queue是python中的标准库,可以直接import Queue引用;

    队列是线程间最常用的交换数据的形式

    python下多线程的思考

    对于资源,加锁是个重要的环节。因为python原生的list,dict等,都是not thread safe的。而Queue,是线程安全的,因此在满足使用条件下,建议使用队列

    1.初始化: class (FIFO 先进先出)

    Queue.Queue(maxsize)
    2.包中的常用方法:

    Queue.qsize() 返回队列的大小

    Queue.empty() 如果队列为空,返回True,反之False

    Queue.full() 如果队列满了,返回True,反之False

    Queue.full 与 maxsize 大小对应

    Queue.get(block,timeout)获取队列,timeout等待时间

    创建一个“队列”对象
    import Queue
    
    
     maxsize:指定队列中能够存储的最大数据量
     dataqueue = queue.Queue(maxsize=40)
    
     for i in range(0,40):
         if not dataqueue.full():
            dataqueue.put(i)
    # 判断队列是否为空
    isempty = dataqueue.empty()
    print(isempty)
    
    判断队列是否存满了
    isfull = dataqueue.full()
    print(isfull)
    
    返回队列大小
    size = dataqueue.qsize()
    print(size)
    
     FIFO(先进的先出)
    print(dataqueue.get())
    
    # 注意:队列是线程之间常用的数据交换形式,因为队列在线程间,是线程安全的
    

    线程池爬虫

    from concurrent.futures import ThreadPoolExecutor
    import requests,threading
    from lxml.html import etree
    # 线程池的目的:创建一个线程池,里面有指定数量的线程,让线程执行任务
    
    def down_load_data(page):
        print(page)
        print('正在下载第' + str(page) + '页', threading.currentThread().name)
        full_url = 'http://blog.jobbole.com/all-posts/page/%s/' % str(page)
        req_header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.26 Safari/537.36 Core/1.63.6788.400 QQBrowser/10.3.2864.400'
        }
        response = requests.get(full_url, headers=req_header)
    
        if response.status_code == 200:
            # 将获取的页面源码存到dataqueue队列中
            print('请求成功')
            return response.text,response.status_code
    def download_done(futures):
        print(futures.result())
        # 可以在这里做数据解析
        html = futures.result()[0]
        html_element = etree.HTML(html)
        articles = html_element.xpath('//div[@class="post floated-thumb"]')
        for article in articles:
            articleInfo = {}
            # 标题
            articleInfo['title'] = article.xpath('.//a[@class="archive-title"]/text()')[0]
            # 封面
            img_element = article.xpath('.//div[@class="post-thumb"]/a/img')
            if len(img_element) > 0:
                articleInfo['coverImage'] = img_element[0].xpath('./@src')[0]
            else:
                articleInfo['coverImage'] = '暂无图片'
            p_as = article.xpath('.//div[@class="post-meta"]/p[1]//a')
            if len(p_as) > 2:
                # tag类型
                articleInfo['tag'] = p_as[1].xpath('./text()')[0]
                # 评论量
                articleInfo['commentNum'] = p_as[2].xpath('./text()')[0]
            else:
                # tag类型
                articleInfo['tag'] = p_as[1].xpath('./text()')[0]
                # 评论量
                articleInfo['commentNum'] = '0'
            # 简介
            articleInfo['content'] = article.xpath('.//span[@class="excerpt"]/p/text()')[0]
            # 时间
            articleInfo['publishTime'] = ''.join(article.xpath('.//div[@class="post-meta"]/p[1]/text()')).replace('\n',
                                                                                                                  '').replace(
                ' ', '').replace('\r', '').replace('·', '')
            print(articleInfo)
    if __name__ == '__main__':
    
        # 创建线程池
        # max_workers:指定线程池中的线程数量
        pool = ThreadPoolExecutor(max_workers=10)
    
        for i in range(1, 201):
            # 往线程池中添加任务
            handler=pool.submit(down_load_data,i)
            # 设置回调方法
            handler.add_done_callback(download_done)
        # 内部实质是执行了join()方法
        pool.shutdown()
    

    进程

    • 进程:是操作系统进行资源分配的基本单元,进程的执行也是无序的,每一个进程都有自己储存空间,进程之间的资源不共享
    from multiprocessing import Process,Queue
    import os
    # maxsize=-1 :设置队列中能够存储最大元素的个数
    data_queue = Queue(maxsize=10,)
    def write_data(num,data_queue):
    
        print(num)
        for i in range(0,num):
            data_queue.put(i)
        print(os.getpid(),data_queue.full())
    
    
    def read_data(data_queue):
        print('正在读取',os.getpid())
        print(data_queue.qsize())
        for i in range(0,data_queue.qsize()):
            print(data_queue.get())
    
    
    
    if __name__ == '__main__':
        # os.getpid() 获取进程id
        print('主进程开始',os.getpid())
        # 创建子进程
        '''
        target=None,设置进程要执行的函数
        name=None, 设置进程名称
        args=(),给进程执行的函数传递参数(tuple类型)
        kwargs={},给进程执行的函数传递参数(字典类型)
        '''
        process1 = Process(target=write_data,args=(10,data_queue))
        # 使用start()启动进程
        process1.start()
        # timeout=5:设置阻塞时间
        process1.join(timeout=5)
    
        process2 = Process(target=read_data,args=(data_queue,))
        # 使用start()启动进程
        process2.start()
        process2.join()
        print('主进程结束', os.getpid())
    
    '''
    1.创建任务队列
    2.创建爬取进程,执行爬取任务
    3.创建数据队列
    4.创建解析线程,解析获取数据
    
    案例:世纪佳缘
    武汉 url
    http://date.jiayuan.com/eventslist_new.php?page=1&city_id=4201&shop_id=33 (第一页是静态页面)
    http://date.jiayuan.com/eventslist_new.php?page=2&city_id=4201&shop_id=33 (第二页动态加载)
    http://date.jiayuan.com/eventslist_new.php?page=3&city_id=4201&shop_id=33
    accessID=20181222135209518625;
     SESSION_HASH=8f0e4c42c7aa889792b3d1a9229a6ac6ed4b3296;
     user_access=1; PHPSESSID=4efcd71c786c42502ee0b39fcdc0e601; plat=date_pc; DATE_FROM=daohang; DATE_SHOW_LOC=4201; DATE_SHOW_SHOP=33; uv_flag=124.200.191.230
    上海 url
    http://date.jiayuan.com/eventslist_new.php?page=2&city_id=31&shop_id=15
    accessID=20181222135209518625;
     SESSION_HASH=8f0e4c42c7aa889792b3d1a9229a6ac6ed4b3296;
     user_access=1; PHPSESSID=4efcd71c786c42502ee0b39fcdc0e601; plat=date_pc; DATE_FROM=daohang; DATE_SHOW_LOC=31; uv_flag=124.205.158.242; DATE_SHOW_SHOP=15
    '''
    from multiprocessing import Process, Queue
    import requests, re, json
    from lxml.html import etree
    import time
    
    
    def down_load_page_data(taskqueue, dataqueue):
        '''
        2 执行任务下载
        :param taskqueue:
        :param dataqueue:
        :return:
        '''
        sumTime = 0
        while True:
            if not taskqueue.empty():
                sumTime = 0
                url = taskqueue.get()
                response, cur_page = download_page_data(url)
                data_dict = {'data': response.text, 'page': cur_page}
                dataqueue.put(data_dict)
    
                # 获取下一页
                if cur_page != 1:
                    print('===',cur_page)
                    if isinstance(response.json(), list):
    
                        next_page = cur_page + 1
                        # re.compile('page=\d+')
                        next_url = re.sub('page=\d+', 'page=' + str(next_page), url)
                        taskqueue.put(next_url)
                    else:
                        print('已获取到' + str(cur_page) + '页', '没有数据了', response.json())
                        pass
    
                elif cur_page == 1:
                    next_page = cur_page + 1
                    next_url = re.sub('page=\d+', 'page=' + str(next_page), url)
                    taskqueue.put(next_url)
            else:
                #数据队列中没有任务了
                time.sleep(0.001)
                sumTime = sumTime + 1
                if sumTime > 5000:
                    print('跳出循环')
                    break
    
    def download_page_data(url):
        '''
         3 下载每一个分页数据
        :param url: 每一个分页url地址
        :return:
        '''
        # http://date.jiayuan.com/eventslist_new.php?page=2&city_id=4201&shop_id=33
        pattern = re.compile('.*?page=(\d+)&city_id=(\d+)&shop_id=(\d+)')
        result = re.findall(pattern, url)[0]
        # 当前页
        cur_page = result[0]
        DATE_SHOW_LOC = result[1]
        DATE_SHOW_SHOP = result[2]
        print(cur_page, DATE_SHOW_SHOP, DATE_SHOW_LOC)
        cookie = '''
               accessID=20181222135209518625; SESSION_HASH=8f0e4c42c7aa889792b3d1a9229a6ac6ed4b3296; user_access=1; PHPSESSID=4efcd71c786c42502ee0b39fcdc0e601; plat=date_pc; DATE_FROM=daohang; uv_flag=124.200.191.230; DATE_SHOW_LOC=%s; DATE_SHOW_SHOP=%s
        ''' %(DATE_SHOW_LOC,DATE_SHOW_SHOP)
        # print(cookie)
    
        req_header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.26 Safari/537.36 Core/1.63.6788.400 QQBrowser/10.3.2864.400',
            'Cookie': cookie,
            'Referer': 'http://date.jiayuan.com/eventslist.php',
        }
        # cookie_dict = {sub_str.split('=')[0]: sub_str.split('=')[1] for sub_str in cookie.split('; ')}
        response = requests.get(url, headers=req_header)
        # print(cookie_dict)
        if response.status_code == 200:
            print('第' + cur_page + '页获取成功', DATE_SHOW_LOC, DATE_SHOW_SHOP)
            return response, int(cur_page)
    
    
    def parse_page_data(dataqueue):
        '''
        解析进程解析数据
        :param dataqueue:
        :return:
        '''
        while not dataqueue.empty():
            data = dataqueue.get()
            page = data['page']
            html = data['data']
            if page == 1:
                print('解析第一页数据,静态页面')
                html_element = etree.HTML(html)
                hot_active = html_element.xpath('//div[@class="hot_detail fn-clear"]')
                for hot_div in hot_active:
                    # 活动详情的url地址
                    full_detail_url = 'http://date.jiayuan.com' + hot_div.xpath('.//h2[@class="hot_title"]/a/@href')[0]
                    response = download_detail_data(full_detail_url)
                    parse_detail_data(response)
                more_active = html_element.xpath('//ul[@class="review_detail fn-clear t-activiUl"]/li')
                for more_li in more_active:
                    # 活动详情的url地址
                    full_detail_url = 'http://date.jiayuan.com' + more_li.xpath('.//a[@class="review_link"]/@href')[0]
                    response = download_detail_data(full_detail_url)
                    parse_detail_data(response)
            else:
                print('解析第' + str(page) + '数据', '非静态页面')
                # 使用json.loads() 将json字符串转换为python数据类型
                json_obj = json.loads(html)
                if isinstance(json_obj,list):
                    # 是列表说明得到正确的数据
                    print('正在解析数据')
                    for sub_dict in json_obj:
                        id = sub_dict['id']
                        full_detail_url = 'http://date.jiayuan.com/activityreviewdetail.php?id=%s' % id
                        response = download_detail_data(full_detail_url)
                        parse_detail_data(response)
    
    def download_detail_data(url):
        req_header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.26 Safari/537.36 Core/1.63.6788.400 QQBrowser/10.3.2864.400',
            'Cookie': 'accessID=20181222135209518625; SESSION_HASH=8f0e4c42c7aa889792b3d1a9229a6ac6ed4b3296; user_access=1; PHPSESSID=4efcd71c786c42502ee0b39fcdc0e601; plat=date_pc; DATE_FROM=daohang; uv_flag=124.200.191.230; DATE_SHOW_LOC=31; DATE_SHOW_SHOP=15',
            'Referer': 'http://date.jiayuan.com/eventslist.php',
        }
        # cookie_dict = {sub_str.split('=')[0]: sub_str.split('=')[1] for sub_str in cookie.split('; ')}
        response = requests.get(url, headers=req_header)
        # print(cookie_dict)
        if response.status_code == 200:
            print('详情页面获取成功', response.url)
            return response
    
    
    def parse_detail_data(response):
        '''
        解析活动详情
        :param response:
        :return:
        '''
        html_element = etree.HTML(response.text)
    
        # 创建一个字典,存放获取的数据
        item = {}
        # 活动标题
        item['title'] = ''.join(html_element.xpath('//h1[@class="detail_title"]/text()')[0])
        # 活动时间
        item['time'] = ','.join(
            html_element.xpath('//div[@class="detail_right fn-left"]/ul[@class="detail_info"]/li[1]//text()')[0])
        # 活动地址
        item['adress'] = html_element.xpath('//ul[@class="detail_info"]/li[2]/text()')[0]
        # 参加人数
        item['joinnum'] = html_element.xpath('//ul[@class="detail_info"]/li[3]/span[1]/text()')[0]
        # 预约人数
        item['yuyue'] = html_element.xpath('//ul[@class="detail_info"]/li[3]/span[2]/text()')[0]
        # 介绍
        item['intreduces'] = \
            html_element.xpath('//div[@class="detail_act fn-clear"][1]//p[@class="info_word"]/span[1]/text()')[0]
        # 提示
        item['point'] = html_element.xpath('//div[@class="detail_act fn-clear"][2]//p[@class="info_word"]/text()')[0]
        # 体验店介绍
        item['introductionStore'] = ''.join(
            html_element.xpath('//div[@class="detail_act fn-clear"][3]//p[@class="info_word"]/text()'))
        # 图片连接
        item['coverImage'] = html_element.xpath('//div[@class="detail_left fn-left"]/img/@data-original')[0]
    
        print(item)
    
        with open('sjjy.json', 'a+', encoding='utf-8') as file:
            json_str = json.dumps(item,ensure_ascii=False)+'\n'
            file.write(json_str)
    
    if __name__ == '__main__':
        # 第一步 创建任务队列
        taskqueue = Queue()
    
        # 2 设置起始任务
        taskqueue.put('http://date.jiayuan.com/eventslist_new.php?page=1&city_id=4201&shop_id=33')
        taskqueue.put('http://date.jiayuan.com/eventslist_new.php?page=1&city_id=31&shop_id=15')
        taskqueue.put('http://date.jiayuan.com/eventslist_new.php?page=1&city_id=3702&shop_id=42')
        taskqueue.put('http://date.jiayuan.com/eventslist_new.php?page=1&city_id=50&shop_id=5')
    
        # 4 创建数据队列
        dataqueue = Queue()
        # 3 创建进程爬取任务
    
        for i in range(0, 4):
            process_crawl = Process(
                target=down_load_page_data,
                args=(taskqueue, dataqueue)
            )
            process_crawl.start()
        time.sleep(10)
    
        # 创建解析进程
        for i in range(0, 4):
            process_parse = Process(
                target=parse_page_data,
                args=(dataqueue,)
            )
            process_parse.start()
    

    进程池

    # from concurrent.futures import ProcessPoolExecutor
    # import time,os
    # def download_data(page):
    #     print(page,os.getpid())
    #     # time.sleep(1)
    #     return '请求成功'+str(page),page
    #
    # # 进程执行完一个任务后的回调函数
    # def download_done(futures):
    #     result =futures.result()
    #     print(result)
    #
    #     next_page = int(result[1])+1
    #     handler=pool.submit(download_data,next_page)
    #     handler.add_done_callback(download_done)
    # if __name__ == '__main__':
    #     # 1 创建进程池
    #     # max_workers= : 设置进程池中的进程数量
    #     pool = ProcessPoolExecutor(4)
    #
    #     for i in range(0,200):
    #         '''
    #         fn, 执行函数
    #         *args, 传递参数
    #         '''
    #         handler = pool.submit(download_data,i)
    #         # 回调函数的设置,,看自己是否需要
    #         handler.add_done_callback(download_done)
    #     # cannot schedule new futures after shutdown
    #     # pool.shutdown()
    
    # 方式二
    from multiprocessing import Pool
    import os
    def download_data(page):
        print(page,os.getpid())
        # time.sleep(1)
        return '请求成功'+str(page),page
    def done(futures):
        print(futures)
    
    if __name__ == '__main__':
        # 创建进程池
        pool = Pool(4)
    
        for page in range(0,200):
            # pool.apply_async() 异步非阻塞
            # pool.apply() 同步的方式添加任务
            # func : 要执行的方法
            # args=() ,给函数传递的参数
            #callback=None, 成功回调
            #error_callback=None  错误回调
            pool.apply_async(download_data,args=(page,),callback=done)
        # 执行close 后不可以在添加任务了
        pool.close()
        pool.join()
    

    进程池爬虫

    from concurrent.futures import ProcessPoolExecutor
    import requests, re, time, json
    from lxml.html import etree
    
    
    def down_load_page_data(url):
        """
        执行任务的下载
        :param url
        :return:
        """
        response, cur_page = download_page_data(url)
        data_dict = {'data': response.text, 'page': cur_page}
    
        # 获取下一页
        if cur_page != 1:
            print('====', cur_page)
            if isinstance(response.json(), list):
                next_page = cur_page + 1
                next_url = re.sub('page=\d+', 'page=' + str(next_page), url)
    
            else:
                print('已获取到' + str(cur_page) + '页', '没有数据了', response.json())
                next_url = None
                pass
        elif cur_page == 1:
            next_page = cur_page + 1
            next_url = re.sub('page=\d+', 'page=' + str(next_page), url)
    
        return data_dict, next_url
    
    
    def download_page_data(url):
        """
        下载每一个分页的数据
        :param url: 每一个分页的url地址
        :return:
        """
        # http://date.jiayuan.com/eventslist_new.php?
        # page=1&city_id=4201&shop_id=33
        pattern = re.compile('.*?page=(\d+)&city_id=(\d+)&shop_id=(\d+)')
        result = re.findall(pattern, url)[0]
        cur_page = result[0]
        DATE_SHOW_LOC = result[1]
        DATE_SHOW_SHOP = result[2]
        print(cur_page, DATE_SHOW_SHOP, DATE_SHOW_LOC)
        cookie = """_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079; jy_refer=www.baidu.com; _gscbrs_1380850711=1; PHPSESSID=9202a7e752f801a49a5747832520f1da; plat=date_pc; DATE_FROM=daohang; SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d; user_access=1; uv_flag=124.64.18.38; DATE_SHOW_LOC=%s; DATE_SHOW_SHOP=%s""" % (
        DATE_SHOW_LOC, DATE_SHOW_SHOP)
        # print(cookie)
    
        req_header = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
            'Cookie': cookie,
            'Referer': 'http://date.jiayuan.com/eventslist.php',
        }
        # cookie_dict = {sub_str.split('=')[0]:sub_str.split('=')[1] for sub_str in cookie.split('; ')}
        # print(cookie_dict)
        # cookies(cookiejar object or dict)
        response = requests.get(url, headers=req_header)
    
        if response.status_code == 200:
            print('第' + cur_page + '页获取成功', DATE_SHOW_SHOP, DATE_SHOW_LOC)
            return response, int(cur_page)
    
    
    def parse_page_data(futures):
        '''
        1.获取到下一页url地址,继续往进程池中添加任务
        2.获取到分页的页面源码进行数据解析
        :param futures:
        :return:
        '''
        result = futures.result()
        data = result[0]
        next_page_url = result[1]
        if next_page_url:
            print('正在添加任务', next_page_url)
            handler = page_pool.submit(down_load_page_data, next_page_url)
            handler.add_done_callback(parse_page_data)
        page = data['page']
        html = data['data']
        # 创建进程池(获取活动详情的页面源码)
        detail_pool = ProcessPoolExecutor(2)
    
        if page == 1:
            print('解析第一页数据,静态页面')
            html_element = etree.HTML(html)
            hot_active = html_element.xpath('//div[@class="hot_detail fn-clear"]')
            for hot_div in hot_active:
                # 活动详情的url地址
                full_detail_url = 'http://date.jiayuan.com' + hot_div.xpath('.//h2[@class="hot_title"]/a/@href')[0]
                handler = detail_pool.submit(download_detail_data, full_detail_url)
                handler.add_done_callback(parse_detail_data)
            more_active = html_element.xpath('//ul[@class="review_detail fn-clear t-activiUl"]/li')
            for more_li in more_active:
                # 活动详情的url地址
                full_detail_url = 'http://date.jiayuan.com' + more_li.xpath('.//a[@class="review_link"]/@href')[0]
                handler = detail_pool.submit(download_detail_data, full_detail_url)
                handler.add_done_callback(parse_detail_data)
        else:
            print('解析第' + str(page) + '数据', '非静态页面')
            # 使用json.loads()将json字符串转换为python数据类型
            json_obj = json.loads(html)
            if isinstance(json_obj, list):
                # 是列表,说明得到的是正确的数据,
                print('正在解析数据')
                for sub_dict in json_obj:
                    id = sub_dict['id']
                    # http://date.jiayuan.com/activityreviewdetail.php?id=11706
                    full_detail_url = 'http://date.jiayuan.com/activityreviewdetail.php?id=%s' % id
                    handler = detail_pool.submit(download_detail_data, full_detail_url)
                    handler.add_done_callback(parse_detail_data)
        detail_pool.shutdown()
    
    
    def download_detail_data(url):
        """
        根据活动详情的url地址发起请求
        :param url:
        :return:
        """
        req_header = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
            'Cookie': '_gscu_1380850711=43812116hs5dyy11; accessID=20181222071935501079; jy_refer=www.baidu.com; _gscbrs_1380850711=1; PHPSESSID=9202a7e752f801a49a5747832520f1da; plat=date_pc; DATE_FROM=daohang; SESSION_HASH=61e963462c6b312ee1ffacf151ffaa028477217d; user_access=1; uv_flag=124.64.18.38; DATE_SHOW_LOC=50; DATE_SHOW_SHOP=5',
            'Referer': 'http://date.jiayuan.com/eventslist.php',
        }
        response = requests.get(url, headers=req_header)
    
        if response.status_code == 200:
            print('详情页面获取成功', response.url)
            return response
    
    
    def parse_detail_data(futures):
        """
        解析活动详情
        :param response:
        :return:
        """
        response = futures.result()
        html_element = etree.HTML(response.text)
        # 创建一个字典,存放获取的数据
        item = {}
        # 活动标题
        item['title'] = ''.join(html_element.xpath('//h1[@class="detail_title"]/text()')[0])
        # 活动时间
        item['time'] = ','.join(
            html_element.xpath('//div[@class="detail_right fn-left"]/ul[@class="detail_info"]/li[1]//text()')[0])
        # 活动地址
        item['adress'] = html_element.xpath('//ul[@class="detail_info"]/li[2]/text()')[0]
        # 参加人数
        item['joinnum'] = html_element.xpath('//ul[@class="detail_info"]/li[3]/span[1]/text()')[0]
        # 预约人数
        item['yuyue'] = html_element.xpath('//ul[@class="detail_info"]/li[3]/span[2]/text()')[0]
        # 介绍
        item['intreduces'] = \
        html_element.xpath('//div[@class="detail_act fn-clear"][1]//p[@class="info_word"]/span[1]/text()')[0]
        # 提示
        item['point'] = html_element.xpath('//div[@class="detail_act fn-clear"][2]//p[@class="info_word"]/text()')[0]
        # 体验店介绍
        item['introductionStore'] = ''.join(
            html_element.xpath('//div[@class="detail_act fn-clear"][3]//p[@class="info_word"]/text()'))
        # 图片连接
        item['coverImage'] = html_element.xpath('//div[@class="detail_left fn-left"]/img/@data-original')[0]
    
        with open('shijijiyua.json', 'a+', encoding='utf-8') as file:
            json_str = json.dumps(item, ensure_ascii=False) + '\n'
            file.write(json_str)
    
    
    if __name__ == '__main__':
    
        # 1 创建一个进程池,执行分页任务下载
        page_pool = ProcessPoolExecutor(2)
        # 2 添加任务
        start_urls = [
            'http://date.jiayuan.com/eventslist_new.php?page=1&city_id=4201&shop_id=33',
            'http://date.jiayuan.com/eventslist_new.php?page=1&city_id=31&shop_id=15',
            'http://date.jiayuan.com/eventslist_new.php?page=1&city_id=3702&shop_id=42',
            'http://date.jiayuan.com/eventslist_new.php?page=1&city_id=50&shop_id=5',
        ]
        for url in start_urls:
            handler = page_pool.submit(down_load_page_data, url)
            handler.add_done_callback(parse_page_data)
    

    迭代器

    迭代是访问集合元素的一种方式。迭代器是一个可以记住遍历的位置的对象。迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。迭代器只能往前不会后退

    可迭代对象 我们已经知道可以对list、tuple、str等类型的数据使用for...in...的循环语法从其中依次拿到数据进行使用,我们把这样的过程称为遍历,也叫迭代。
    如何判断一个对象是否可以迭代

    from collections import Iterable
    
    print(isinstance([],Iterable)) --> True
    
    print(isinstance(1,Iterable)) -->False
    

    可迭代对象进行迭代使用的过程,每迭代一次(即在for...in...中每循环一次)都会返回对象中的下一条数据,一直向后读取数据直到迭代了所有数据后结束。

    可迭代对象通过_iter方法向我们提供一个迭代器,我们在迭代一个可迭代对象的时候,实际上就是先获取该对象提供的一个迭代器,然后通过这个迭代器来依次获取对象中的每一个数据。一个具备了 _iter 方法的对象,就是一个 可迭代对象

    iter()函数与next()函数

    list、tuple等都是可迭代对象,我们可以通过iter()函数获取这些可迭代对象的迭代器。然后我们可以对获取到的迭代器不断使用next()函数来获取下一条数据。iter()函数实际上就是调用了可迭代对象的iter方法。

    如何判断一个对象是否是迭代器

    from collections import Iterator
    
    print(isinstance([1,2], Iterator)) -->False
    
    print(isinstance(iter([1,2]), Iterator)) -->True
    

    迭代器是用来帮助我们记录每次迭代访问到的位置,当我们对迭代器使用next()函数的时候,迭代器会向我们返回它所记录位置的下一个位置的数据。实际上,在使用next()函数的时候,调用的就是迭代器对象的next方法。所以,我们要想构造一个迭代器,就要实现它的next方法。并且python要求迭代器本身也是可迭代的,所以我们还要为迭代器实现iter方法,迭代器的iter方法返回自身即可。

    一个实现了iter方法和next方法的对象,就是迭代器。

    生成器

    利用迭代器,我们可以在每次迭代获取数据(通过next()方法)时按照特定的规律进行生成。但是我们在实现一个迭代器时,关于当前迭代到的状态需要我们自己记录,进而才能根据当前状态生成下一个数据。为了达到记录当前状态,并配合next()函数进行迭代使用,我们可以采用更简便的语法,即生成器(generator)。生成器是一类特殊的迭代器。
    在使用生成器实现的方式中,我们将原本在迭代器next方法中实现的基本逻辑放到一个函数中来实现,但是将每次迭代返回数值的return换成了yield,此时新定义的函数便不再是函数,而是一个生成器了。
    使用了yield关键字的函数不再是函数,而是生成器。(使用了yield的函数就是生成器)

    • yield关键字有两点作用:
      保存当前运行状态(断点),然后暂停执行,即将生成器(函数)挂起
      将yield关键字后面表达式的值作为返回值返回,此时可以理解为起到了return的作用
      可以使用next()函数让生成器从断点处继续执行.

    协程

    协程,又称微线程,纤程
    协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)。 它自带CPU寄存器上下文。这样只要在合适的时机, 我们可以把一个协程 切换到另一个协程。 只要这个过程中保存或恢复 CPU上下文那么程序还是可以运行的。

    协程和线程差异

    在实现多任务时, 线程切换从系统层面远不止保存和恢复 CPU上下文这么简单。 操作系统为了程序运行的高效性每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作。 所以线程的切换非常耗性能。但是协程的切换只是单纯的操作CPU的上下文,所以一秒钟切换个上百万次系统都抗的住。

    yeild简单实现

    import time
    
    def work1():
        while True:
            print("----work1---")
            yield
            time.sleep(0.5)
    
    def work2():
        while True:
            print("----work2---")
            yield
            time.sleep(0.5)
    
    def main():
        w1 = work1()
        w2 = work2()
        while True:
            next(w1)
            next(w2)
    
    if __name__ == "__main__":
        main()
    实质: 其实任务是在主线程中并发执行的,看上去像同时执行而已,当执行next()的时候,函数执行到yield的时候先暂停一下,然后之后再调用next()的时候,接着上一次暂停的位置执行
    

    实现协程

    .greenlet的使用

    from greenlet import greenlet
    import requests
    
    def download1():
        print('正在下载1')
        #耗时的操作
        response = requests.get(url='https://github.com/')
        gre2.switch()
        print('download1下载完了')
        gre2.switch()
    
    
    def download2():
        print('正在下载2')
        response = requests.get(url='https://github.com/')
        gre1.switch()
        print('download2下载完了')
    
    
    gre1 = greenlet(download1)
    gre2 = greenlet(download2)
    gre1.switch()
    
    

    greenlet已经实现了协程,但是这个还的人工切换,python还有一个比greenlet更强大的并且能够自动切换任务的模块gevent

    gevent能够在内部自己实现携程之间的切换

    from gevent import monkey,pool
    import gevent,requests
    import lxml.etree as etree

    有耗时操作时需要

    monkey.patch_all() # 将程序中用到的耗时操作的代码,换为gevent中自己实现的模块

    def download(url):
        print(url+'正在下载1')
        header = {'User-Agent':'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:61.0) Gecko/20100101 Firefox/61.0'}
        response = requests.get(url,headers=header)
        print(len(response.text),url+'已完成1')
    
    def download2(url):
        print(url+'正在下载2')
        header = {'User-Agent':'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:61.0) Gecko/20100101 Firefox/61.0'}
        response = requests.get(url,headers=header)
        print(len(response.text),url+'已完成2')
    
    pool = pool.Pool(2)
    
    gevent.joinall(
        [
            pool.spawn(download,'https://www.yahoo.com/'),
            pool.spawn(download,'https://www.taobao.com/'),
            pool.spawn(download,'https://github.com/'), 
            pool.spawn(download2,'https://www.yahoo.com/'),
            pool.spawn(download2,'https://www.taobao.com/'),
            pool.spawn(download2,'https://github.com/'), 
        ]
    )
    

    相关文章

      网友评论

          本文标题:python网络爬虫:多任务-进程、线程

          本文链接:https://www.haomeiwen.com/subject/izwvlqtx.html