# -.- coding:utf-8 -.-
import time
import unittest
import threading
from .utils import get_lock_count
class TestCondition(unittest.TestCase):
def test_can_acquire_multi_times(self):
"""
之所以能acquire多次, 是因为它默认采用Rlock实现.
"""
cond = threading.Condition()
status = cond.acquire()
self.assertTrue(status)
status = cond.acquire()
self.assertTrue(status)
lock_count = get_lock_count(cond)
self.assertEqual(lock_count, 2)
def test_can_not_acquire_multi_times_use_lock(self):
"""
把Condition默认的Rlock改为Lock之后, 就不能再多次acquire了.
根据acquire返回的状态值判定是否加锁成功!
"""
cond = threading.Condition(threading.Lock())
cond.acquire()
status = cond.acquire(0)
self.assertFalse(status)
def test_can_not_acquire_multi_times_with_multi_threads(self):
"""
不能在多线程中多次acquire
"""
cond = threading.Condition()
cond.acquire()
status = {"status": False}
def other_thread(the_cond):
while True:
locked = the_cond.acquire(0)
if locked:
status["status"] = True
break
t = threading.Thread(target=other_thread, args=(cond, ))
t.start()
t.join()
time.sleep(3)
self.assertFalse(status["status"])
def test_multi_consumer_one_producer_with_rlock(self):
cond = threading.Condition(threading.RLock())
consumer_count = []
def consumer(cv):
cv.acquire()
cv.wait()
consumer_count.append(1)
cv.release()
# with cv:
# cv.wait()
# consumer_count.append(1)
def producer(cv):
cv.acquire()
cv.notify_all()
cv.release()
# with cv:
# cv.notify_all()
# TODO: 为什么这里可以使用不同线程来acquire ?
# 因为 lock/rlock 的wait方法把 锁给释放掉,
# 然后生成新的锁, 并丢进 condition 的 waiters 列表中,
# 因此这里的三个consumer都可以acquire, 产生了可以并发的假象.
# 最终又因为 producer 中的 notify_all 通知了 waiters 列表中
# 的所有对象, 因此它们一个一个被唤醒并重新.
# wait方法被唤醒后, 又把lock/rlock给acquire了, 因此consumer最后
# 又可以release了, 整个过程饶了一大圈!
t1 = threading.Thread(target=consumer, args=(cond,))
t2 = threading.Thread(target=consumer, args=(cond,))
t3 = threading.Thread(target=consumer, args=(cond,))
t1.start()
t2.start()
t3.start()
time.sleep(10)
producer(cond)
time.sleep(1)
self.assertEqual(sum(consumer_count), 3)
def test_multi_consumer_one_producer_with_lock(self):
cond = threading.Condition(threading.Lock())
consumer_count = []
def consumer(cv):
cv.acquire()
cv.wait()
consumer_count.append(1)
cv.release()
# with cv:
# cv.wait()
# consumer_count.append(1)
def producer(cv):
cv.acquire()
cv.notify_all()
cv.release()
# with cv:
# cv.notify_all()
t1 = threading.Thread(target=consumer, args=(cond,))
t2 = threading.Thread(target=consumer, args=(cond,))
t3 = threading.Thread(target=consumer, args=(cond,))
t1.start()
t2.start()
t3.start()
time.sleep(10)
producer(cond)
time.sleep(1)
self.assertEqual(sum(consumer_count), 3)
测试: tests/main.py
import unittest
TEST_MODULE = [
"ln_threading.tests.condition",
]
if __name__ == '__main__':
suite = unittest.defaultTestLoader.loadTestsFromNames(TEST_MODULE)
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)
网友评论