美文网首页
JAVA并发编程(三)线程协作与共享

JAVA并发编程(三)线程协作与共享

作者: RyanLee_ | 来源:发表于2019-07-17 14:53 被阅读0次

    1. 线程中断

    java线程中断是协作式,而非抢占式

    1.1. 线程中断相关方法

    • interrupt()
      将线程的中断标志位置为true,线程是否中断,由线程本身决定。一般情况下,线程的run 函数中通过调用isInterrupted() 判定线程是否应继续执行。
      栗子:
    /**
     * @author Ryan Lee
     */
    public class InterruptThread {
        private static class RunnableDemo implements Runnable{
            @Override
            public void run() {
                String threadName = Thread.currentThread().getName();
                //如果while的条件中不判断isInterrupted。即使主线程或其它线程调用本线程的interrupt()。本线程也不会终止。
                while(1==1 && !Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.currentThread().sleep(100);
                    } catch (InterruptedException e) {
                        System.out.println("睡眠中被中断。截获中断异常后,中断标志位为"+Thread.currentThread().isInterrupted());
                        //此时中断标志位被复位(false)。如不显式调用中断,则线程会继续执行。
                        Thread.currentThread().interrupt();
                    }
                    System.out.println(threadName+" 正在运行");
                }
                System.out.println(threadName+"线程运行完成。中断标志位为" +Thread.currentThread().isInterrupted());
            }           
        }
    
        public static void main(String[] args) throws InterruptedException {
            RunnableDemo runnableDemo = new RunnableDemo();
            Thread interruptThread = new Thread(runnableDemo,"worker");
            interruptThread.start();
            Thread.sleep(2000);
            interruptThread.interrupt();
        }
    }
    

    执行结果

    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    worker 正在运行
    睡眠中被中断。截获中断异常后,中断标志位为false
    worker 正在运行
    worker线程运行完成。中断标志位为true
    
    • isInterrupted() 判定当前线程是否处于中断状态。
    • interrupted() 判定当前线程是否处于中断状态,同时中断标志位改为false。
    • 方法里如果抛出InterruptedException,线程的中断标志位会被复位成false,如果确实是需要中断线程,要求我们自己在catch语句块里再次调用interrupt()。

    2. 等待通知

    2.1. 等待通知的标准范式

    该范式分为两部分,分别针对等待方和通知方。

    • 等待方:
      a. 获取对象的锁:synchronized(对象)或执行对象中synchronized修饰的方法)
      b. 执行条件判断。如果条件不满足,那么调用对象的wait()方法,释放对象的锁。条件满足则执c行后续的逻辑。注意:等待方被通知后仍需进行条件判断
    • 通知方:
      a. 获取对象的锁:synchronized(对象)或执行对象中synchronized修饰的方法)
      b. 改变条件(一般和被通知方的条件判断相关)
      c. 通知所有等待在对象的线程

    2.2. 等待通知相关方法

    • wait()
      线程从运行状态变为阻塞状态,直到被唤醒。wait操作会释放该线程说持有的锁。
    • notifyAll()
      通知在该对象/资源上wait的所有线程。
    • notify()
      随机通知一个等待资源的线程。不建议使用。容易导致“获得对象锁的线程不满足继续执行的条件;而满足继续执行条件的线程又没有被唤醒”情况的发生。前面的文章中已具体分析过。此处不赘述。
      需要注意的一点是:AQS有类似的方法await(让线程进入阻塞状态),signal、signalAll(通知阻塞线程)。但一般情况下使用的是AQS的await与signal。两种等待通知方式的原理存在差异。后续介绍AQS的文章中会详细分析。

    栗子:模拟数据库连接池

    1. 首先实现java.sql.Connection接口
    import java.sql.*;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.Executor;
    
    public class SqlConnectImpl implements Connection{
        /*获取一个数据库连接*/
        public static final Connection getConnection(){
            return new SqlConnectImpl();
        }
    
        /**
         * 模拟提交
         */
        @Override
        public void commit(){
            try {
                Thread.currentThread().sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    
        /**
         * 模拟创建SQL Statement
         * @return
         */
        @Override
        public Statement createStatement()  {
            try {
                Thread.currentThread().sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
            return null;
        }
        ...
        //后面方法的太多,不一一列出。可使用IDE自动补全
    }
    
    
    1. 然后创建一个数据库连接池对象
    import java.sql.Connection;
    import java.util.LinkedList;
    /**
     * @author Ryan Lee
     */
    public class DBConnectionPool {
        //容纳连接的容器。双向连接列表
        private static LinkedList<Connection> pool = new LinkedList<>();
    
        /**
         * 根据大小初始化连接池
         *
         * @param size 连接池大小
         */
        public DBConnectionPool(int size) {
            if (size > 0) {
                for (int i = 0; i < size; i++) {
                    pool.addLast(SqlConnectImpl.getConnection());
                }
            }
        }
    
        /**
         * 从连接池获取连接
         * @param timeoutMills 超时时间;负数代表不限超时时间
         * @return
         * @throws InterruptedException
         */
        public Connection getConnFromPool(long timeoutMills) throws InterruptedException {
            //获得连接池的锁
            synchronized (pool) {
                //如果不设超时时间
                if (timeoutMills < 0) {
                    //wait判断条件:连接池为空。
                    while (pool.isEmpty()) {
                        pool.wait();
                    }
                    return pool.removeFirst();
                }
                //设置了超时时间
                else {
                    long overtime = System.currentTimeMillis() + timeoutMills;
                    long remain = timeoutMills;
                    //wait判断条件:连接池为空且尚未超时
                    while (pool.isEmpty() && remain > 0) {
                        pool.wait(remain);
                        remain = overtime - System.currentTimeMillis();
                    }
                    Connection result = null;
                    //如果连接池有连接
                    if (!pool.isEmpty()) {
                        result = pool.removeFirst();
                    }
                    return result;
                }
            }
        }
    
        /**
         * 释放数据库连接
         * @param conn
         */
        public void releaseConn(Connection conn) {
            if (conn != null) {
                //获得连接池的锁
                synchronized (pool) {
                    pool.addLast(conn);
                    pool.notifyAll();
                }
            }
        }
    }
    
    1. 最后写一个测试类
    import java.sql.Connection;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;
    /**
     * @author Ryan Lee
     */
    public class DBPoolTest {
        static DBConnectionPool pool = new DBConnectionPool(30);
        // 控制器:控制main线程将会等待所有Woker结束后才能继续执行
        static CountDownLatch countDownLatch;
    
        public static void main(String[] args) throws Exception {
            // 线程数量
            int threadCount = 50;
            countDownLatch = new CountDownLatch(threadCount);
            int opCountPerThread = 20;//每个线程的操作次数
            AtomicInteger got = new AtomicInteger();//计数器:统计可以拿到连接的线程
            AtomicInteger notGot = new AtomicInteger();//计数器:统计没有拿到连接的线程
            for (int i = 0; i < threadCount; i++) {
                //引用传递
                Thread thread = new Thread(new DbOpThread(opCountPerThread, got, notGot),
                        "工作线程_" + i);
                thread.start();
            }
            DBPoolTest.countDownLatch.await();// main线程在此处等待
            System.out.println("尝试获取连接次数: " + (threadCount * opCountPerThread));
            System.out.println("拿到连接的次数:  " + got);
            System.out.println("没拿到连接的次数: " + notGot);
        }
    
        /**
         * 数据库操作线程示例
         */
        static class DbOpThread implements Runnable {
            int opCount;
            //线程安全
            AtomicInteger successTime;
            AtomicInteger failTime;
    
            public DbOpThread(int opCount, AtomicInteger successTime,
                              AtomicInteger failTime) {
                this.opCount = opCount;
                this.successTime = successTime;
                this.failTime = failTime;
            }
            @Override
            public void run() {
                while (opCount > 0) {
                    try {
                        //从连接池获取连接。超时时间1000ms
                        Connection connection = pool.getConnFromPool(1000);
                        if (connection != null) {
                            try {
                                //模拟创建SQL statement
                                connection.createStatement();
                                //模拟提交
                                connection.commit();
                            } finally {
                                //执行完成释放连接
                                pool.releaseConn(connection);
                                successTime.incrementAndGet();
                            }
                        } else {
                            failTime.incrementAndGet();
                            System.out.println(Thread.currentThread().getName()
                                    + "等待超时!");
                        }
                    } catch (Exception ex) {
                    } finally {
                        opCount--;
                    }
                }
                DBPoolTest.countDownLatch.countDown();
            }
        }
    }
    
    

    执行结果

    工作线程_37等待超时!
    工作线程_36等待超时!
    工作线程_41等待超时!
    工作线程_8等待超时!
    工作线程_1等待超时!
    工作线程_12等待超时!
    工作线程_23等待超时!
    工作线程_43等待超时!
    工作线程_35等待超时!
    工作线程_26等待超时!
    工作线程_20等待超时!
    工作线程_25等待超时!
    工作线程_47等待超时!
    工作线程_1等待超时!
    工作线程_49等待超时!
    尝试获取连接次数: 1000
    拿到连接的次数:  985
    没拿到连接的次数: 15
    

    三、其它协作方式

    相关方法

    • join()
      插队,确保执行顺序。如在thread1中执行thread0.join(),表示thread0需要再thread1执行前执行。
      栗子:
    /**
     * @author Ryan Lee
     */
    public class TestJoin {
        static class JumpQueue implements Runnable {
            private Thread thread;//用来插队的线程
            public JumpQueue(Thread thread) {
                this.thread = thread;
            }
    
            public void run() {
                try {
                    System.out.println(thread.getName()+"插队在" +Thread.currentThread().getName()+"前面");
                    thread.join();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+" 执行完成.");
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            Thread previous = Thread.currentThread();//现在是主线程
            for (int i = 0; i < 10; i++) {
                //i=0,previous 是主线程,i=1;previous是i=0这个线程
                Thread thread =
                        new Thread(new JumpQueue(previous), "排队线程"+i);
                thread.start();
                previous = thread;
            }
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " 执行完成.");
        }
    }
    

    执行结果

    main插队在排队线程0前面
    排队线程2插队在排队线程3前面
    排队线程1插队在排队线程2前面
    排队线程0插队在排队线程1前面
    排队线程4插队在排队线程5前面
    排队线程3插队在排队线程4前面
    排队线程5插队在排队线程6前面
    排队线程6插队在排队线程7前面
    排队线程7插队在排队线程8前面
    排队线程8插队在排队线程9前面
    main 执行完成.
    排队线程0 执行完成.
    排队线程1 执行完成.
    排队线程2 执行完成.
    排队线程3 执行完成.
    排队线程4 执行完成.
    排队线程5 执行完成.
    排队线程6 执行完成.
    排队线程7 执行完成.
    排队线程8 执行完成.
    排队线程9 执行完成.
    
    • yield()
      让出cpu的执行权。线程从运行状态变为可运行状态。但是下个时间片,该线程依然有可能被再次选中运行。具体看操作系统的线程调度结果。

    四、相关方法与锁的关系

    • sleep(),yield() 不释放持有的锁
    • wait() 释放持有的锁
    • notify(),notifyAll() 执行之前必须持有锁。方法本身不释放锁。notify()、notifyAll()一般要放在synchronized代码块最后。synchronized代码块执行完成后会释放锁。

    五、常用关键字

    • synchronized内置锁
      使用synchronized修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码。用法在上面的示例中已展示。

    • volatile
      一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,保障了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。但如果同时有多个线程写,会导致结果的不确定性。
      volatile关键字适用于“多读一写”的场景。执行效率比synchronized效率高。

    • ThreadLocal的使用
      ThreadLocal是解决线程安全问题一个很好的思路,它通过为每个线程提供一个独立的变量副本解决了变量并发访问的冲突问题。在很多情况下,ThreadLocal比直接使用synchronized同步机制解决线程安全问题更简单,更方便,且结果程序拥有更高的并发性。
      ThreadLocal提供了线程安全的共享对象,在编写多线程代码时,可以把不安全的变量封装进ThreadLocal。
      栗子:

    /**
     * @author Ryan Lee
     */
    public class TestThreadLocal {
        private static class ThreadLocalVar {
            private static ThreadLocal<Integer> seqNum = ThreadLocal.withInitial(() -> 0);
            public int getNextNum() {
                seqNum.set(seqNum.get() + 1);
                return seqNum.get();
            }
        }
        public static void main(String[ ] args)
        {
            ThreadLocalVar sn = new ThreadLocalVar();
            //③ 3个线程共享sn,各自产生序列号
            TestClient t1 = new TestClient(sn);
            TestClient t2 = new TestClient(sn);
            TestClient t3 = new TestClient(sn);
            t1.start();
            t2.start();
            t3.start();
        }
        private static class TestClient extends Thread
        {
            private ThreadLocalVar sn;
            public TestClient(ThreadLocalVar sn) {
                this.sn = sn;
            }
            public void run()
            {
                //④每个线程打出3个序列值
                for (int i = 0; i < 3; i++) {
                    System.out.println("thread["+Thread.currentThread().getName()+
                            "] seqNum["+sn.getNextNum()+"]");
                }
            }
        }
    }
    

    执行结果:

    thread[Thread-0] seqNum[1]
    thread[Thread-2] seqNum[1]
    thread[Thread-2] seqNum[2]
    thread[Thread-2] seqNum[3]
    thread[Thread-1] seqNum[1]
    thread[Thread-0] seqNum[2]
    thread[Thread-1] seqNum[2]
    thread[Thread-0] seqNum[3]
    thread[Thread-1] seqNum[3]
    

    六、聊一下线程安全

    线程安全性的两个关键点:内存可见性和操作原子性。

    问题解析

    • 问题1:再spring容器中声明的Bean多是singleton模式。基于spring-mvc框架写web应用时,Bean中我们经常会用声明一些非线程安全的私有属性。会不会导致运行结果不准确呢?
      答案是不会。

    • 分析:
      在一般情况下,只有无状态的Bean才可以在多线程环境下共享。在Spring容器中,绝大部分Bean都可以声明为singleton作用域。就是因为Spring容器对一些Bean中非线程安全的“状态性对象”(如:私有属性)采用ThreadLocal进行封装,使之成为线程安全的“状态性对象”。因此具有非线程安全类型私有属性的Bean就能够以singleton的方式在多线程环境中正常工作了。

    • 问题2:偶尔会发现一些项目中声明了一些非线程安全的final静态变量/成员属性。(如SimpleDateFormat)以为不去修改就线程安全了。这种观点是错误的。
      如果final变量修饰的是原生类型的变量,则变量的值不可被修改。而final修饰的非原生类型的变量如SimpleDateFormat,其成员属性的值仍可被修改。
      SimpleDateFormat源码:

    public class SimpleDateFormat extends DateFormat {
    ...
     @Override
        public StringBuffer format(Date date, StringBuffer toAppendTo,
                                   FieldPosition pos)
        {
            pos.beginIndex = pos.endIndex = 0;
            return format(date, toAppendTo, pos.getFieldDelegate());
        }
    
        // Called from Format after creating a FieldDelegate
        private StringBuffer format(Date date, StringBuffer toAppendTo,
                                    FieldDelegate delegate) {
            // 重点在这..
            calendar.setTime(date);
    
            boolean useDateFormatSymbols = useDateFormatSymbols();
    
            for (int i = 0; i < compiledPattern.length; ) {
                int tag = compiledPattern[i] >>> 8;
                int count = compiledPattern[i++] & 0xff;
                if (count == 255) {
                    count = compiledPattern[i++] << 16;
                    count |= compiledPattern[i++];
                }
    
                switch (tag) {
                case TAG_QUOTE_ASCII_CHAR:
                    toAppendTo.append((char)count);
                    break;
    
                case TAG_QUOTE_CHARS:
                    toAppendTo.append(compiledPattern, i, count);
                    i += count;
                    break;
    
                default:
                    subFormat(tag, count, delegate, toAppendTo, useDateFormatSymbols);
                    break;
                }
            }
            return toAppendTo;
        }
    ...
    }
    

    DateFormat源码:

    public abstract class DateFormat extends Format {
        /**
         * The {@link Calendar} instance used for calculating the date-time fields
         * and the instant of time. This field is used for both formatting and
         * parsing.
         *
         * <p>Subclasses should initialize this field to a {@link Calendar}
         * appropriate for the {@link Locale} associated with this
         * <code>DateFormat</code>.
         * @serial
         */
        protected Calendar calendar;
    ...
        /**
         * Formats a Date into a date/time string.
         * @param date a Date to be formatted into a date/time string.
         * @param toAppendTo the string buffer for the returning date/time string.
         * @param fieldPosition keeps track of the position of the field
         * within the returned string.
         * On input: an alignment field,
         * if desired. On output: the offsets of the alignment field. For
         * example, given a time text "1996.07.10 AD at 15:08:56 PDT",
         * if the given fieldPosition is DateFormat.YEAR_FIELD, the
         * begin index and end index of fieldPosition will be set to
         * 0 and 4, respectively.
         * Notice that if the same time field appears
         * more than once in a pattern, the fieldPosition will be set for the first
         * occurrence of that time field. For instance, formatting a Date to
         * the time string "1 PM PDT (Pacific Daylight Time)" using the pattern
         * "h a z (zzzz)" and the alignment field DateFormat.TIMEZONE_FIELD,
         * the begin index and end index of fieldPosition will be set to
         * 5 and 8, respectively, for the first occurrence of the timezone
         * pattern character 'z'.
         * @return the string buffer passed in as toAppendTo, with formatted text appended.
         */
        public abstract StringBuffer format(Date date, StringBuffer toAppendTo,
                                            FieldPosition fieldPosition);
    
        /**
         * Formats a Date into a date/time string.
         * @param date the time value to be formatted into a time string.
         * @return the formatted time string.
         */
        public final String format(Date date)
        {
            return format(date, new StringBuffer(),
                          DontCareFieldPosition.INSTANCE).toString();
        }
    ...
    }
    
    • 问题分析:
      通过以上源码可以看到。我们常用的SimpleDateFormat.format(Date date)方法继承自DateFormat。
      而DateFormat.format(Date date)方法调用了抽象方法: abstract StringBuffer format(Date date, StringBuffer toAppendTo,FieldPosition fieldPosition)。SimpleDateFormat实现了该抽象方法。
      在SimpleDateFormat实现了public StringBuffer format(Date date, StringBuffer toAppendTo,FieldPosition fieldPosition)。该方法调用了私有方法 private StringBuffer format(Date date, StringBuffer toAppendTo,FieldDelegate delegate)。而在这个私有方法中修改了成员属性calendar的值。
      在多线程环境下:如果线程A 调用了全局静态变量sdf的format(date1)方法,执行了calendar.setTime(date1),尚未执行后续语句。此时线程B又调用sdf.format(date2) 执行了calendar.setTime(date2)方法。然后线程A执行format方法后续语句。这样 线程A中执行的sdf.format(date1) 的到的结果就是date2的格式化的值。
      上面只是一个简单的例子,实际情况要复杂的多。
      多线程环境下使用非线程安全的静态变量会导致结果的不确定性和一些其他的问题

    • 问题延伸:多线程环境下,建议使用线程安全的共享对象、静态变量。如果需要在多线程环境下使用非线程安全的变量。该如何做呢?
      以SimpleDateFormat的使用为例,有三种方式:1. 不声明静态变量/共享变量。在用到的方法中声明SimpleDateFormat局部变量。2. 使用synchronized对SimpleDateFormat静态变量/共享变量加锁3. 使用ThreadLocal包装静态变量/共享变量。

    关于线程安全机制,后续的文章会进行详细的分析,这里不留过多篇幅。

    相关文章

      网友评论

          本文标题:JAVA并发编程(三)线程协作与共享

          本文链接:https://www.haomeiwen.com/subject/yimdkctx.html