Python Concurrency Summary


Author: flashine | Published: 2019-02-18 12:26

    0x01 The GIL

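    CPython, the reference interpreter written in C, has a Global Interpreter Lock (GIL). Because of the GIL, only one thread can execute Python bytecode at any given moment, so a multithreaded Python program effectively runs one thread at a time. This is a property of CPython rather than of the language: Jython (implemented in Java) has no GIL, whereas PyPy (implemented in RPython) does have one.
      The GIL mainly hurts CPU-bound programs: they need the CPU continuously, and the GIL makes their threads take turns, so the program behaves as if it were single-threaded.
      I/O-bound tasks are a good fit for multithreading; CPU-bound tasks are a better fit for multiprocessing:
      1. I/O-bound: disk I/O and network I/O, for example network requests and file reads and writes.
      2. CPU-bound: tasks whose time is dominated by CPU computation.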
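
To make the I/O-bound case concrete, here is a minimal sketch in which `time.sleep` stands in for a blocking network or disk call. The names `fake_io` and `run_threaded` and the timings are hypothetical, chosen only for illustration. A sleeping thread releases the GIL, so four 0.2-second waits overlap instead of adding up.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(n):
    """Stand-in for a blocking I/O call; sleeping releases the GIL."""
    time.sleep(0.2)
    return n * 2


def run_threaded(items, workers=4):
    """Run fake_io over items in a thread pool; return (results, seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(fake_io, items))
    return results, time.perf_counter() - start


results, elapsed = run_threaded([1, 2, 3, 4])
print(results, "in %.2fs" % elapsed)
```

For CPU-bound work, the same structure with `ProcessPoolExecutor` uses separate processes, each with its own interpreter and GIL.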

    0x02 Multithreading with threadpool

    A thread pool for Python 2, using the third-party threadpool package:

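    # -*- coding: UTF-8 -*-
    # Python 2 only; requires the third-party threadpool package.
    import time
    import Queue

    import threadpool


    def hello(msg):
        time.sleep(2)
        return msg


    def print_ret(request, result):
        # Callback: receives the work request and the value returned by hello().
        print "the result is %s %r\n" % (request.requestID, result)


    def deal_task(pool):
        # Optional helper (not called below): process finished requests,
        # blocking until one is ready.
        try:
            pool.poll(True)
        except Exception as e:
            print str(e)


    q = Queue.Queue()
    for i in range(100):
        q.put(i)

    # Drain the queue into a plain list of arguments.
    lst = [q.get() for i in range(q.qsize())]

    pool = threadpool.ThreadPool(20)
    # makeRequests(worker function, list of arguments, callback):
    # the callback is called with the request and the result of hello().
    requests = threadpool.makeRequests(hello, lst, print_ret)
    for req in requests:
        pool.putRequest(req)

    pool.wait()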
    

    0x03 Multithreading with threading

    Example code using threading:

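    # -*- coding: UTF-8 -*-
    # Python 3 (the queue module is named Queue in Python 2).
    import queue
    import threading

    import requests

    url = "xxx"  # placeholder target URL
    thread_count = 100
    q = queue.Queue()
    for i in range(20):
        q.put(i)


    def send():
        while True:
            try:
                # get_nowait() avoids blocking forever when another
                # thread has already taken the last item.
                q.get_nowait()
            except queue.Empty:
                break
            r = requests.post(url, data={})
            print(r.text)


    if __name__ == '__main__':
        workers = []
        for i in range(thread_count):
            t = threading.Thread(target=send)
            t.start()
            workers.append(t)
        # Join every thread, not only the last one created.
        for t in workers:
            t.join()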
    
    

    The code above does not modify any variable shared between threads, so it uses no lock. Add one when shared state is involved:

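    import threading

    lock = threading.Lock()

    def test():
        # "with" acquires the lock and releases it even if an exception is raised
        with lock:
            pass  # operate on the shared variable here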
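
As a concrete illustration of when the lock matters, the sketch below has four threads increment one shared counter. The names `counter` and `add_many` are hypothetical and not from the original code. `counter += 1` is a read-modify-write sequence, so the lock is what guarantees that no update is lost.

```python
import threading

counter = 0
lock = threading.Lock()


def add_many(times):
    """Increment the shared counter `times` times under the lock."""
    global counter
    for _ in range(times):
        with lock:  # acquired here, released on leaving the block
            counter += 1


workers = [threading.Thread(target=add_many, args=(10000,)) for _ in range(4)]
for t in workers:
    t.start()
for t in workers:
    t.join()

print(counter)  # 40000
```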
    

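    0x04 Multithreading with ThreadPoolExecutor

    ThreadPoolExecutor is available in Python 3 (in the standard library's concurrent.futures) and in Python 2 through the futures backport package. It schedules threads automatically and is a higher-level abstraction over threading and multiprocessing. In the file read/write usage here no extra thread lock was added in the code, although the executor itself does not serialize access to a shared file, so concurrent writes to one file may still need a lock.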

    # -*- coding: UTF-8 -*-
    import requests
    from concurrent.futures import ThreadPoolExecutor, as_completed, wait, ALL_COMPLETED

    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'
    }

    def check_words(url, path):
        """
        Search the page at url for keywords and write the matches to path.
        """
        print(url)

    if __name__ == "__main__":
        urls = []  # fill in the URLs to check
        path = ""  # output file path
        with ThreadPoolExecutor(max_workers=48) as executor:
            # submit(worker function, *arguments)
            tasks = [executor.submit(check_words, url, path) for url in urls]
            for i, task in enumerate(as_completed(tasks), start=1):
                print("\r Completed {0}/{1} =>".format(i, len(tasks)), end="")
    

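    submit() hands the pool one call together with its arguments, whereas map() takes a whole iterable at once, i.e. map(func, iterable). as_completed() yields futures as the worker functions finish. add_done_callback() registers a callback on a future that runs when the call completes, which is one way to pick up the worker's return value. wait() blocks the main thread until the requested condition is met.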

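    def handler(future):
        # The callback receives the finished Future; result() gives the return value.
        res = future.result()
        # handle result

    # future: a Future returned by executor.submit(); futures: a list of them
    future.add_done_callback(handler)
    wait(futures, return_when=ALL_COMPLETED)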
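
The fragment above can be put together into a self-contained sketch. The worker `square` and the list `collected` are hypothetical stand-ins, not part of the original code. It exercises `map()`, `submit()`, `add_done_callback()` and `wait()` in one place.

```python
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED


def square(n):
    return n * n


collected = []


def handler(future):
    # Called with the finished Future; result() returns square's return value.
    collected.append(future.result())


with ThreadPoolExecutor(max_workers=4) as executor:
    # map() takes the whole iterable and yields results in input order.
    mapped = list(executor.map(square, [1, 2, 3]))

    # submit() schedules one call and returns a Future.
    futures = [executor.submit(square, n) for n in (4, 5, 6)]
    for future in futures:
        future.add_done_callback(handler)

    # Block until every submitted future has finished.
    done, not_done = wait(futures, return_when=ALL_COMPLETED)

# Leaving the "with" block joins the workers, so all callbacks have run by now.
print(mapped)
print(sorted(collected))
```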
    

    After repeated use, I found that ThreadPoolExecutor combined with requests often appears to hang: the program is still running but prints no results. The cause is unknown.
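
The cause is not established here. One thing worth ruling out is a call that never returns: requests applies no timeout unless one is passed, so a stalled connection can occupy a worker indefinitely. The sketch below is an illustration under that assumption, not a diagnosis. The hypothetical `slow_call` simulates a stalled request with `time.sleep`, and `future.result(timeout=...)` keeps the main thread from waiting on it forever.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow_call(seconds):
    """Hypothetical stand-in for a request that takes `seconds` to return."""
    time.sleep(seconds)
    return seconds


outcomes = []
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(slow_call, s) for s in (0.01, 1.0)]
    for future in futures:
        try:
            # Wait at most 0.3 seconds for each result.
            outcomes.append(future.result(timeout=0.3))
        except TimeoutError:
            outcomes.append("timed out")

print(outcomes)
```

With requests itself, passing `timeout=` to the request call is the more direct safeguard.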

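    0x05 Coroutine concurrency with asyncio + aiohttp

    According to the referenced article 使用Python进行并发编程 (Concurrent Programming with Python), asyncio + aiohttp gave the fastest execution for concurrent network requests.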

    # -*- coding: UTF-8 -*-
    import asyncio
    import sys

    import aiohttp
    import async_timeout

    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36',
        'Cookie': ''
    }


    def load_keywords(path='./dark_link.txt'):
        # Read the keyword list once instead of once per request.
        with open(path, 'r', encoding='utf-8') as dark_file:
            return [line.strip() for line in dark_file if line.strip()]


    async def check(url, keywords):
        try:
            # ssl=False skips certificate verification
            connector = aiohttp.TCPConnector(ssl=False)
            async with aiohttp.ClientSession(connector=connector) as session:
                # timeout() only takes effect when used as a context manager
                async with async_timeout.timeout(5):
                    async with session.get(url, headers=header) as response:
                        if response.status != 200:
                            return [url, "", response.status, ""]
                        res = await response.text()
                        link_word = {word for word in keywords if word in res}
                        if link_word:
                            print("".join(["[+]", url, " keywords found:"]), link_word)
                            return [url, "", response.status, link_word]
                        return [url, "", response.status, ""]
        except Exception:
            return [url, "", "connection error", ""]


    if __name__ == '__main__':
        with open(r'urls.txt') as pf:
            urls = {line.strip() for line in pf if line.strip()}
        keywords = load_keywords()
        # ProactorEventLoop exists only on Windows; use the default loop elsewhere.
        if sys.platform == 'win32':
            event_loop = asyncio.ProactorEventLoop()
        else:
            event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        tasks = [check(url, keywords) for url in urls]
        # gather() collects every coroutine's return value once all have finished
        results = event_loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=False))
        for item in results:
            print(item)
    

    I still know relatively little about asyncio; I will write this section up in more detail once I have learned more.
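
The core pattern can be tried without any network access. In the minimal sketch below, the hypothetical `fake_fetch` replaces the HTTP request with `asyncio.sleep`, and a semaphore (an addition for illustration, not in the original code) caps how many coroutines run at once. It assumes Python 3.7 or later for `asyncio.run`.

```python
import asyncio


async def fake_fetch(url, limit):
    """Hypothetical stand-in for an HTTP request; sleeps instead of connecting."""
    async with limit:  # cap the number of coroutines running at once
        await asyncio.sleep(0.05)
        return [url, 200]


async def main(urls):
    limit = asyncio.Semaphore(10)
    tasks = [fake_fetch(url, limit) for url in urls]
    # gather() runs the coroutines concurrently and keeps input order.
    return await asyncio.gather(*tasks)


urls = ["http://example.com/%d" % i for i in range(5)]
results = asyncio.run(main(urls))
print(results)
```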

    0x06 Thread pool with multiprocessing.dummy

    1. map
      Usage is essentially the same as ThreadPoolExecutor.map.
    from multiprocessing.dummy import Pool as ThreadPool
    import requests

    # Placeholder URLs; requests.get needs URL strings, not integers.
    urls = ["https://example.com"] * 5
    pool = ThreadPool(processes=3)
    return_list = pool.map(requests.get, urls)
    pool.close()
    pool.join()
    print(return_list)
    
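    2. apply_async
    from multiprocessing.dummy import Pool as ThreadPool
    import requests

    async_pool = ThreadPool(4)
    results = []
    for i in range(5):
        url = "https://example.com"  # placeholder URL
        result = async_pool.apply_async(requests.get, (url, ))
        results.append(result)

    for i in results:
        i.wait()  # wait until the call has finished

    for i in results:
        if i.ready():  # whether the call has completed
            if i.successful():  # whether it completed without raising
                # get() returns the call's return value. A call that raised
                # (for example requests.get on a string that is not a URL)
                # makes successful() False, so nothing is printed for it.
                print(i.get())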
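
An AsyncResult is printed in the loop above only when its call succeeded. A minimal sketch with two hypothetical worker functions (`double` and `fail`, not from the original) shows how `successful()` and `get()` behave when a call raises:

```python
from multiprocessing.dummy import Pool as ThreadPool


def double(n):
    return n * 2


def fail(n):
    raise ValueError("bad input: %r" % n)


pool = ThreadPool(2)
ok = pool.apply_async(double, (21,))
bad = pool.apply_async(fail, (1,))
pool.close()
pool.join()  # after join() both results are ready

ok_success, bad_success = ok.successful(), bad.successful()
print(ok_success, ok.get())
print(bad_success)

try:
    bad.get()  # get() re-raises the exception from the worker
except ValueError as exc:
    error = str(exc)
    print("error:", error)
```

Calling `get()` outside the `successful()` check is a quick way to surface an exception that would otherwise pass silently.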
    


          Article link: https://www.haomeiwen.com/subject/cekaeqtx.html