# -.- coding:utf-8 -.-
import time
import unittest
import threading
class TestSemaphore(unittest.TestCase):
def test_acquire_multi_times(self):
# Semaphore 采用的是固定的 Condition(Lock()),
# 因此默认情况下, 是不能被多次acquire的.
# 但是实际上semaphore就是可以被多次acquire,
# 理由是它内部重构了acquire方法, 每次acquire它都会
# 去调用wait 方法, 然而 wait 方法, 就又去release掉当前锁,
# 然后丢到waiters中, 等待notify, 最后又acquire回来.
# 因此这里仍然是可以执行多个acquire, 但是这个acquire的次数被限定了.
sem = threading.Semaphore(2)
status = sem.acquire()
self.assertTrue(status)
status = sem.acquire()
self.assertTrue(status)
status = sem.acquire(blocking=0)
self.assertFalse(status)
def test_release_unacquired(self):
# 限定sem可以被acquire两次.
sem = threading.Semaphore(2)
# release() 后 sem 可以被acquire三次.
sem.release()
status = sem.acquire()
self.assertTrue(status)
status = sem.acquire()
self.assertTrue(status)
status = sem.acquire()
self.assertTrue(status)
# 第四次加锁失败!
status = sem.acquire(0)
self.assertFalse(status)
def test_doc_comment(self):
"""
我能想到的semaphore的场景是: 把semaphore当作是一个线程池.
线程池可以是:
django/flask的wsgi的并发限定连接数.
tornado的threadexecutorpool的并发限定连接数.
"""
self.assertTrue(True)
测试: tests/main.py
import unittest
TEST_MODULE = [
"ln_threading.tests.semaphore",
]
if __name__ == '__main__':
suite = unittest.defaultTestLoader.loadTestsFromNames(TEST_MODULE)
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)
网友评论