美文网首页Java架构师Java 杂谈
JAVA并发编程之多线程并发同步业务场景与解决方案

JAVA并发编程之多线程并发同步业务场景与解决方案

作者: Java耕耘者 | 来源:发表于2018-10-23 14:31 被阅读9次

    Java并发编程是一个很热点的话题,无论在面试时候还是在高并发的场景中。都会涉及到Java的并发编程相关的知识。Java的并发编程有两个主要的基础知识,一个是线程安全另一个是线程间通信。本Java并发编程系列博客作为博主系统学习Java并发编程的知识记录。也希望可以帮助其他人。

    摘要

    1,线程概念

    2,Java线程的实现方式

    3,Java线程状态流转介绍

    4,Thread类中的常用方法分析

    1.什么是线程

    线程,是程序执行流的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组 成。另外,线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个 进程的其它线程共享进程所拥有的全部资源,同一进程中的多个线程之间可以并发执行。由于线程之间的相互制约,致使线程 在运行中呈现出间断性。线程也有就绪、阻塞和运行三种基本状态。每一个程序都至少有一个线程,若程序只有一个线程,那就是程序本身。

    2.Java的线程实现

    Java线程实现方式有两种 第一种是继承Thread类 ,另一种是实现Runnable接口。而Thread类内部也同样实现了Runnable接口。Runnable接口只有一个run()方法。

    3.Java线程状态

    首先 看张线程状态流转图 这张图很重要

    1.20个人排队同时访问2个购票窗口,同时能购票的只有两个人,当其中一个人买票完成后,18个人中的其中一个在占用窗口进行购买。

    20个人相当于20个线程,2相当于资源,当18个人等待的时候,相当于线程堵塞,

    问题关键:怎么控制在统一时间的并发量为2?

    在多线程程序设计中有三个同步工具需要我们掌握,分别是Semaphore(信号量),countDownLatch(倒计数门闸锁),CyclicBarrier(可重用栅栏)

    Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。

    Semaphore分为单值和多值两种,前者只能被一个线程获得,后者可以被若干个线程获得。

    信号量Semaphore的介绍

    我们以一个停车场运作为例来说明信号量的作用。假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了三辆车,看门人允许其中它们进入进入,然后放下车拦。以后来的车必须在入口等待,直到停车场中有车辆离开。这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复。

    在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。信号量是一个非负整数,表示了当前公共资源的可用数目(在上面的例子中可以用空闲的停车位类比信号量),当一个线程要使用公共资源时(在上面的例子中可以用车辆类比线程),首先要查看信号量,如果信号量的值大于1,则将其减1,然后去占有公共资源。如果信号量的值为0,则线程会将自己阻塞,直到有其它线程释放公共资源。

    在信号量上我们定义两种操作: acquire(获取) 和 release(释放)。当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。

    信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

    信号量Semaphore的源码分析

    在Java的并发包中,Semaphore类表示信号量。Semaphore内部主要通过AQS(AbstractQueuedSynchronizer)实现线程的管理。Semaphore有两个构造函数,参数permits表示许可数,它最后传递给了AQS的state值。线程在运行时首先获取许可,如果成功,许可数就减1,线程运行,当线程运行结束就释放许可,许可数就加1。如果许可数为0,则获取失败,线程位于AQS的等待队列中,它会被其它释放许可的线程唤醒。在创建Semaphore对象的时候还可以指定它的公平性。一般常用非公平的信号量,非公平信号量是指在获取许可时先尝试获取许可,而不必关心是否已有需要获取许可的线程位于等待队列中,如果获取失败,才会入列。而公平的信号量在获取许可时首先要查看等待队列中是否已有线程,如果有则入列。

    构造函数源代码

    //非公平的构造函数

    public Semaphore(int permits) {

    sync = new NonfairSync(permits);

    }

    //通过fair参数决定公平性

    public Semaphore(int permits, boolean fair) {

    sync = fair ? new FairSync(permits) : new NonfairSync(permits);

    }

    acquire源代码

    public void acquire() throws InterruptedException {

    sync.acquireSharedInterruptibly(1);

    }

    public final void acquireSharedInterruptibly(int arg)

    throws InterruptedException {

    if (Thread.interrupted())

    throw new InterruptedException();

    if (tryAcquireShared(arg) < 0)

    doAcquireSharedInterruptibly(arg);

    }

    final int nonfairTryAcquireShared(int acquires) {

    for (;;) {

    int available = getState();

    int remaining = available - acquires;

    if (remaining < 0 ||

    compareAndSetState(available, remaining))

    return remaining;

    }

    }

    可以看出,如果remaining <0 即获取许可后,许可数小于0,则获取失败,在doAcquireSharedInterruptibly方法中线程会将自身阻塞,然后入列。

    release源代码

    public void release() {

    sync.releaseShared(1);

    }

    public final boolean releaseShared(int arg) {

    if (tryReleaseShared(arg)) {

    doReleaseShared();

    return true;

    }

    return false;

    }

    protected final boolean tryReleaseShared(int releases) {

    for (;;) {

    int current = getState();

    int next = current + releases;

    if (next < current) // overflow

    throw new Error("Maximum permit count exceeded");

    if (compareAndSetState(current, next))

    return true;

    }

    }

    可以看出释放许可就是将AQS中state的值加1。然后通过doReleaseShared唤醒等待队列的第一个节点。可以看出Semaphore使用的是AQS的共享模式,等待队列中的第一个节点,如果第一个节点成功获取许可,又会唤醒下一个节点,以此类推。

    使用示例

    package javalearning;

    import java.util.Random;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.Executors;

    import java.util.concurrent.Semaphore;

    public class SemaphoreDemo {

    private Semaphore smp = new Semaphore(3);

    private Random rnd = new Random();

    class TaskDemo implements Runnable{

    private String id;

    TaskDemo(String id){

    this.id = id;

    }

    @Override

    public void run(){

    try {

    smp.acquire();

    System.out.println("Thread " + id + " is working");

    Thread.sleep(rnd.nextInt(1000));

    smp.release();

    System.out.println("Thread " + id + " is over");

    } catch (InterruptedException e) {

    }

    }

    }

    public static void main(String[] args){

    SemaphoreDemo semaphoreDemo = new SemaphoreDemo();

    //注意我创建的线程池类型,

    ExecutorService se = Executors.newCachedThreadPool();

    se.submit(semaphoreDemo.new TaskDemo("a"));

    se.submit(semaphoreDemo.new TaskDemo("b"));

    se.submit(semaphoreDemo.new TaskDemo("c"));

    se.submit(semaphoreDemo.new TaskDemo("d"));

    se.submit(semaphoreDemo.new TaskDemo("e"));

    se.submit(semaphoreDemo.new TaskDemo("f"));

    se.shutdown();

    }

    }

    运行结果

    Thread c is working

    Thread b is working

    Thread a is working

    Thread c is over

    Thread d is working

    Thread b is over

    Thread e is working

    Thread a is over

    Thread f is working

    Thread d is over

    Thread e is over

    Thread f is over

    可以看出,最多同时有三个线程并发执行,也可以认为有三个公共资源(比如计算机的三个串口)。

    如何一起学习,有没有免费资料?

    在程序员这条路上遇到瓶颈的朋友可以加WX:daxigua012 大家一起来提升进步 但要备注好信息 ,分享知识

    关注下面公众号"Java这点事"获取BATJ等一线互联网企业面试题目和答案还有java技术干货知识等你领取                                                                                                                              

    相关文章

      网友评论

        本文标题:JAVA并发编程之多线程并发同步业务场景与解决方案

        本文链接:https://www.haomeiwen.com/subject/ircazftx.html