美文网首页
多线程PDF下载器

多线程PDF下载器

作者: 奈斯凸米特 | 来源:发表于2019-10-08 17:20 被阅读0次
    该PDF下载器支持从本地txt文件中读取和数据库读取两种方式
    数据库ip我乱写的,这里集成了三种情况的数据库(本地数据库,需要验证的数据库,不需要验证的数据库)
    # -*- coding: utf-8 -*-
    """
    此程序用来多线程下载PDF,提供本地txt文件读取下载和数据库读取下载两种方式
    """
    import hashlib
    import os
    import sys
    from multiprocessing import Queue
    from threading import Thread, Lock
    
    import pymongo
    import requests
    
    g_num = 0  # 创建全局变量
    lock = Lock()  # 创建全局互斥锁
    
    
    class DownloadPDF(object):
    
        def __init__(self):
            # 创建消息队列
            self.q = Queue()
    
        def run(self):
            self.mkdir()
            self.get_pdf_msg()
            pdf_size = self.q.qsize()  # 总PDF数
            # 开启多线程
            all_th = []
            for i in range(10):
                th = Thread(target=self.download, args=(pdf_size,))
                th.start()
                all_th.append(th)
    
            for th in all_th:
                th.join()
            print('\n' + '下载完成!')
    
        def download(self, pdf_size):
            while True:
                try:
                    # 获取PDF信息
                    each_pdf = self.q.get(timeout=2)  # 设置超时时间2s。这里如果不设置超时,当队列为空时,q.get()会进入阻塞状态
                except:
                    # 这里队列已经为空,取不到数据就跳出循环
                    break
                    # 计数
                global g_num
                lock.acquire()  # 上锁
                g_num += 1
                # 打印下载进度
                sys.stdout.write('\r' + '正在下载:%s / %s' % (g_num, pdf_size))
                path = ''
                pdf_url = ''
                md5_later = ''
                pdf_name = ''  # 如果没有PDF名,置为空
                if is_txt:
                    # 选择了txt文件方式,只有一个链接
                    pdf_url = each_pdf.strip()
                    href_name = pdf_url.split('/')[-1]  # 取链接 / 后的值作为PDF名称
                    if '.pdf' in href_name:
                        href_name = href_name.split('.')[0]
                    md5_later = self.pdf_name_md5(href_name)  # pdf转码之后的名称
                    path = 'PDF/' + md5_later + '.pdf'
    
                elif not is_txt:
                    # 从数据库中读取PDF信息,数据库中包含两个字段(pdf_name, pdf_url)
                    pdf_name = each_pdf['pdf_name']
                    pdf_url = each_pdf['pdf_url']
                    md5_later = self.pdf_name_md5(pdf_name)
                    path = 'PDF/' + md5_later + '.pdf'
                # 下载之前先去重
                try:
                    # 尝试获取该路径下pdf的size,如果还未下载则置为0
                    size = os.path.getsize(path)
                except:
                    size = 0
                # 判断已经下载并且size > 0则为成功下载
                if os.path.exists(path) and size != 0:
                    # 路径存在说明下载过
                    lock.release()  # 释放锁
                    continue
                # 下载PDF
                self.download_pdf(pdf_name, each_pdf, path, pdf_url, md5_later)
                lock.release()  # 释放锁
    
        @staticmethod
        def download_pdf(pdf_name, each_pdf, path, pdf_url, md5_later):
            """
            下载PDF的主要实现程序
            :param pdf_name:
            :param each_pdf:
            :param path:
            :param pdf_url:
            :param md5_later:
            :return:
            """
            try:
                # 开始下载
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
                                  ' Chrome/76.0.3809.100 Safari/537.36'}
                r = requests.get(url=pdf_url, headers=headers, timeout=(120, 600))  # 设置connect超时2分钟,read超时10分钟
                f = open(path, 'wb')
                f.write(r.content)
                f.close()
                r.close()
                s = requests.session()
                s.keep_alive = False
                if is_insert == 'y':
                    # 下载成功,插入数据库
                    if not pdf_name:
                        msg = {'url': pdf_url, 'md5_name': md5_later, 'relative_path': path}
                    else:
                        msg = {'url': pdf_url, 'origin_name': pdf_name, 'md5_name': md5_later, 'relative_path': path}
                    new_collection.insert(msg)
            except:
                # 下载失败
                print('\r' + '下载失败:%s' % str(each_pdf))
                with open('error.txt', 'r') as fr:
                    content = fr.readlines()
                if str(each_pdf) not in content:
                    with open('error.txt', 'a') as fe:
                        fe.writelines(str(each_pdf) + '\n')
    
        @staticmethod
        def mkdir():
            """
            创建下载失败文件、去重文件、PDF文件夹
            :return:
            """
            if not os.path.exists('error.txt'):
                with open('error.txt', 'w') as fr:
                    fr.write('')
            if not os.path.exists('PDF'):
                os.mkdir('PDF')
    
        def client_dbs(self):
            """
            连接数据库,获取所有pdf信息
            :return:
            """
            ip = self.choose_ip()
    
            client = pymongo.MongoClient('mongodb://%s:27017' % ip)
            # 这里因为该数据库开了验证,所以这里进行判断验证(ip我乱写的)
            if ip == '192.168.0.123':
                client['admin'].authenticate('用户名', '密码')  # 数据库验证(用户名密码根据自己数据库修改)
            all_db_names = client.list_database_names()
            # 规范打印数据库名
            self.pr_datas(all_db_names)
    
            db_name_num = input('\r' + '请选择数据库对应序号:')
            db_name = self.choose_db_or_col(all_db_names, db_name_num, '数据库')
    
            while True:
                db = client[db_name]
                all_col = db.collection_names()
                if not all_col:
                    db_name = input('没有这个数据库,请重新输入:')
                else:
                    break
            # 规范打印集合名
            self.pr_datas(all_col)
            col_num = input('\r' + '请选择集合对应的序号:')
            col_name = self.choose_db_or_col(all_col, col_num, '集合')
    
            collection = db[col_name]
            return collection, db
    
        @staticmethod
        def choose_ip():
            """
            选择ip
            :return:
            """
            ip = input('请选择数据库:' + '\n' + '1. 本地数据库   2. 123数据库   3. 200数据库' + '\n')
            while True:
                if ip == '1':
                    ip = '127.0.0.1'
                    break
                elif ip == '2':
                    ip = '192.168.0.123'
                    break
                elif ip == '3':
                    ip = '192.168.0.200'
                    break
                else:
                    ip = input('请正确输入1 或者 2 或者 3:')
            return ip
    
        @staticmethod
        def choose_db_or_col(name, name_num, d_or_c):
            """
            选择数据库或者集合
            :return:
            """
            while True:
                try:
                    int(name_num)
                except ValueError:
                    name_num = input('请输入正确的%s序号:' % d_or_c)
                if 0 <= int(name_num) <= len(name):
                    break
                else:
                    name_num = input('请输入正确的%s序号:' % d_or_c)
            for j, e_name in enumerate(name, 1):
                if name_num == str(j):
                    choose_name = e_name
                    return choose_name
    
        @staticmethod
        def pr_datas(names):
            """
            规范打印数据库/集合名称
            :param names:
            :return:
            """
            for i, each_name in enumerate(names, 1):
                print(str(i) + '. ' + each_name + ' ' * (40 - len(each_name) - len(str(i))), end='')
                if i % 3 == 0:
                    print('\n')
            if len(names) < 3:
                print('\n')
    
        def get_pdf_msg(self):
            """
            将pdf信息存入消息队列
            :return:
            """
            all_urls = self.choose_way()
            for i in all_urls:
                self.q.put(i)
    
        def choose_way(self):
            """
            选择PDF的url来源,是在txt文件中还是从数据库中读取
            :return:
            """
            print('*' * 100)
            print(
                '注意事项:' + '\n' + '1. 如果选择了txt文件方式下载,txt内容必须为一行一个url格式' + '\n' +
                '2. 如果选择了从数据库中读取PDF信息下载,则数据库中包含两个字段(a. 存放url的字段,b. 该PDF的名字字段(可以没有)),其中存放url的字段不能嵌套,只能有一个url' + '\n' +
                '3. txt文件只有链接,默认取链接最后 "/" 后的值作为PDF名' + '\n' +
                '4. 数据库方式如果没有指定PDF名字,则默认以链接最后一个 / 后的内容作为PDF名')
            print('*' * 100)
            global is_txt, is_insert, new_collection
            the_way = input('请选择PDF的来源(输入1 / 2):' + '\n' + '1. txt文件       2. 从数据库中读取' + '\n')
            while True:
                if the_way == '1' or the_way == '2':
                    break
                else:
                    the_way = input('请正确输入1 或者 2:')
            urls = []
            if the_way == '1':
                # txt文件
                is_txt = True
                print('说明:txt文件中必须为一行一个url!')
                txt_name = input('请输入与该程序同级目录下的txt文件名称(如:123.txt):')
                while True:
                    try:
                        with open(txt_name, 'r') as f:
                            urls = f.readlines()
                        break
                    except FileNotFoundError:
                        txt_name = input('没有这个txt文件,请重新输入:')
            elif the_way == '2':
                # 数据库
                collection, db = self.client_dbs()  # 连接数据库
                url_field = input('请输入PDF的url字段名:')
                name_field = input('请输入PDF的name字段名(如果没有则不输入):')
                if not name_field:
                    is_txt = True
                    for d in collection.find():
                        if d[url_field]:
                            urls.append(d[url_field])
                else:
                    is_txt = False
                    for d in collection.find():
                        if d[url_field]:
                            msg = {'pdf_url': d[url_field], 'pdf_name': d[name_field]}
                            urls.append(msg)
    
            # 选择是否要插入数据的数据库
            is_insert = input('是否要将下载的PDF信息写入到新的数据库(输入y / n,不输入或输入其他默认不写入):')
            if is_insert == 'y':
                print('选择插入数据库,字段包含(pdf_url, md5_name, relative_path)')
                ip = self.choose_ip()
    
                client = pymongo.MongoClient('mongodb://%s:27017' % ip)
                db_name = input('请输入要存入的数据库名(可新建):')
                db = client[db_name]
                if ip == '192.168.0.123':
                    db.authenticate(name='用户名', password='密码', source='admin')
                new_col = input('请输入要插入的集合名称:')
                new_collection = db[new_col]
            return urls
    
        @staticmethod
        def pdf_name_md5(pdf):
            """
            将pdf名字转为md5
            :param pdf:
            :return:
            """
            md = hashlib.md5()
            md.update(pdf.encode('utf-8'))
            pdf_name = md.hexdigest()
            return pdf_name
    
    
    if __name__ == '__main__':
        download = DownloadPDF()
        download.run()
    

    相关文章

      网友评论

          本文标题:多线程PDF下载器

          本文链接:https://www.haomeiwen.com/subject/netipctx.html