美文网首页
java 并发编程精华一页纸

java 并发编程精华一页纸

作者: 轩居晨风 | 来源:发表于2017-04-11 22:09 被阅读70次

    1、线程安全与锁

    线程安全的本质,在于 存在了共享的可变状态 status, 在多线程共同操作状态变量时,当计算的正确性依赖于运行时相关的时序(比如i++,先取出i,再+1,然后赋值),就会出现竞争(竞态)

    无状态对象永远是线程安全的.

    所以线程安全 三步骤:a、无共享状态;b、共享状态不可变;c、共享状态同步

    要做到状态同步,就必须要通过锁 or volatile;本章节先讨论锁,被锁保护的代码块可以认为是一间屋子,只有唯一的锁可以打开,即同时只能只有一个线程进入。其他进入的线程必须拿到这把锁。这样就保证了数据的安全性。

    I、内部锁 synchronized

    三种用法:a、锁住变量 (这个变量就是锁);b、锁住方法 (拥有这个方法的对象是锁) ;c、锁住静态方法 (这个类的Class对象是锁)

    每个java对象都自带隐式锁(也称为监视器锁)

    可重入 - 每个线程可以对同一个锁多次获取,比如 下面这个例子 a 获取一次,b还要再获取,获取时计数\退出时减少(像不像早期的垃圾回收)

    public synchronized a(){

    b();

    }

    public synchronized b(){

    }

    II、显式锁 ReentrantLock

    ReentrantLock + Condition 替换了原先的 synchronized + object

    Condition 实现了 资源的 竞争与释放 ; Lock 实现了 代码的隔离

    Lock lock = new ReentrantLock();

    Waits = lock.newConidition();

    Lock.lock();

    Try{

    Waits.await() – 等同于 原先的锁wait

    }finnaly{

    Lock.unlock

    }

    显示锁有什么好处?

    a、tryLock( xxx ) -- 可以设置超时,无法获取的时候,这样可以避免死锁

    b、lockInterruptibly(); -- 可以响应中断的锁

    c、性能提升? -- java6 以后的版本已经没有性能优势了

    d、读写分离

    显式锁里面,有专门的 ReentrantReadWriteLock 读写分离锁。好吧,这里又看到类似数据库的设计思路了。

    读锁可以多线程同时进入,是共享的。写锁只能单线程进入,是排他的

    e、公平

    初始化 ReentrantLock 时,可以指定是否是公平的。何解? 因为线程调度唤醒线程时,是随机的,有可能先来的线程未获取到,这就不公平了。指定公平性,可以按等待顺序获取锁

    有啥缺点?

    每次都需要显式手工finnaly 关闭

    内部锁 or 显式锁?

    如何选择两者。jdk 推荐 默认使用 内部锁,因为简单、方便;如果有特殊需求的,如上显示锁描述的好处。

    III、死锁

    a、出现死锁的条件:

    互斥 - 资源只能被一个线程占有

    请求与保持 - 占住的不想放,想要的非要

    不可剥夺 - 占住的不能抢走

    循环等待 - 只要有人放手就解开了,就是没人放手

    b、如果预防死锁?

    排好队 - 如果非要获取多个锁,那所有的获取锁方式都要 按照一定顺序

    中断(抢夺) - 显式锁可以中断某个锁

    超时(放弃) - 显式锁可以设置超时放弃锁

    c、如何检测死锁?

    打印堆栈

    win( Ctrl + Break ) linux (kill -3)

    Jmap+jhat+jstatck

    监视工具

    JConsole | Visual VM

    IV、使用锁的建议

    a、如何选择内部锁和显式锁,默认内部锁,特殊情况比如定时、中断使用显式。(参见上文描述)

    b、尽量减少同步的范围、 不要在 方法上加锁,而是在方法内部

    c、多个锁按顺序获取,可以把获取的地方统一隔离,放在一起,通过对象hash值identityHashCode安排顺序

    2、线程安全与对象(非阻塞式实现)

    同步是 保护 对象(数据) 的一种方式,除了用锁这种同步方式,还有其他效率更高的方式。

    I、volatile - 共享变量

    系统会监视这个变量,对它的操作不会和其他内存操作重排序,也就是说对volatile 变量的操作当成是原子操作;所有的修改其他线程都可见,也就不需要加锁同步了。

    volatile int a;

    对volatile的使用有一些限制,主要是自增类的操作不能使用(比如i++)、只能单独使用、不能组合使用

    II、CAS 与原子类型 -- CAS = 比较并设置CompareAndSet

    实现思路很简单,就是当修改的值和 现在内存的值比较,如果一致了,就修改生效;如果不一致就一直循环检查,直到生效为止。

    public final long getAndDecrement() {

    while (true) {

    long current = get();

    long next = current - 1;

    if (compareAndSet(current, next))

    return current;

    }

    }

    volatile和 CAS 都是CPU层面提供的底层技术,不是语言本身提供的技术,其中CAS的实现还需要调用 JNI 本地化实现。

    volatile + CAS 构成了java的原子类型库 java\util\concurrent\atomic

    a、基本类型的原子包装器 AtomicInteger | AtomicLong | AtomicBoolean

    b、数组类型 AtomicIntegerArray ...

    c、对象类型 AtomicReference (包装一个类) | 域更新器 AtomicReferenceFieldUpdater(利用反射,更细粒度的操作)

    -- java自身的cocurrent类库也有使用原子类型的,比如ConcurrentLinkedQueue 就使用了 域更新器实现 具体链表 字段的更新

    该如何选择 锁 or 原子类型+volatile?

    一般需要用在控制变量、标记值等等应用场景应该选用 原子类型这些非阻塞的同步方式;如果有大段代码复杂逻辑需要保护,则采用同步方式。

    III、封闭线程Ad-hoc 与ThreadLocal

    把数据封闭在线程内部,就不存在共享状态的问题,也就是线程安全了,最典型的应用就是 ThreadLocal,网上对ThreadLocal很多理解都有误区

    a、每个线程绑定一个 ThreadLocal.ThreadLocalMap 对象,线程内部的局部变量。

    b、存放对象 set(T) 实际上就是 把 ThreadLocal 对象作为key ,T 作为value put 到 线程的map对象中

    c、获取对象 get,实际上就是把 this自身的对象 作为key 从 线程map中取出数据

    几个关键问题

    a、多个线程的 ThreadLocal 对象虽然是同一个,但Map 是线程自己的,所以在调用 set get 时,获取的是自己Map存储的 value,实现了数据封闭

    b、设置的value 必须是新的对象或者基本类型,否则如果设置的 是同一个 对象的引用的话,取出来还是同一个引用,就达到不到隔离的效果。

    c、如果需要 在所有线程共享一个初始数据,可以 继承ThreadLocal,扩展实现 initialValue 方法,这样所有线程都能看到初始化数据

    private static ThreadLocal> threadLog = new ThreadLocal>(){

    protected List initialValue(){

    return new ArrayList();

    }

    };

    总结:ThreadLocal本质上是一个 空间换时间的 实现,通过在多个线程中 存储不同的拷贝,实现了线程安全

    3、线程生命周期

    I、五种状态

    新建 - 两种方法创建多线程:继承java.lang.Thread,覆盖run方法 | 实现java.lang.Runnable接口,实现run方法

    就绪 - Thread.start - 对应run 方法

    运行 - 线程抢到CPU开始运行

    阻塞 - 遇到 锁、休眠、IO等情况线程处于阻塞态 (等待、休眠)

    终止 - 正常结束,被中断 Thread.interrrupt

    II、改变状态

    a、sleep - 当前线程休眠

    b、yield - 当前线程让渡CPU给同级别的线程

    c、wait - 阻塞等待其他资源的锁

    d、join - 傻傻等其他线程结束后再结束

    sleep vs wait

    sleep 只是线程休眠,并不会释放占有的资源,wait会释放所有的资源;wait 是资源上的方法,而休眠是线程方法

    sleep vs yield

    sleep 线程切入休眠状态,等待一定时间才唤醒抢占CPU;yield 只是暂时让渡CPU

    在构造函数中启动线程是危险的,因为可能构造函数还未构造完成,如果回调或者其他就会出现异常。避免在构造函数启动线程。

    III、线程优先级

    调度器根据线程优先级作为参考,决定多长时间调度该线程 ;Thread类中定义了默认3个级别 1 MIN ,5 NORMAL ,10 MAX

    4、线程间通讯

    严格来说 join 方法也算是一种线程间通讯,一个线程等待另一个线程处理结束,才继续。

    线程间通讯,不像进程间通讯Socket、共享内存、系统级别信号量,线程间因为共享资源,所以通讯方式大都是通过共享的资源。

    传统方式

    I、经典通讯方式 wait/notify/notifyAll

    使用wait/notify的正确姿势

    线程A

    synchronized(list){

    while(list.size == 0)

    list.wait();

    }

    线程B

    synchronized(list){

    list.add(xxx);

    list.notifyAll();

    }

    a、wait 方法一般都要写在synchronized 的循环里

    b、synchronized 锁住的对象和 wait的对象一般是同一个对象

    c、使用notifyAll 而不是 notify

    wait 有无参数? 有参数只是多了一个唤醒条件,唤醒后和wait一样继续抢占资源

    notify 和 notifyAll ? notifyAll唤醒所有线程,只有一个线程抢到,notify只随机唤醒一个线程

    线程屏障

    II、闭锁 CountDownLatch 和 关卡 CyclicBarrier

    a、从功能上看,两者很类似,都是用作阻塞的线程控制。

    b、从语义上看,略有区别,闭锁等待的是事件、关卡等待的是线程,两者都是在await等待;闭锁通过 countDown 这个事件、触发等待的线程,而关卡需要互相等待、等所有线程处理完成后、触发等待的线程。

    c、更大的区别则是:CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。

    III、信号量 Semaphore

    和操作系统的信号量概念一样,允许一个资源被被操作使用的次数

    Semaphore s = new Semaphore(100);

    申请 s.acquire()

    释放 s.release()

    以上三种使用方法,wait/闭锁/信号量 某种程度上都属于资源申请;而闭锁,则是单纯的线程等待控制。下面介绍另一种方式,数据交换.

    IV、Exchanger 数据交换

    只能两个线程进行数据交换

    Exchange exc = new Exchange();

    A线程执行 IN = exc.exchange(OUT) ,此时A线程阻塞

    B线程执行 OUT = exc.exchange(IN) ,此时A、线程解除阻塞,双方数据发生了交换

    5、线程池

    线程池一般有两种实现方式:一种是 任务池,各个线程循环去池里取任务;一种是 线程池,提交任务后,从池里取出空闲线程处理。

    I、ThreadPoolExecutor

    任务队列BlockingQueue - 使用了阻塞队列作为提交的任务,接受继承 Runnable 的任务

    线程池HashSet - 最少corePoolSize 最大 maximumPoolSize 空闲时间 keepAliveTime ;这三个参数决定 线程数目在 最小和最大之间弹性处理(当 > 最少,且 > 空闲时间,释放一部分空闲线程)

    线程工厂ThreadFactory - 提供创建线程的方法 (还是为了扩展性, 给应用更多的创建线程前后的处理工作),Executors框架提供了一些线程工厂的实现

    工作线程Worker - 实现了Runnable ,绑定了一个线程(ThreadFactory 创建线程时,传入),执行提交的任务外,还有beforeExecute afterExecute 的钩子函数

    队列管理

    ThreadPoolExecutor 的队列管理是开放出来由外界提供 BlockingQueue的实现,具体参见下面 并发框架的 Executors 提供的一些默认队列

    饱和策略

    当有限队列充满后(即无法把任务提交到 BlockingQueue中去),就要有饱和策略。ThreadPoolExecutor的饱和策略,可以通过调用setRejectedExecutionHandler来修改

    中止(abort)

    遗弃(discard)

    遗弃最旧的(discard-oldest)

    调用者运行(caller-runs)

    II、Executors/ExecutorServie(Executor) 框架

    a、Executor 框架 -- 其实就是一个 Command模式

    接口非常简单,把一个Runnable线程作为参数

    void execute(Runnable command);

    一般情况下,不需要自己定义一个线程,而是提交任务给Executor框架

    b、ExecutorService 框架 -- Active Object 模式(主动对象模式)

    调用线程 方法的调用 和被调用线程的 方法执行 进行解耦

    被调用线程内部绑定一个 线程对象,即ThredPoolExecutor 的Worker,自己管理线程的状态

    获取被调用线程的引用

    ExecutorService service = Executors.newFixedThreadPool(NTHREADS);

    提交

    service.execute(task);

    service.submit(task);

    还可以管理executor的生命周期

    service.shutdown();

    c、队列

    使用无限队列 LinkedBlockingQueue (对于ThreadPoolExecutor 来说这些队列都是有限的,可以把指定容量的队列给他,但Executors 默认的都是无限未指定容量的队列)

    newFixedThreadPool -- 定长的线程池

    newSingleThreadExecutor -- 单线程化的 executor

    使用同步移交队列 SynchronousQueue (没有内存的队列,直接把队列从 提交者 给 消费者,据说等待性能更好?少了数据结构的操作开销?)

    newCachedThreadPool -- 可缓存的线程池

    周期线程池

    newScheduledThreadPool - 定长线程池,支持定时和周期的任务执行

    Timer和TimerTask 的改进版本

    原先的Timer存在一些缺陷,Timer调度是基于绝对时间,对系统时间改变很敏感。(ScheduledThreadPoolExecutor 支持相对时间);所有timer任务在一个线程执行,如果耗时操作导致其他任务延迟,会出现一些不可预料的错误Timer中如果timetask执行失败,整个timer的任务都失败

    // 定时器

    public class Timer {

    public static final int TIME_DEFAULT = 15 * 60;

    protected ScheduledExecutorService service = Executors.newScheduledThreadPool(1);

    protected int period;

    protected Runnable runnable;

    public Timer(Runnable runnable){

    this(TIME_DEFAULT, runnable);

    }

    public Timer(int period, Runnable runnable){

    this.period = period;

    this.runnable = runnable;

    }

    public void start(){

    service.scheduleAtFixedRate(runnable, 0, period, TimeUnit.SECONDS);

    }

    public void destroy(){

    service.shutdown();

    }

    // 绝对定时器,从下一个粒度整点开始

    public static class AbsoluteTimer extends Timer{

    public AbsoluteTimer(Runnable runnable) {

    super(runnable);

    }

    public AbsoluteTimer(int period, Runnable runnable) {

    super(period, runnable);

    }

    @Override

    public void start(){

    service.scheduleAtFixedRate(runnable, Utils.getNextWholeTime(new Date(), period), period, TimeUnit.SECONDS);

    }

    }

    }

    d、线程工厂

    DefaultThreadFactory

    PrivilegedThreadFactory

    III、任务管理

    任务的执行,很容易操作,直接Thread 的start,或者提交到 任务执行框架 Executor执行

    任务取消和停止,如何控制? 怎么知道任务执行的结果? 怎么隔离一个耗时任务 ?

    取消的方法一:通过共享一个变量标志来进行任务的协调。- cancellation requested

    中断的方法二:通过interrupt 中断方法,取消一个线程的执行

    获取返回值

    通过 局部变量,或者返回函数 -- 不可行,因为主线程和子线程并没有先后顺序的约束,受限于CPU、IO等复杂因素,结果是未知的

    轮询 -- 可行,采用类似阻塞方式,不停的去检查是否完成,但多做了很多无用功,占用了CPU

    回调 -- 正解

    Future/FutureTask + Callable - 增强版的任务执行

    Callable 提供 Runnable的增强功能 可以支持返回值

    非阻塞的模型:Promise,Future 和 Callback 典型架构,Future 是异步执行方式

    下面是一个简版的 任务执行框架,可以把任务提交给Executors框架,通过 Future 来

    // 执行任务

    static interface Function{

    Out action() throws Exception ;

    }

    // 提交任务框架, 任何一次性执行的, 有可能延迟的都可以使用

    private static Out submit(final Function in){

    final ExecutorService exec = Executors.newSingleThreadExecutor();

    Future future = exec.submit(new Callable(){

    @Override

    public Out call() throws Exception {

    return in.action();

    }

    });

    Out result = null;

    try {

    result = future.get(STREAM_TIMEOUT_PER_TIME, TimeUnit.SECONDS);

    } catch (Exception e) {

    log.error(String.format("Task Thread %d Get submit error : %s ", Thread.currentThread().getId(), e.getMessage()), e.getCause());

    }finally{

    exec.shutdown();

    }

    return result;

    }

    Fork/Join 框架?

    IV、CompletionService:Executor 和 BlockingQueue的组合

    整合了 Executor 和 BlockingQueue的功能。调用时,把Callable任务提交给他,同时像quue一样取出take和poll方法,返回的是一个Future

    CompletionService 和 直接调用ExecutorService 区别在于,如果ExecutorService 启动多个任务,获取返回值时,多个Future需要管理起来,现在CompletionService直接用BlockingQueque对Future进行管理。

    V、其他

    线程增加 UncaughtExceptionHandler 捕获未知的线程异常,通知框架和应用

    6、其他

    JVM线程关闭 时,会有一个钩子函数

    Runtime.getRuntime().addShutdownHook(new Thread(){

    public void run(){

    // 在这里做一些清理的工作。

    }

    });

    守护线程 - 精灵线程 (dameon)

    后台服务线程,和JVM线程同在

    用户线程和守护线程区别不大,用户线程随时可以退出,守护线程一直到虚拟机的生命周期。 没有用户线程,守护线程和JVM一起退出。

    一个完整的实际使用案例 - 这个可以用 JDK7的 Fork/Join 改写

    // 下载调度线程

    public class DownLoadTask {

    public int MAX_THREAD ;

    public int MAX_RECORD_PER_THREAD ;

    public static final int DEFAULT_TIME_OUT = 120000;

    private FileWriter writer;

    private List list;

    private ExecutorService downPool ;

    private PriorityBlockingQueue container;

    private CyclicBarrier oneTimes;

    private AtomicInteger interrupt = new AtomicInteger(0);

    private AtomicInteger timeout = new AtomicInteger(0);

    private static Logger log = Logger.getLogger(DownLoadTask.class);

    public DownLoadTask(List list, FileWriter writer){

    this.list = list;

    this.writer = writer;

    }

    public void setMAX_THREAD(int MAX_THREAD) {

    this.MAX_THREAD = MAX_THREAD;

    }

    public void setMAX_RECORD_PER_THREAD(int MAX_RECORD_PER_THREAD) {

    this.MAX_RECORD_PER_THREAD = MAX_RECORD_PER_THREAD;

    }

    public void run(){

    int ONE_SORT_TOTAL_NUM = MAX_THREAD * MAX_RECORD_PER_THREAD;

    downPool = Executors.newFixedThreadPool(MAX_THREAD);

    container = new PriorityBlockingQueue(ONE_SORT_TOTAL_NUM);

    int recordsSize = list.size();

    int repeatCount = recordsSize / ONE_SORT_TOTAL_NUM;

    int lastRecords = recordsSize % ONE_SORT_TOTAL_NUM;

    int lastRecordsWorkerNum = (int) ceil((double)lastRecords / (double) MAX_RECORD_PER_THREAD);

    int lastRecordsLastWorkNum = lastRecords % MAX_RECORD_PER_THREAD;

    log.info("repeatCount size:"+ repeatCount);

    log.info("lastRecords size:"+ lastRecords);

    log.info("lastRecordsWorkerNum:"+ lastRecordsWorkerNum);

    log.info("lastRecordsLastWorkNum:"+ lastRecordsLastWorkNum);

    if(repeatCount==0){

    oneTimes = new CyclicBarrier(lastRecordsWorkerNum + 1, new Combine());

    }else if(repeatCount > 0){

    oneTimes = new CyclicBarrier(MAX_THREAD + 1, new Combine());

    }

    int i = 0;

    int count = 0;

    int index = 0;

    List listfile = new ArrayList();

    for(int j=0;j

    SortFileData sortfile = new SortFileData();

    sortfile.setFilestruct(list.get(j));

    sortfile.setIndex(j);

    listfile.add(sortfile);

    i++;

    if(listfile.size()== MAX_RECORD_PER_THREAD){

    downPool.submit(new Worker(index, listfile));

    index++;

    if(i % ONE_SORT_TOTAL_NUM == 0 ){

    count++;

    waitForComplete();

    if(count == repeatCount && lastRecords != 0){

    oneTimes = new CyclicBarrier(lastRecordsWorkerNum + 1, new Combine());

    }

    }

    listfile = new ArrayList();

    }

    }

    if (lastRecordsLastWorkNum>0){

    downPool.submit(new Worker(index, listfile));

    }

    if(lastRecordsWorkerNum != 0){

    waitForComplete();

    }

    log.info(String.format("result [interrup = %d, timeout = %d]", interrupt.get(), timeout.get()));

    }

    // 线程阻塞

    public void waitForComplete(){

    try {

    oneTimes.await(DEFAULT_TIME_OUT, TimeUnit.MILLISECONDS);

    } catch (InterruptedException e) {

    log.error("Thread interrupted.", e.getCause());

    interrupt.incrementAndGet();

    } catch (BrokenBarrierException e) {

    log.info("CyclicBarrier reset", e.getCause());

    } catch (TimeoutException e) {

    log.error("Thread timeout.", e.getCause());

    timeout.incrementAndGet();

    }

    }

    // 用于排序的查询结果

    static class SortFileData implements Comparable{

    int index;

    byte[] data;

    private FileStruct filestruct;

    public void setIndex(int index) {

    this.index = index;

    }

    public void setData(byte[] data) {

    this.data = data;

    }

    public FileStruct getFilestruct() {

    return filestruct;

    }

    public int getIndex(){

    return index;

    }

    public byte[] getData(){

    return data;

    }

    public void setFilestruct(FileStruct filestruct) {

    this.filestruct = filestruct;

    }

    @Override

    public int compareTo(SortFileData o) {

    SortFileData that = o;

    if(this.getIndex() > that.getIndex()) return 1;

    else if(this.getIndex() < that.getIndex()) return -1;

    return 0;

    }

    }

    //工作线程

    class Worker implements Runnable{

    private FileStruct struct;

    private int index;

    private List listfile;

    private DsuQuery query;

    Worker(int index, List listfile){

    this.index = index;

    this.listfile = listfile;

    }

    public void setQuery(DsuQuery query) {

    this.query = query;

    }

    @Override

    public void run() {

    query = new DsuQuery();

    for (SortFileData file:listfile) {

    byte[] data = Decode.decode(query.queryOnce(file.getFilestruct()),file.getFilestruct());

    //byte[] data =new byte[0];

    file.setData(data);

    container.offer(file);

    }

    waitForComplete();

    }

    }

    //合并线程,取出数据,并合并写入文件

    class Combine implements Runnable{

    @Override

    public void run() {

    log.info("container size is "+ container.size());

    while(!container.isEmpty()){

    SortFileData result = container.poll();

    writer.write(result.data);

    //log.info("start write size is "+ result.data.length);

    }

    oneTimes.reset();

    }

    }

    public static void main(String[] args) throws IOException {

    List list = new ArrayList();

    for(int i = 0 ; i < 14; i++){

    FileStruct f = new FileStruct();

    f.fileName = "test";

    f.start = i;

    f.length = 1;

    f.timestab=i;

    list.add(f);

    }

    LocalFileWriter filewriter = new LocalFileWriter(1);

    DownLoadTask down=new DownLoadTask(list,filewriter);

    down.run();

    filewriter.close();

    }

    }

    相关文章

      网友评论

          本文标题:java 并发编程精华一页纸

          本文链接:https://www.haomeiwen.com/subject/ypfpattx.html