美文网首页
线程池抓取

线程池抓取

作者: 垃圾桶边的狗 | 来源:发表于2022-12-14 09:41 被阅读0次
import datetime
import json
import warnings

from jsonpath import jsonpath
from sqlalchemy.exc import ResourceClosedError

warnings.filterwarnings('ignore')
import random
import time

import requests
import pandas as pd
import records
from urllib import parse
import traceback
from pymysql.converters import escape_string
from sqlalchemy import create_engine
from fake_useragent import UserAgent
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import deque

# from my_proxies import proxy


# Pipeline overview (translated from the original notes):
#   1. read the main_page column from douyin_account_all
#   2. build a request URL from each main_page value
#   3. send the request
#   4. parse the response
#   5. persist the data

# BUGFIX: `requests` matches proxies by URL *scheme*, so the mapping keys
# must be 'http' / 'https'. The previous keys 'http://' / 'https://' never
# matched any request, so every request silently went out without a proxy.
# NOTE(review): the 'https' entry points at an https:// proxy URL (TLS to
# the proxy itself) -- confirm the proxy speaks TLS, otherwise use
# 'http://192.168.241.62:9976' for both schemes.
proxy = [

    {'http': 'http://192.168.241.62:9976',
     'https': 'https://192.168.241.62:9976'},

]


class CrawlAima:
    """Crawl Douyin author pages and upsert per-video statistics into MySQL.

    High-level flow (translated from the module notes): read the main_page
    column from douyin_account_all, derive a sec_uid and request URL from
    each value, fetch the user's post list, parse it, and store the stats.

    NOTE(review): crawl_handler()/data_process() read attributes that only
    get_main_page() creates (main_page_dict, userid_dict, uid_oid_map), so
    callers must invoke get_main_page() before crawling; loop() below does.
    """

    def __init__(self):
        self.ua = UserAgent()  # random User-Agent source for request headers
        # records.Database provides query(); a raw SQLAlchemy engine is kept too.
        self.db = records.Database(
            f"mysql+pymysql://user:{parse.quote_plus('abc')}@0.0.0.0:3306/db?charset=utf8mb4")
        self.conn = create_engine(self.db.db_url)

        self.dq = deque()  # retry queue of sec_uids whose fetch failed
        self.counter = {} # per-sec_uid failure count
        self.url = 'https://www.iesdouyin.com/post/'

        self.get_time_by_vid_list = []
        self.video_sql = self.get_video_sql()  # cached upsert statement
        self.vid_dict = self.get_all_video()  # {video_id: post_time} cache
        # Load all main_page values (left to the caller -- see class docstring)
        # self.get_main_page()

    def get_all_video(self):
        """Return {video_id: str(post_time)} for every row already stored.

        convert_time() consults this cache before resolving a post time by
        any other means.
        """
        vid_dict = self.db.query("select video_id,post_time from douyin_video_extend_main").as_dict()
        return {i['video_id']: str(i['post_time']) for i in vid_dict}


    def get_video_sql(self):
        """Build the parameterized INSERT ... ON DUPLICATE KEY UPDATE upsert.

        post_time is deliberately absent from the UPDATE clause, so the post
        time stored on first insert is never overwritten by a re-crawl --
        presumably intentional; confirm with the schema owner.
        """
        video_sql = """
                        INSERT INTO douyin_video_extend_main (
                        video_id,
                        userid,
                        comment_num,
                        like_num,
                        collect_num,
                        share_num,
                        url,
                        content,
                        cover_pic,
                        post_time
                        )
                        VALUES
                        (
                        :video_id,
                        :userid,
                        :comment_num,
                        :like_num,
                        :collect_num,
                        :share_num,
                        :url,
                        :content,
                        :cover_pic,
                        :post_time
                        )
                        ON DUPLICATE KEY UPDATE
                            video_id = :video_id,
                            userid = :userid,
                            comment_num = :comment_num,
                            like_num = :like_num,
                            collect_num = :collect_num,
                            share_num = :share_num,
                            url = :url,
                            content = :content,
                            cover_pic = :cover_pic
                        """
        return video_sql

    def get_main_page(self):
        """Load account rows and build the lookup structures used by crawling.

        Populates:
          main_page_list / main_page_dict -- the sec_uid tail of each
            main_page URL, and its 1-based position (used for progress output).
          userid_list / userid_dict -- known user ids (membership test).
          uid_oid_map -- str(userid) -> open_id.
        """
        main_page = self.db.query("select userid, main_page,open_id from douyin_account_all;")

        df = main_page.export("df")
        df = df.dropna(subset=['main_page'])
        self.main_page_list = []
        if not df.empty:
            # main_page is a URL; keep only its last path segment (the sec_uid).
            df.main_page = df.main_page.map(lambda x: x.rsplit('/', 1)[-1])
            self.main_page_list = df.main_page.to_list()
            self.main_page_dict = {v: k+1 for k, v in enumerate(self.main_page_list)}
            # Collect user ids
            self.userid_list = df.userid.to_list()
            self.userid_dict = {str(u): 1 for u in self.userid_list}

            # {userid: open_id}
            df['userid'] = df['userid'].astype('str')
            self.uid_oid_map = dict(zip(df['userid'].to_list(),df['open_id'].to_list()))


    def counter_sec_uid(self,sec_uid):
        """Increment the failure counter for sec_uid (first failure -> 1)."""
        # Count failures per sec_uid
        if self.counter.get(sec_uid):
            self.counter[sec_uid] += 1
        else:
            self.counter[sec_uid] = 1

    def crawl_handler(self, sec_uid, max_cursor=0):
        """Fetch one page of a user's posts; delegate items to data_process().

        Any failure (bad status, empty body, unexpected payload, exception)
        pushes sec_uid back onto self.dq and bumps its failure counter so
        loop() can retry it later.

        Args:
            sec_uid: Douyin user id (last path segment of a main_page URL).
            max_cursor: pagination cursor; 0 requests the first page.
        """
        params = {
            "sec_uid": sec_uid,
            "max_cursor": max_cursor,
            "count": 21,
            "key": "188"
        }
        headers = {
            "user-agent": self.ua.random,
            "Connection": "keep-alive",
            # "Host": "api.batmkey.cn:8000",
            # "Upgrade-Insecure-Requests": "1",
        }

        try:
            response = requests.get(self.url,
                                    headers=headers,
                                    params=params,
                                    timeout=(100, 100),  # (connect, read) seconds
                                    verify=False,
                                    proxies=random.choice(proxy))#,proxies=random.choice(proxy))  # , proxies=proxies

            if response.status_code == 200:
                if response.text:
                    data = response.json()
                    if isinstance(data, dict) and isinstance(data.get("aweme_list"), list):
                        # Progress: pages remaining after this sec_uid.
                        print(f"还剩:{len(self.main_page_dict) - self.main_page_dict[sec_uid]} {sec_uid} success")
                        # res = data.get("data",{}).get('aweme_list')
                        res = data.get('aweme_list')
                        if res:
                            self.data_process(res)
                        else:
                            # print('self.dq.appendleft',sec_uid)
                            self.counter_sec_uid(sec_uid)  # failure count + 1
                            self.dq.appendleft(sec_uid)  # requeue for retry
                    else:
                        # API-level error payload: "no access" (code 100) is
                        # logged and dropped; anything else is retried.
                        if data.get('code') == 100 and data['msg'] == '没有访问权限':
                            print("报错:",data)
                        else:
                            self.counter_sec_uid(sec_uid)  # failure count + 1
                            self.dq.appendleft(sec_uid)  # requeue for retry
                else:
                    self.dq.appendleft(sec_uid)  # requeue for retry
                    self.counter_sec_uid(sec_uid)  # failure count + 1
                    # print("服务器没有返回数据response=",response)
                    # time.sleep(2)
                    print(sec_uid,'没有获取到数据')

            else:
                self.dq.appendleft(sec_uid) # requeue for retry
                self.counter_sec_uid(sec_uid) # failure count + 1
                # print("状态码:",response.status_code)
                # time.sleep(0.5)
        except Exception as e:
            # print('Exception>>>>self.dq.appendleft', sec_uid)
            self.dq.appendleft(sec_uid)
            self.counter_sec_uid(sec_uid)
            # traceback.print_exc()
            print(f"还剩:{len(self.main_page_dict) - self.main_page_dict[sec_uid]} {sec_uid} fail")
            # Timeouts are expected noise; only dump tracebacks for others.
            if 'timed out' not in str(e):
                traceback.print_exc()
                print(sec_uid, '报错:',e,'\n')

        # Jittered pacing between requests.
        time.sleep(random.choice([1, 0.8]))

    def get_header(self, vids):
        """Return browser-like headers for the iteminfo endpoint.

        *vids* is only interpolated into the pseudo 'path' header; the real
        query string is built by the caller.
        """
        headers = {
            'authority': 'www.douyin.com',
            'accept': 'application/json',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'max-age=0',
            'sec-ch-ua': '".Not/A)Brand";v="99", "Google Chrome";v="103", "Chromium";v="103"',
            'path': f'web/api/v2/aweme/iteminfo/?item_ids={vids}',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'none',
            'sec-fetch-user': '?1',
            'upgrade-insecure-requests': '1',
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
        }
        return headers

    def convert_time(self, t, vid):
        """Resolve a video's post time as 'YYYY-mm-dd HH:MM:SS'.

        Resolution order:
          1. cached value in self.vid_dict (rows already in the DB);
          2. the trailing '_'-separated token of *t* (a dynamic_cover URI),
             which -- when numeric -- is treated as a unix timestamp;
          3. a live request to the iteminfo endpoint for create_time.

        Raises:
            ValueError: the iteminfo endpoint returned an empty body.
        """
        get_post_time_url = "https://www.douyin.com/web/api/v2/aweme/iteminfo/?item_ids={}"
        post_time = self.vid_dict.get(vid)
        if post_time:
            return post_time
        else:
            if t and t.rsplit('_')[-1].isdigit():
                t = int(t.rsplit('_')[-1])
                return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
            else:
                res = requests.get(get_post_time_url.format(vid),
                                   headers=self.get_header(vid),
                                   verify=False)  # proxies=random.choice(proxy)
                if not res.text:
                    raise ValueError
                item = res.json()
                item = item['item_list'][0]
                # print('item_list:', len(res['item_list']))
                create_time = item["create_time"]
                post_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(create_time)))
                print(f"post_time:{post_time} vid:{vid}")
                return post_time


    def data_process(self, aweme_list):
        """Upsert stats for every item whose author uid is a known user.

        Items from unknown author uids are skipped (logged).
        NOTE(review): save_data also carries play_num and open_id, which
        video_sql never binds -- confirm the driver ignores unused params.
        NOTE(review): content is escape_string()-ed even though it is passed
        as a bound parameter; this likely double-escapes quotes -- verify.
        """
        # print(data.keys()) #['aweme_list', 'has_more', 'log_pb', 'max_cursor', 'request_item_cursor', 'status_code']
        save_list = []
        for item in aweme_list:
            uid = item.get('author', {}).get('uid')
            if self.userid_dict.get(str(uid)):
                vid = item.get('statistics', {}).get('aweme_id', 0)
                save_data = {
                    "userid": uid,
                    "cover_pic": item.get("video", {}).get("cover", {}).get("url_list", [''])[0],
                    "content": escape_string(item.get('desc', '')),
                    "video_id": vid,
                    "comment_num": item.get('statistics', {}).get('comment_count', 0),
                    "like_num": item.get('statistics', {}).get('digg_count', 0),
                    "share_num": item.get('statistics', {}).get('share_count', 0),
                    "play_num": item.get('statistics', {}).get('play_count', 0),
                    "collect_num": item.get('statistics', {}).get('collect_count', 0),
                    "post_time": self.convert_time(item.get("video", {}).get("dynamic_cover", {}).get('uri'), vid),
                    # "post_time": self.convert_time(item.get('create_time')),
                    "url": "https://www.douyin.com/video/" + str(item.get('statistics', {}).get('aweme_id', 0)),
                }

                # open_id is pre-quoted / 'null' as literal SQL fragments --
                # NOTE(review): unused by video_sql as written; see docstring.
                userid = save_data['userid']
                open_id = self.uid_oid_map.get(str(userid))
                if open_id:
                    open_id = f"'{open_id}'"
                else:
                    open_id = 'null'
                save_data['open_id'] = open_id
                save_list.append(save_data)

                # save_data.pop('url')
                # print('save_data:',save_data['post_time'],'video_id:',save_data['video_id'])
                # save_list.append(save_data)
            else:
                print("用户uid:", uid, '不存在--------------')

        # Write to the main table -- original note: migrated to main_true.py
        for i in save_list:
            self.db.query(self.video_sql, **i)

def loop(max_retries=1):
    """Run one full crawl pass.

    Fetches every account main page with a small thread pool, then drains
    the retry deque, giving up on a page once its failure count exceeds
    ``max_retries``.

    Args:
        max_retries: give up on a sec_uid after this many recorded
            failures. Defaults to 1, matching the original `> 1` cutoff.
    """
    start_time = time.time()
    print("开始时间:", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)))

    res = CrawlAima()
    # Build main_page / userid lookup tables (crawl_handler depends on them).
    res.get_main_page()
    with ThreadPoolExecutor(max_workers=5) as t:
        obj_list = [t.submit(res.crawl_handler, page) for page in res.main_page_list]
        # Surface any unexpected exception raised inside a worker.
        for future in as_completed(obj_list):
            future.result()

    crawl_fail_list = []
    # Drain the retry queue; crawl_handler re-appends on repeated failure.
    while len(res.dq):
        page = res.dq.pop()
        print(f"剩余总数:{len(res.dq)} 当前page:{page}, 第 {res.counter.get(page, 0)} 抓取")

        if res.counter.get(page, 0) > max_retries:
            # BUGFIX: the message previously hard-coded "3次" while the code
            # gave up after the count exceeded 1; report the real limit.
            print(page, f"请求次数超过{max_retries}次 放弃抓取", res.counter.get(page, 0))
            crawl_fail_list.append(page)
            continue
        res.crawl_handler(page)

    print(f'------------{max_retries}次 抓取失败----------------')
    print('抓取失败:', json.dumps(crawl_fail_list))
    print('----------------------------')
    end_time = time.time()
    print('finish', int(end_time - start_time) / 60, ' min',
          time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))


if __name__ == '__main__':
    # Single crawl pass; wrap in an external scheduler for periodic runs.
    loop()

相关文章

  • 线程池抓取

  • java线程池

    线程VS线程池 普通线程使用 创建线程池 执行任务 执行完毕,释放线程对象 线程池 创建线程池 拿线程池线程去执行...

  • java----线程池

    什么是线程池 为什么要使用线程池 线程池的处理逻辑 如何使用线程池 如何合理配置线程池的大小 结语 什么是线程池 ...

  • Java线程池的使用

    线程类型: 固定线程 cached线程 定时线程 固定线程池使用 cache线程池使用 定时调度线程池使用

  • Spring Boot之ThreadPoolTaskExecutor

    初始化线程池 corePoolSize 线程池维护线程的最少数量keepAliveSeconds 线程池维护线程...

  • 线程池

    1.线程池简介 1.1 线程池的概念 线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性...

  • 多线程juc线程池

    java_basic juc线程池 创建线程池 handler是线程池拒绝策略 排队策略 线程池状态 RUNNIN...

  • ThreadPoolExecutor线程池原理以及源码分析

    线程池流程: 线程池核心类:ThreadPoolExecutor:普通的线程池ScheduledThreadPoo...

  • 线程池

    线程池 [TOC] 线程池概述 什么是线程池 为什么使用线程池 线程池的优势第一:降低资源消耗。通过重复利用已创建...

  • java 线程池使用和详解

    线程池的使用 构造方法 corePoolSize:线程池维护线程的最少数量 maximumPoolSize:线程池...

网友评论

      本文标题:线程池抓取

      本文链接:https://www.haomeiwen.com/subject/trgkqdtx.html