美文网首页
Java 线程同步与实现

Java 线程同步与实现

作者: Drew_MyINTYRE | 来源:发表于2022-04-23 15:31 被阅读0次

    为何要使用 Java 线程同步?

    当多个线程同时操作一个可共享的资源变量时,将会导致数据不准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程的调用,从而保证了该变量的唯一性和准确性。

    Java 中提供了很多线程同步操作,比如:synchronized 关键字、wait/notifyAllReentrantLockCondition、一些并发包下的工具类、SemaphoreThreadLocalAbstractQueuedSynchronizer 等。本文主要说明一下这几种同步方式的使用及优劣。

    ReentrantLock 可重入锁

    对于同一个线程,可以继续调用加锁的方法,而不会被挂起。可重入锁内部维护一个计数器,对于同一个线程调用 lock 方法,计数器 +1,调用 unlock 方法,计数器-1。怎么理解呢?看看下面的例子:

    private ReentrantLock lock = new ReentrantLock();
    
    public void execute() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " lock!");
            try {
                anotherLock();
                Thread.sleep(5000l);
            } catch (InterruptedException e) {
                System.err.println(Thread.currentThread().getName() + " interrupted");
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
    }
    
    public void anotherLock() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " lock again!");
        } finally {
            lock.unlock();
        }
    }
    

    输出:

    Thread-0 lock!
    Thread-0 lock again!
    

    在一个加锁方法 execute() 中调用另外一个加锁方法 anotherLock() 并不会被挂起(不用等待锁,就不需要被挂起),可以直接调用(调用 execute 方法时计数器+1,然后内部又调用了 anotherLock 方法,计数器 +1,变成了2)。

    synchronized

    class MainActivity : AppCompatActivity() {
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
            Thread {
                execute()
            }.apply {
                name = "thread-A"
            }.start()
    
            Thread {
                execute()
            }.apply {
                name = "thread-B"
            }.start()
        }
    
        @Synchronized
        fun execute() {
            Log.i("WWE", "${Thread.currentThread().name} -> synchronized called")
            try {
                anotherSynchronized()
                Thread.sleep(1500)
            } catch (ex: InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
    
        @Synchronized
        fun anotherSynchronized() {
            Log.i("WWE", "${Thread.currentThread().name} -> anotherSynchronized called")
        }
    }
    

    使用 synchronized 代码块同步关键代码即可,没有必要同步整个方法,同步是一种高开销的操作,因此应该尽量减少同步的内容。

    关于 Lock 对象和 synchronized 两种锁选择的考量:

    1,最好两个都不用,使用 java.util.concurrent 包提供的机制,能够帮助用户处理所有与锁相关的代码。

    2,如果 synchronized 关键字能满足用户的需求,就用 synchronized,因为它能简化代码。

    3,如果需要更高级的功能,就用 ReentrantLock 类,此时要注意及时释放锁,否则会出现死锁,通常在 finally 代码释放锁。

    ReentrantLock 有提供 tryLock 方法,可以设置超时时间,如果超过了这个时间还没有获取到锁,就会放弃。ReentrantLock 可以使用多个 Condition,可以中断一个试图获得锁的线程,ReentrantLock 可以选择公平锁和非公平锁,ReentrantLock 可以获得正在等待线程的个数,计数器等;

    Condition 条件对象

    对于一个已经拿到了 Lock 锁的线程,如果该线程需要等待某个条件才会执行,这种情况就考虑使用 Condition 条件对象。

    Condition 可以替代传统的线程间通信,用 await() 替换 wait(),用 signal() 替换 notify(),用 signalAll() 替换 notifyAll()

    为什么方法名不直接叫 wait()/notify()/nofityAll()?因为 Object 的这几个方法是 final 的,不可重写!

    class MainActivity : AppCompatActivity() {
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
    
            val lock = ReentrantLock()
            val condition = lock.newCondition()
    
            Thread {
                lock.lock()
                try {
                    // do sth
                    try {
                        condition.await()
                        Log.i("WWE", "${Thread.currentThread().name} -> i waked up, more strong!")
                    } catch (ex: InterruptedException) {
                        Thread.currentThread().interrupt()
                    }
                } finally {
                    lock.unlock()
                }
            }.apply {
                name = "thread-A"
            }.start()
    
            Thread {
                lock.lock()
                try {
                    // do sth
                    try {
                        Thread.sleep(3000)
                        Log.i("WWE", "${Thread.currentThread().name} -> wake up from dream")
                    } catch (ex: InterruptedException) {
                        Thread.currentThread().interrupt()
                    }
                    condition.signalAll()
                } finally {
                    lock.unlock()
                }
            }.apply {
                name = "thread-B"
            }.start()
        }
    }
    

    输出:

    2022-04-22 21:32:20.790 13761-13786/com.dev I/WWE: thread-B -> wake up from dream
    2022-04-22 21:32:20.790 13761-13785/com.dev I/WWE: thread-A -> i waked up, more strong!
    

    这个例子中 thread-A 执行到 condition.await() 时,thread-A 会被挂起,直到thread-B 调用了 condition.signalAll() 方法之后,thread-A 才会重新被激活执行。

    这里需要注意的是 thread-A 调用 Condition 的 await() 方法之后,thread-A 线程释放锁,然后马上加入到 Condition 的等待队列中,由于 thread-A 释放了锁,thread-B 获得锁并执行,thread-B 执行 signalAll() 方法之后,Condition中的等待队列 thread-A 被取出并加入到 AQS 中,接下来 thread-B 执行完毕之后释放锁,由于 thread-A 已经在 AQS 的等待队列中,所以 thread-A 被唤醒,继续执行。

    Condition 是被绑定到 Lock 上的,要创建一个 LockCondition 必须用 newCondition() 方法。传统线程的通信方式,Condition 都可以实现。Condition 的强大之处在于它可以为多个线程间建立不同的 Condition

    wait&notify/notifyAll 方式

    class MainActivity : AppCompatActivity() {
    
        private val obj = Object()
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
    
            Thread {
                doWait()
            }.apply {
                name = "thread-A"
            }.start()
    
            Thread {
                doNotify()
            }.apply {
                name = "thread-B"
            }.start()
        }
    
        private fun doWait() {
            synchronized(obj) {
                try {
                    Log.i("WWE", "${Thread.currentThread().name} #doWait")
                    obj.wait()
                    Log.i("WWE", "${Thread.currentThread().name} wake up")
                } catch (ex: InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    
        private fun doNotify() {
            synchronized(obj) {
                try {
                    Log.i("WWE", "${Thread.currentThread().name} #doNotify")
                    Thread.sleep(3000)
                    obj.notifyAll()
                    Log.i("WWE", "${Thread.currentThread().name} notifyAll")
                } catch (ex: InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    

    输出:

    2022-04-23 01:02:31.160 15948-15987/com.dev I/WWE: thread-A doWait()
    2022-04-23 01:02:31.161 15948-15988/com.dev I/WWE: thread-B doNotify()
    2022-04-23 01:02:34.163 15948-15988/com.dev I/WWE: thread-B notifyAll
    2022-04-23 01:02:34.163 15948-15987/com.dev I/WWE: thread-A wake up
    

    这里需要注意的是 调用 wait/notifyAll 方法的时候一定要获得当前线程的锁,否则会发生 IllegalMonitorStateException 异常。

    thread.join() 方法

    class MainActivity : AppCompatActivity() {
        
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
    
            val thread = Thread {
                Log.i("WWE", "${Thread.currentThread().name} run")
            }.apply {
                name = "thread-A"
            }
            thread.start()
    
            try {
                thread.join()
            } catch (ex: InterruptedException) {
                ex.printStackTrace()
            }
    
            Log.i("WWE", "${Thread.currentThread().name} run")
        }
    }
    

    输出:

    2022-04-23 01:10:06.870 16135-16159/com.dev I/WWE: thread-A run
    2022-04-23 01:10:06.870 16135-16135/com.dev I/WWE: main run
    

    Thread.yield() 方法

    Yield 方法可以暂停当前正在执行的线程对象,让其它有相同优先级的线程执行。它是一个静态方法而且 只保证当前线程放弃 CPU 占用,而不能保证使其它线程一定能占用 CPU,执行 yield() 的线程有可能在进入到暂停状态后马上又被执行。

    class MainActivity : AppCompatActivity() {
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
            Thread {
                doSth()
            }.apply {
                name = "Thread-A"
            }.start()
    
           Thread {
                doSth()
            }.apply {
                name = "Thread-B"
            }.start()
    
            Thread.sleep(3000)
    
            Log.i("WWE", "${Thread.currentThread().name} run")
        }
    
        @Synchronized
        private fun doSth() {
            for(i in 0..3) {
                Log.i("WWE", "${Thread.currentThread().name} run")
                if("Thread-A" == Thread.currentThread().name && i == 1) {
                    Thread.yield()
                }
            }
        }
    }
    

    Thread.sleep() 方法

    在指定的时间内无法被唤醒,同时也不会释放对象锁(如果当前已经持有锁),该方法告诉操作系统在指定时间内不需为该线程分配执行时间片,实际上,调用 sleep() 方法时并不要求持有任何锁,也就不需要包裹在 synchronized 中。

    class MainActivity : AppCompatActivity() {
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
    
            for (i in 0 until 2) {
                Thread {
                    try {
                        Log.i("WWE", "${Thread.currentThread().name} before")
                        Thread.sleep(5000)
                        Log.i("WWE", "${Thread.currentThread().name} after")
                    } catch (ex: InterruptedException) {
                        ex.printStackTrace()
                    }
                }.apply {
                    name = "Thread-$i"
                }.start()
            }
    
            Log.i("WWE", "${Thread.currentThread().name} run")
        }
    }
    

    输出:

    2022-04-23 02:48:45.438 19188-19188/com.dev I/WWE: main run
    2022-04-23 02:48:45.438 19188-19211/com.dev I/WWE: Thread-0 before
    2022-04-23 02:48:45.439 19188-19212/com.dev I/WWE: Thread-1 before
    2022-04-23 02:48:50.441 19188-19212/com.dev I/WWE: Thread-1 after
    2022-04-23 02:48:50.441 19188-19211/com.dev I/WWE: Thread-0 after
    

    ThreadLocal

    ThreadLocal 是一种把变量放到线程本地的方式来实现线程同步的。ThreadLocal 与同步机制都是为了解决多线程中相同变量的访问冲突问题。

    class MainActivity : AppCompatActivity() {
    
        // SimpleDateFormat 不是一个线程安全的类,可以使用 ThreadLocal 实现同步
        private val dateFormatThreadLocal = object : ThreadLocal<SimpleDateFormat>() {
            override fun initialValue(): SimpleDateFormat? {
                return SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
            }
        }
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
    
            Thread {
                Log.i(
                    "WWE", "${Thread.currentThread().name} -> ${
                        dateFormatThreadLocal.get().format(
                            Date()
                        )
                    }"
                )
            }.apply {
                name = "thread-A"
            }.start()
    
            Thread {
                Log.i(
                    "WWE", "${Thread.currentThread().name} -> ${
                        dateFormatThreadLocal.get().format(
                            Date()
                        )
                    }"
                )
            }.apply {
                name = "thread-B"
            }.start()
        }
    }
    

    输出:

    2022-04-23 01:48:46.363 17102-17127/com.dev I/WWE: thread-A -> 2022-04-23 01:48:46
    2022-04-23 01:48:46.364 17102-17128/com.dev I/WWE: thread-B -> 2022-04-23 01:48:46
    

    Semaphore 信号量

    Semaphore 用于控制在同一个时间允许访问线程的个数,保证线程可以被合理的使用,可以使用构造器初始化同一时间允许被访问线程的个数:

    class MainActivity : AppCompatActivity() {
        private val semaphore = Semaphore(2)
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
            for (i in 0 until 5) {
                Thread {
                    try {
                        semaphore.acquire()
                        Log.i("WWE", "${Thread.currentThread().name} run")
                        Thread.sleep(5000)
                        semaphore.release()
                    } catch (ex: InterruptedException) {
                        ex.printStackTrace()
                    }
                }.apply {
                    name = "Thread-$i"
                }.start()
            }
        }
    }
    

    输出:

    2022-04-23 02:33:55.669 18447-18474/com.dev I/WWE: Thread-1 run
    2022-04-23 02:33:55.669 18447-18476/com.dev I/WWE: Thread-3 run
    2022-04-23 02:34:00.670 18447-18475/com.dev I/WWE: Thread-2 run
    2022-04-23 02:34:00.670 18447-18473/com.dev I/WWE: Thread-0 run
    2022-04-23 02:34:05.671 18447-18477/com.dev I/WWE: Thread-4 run
    

    可以看出同一时间内,只有2个线程可以被同时访问,因为构造函数里传的是2。

    CountDownLatch

    CountDownLatch 是一个计数器,它的构造方法中需要设置一个数值,用来设定计数的次数。每次调用 countDown() 方法之后,这个计数器都会减去1,CountDownLatch 会一直阻塞着调用 await() 方法的线程,直到计数器的值变为0。

    class MainActivity : AppCompatActivity() {
        private val countDownLatch = CountDownLatch(5)
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
            for(i in 0 until 5) {
                Thread {
                    Log.i("WWE", "${Thread.currentThread().name} sleep before")
                    try {
                        Thread.sleep(5000)
                    } catch (ex: InterruptedException) {
                        ex.printStackTrace()
                    }
                    Log.i("WWE", "${Thread.currentThread().name} sleep after")
                    countDownLatch.countDown()
                }.apply {
                    name = "Thread-$i"
                }.start()
            }
    
            try {
                countDownLatch.await()
            } catch (ex: InterruptedException) {
                ex.printStackTrace()
            }
    
            Log.i("WWE", "${Thread.currentThread().name} run")
        }
    }
    

    输出:

    2022-04-23 05:30:20.426 20564-20588/com.dev I/WWE: Thread-0 sleep before
    2022-04-23 05:30:20.426 20564-20592/com.dev I/WWE: Thread-4 sleep before
    2022-04-23 05:30:20.426 20564-20589/com.dev I/WWE: Thread-1 sleep before
    2022-04-23 05:30:20.426 20564-20591/com.dev I/WWE: Thread-3 sleep before
    2022-04-23 05:30:20.426 20564-20590/com.dev I/WWE: Thread-2 sleep before
    2022-04-23 05:30:25.429 20564-20589/com.dev I/WWE: Thread-1 sleep after
    2022-04-23 05:30:25.429 20564-20590/com.dev I/WWE: Thread-2 sleep after
    2022-04-23 05:30:25.429 20564-20588/com.dev I/WWE: Thread-0 sleep after
    2022-04-23 05:30:25.430 20564-20592/com.dev I/WWE: Thread-4 sleep after
    2022-04-23 05:30:25.430 20564-20591/com.dev I/WWE: Thread-3 sleep after
    2022-04-23 05:30:25.430 20564-20564/com.dev I/WWE: main run
    

    当线程调用 CountDownLatch 的 await 方法时,便会尝试获取 共享锁,不过一开始通常获取不到锁,于是线程被阻塞。共享锁 可获取到的条件是 锁计数器 的值为 0,而 锁计数器 的初始值为 count,当每次调用 CountDownLatch 对象的 countDown 方法时,也可以把 锁计数器 - 1。通过这种方式,调用 count 次 countDown 方法之后,锁计数器 就为 0 了,于是之前等待的线程就会继续运行了,并且此时如果再有线程想调用 await 方法时也会被立刻放行,不会再去做任何阻塞操作了。

    使用原子变量实现线程同步

    什么是原子操作呢?

    原子操作就是指将 读取变量修改变量保存变量 看成一个整体来操作,即这几种行为要么同时完成,要么都不完成。

    java.util.concurrent.atomic 包中提供了创建原子类型变量的工具类,使用该类可以简化线程同步。比如:其中 AtomicInteger 以原子方式更新 int 的值:

    class Bank {
        private AtomicInteger account = new AtomicInteger(100);
    
        public AtomicInteger getAccount() {
            return account;
        }
    
        public void save(int money) {
            account.addAndGet(money);
        }
    }
    

    AbstractQueuedSynchronizer

    AQS 是很多同步工具类的基础,比如:ReentrantLock 里的公平锁和非公平锁,Semaphore 里的公平锁和非公平锁,CountDownLatch 里的锁等他们的底层都是使用 AbstractQueuedSynchronizer 完成的。在实际开发当中,应当尽量远离底层结构。下面基于 AbstractQueuedSynchronizer 自定义实现一个独占锁。

    class MySynchronizer : AbstractQueuedSynchronizer() {
    
        override fun tryAcquire(arg: Int): Boolean {
            if (compareAndSetState(0, 1)) {
                exclusiveOwnerThread = Thread.currentThread()
                return true
            }
            return false
        }
    
        override fun tryRelease(arg: Int): Boolean {
            state = 0
            exclusiveOwnerThread = null
            return true
        }
    
        fun lock() {
            acquire(1)
        }
    
        fun unlock() {
            release(1)
        }
    }
    
    class MainActivity : AppCompatActivity() {
        private val mySynchronizer = MySynchronizer()
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
            Thread {
                mySynchronizer.lock()
                try {
                    Log.i("WWE", "${Thread.currentThread().name} run before")
                    try {
                        Thread.sleep(5000)
                    } catch (ex: InterruptedException) {
                        Thread.currentThread().interrupt()
                    }
                    Log.i("WWE", "${Thread.currentThread().name} run after")
                } finally {
                    mySynchronizer.unlock()
                }
            }.apply {
                name = "Thread-A"
            }.start()
    
            Thread {
                mySynchronizer.lock()
                try {
                    Log.i("WWE", "${Thread.currentThread().name} run")
                } finally {
                    mySynchronizer.unlock()
                }
            }.apply {
                name = "Thread-B"
            }.start()
    
            Log.i("WWE", "${Thread.currentThread().name} run")
        }
    }
    

    输出:

    2022-04-23 07:26:52.498 22603-22603/com.dev I/WWE: main run
    2022-04-23 07:26:52.498 22603-22628/com.dev I/WWE: Thread-B run
    2022-04-23 07:26:52.498 22603-22627/com.dev I/WWE: Thread-A run before
    2022-04-23 07:26:57.499 22603-22627/com.dev I/WWE: Thread-A run after
    

    使用阻塞队列实现线程同步

    LinkedBlockingQueue 是一个基于链表的队列,先进先出的顺序(FIFO),范围任意的 blocking queue。

    相关文章

      网友评论

          本文标题:Java 线程同步与实现

          本文链接:https://www.haomeiwen.com/subject/jexvertx.html