美文网首页
多线程之并发工具类

多线程之并发工具类

作者: OPice | 来源:发表于2019-09-23 13:06 被阅读0次

CountDownLatch

在开发过程中经常会碰到一个任务需要开启多个线程,然后将多个线程的执行结果汇总。比如说查询全量数据,考虑数据量的问题,我们基本上会做分页,这时候就需要多次循环调用。CountDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
执行原理
CountDownLatch内部通过继承了AQS(AbstractQueuedSynchronizer)的内部类Sync来实现。初始化CountDownLatch时,会调用Sync的构造方法将count赋值给AQS的state变量。线程调用countDown时,使用CAS将state的值减1;调用await方法的线程会被放入AQS的阻塞队列中,当计数器减为0时,这些等待的线程会被唤醒。
核心方法

// Constructor: initializes the latch counter to count.
// Throws IllegalArgumentException when count is negative.
 public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // Sync is the AQS-based synchronizer; the initial count is handed to it here
        this.sync = new Sync(count);
    }
// The thread calling await() is suspended until the count reaches zero.
// Delegates to sync.acquireSharedInterruptibly(1); declares InterruptedException.
public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
// Timed variant of await(): waits at most the given timeout (converted to nanoseconds).
// Returns the boolean result of sync.tryAcquireSharedNanos, so the caller can tell
// whether the wait succeeded or the timeout elapsed first.
 public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
// Decrements the count by one (delegates to sync.releaseShared(1)).
public void countDown() {
        sync.releaseShared(1);
    }
// Returns the current count as reported by sync.getCount().
 public long getCount() {
        return sync.getCount();
    }

走个栗子

/**
 * Demo: fan out a paged "full data" query over a thread pool and let the main
 * thread wait on a {@link CountDownLatch} until every page has been processed.
 */
public class CountDownLatchDemo implements Runnable{
    /** Number of pages to query; also the initial latch count and the pool size. */
    private static final int PAGE_COUNT = 10;
    /** Number of rows covered by one page. */
    private static final int PAGE_SIZE = 100;

    /** Latch shared by all page tasks; reaches zero once every task has finished. */
    public static CountDownLatch countDownLatch = new CountDownLatch(PAGE_COUNT);
    /** Zero-based page index handled by this task. */
    private final Integer count;

    /**
     * @param count zero-based page index this task queries
     */
    public CountDownLatchDemo( Integer count) {
        this.count = count;
    }

    /**
     * Prints the row range of this task's page and always counts the latch down,
     * even when the work fails, so the waiting thread can never hang on a failed page.
     */
    @Override
    public void run() {
        try {
            System.out.println("分页查询:"+count*PAGE_SIZE+"~"+(count+1)*PAGE_SIZE);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }

    /**
     * Submits one task per page, waits for all of them, then shuts the pool down.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException{
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(PAGE_COUNT, PAGE_COUNT, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(PAGE_COUNT));
        try {
            for (int i = 0; i < PAGE_COUNT; i++) {
                CountDownLatchDemo countDownLatchDemo = new CountDownLatchDemo(i);
                threadPoolExecutor.submit(countDownLatchDemo);
            }
            countDownLatch.await();
            System.out.println("分页全量数据查询完毕");
        } finally {
            // Core threads are non-daemon and never time out by default; without
            // shutdown() they would keep the JVM alive after main returns.
            threadPoolExecutor.shutdown();
        }
    }
}

CyclicBarrier

让一组线程互相等待,直到所有线程都到达屏障(barrier)后再一起继续执行。
原理
CyclicBarrier是由ReentrantLock可重入锁和Condition共同实现的。
核心方法

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped; created from {@code lock} */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /** The command to run when tripped; null when the single-argument constructor is used */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();
  
// Creates a CyclicBarrier; parties is the number of threads held at the barrier.
// Delegates to the two-argument constructor with a null barrier action.
 public CyclicBarrier(int parties) {
        this(parties, null);
    }

// Creates a CyclicBarrier; parties is the number of threads held at the barrier,
// and barrierAction is executed by the last thread to arrive.
// Throws IllegalArgumentException when parties is not positive.
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        // count starts at parties
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

// Waits at the barrier without a timeout by calling dowait(false, 0L).
// dowait declares TimeoutException, which is not expected on the untimed path,
// so it is wrapped in an Error instead of being declared here.
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

// Timed wait at the barrier: converts the timeout to nanoseconds and calls dowait(true, nanos).
// Declares InterruptedException, BrokenBarrierException and TimeoutException.
public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

// Returns the number of parties currently waiting at the barrier,
// computed as parties - count while holding the lock.
public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

// Reports whether the barrier is in a broken state: returns the broken flag
// of the current generation, read while holding the lock.
public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
// Returns the number of parties configured for this barrier.
public int getParties() {
        return parties;
    }

跑个栗子

@Slf4j
public class CyclicBarrierTest implements Runnable {
    private CyclicBarrier barrier;

    private String name;
    private Long time;

    public CyclicBarrierTest(CyclicBarrier barrier, String name, Long time) {
        this.barrier = barrier;
        this.name = name;
        this.time = time;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(time * 1000);
            System.out.println(name + "到了");
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

/**
 * Barrier action and entry point of the "dinner" demo: {@link #run()} is the command
 * passed to the {@link CyclicBarrier} and prints the "everyone is here" message.
 */
public class MainThread implements Runnable {
    /** Barrier action: announces that all guests have arrived. */
    @Override
    public void run() {
        System.out.println("人到齐了,开饭.....");
    }

    /**
     * Starts three guests with different travel times on a pool of three threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new MainThread());
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.execute(new CyclicBarrierTest(cyclicBarrier,"小花",2L));
        executorService.execute(new CyclicBarrierTest(cyclicBarrier,"小红",1L));
        executorService.execute(new CyclicBarrierTest(cyclicBarrier,"小明",5L));
        // shutdown() lets the already submitted guests finish but allows the pool
        // threads to exit afterwards; without it the JVM would never terminate.
        executorService.shutdown();
    }
}
cyclicbarriertest.gif

其他例子

public class CyclicBarrierDemo implements Runnable {
    CyclicBarrier cyclicBarrier;

    private Integer count;

    public CyclicBarrierDemo(CyclicBarrier cyclicBarrier, Integer count) {
        this.cyclicBarrier = cyclicBarrier;
        this.count = count;
    }

    @Override
    public void run() {
        try {
            System.out.println("accept iot device run data:" + count);
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Thread() {
            @Override
            public void run() {
                System.out.println("run data count" + 10 + ",开始执行。。。。");
            }
        });

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        for (int i = 0; i < 10; i++) {
            CyclicBarrierDemo cyclicBarrierDemo = new CyclicBarrierDemo(cyclicBarrier,i);
            threadPoolExecutor.submit(cyclicBarrierDemo);
        }
    }
}

Semaphore

Semaphore 信号量,控制并发时线程的数量。

跑个栗子

/**
 * A fitness room with a fixed set of treadmills. A fair {@link Semaphore} with one
 * permit per treadmill limits how many callers can hold a treadmill at once; the
 * {@code use} flags record which concrete treadmill is taken.
 */
public class FitnessRoom {

    /** A single treadmill identified by its number. */
    class TreadMill {
        private Integer num;

        public Integer getNum() {
            return num;
        }

        public void setNum(Integer num) {
            this.num = num;
        }

        public TreadMill(Integer num) {
            this.num = num;
        }
    }

    private final TreadMill[] treadMills = new TreadMill[]{new TreadMill(1), new TreadMill(2), new TreadMill(3), new TreadMill(4)};
    /** use[i] is true while treadMills[i] is handed out; guarded by this object's monitor. */
    private final boolean[] use = new boolean[treadMills.length];

    /** One permit per treadmill; fair, so waiting callers are served in arrival order. */
    Semaphore semaphore = new Semaphore(treadMills.length, true);

    /**
     * Acquires a treadmill, blocking until one is free.
     *
     * @return the treadmill now reserved for the caller
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    public TreadMill get() throws InterruptedException {
        semaphore.acquire(1);
        return getAvailable();
    }

    /**
     * Scans for a treadmill nobody is using and marks it as used. Synchronized because
     * several permit holders may scan the flags at the same time.
     *
     * @return a free treadmill, or null when all are in use
     */
    public synchronized TreadMill getAvailable() {
        for (int i = 0; i < use.length; i++) {
            if (!use[i]) {
                use[i] = true;
                return treadMills[i];
            }
        }
        return null;
    }

    /**
     * Releases a treadmill and returns its permit to the semaphore so that a waiting
     * caller can proceed. Releasing a treadmill that is not in use (or is unknown to
     * this room) is ignored, so a double release cannot inflate the permit count.
     *
     * @param treadMill the treadmill to hand back
     */
    public void release(TreadMill treadMill) {
        if (markAsUnused(treadMill)) {
            semaphore.release(1);
        }
    }

    /** Clears the in-use flag of the given treadmill; returns true only if it was in use. */
    private synchronized boolean markAsUnused(TreadMill treadMill) {
        for (int i = 0; i < use.length; i++) {
            if (treadMills[i] == treadMill) {
                if (use[i]) {
                    use[i] = false;
                    return true;
                }
                return false;
            }
        }
        return false;
    }
}

/**
 * A runner in the fitness-room demo: takes a treadmill from the {@code FitnessRoom},
 * runs for two seconds and hands the treadmill back.
 */
public class MainThread implements Runnable {

    /** Runner name used in the printed messages. */
    private final String name;
    /** Room the treadmill is taken from and handed back to. */
    private final FitnessRoom fitnessRoom;

    /**
     * @param name        runner name
     * @param fitnessRoom room that owns the treadmills
     */
    public MainThread(String name, FitnessRoom fitnessRoom) {
        this.name = name;
        this.fitnessRoom = fitnessRoom;
    }

    /**
     * Takes a treadmill, runs, and always hands the treadmill back, even when the run
     * is interrupted, so the treadmill cannot stay marked as used forever.
     */
    @Override
    public void run() {
        FitnessRoom.TreadMill treadMill = null;
        try {
            treadMill = fitnessRoom.get();
            if (null != treadMill) {
                System.out.println(name + "在" + treadMill.getNum() + "号跑步机上跑步");
                TimeUnit.SECONDS.sleep(2);
                System.out.println(name + "跑完了");
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // The treadmill is free again, whether the run finished or was interrupted.
            if (null != treadMill) {
                fitnessRoom.release(treadMill);
            }
        }
    }

    /**
     * Sends 20 runners into one fitness room using a pool of five threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        FitnessRoom fitnessRoom = new FitnessRoom();
        for (int i = 0; i < 20; i++) {
            executorService.execute(new MainThread(i + "", fitnessRoom));
        }
        // Already submitted runners still finish; afterwards the pool threads exit
        // so the JVM can terminate.
        executorService.shutdown();
    }
}
semaphore.gif

Exchanger

用于两个线程之间交换数据的工具类。

原子操作类

AtomicBoolean
AtomicInteger
AtomicLong
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
AtomicReference
AtomicReferenceFieldUpdater
AtomicMarkableReference
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicStampedReference
AtomicReferenceFieldUpdater

集合工具类

ConcurrentHashMap
ConcurrentSkipListMap
CopyOnWriteArrayList
CopyOnWriteArraySet

相关文章

网友评论

      本文标题:多线程之并发工具类

      本文链接:https://www.haomeiwen.com/subject/arwfjctx.html