美文网首页
python--线程condition

python--线程condition

作者: 极光火狐狸 | 来源:发表于2018-09-22 17:03 被阅读53次

源码: tests/condition.py

# -.- coding:utf-8 -.-
import time
import unittest
import threading
from .utils import get_lock_count


class TestCondition(unittest.TestCase):

    def test_can_acquire_multi_times(self):
        """
        之所以能acquire多次, 是因为它默认采用Rlock实现.
        """
        cond = threading.Condition()

        status = cond.acquire()
        self.assertTrue(status)

        status = cond.acquire()
        self.assertTrue(status)

        lock_count = get_lock_count(cond)
        self.assertEqual(lock_count, 2)

    def test_can_not_acquire_multi_times_use_lock(self):
        """
        把Condition默认的Rlock改为Lock之后, 就不能再多次acquire了.
        根据acquire返回的状态值判定是否加锁成功!
        """
        cond = threading.Condition(threading.Lock())
        cond.acquire()
        status = cond.acquire(0)
        self.assertFalse(status)

    def test_can_not_acquire_multi_times_with_multi_threads(self):
        """
        不能在多线程中多次acquire
        """
        cond = threading.Condition()
        cond.acquire()
        status = {"status": False}

        def other_thread(the_cond):
            while True:
                locked = the_cond.acquire(0)
                if locked:
                    status["status"] = True
                break

        t = threading.Thread(target=other_thread, args=(cond, ))
        t.start()
        t.join()

        time.sleep(3)
        self.assertFalse(status["status"])

    def test_multi_consumer_one_producer_with_rlock(self):
        cond = threading.Condition(threading.RLock())

        consumer_count = []

        def consumer(cv):
            cv.acquire()
            cv.wait()
            consumer_count.append(1)
            cv.release()

            # with cv:
            #     cv.wait()
            #     consumer_count.append(1)

        def producer(cv):
            cv.acquire()
            cv.notify_all()
            cv.release()

            # with cv:
            #     cv.notify_all()

        # TODO: 为什么这里可以使用不同线程来acquire ?
        # 因为 lock/rlock 的wait方法把 锁给释放掉,
        # 然后生成新的锁, 并丢进 condition 的 waiters 列表中,
        # 因此这里的三个consumer都可以acquire, 产生了可以并发的假象.
        # 最终又因为 producer 中的 notify_all 通知了 waiters 列表中
        # 的所有对象, 因此它们一个一个被唤醒并重新.
        # wait方法被唤醒后, 又把lock/rlock给acquire了, 因此consumer最后
        # 又可以release了, 整个过程饶了一大圈!
        t1 = threading.Thread(target=consumer, args=(cond,))
        t2 = threading.Thread(target=consumer, args=(cond,))
        t3 = threading.Thread(target=consumer, args=(cond,))

        t1.start()
        t2.start()
        t3.start()

        time.sleep(10)
        producer(cond)

        time.sleep(1)
        self.assertEqual(sum(consumer_count), 3)

    def test_multi_consumer_one_producer_with_lock(self):
        cond = threading.Condition(threading.Lock())

        consumer_count = []
        def consumer(cv):
            cv.acquire()
            cv.wait()
            consumer_count.append(1)
            cv.release()

            # with cv:
            #     cv.wait()
            #     consumer_count.append(1)

        def producer(cv):
            cv.acquire()
            cv.notify_all()
            cv.release()

            # with cv:
            #     cv.notify_all()

        t1 = threading.Thread(target=consumer, args=(cond,))
        t2 = threading.Thread(target=consumer, args=(cond,))
        t3 = threading.Thread(target=consumer, args=(cond,))

        t1.start()
        t2.start()
        t3.start()

        time.sleep(10)
        producer(cond)

        time.sleep(1)
        self.assertEqual(sum(consumer_count), 3)

 
 

测试: tests/main.py

import unittest


TEST_MODULE = [
    "ln_threading.tests.condition",
]


if __name__ == '__main__':
    suite = unittest.defaultTestLoader.loadTestsFromNames(TEST_MODULE)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)

相关文章

网友评论

      本文标题:python--线程condition

      本文链接:https://www.haomeiwen.com/subject/yxjxoftx.html