廖雪峰Python3学习笔记-多线程与多进程

作者: 相关函数 | 来源:发表于2017-01-11 11:26 被阅读3445次

    本文是笔者学习廖雪峰Python3教程的笔记,在此感谢廖老师的教程让我们这些初学者能够一步一步的进行下去.如果读者想学习完成的教程,请访问廖雪峰Python3教程,笔记如有侵权,请告知删除...

    多进程

    • fork()

      在Unix和Linux操作系统下,提供了fork()函数,来开启一个子进程,这个函数调用一次,返回两次.是因为操作系统将当前进程复制了一份作为子进程,然后在两个进程内返回.

      子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

      在Python中使用fork()创建子进程,需要先导入os模块.

    import os, time

    创建子进程之前声明的变量

    source = 10

    pid = os.fork()
    if pid == 0:
    time.sleep(20)
    print('child process n = %d' % (source -1))
    else:
    print('parent process n = %d' % source)

    Python对跨平台多进程的支持,通常是通过multiprocessing模块,该模块提供了一个Process类来表示一个进程对象.
    
    Process类的target参数表示进程要执行的任务也就是执行函数,而args就是执行函数的参数.
    
    

    from multiprocessing import Process
    import os

    def test(name):
    print('run child process %s, pid %s' % (name, os.getpid()))

    if name == 'main':

    print('parent process is %s'% os.getpid())
    p = Process(target = test, args = ('Joe',))
    print('child process will run...')
    p.start()
    p.join()
    print('child process is finish')
    
     join()方法的意义是等待子进程之行结束之后继续往下执行,主要用于进程间的同步.
     
    - Pool
    
    
    from multiprocessing import Pool
    import os, time, random
    
    def test(hhh):
        print('chlid process will run.... hhh = %s pid = %s'% (hhh, os.getpid()))
        start = time.time()
        time.sleep(random.random()*2)
        end = time.time()
        print('process is %s and time is %0.2f' % (os.getpid(), end - start))
    
    if __name__ == '__main__':
        p = Pool(4)
        for i in range(5):
            p.apply_async(test, args = (i,))
        print('wait all subprocess done')
        p.close()
        p.join()
        print('colse finish')
    
    
    上面代码利用Pool开启了多个进程,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。这是Pool有意设计的限制起始最多同时执行4个进程,这并不是操作系统的限制。
    
    - 进程间的通信
    
    Python的multiprocessing模块包装了底层的机制,提供了Queue和Pipes等多种方式来交换数据.
    
    
    from multiprocessing import Queue, Process
    import os, time, random
    
    def write(q):
        print('write process is %s'%os.getpid())
        for i in ['Joe','and','Cheer']:
            q.put(i)
            time.sleep(random.random())
    
    def read(q):
        print('read process is %s'%os.getpid())
        while True:
            value = q.get(True)
            print('Get (%s) from Queue'%value)
    
    if __name__ == '__main__':
        q = Queue()
        pw = Process(target = write, args = (q,))
        pr = Process(target = read, args = (q,))
        pw.start()
        pr.start()
        pw.join()
        pr.terminate()
    
    
    父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。 
     
    # 多线程
    
    多任务可以由多进程完成,也可以由一个进程内的多线程完成.通常使用thearding这个模块来开一条子线程.
    
    - threading
    
    启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行.
       
    

    import threading, time

    def test():
    print('1----thread is %s'%threading.current_thread().name)
    n = 0
    while n < 10:
    n += 1
    print('2----thread is %s'%threading.current_thread().name)
    time.sleep(3)
    print('3----thread is %s'%threading.current_thread().name)
    print('4----thread is %s'%threading.current_thread().name)
    thread = threading.Thread(target=test, name = 'testLoop')
    print('5----thread is %s'%threading.current_thread().name)
    thread.start()
    thread.join()
    print('6----thread is %s'%threading.current_thread().name)

    - Lock
    
    多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
    
    由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。创建一个锁就是通过threading.Lock()来实现
    
    

    import time, threading
    banlance = 0
    lcok = threading.Lock()
    def change_it(n):
    global banlance
    banlance += n
    banlance -= n
    def run_thread(n):
    for i in range(100000):
    lock.acquire()
    try:
    change_it(n)
    finally:
    lock.release()
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    print('..........')
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('.....**************8.....')
    print(banlance)

    
    - ThreadLocal
    
    在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
    
    

    import threading

    local_student = threading.local()

    def process_student():

    std = local_student.name
    print('*****std = %s, and thread is %s'%(std, threading.current_thread().name))
    

    def process_test(name):

    local_student.name = name
    print('####local_student = %s, and thread is %s'% (local_student, threading.current_thread().name)
        )
    process_student()
    

    print('test start.....')
    t1 = threading.Thread(target = process_test, args = ('Joe', ),name = 'Thread1')
    t2 = threading.Thread(target = process_test, args = ('Cheer', ), name = 'Thread2')

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print('end....')

    打印结果为

    test start.....

    local_student = <_thread._local object at 0x1029e1bf8>, and thread is Thread1

    *****std = Joe, and thread is Thread1

    local_student = <_thread._local object at 0x1029e1bf8>, and thread is Thread2

    *****std = Cheer, and thread is Thread2
    end....

    可以看出local_student.name类似一个全局变量在process_test中赋值之后,可以传递到process_student中使用.local_student是一个全局变量,不但可以使用local_student.name 还可以使用local_student.age等等.可以根据不同的key设置value.进行传递
    
    - 分布式进程
    
    在这一节中,提出在Process和Thread之间,应当优先选择Process,因为Process更加稳定,而且可以分不到多台机器上.Thread只能分不到同一机器的多和cpu上.
    
    Python的multiprocessing模块中的managers子模块支持用网络通信将多进程分不到多台机器上.
    
    通过managers模块将Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了.
    
    服务进程负责启动Queue,并将其注册到网络上,然后往Queue中写任务.
    
    > 1.首先创建两个Queue,一个负责任务发送,一个负责任务接收
    
    > 2.写一个继承自BaseManager的class,使用该class把两个队列注册到网络上,并且使用callable参数关联队列对象
    
    > 3.绑定端口和ip,并设置一个验证码,注意这个authkey是一个***The process’s authentication key (a byte string)***
    .
    
    > 4.启动队列,获取通过网络访问队列对象.
    
    > 5.然后就可以添加任务,从result读取结果了,最后使用shutdown()关闭manager.
    
    

    import queue, random, time
    from multiprocessing.managers import BaseManager

    task_queue = queue.Queue()
    result_queue = queue.Queue()

    class QueueManager(BaseManager):
    pass

    QueueManager.register('get_task_queue', callable = lambda:task_queue)
    QueueManager.register('get_result_queue', callable = lambda:result_queue)

    manager = QueueManager(address = ('127.0.0.1', 5000), authkey = b'joe')

    manager.start()

    task = manager.get_task_queue()
    result = manager.get_result_queue()

    print('.......')
    for i in range(10):
    n = random.randint(0, 10000)
    print('put (%d) in task'%n)
    task.put(n)

    print('try read task...')
    for i in range(10):

    print('%d'%(result.get(timeout = 10)))
    

    manager.shutdown()
    print('end...')

     
     在分布式多进程环境下,添加任务到队列不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得Queue接口进行添加.
     
     有了master进行任务分发,还需要一个worker来执行任务.在多任务环境下,通常是一个Master,多个Worker.上述代码已经将两个队列注册到网络上,我们只需要通过网络连接到服务进程,就可以实现分布式进程了
     
     > 1.创建类似的QueueManager.
     
     > 2.由于QueueManager是从网络上获取Queue,所以只需要提供注册时的名字就好了
     
     > 3.连接到服务器,必须保证同一ip和端口.
     
     > 4.从网络链接,获取到Queue对象,然后从task队列获取任务,把执行结果写入到result中.
     
    

    import time, sys, queue
    from multiprocessing.managers import BaseManager

    创建类似的QueueManager:

    class QueueManager(BaseManager):
    pass

    由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:

    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')

    连接到服务器,也就是运行task_master.py的机器:

    server_addr = '127.0.0.1'
    print('Connect to server %s...' % server_addr)

    端口和验证码注意保持与task_master.py设置的完全一致:

    m = QueueManager(address=(server_addr, 5000), authkey=b'abc')

    从网络连接:

    m.connect()

    获取Queue的对象:

    task = m.get_task_queue()
    result = m.get_result_queue()

    从task队列取任务,并把结果写入result队列:

    for i in range(10):
    try:
    n = task.get(timeout=1)
    print('run task %d * %d...' % (n, n))
    r = '%d * %d = %d' % (n, n, n*n)
    time.sleep(1)
    result.put(r)
    except Queue.Empty:
    print('task queue is empty.')

    处理结束:

    print('worker exit.')

    相关文章

      网友评论

        本文标题:廖雪峰Python3学习笔记-多线程与多进程

        本文链接:https://www.haomeiwen.com/subject/scxrbttx.html