Java并发核心

作者: 依弗布德甘 | 来源:发表于2020-01-21 21:48 被阅读0次

Semaphore

Semaphore是一个计数信号量,常用于限制客访问某些资源的线程数目,相当于一种用来控制并发量的共享锁

  • 用于多个共享资源互斥使用
  • 用于控制并发线程数
Semaphore的简单使用
import java.util.concurrent.Semaphore;

public class Demo {
    // 创建只能同时有5个线程的信道
    static Semaphoresp = new Semaphore(5);

    public static void main(String args[]){
        for (int i=0; i<1000; i++){
            new Thread(){
                @Override
                public void run() {
                    try {
                        sp.acquire();       //抢信号量、就是在加锁
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    queryDB("localhost:3006");
                    sp.release();       //释放信号量,就是解锁
                }
            }.start();
        }
    }

    // 模拟操作DB
    public static void queryDB(String url){
        System.out.println("query " + url);
    }
}
使用AQS实现Semaphore
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class DemoSemaphore {

    private Sync sync;

    public DemoSemaphore(int permits){
        this.sync = new Sync(permits);
    }

    //获取信号量
    public void acquire(){
        sync.acquireShared(1);
    }

    //释放信号量
    public void release(){
        sync.releaseShared(1);
    }

    //AQS里面有很多没有实现的方法,要使用AQS,一个创建AQS的实例,并且重写方法
    class Sync extends AbstractQueuedSynchronizer {

        private int permits;

        public Sync(int permits){
            this.permits = permits;
        }

        //这里不需要考虑入队列、出队列,  这些都是不带try的方法中实现了,作为了公共的业务逻辑
        @Override
        protected int tryAcquireShared(int arg) {
            //获取锁的线程,最多不能超过n个
            int c = getState(); //state此处表示信号量获取的个数
            int nextc = c + arg;    //arg一般是1

            if (nextc <= permits){
                if (compareAndSetState(c, nextc))
                    return 1;
            }
            return -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;){
                int c = getState();
                if (c == 0) return false;

                int nextc = c - arg;
                if (compareAndSetState(c, nextc)){
                    return true;
                }
            }
        }
        //同样是失败,release的时候,要自旋,而acquire却没有
    }
}

CountDownLatch

CountDownLatch是一个倒计数器同步工具类,用来协调多个线程之间的同步

  • 让一些线程阻塞直到另一些线程完成一系列操作后才唤醒。
  • 通过调用await方法让线程进入阻塞状态等待倒计时0时唤醒。
  • 通过线程调用countDown方法让倒计时中的计数器减去1,当计数器为0时,会唤醒哪些因为调用了await而阻塞的线程。
CountDownLatch简单使用
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class Demo {

    /*
    火箭起飞前,有很多检查需要做,每项检查需要的时间不同,
    完成全部10项检查后,火箭才能点火
     */
    public static void test01() throws InterruptedException {
        CountDownLatch ctl = new CountDownLatch(10);

        //任务在异步的执行
        for (int i=0; i<10; i++){
            int number = i;
            new Thread(){
                @Override
                public void run() {
                    int randomInt = new Random().nextInt(10);
                    try {
                        Thread.sleep(randomInt * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println(">>>>" + number);
                    //当任务执行完,将count-1
                    ctl.countDown();
                }
            }.start();
        }

        //通过await来阻塞住
        System.out.println("主线程开始等待。。。");
        ctl.await();
        System.out.println("点火...");
    }

    /*
    预备,跑!!!
     */
    public static void test02() throws InterruptedException {
        CountDownLatch ctl = new CountDownLatch(1);

        for (int i=0; i<6; i++){
            int number = i;
            new Thread(){
                @Override
                public void run() {
                    System.out.println(number + " is redy...");
                    ctl.await();
                    System.out.println(String.format("运动员%d起跑", number));
                }
            }.start();
        }

        System.out.println("预备");
        Thread.sleep(3000);
        ctl.countDown();
        System.out.println("跑!!!");
    }

    public static void main(String args[]) throws InterruptedException {
        test02();
    }
}
使用AQS实现CountDownLatch
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class  DemoCountDownLatch {

    private Sync sync;

    public DemoCountDownLatch(int count){
        this.sync = new Sync(count);
    }

    //释放共享锁
    //state初始值为count,只有在state减为0的时候,才能释放锁成功
    public void countDown(){
        sync.releaseShared(1);
    }

    //获取共享,只有在state=0时,才能获取锁成功
    public void await(){
        sync.acquireShared(1);
    }

    class Sync extends AbstractQueuedSynchronizer{
        public Sync(int count){
            setState(count);    //state用来记录倒计数
        }

        @Override
        protected int tryAcquireShared(int arg) {
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;){
                int c = getState();

                if (c == 0)
                    return false;

                int nextc = c - arg;
                //减1需要使用CAS操作
                if (compareAndSetState(c, nextc)){
                    return nextc == 0;
                }
            }
        }
    }
}

CyclicBarrier

CyclicBarrier循环栅栏,底层是通过ReentrantLock以及Condition中的await和signal实现

  • 让线程到达一个屏障时被阻塞,当达到屏障数量后,线程才会继续执行
  • 它通过调用await方法让线程进入屏障
CyclicBarrier简单使用
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.LockSupport;

public class Demo {
    public static void main(String args[]){
        // 按4个一组执行
        CyclicBarrier barrier  = new CyclicBarrier(4);

        //传入一个Runnable,打印栅栏
        for (int i=0; i< 100; i++){
            new Thread(){
                @Override
                public void run() {
                    barrier.await();    //
                    System.out.println("上到摩天轮...");
                }
            }.start();
            LockSupport.parkNanos(1000 * 1000 * 1000L);
        }
    }
}
使用ReentrantLock以及Condition实现CyclicBarrier
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class DemoCyclicBarrier {

    //condition实现
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    //一个批次的大小
    private final int parties;

    //记录当前一轮有多少个线程等待
    private int count = 0;

    //全局年代
    private Object generation = new Object();

    public DemoCyclicBarrier(int parties){
        if (parties <=0)
            throw new IllegalArgumentException();
        this.parties = parties;
    }

    //进入下一轮等待,叫做进入下一个 年代
    public void nextGeneration(){
        count = 0;
        generation = new Object();
        condition.signalAll();
    }

    public void await(){
        lock.lock();
        try {
            Object myGeneration = generation;

            int index = ++count;
            //若当前一轮,集满
            if (index == parties){
                //进入下一轮 : count =0, 唤醒所有线程
                nextGeneration();
                return;
            }

            for (;;){
                //没有集满,挂起线程
                try {
                    condition.await();   //await方法用pak来实现的
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //什么时候应该让线程结束等待???
                if (myGeneration != generation)
                    return;
            }
        }finally {
            lock.unlock();
        }
    }
}

fork join框架

  • ForkJoinPool是ExecutorService的实现类,是一种特殊的线程池
  • 主要是对一个任务的拆分与合并
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class Demo {

    //  有一堆大文件,每一行放的是一个URL,你需要进行HTTP
    // 用线程池,任务的拆分,很多文件,逻辑非常零散,跨多个线程
    static ArrayList<String> urls = new ArrayList<String>(){
        {
            add("http://www.baidu.com");
            add("http://www.sina.com");
            // ....
        }
    };
    
    static ForkJoinPool forkJoinPool = new ForkJoinPool(3,
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null,
            true);

    //模拟网络请求
    public static String doRequest(String url){ 
        return "Kody ... test ... " + url + "\n";
    }

    public static void main(String args[]) throws ExecutionException, InterruptedException {
        Job job = new Job(urls, 0, urls.size());
        ForkJoinTask<String> forkJoinTask =  forkJoinPool.submit(job); 
        String result = forkJoinTask.get();
        System.out.println(result);
    }

    //使用ForkJoin最核心的内容,就是定义 递归任务,
    //定义递归任务,即定义如何对Task进行拆分,对结果进行汇总
    //定义就放在compute方法中
    static class Job extends RecursiveTask<String>{

        List<String> urls;
        int start;
        int end;

        public Job(List<String> urls, int start, int end){
            this.urls = urls;
            this.start = start;
            this.end = end;
        } 

        @Override
        protected String compute() {
            int count = end - start;        //计算任务大小 
            //若任务比较小,就直接执行,  // 10
            if (count <=10){
                String result = "";
                for (int i = start; i< end; i++){
                    String response = doRequest(urls.get(i));
                    result += response;
                }
                return result;
            }else{
                //否则,拆分任务
                int x = (start + end) / 2;
                Job job1 = new Job(urls, start, x);
                job1.fork();
                
                Job job2 = new Job(urls, x, end);
                job2.fork();
                
                //汇总结果
                String result = "";
                result += job1.join();
                result +=job2.join();
                return result;
            }
        }
    }
}

Future、FutureTask

Future只是一个接口,FutureTask是实现了RunnableFuture

  • Future呈现的是异步计算的结果。
  • Future中的方法提供了检查计算是否已经完成,并且等待计算的结果,还能够重新获取计算记得结果。
  • 已经完成的时候只能使用get()方法获取结果,如果有需要的话,可以一直阻塞等待结果,直到结果已经准备好了
Callable

有两种创建线程的方法-一种是通过创建Thread类,另一种是通过使用Runnable创建线程。但是,Runnable缺少的一项功能是,当线程终止时(即run()完成时),我们无法使线程返回结果。为了支持此功能,Java中提供了Callable接口

  • 为了实现Runnable,需要实现不返回任何内容的run()方法,而对于Callable,需要实现在完成时返回结果的call()方法。请注意,不能使用Callable创建线程,只能使用Runnable创建线程
  • 另一个区别是call()方法可以引发异常,而run()则不能
  • 为实现Callable而必须重写call方法

FutureTask 与 Callable 简单使用

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask; 

public class Demo {
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        //使用:用来包裹一个callab实例,得到的futureTask实例可以传入Thread()
        CallableTask task = new CallableTask();
        FutureTask<String> future = new FutureTask<>(task);

        new Thread(future).start();

        String result =  future.get();      //get方法会阻塞
        System.out.println(result);
 
        //一个futureTask实例,只能使用一次
        //同时说明,这个任务,从头到尾只会被一个线程执行
        new Thread(future).start();
 
    }
}
 
class CallableTask implements Callable<String>{
    @Override
    public String call() throws Exception {
        System.out.println(">>>执行任务。。。");

        //模拟耗时
        LockSupport.parkNanos(1000 * 1000 *1000 * 5L);
        return "success";
    }
} 
模拟FutureTask原理
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class DemoFutureTask<T> implements Runnable{

    public DemoFutureTask(Callable<T> call){
        this.call = call;
    }

    private Callable<T> call;

    T result;

    //Runner,用来实现抢执行的权限
    AtomicReference<Thread> runner = new AtomicReference<>();

    //等待队列
    LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();

    //任务状态
    private volatile int state = NEW;

    private static final int NEW = 0;
    private static final int RUNNING = 1;
    private static final int FINISHED = 2;


    @Override
    public void run() {
        if (state != NEW ||
                !runner.compareAndSet(null, Thread.currentThread())){
            return;
        }

        state = RUNNING;
        try {
            result = call.call();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            state = FINISHED;
        }

        while (true){
            Thread th = waiters.poll();
            if (th == null){
                break;
            }
            LockSupport.unpark(th);
        }
    }

    public T get(){
        if (state != FINISHED){
            waiters.offer(Thread.currentThread());
        }

        //挂起线程
        while (state!=FINISHED){
            LockSupport.park();
        }
        return result;
    }
}

相关文章

  • 双11Java程序员书单推荐

    Java 《Java核心技术卷I》 《Java核心技术卷II》 《Java编程思想》 《Java并发编程实战》 《...

  • 记录一些书籍

    JAVA 基础 《Java核心技术·卷1:基础知识》《Java核心技术 卷2:高级特性》《Java8 实战》 并发...

  • 线程

    Java 并发编程:线程池的使用 Java 并发编程:线程池的使用java 多线程核心技术梳理 (附源码) 本文对...

  • Java并发编程基础之并发包源码剖析(书籍目录)

    并发编程是Java编程的核心领域,而Java并发包则凝聚了并发编程的精华,掌握并发编程基础,熟练应用,理解思想则显...

  • 有追求的程序员书单

    Java经典进阶书籍 Effective Java Java编程思想 Java并发编程实战 Java核心技术卷一 ...

  • Java并发核心

    Semaphore Semaphore是一个计数信号量,常用于限制客访问某些资源的线程数目,相当于一种用来控制并发...

  • Java学习资源收集

    Java基础核心 Java NIO Java NIO系列教程(并发编程网) 攻破JAVA NIO技术壁垒 Java...

  • Java并发基础——核心篇

    Java并发基础之核心篇 上一期介绍了基础理论,让大家对并发有了基本的概念,这期也是并发理论的核心内容。 核心概念...

  • Java面试题总结分析+自学笔记分享(附带学习脑图)

    JAVA核心知识整理 JVM,JAVA集合,网络,JAVA多线程并发,JAVA基础,Spring原理,微服务,Zo...

  • java并发

    1.并发编程中的锁 并发编程中的各种锁java高并发锁的3种实现Java并发机制及锁的实现原理 2.线程池核心线程...

网友评论

    本文标题:Java并发核心

    本文链接:https://www.haomeiwen.com/subject/zqpezctx.html