美文网首页
JAVA_Concurrent

JAVA_Concurrent

作者: 激励上善若水 | 来源:发表于2017-11-28 11:26 被阅读3次

    切面编程

    面向切面编程,也可以说是面向方面编程

    • 定义
      所谓切面就是说贯穿到系统的各个模块中,系统的一个功能就是一个方面(切面)。比如日志系统,权限检查,统一的异常检查等
    • 好处
      可以动态的添加和删除在切面上的逻辑而不影响原来的执行代码。
    • 使用
      把功能模块对应的对象作为切面嵌入到原来的各个系统模块中,采用代理技术,代理调用目标,同时把切面对象加入进来。

    线程并发

    对java.util.concurrent包中的类进行总结

    • threadLocal

      1. 本质就是一个HashMap,key就是当前的线程。
      2. 存的时候是与当前线程相关,取得时候也是与当前线程相关。
    • AtomicInteger

      • 线程安全的整数类
      • AtomicLong,AtomicIntegerArray,AtomicIntegerFieldUpdater(对类中的整型数据进行原子操作)
      • 【共享变量】如果是Integer,那么可以使用该对象替换
    • 线程池

      1. 固定数量的
        Executors.newFixedThreadPool(3)
      2. 不固定数量(动态变化的)
        Executors.newCachedThreadPool()
      3. 一个线程
        • Executors.newSingleThreadExecutor();
        • 与new Thread区别:线程死了以后会自动创建一个新的,保证线程池里面始终有一个线程(如何实现线程死掉以后重新启动?)
      4. 定时器
        Executors.newScheduledThreadPool(3).schedule(runnable,delay,unit)
    • Callable&Future

    1. 一个任务的情况
      线程池去submit一个callable,callable任务可以返回一个结果,结果存在future中,然后将来某个时间从future中去拿.(前提是callable执行结束,否则会一直等待)
      • 为何不什么时候需要什么时候执行
        避免主线程做耗时操作。
      • Future取得的结果类型必须和Callable返回的结果类型一致,通过泛型实现的(泛型的使用)
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<string> future = pool.submit(new Callable<string>() {
    
        @Override
        public String call() throws Exception {
            return "future result";
        }
    });
    try {
        System.out.print(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    
    1. 一组任务的情况
      提交一组任务Callable,先结束的任务可以取得结果
    ExecutorService pool = Executors.newFixedThreadPool(10);
    CompletionService service = new ExecutorCompletionService(pool);
    for (int i = 0; i &lt; 10; i++) {
        final s = i;
        service.submit(new Callable<string>() {
    
            @Override
            public String call() throws Exception {
                return ""+s;
            }
        });
    }
    for (int i = 0; i &lt; 10; i++) {
        try {
            System.out.print(service.take().get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    
    • 读写锁

    多个读锁不互斥
    读锁与写锁互斥
    写锁与写锁互斥

    final Queue3 queue3 = new Queue3();
    for (int i=0; i&lt;3; i++){
        new Thread(){
            @Override
            public void run() {
                super.run();
                while (true){
                    queue3.get();
                }
            }
        }.start();
        new Thread(){
            @Override
            public void run() {
                super.run();
                while (true){
                    queue3.put(new Random().nextInt(1000));
                }
            }
        }.start();
    }
    
    class Queue3 {
        private Object data = null;
        ReadWriteLock rwl = new ReentrantReadWriteLock();
    
        public void get(){
            rwl.readLock().lock();
            try {
                System.out.print("begin read");
                Thread.sleep((long) (Math.random()*1000));
                System.out.print(Thread.currentThread().getName());
                System.out.print("stop read");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                rwl.readLock().unlock();
            }
        }
    
        public void put(Object object){
            rwl.writeLock().lock();
            try {
                System.out.print("begin put");
                Thread.sleep((long) (Math.random()*1000));
                this.data = object;
                System.out.print("stop put");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                rwl.writeLock().unlock();
            }
        }
    }
    
    • Condition

    1. 类似传统线程里面的object.wait object.notify
    2. 一个锁内部可以有多个Condition,Lock和Condition可以实现可阻塞队列
    3. 存在的意义就是可以用在多线程条件下
    class BoundedBuffer {
       final Lock lock = new ReentrantLock();
       final Condition notFull  = lock.newCondition(); 
       final Condition notEmpty = lock.newCondition(); 
    
       final Object[] items = new Object[100];
       int putptr, takeptr, count;
    
       public void put(Object x) throws InterruptedException {
         lock.lock();
         try {
           while (count == items.length)
             notFull.await();
           items[putptr] = x;
           if (++putptr == items.length) putptr = 0;
           ++count;
           notEmpty.signal();
         } finally {
           lock.unlock();
         }
       }
    
       public Object take() throws InterruptedException {
         lock.lock();
         try {
           while (count == 0)
             notEmpty.await();
           Object x = items[takeptr];
           if (++takeptr == items.length) takeptr = 0;
           --count;
           notFull.signal();
           return x;
         } finally {
           lock.unlock();
         }
       }
     }
    
    • Semaphore

    1. 维护当前访问自身线程的个数,并提供同步机制
    2. 可以控制同时访问资源的线程个数
    3. 实现一个文件允许的并发访问数
    4. 单个信号灯可以实现互斥锁,这可应用于 死锁 恢复的一些场合
    5. 可以控制等待进入线程的执行顺序,构造方法参数
        public static class SemaphoreTest{
            public static void main(){
                ExecutorService executorService = Executors.newCachedThreadPool();
                final Semaphore semaphore = new Semaphore(3);
                for(int i=0 ;i&lt;10; i++){
                    Runnable runnable = new Runnable() {
                        @Override
                        public void run() {
                            try {
                                semaphore.acquire();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
    
                            System.out.print("线程进入:"+ Thread.currentThread().getName()+
                                    " 剩余信号灯:"+semaphore.availablePermits());
    
                            try {
                                Thread.sleep((long) (Math.random()*1000));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
    
                            System.out.print("线程:"+ Thread.currentThread().getName() +"离开");
                            semaphore.release();
                        }
                    };
                    executorService.execute(runnable);
                }
            }
        }
    
    • CountDownLatch(倒计时计数器)

    1. 调用countDown方法,计数器减一,当减到0的时候,多个等待者或者单个等待者执行
    2. 可以实现一个人通知多个人(裁判通知所有运动员可以开始各就各位预备跑)也可以多个人通知一个人(所有运动员跑完全程以后,裁判才可以公布成绩)
        public static class CountDownLatchTest{
            public static void main(){
                ExecutorService executorService = Executors.newCachedThreadPool();
                //吹口哨计数器 一个人通知多个人
                final CountDownLatch countDownLatch = new CountDownLatch(1);
                //公布成绩计数器  多个人通知一个人
                final CountDownLatch countDownLatch1 = new CountDownLatch(3);
                for (int i = 0; i &lt; 3; i++) {
                    Runnable runnable = new Runnable() {
                        @Override
                        public void run() {
                            try {
                                System.out.print("线程"+Thread.currentThread().getName()+"正在准备接受命令");
                                //子线程wait
                                countDownLatch.await();
                                System.out.print("线程"+Thread.currentThread().getName()+"已接受命令");
    
                                Thread.sleep((long) (Math.random()*1000));
                                System.out.print("线程"+Thread.currentThread().getName()+"回应处理结果");
                                countDownLatch1.countDown();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    };
                    executorService.execute(runnable);
                }
    
                try {
                    System.out.print("即将发布奔跑命令");
                    //主线程去countdown
                    countDownLatch.countDown();
    
                    System.out.print("已经发布奔跑命令,等待结果");
                    countDownLatch1.await();
                    System.out.print("线程已经收到所有响应结果,裁判公布成绩");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    • Exchanger

    1. 用于两个线程间的数据交换
    2. 类似毒品交易(第一个拿出数据的人将等待第二个人拿着数据到来时,才能进行数据交换)
    //线程1和线程2互换数据
        public static class ExchangerTest{
            public static void main(){
                ExecutorService executorService = Executors.newCachedThreadPool();
                final Exchanger exchanger = new Exchanger();
    
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            String data1 = "xiaoming";
                            System.out.print("线程"+Thread.currentThread().getName()+"正要把数据:"+data1+"换出去");
    
                            Thread.sleep((long) (Math.random()*10000));
    
                            String data2 = (String) exchanger.exchange(data1);
                            System.out.print("线程"+Thread.currentThread().getName()+"换回的数据:"+data2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
    
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            String data1 = "xiaoli";
                            System.out.print("线程"+Thread.currentThread().getName()+"正要把数据:"+data1+"换出去");
    
                            Thread.sleep((long) (Math.random()*10000));
    
                            String data2 = (String) exchanger.exchange(data1);
                            System.out.print("线程"+Thread.currentThread().getName()+"换回的数据:"+data2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    
    • BlockingQueue

    与普通队列区别:

    1. 正常队列满了以后再添加就报错了
      阻塞队列满了以后再添加会等待 阻塞住
    2. 正常队列空了以后再取数据就报错
      阻塞队列空了以后再取数据会等待 阻塞住
    3. 阻塞队列实现阻塞同步的方式很简单,使用的就是是lock锁的多条件(condition)阻塞控制
    4 组不同的方法用于插入、移除以及对队列中的元素进行检查
            final BlockingQueue queue = new ArrayBlockingQueue(3);
            for (int i=0; i&lt;2; i++){
                new Thread(){
                    @Override
                    public void run() {
                        while (true){
                            try {
                                Thread.sleep((long) (Math.random()*1000));
                                Log.i("TEST",Thread.currentThread().getName() + "准备放数据");
                                queue.put(1);
                                Log.i("TEST",Thread.currentThread().getName() + "已经放了数据"+"   队列目前有"+queue.size()+"个数据");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }.start();
    
                new Thread(){
                    @Override
                    public void run() {
                        while (true){
                            try {
                                Thread.sleep(1000);
                                Log.i("TEST",Thread.currentThread().getName()+"准备取数据");
                                queue.take();
                                Log.i("TEST",Thread.currentThread().getName()+"已经取走数据"+"  队列目前有"+queue.size()+"个数据");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }.start();
            }
    
    生产者消费者
    public class BlockingQueueTest {
        //生产者
        public static class Producer implements Runnable{
            private final BlockingQueue<integer> blockingQueue;
            private volatile boolean flag;
            private Random random;
    
            public Producer(BlockingQueue<integer> blockingQueue) {
                this.blockingQueue = blockingQueue;
                flag=false;
                random=new Random();
    
            }
            public void run() {
                while(!flag){
                    int info=random.nextInt(100);
                    try {
                        blockingQueue.put(info);
                        System.out.println(Thread.currentThread().getName()+" produce "+info);
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }               
                }
            }
            public void shutDown(){
                flag=true;
            }
        }
        //消费者
        public static class Consumer implements Runnable{
            private final BlockingQueue<integer> blockingQueue;
            private volatile boolean flag;
            public Consumer(BlockingQueue<integer> blockingQueue) {
                this.blockingQueue = blockingQueue;
            }
            public void run() {
                while(!flag){
                    int info;
                    try {
                        info = blockingQueue.take();
                        System.out.println(Thread.currentThread().getName()+" consumer "+info);
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }               
                }
            }
            public void shutDown(){
                flag=true;
            }
        }
        public static void main(String[] args){
            BlockingQueue<integer> blockingQueue = new LinkedBlockingQueue<integer>(10);
            Producer producer=new Producer(blockingQueue);
            Consumer consumer=new Consumer(blockingQueue);
            //创建5个生产者,5个消费者
            for(int i=0;i&lt;10;i++){
                if(i&lt;5){
                    new Thread(producer,"producer"+i).start();
                }else{
                    new Thread(consumer,"consumer"+(i-5)).start();
                }
            }
    
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            producer.shutDown();
            consumer.shutDown();
    
        }
    }
    
    主线程执行十次子任务,然后子线程执行十次子任务
    循环50次
    ====================================================
            final Business business = new Business();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i=0;i&lt;=50;i++){
                        business.sub(i);
                    }
                }
            }).start();
    
            for (int i=0;i&lt;=50;i++){
                business.main(i);
            }
    ==============================================================
        static class Business {
            BlockingQueue<integer> blockingQueue1 = new ArrayBlockingQueue&lt;&gt;(1);
            BlockingQueue<integer> blockingQueue2 = new ArrayBlockingQueue&lt;&gt;(1);
    
            //无参构造,只要调用构造方法,都会调用该方法
            {
                try {
                    blockingQueue2.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            public void main(int i){
                try {
                    blockingQueue1.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 0; j &lt; 10; j++) {
                    Log.i("TEST","sub thread sequece of "+j+"  loop of "+i);
                }
                try {
                    blockingQueue2.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            public void sub(int i){
                try {
                    blockingQueue2.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 0; j &lt; 10; j++) {
                    Log.i("TEST","main thread sequece of "+j+"  loop of "+i);
                }
                try {
                    blockingQueue1.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    • 同步集合

    • 传统方式下用Collections工具类提供的synchronizedCollection方法获取同步集合

      • synchronizedCollection 通过代理模式创建一个同步集合继承自map,实现map的增删改实际是调用真实的map的增删改,只是增加synchronized关键字,以实现同步效果。
    • java.util.concurrent下

      • ConcurrentHashMap
      • CopyOnWriteArrayList
      • CopyOnWriteArraySet
    • 传统集合迭代过程中不可以修改集合会抛出ConcurrentModificationException,可以使用同步集合

    • sample

    实现效果:十个线程顺序执行doSome方法
    key:
    1. semaphore与lock效果相同,控制并发访问量
    2. SynchronousQueue只有其他线程取数据take,才能存数据put。
    > A  in which each insert operation must wait for a corresponding remove operation by another thread
    public class TestThread {
    
        /**
         * @param args
         */
        public static void main(String[] args) {
            
            System.out.println("begin:"+(System.currentTimeMillis()/1000));
            
    //      final Semaphore semaphore = new Semaphore(1);
            final Lock lock = new ReentrantLock();
            final SynchronousQueue<String> queue = new SynchronousQueue<String>();
            
    
            
            for (int i = 0; i < 10; i++) {
                new Thread(new Runnable() {
                    
                    public void run() {
                        try {
    //                      semaphore.acquire();
                            lock.lock();
                            System.out.println("take");
                            String output = TestDo.doSome(queue.take());
                            System.out.println(Thread.currentThread().getName() + ":"+output);
    //                      semaphore.release();
                            lock.unlock();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
            
    
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("put");
                    queue.put(i+"");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    
    class TestDo {
        public static String doSome(String input){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String output = input + ":"+ (System.currentTimeMillis()/1000);
            return output;
        }
    }
    

    相关文章

      网友评论

          本文标题:JAVA_Concurrent

          本文链接:https://www.haomeiwen.com/subject/xhysbxtx.html