美文网首页
python 进程(池)、线程(池)

python 进程(池)、线程(池)

作者: 不忘初心_悟空 | 来源:发表于2020-09-24 19:15 被阅读0次

    进程、多进程、进程池

    进程总概述

    进程

    from multiprocessing import Process
    import os
    
    # 子进程要执行的代码
    def run_proc(name):
        print('Run child process %s (%s)...' % (name, os.getpid()))
    
    if __name__=='__main__':
        print('Parent process %s.' % os.getpid())
        p = Process(target=run_proc, args=('test',))
        print('Child process will start.')
        p.start()
        p.join()
        print('Child process end.')
    

    多进程(进程池创建)

    from multiprocessing import Pool
    import os, time, random
    
    def long_time_task(name):
        print('Run task %s (%s)...' % (name, os.getpid()))
        start = time.time()
        time.sleep(random.random() * 3)
        end = time.time()
        print('Task %s runs %0.2f seconds.' % (name, (end - start)))
    
    
    if __name__ == '__main__':
        print('Parent process %s.' % os.getpid())
        p = Pool(3)
        for i in range(4):
            p.apply_async(long_time_task, args=(i,))
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All subprocesses done.')
    

    解析:
    对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
    Parent process 87461.
    Waiting for all subprocesses done...
    Run task 0 (87462)...
    Run task 1 (87463)...
    Run task 2 (87464)...
    Task 1 runs 1.66 seconds.
    Run task 3 (87463)... -----------------> task3在某个进程结束时,在创建
    Task 2 runs 2.33 seconds.
    Task 0 runs 2.54 seconds.
    Task 3 runs 2.83 seconds.
    All subprocesses done.

    进程之间通信

    Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。
    我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

    from multiprocessing import Process, Queue
    import os, time, random
    
    # 写数据进程执行的代码:
    def write(q):
        print('Process to write: %s' % os.getpid())
        for value in ['A', 'B', 'C']:
            print('Put %s to queue...' % value)
            q.put(value)
            time.sleep(random.random())
    
    # 读数据进程执行的代码:
    def read(q):
        print('Process to read: %s' % os.getpid())
        while True:
            value = q.get(True)
            print('Get %s from queue.' % value)
    
    if __name__=='__main__':
        # 父进程创建Queue,并传给各个子进程:
        q = Queue()
        pw = Process(target=write, args=(q,))
        pr = Process(target=read, args=(q,))
        # 启动子进程pw,写入:
        pw.start()
        # 启动子进程pr,读取:
        pr.start()
        # 等待pw结束:
        pw.join()
        # pr进程里是死循环,无法等待其结束,只能强行终止:
        pr.terminate()
    

    线程总概述

    线程

    import time, threading
    
    # 新线程执行的代码:
    def loop():
        print('thread %s is running...' % threading.current_thread().name)
        n = 0
        while n < 5:
            n = n + 1
            print('thread %s >>> %s' % (threading.current_thread().name, n))
            time.sleep(1)
        print('thread %s ended.' % threading.current_thread().name)
    
    print('thread %s is running...' % threading.current_thread().name)
    t = threading.Thread(target=loop, name='LoopThread')
    t.start()
    t.join()
    print('thread %s ended.' % threading.current_thread().name)
    

    线程锁-线程安全(操作同一个变量)

    balance = 0
    lock = threading.Lock()
    
    def run_thread(n):
        for i in range(100000):
            # 先要获取锁:
            lock.acquire()
            try:
                # 放心地改吧:
                    global balance
                                    balance = balance + n
                                    balance = balance - n
            finally:
                # 改完了一定要释放锁:
                lock.release()
    

    线程池创建

    ThreadPoolExecutor实现

    from socket import AF_INET, SOCK_STREAM, socket
    from concurrent.futures import ThreadPoolExecutor
    
    def echo_client(sock, client_addr):
        '''
        Handle a client connection
        '''
        print('Got connection from', client_addr)
        while True:
            msg = sock.recv(65536)
            if not msg:
                break
            sock.sendall(msg)
        print('Client closed connection')
        sock.close()
    
    def echo_server(addr):
        pool = ThreadPoolExecutor(128)
        sock = socket(AF_INET, SOCK_STREAM)
        sock.bind(addr)
        sock.listen(5)
        while True:
            client_sock, client_addr = sock.accept()
            pool.submit(echo_client, client_sock, client_addr)
    
    echo_server(('',15000))
    

    手动创建你自己的线程池, 通常可以使用一个Queue来轻松实现

    from socket import socket, AF_INET, SOCK_STREAM
    from threading import Thread
    from queue import Queue
    
    def echo_client(q):
        '''
        Handle a client connection
        '''
        sock, client_addr = q.get()
        print('Got connection from', client_addr)
        while True:
            msg = sock.recv(65536)
            if not msg:
                break
            sock.sendall(msg)
        print('Client closed connection')
    
        sock.close()
    
    def echo_server(addr, nworkers):
        # Launch the client workers
        q = Queue()
        for n in range(nworkers):
            t = Thread(target=echo_client, args=(q,))
            t.daemon = True
            t.start()
    
        # Run the server
        sock = socket(AF_INET, SOCK_STREAM)
        sock.bind(addr)
        sock.listen(5)
        while True:
            client_sock, client_addr = sock.accept()
            q.put((client_sock, client_addr))
    echo_server(('',15000), 128)
    

    相关文章

      网友评论

          本文标题:python 进程(池)、线程(池)

          本文链接:https://www.haomeiwen.com/subject/fdstuktx.html