美文网首页
java之JUC

java之JUC

作者: 任嘉平生愿 | 来源:发表于2019-11-08 14:15 被阅读0次
CountDownLatch
package com.example.concurrent.juc;

import java.util.concurrent.CountDownLatch;

/**
 * Demonstrates {@link CountDownLatch} used both as a start gate and as a finish line.
 *
 * <p>Every worker blocks on the {@code begin} latch until it is opened, performs one
 * unit of simulated work (decrementing {@code index}), and then counts down the
 * {@code end} latch. Whoever awaits {@code end} resumes once every worker has reported.
 *
 * <p>A single instance is intended to be shared by several threads (see {@link #main}),
 * so the update of {@code index} is guarded by this instance's monitor.
 */
public class CountDownLatchDemo implements Runnable {

    /** Start gate: workers wait here until its count reaches zero. */
    private final CountDownLatch begin;
    /** Finish line: a thread awaiting this latch unblocks when its count reaches zero. */
    private final CountDownLatch end;
    /** Simulated workload counter, decremented once per successful run. */
    private int index = 10;

    /**
     * @param begin latch every worker awaits before doing its work
     * @param end   latch every worker counts down when it is finished
     */
    public CountDownLatchDemo(CountDownLatch begin, CountDownLatch end) {
        this.begin = begin;
        this.end = end;
    }

    /**
     * Waits for the start gate, performs one unit of work and reports completion.
     *
     * <p>The wait on {@code begin} happens outside the lock so that all workers are
     * parked on the latch at the same time; only the non-atomic {@code index--} and the
     * matching print are serialized. {@code end} is counted down even when the worker
     * is interrupted, so a thread awaiting {@code end} is never left waiting for it.
     */
    @Override
    public void run() {
        try {
            begin.await();
            // index-- is a read-modify-write on state shared by all workers.
            synchronized (this) {
                index--;
                System.out.println(Thread.currentThread().getName() + ":执行完任务了,  index: " + index);
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can see the interrupt.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            end.countDown();
        }
    }

    /**
     * Starts ten workers sharing one demo instance, opens the start gate and waits
     * until all of them have counted down the finish latch.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(10);
        CountDownLatchDemo cdlDemo = new CountDownLatchDemo(begin, end);
        System.out.println("所有线程开始执行任务....");
        for (int i = 0; i < 10; i++) {
            new Thread(cdlDemo).start();
        }
        // Open the gate: every worker parked on begin.await() is released.
        begin.countDown();
        // Block until all ten workers have counted down.
        end.await();
        System.out.println("所有线程完成任务,可以开始做其它想做的事 了....");
    }
}

CyclicBarrier
package com.example.concurrent.juc;

import java.util.concurrent.CyclicBarrier;

/**
 * Demonstrates {@link CyclicBarrier} as a rendezvous point in the middle of a task:
 * each participant announces itself, waits at the barrier, and continues only once
 * the barrier trips.
 */
public class CyclicBarrierDemo implements Runnable {

    /** Barrier this participant waits on. */
    private final CyclicBarrier cyclicBarrier;
    /** Identifier printed before and after the barrier. */
    private final int index;

    /**
     * @param cyclicBarrier barrier to wait on
     * @param index         identifier used in this participant's output
     */
    public CyclicBarrierDemo(CyclicBarrier cyclicBarrier, int index) {
        this.index = index;
        this.cyclicBarrier = cyclicBarrier;
    }

    /**
     * Prints the identifier, waits at the barrier, then prints a completion line.
     * Any exception raised along the way is reported via its stack trace.
     */
    @Override
    public void run() {
        try {
            announceArrival();
            cyclicBarrier.await();
            announceDeparture();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /** Prints the line emitted before waiting at the barrier. */
    private void announceArrival() {
        System.out.println("index: " + index);
    }

    /** Prints the line emitted after the barrier has tripped. */
    private void announceDeparture() {
        System.out.println(index + "执行");
    }

    /**
     * Starts ten participants and joins the barrier from the main thread as the
     * eleventh party, then reports that the barrier has been reached.
     *
     * @param args unused
     * @throws Exception if the main thread's wait at the barrier fails
     */
    public static void main(String[] args) throws Exception {
        final int workerCount = 10;
        // One extra party for the main thread itself.
        final CyclicBarrier sharedBarrier = new CyclicBarrier(workerCount + 1);
        int next = 0;
        while (next < workerCount) {
            Thread worker = new Thread(new CyclicBarrierDemo(sharedBarrier, next));
            worker.start();
            next++;
        }
        sharedBarrier.await();
        System.out.println("全部到达屏障....");
    }
}
Exchanger
package com.example.concurrent.juc;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Exchanger;

/**
 * Demonstrates {@link Exchanger}: a thread calling {@code exchange()} blocks until a
 * second thread calls {@code exchange()} on the same exchanger; the two then swap
 * their objects and both continue.
 */
public class ExchangerDemo implements Runnable {

    /** Rendezvous object shared with the partner thread. */
    private final Exchanger<Map<String, String>> exchanger;
    /** The map this side offers to its partner. */
    private final Map<String, String> info;

    /**
     * @param exchanger exchanger shared with the partner thread
     * @param info      map handed to the partner during the exchange
     */
    public ExchangerDemo (Exchanger<Map<String, String>> exchanger, Map<String, String> info) {
        this.exchanger = exchanger;
        this.info = info;
    }

    /**
     * Prints the map held before the exchange, swaps it with the partner's map and
     * prints the map received. If the thread is interrupted while waiting, the stack
     * trace is printed and the interrupt status is restored for the caller.
     */
    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + "交换前的值是:" + info.toString() );
            Map<String, String> result = exchanger.exchange(info);
            System.out.println(Thread.currentThread().getName() + "交换后的值是:" + result.toString() );
        } catch (InterruptedException e) {
            // exchange() clears the flag when it throws; set it again so the
            // interruption is not silently lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Starts two threads that swap a letter-keyed map and a digit-keyed map.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Map<String, String> aMap = new HashMap<String, String>();
        Map<String, String> bMap = new HashMap<String, String>();
        aMap.put("a", "aaa");
        aMap.put("b", "bbb");
        aMap.put("c", "ccc");
        bMap.put("1", "111");
        bMap.put("2", "222");
        bMap.put("3", "333");
        Exchanger<Map<String, String>> exchanger = new Exchanger<Map<String, String>>();
        new Thread(new ExchangerDemo(exchanger, aMap)).start();
        new Thread(new ExchangerDemo(exchanger, bMap)).start();
    }
}

Future
package com.example.concurrent.juc;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;


/**
 * Demonstrates obtaining a return value from work executed on another thread
 * by wrapping a {@link Callable} in a {@link FutureTask}.
 */
public class FutureDemo {

    /**
     * Runs a small sum on a separate thread, pauses for two seconds, then prints
     * the computed result.
     *
     * @param args unused
     * @throws Exception if the sleep is interrupted or retrieving the result fails
     */
    public static void main(String[] args) throws Exception {
        Callable<Integer> sum = () -> 1 + 2 + 3 + 4;

        // FutureTask is both the Runnable handed to the thread and the Future
        // from which the result is read back.
        FutureTask<Integer> futureTask = new FutureTask<>(sum);
        Future<Integer> task = futureTask;

        Thread worker = new Thread(futureTask);
        worker.start();
        System.out.println("start.........");
        TimeUnit.SECONDS.sleep(2);

        Integer outcome = task.get();
        System.out.println("result: " + outcome);
    }
}


Semaphore
package com.example.concurrent.juc;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;


/**
 * Demonstrates {@link Semaphore} as a limit on how many threads run a section at once.
 *
 * <p>A thread calls {@code acquire()} before its work and {@code release()} afterwards.
 * When no permit is available, {@code acquire()} blocks until another thread releases
 * one, so at most as many threads as there are permits execute the section concurrently.
 */
public class SemaphoreDemo implements Runnable {

    /** Permit pool shared by every thread running this task. */
    private final Semaphore sema;

    /**
     * @param semaphore permit pool that bounds concurrent executions of {@link #run()}
     */
    public SemaphoreDemo (Semaphore semaphore) {
        this.sema = semaphore;
    }

    /**
     * Acquires one permit, simulates two seconds of work, prints a completion line and
     * returns the permit.
     *
     * <p>The permit is acquired before entering the {@code try}/{@code finally} that
     * releases it: if {@code acquire()} is interrupted, no permit is held and none must
     * be released, otherwise the semaphore would end up with more permits than it was
     * created with. In both interruption cases the interrupt status is restored.
     */
    @Override
    public void run() {
        try {
            sema.acquire();
        } catch (InterruptedException e) {
            // Nothing was acquired, so there is nothing to release.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return;
        }
        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println(Thread.currentThread().getName() + ": 执行完: " + System.currentTimeMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            sema.release();
        }
    }

    /**
     * Starts ten threads sharing one task backed by a three-permit semaphore.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        SemaphoreDemo semaphoreDemo = new SemaphoreDemo(semaphore);
        for (int i = 0; i < 10; i++) {
            new Thread(semaphoreDemo).start();
        }
    }
}

AtomicInteger
package com.example.concurrent.juc;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demonstrates that {@link AtomicInteger} keeps a shared counter correct when it is
 * incremented from several threads: {@code CLIENT_COUNT} increment tasks are submitted
 * and the final value is printed once all of them have finished.
 */
public class AtomicDemo {

    /** Number of increment tasks submitted. */
    private static final int CLIENT_COUNT = 5000;
    /** Shared counter incremented once per task. */
    private static final AtomicInteger count = new AtomicInteger(0);
    /**
     * Pool of at most two threads with a direct hand-off queue. Because the queue
     * holds no tasks, a submission made while both workers are busy is rejected;
     * {@code CallerRunsPolicy} makes the submitting thread run such a task itself
     * instead of throwing {@code RejectedExecutionException}.
     */
    private static final ExecutorService executorService = new ThreadPoolExecutor(
            1, 2, 1000, TimeUnit.MILLISECONDS,
            new SynchronousQueue<Runnable>(),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());
    /** Counted down once per finished task; reaches zero when all tasks are done. */
    private static final CountDownLatch countDownLatch = new CountDownLatch(CLIENT_COUNT);

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws Exception if waiting for the tasks is interrupted
     */
    public static void main(String[] args) throws Exception {
        testAtomicInteger();
    }

    /**
     * Submits {@code CLIENT_COUNT} increment tasks, waits for all of them to finish,
     * shuts the pool down and prints the final counter value.
     *
     * @throws Exception if the wait on the latch is interrupted
     */
    private static void testAtomicInteger() throws Exception {
        try {
            for (int i = 0; i < CLIENT_COUNT; i++) {
                executorService.execute(() -> {
                    try {
                        add();
                    } finally {
                        // Count down even if the increment fails so the waiter
                        // below cannot block forever.
                        countDownLatch.countDown();
                    }
                });
            }
            // Wait until every submitted task has finished.
            countDownLatch.await();
        } finally {
            // Always stop the pool; its non-daemon core thread would otherwise
            // keep the JVM alive.
            executorService.shutdown();
        }
        System.out.println("ConcurrencyDemo:" + count);
    }

    /** Atomically increments the shared counter by one. */
    private static void add() {
        count.incrementAndGet();
    }
}

相关文章

网友评论

      本文标题:java之JUC

      本文链接:https://www.haomeiwen.com/subject/mxnibctx.html