美文网首页
Thinking in java 之并发其二:资源共享

Thinking in java 之并发其二:资源共享

作者: Tearsweet | 来源:发表于2018-10-17 11:17 被阅读13次

    一、 多线程资源共享问题

    在单线程的情况下,我们很少去考虑资源冲突的问题。而在多线程中,单个实例的某个方法或者变量会经常出现被多个线程访问的情况。最常见的问题,在线程A访问f()进行到一半时,线程B也调用了f()方法。这很容易导致资源使用时出现我们不愿意见到的情况。比如下面这个例子。
    Thinking in Java 中,以生产整数作为测试用例,先用一个虚拟类作为生产整数的标准

    public abstract class IntGenerator {
    
        private volatile boolean canceled = false;
        public abstract int next();
        public void cancel() {
            canceled = true;
        }
        public boolean isCanceled() {
            return canceled;
        }
    }
    

    现在,继续创建一个用来生成偶数的类:

    public class EvenGenerator extends IntGenerator {
    
        private int currentEvenValue = 0;
        @Override
        public int next() {
            // TODO Auto-generated method stub
            ++currentEvenValue;
            Thread.yield();
            ++currentEvenValue;
            return currentEvenValue;
        }
        public static void main(String[] args) {
            EvenChecker.test(new EvenGenerator());
        }
    }
    

    EvenGenerator 是一个偶数生成器,它包含一个变量 currentEvenValue,初始值是0。同时,它的 next() 方法会对 currentEvenValue 进行两次自增操作,并返回自增后的值。在理想情况下,我们每次通过next() 获得的都是偶数。

    但是,当多个任务对 next() 进行调用时,是否会出现,currentEvenValue 完成第一次自增之后,其他任务也开始调用 next() 并且自增两次,此时,我们将会获得一个奇数。为了证明这一点,我们通过 EvenChecker 来对 EvenGenerator 进行多线程操作。

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    
    public class EvenChecker implements Runnable {
    
        private IntGenerator generator;
        private final int id;
        public EvenChecker(IntGenerator generator,int ident) {
            this.generator = generator;
            this.id = ident;
        }
        @Override
        public void run() {
            // TODO Auto-generated method stub
            while(!generator.isCanceled()) {
                int val=generator.next();
                if(val % 2 != 0) {
                    System.out.println(val + " not even!");
                    generator.cancel();
                }else {
                    System.out.println(val + " is even!");
                }
    
            }
    
        }
    
        public static void test(IntGenerator generator,int count) {
            System.out.println("print Ctrl+C to exit");
            ExecutorService exec = Executors.newCachedThreadPool();
            for(int i=0;i<count;i++) {
                exec.execute(new EvenChecker(generator,i));
            }
            exec.shutdown();
        }
        public static void test(IntGenerator generator) {
            test(generator,10);
        }
    
    }
    

    EvenChecker 会创建多个线程,每个线程会都对不断的调用 EvenGenerator 的 next() 方法。当 next() 返回一个偶数时,该线程会继续进行对 next() 的调用。而当出现奇数时,任务被终止。

    无论实验多少次,EvenChecker 总会在某个时刻终止,说明,的确会出现上文所述的情况。(注意:main 方法在 EvenGenerator 里)

    在进行多线程开发,共享资源需要被谨慎处理。通过一些手段,可以保证,当一个任务使用某个资源时,其他任务只能等待该任务使用完成。

    二、给资源上锁

    一个行之有效的办法是在对出现资源冲突的方法或代码块使用 synchronized 关键字。
    对于一个特定对象,当一个任务在使用被 synchronized 修饰的资源时,对象里所有被 synchronized 的资源都会被锁定,我们将之称之为“上锁”,而“解锁”则是在任务完成对资源的调用之后自动实现。“解锁”之后的资源可以再一次被其他任务使用。
    现在使用 synchronized 完善上文的代码。
    首先需要建立一个偶数生成器,并用 synchronized 修饰它的 next() 方法。

    public class SynchronizedGenerator extends IntGenerator {
    
        private int currentEvenValue = 0;
        @Override
        public synchronized int next() {
            // TODO Auto-generated method stub
            ++currentEvenValue;
            Thread.yield();
            ++currentEvenValue;
            return currentEvenValue;
        }
    
        public static void main(String[] args) {
            EvenChecker.test(new SynchronizedGenerator());
        }
    
    }
    

    然后,用 EvenChecker 来使用 synchronizedGenerator。结果是,除非我么手动停止,否则程序任务将会无限的循环下去。

    synchronized 除了可以修饰方法,也可以修饰方法内部的某个代码块(通常称这个代码块为“临界区”)。因此,当我们只是想防止方法中的部分代码(而不是整个方法)被多个线程同时访问时,也可以使用synchronized。被 synchronized 修饰的代码块也被成为“同步控制块”。

    synchronized 的上锁和解锁过程,是 java 帮我们自动去实现的。如果需要一个显性的上锁和解锁过程,可以使用 java.util.concurrent.locks中的显示互斥机制。我们可以在程序运行到某个位置时上锁或者解锁。下面的代码,是使用 lock 实现互斥的例子。

    package ThreadTest.SycnSourceTest;
    
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class MutexEvenGenerator extends IntGenerator {
    
        private int currentEvenValue = 0;
        private Lock lock = new ReentrantLock();
        @Override
        public int next() {
            // TODO Auto-generated method stub
            lock.lock();
            try {
                ++currentEvenValue;
                Thread.yield();
                ++currentEvenValue;
                return currentEvenValue;
            }finally {
                lock.unlock();
            }
    
        }
        public static void main(String[] args) {
            EvenChecker.test(new MutexEvenGenerator());
        }
    }
    

    从运行结果来看,lock 的确起到了和 synchronized 同等的效果。

    为了保证在任务的最后都能够正确的解锁,我们必须在 finally 块中对 lock 进行解锁。

    除了能够显性的执行“锁”操作,lock 还可以用来实现“如果一段时间未能获取锁,则放弃获取锁这一行为”的操作。我们甚至能够自己指定“获取锁”这一行为的尝试时间。

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class AttemptLocking {
    
        private ReentrantLock lock = new ReentrantLock();
        public void untimed() {
            boolean captured = lock.tryLock();
            try {
                System.out.println("tryLock(): "+captured);
            }finally {
                if(captured)
                    lock.unlock();
            }
        }
    
        public void timed() {
            boolean captured = false;
            try {
                captured = lock.tryLock(2, TimeUnit.SECONDS);
            }catch(InterruptedException e) {
                throw new RuntimeException();
            }
            try {
                System.out.println("tryLock(2,TimeUnit.SECONDS): "+captured);
            }finally {
                if(captured)
                    lock.unlock();
            }
        }
        public static void main(String[] args) throws InterruptedException {
            final AttemptLocking al = new AttemptLocking();
            al.untimed();
            al.timed();
            new Thread() {
                {setDaemon(true);}
                public void run() {
                    al.lock.lock();
                    System.out.println("acquired");
                }
            }.start();
            TimeUnit.MILLISECONDS.sleep(1000);
            al.untimed();
            al.timed();
        }
    
    }
    /*output:
    tryLock(): true
    tryLock(2,TimeUnit.SECONDS): true
    acquired
    tryLock(): false
    tryLock(2,TimeUnit.SECONDS): false*/
    

    这段程序时这样的,主程序第一次调用 al.timed 和 al.timed 的时候,它们都顺利的获得锁。然后我们 新建了一个 Thread 这个 Thread 获取的 al 的锁,并且一直没有释放,所以当我们再执行 al.timed 和 al.timed 是,就会出现获取失败的结果。

    tyrLock 有自己的默认尝试时间,或者我们通过构造参数的方式去定义它的尝试时间,尝试时间结束之后,tryLock会放弃获取锁的操作。

    三、原子性和易变性

    原子操作是指不能被线程调度机制中断的操作,即,一旦操作发生,它必然会在切换到其他线程之前完成。但是“原子操作不需要进行同步控制”是一个错误的结论。

    原子性可以应用于除了 long 和 double 之外的所有基本类型之上的操作,可以保证它们会被当做不可分(原子)的操作来操作内存。但是,通过在定义 long 和 double 时使用 volatile 关键字就会获得(简单的赋值与返回值操作)原子性。

    同事,volatile 还确保了应用 中的可视性。如果将一个域声明为 volatile,那么只要对这个域产生写操作,那么所有的读操作就都可以看到这个修改。简而言之,volatile 域上发生的变化会变立刻写入到主存中。

    volatile 与 sychronized:如果一个域会被多个任务访问,那么它应该是 volatile 的,否则这个域就应该只能经由同步来访问。如果一个域已经用 sychronized 来防护,那就不必将其设置为 volatile的。相比较于 volatile,更应该优先使用sychronized。

    为了满足一些性能优化需求,java 为我们提供了,AtomicInteger、AtomicLong、AtomReference 等特殊的原子性变量类。这些类的操作是机器级别的原子操作,因此在使用它们时,不必担心。

    一个任务所有的写入操作,对于这个任务的读操作都是可视的,因此,如果它只需要保证这个任务的内部可视,不必将其设置为 volatile的。

    重点:当一个域的值依赖于它之前的值(比如递增一个计数器),volatile 就无非进行工作。或者某个域的值收到其他域的值限制,它也无法工作。(例如,Range 类的 lower 和 upper 边界必须遵循 lower <= upper 的限制)。

    四、在其他对象上同步

    首先,先看下面这段代码:

    import java.util.concurrent.TimeUnit;
    
    class DualSynch {
        private Object syncObject = new Object();
        public synchronized void f() {
            for(int i=0;i<5;i++) {
                System.out.println("f()");
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public void g() {
            synchronized(syncObject) {
                for(int i=0;i<5;i++) {
                    System.out.println("g()");
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    public class SyncObject{
        public static void main(String[] args) {
            final DualSynch ds = new DualSynch();
            new Thread() {
                public void run() {
                    ds.f();
                }
            }.start();
            ds.g();
        }
    }
    

    输出的结果告诉我们,f() 和 g() 这两个方法显然不受同步控制的影响。这是因为它们的锁是两个不同的锁,f() 锁针对的事该对象自己(this),而g() 锁针对的是 syncObject。如果我们将 synchronized(syncObject) 换成 synchronized(this),就会得到不一样的结果。

    五、线程的本地存储

    防止任务在共享资源上产生冲突的第二种方式是根除对变量的共享。线程的本地存储可以为每个任务创造一个相应存储块,即如果有5个任务需要用到变量 x,本地线程就会生成5个用于 X 的不同的存储块。

    package ThreadTest.SycnSourceTest.concurrency;
    
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    class Accessor implements Runnable{
        private final int id;
        public Accessor(int idn) {id=idn;}
        public void run() {
            while(!Thread.currentThread().isInterrupted()) {
                ThreadLocalVariableHolder.increment();
                System.out.println(this);
            }
        }
        public String toString() {
            return "#"+id+": "+ThreadLocalVariableHolder.get();
        }
    
    }
    public class ThreadLocalVariableHolder {
    
        public static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
            private Random rand = new Random(47);
            protected synchronized Integer initialValue() {
                return rand.nextInt(10000);
            }
        };
        public static void increment() {
            value.set(value.get()+1);
        }
        public static int get() {return value.get();}
    
        public static void main(String[] args) throws InterruptedException {
            // TODO Auto-generated method stub
            ExecutorService exec = Executors.newCachedThreadPool();
            for(int i=0;i<5;i++) {
                exec.execute(new Accessor(i));
            }
            TimeUnit.MILLISECONDS.sleep(4);
            exec.shutdown();
        }
    
    
    }
    

    上述的代码中,每个任务都似乎在独立的计数,彼此不受影响。这是因为每个单独的线程都被分配了自己的存储空间。

    相关文章

      网友评论

          本文标题:Thinking in java 之并发其二:资源共享

          本文链接:https://www.haomeiwen.com/subject/uxblzftx.html