美文网首页
python--线程BoundedSemaphore

python--线程BoundedSemaphore

作者: 极光火狐狸 | 来源:发表于2018-09-22 17:06 被阅读24次

源码: tests/bounded_semaphore.py

# -.- coding:utf-8 -.-
import unittest
import threading


class TestBoundedSemaphore(unittest.TestCase):

    """
    Bounded Semaphore 是基于 Semaphore 基础上的上限限制对象.
    """

    def test_acquire_multi_times(self):
        # acquire 行为无变化, 因为Bounded Semaphore
        # 沿用的是Semaphore的acquire方法.
        sem = threading.BoundedSemaphore(2)

        status = sem.acquire()
        self.assertTrue(status)

        status = sem.acquire()
        self.assertTrue(status)

        status = sem.acquire(blocking=0)
        self.assertFalse(status)

    def test_release_unacquire(self):
        # BoundedSemaphore实例化的上限参数是2,
        # sem.release() 想要给 sem 的上限 +1,
        # 由于BoundedSemaphore重新实现了release()方法,
        # 它判断了, 如果 现有值已经是上限值, 那么就报错!
        # 也就是说, 不允许再上限值的基础上+1.
        sem = threading.BoundedSemaphore(2)
        self.assertRaises(ValueError, sem.release)

 
 

测试: tests/main.py

import unittest


TEST_MODULE = [
    "ln_threading.tests.bounded_semaphore",
]


if __name__ == '__main__':
    suite = unittest.defaultTestLoader.loadTestsFromNames(TEST_MODULE)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)

相关文章

网友评论

      本文标题:python--线程BoundedSemaphore

      本文链接:https://www.haomeiwen.com/subject/tcjxoftx.html