美文网首页
【JAVA】java.util.concurrent包

【JAVA】java.util.concurrent包

作者: Y了个J | 来源:发表于2019-03-24 13:30 被阅读0次
使用线程安全的类

StringBuilder -> StringBuffer

SimpleDateFormat -> JodaTime

ArrayList -> Vector, Stack, CopyOnWriteArrayList

HashSet -> Collections.synchronizedSet(new HashSet()), CopyOnWriteArraySet

TreeSet -> Collections.synchronizedSortedSet(new TreeSet()), ConcurrentSkipListSet

HashMap -> HashTable, ConcurrentHashMap, Collections.synchronizedMap(new HashMap())

TreeMap -> ConcurrentSkipListMap, Collections.synchronizedSortedMap(new TreeMap())

JUC中有非常多的类,将部分类按功能进行分类,分别是:
原子atomic包
比synchronized功能更强大的lock包
线程调度管理工具
线程安全与并发工具集合
线程池
AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架,它是JUC并发包中的核心基础组件。JUC大大提高了Java的并发能力,AQS是JUC的核心。
屏幕快照 2019-03-24 下午1.06.53.png
  • 同步队列:AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
  • 继承实现:AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法acquire/release来管理同步状态。
  • 同步状态维护:AQS使用一个int类型的成员变量state来表示同步状态,当state > 0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法getState()、setState(int newState)、compareAndSetState(int expect,int update)来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。
CountDownLatch

计数器闭锁是一个能阻塞主线程,让其他线程满足特定条件下主线程再继续执行的线程同步工具。

public class CountDownLatchTest {

    private static final int COUNT = 1000;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(COUNT);
        for (int i = 0; i < COUNT; i++) { //countDown方法的执行次数一定要与countDownLatch的计数器数量一致,否则无法将计数器清空导致主线程无法继续执行
            int finalI = i;
            executorService.execute(() -> {
                try {
                    Thread.sleep(3000);
                    System.out.println(finalI);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await(1, TimeUnit.SECONDS); //主线程只等1秒,超过之后继续执行主线程
        executorService.shutdown(); //当正在执行的线程执行完成之后再关闭而不是立即停止线程
        System.out.println("done!");
    }
}

这段程序先设置CountDownLatch为100,然后在其他线程中调用100次countDown方法,随后主程序在等待100次被执行完成之后,继续执行主线程代码

Semaphore

信号量是一个能阻塞线程且能控制统一时间请求的并发量的工具。比如能保证同时执行的线程最多200个,模拟出稳定的并发量。

public class CountDownLatchTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(3); //配置只能发布3个运行许可证
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire(3); //获取3个运行许可,如果获取不到会一直等待,使用tryAcquire则不会等待
                    Thread.sleep(1000);
                    System.out.println(finalI);
                    semaphore.release(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

由于同时获取3个许可,所以有即使开启了100个线程,但是每秒只能执行一个任务

原理
new Semaphore(3)传入的3就是AQS中state的值,也是许可数的总数,在调用acquire时,检测此时许可数如果小于0,就将被阻塞,然后将线程构建Node进入AQS队列

使用场景
数据库连接并发数,如果超过并发数,等待(acqiure)或者抛出异常(tryAcquire)

CyclicBarrier

可以让一组线程相互等待,当每个线程都准备好之后,所有线程才继续执行的工具类

public class CyclicBarrierTest {
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
        System.out.println("ready done callback!");
    });

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            Thread.sleep(1000);
            executorService.execute(() -> {
                try {
                    System.out.println(finalI + "ready!");
                    cyclicBarrier.await();
//                    cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); // 如果某个线程等待超过2秒就报错
                    System.out.println(finalI + "go!");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

        }
    }
}

原理
与CountDownLatch类似,都是通过计数器实现的,当某个线程调用await之后,计数器减1,当计数器大于0时将等待的线程包装成AQS的Node放入等待队列中,当计数器为0时将等待队列中的Node拿出来执行。

与CountDownLatch的区别:
CountDownLatch是一个线程等其他线程,CyclicBarrier是多个线程相互等待
CyclicBarrier的计数器能重复使用,调用多次

使用场景
1.CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

2.有四个游戏玩家玩游戏,游戏有三个关卡,每个关卡必须要所有玩家都到达后才能允许通过。其实这个场景里的玩家中如果有玩家A先到了关卡1,他必须等到其他所有玩家都到达关卡1时才能通过,也就是说线程之间需要相互等待。

ReentrantLock

名为可重入锁,其实synchronized也可重入,是JDK层级上的一个并发控制工具

原理

// 以公平锁为例,从lock.lock()开始研究
final void lock() { acquire(1);}

public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 首先通过公平或者非公平方式尝试获取锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 然后构建一个Node放入队列中并等待执行的时机
        selfInterrupt();
}

// 公平锁设置锁执行状态的逻辑
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { //如果state是0,就是当前的锁没有人占有
        if (!hasQueuedPredecessors() && // 公平锁的核心逻辑,判断队列是否有排在前面的线程在等待锁,非公平锁就没这个条件判断
            compareAndSetState(0, acquires)) { // 如果队列没有前面的线程,使用CAS的方式修改state
            setExclusiveOwnerThread(current); // 将线程记录为独占锁的线程
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 因为ReentrantLock是可重入的,线程可以不停地lock来增加state的值,对应地需要unlock来解锁,直到state为零
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

// 接下来要执行的acquireQueued如下
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { // 再次使用公平锁逻辑判断是否将Node作为头结点立即执行
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

ReentrantLock与synchronized的区别
1.用法。synchronized既可以很方便的加在方法上,也可以加载特定代码块上,而lock需要显示地指定起始位置和终止位置。
2.实现。synchronized是依赖于JVM实现的,而ReentrantLock是JDK实现的
3.性能。synchronized和lock其实已经相差无几,其底层实现已经差不多了。但是如果你是Android开发者,使用synchronized还是需要考虑其性能差距的。
4.功能。ReentrantLock功能更强大。
ReentrantLock可以指定是公平锁还是非公平锁,而synchronized只能是非公平锁,所谓的公平锁就是先等待的线程先获得锁。
ReentrantLock提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程
ReentrantLock提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()来实现这个机制

我们控制线程同步的时候,优先考虑synchronized,如果有特殊需要,再进一步优化。ReentrantLock如果用的不好,不仅不能提高性能,还可能带来灾难。

Condition

条件对象的意义在于对于一个已经获取锁的线程,如果还需要等待其他条件才能继续执行的情况下,才会使用Condition条件对象。与ReentrantLock结合使用,类似wait与notify。

public class ConditionTest {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread thread1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " run");
                System.out.println(Thread.currentThread().getName() + " wait for condition");
                try {
                    condition.await(); // 1.将线程1放入到Condition队列中等待被唤醒,且立即释放锁
                    System.out.println(Thread.currentThread().getName() + " continue"); // 3.线程2执行完毕释放锁,此时线程1已经在AQS等待队列中,则立即执行
                } catch (InterruptedException e) {
                    System.err.println(Thread.currentThread().getName() + " interrupted");
                    Thread.currentThread().interrupt();
                }
            } finally {
                lock.unlock();
            }
        });
        Thread thread2 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " run");
                System.out.println(Thread.currentThread().getName() + " sleep 1 secs");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.err.println(Thread.currentThread().getName() + " interrupted");
                    Thread.currentThread().interrupt();
                }
                condition.signalAll(); // 2.线程2获得锁,signalAll将Condition中的等待队列全部取出并加入到AQS中
            } finally {
                lock.unlock();
            }
        });
        thread1.start();
        thread2.start();
    }
}
Future、FutureTask、CompletableFuture 、Callable与Runnable
public class FutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(() -> {
            try {
                System.out.println("doing");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "done";
        });
        System.out.println(future.get());
    }
}
public class FutureTaskTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println("doing");
            Thread.sleep(1000);
            return "down";
        });
        executorService.submit(futureTask);
//        new Thread(futureTask).start();
        System.out.println(futureTask.get());
        executorService.shutdown();
    }
}
public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> string1Future = CompletableFuture.supplyAsync(() -> {
            System.out.println("doing string1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("done string1");
            return "string1";
        });
        CompletableFuture<String> string2Future = CompletableFuture.supplyAsync(() -> {
            System.out.println("doing string2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("done string2");
            return "string2";
        });
        CompletableFuture.allOf(string1Future, string2Future).join();
        System.out.println(string1Future.get() + "and" + string2Future.get());
    }
}

CompletableFuture

BlockingQueue

https://wsmajunfeng.iteye.com/blog/1629354

Fork Join框架

fork join框架是JDK7中出现的一款高效的工具,Java开发人员可以通过它充分利用现代服务器上的多处理器。它是专门为了那些可以递归划分成许多子模块设计的,目的是将所有可用的处理能力用来提升程序的性能。fork join框架一个巨大的优势是它使用了工作窃取算法,可以完成更多任务的工作线程可以从其它线程中窃取任务来执行
但这样会要额外地对任务分派线程进行管理,无形地会增加管理的难度和复杂度,还可能碰到资源竞争导致的同步操作与性能损耗

相关文章

网友评论

      本文标题:【JAVA】java.util.concurrent包

      本文链接:https://www.haomeiwen.com/subject/efcuvqtx.html