美文网首页
java并发学习

java并发学习

作者: YaM丶 | 来源:发表于2019-05-18 22:45 被阅读0次

java 高并发

基本概念

  • 并发: 多个线程操作相同的资源,保证线程安全,合理使用资源
  • 高并发: 服务能同时处理很多请求,提升程序性能

并发编程的基础

cpu多级缓存

cpu频率太快了,快到主存跟不上这样在处理器时钟周期内,cpu常常需要等待主存,浪费资源。
所以cache的出现,是为了缓解cpu和内存之间的速度不匹配的问题。

cpu多级缓存的意义

  • 时间局部性: 如果某个数据被访问,那么在不久的将来,该数据很可能再次被访问。
  • 空间局部性: 如果某个数据被访问,那么与它相邻的数据很快也可能也被访问。

缓存一致性(MESI)

用于保证多个cpu cache之间的缓存数据的一致性

  • M: modified 被修改
    • 代表该缓存行只被缓存在cpu的缓存中,并且是被修改过的,因此其与主存数据不一致,需要被写回主存。
    • 当写回主存之后,状态变为E
  • E: exclusive 独享
    • 只被缓存在某cpu中,与主存数据一致,该状态的缓存可在任何时候被其他缓存访问时变为共享状态S,当被修改时变为M状态
  • S: shared 共享
    • 该状态标识该缓存行可能被多个cpu进行缓存,且各缓存中数据与主存是一致的,当有一个cpu修改该缓存的时候,其他cpu的该缓存可以被作废,变为I状态
  • I: invalid 无效
    • 代表该缓存行无效

乱序执行优化

  • 处理其为提高运算速度而做出违背代码原有顺序的优化

juc 之 并发容器

  • ArrayList -> CopyOnWriteArrayList
  • HashSet, TreeSet -> CopyOnWriteArraySet, ConcurrentSkipListSet
  • HashMap, TreeMap -> ConcurrentHashMap, ConcurrentSkipListMap

ConcurrentHashMap vs ConcurrentSkipListMap

ConcurrentSkipListMap key是有序的,支持更高的并发

juc 之 aqs

aqs介绍

AbstractQueuedSynchronizer - AQS

  • 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
  • 利用了一个int类型表示状态
  • 使用方法是继承
  • 子类通过继承并通过实现它的方法管理其状态(acquire,release)的方法操纵状态
  • 可以同时实现排他锁和共享锁模式(独占、共享)

aqs 同步组件

  • CountDownLatch
  • Semaphore
  • CyclicBarrier
  • ReentrantLock
  • Condition
  • FutureTask

CountDownLatch

@Slf4j
public class Demo001CountDownLatch {

    private final static int count = 200;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch latch = new CountDownLatch(count);

        for (int i = 0; i < count; i++) {
            final int threadNum = i;
            exec.execute(()->{
                try {
                    test(threadNum);
                } catch (InterruptedException e) {
                    log.error("exception",e);
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws InterruptedException {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}

Semaphore

@Slf4j
public class Demo003Semaphore {

    private final static int count = 200;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < count; i++) {
            final int threadNum = i;
            exec.execute(()->{
                try {
                    semaphore.acquire(); // 获取执行许可
                    test(threadNum);
                    semaphore.release(); // 释放许可
                } catch (InterruptedException e) {
                    log.error("exception",e);
                } finally {
                }
            });
        }
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws InterruptedException {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

CyclicBarrier

CyclicBarrier vs CountDownLatch

  • CountDownLatch 的计数器只能使用一次,
    而CyclicBarrier的计数器可以重置

code:

@Slf4j
public class Demo004CyclicBarrier2 {

    // CyclicBarrier 可以给一个runnable,满足条件时优先执行这个runnable
    private static CyclicBarrier barrier = new CyclicBarrier(5,()->{
        log.info("call back is running");
    });

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            service.execute(()->{
                try {
                    race(threadNum);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            });
        }
        service.shutdown();
    }
    private static void race(int threadNum) throws InterruptedException, BrokenBarrierException, TimeoutException {
        Thread.sleep(1000);
        log.info("{} is ready",threadNum);
        try {
            barrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (BrokenBarrierException e) {
            log.warn("BrokenBarrierException");
        } catch (TimeoutException e) {
            log.warn("TimeoutException");
        }
        log.info("{} continue",threadNum);
    }
}

ReentrantLock 与 锁

ReentrantLock与synchronized的区别

- 可重入性
    - 两者都可重入
- 锁的实现  
    - synchronized 依赖于jvm实现
    - ReentrantLock 通过jdk实现 
- 性能区别
    - 在功能 synchronized 可实现的情况下官方推荐 synchronized   
- ReentrantLock 需要手动释放锁  
- ReentrantLock 独有的功能
    - 可指定公平锁还是非公平锁 
    - 提供了一个Condition类,可分组唤醒需要唤醒的线程  
    - 提供能够中断等待锁的线程的机制,lock.lockInterruptibly() 

简单使用

@Slf4j
public class Demo005Lock {

    public static int clientTotal = 5000;

    public static int threadTotal = 500;

    public static int count = 0;

    private final static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();

        CountDownLatch latch = new CountDownLatch(clientTotal);
        Semaphore semaphore = new Semaphore(threadTotal);

        for (int i = 0; i < clientTotal; i++) {
            service.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            });
        }
        latch.await();
        service.shutdown();
        log.info("finish,count:{}",count);
    }

    private static void add() {
        try {
            lock.lock();
            count++;
        } finally {
            lock.unlock();
        }
    }
}

ReentrantReadWriteLock 读写锁

@Slf4j
public class Demo006ReentrantReadWriteLock {
    private final Map<String, Data> map = new HashMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();

    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }
    public Set<String> getAllKeys() {
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    class Data{

    }
}

StampedLock

@Slf4j
public class Demo007StampedLock {

    public static int clientTotal = 5000;

    public static int threadTotal = 500;

    public static int count = 0;

    private final static StampedLock lock = new StampedLock();
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();

        CountDownLatch latch = new CountDownLatch(clientTotal);
        Semaphore semaphore = new Semaphore(threadTotal);

        for (int i = 0; i < clientTotal; i++) {
            service.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            });
        }
        latch.await();
        service.shutdown();
        log.info("finish,count:{}",count);
    }

    private static void add() {
        long stamp =  lock.writeLock();
        try {
            count++;
        } finally {
            lock.unlock(stamp);
        }
    }
}

Condition

@Slf4j
public class Demo008Condition {
    public static void main(String[] args) throws IOException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(()->{
            try{
                lock.lock();
                log.info("wait signal"); // 1
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("get signal"); // 4
            lock.unlock();
        }).start();

        new Thread(()->{
            lock.lock();
            log.info("get lock"); // 2
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            condition.signal();
            log.info("send signal"); // 3
            lock.unlock();
        }).start();

        System.in.read();
    }
}

FutureTask

@Slf4j
public class Demo002FutureTask {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "done";
        });

        new Thread(futureTask).start();
        log.info("do something in main");
        Thread.sleep(2000);
        String result = futureTask.get();
        log.info("result :{}",result);
    }
}

fork/join

fork/join 任务局限性

  • 只能使用fork/join操作来作为同步机制
  • 任务不做io操作
  • 任务不能抛出异常

简单使用

@Slf4j
public class Demo001ForkJoin extends RecursiveTask<Integer> {

    public static final int threshold = 2;
    private int start;
    private int end;

    public Demo001ForkJoin(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;

        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            int middle = (start + end) / 2;
            Demo001ForkJoin leftTask = new Demo001ForkJoin(start, middle);
            Demo001ForkJoin rightTask = new Demo001ForkJoin(middle + 1, end);

            leftTask.fork();
            rightTask.fork();

            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            sum = leftResult + rightResult;
        }
        return sum ;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        Demo001ForkJoin task = new Demo001ForkJoin(1, 100);
        Future<Integer> result = forkJoinPool.submit(task);
        try {
            log.info("result:{}",result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

BlockingQueue

操作

- Throws Exception Special Value Blocks Times Out
insert add(o) offer(o) put(o) offer(o,timeout,timeunit)
remove remove(o) poll(o) take(o) poll(timeout,timeunit)
Examine element(o) peek(o)

一些实现类

  • ArrayBlockingQueue
    • 有界的阻塞队列
    • 先进先出
  • DelayQueue
    • 阻塞内部元素,元素必须实现Delayed接口,delayed 接口继承了comparable接口
    • 内部元素需要进行排序,一般按照元素过期时间优先级进行排序
  • LinkedBlockingQueue
    • 大小可变,可指定边界
    • 先进先出
  • PriorityBlockingQueue
    • 有优先级的阻塞队列
    • 无边界
    • 插入元素需要实现comparable接口,用于优先级的排序
  • SynchronousQueue
    • 内部仅容纳一个元素
    • 内部有元素时其他放入元素的操作被阻塞

线程池

ThreadPoolExecutor

参数:

  • corePoolSize : 核心线程数量
  • maximumPoolSize : 最大线程数量
  • workQueue : 阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响
  • keepAliveTime : 线程没有任务执行时最多保持多久中止
  • unit : keepAliveTime 的时间单位
  • threadFactory: 线程工厂
  • rejectHandle: 当拒绝处理任务时的策略

操作:

  • execute() : 提交任务
  • submit() : 提交任务,能够返回结果
  • shutdown() : 关闭线程池,等待任务都执行完
  • shutdownNow() : 直接关闭线程池,不等待任务执行完

监控方法:

  • getTaskCount() : 线程池已执行和未执行的任务总数
  • getCompletedTaskCount() : 已完成的任务数量
  • getPoolSize() : 线程池当前的线程数量
  • getActiveCount() : 当前线程池中正在执行的线程数量

线程池-Executor框架接口

  • Executors.newCachedThreadPool 可缓存的线程池
  • Executors.newFixedThreadPool 固定数量的线程池,超过数量则等待
  • Executors.newScheduledThreadPool 定长,支持定时周期性的执行任务
  • Executors.newSingleThreadExecutor 单线程池化的线程池,只会用当个线程执行任务

newCachedThreadPool

@Slf4j
public class Demo001CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        int count = 50;
        for (int i = 0; i < count; i++) {
            final int num = i;
            service.execute(()->{
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("task:{}",num);
            });
        }
        service.shutdown();
    }
}

newFixedThreadPool

@Slf4j
public class Demo002FixedThreadPool {
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(10);
        int count = 50;
        for (int i = 0; i < count; i++) {
            final int num = i;
            service.execute(()->{
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("task:{}",num);
            });
        }
        service.shutdown();
    }
}

newSingleThreadExecutor

@Slf4j
public class Demo003SingleThreadExecutor {
    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        int count = 50;
        for (int i = 0; i < count; i++) {
            final int num = i;
            service.execute(()->{
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("task:{}",num);
            });
        }
        service.shutdown();
    }
}

newScheduledThreadPool

@Slf4j
public class Demo004ScheduledThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(10);

        // 单次调用,延时3秒后执行
        service.schedule(()->{
            log.info("delayed for 3");
        },3, TimeUnit.SECONDS);

        // 循环调用,延迟5秒后每3秒运行一次
        service.scheduleAtFixedRate(()->{
            log.info("scheduleAtFixedRate");
        },5,3,TimeUnit.SECONDS);

        // service.shutdown();

        // 功能类似于Timer
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                log.info("Timer");
            }
        },new Date(),1000*5);
    }
}

线程池的合理配置

  • cpu密集型任务,就需要尽量压榨cpu,参考值为cpu数量+1
  • io密集型,参考值可设置为2*cpu数量

死锁

死锁条件

  • 互斥条件
  • 请求和保持条件
  • 不剥夺条件
  • 环路等待条件

多线程并发最佳实践

  • 使用本地变量
  • 使用不可变类
  • 最小化锁的作用域范围:
    阿姆达尔定律: S = 1/(1-a+a/n)
  • 使用线程池的executor,而不是直接new thread执行
  • 宁可使用同步也不要使用线程的wait和notify
  • 使用BlockingQueue实现生产-消费模式
  • 使用并发集合而不是加了锁的同步集合
  • 使用semaphore创建有界的访问
  • 宁可使用同步代码块也不使用同步的方法
  • 避免使用静态变量

相关文章

网友评论

      本文标题:java并发学习

      本文链接:https://www.haomeiwen.com/subject/lnmxzqtx.html