美文网首页
并发编程-进程池和multiprocess.Pool

并发编程-进程池和multiprocess.Pool

作者: Yanl__ | 来源:发表于2019-10-28 20:33 被阅读0次

    multiprocess.Pool模块

    • Pool([numprocess [,initializer [, initargs]]]):

    1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值 (一般设置为cpu个数+1)
    2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
    3 initargs:是要传给initializer的参数组

    • p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(args,*kwargs),然后返回结果。
      ret = p.apply(func, args=()) 返回值就是func的return
      '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''

    • p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(args,*kwargs),然后返回结果。

    1. 返回值:ret = apply_async返回的对象,让用户可以通过ret.get()获得func的返回值。get会阻塞直到对应的func执行完毕拿到结果。
    2. 使用apply_async给进程池分配任务,需要先close后join来保持多进程和主进程代码的同步性
    • p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成

    • p.join():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

    进程池的同步调用

    # -*- coding: UTF-8 -*-
    
    """
    # @Time    : 2019-10-23 17:02
    # @Author  : yanlei
    # @FileName: 进程池的同步调用.py
    """
    import os, time
    from multiprocessing import Pool
    
    def work(n):
        print('%s run'%os.getpid())
        time.sleep(1)
        return n**2
    
    p = Pool(3)
    res_l = []
    for i in range(10):
        res = p.apply(work, args=(i, )) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
        res_l.append(res)
    print(res_l)
    

    进程池的异步调用

    import os, time
    from multiprocessing import Pool
    
    def work(n):
        print('%s run'%os.getpid())
        time.sleep(1)
        return n**2
    
    p = Pool(3)
    ret_l = []
    for i in range(10):
        ret = p.apply_async(work, args=(i, )) #  异步调用
        ret_l.append(ret)
    p.close()
    p.join()
    for ret in ret_l:
        print(ret.get())
    

    进程池版socket并发聊天

    server端

    • 进程池开几个进程,就能同时和几个客户端聊天。其他的需要在后面排队等待前面的进程结束归还进程池才能拿到。
    import os
    from socket import *
    from multiprocessing import Pool
    
    server = socket(AF_INET, SOCK_STREAM)
    server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    server.bind(('127.0.0.1', 8080))
    server.listen(5)
    
    def talk(conn):
        print('进程pid:%s'%os.getpid())
        while True:
            try:
                msg = conn.recv(1024)
                if not msg:break
                print(msg.decode('utf-8'))
                conn.send(msg.upper())
            except Exception:
                break
    
    p = Pool(4)
    while True:
        conn, *_ = server.accept()
        p.apply_async(talk, args=(conn, ))
    

    client端

    from socket import *
    client = socket(AF_INET, SOCK_STREAM)
    client.connect(('127.0.0.1', 8080))
    
    while True:
        msg = input('>>>').strip()
        if not msg:break
    
        client.send(msg.encode('utf-8'))
        msg = client.recv(1024)
        print(msg.decode('utf-8'))
    

    回调函数

    p.apply_async(get_data, args=(url, ), callback=call_back)

    # -*- coding: UTF-8 -*-
    
    """
    # @Author  : yanlei
    # @FileName: 回调函数_爬取数据.py
    """
    import requests
    from multiprocessing import Pool
    
    def get_data(url):
        response = requests.get(url)
        if response.status_code == 200:
            return url, response.content.decode('utf-8')
    
    def call_back(args):
        url, content = args
        print(url, len(content))
    
    url_list = [
        'https://www.baidu.com',
        'https://www.sohu.com',
        'https://www.sogou.com',
        'https://www.runoob.com',
        'https://leetcode-cn.com',
        'https://cn.bing.com',
    ]
    
    p = Pool(2)
    for url in url_list:
        p.apply_async(get_data, args=(url, ), callback=call_back)
    p.close()
    p.join()
    
    
    
    

    相关文章

      网友评论

          本文标题:并发编程-进程池和multiprocess.Pool

          本文链接:https://www.haomeiwen.com/subject/yrwhvctx.html