美文网首页
java并发编程

java并发编程

作者: 持而盈 | 来源:发表于2017-02-10 17:06 被阅读93次

    简介

    几年前在网上看了些资料,系统的梳理了下java并发编程,部分代码抄自网络.

    synchronized 同步锁

    锁竞争原理

    syncchronized 关键字修饰方法,代码块. 为对象加锁. 需要注意:

    • 多个线程访问一个对象,竞争一把锁,没有得到锁的,会一直尝试竞争锁, 这里有个锁竞争问题.
    • 这是一把对象锁, 一个对象一把锁,多个对象他们有自己的锁,互不相关. 多个线程访问多个对象,锁 互不相关.
    • 这把对象锁是对象唯一的, 会锁住所有 synchronized 方法或代码块.

    脏读

    产生原因:
    对于A对象的p属性: p属性有setter和getter方法
    如果某线程正在setter,可能需要花费时间.而此时其他线程读取了属性,就存在脏读.

    解决方法:
    setter getter同时加锁.
    只有设置完毕,才可以取值.

    volatil 关键字

    用于修饰变量,让该变量在多个线程之间可见.

    从JDK1.5以后为了提升多线程运行效率,多线程会从从内存中拷贝一份变量,放进自己的内存区中,即线程内存和主内存是隔离的.这会造成问题:
    在线程执行过程中,主内存中的变量改变,线程却无法感知.
    volatil 修饰变量后,当主内存变量改变,线程会读取并同步到线程内存中.

    线程通信

    wait/notify

    使用方法:

    • 必须在synchronized中使用,才有意义.
    • wait会阻塞当前线程并且释放锁.
    • notify会唤醒被wait阻塞的线程,但不释放锁. 也就是说被唤醒的线程若不能得到锁,依然不能执行.

    下面是一个多线程队列代码:
    队列数量固定,若写满,则put()阻塞.
    若取尽,则get()阻塞

    package com.bjsxt.base.conn009;
    
    import java.util.LinkedList;
    import java.util.concurrent.atomic.AtomicInteger;
    /**
     * 模拟Queue
     * @author alienware
     *
     */
    public class MyQueue {
    
        private final LinkedList<Object> list = new LinkedList<Object>();
        
        private final AtomicInteger count = new AtomicInteger(0);
        
        private final int maxSize;
        private final int minSize = 0;
        
        private final Object lock = new Object();
        
        public MyQueue (int maxSize){
            this.maxSize = maxSize;
        }
    
        public void put (Object obj) {
            synchronized(lock){
                while(count.get() == maxSize){
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.add(obj);
                count.getAndIncrement();
                System.out.println(" 元素 " + obj + " 被添加 ");
                lock.notify();
                
            }
        }
        
        public Object take(){
            Object temp = null;
            synchronized (lock) {
                while(count.get() == minSize){
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                count.getAndDecrement();
                temp = list.removeFirst();
                System.out.println(" 元素 " + temp + " 被消费 ");
                lock.notify();
            }
            return temp;
        }
        
        public int size(){
            return count.get();
        }
        
        
        public static void main(String[] args) throws Exception {
            
            final MyQueue m = new MyQueue(5);
            m.put("a");
            m.put("b");
            m.put("c");
            m.put("d");
            m.put("e");
            System.out.println("当前元素个数:" + m.size());
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    m.put("h");
                    m.put("i");
                }
            }, "t1");
            
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        Object t1 = m.take();
                        //System.out.println("被取走的元素为:" + t1);
                        Thread.sleep(1000);
                        Object t2 = m.take();
                        //System.out.println("被取走的元素为:" + t2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t2");
    
            t1.start();
            Thread.sleep(1000);
            t2.start();
            
        }
        
        
        
    }
    
    

    ThreadLocal 线程局部变量

    概念:
    线程局部变量,ThreadLocal完全不提供锁,而是用空间换时间的手段,为每一个线程提供变量的独立副本,以保障线程安全.
    在一定情况下,使用ThreadLocal可以减少锁竞争.

    一个例子:

    package com.bjsxt.base.conn010;
    
    public class ConnThreadLocal {
    
        public static ThreadLocal<String> th = new ThreadLocal<String>();
        
        public void setTh(String value){
            th.set(value);
        }
        public void getTh(){
            System.out.println(Thread.currentThread().getName() + ":" + this.th.get());
        }
        
        public static void main(String[] args) throws InterruptedException {
            
            final ConnThreadLocal ct = new ConnThreadLocal();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    ct.setTh("张三");
                    ct.getTh();
                }
            }, "t1");
            
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        ct.setTh("李四");
                        ct.getTh();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t2");
            
            t1.start();
            t2.start();
        }
        
    }
    
    

    单例 & 多线程

    在多线程中,考虑到性能和线程安全,一般使用以下2种经典的单例模式

    • double check instance
    • static inner class

    static inner class :

    package com.bjsxt.base.conn011;
    
    public class InnerSingleton {
        
        private static class Singletion {
            private static Singletion single = new Singletion();
        }
        
        public static Singletion getInstance(){
            return Singletion.single;
        }
        
    }
    
    

    double check instance:

    package com.bjsxt.base.conn011;
    
    public class DubbleSingleton {
    
        private static DubbleSingleton ds;
        
        public static DubbleSingleton getDs(){
            if(ds == null){
                try {
                    //模拟初始化对象的准备时间...
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (DubbleSingleton.class) {
                    //这里要再次检查非空,避免多线程同时创建多实例.
                    if(ds == null){
                        ds = new DubbleSingleton();
                    }
                }
            }
            return ds;
        }
        
        public static void main(String[] args) {
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(DubbleSingleton.getDs().hashCode());
                }
            },"t1");
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(DubbleSingleton.getDs().hashCode());
                }
            },"t2");
            Thread t3 = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(DubbleSingleton.getDs().hashCode());
                }
            },"t3");
            
            t1.start();
            t2.start();
            t3.start();
        }
        
    }
    
    

    同步类容器

    同步类容器都是线程安全的:
    古老的有: Vector, HashTable
    推荐使用: Collections.SynchronizedXXX

    实现: 底层无非是用synchronized 关键字对每个公用方法进行同步.

    缺陷: 每次只能一个线程操作容器,无法并发.

    并发类容器

    ConcurrentMap 此接口有2个重要的实现:
    ConcurrentHashMap
    ConcurrentSkipListMap (支持并发排序,弥补ConcurrentHashMap)

    原理:
    ConcurrentHashMap内部使用段(Segment)来表示不同的部分,其实每个段就是一个小的HashTable,他们都有自己的锁.
    只要多个操作修改发生在不同的段上,他们就可以并发进行.
    把一个整体分成了16个段,也就是说最高可以支持16个线程并发修改操作.
    这是通过减小锁的粒度,从而降低竞争的一种方案.

    package com.bjsxt.base.coll013;
    
    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    public class UseConcurrentMap {
    
        public static void main(String[] args) {
            ConcurrentHashMap<String, Object> chm = new ConcurrentHashMap<String, Object>();
            chm.put("k1", "v1");
            chm.put("k2", "v2");
            chm.put("k3", "v3");
            chm.putIfAbsent("k4", "vvvv");
            //System.out.println(chm.get("k2"));
            //System.out.println(chm.size());
            
            for(Map.Entry<String, Object> me : chm.entrySet()){
                System.out.println("key:" + me.getKey() + ",value:" + me.getValue());
            }
            
            
            
        }
    }
    
    

    Copy-On-Write容器

    简称COW JDK中COW有2种:
    CopyOnWriteArrayList
    CopyOnWriteArraySet

    原理:
    即写时复制,当我们往容器中添加一个元素时,不直接向容器添加,而是将原容器拷贝,复制一个新容器,然后修改新容器.
    修改完后,将元容器的引用指向新容器.
    这样做的好处是:可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素.
    所以Copy-On-Write容器是一种读写分离的思想,读和写不同的容器.
    Copy-On-Write容器的写是有同步的.

    package com.bjsxt.base.coll013;
    
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.CopyOnWriteArraySet;
    
    public class UseCopyOnWrite {
    
        public static void main(String[] args) {
            
            CopyOnWriteArrayList<String> cwal = new CopyOnWriteArrayList<String>();
            CopyOnWriteArraySet<String> cwas = new CopyOnWriteArraySet<String>();
            
            
        }
    }
    
    

    并发 Queue

    java并发队列,主要分为2种:

    • ConcurrentLinkedQueue 非阻塞并发队列
    • BlockingQueue(这是个接口) 阻塞队列

    ConcurrentLinkedQueue 非阻塞并发队列

    这是一个高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能.
    通常ConcurrentLinkedQueue 性能好于 BlockingQueue.
    它是一个基于链接节点的无界线程安全队列.
    遵循先进先出,就想排队一样.

    重要方法:
    add() offer() 都是加入元素的方法,在本类中无区别.
    poll() peek() 都是取头元素节点,区别在于前者会删除元素,后者不会.

    BlockingQueue 阻塞队列

    主要有几种:

    ArrayBlockingQueue 有界队列

    基于数组的阻塞队列实现,在ArrayBlockingQueue内部维护了一个定长数组,以便缓存队列中的数据对象.
    其内部没有实现读写分离,也就意味着,生产也消费不能完全并行,长度是需要定义的.
    当队列写满时,所有写入者会被阻塞,直到能够写入.
    可以指定先进先出或者先进后出.
    也叫有界队列.

    LinkedBlockingQueue 无界队列

    基于链表的阻塞队列,同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成).
    LinkedBlockingQueue之所以能够高效的处理并发数据,是因为其内部采用分离锁(读写分离2个锁),从而实现生产和消费完全并行.
    这是一个无界队列

    package com.bjsxt.base.coll013;
    
    import java.util.concurrent.LinkedBlockingDeque;
    
    public class UseDeque {
    
        public static void main(String[] args) {
            
            LinkedBlockingDeque<String> dq = new LinkedBlockingDeque<String>(10);
            dq.addFirst("a");
            dq.addFirst("b");
            dq.addFirst("c");
            dq.addFirst("d");
            dq.addFirst("e");
            dq.addLast("f");
            dq.addLast("g");
            dq.addLast("h");
            dq.addLast("i");
            dq.addLast("j");
            //dq.offerFirst("k");
            System.out.println("查看头元素:" + dq.peekFirst());
            System.out.println("获取尾元素:" + dq.pollLast());
            Object [] objs = dq.toArray();
            for (int i = 0; i < objs.length; i++) {
                System.out.println(objs[i]);
            }
            
        }
    }
    
    

    SynchronousQueue 无缓冲队列

    没有缓冲的队列,生产者产生的数据会被消费者直接获取.
    类似于Go语言中的channel通信.
    在add() 之前必须有一个线程处于take() 状态,否则会报错.

    ProrityBlockingQueue 基于优先级的阻塞队列

    优先级判断依据:通过构造函数传入的Compator对象(即实现了Comparable接口的对象,通常都是队列数据本身).
    也就是说传入队列的对象必须实现Comparable接口.
    在实现ProrityBlockingQueue时,内部控制线程同步的锁是公平锁.
    他也是一个无界队列.

    例子:

    UsePriorityBlockingQueue.java 主程序

    package com.bjsxt.base.coll013;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.PriorityBlockingQueue;
    
    public class UsePriorityBlockingQueue {
    
        
        public static void main(String[] args) throws Exception{
            
            
            PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>();
            
            Task t1 = new Task();
            t1.setId(3);
            t1.setName("id为3");
            Task t2 = new Task();
            t2.setId(4);
            t2.setName("id为4");
            Task t3 = new Task();
            t3.setId(1);
            t3.setName("id为1");
            
            //return this.id > task.id ? 1 : 0;
            q.add(t1);  //3
            q.add(t2);  //4
            q.add(t3);  //1
            
            // 1 3 4
            System.out.println("容器:" + q);
            System.out.println(q.take().getId());
            System.out.println("容器:" + q);
    //      System.out.println(q.take().getId());
    //      System.out.println(q.take().getId());
            
    
            
        }
    }
    
    

    Task.java 任务类

    package com.bjsxt.base.coll013;
    
    public class Task implements Comparable<Task>{
        
        private int id ;
        private String name;
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        
        @Override
        public int compareTo(Task task) {
            return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);  
        }
        
        public String toString(){
            return this.id + "," + this.name;
        }
        
    }
    
    

    DelayQueue 带有延迟时间的Queue

    其中的元素只有当其指定的延迟时间到了,才能从队列中取出这个元素;
    DelayQueue中的元素必须实现Delayed接口;
    DelayQueue是一个没有大小限制的队列;
    应用场景包括:对缓存超时的数据进行处理,任务超时处理,空闲链接的关闭.

    这有个很有趣的例子: 模拟网吧上机

    WangBa.java 网吧主程序

    package com.bjsxt.base.coll013;
    
    import java.util.concurrent.DelayQueue;
    
    public class WangBa implements Runnable {  
        
        private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();  
        
        public boolean yinye =true;  
          
        public void shangji(String name,String id,int money){  
            Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());  
            System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"交钱"+money+"块,开始上机...");  
            this.queue.add(man);  
        }  
          
        public void xiaji(Wangmin man){  
            System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机...");  
        }  
      
        @Override  
        public void run() {  
            while(yinye){  
                try {  
                    Wangmin man = queue.take();  
                    xiaji(man);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
          
        public static void main(String args[]){  
            try{  
                System.out.println("网吧开始营业");  
                WangBa siyu = new WangBa();  
                Thread shangwang = new Thread(siyu);  
                shangwang.start();  
                  
                siyu.shangji("路人甲", "123", 1);  
                siyu.shangji("路人乙", "234", 10);  
                siyu.shangji("路人丙", "345", 5);  
            }  
            catch(Exception e){  
                e.printStackTrace();
            }  
      
        }  
    }  
    

    Wangmin.java 网民

    
    package com.bjsxt.base.coll013;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class Wangmin implements Delayed {  
        
        private String name;  
        //身份证  
        private String id;  
        //截止时间  
        private long endTime;  
        //定义时间工具类
        private TimeUnit timeUnit = TimeUnit.SECONDS;
          
        public Wangmin(String name,String id,long endTime){  
            this.name=name;  
            this.id=id;  
            this.endTime = endTime;  
        }  
          
        public String getName(){  
            return this.name;  
        }  
          
        public String getId(){  
            return this.id;  
        }  
          
        /** 
         * 用来判断是否到了截止时间 
         */  
        @Override  
        public long getDelay(TimeUnit unit) { 
            //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            return endTime - System.currentTimeMillis();
        }  
      
        /** 
         * 相互比较排序用 
         */  
        @Override  
        public int compareTo(Delayed delayed) {  
            Wangmin w = (Wangmin)delayed;  
            return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;  
        }  
      
    }  
    
    

    并发编程设计模式

    这些都是并发编程的设计模式.
    具体代码位置在:
    /home/zhangbing/virtualbox-shared/java_架构/并发编程代码/Multi_004

    下面介绍这些模式的主要思想.

    Future模式

    主要实现方式是wait/notify原理.
    客户端获取数据时,服务端立即返回一个data,但是要调用Data.getRequest()才能得到真正的数据,然而此方法被加锁了.所以客户端此时被阻塞了.
    此时服务端立即开启线程去获取数据,待得到数据后,设置上去,再调用notify() 唤醒getRequest()方法.客户端便拿到了数据.

    给出主要实现代码:

    FutureData.java 加锁的逻辑都在这里

    package com.bjsxt.height.design014;
    
    public class FutureData implements Data{
    
        private RealData realData ;
        
        private boolean isReady = false;
        
        public synchronized void setRealData(RealData realData) {
            //如果已经装载完毕了,就直接返回
            if(isReady){
                return;
            }
            //如果没装载,进行装载真实对象
            this.realData = realData;
            isReady = true;
            //进行通知
            notify();
        }
        
        @Override
        public synchronized String getRequest() {
            //如果没装载好 程序就一直处于阻塞状态
            while(!isReady){
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //装载好直接获取数据即可
            return this.realData.getRequest();
        }
    
    
    }
    
    

    Master-Worker模式

    这种模式用于将众多的任务(或者一个庞大的任务拆分成众多小任务)分给数个的Worker去并发执行,以提升性能.

    其核心点在于Master实例中要维护几个支持并发访问的集合,选对集合才是最重要的.
    主要代码如下:

    Master.java 主要逻辑

    
    package com.bjsxt.height.design015;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    public class Master {
    
        //1 有一个盛放任务的容器, 这个是并发容器,不会阻塞.要注意判空.
        private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
        
        //2 需要有一个盛放worker的集合
        private HashMap<String, Thread> workers = new HashMap<String, Thread>();
        
        //3 需要有一个盛放每一个worker执行任务的结果集合
        private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
        
        //4 构造方法
        public Master(Worker worker , int workerCount){
            worker.setWorkQueue(this.workQueue);
            worker.setResultMap(this.resultMap);
            
            for(int i = 0; i < workerCount; i ++){
                this.workers.put(Integer.toString(i), new Thread(worker));
            }
            
        }
        
        //5 需要一个提交任务的方法
        public void submit(Task task){
            this.workQueue.add(task);
        }
        
        //6 需要有一个执行的方法,启动所有的worker方法去执行任务
        public void execute(){
            for(Map.Entry<String, Thread> me : workers.entrySet()){
                me.getValue().start();
            }
        }
    
        //7 判断是否运行结束的方法
        public boolean isComplete() {
            for(Map.Entry<String, Thread> me : workers.entrySet()){
                if(me.getValue().getState() != Thread.State.TERMINATED){
                    return false;
                }
            }       
            return true;
        }
    
        //8 计算结果方法
        public int getResult() {
            int priceResult = 0;
            for(Map.Entry<String, Object> me : resultMap.entrySet()){
                priceResult += (Integer)me.getValue();
            }
            return priceResult;
        }
        
    }
    
    

    Worker.java 工作线程的实现

    package com.bjsxt.height.design015;
    
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    public class Worker implements Runnable {
    
        private ConcurrentLinkedQueue<Task> workQueue;
        private ConcurrentHashMap<String, Object> resultMap;
        
        public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
            this.workQueue = workQueue;
        }
    
        public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
            this.resultMap = resultMap;
        }
        
        @Override
        public void run() {
            while(true){
                Task input = this.workQueue.poll();
                if(input == null) break;
                Object output = handle(input);
                this.resultMap.put(Integer.toString(input.getId()), output);
            }
        }
    
        private Object handle(Task input) {
            Object output = null;
            try {
                //处理任务的耗时。。 比如说进行操作数据库。。。
                Thread.sleep(500);
                output = input.getPrice();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return output;
        }
    
    
    
    }
    
    

    消费者-生产者模式

    此模式的核心点:
    生产者和消费者要共享一个缓冲区(集合容器),他们一个放入,一个取出.
    这个缓冲区是个阻塞容器(BlockingQueue),当容器中没有元素(可能是生产者停止了)时,消费者会被阻塞.
    在主函数中创建这个容器,并实例化生产者和消费者,并将缓冲区的引用传给他们(通过构造函数).

    主程序

    package com.bjsxt.height.design016;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
            //内存缓冲区
            BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
            //生产者
            Provider p1 = new Provider(queue);
            
            Provider p2 = new Provider(queue);
            Provider p3 = new Provider(queue);
            //消费者
            Consumer c1 = new Consumer(queue);
            Consumer c2 = new Consumer(queue);
            Consumer c3 = new Consumer(queue);
            //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
    
            ExecutorService cachePool = Executors.newCachedThreadPool();
            cachePool.execute(p1);
            cachePool.execute(p2);
            cachePool.execute(p3);
            cachePool.execute(c1);
            cachePool.execute(c2);
            cachePool.execute(c3);
    
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            p1.stop();
            p2.stop();
            p3.stop();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }       
    //      cachePool.shutdown(); 
    //      cachePool.shutdownNow();
            
    
        }
        
    }
    
    

    生产者

    package com.bjsxt.height.design016;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Provider implements Runnable{
        
        //共享缓存区
        private BlockingQueue<Data> queue;
        //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
        private volatile boolean isRunning = true;
        //id生成器
        private static AtomicInteger count = new AtomicInteger();
        //随机对象
        private static Random r = new Random(); 
        
        public Provider(BlockingQueue queue){
            this.queue = queue;
        }
    
        @Override
        public void run() {
            while(isRunning){
                try {
                    //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时) 
                    Thread.sleep(r.nextInt(1000));
                    //获取的数据进行累计...
                    int id = count.incrementAndGet();
                    //比如通过一个getData方法获取了
                    Data data = new Data(Integer.toString(id), "数据" + id);
                    System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
                    if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
                        System.out.println("提交缓冲区数据失败....");
                        //do something... 比如重新提交
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        
        public void stop(){
            this.isRunning = false;
        }
        
    }
    
    

    消费者

    package com.bjsxt.height.design016;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class Consumer implements Runnable{
    
        private BlockingQueue<Data> queue;
        
        public Consumer(BlockingQueue queue){
            this.queue = queue;
        }
        
        //随机对象
        private static Random r = new Random(); 
    
        @Override
        public void run() {
            while(true){
                try {
                    //获取数据
                    Data data = this.queue.take();
                    //进行数据处理。休眠0 - 1000毫秒模拟耗时
                    Thread.sleep(r.nextInt(1000));
                    System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    

    Concurrent.util常用类

    这些常用类可以帮助快速设计并发程序

    CountDownLatch

    常用于监听某个初始化操作,等初始化执行完毕后,通知主线程继续工作.

    注意点:
    阻塞的是调用 awaite()方法的线程.
    等待countDown()次数达到后,阻塞的线程才继续.

    package com.bjsxt.height.concurrent019;
    
    import java.util.concurrent.CountDownLatch;
    
    public class UseCountDownLatch {
    
        public static void main(String[] args) {
            
            final CountDownLatch countDown = new CountDownLatch(2);
            
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("进入线程t1" + "等待其他线程处理完成...");
                        countDown.await();
                        System.out.println("t1线程继续执行...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"t1");
            
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("t2线程进行初始化操作...");
                        Thread.sleep(3000);
                        System.out.println("t2线程初始化完毕,通知t1线程继续...");
                        countDown.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            Thread t3 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("t3线程进行初始化操作...");
                        Thread.sleep(4000);
                        System.out.println("t3线程初始化完毕,通知t1线程继续...");
                        countDown.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            
            t1.start();
            t2.start();
            t3.start();
            
            
            
        }
    }
    
    

    CyclicBarrier

    假设一个场景:
    每个线程代表一个跑步的运动员,所有运动员站在起跑线等,
    等所有人都准备好,才一起跑.

    package com.bjsxt.height.concurrent019;
    import java.io.IOException;  
    import java.util.Random;  
    import java.util.concurrent.BrokenBarrierException;  
    import java.util.concurrent.CyclicBarrier;  
    import java.util.concurrent.ExecutorService;  
    import java.util.concurrent.Executors; 
    public class UseCyclicBarrier {
    
        static class Runner implements Runnable {  
            private CyclicBarrier barrier;  
            private String name;  
            
            public Runner(CyclicBarrier barrier, String name) {  
                this.barrier = barrier;  
                this.name = name;  
            }  
            @Override  
            public void run() {  
                try {  
                    Thread.sleep(1000 * (new Random()).nextInt(5));  
                    System.out.println(name + " 准备OK.");  
                    barrier.await();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                } catch (BrokenBarrierException e) {  
                    e.printStackTrace();  
                }  
                System.out.println(name + " Go!!");  
            }  
        } 
        
        public static void main(String[] args) throws IOException, InterruptedException {  
            CyclicBarrier barrier = new CyclicBarrier(3);  // 3 
            ExecutorService executor = Executors.newFixedThreadPool(3);  
            
            executor.submit(new Thread(new Runner(barrier, "zhangsan")));  
            executor.submit(new Thread(new Runner(barrier, "lisi")));  
            executor.submit(new Thread(new Runner(barrier, "wangwu")));  
      
            executor.shutdown();  
        }  
      
    }  
    

    Executor 线程池

    常用线程池

    java.util.concurrent.Executors 是一个线程池框架,可以创建以下几种线程池:

    newFixedThreadPool() 返回固定大小的线程池,线程数始终不变,若线程池有空闲,则立即执行,若没有,则把任务缓存在队列中等待.
    newSingleThreadPool() 返回单个线程的线程池,其他同上.
    newCachedThreadPool() 可缓存的线程池,线程数可以根据任务量自动变化.
    newScheduledThreadPool() 固定长度的线程池,可以使用延迟或者定时来执行任务.

    package com.bjsxt.height.concurrent017;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    
    public class UseExecutors {
    
        public static void main(String[] args) {
            
            ExecutorService pool = Executors.newScheduledThreadPool(10);
            
            //cache fixed single
            
            
            
        }
    }
    
    

    线程池底层创建方法

    主函数

    package com.bjsxt.height.concurrent018;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
    
    
    
    public class UseThreadPoolExecutor1 {
    
    
        public static void main(String[] args) {
            /**
             * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
             * 若大于corePoolSize,则会将任务加入队列,
             * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
             * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
             * 
             */ 
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1,              //核心线程数,线程池始终保有这些线程,当缓存没满时,也只用核心线程干活,不起多余线程.
                    2,              //最大线程数,当缓存满了时,再来任务会起额外线程,但是绝不超过最大线程数.
                    60,             //空闲线程回收时间.
                    TimeUnit.SECONDS, //回收单位
                    new ArrayBlockingQueue<Runnable>(3)         //指定一种队列 (有界队列)
                    //new LinkedBlockingQueue<Runnable>()
                    , new MyRejected()                          //指定拒绝策略,可自定义.
                    //, new DiscardOldestPolicy()
                    );
            
            MyTask mt1 = new MyTask(1, "任务1");
            MyTask mt2 = new MyTask(2, "任务2");
            MyTask mt3 = new MyTask(3, "任务3");
            MyTask mt4 = new MyTask(4, "任务4");
            MyTask mt5 = new MyTask(5, "任务5");
            MyTask mt6 = new MyTask(6, "任务6");
            
            pool.execute(mt1);
            pool.execute(mt2);
            pool.execute(mt3);
            pool.execute(mt4);
            pool.execute(mt5);
            pool.execute(mt6);
            
            pool.shutdown();
            
        }
    }
    
    

    自定义拒绝策略

    package com.bjsxt.height.concurrent018;
    
    import java.net.HttpURLConnection;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    
    public class MyRejected implements RejectedExecutionHandler{
    
        
        public MyRejected(){
        }
        
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("自定义处理..");
            System.out.println("当前被拒绝任务为:" + r.toString());
            
    
        }
    
    }
    
    

    Lock 使用JDK锁

    ReentrantLock 重入锁

    ReentrantLock和synchronized关键字差不多,只是用起来更灵活.

    锁的notify/signal 方法是唤醒一个awaite线程,
    notifyAll/signalAll 唤醒所有 awaite线程.

    使用重入锁

    package com.bjsxt.height.lock020;
    
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class UseReentrantLock {
        
        private Lock lock = new ReentrantLock();
        
        public void method1(){
            try {
                lock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method1..");
                Thread.sleep(1000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method1..");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                
                lock.unlock();
            }
        }
        
        public void method2(){
            try {
                lock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method2..");
                Thread.sleep(2000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method2..");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
    
            final UseReentrantLock ur = new UseReentrantLock();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    ur.method1();
                    ur.method2();
                }
            }, "t1");
    
            t1.start();
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //System.out.println(ur.lock.getQueueLength());
        }
        
        
    }
    
    

    使用Condition

    Condition的功能与 synchronized 中的 awaite/notify 一样.

    package com.bjsxt.height.lock020;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class UseCondition {
    
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
        
        public void method1(){
            try {
                lock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入等待状态..");
                Thread.sleep(3000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "释放锁..");
                condition.await();  // Object wait
                System.out.println("当前线程:" + Thread.currentThread().getName() +"继续执行...");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void method2(){
            try {
                lock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入..");
                Thread.sleep(3000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "发出唤醒..");
                condition.signal();     //Object notify
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
            
            final UseCondition uc = new UseCondition();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    uc.method1();
                }
            }, "t1");
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    uc.method2();
                }
            }, "t2");
            t1.start();
    
            t2.start();
        }
        
        
        
    }
    
    

    使用多Condition

    一个锁可以申明多个Condition,更加灵活.

    package com.bjsxt.height.lock020;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class UseManyCondition {
    
        private ReentrantLock lock = new ReentrantLock();
        private Condition c1 = lock.newCondition();
        private Condition c2 = lock.newCondition();
        
        public void m1(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m1等待..");
                c1.await();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m1继续..");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void m2(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m2等待..");
                c1.await();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m2继续..");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void m3(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m3等待..");
                c2.await();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m3继续..");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void m4(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
                c1.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void m5(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
                c2.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
            
            
            final UseManyCondition umc = new UseManyCondition();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m1();
                }
            },"t1");
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m2();
                }
            },"t2");
            Thread t3 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m3();
                }
            },"t3");
            Thread t4 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m4();
                }
            },"t4");
            Thread t5 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m5();
                }
            },"t5");
            
            t1.start(); // c1
            t2.start(); // c1
            t3.start(); // c2
            
    
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            t4.start(); // c1
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            t5.start(); // c2
            
        }
        
        
        
    }
    
    

    ReentrantReadWriteLock 读写锁

    在高并发访问下,尤其是读多写少,性能极强,远高于重入锁.

    其本质是读写分离的2把锁.
    在读锁下,多线程可以并发访问.
    在写锁下,只能顺序访问.

    口诀:
    读读共享,读写互斥,写写互斥.

    package com.bjsxt.height.lock021;
    
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
    
    public class UseReentrantReadWriteLock {
    
        private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        private ReadLock readLock = rwLock.readLock();
        private WriteLock writeLock = rwLock.writeLock();
        
        public void read(){
            try {
                readLock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
                Thread.sleep(3000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readLock.unlock();
            }
        }
        
        public void write(){
            try {
                writeLock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
                Thread.sleep(3000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                writeLock.unlock();
            }
        }
        
        public static void main(String[] args) {
            
            final UseReentrantReadWriteLock urrw = new UseReentrantReadWriteLock();
            
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    urrw.read();
                }
            }, "t1");
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    urrw.read();
                }
            }, "t2");
            Thread t3 = new Thread(new Runnable() {
                @Override
                public void run() {
                    urrw.write();
                }
            }, "t3");
            Thread t4 = new Thread(new Runnable() {
                @Override
                public void run() {
                    urrw.write();
                }
            }, "t4");       
            
    //      t1.start();
    //      t2.start();
            
    //      t1.start(); // R 
    //      t3.start(); // W
            
            t3.start();
            t4.start();
            
        }
    }
    
    

    相关文章

      网友评论

          本文标题:java并发编程

          本文链接:https://www.haomeiwen.com/subject/apzgittx.html