美文网首页并发编程
多线程设计模式:第二篇 - 四种基础模式

多线程设计模式:第二篇 - 四种基础模式

作者: 张angang强吖 | 来源:发表于2018-10-10 14:39 被阅读0次

    一,单线程模式

             单线程模式是指在临界区域内,同一时间内只允许一个线程执行处理。

             下面的代码示例使三个人频繁的通过一道门,当经过门的时候记录通行者的姓名和出生地,同时增加已通过门的人数。

    /**
     * @author koma <komazhang@foxmail.com>
     * @date 2018-10-15
     */
    public class Main {
        public static void main(String[] args) {
            Gate gate = new Gate();
            new UserThread(gate, "Alice", "Alas").start();
            new UserThread(gate, "Bobby", "Brazli").start();
            new UserThread(gate, "Chirs", "Canda").start();
        }
    }
    
    public class Gate {
        private int counter = 0;
        private String name = "Nobody";
        private String address = "Nowhere";
    
        public void pass(String name, String address) {
            this.counter++;
            this.name = name;
            this.address = address;
            check();
        }
    
        public String toString() {
            return "No."+counter+": "+name+", "+address;
        }
    
        private void check() {
            if (name.charAt(0) != address.charAt(0)) {
                System.out.println("******* BROKEN ********"+toString());
            }
        }
    }
    
    public class UserThread extends Thread {
        private final Gate gate;
        private final String myname;
        private final String myaddress;
    
        public UserThread(Gate gate, String name, String address) {
            this.gate = gate;
            this.myname = name;
            this.myaddress = address;
        }
    
        @Override
        public void run() {
            System.out.println(myname+" BEGIN");
            while (true) {
                gate.pass(myname, myaddress);
            }
        }
    }
    

             运行上面的示例程序会发现,程序执行混乱,主要原因是因为 Gate 类作为共享资源,在多线程环境下是非线程安全的,pass() 和 toString() 方法作为程序的临界区,在多线程环境下时可以被多个线程同时调用执行,属于非线程安全方法。

             对于非线程安全的方法,在同时被多个线程调用执行时,实例的状态就会发生混乱,这时就应该保护该方法,使其不能够被多个线程同时调用,这时就需要用到单线程模式,即同一时刻只能允许一个线程调用执行。在 Java 中,通过给非线程安全方法加上 synchronized 声明来进行保护。修改后代码如下:

    public synchronized void pass(String name, String address) {
        this.counter++;
        this.name = name;
        this.address = address;
        check();
    }
    
    public synchronized String toString() {
        return "No."+counter+": "+name+", "+address;
    }
    

             对于 check() 方法不需要保护的原因是,首先 check() 是一个私有方法,不能够被外部实例随意调用,其次调用 check() 方法的 pass() 方法已经被保护起来,因此 check() 方法就无需再保护,但是由于 synchronized 的锁是可重入的,因此即使给 check() 方法加上 synchronized 声明,也不影响程序运行结果。

    1,生存性和死锁

             多线程程序评价标准中最重要的是线程安全性和生存性。单线程模式保证了线程的安全性,但是当使用单线程模式时,如果稍不注意则会使程序发生死锁,从而让程序失去生存性。

             在单线程模式中,当满足下列条件时,就会发生死锁:

    • 存在多个共享资源
    • 线程在持有某个资源的锁时,还需要获取其它资源的锁
    • 获取共享资源的锁的顺序不固定

    2,性能

             使用单线程模式会显著降低程序性能,主要原因是在进入 synchronized 方法时需要先获取实例的锁,而获取锁是需要时间的,如果需要获取多个资源的锁,则耗费的时间会更长。获取锁时,如果有其它线程正在持有锁,那么会产生线程冲突,当发生线程冲突时,程序的整体性能会随着线程等待时间的增加而下降。

    3,synchronized 与 Before/After 模式

             不管是 synchronized 方法还是 synchronized 代码块,都可以看作是在 "{" 处获取锁,在 "}" 处释放锁。这与显式的获取锁,释放锁的程序存在很大的区别,如下:

    method() {
        lock();
        //do method
        unlock();
    }
    

             这种显式的方式会根据 //do method 部分或者执行 return,或者抛出异常而导致锁无法被释放,而 synchronized 则会保证锁一定会被释放。要想让显式的锁操作达到和 synchronized 同样的效果,则需要使用到一种简单的 Before/After 模式实现,如下:

    method() {
        lock();
        try{
            //do method
        } finally {
            unlock();
        }
    }
    

             由于 Java 规范保证了 finally 部分一定会被执行,从而可以保证锁一定会被释放。

    4,synchronized 与 原子操作

             synchronized 方法只允许一个线程同时执行,从线程的角度来看,这个方法的执行是不可被分割的,这种不可分割的操作,通常称为原子(Atomic)操作。

             Java 规范定义了一些原子操作,如,char,int 等基本类型的操作,对象引用类型的赋值和引用等。因此对于这类操作,无需加上 synchronized 关键字。但是对于 long 和 double 类型的引用赋值则不是原子操作,因此就必须使用单线程模式,最简单的方法就是在 synchronized 方法中执行操作。另外一种方法是在该类字段声明上加上 volatile 关键字,加上 volatile 关键字之后,对该字段的操作就是原子的了。

             一般情况下对于 long 和 double 操作,Java虚拟机也将其视为原子操作,但是这仅仅是虚拟机的实现,对于有些虚拟机可能并没有实现,因此对于 long 和 double 操作,在多线程环境下,通常建议加上 volatile 关键字声明或者使用 java 提供的 atomic 包。

    5,计数信号量

             单线程模式用于确保临界资源同一时刻只能由一个线程使用,那么如果想实现在同一时刻只能有 N 个线程使用时,这时就可以考虑使用计数信号量来实现。

             Java juc包中提供了用于表示计数信号量的 Semaphore 类。该类中 acquire() 方法用于确保存在可用资源,当存在可用资源时,线程会立即返回,同时内部信号量减1,当无可用资源时,线程则阻塞,直到有可用资源出现。release() 方法用于释放资源,释放资源后,内部信号量加1,同时会唤醒一个等待在 acquire() 方法上的线程。

             下面的示例程序演示了10个线程去争抢3个资源的情形,在同一时刻只能有3个线程使用共享资源,其它线程则需要等待,代码如下:

    /**
     * @author koma <komazhang@foxmail.com>
     * @date 2018-10-15
     */
    public class TestCounter {
        public static void main(String[] args) {
            new TestCounter().run();
        }
    
        public void run() {
            //创建三个资源,同一个时刻只能有三个线程同时使用
            BoundedResource resource = new BoundedResource(3);
    
            //创建10个线程去争抢资源
            for (int i = 0; i < 10; i++) {
                new UseThread(resource).start();
            }
        }
    
        class UseThread extends Thread {
            private final BoundedResource resource;
            private final Random random = new Random();
    
            public UseThread(BoundedResource resource) {
                this.resource = resource;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        resource.use();
                        Thread.sleep(random.nextInt(3000));
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    
        class BoundedResource {
            private final Semaphore semaphore;
            private final int permits;
            private final Random random = new Random();
    
            public BoundedResource(int permits) {
                this.permits = permits;
                this.semaphore = new Semaphore(permits);
            }
    
            public void use() throws InterruptedException {
                semaphore.acquire(); //申请资源
                try {
                    System.out.println("BEGIN: used = "+(permits-semaphore.availablePermits()));
                    Thread.sleep(random.nextInt(500));
                    System.out.println("END: used = "+(permits-semaphore.availablePermits()));
                } finally {
                    semaphore.release(); //使用 Before/After 模式保证资源一定会被释放
                }
            }
        }
    }
    

    二,不可变模式

            不可变模式是指在该模式中存在可以确保类实例的状态一定不发生变化的类,多线程环境下在访问这些不可变的实例的时候,不需要执行耗时的互斥处理,从而提高程序的性能。如下代码示例,Person 类即是一个遵循不可变模式的不可变类。

    /**
     * @author koma <komazhang@foxmail.com>
     * @date 2018-10-15
     */
    public final class Person {
        private final String name;
        private final String address;
    
        public Person(String name, String address) {
            this.name = name;
            this.address = address;
        }
    
        public String getName() {
            return this.name;
        }
    
        public String getAddress() {
            return this.address;
        }
    
        public String toString() {
            return "[Person: name = "+name+", address = "+address+"]";
        }
    }
    

            在该类中,类被声明为 final 类型,从而确保该类没有子类,类成员被声明为 private 确保类成员不能在类外部被修改,而类方法中只有 getter 方法,也确保了通过该类也不能够修改类成员内容,而类成员同样也声明为 final 且在构造方法中赋值,则类成员内容在类实例化之后即不可能再被修改。以上种种措施,都是为了保证 Person 类的不可变性。那么在多线程环境下使用该类时,就可以省去多线程互斥处理,从而提供程序性能。

    1,不可变模式的应用场景

    • 实例被创建后,状态将不再发生变化
              实例的状态是由字段的值决定的,因此将字段声明为 final 且不存在 setter 方法是必要措施,但是这还不够充分,因为即使字段的值不变,字段所引用的实例也有可能发生变化。
    • 实例是共享的,且被频繁访问
              不可变模式的有点是不再需要 synchronized 保护,这就意味着能够在不失去安全性和生存性的前提下提高程序性能。

    2,可变类和不可变类

            不可变类的使用场景比较微妙,因为大部分的类可能都需要使用 setter 方法,这时我们可以重新审视该类,看是否能够把类拆分成一个可变类和一个不可变类,然后再设计成通过可变类可以创建不可变类,反过来通过不可变类也可以创建可变类,这样在不可变类中就可以应用不可变模式了。Java 中使用这种设计方法的经典示例就是 String 类和 StringBuffer 类。

    3,集合类和多线程

            Java中提供了常见的集合操作类,这些类大部分都是非线程安全的,因此,在多线程环境下使用集合类时一定要确定集合类的线程安全性。

    三,守护-等待模式

            守护等待模式是通过让线程等待来保证实例的安全性,即如果现在执行处理会造成问题,那么就让线程进行等待。

            下面的示例代码实现了一个简单的线程间通信,Client 线程会将请求的实例传递给 Server 线程,当 Server 线程试图获取请求实例时,如果当前还没有可以的请求实例,那么 Server 线程会进行等待,如下:

    /**
     * @author koma <komazhang@foxmail.com>
     * @date 2018-10-15
     */
    public class Main {
        public static void main(String[] args) {
            RequestQueue queue = new RequestQueue();
            new ClientThread(queue, "Alice", 3141592L).start();
            new ServerThread(queue, "Bobby", 6535897L).start();
        }
    }
    
    public class Request {
        private final String name;
    
        public Request(String name) {
            this.name = name;
        }
    
        public String getName() {
            return this.name;
        }
    
        @Override
        public String toString() {
            return "[ Request "+name+" ]";
        }
    }
    
    public class RequestQueue {
        private final Queue<Request> queue = new LinkedList<>();
    
        public synchronized Request getRequest() {
            while (queue.peek() == null) { //当没有可用的请求实例时等待
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }
            return queue.remove();
        }
    
        public synchronized void putRequest(Request request) {
            queue.offer(request);
            notifyAll(); //唤醒等待的线程
        }
    }
    
    public class ServerThread extends Thread {
        private final Random random;
        private final RequestQueue queue;
    
        public ServerThread(RequestQueue queue, String name, Long seed) {
            super(name);
            this.random = new Random(seed);
            this.queue = queue;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                Request request = queue.getRequest();
                System.out.println(Thread.currentThread().getName()+" handles "+request);
                try {
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                }
            }
        }
    }
    
    public class ServerThread extends Thread {
        private final Random random;
        private final RequestQueue queue;
    
        public ServerThread(RequestQueue queue, String name, Long seed) {
            super(name);
            this.random = new Random(seed);
            this.queue = queue;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                Request request = queue.getRequest();
                System.out.println(Thread.currentThread().getName()+" handles "+request);
                try {
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                }
            }
        }
    }
    

            通过上述代码可以知道,守护-等待模式主要利用了线程的通知-等待机制。守护-等待模式中的主要角色是一个持有被守护方法的类,进入到该方法中的线程是否要等待取决于守护条件。

            上述示例代码使用 LinkedList 类实现了 RequestQueue 类,实际上 Java 在 juc 包中提供了与该类功能类似的一个类,那就是 LinkedBlockingQueue,由于该类内部已经实现了 wait() 和 notify() 机制,因此使用该类可以简化 RequestQueue 的实现,如下:

    public class RequestQueue {
        private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    
        public Request getRequest() {
            Request request = null;
            try {
                request = queue.take(); //取出队首元素,为空时 wait()
            } catch (InterruptedException e) {
            }
            return request;
        }
    
        public void putRequest(Request request) {
            try {
                queue.put(request); //向队尾添加元素,并唤醒等待的线程
            } catch (InterruptedException e) {
            }
        }
    }
    

    四,停止-返回模式

            停止-返回模式是说,如果现在不适合执行这个操作,那么就直接返回。

            停止-返回模式的重点是当操作的守护条件不允许执行时直接返回,而非等待。例如下面的示例代码,ChangerThread 会不定期的修改 Data,同时保存,同时后台也会运行一个 SaverThread,该线程会定时检查 Data 的修改是否保存,如果已经保存则不做任何操作,直接返回,如果还未保存,则执行保存。这有点儿类似于我们常见的文档自动保存功能,如下:

    /**
     * @author koma <komazhang@foxmail.com>
     * @date 2018-10-15
     */
    public class Main {
        public static void main(String[] args) {
            Data data = new Data("data.txt", "(empty)");
            new ChangerThread("ChangerThread", data).start();
            new SaverThread("SaverThread", data).start();
        }
    }
    
    public class Data {
        private final String filename;
        private String content;
        private boolean changed;
    
        public Data(String filename, String content) {
            this.filename = filename;
            this.content = content;
            this.changed = true;
        }
    
        public synchronized void change(String newContent) {
            content = newContent;
            changed = true;
        }
    
        public synchronized void save() throws IOException {
            if (!changed) {
                return;
            }
    
            doSave();
            changed = false;
        }
    
        private void doSave() throws IOException {
            System.out.println(Thread.currentThread().getName()+" calls doSave, content = "+content);
            Writer writer = new FileWriter(filename);
            writer.write(content);
            writer.close();
        }
    }
    
    public class ChangerThread extends Thread {
        private final Data data;
        private final Random random = new Random();
    
        public ChangerThread(String name, Data data) {
            super(name);
            this.data = data;
        }
    
        @Override
        public void run() {
            try {
                for (int i = 0; true; i++) {
                    data.change("No."+i);
                    Thread.sleep(random.nextInt(1000));
                    data.save();
                }
            } catch (InterruptedException e) {
            } catch (IOException e) {
            }
        }
    }
    
    public class SaverThread extends Thread {
        private final Data data;
    
        public SaverThread(String name, Data data) {
            super(name);
            this.data = data;
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    data.save();
                    Thread.sleep(1000);
                }
            } catch (IOException e) {
            } catch (InterruptedException e) {
            }
        }
    }
    

            示例代码的重点是在 save() 方法中,当我们发现 changed 为 true 即数据修改已经保存之后,线程立即返回而不是等待,这就是停止-返回模式和守护-等待模式的不同,也是其特点所在。基于该特点,那么停止-等待模式的应用场景可以举例如下:

    • 并不需要执行时,就像示例程序一样
    • 当守护条件第一次成立时,例如下面这个在多线程环境下永远都只初始化一次的类。
    public class InitTest {
        private boolean inited = false;
    
        public synchronized void init() {
            if (inited) { //当类已经被初始化过时,不做任何操作,这里不能使用 wait()
                return;
            }
            //do init
            inited = true;
        }
    }
    

    相关文章

      网友评论

        本文标题:多线程设计模式:第二篇 - 四种基础模式

        本文链接:https://www.haomeiwen.com/subject/afbpaftx.html