美文网首页
Java多线程相关知识点扩展实例分析

Java多线程相关知识点扩展实例分析

作者: IT人故事会 | 来源:发表于2021-02-22 15:29 被阅读0次

    这次说说多线程只是扩展,主要讲解一些应用,应用带一些原理讲解,同时希望各位老铁有所收获,这些内容跟前面的线程和并发容器有关系的,从应用场景引出多线程技术栈里面的应用,其实锁和同步块,容器,工具类,都是非常的使用的。反射更加适应于语法。

    (一)多线程应用

    • ① 介绍

    多线程经常使用在逻辑处理里面,一个程序N个逻辑要做,一个用户请求可能需要数据库查询,第三方的系统接口,调用redis,一个用户请求需要多步组成,可以使用多线程技术来实现,去做一些调整,

    • ② 经典场景

    接触后端开发的时候,经常发现一个请求过来,后端需要做一系列的复杂的操作,下面这个后端有:系统消息,我的团队,我的钱包 对于这些信息,后面的系统如何设计。这些可能涉及到多个模块的调用。一个系统划分为多个子系统来做。

    • ③ 后端接口执行-大概流程

    互联网公司存在组织结构复杂,调用的模块比较多。设计这样系统的时候,一个信息单独的查询系统的对应接口,还是移动前端发起一次请求一下获取到。一般都做网关(API)接口,一个请求获取多个信息,网关收到信息后,获取多个子系统的接口,最后把信息汇总,返回给前端。

    1. 收到一个请求。
    2. 调用多个服务接口获取其他系统的数据信息。
    3. 最后汇总范围。

    通过数据分析,越来越多的互联网电商平台的单子70%以上都来自手机端,手机端有个典型的应用,网络处理很麻烦的,移动设备的固有属性,一个人走这走这到了信号的盲区了,一个页面发起五六个接口的请求,移动互联网的应用造成了很大的损耗,一般都是一个接口获取全部的信息。

    如果一个API网关需要调用3个接口,这3个接口是串行完成的,A执行完(3秒),执行B,B执行完(2秒),执行C(5秒),C执行完返回给移动端json字符串,需要10秒才能返回。

    如果A,B,C这3个没有相互依赖的关系,完全可以把A交给线程1,B交给线程2,C交给线程3,来一起去完成,汇总执行的结果,需要5秒,没完成就返回。这样是不是效率明显得到了提升。

    (二)Future

    • ① 介绍

    异步计算的结果,提供了用于检查计算是否完成,等待计算完成以及获取结果的方法。

    • ② 接口的定义
    1. boolean cancel(boolean mayInterruptIfRunning)

    尝试取消当前任务的执行。如果任务已经取消、已经完成或者其他原因不能取消,尝试将失败。如果任务还没有启动就调用了cancel(true),任务将永远不会被执行。如果任务已经启动,参数mayInterruptIfRunning将决定任务是否应该中断执行该任务的线程,以尝试中断该任务。
    如果任务不能被取消,通常是因为它已经正常完成,此时返回false,否则返回true

    1. boolean isCancelled()

    如果任务在正常结束之前被被取消返回true

    3.boolean isDone()

    正常结束、异常或者被取消导致任务完成,将返回true

    4.V get()

    等待任务结束,然后获取结果,如果任务在等待过程中被终端将抛出InterruptedException,如果任务被取消将抛出CancellationException,如果任务中执行过程中发生异常将抛出ExecutionException。

    5.V get(long timeout, TimeUnit unit)

    任务最多在给定时间内完成并返回结果,如果没有在给定时间内完成任务将抛出TimeoutException。

    (三)CountDownLatch

    • ① 介绍

    一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

    • ② 常见用法

    多个人等一个信号后继续执行操作。例如5个运动员,等一个发令员的枪响。
    一个人等多个人的信号。旅游团等所有人签到完成才开始出发。
    常见到使用的地方是zk获取连接的时候。

    • ③ 源码分析
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.web.client.RestTemplate;
    
    import java.util.ArrayList;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class UserServiceCountLatch {
        ExecutorService executorService = Executors.newCachedThreadPool();
    
        @Autowired
        private RestTemplate restTemplate;
    
        /**
         * 查询多个系统的数据,合并返回
         */
        public Object getUserInfo(String userId) throws InterruptedException {
            CountDownLatch count = new CountDownLatch(2);
            ArrayList<JSONObject> values = new ArrayList<>();
            // 你可以封装成一个 提交URL 就能自动多线程调用的 工具
                executorService.submit(() -> {
                    // 1.业务代码
                    JSONObject userInfo = new JSONObject();
                    values.add(userInfo);
                    count.countDown();
                });
                executorService.submit(() -> {
                   // 2.业务代码
                   JSONObject intergralInfo= new JSONObject();
                    values.add(intergralInfo);
                    count.countDown();
            });
    
            count.await();// 等待计数器归零
    
            // 3. 合并为一个json对象
            JSONObject result = new JSONObject();
            for (JSONObject value : values) {
                result.putAll(value);
            }
            return result;
        }
    }
    

    1.统计线程执行的情况
    2.压力测试中,使用countDownLatch实现最大程度的并发处理。
    2.多个线程之间,相互通信,比如线程异步调用完接口,结果通知。

    (四)CyclicBarrier

    • ① 介绍

    CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

    • ② 场景

    坐车,老板都是票卖完了才开车。
    数据库的批量操作,达到一定数量批量进行插入。

    • ③ 源码
    import java.util.ArrayList;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    // 循环屏障(栅栏),示例:数据库批量插入
    // 游戏大厅... 5人组队打副本
    public class CyclicBarrierTest {
        public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue<String> sqls = new LinkedBlockingQueue<>();
            // 任务1+2+3...1000  拆分为100个任务(1+..10,  11+20) -> 100线程去处理。
    
            // 每当有4个线程处于await状态的时候,则会触发barrierAction执行
            CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
                @Override
                public void run() {
                    // 这是每满足4次数据库操作,就触发一次批量执行
                    System.out.println("有4个线程执行了,开始批量插入: " + Thread.currentThread());
                    for (int i = 0; i < 4; i++) {
                        System.out.println(sqls.poll());
                    }
                }
            });
    
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    try {
                        sqls.add("data - " + Thread.currentThread()); // 缓存起来
                        Thread.sleep(1000L); // 模拟数据库操作耗时
                        barrier.await(); // 等待栅栏打开,有4个线程都执行到这段代码的时候,才会继续往下执行
                        System.out.println(Thread.currentThread() + "插入完毕");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
    
            Thread.sleep(2000);
        }
    }
    

    (五)Semaphore

    • ①介绍

    又称“信号量”,控制多个线程争抢许可。
    acquire: 获取一个许可,如果没有就等待。
    release: 释放一个许可。
    availablePermits: 方法得到可用的许可数目。

    • ② 场景

    代码并发处理限流

    举个例子,去洗浴中心的时候都会给一个手环,这个手环很多时候就是为了限制熟练,因为柜子是有限的,每个人一个柜子,如果没有手环了就是没有柜子了,手环归还后柜子就出现了,基本就是这个原理。

    • ③ 源码
    import com.study.lock.aqs.NeteaseAqs;
    
    // 自定义的信号量实现
    public class NeteaseSemaphore {
        NeteaseAqs aqs = new NeteaseAqs() {
            @Override
            public int tryAcquireShared() { // 信号量获取, 数量 - 1
                for(;;) {
                    int count =  getState().get();
                    int n = count - 1;
                    if(count <= 0 || n < 0) {
                        return -1;
                    }
                    if(getState().compareAndSet(count, n)) {
                        return 1;
                    }
                }
            }
    
            @Override
            public boolean tryReleaseShared() { // state + 1
                return getState().incrementAndGet() >= 0;
            }
        };
    
        /** 许可数量 */
        public NeteaseSemaphore(int count) {
            aqs.getState().set(count); // 设置资源的状态
        }
    
        public void acquire() {
            aqs.acquireShared();
        } // 获取令牌
    
        public void release() {
            aqs.releaseShared();
        } // 释放令牌
    }
    
    
    import com.study.lock.aqs.AQSdemo;
    
    import java.util.Random;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicInteger;
    
    // 信号量机制
    public class SemaphoreDemo {
        public static void main(String[] args) {
            SemaphoreDemo semaphoreTest = new SemaphoreDemo();
            int N = 9;            // 客人数量
            NeteaseSemaphore semaphore = new NeteaseSemaphore(5); // 手牌数量,限制请求数量
            for (int i = 0; i < N; i++) {
                String vipNo = "vip-00" + i;
                new Thread(() -> {
                    try {
                        semaphore.acquire(); // 获取令牌
    
                        semaphoreTest.service(vipNo);
    
                        semaphore.release(); // 释放令牌
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    
        // 限流 控制5个线程 同时访问
        public void service(String vipNo) throws InterruptedException {
            System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",...");
            Thread.sleep(new Random().nextInt(3000));
            System.out.println("欢送贵宾出门,贵宾编号" + vipNo);
        }
    
    }
    
    package com.study.lock.aqs;
    
    import java.util.Iterator;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;
    
    // 抽象队列同步器
    // state, owner, waiters
    public class NeteaseAqs {
        // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
        // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
        // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
        // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。
    
        // 1、 如何判断一个资源的拥有者
        public volatile AtomicReference<Thread> owner = new AtomicReference<>();
        // 保存 正在等待的线程
        public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
        // 记录资源状态
        public volatile AtomicInteger state = new AtomicInteger(0);
    
        // 共享资源占用的逻辑,返回资源的占用情况
        public int tryAcquireShared(){
            throw new UnsupportedOperationException();
        }
    
        public void acquireShared(){
            boolean addQ = true;
            while(tryAcquireShared() < 0) {
                if (addQ) {
                    // 没拿到锁,加入到等待集合
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                } else {
                    // 阻塞 挂起当前的线程,不要继续往下跑了
                    LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
                }
            }
            waiters.remove(Thread.currentThread()); // 把线程移除
        }
    
        public boolean tryReleaseShared(){
            throw new UnsupportedOperationException();
        }
    
        public void releaseShared(){
            if (tryReleaseShared()) {
                // 通知等待者
                Iterator<Thread> iterator = waiters.iterator();
                while (iterator.hasNext()) {
                    Thread next = iterator.next();
                    LockSupport.unpark(next); // 唤醒
                }
            }
        }
    
        // 独占资源相关的代码
    
        public boolean tryAcquire() { // 交给使用者去实现。 模板方法设计模式
            throw new UnsupportedOperationException();
        }
    
        public void acquire() {
            boolean addQ = true;
            while (!tryAcquire()) {
                if (addQ) {
                    // 没拿到锁,加入到等待集合
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                } else {
                    // 阻塞 挂起当前的线程,不要继续往下跑了
                    LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
                }
            }
            waiters.remove(Thread.currentThread()); // 把线程移除
        }
    
        public boolean tryRelease() {
            throw new UnsupportedOperationException();
        }
    
        public void release() { // 定义了 释放资源之后要做的操作
            if (tryRelease()) {
                // 通知等待者
                Iterator<Thread> iterator = waiters.iterator();
                while (iterator.hasNext()) {
                    Thread next = iterator.next();
                    LockSupport.unpark(next); // 唤醒
                }
            }
        }
    
        public AtomicInteger getState() {
            return state;
        }
    
        public void setState(AtomicInteger state) {
            this.state = state;
        }
    }
    

    PS:工具是根据场景来的,达到某个场景这个工具才有它的价值,如果你不存在这个场景这个工具也就没有价值。多线程这块设计到3块的知识:筑基阶段(JMM,lock,cas,atomic,sync),并发容器(。里面都涉及到数据结构,我已经开通了专辑数据结构与算法,数据结构并不是一两篇文章就可以搞定的东西,大学可是一门学科。),工具类阶段(多线程工具类阶段,设计模式的体现。不同的源码都有自己的设计模式的体现)

    相关文章

      网友评论

          本文标题:Java多线程相关知识点扩展实例分析

          本文链接:https://www.haomeiwen.com/subject/xodmbctx.html