美文网首页
JAVA_Concurrent

JAVA_Concurrent

作者: 激励上善若水 | 来源:发表于2017-11-28 11:26 被阅读3次

切面编程

面向切面编程,也可以说是面向方面编程

  • 定义
    所谓切面就是说贯穿到系统的各个模块中,系统的一个功能就是一个方面(切面)。比如日志系统,权限检查,统一的异常检查等
  • 好处
    可以动态的添加和删除在切面上的逻辑而不影响原来的执行代码。
  • 使用
    把功能模块对应的对象作为切面嵌入到原来的各个系统模块中,采用代理技术,代理调用目标,同时把切面对象加入进来。

线程并发

对java.util.concurrent包中的类进行总结

  • threadLocal

    1. 本质就是一个HashMap,key就是当前的线程。
    2. 存的时候是与当前线程相关,取得时候也是与当前线程相关。
  • AtomicInteger

    • 线程安全的整数类
    • AtomicLong,AtomicIntegerArray,AtomicIntegerFieldUpdater(对类中的整型数据进行原子操作)
    • 【共享变量】如果是Integer,那么可以使用该对象替换
  • 线程池

    1. 固定数量的
      Executors.newFixedThreadPool(3)
    2. 不固定数量(动态变化的)
      Executors.newCachedThreadPool()
    3. 一个线程
      • Executors.newSingleThreadExecutor();
      • 与new Thread区别:线程死了以后会自动创建一个新的,保证线程池里面始终有一个线程(如何实现线程死掉以后重新启动?)
    4. 定时器
      Executors.newScheduledThreadPool(3).schedule(runnable,delay,unit)
  • Callable&Future

  1. 一个任务的情况
    线程池去submit一个callable,callable任务可以返回一个结果,结果存在future中,然后将来某个时间从future中去拿.(前提是callable执行结束,否则会一直等待)
    • 为何不什么时候需要什么时候执行
      避免主线程做耗时操作。
    • Future取得的结果类型必须和Callable返回的结果类型一致,通过泛型实现的(泛型的使用)
ExecutorService pool = Executors.newSingleThreadExecutor();
Future<string> future = pool.submit(new Callable<string>() {

    @Override
    public String call() throws Exception {
        return "future result";
    }
});
try {
    System.out.print(future.get());
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
  1. 一组任务的情况
    提交一组任务Callable,先结束的任务可以取得结果
ExecutorService pool = Executors.newFixedThreadPool(10);
CompletionService service = new ExecutorCompletionService(pool);
for (int i = 0; i &lt; 10; i++) {
    final s = i;
    service.submit(new Callable<string>() {

        @Override
        public String call() throws Exception {
            return ""+s;
        }
    });
}
for (int i = 0; i &lt; 10; i++) {
    try {
        System.out.print(service.take().get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
  • 读写锁

多个读锁不互斥
读锁与写锁互斥
写锁与写锁互斥

final Queue3 queue3 = new Queue3();
for (int i=0; i&lt;3; i++){
    new Thread(){
        @Override
        public void run() {
            super.run();
            while (true){
                queue3.get();
            }
        }
    }.start();
    new Thread(){
        @Override
        public void run() {
            super.run();
            while (true){
                queue3.put(new Random().nextInt(1000));
            }
        }
    }.start();
}
class Queue3 {
    private Object data = null;
    ReadWriteLock rwl = new ReentrantReadWriteLock();

    public void get(){
        rwl.readLock().lock();
        try {
            System.out.print("begin read");
            Thread.sleep((long) (Math.random()*1000));
            System.out.print(Thread.currentThread().getName());
            System.out.print("stop read");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rwl.readLock().unlock();
        }
    }

    public void put(Object object){
        rwl.writeLock().lock();
        try {
            System.out.print("begin put");
            Thread.sleep((long) (Math.random()*1000));
            this.data = object;
            System.out.print("stop put");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rwl.writeLock().unlock();
        }
    }
}
  • Condition

  1. 类似传统线程里面的object.wait object.notify
  2. 一个锁内部可以有多个Condition,Lock和Condition可以实现可阻塞队列
  3. 存在的意义就是可以用在多线程条件下
class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
 }
  • Semaphore

  1. 维护当前访问自身线程的个数,并提供同步机制
  2. 可以控制同时访问资源的线程个数
  3. 实现一个文件允许的并发访问数
  4. 单个信号灯可以实现互斥锁,这可应用于 死锁 恢复的一些场合
  5. 可以控制等待进入线程的执行顺序,构造方法参数
    public static class SemaphoreTest{
        public static void main(){
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(3);
            for(int i=0 ;i&lt;10; i++){
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            semaphore.acquire();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                        System.out.print("线程进入:"+ Thread.currentThread().getName()+
                                " 剩余信号灯:"+semaphore.availablePermits());

                        try {
                            Thread.sleep((long) (Math.random()*1000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                        System.out.print("线程:"+ Thread.currentThread().getName() +"离开");
                        semaphore.release();
                    }
                };
                executorService.execute(runnable);
            }
        }
    }
  • CountDownLatch(倒计时计数器)

  1. 调用countDown方法,计数器减一,当减到0的时候,多个等待者或者单个等待者执行
  2. 可以实现一个人通知多个人(裁判通知所有运动员可以开始各就各位预备跑)也可以多个人通知一个人(所有运动员跑完全程以后,裁判才可以公布成绩)
    public static class CountDownLatchTest{
        public static void main(){
            ExecutorService executorService = Executors.newCachedThreadPool();
            //吹口哨计数器 一个人通知多个人
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            //公布成绩计数器  多个人通知一个人
            final CountDownLatch countDownLatch1 = new CountDownLatch(3);
            for (int i = 0; i &lt; 3; i++) {
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.print("线程"+Thread.currentThread().getName()+"正在准备接受命令");
                            //子线程wait
                            countDownLatch.await();
                            System.out.print("线程"+Thread.currentThread().getName()+"已接受命令");

                            Thread.sleep((long) (Math.random()*1000));
                            System.out.print("线程"+Thread.currentThread().getName()+"回应处理结果");
                            countDownLatch1.countDown();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
                executorService.execute(runnable);
            }

            try {
                System.out.print("即将发布奔跑命令");
                //主线程去countdown
                countDownLatch.countDown();

                System.out.print("已经发布奔跑命令,等待结果");
                countDownLatch1.await();
                System.out.print("线程已经收到所有响应结果,裁判公布成绩");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
  • Exchanger

  1. 用于两个线程间的数据交换
  2. 类似毒品交易(第一个拿出数据的人将等待第二个人拿着数据到来时,才能进行数据交换)
//线程1和线程2互换数据
    public static class ExchangerTest{
        public static void main(){
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Exchanger exchanger = new Exchanger();

            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        String data1 = "xiaoming";
                        System.out.print("线程"+Thread.currentThread().getName()+"正要把数据:"+data1+"换出去");

                        Thread.sleep((long) (Math.random()*10000));

                        String data2 = (String) exchanger.exchange(data1);
                        System.out.print("线程"+Thread.currentThread().getName()+"换回的数据:"+data2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });

            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        String data1 = "xiaoli";
                        System.out.print("线程"+Thread.currentThread().getName()+"正要把数据:"+data1+"换出去");

                        Thread.sleep((long) (Math.random()*10000));

                        String data2 = (String) exchanger.exchange(data1);
                        System.out.print("线程"+Thread.currentThread().getName()+"换回的数据:"+data2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
  • BlockingQueue

与普通队列区别:

  1. 正常队列满了以后再添加就报错了
    阻塞队列满了以后再添加会等待 阻塞住
  2. 正常队列空了以后再取数据就报错
    阻塞队列空了以后再取数据会等待 阻塞住
  3. 阻塞队列实现阻塞同步的方式很简单,使用的就是是lock锁的多条件(condition)阻塞控制
4 组不同的方法用于插入、移除以及对队列中的元素进行检查
        final BlockingQueue queue = new ArrayBlockingQueue(3);
        for (int i=0; i&lt;2; i++){
            new Thread(){
                @Override
                public void run() {
                    while (true){
                        try {
                            Thread.sleep((long) (Math.random()*1000));
                            Log.i("TEST",Thread.currentThread().getName() + "准备放数据");
                            queue.put(1);
                            Log.i("TEST",Thread.currentThread().getName() + "已经放了数据"+"   队列目前有"+queue.size()+"个数据");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();

            new Thread(){
                @Override
                public void run() {
                    while (true){
                        try {
                            Thread.sleep(1000);
                            Log.i("TEST",Thread.currentThread().getName()+"准备取数据");
                            queue.take();
                            Log.i("TEST",Thread.currentThread().getName()+"已经取走数据"+"  队列目前有"+queue.size()+"个数据");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
        }
生产者消费者
public class BlockingQueueTest {
    //生产者
    public static class Producer implements Runnable{
        private final BlockingQueue<integer> blockingQueue;
        private volatile boolean flag;
        private Random random;

        public Producer(BlockingQueue<integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
            flag=false;
            random=new Random();

        }
        public void run() {
            while(!flag){
                int info=random.nextInt(100);
                try {
                    blockingQueue.put(info);
                    System.out.println(Thread.currentThread().getName()+" produce "+info);
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }               
            }
        }
        public void shutDown(){
            flag=true;
        }
    }
    //消费者
    public static class Consumer implements Runnable{
        private final BlockingQueue<integer> blockingQueue;
        private volatile boolean flag;
        public Consumer(BlockingQueue<integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
        public void run() {
            while(!flag){
                int info;
                try {
                    info = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName()+" consumer "+info);
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }               
            }
        }
        public void shutDown(){
            flag=true;
        }
    }
    public static void main(String[] args){
        BlockingQueue<integer> blockingQueue = new LinkedBlockingQueue<integer>(10);
        Producer producer=new Producer(blockingQueue);
        Consumer consumer=new Consumer(blockingQueue);
        //创建5个生产者,5个消费者
        for(int i=0;i&lt;10;i++){
            if(i&lt;5){
                new Thread(producer,"producer"+i).start();
            }else{
                new Thread(consumer,"consumer"+(i-5)).start();
            }
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        producer.shutDown();
        consumer.shutDown();

    }
}
主线程执行十次子任务,然后子线程执行十次子任务
循环50次
====================================================
        final Business business = new Business();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i=0;i&lt;=50;i++){
                    business.sub(i);
                }
            }
        }).start();

        for (int i=0;i&lt;=50;i++){
            business.main(i);
        }
==============================================================
    static class Business {
        BlockingQueue<integer> blockingQueue1 = new ArrayBlockingQueue&lt;&gt;(1);
        BlockingQueue<integer> blockingQueue2 = new ArrayBlockingQueue&lt;&gt;(1);

        //无参构造,只要调用构造方法,都会调用该方法
        {
            try {
                blockingQueue2.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void main(int i){
            try {
                blockingQueue1.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (int j = 0; j &lt; 10; j++) {
                Log.i("TEST","sub thread sequece of "+j+"  loop of "+i);
            }
            try {
                blockingQueue2.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void sub(int i){
            try {
                blockingQueue2.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (int j = 0; j &lt; 10; j++) {
                Log.i("TEST","main thread sequece of "+j+"  loop of "+i);
            }
            try {
                blockingQueue1.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
  • 同步集合

  • 传统方式下用Collections工具类提供的synchronizedCollection方法获取同步集合

    • synchronizedCollection 通过代理模式创建一个同步集合继承自map,实现map的增删改实际是调用真实的map的增删改,只是增加synchronized关键字,以实现同步效果。
  • java.util.concurrent下

    • ConcurrentHashMap
    • CopyOnWriteArrayList
    • CopyOnWriteArraySet
  • 传统集合迭代过程中不可以修改集合会抛出ConcurrentModificationException,可以使用同步集合

  • sample

实现效果:十个线程顺序执行doSome方法
key:
1. semaphore与lock效果相同,控制并发访问量
2. SynchronousQueue只有其他线程取数据take,才能存数据put。
> A  in which each insert operation must wait for a corresponding remove operation by another thread
public class TestThread {

    /**
     * @param args
     */
    public static void main(String[] args) {
        
        System.out.println("begin:"+(System.currentTimeMillis()/1000));
        
//      final Semaphore semaphore = new Semaphore(1);
        final Lock lock = new ReentrantLock();
        final SynchronousQueue<String> queue = new SynchronousQueue<String>();
        

        
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                
                public void run() {
                    try {
//                      semaphore.acquire();
                        lock.lock();
                        System.out.println("take");
                        String output = TestDo.doSome(queue.take());
                        System.out.println(Thread.currentThread().getName() + ":"+output);
//                      semaphore.release();
                        lock.unlock();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        

        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("put");
                queue.put(i+"");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

class TestDo {
    public static String doSome(String input){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String output = input + ":"+ (System.currentTimeMillis()/1000);
        return output;
    }
}

相关文章

  • JAVA_Concurrent

    切面编程 面向切面编程,也可以说是面向方面编程 定义所谓切面就是说贯穿到系统的各个模块中,系统的一个功能就是一个方...

网友评论

      本文标题:JAVA_Concurrent

      本文链接:https://www.haomeiwen.com/subject/xhysbxtx.html