美文网首页
python--线程lock

python--线程lock

作者: 极光火狐狸 | 来源:发表于2018-09-22 17:01 被阅读14次

    源码: tests/lock.py

    # -.- coding:utf-8 -.-
    import time
    import threading
    import unittest
    
    
    # threading.main_thread() only exists on Python 3.4+, so its presence is
    # used here as the Python 2 / Python 3 switch for version-specific tests.
    py3 = hasattr(threading, "main_thread")
    py2 = not py3


    class TestLock(unittest.TestCase):
        """Behavioural checks for the primitive ``threading.Lock``."""

        def test_init_lock_status_is_false(self):
            """A freshly created lock starts out unlocked."""
            lock = threading.Lock()
            self.assertFalse(lock.locked())

        @unittest.skipIf(py2, u"锁上锁python2不支持timeout功能!")
        def test_locked_can_not_lock_again_py3(self):
            """A held lock cannot be acquired again; acquire() times out.

            ``Lock.acquire(timeout=...)`` does not raise when the timeout
            expires: it returns ``False``. The test therefore checks the
            return value instead of expecting ``ThreadError``.
            """
            lock = threading.Lock()
            lock.acquire()
            try:
                # A short timeout is enough: the lock is never released while
                # we wait, so the outcome does not depend on the duration.
                self.assertFalse(lock.acquire(timeout=0.1))
                self.assertTrue(lock.locked())
            finally:
                lock.release()

        def test_a_thread_lock_another_thread_release(self):
            """A lock acquired by thread A can be released by thread B.

            The main thread takes the lock, a worker thread releases it, and
            the lock must end up unlocked.
            """
            lock = threading.Lock()
            lock.acquire()
            lifetime = {"status": "init"}

            def release_lock(the_lock, the_lifetime):
                the_lock.release()
                the_lifetime.update({"status": "Done"})

            t = threading.Thread(target=release_lock, args=(lock, lifetime))
            t.start()
            # Wait for the worker with a bound instead of polling: if the
            # worker fails, the test fails here rather than spinning forever.
            t.join(timeout=5)

            self.assertFalse(t.is_alive())
            self.assertEqual(lifetime.get("status"), "Done")
            self.assertFalse(lock.locked())     # lock is free again
    
    

     
     

    测试: tests/main.py

    import unittest
    
    
    # Dotted names of the test modules that make up the suite.
    TEST_MODULE = [
        "ln_threading.tests.lock",
    ]


    if __name__ == '__main__':
        import sys

        # Load every listed module into one suite and run it verbosely.
        suite = unittest.defaultTestLoader.loadTestsFromNames(TEST_MODULE)
        runner = unittest.TextTestRunner(verbosity=2)
        result = runner.run(suite)
        # Report failures through the exit status so shells and CI notice
        # them; previously the script exited 0 even when tests failed.
        sys.exit(0 if result.wasSuccessful() else 1)
    
    

    相关文章

      网友评论

          本文标题:python--线程lock

          本文链接:https://www.haomeiwen.com/subject/fndxoftx.html