多线程

作者: relax_小罗罗 | 来源:发表于2020-05-02 01:23 被阅读0次

    Java 程序天生就是多线程的

    一个 Java 程序从 main()方法开始执行,然后按照既定的代码逻辑执行,看 似没有其他线程参与,但实际上 Java 程序天生就是多线程程序,因为执行 main() 方法的是一个名称为 main 的线程。
    [6] Monitor Ctrl-Break //监控 Ctrl-Break 中断信号的
    [5] Attach Listener //内存 dump,线程 dump,类信息统计,获取系统属性等
    [4] Signal Dispatcher // 分发处理发送给 JVM 信号的线程
    [3] Finalizer // 调用对象 finalize 方法的线程
    [2] Reference Handler//清除 Reference 的线程
    [1] main //main 线程,用户程序入口

    一个线程是进程中的执行流
    一个进程可以同时包括多个线程

    实现线程的两种方式

    Thread(主要是继承extends)

    thread 对象需要一个任务来执行,任务是指线程在启动时执行的工作,该工作的功能代码被卸载run()方法中
    run方法必须使用一下语法格式

    public class ThreadDemo extends Thread {
        int count =10;
        @Override
        public void run() {
         while (true){
             System.out.println("-----------"+count);
             if(--count==0){
                 return;
             }
         }
        }
    
        public static void main(String[] args) {
            ThreadDemo threadDemo=new ThreadDemo();
            threadDemo.start();
            System.out.println("--------主方法结束");
        }
    }
    
    

    线程必须用start()方法启动

    Runnable(主要是实现 implements)

    实质上Thread类实现了Runnable 接口,其中的run方法正式对Runnable接口中的run()方法的具体实现;
    构造方法:
    public Thread(Runnable target);
    public Thread(Runnable target,String threadName);

    public class RunnableDemo implements Runnable {
        int count =10;
        @Override
        public void run() {
            while (true){
                System.out.println("-----------"+count);
                if(--count==0){
                    return;
                }
            }
        }
    
        public static void main(String[] args) {
            RunnableDemo threadDemo=new RunnableDemo();
            new Thread(threadDemo).start();
            System.out.println("--------主方法结束");
        }
    }
    

    线程的生命周期

    image.png

    就绪状态:
    sleep();
    wait();
    等待输入/输出完成

    可称为运行状态方法:
    notify();
    notifyAll();
    interrupt();
    线程休眠结束;
    输入输出结束;


    image.png

    线程的休眠:

      @Override
        public void run() {
         while (true){
             try {
                 Thread.sleep(2000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             System.out.println("-----------"+count);
             if(--count==0){
                 return;
             }
         }
        }
    

    线程的加入

    public class JoinTest extends JFrame {
        /**
         *
         */
        private static final long serialVersionUID = 1L;
        private Thread threadA; // 定义两个线程
        private Thread threadB;
        final JProgressBar progressBar = new JProgressBar(); // 定义两个进度条组件
        final JProgressBar progressBar2 = new JProgressBar();
        int count = 0;
    
        public static void main(String[] args) {
            init(new JoinTest(), 100, 100);
        }
    
        public JoinTest() {
            super();
            // 将进度条设置在窗体最北面
            getContentPane().add(progressBar, BorderLayout.NORTH);
            // 将进度条设置在窗体最南面
            getContentPane().add(progressBar2, BorderLayout.SOUTH);
            progressBar.setStringPainted(true); // 设置进度条显示数字字符
            progressBar2.setStringPainted(true);
            // 使用匿名内部类形式初始化Thread实例子
            threadA = new Thread(new Runnable() {
                int count = 0;
    
                public void run() { // 重写run()方法
                    while (true) {
                        progressBar.setValue(++count); // 设置进度条的当前值
                        try {
                            Thread.sleep(100); // 使线程A休眠100毫秒
                            threadB.join(); // 使线程B调用join()方法
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            threadA.start(); // 启动线程A
            threadB = new Thread(new Runnable() {
                int count = 0;
    
                public void run() {
                    while (true) {
                        progressBar2.setValue(++count); // 设置进度条的当前值
                        try {
                            Thread.sleep(100); // 使线程B休眠100毫秒
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        if (count == 100) // 当count变量增长为100时
                            break; // 跳出循环
                    }
                }
            });
            threadB.start(); // 启动线程B
        }
    
        // 设置窗体各种属性方法
        public static void init(JFrame frame, int width, int height) {
            frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
            frame.setSize(width, height);
            frame.setVisible(true);
        }
    }
    
    

    线程中断

    JDK早已废除了stop方法,不建议使用stop()来停止一个线程的运行。现在提倡在run()方法中无限循环,然后使用一个布尔标记循环停止
    通知线程要停止了(体现线程的协作)
    interrupt();
    isInterrupted();
    Thread.interrupted(); 会将中断表示为改为false;

    //不推荐
            private boolean isContinue=false;
    
        public void setContinue(boolean aContinue) {
            isContinue = aContinue;
        }
    
        @Override
        public void run() {
            while (!isContinue){
                try {
    
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"-----------"+count);
                --count;
            }
    }
    
     @Override
        public void run() {
            //推荐用法
          //  while (!isInterrupted()) {
            while (!Thread.interrupted()) {
                try {
    
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "-----------" + count);
                --count;
            }
    
    
        }
    
        public static void main(String[] args) throws InterruptedException {
            ThreadDemo threadDemo = new ThreadDemo();
            threadDemo.setName("线程A");
            threadDemo.start();
            Thread.sleep(4000);
            threadDemo.interrupt();
    
            System.out.println("--------主方法结束");
        }
    

    如果线程中使用了sleep() 和wait() 方法,可以使用interrupt()方法退出循环但是会抛出InterruptedException异常,只要捕获异常,并处理中断业务即可。

    public class InterruptedSwing extends JFrame {
    
        private static final long serialVersionUID = 1L;
        Thread thread;
    
        public static void main(String[] args) {
            init(new InterruptedSwing(), 100, 100);
        }
    
        public InterruptedSwing() {
            super();
            final JProgressBar progressBar = new JProgressBar(); // 创建进度条
            // 将进度条放置在窗体合适位置
            getContentPane().add(progressBar, BorderLayout.NORTH);
            progressBar.setStringPainted(true); // 设置进度条上显示数字
            thread = new Thread(new Runnable() {
                int count = 0;
    
                public void run() {
                    while (true) {
                        progressBar.setValue(++count); // 设置进度条的当前值
                        try {
                            Thread.sleep(1000); // 使线程休眠1000豪秒
                            // 捕捉InterruptedException异常
                        } catch (InterruptedException e) {
                            System.out.println("当前线程序被中断");
                            break;
                        }
                    }
                }
            });
            thread.start(); // 启动线程
            thread.interrupt(); // 中断线程
        }
    
        public static void init(JFrame frame, int width, int height) {
            frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
            frame.setSize(width, height);
            frame.setVisible(true);
        }
    }
    

    线程优先级

    public class PriorityTest extends JFrame {
        private static final long serialVersionUID = 1L;
        private Thread threadA;
        private Thread threadB;
        private Thread threadC;
        private Thread threadD;
    
        public PriorityTest() {
            getContentPane().setLayout(new GridLayout(4, 1));
            // 分别实例化4个线程
            final JProgressBar progressBar = new JProgressBar();
            final JProgressBar progressBar2 = new JProgressBar();
            final JProgressBar progressBar3 = new JProgressBar();
            final JProgressBar progressBar4 = new JProgressBar();
            getContentPane().add(progressBar);
            getContentPane().add(progressBar2);
            getContentPane().add(progressBar3);
            getContentPane().add(progressBar4);
            progressBar.setStringPainted(true);
            progressBar2.setStringPainted(true);
            progressBar3.setStringPainted(true);
            progressBar4.setStringPainted(true);
            threadA = new Thread(new MyThread(progressBar));
            threadB = new Thread(new MyThread(progressBar2));
            threadC = new Thread(new MyThread(progressBar3));
            threadD = new Thread(new MyThread(progressBar4));
            //优先级不在1-10之内 会出现ILLegalArgumentException异常
            setPriority("threadA", 5, threadA);
            setPriority("threadB", 5, threadB);
            setPriority("threadC", 4, threadC);
            setPriority("threadD", 3, threadD);
        }
    
        // 定义设置线程的名称、优先级的方法
        public static void setPriority(String threadName, int priority,
                                       Thread t) {
            t.setPriority(priority); // 设置线程的优先级
            t.setName(threadName); // 设置线程的名称
            t.start(); // 启动线程
        }
    
        public static void main(String[] args) {
            init(new PriorityTest(), 100, 100);
        }
    
        public static void init(JFrame frame, int width, int height) {
            frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
            frame.setSize(width, height);
            frame.setVisible(true);
        }
    
        private final class MyThread implements Runnable { // 定义一个实现Runnable接口的类
            private final JProgressBar bar;
            int count = 0;
    
            private MyThread(JProgressBar bar) {
                this.bar = bar;
            }
    
            public void run() { // 重写run()方法
                while (true) {
                    bar.setValue(count += 10); // 设置滚动条的值每次自增10
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("当前线程序被中断");
                    }
                }
            }
        }
    
    }
    

    线程安全

    两个线程同时存取单一对象的数据;
    类锁:多个对象之间互不干扰,锁的其实是class对象 ;

       private static synchronized void synClass(){
          
             try {
                TimeUnit.SECONDS.sleep(seconds);
            } catch (InterruptedException e) {
            }
            System.out.println("---------");
        }
    

    方法锁

    int num = 10; // 设置当前总票数
    
        public void run() {
            while (true) {
                if (num > 0) {
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+"---" + num--);
                }
            }
        }
    
        public static void main(String[] args) {
            ThreadSafeTest t = new ThreadSafeTest(); // 实例化类对象
            Thread tA = new Thread(t,"A"); // 以该类对象分别实例化4个线程
            Thread tB = new Thread(t,"B");
            Thread tC = new Thread(t,"C");
            Thread tD = new Thread(t,"D");
            tA.start(); // 分别启动线程
            tB.start();
            tC.start();
            tD.start();
        }
    

    JAVA提供了synchronized关键字来防止资源冲突

     public void run() {
            while (true) {
                synchronized ("") {
                    if (num > 0) {
                        try {
                            Thread.sleep(1000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + "---" + num--);
                    }
                }
            }
        }
    

    synchronized(object){
    }
    Object 为任意一个对象,每个对象都存在一个标志位,并具有两个值,0 和1 ,一个线程运行到同步块时首先检查该对象的标志位,如果为0状态,表明此同步块中存在其他线程在运行,这时该线程处于就绪状态,直到处于同步块中的线程执行完同步块中代码时 状态改为1,线程才能执行同步块代码
    同步方法就是将synchronized 方法上面;

     public synchronized void  doinit(){
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    

    volatile关键字,最轻量的同步机制

    只适合一写多读.

    守护线程的使用

     private static class UseThread extends Thread{
            @Override
            public void run() {
                try {
                    while (!isInterrupted()) {
                        System.out.println(Thread.currentThread().getName()
                                + " I am extends Thread.");
                    }
                    System.out.println(Thread.currentThread().getName()
                            + " interrupt flag is " + isInterrupted());
                } finally {
                    //守护线程中finally不一定起作用
                    System.out.println(" .............finally");
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            UseThread useThread = new UseThread();
            useThread.setDaemon(true);
            useThread.start();
            Thread.sleep(5);
           // useThread.interrupt();
        }
    

    ThreadLocal

    每个线程都有自己的副本,保证隔离;
    引发的内存泄漏分析
    强引用 Object o=new Object
    软引用
    弱引用 weakReference 只要发生GC 就会被回收
    虚引用

    public class UseThreadLocal {
        private static ThreadLocal<Integer> intLocal
                = new ThreadLocal<Integer>(){
            @Override
            protected Integer initialValue() {
                System.out.println("this initialvalue is running......");
                return 1;
            }
        };
    
        private static ThreadLocal<String> stringThreadLocal =new ThreadLocal<String>(){
            @Override
            protected String initialValue() {
                System.out.println("this initialvalue is running......");
                return " threadName  is :"+Thread.currentThread().getName() ;
            }
        };
    
        /**
         * 运行3个线程
         */
        public void StartThreadArray(){
            Thread[] runs = new Thread[3];
            for(int i=0;i<runs.length;i++){
                runs[i]=new Thread(new TestThread(i));
            }
            for(int i=0;i<runs.length;i++){
                runs[i].start();
            }
        }
    
        /**
         *类说明:测试线程,线程的工作是将ThreadLocal变量的值变化,并写回,看看线程之间是否会互相影响
         */
        public static class TestThread implements Runnable{
            int id;
            public TestThread(int id){
                this.id = id;
            }
            public void run() {
                System.out.println(Thread.currentThread().getName()+":start");
                Integer s = intLocal.get();
                s = s+id;
                intLocal.set(s);
                System.out.println(Thread.currentThread().getName()
                        +":"+ intLocal.get());
                System.out.println(stringThreadLocal.get());
                //当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度
                intLocal.remove();
            }
        }
    
        public static void main(String[] args){
            UseThreadLocal test = new UseThreadLocal();
            test.StartThreadArray();
        }
    }
    

    ThreadLocal的线程不安全

    public class ThreadLocalUnsafe implements Runnable{
        //对象中同一个对象引用
        public static Number number = new Number(0);
        
        //去掉静态即可
      //  public Number number = new Number(0);
    
        public void run() {
            //每个线程计数加一
            number.setNum(number.getNum()+1);
            //将其存储到ThreadLocal中
            value.set(number);
            SleepTools.ms(2);
            //输出num值
            System.out.println(Thread.currentThread().getName()+"="+value.get().getNum());
        }
    
        public static ThreadLocal<Number> value = new ThreadLocal<Number>() {
        };
    
        public static void main(String[] args) {
            for (int i = 0; i < 5; i++) {
                new Thread(new ThreadLocalUnsafe()).start();
            }
        }
    
        private static class Number {
            public Number(int num) {
                this.num = num;
            }
    
            private int num;
    
            public int getNum() {
                return num;
            }
    
            public void setNum(int num) {
                this.num = num;
            }
    
            @Override
            public String toString() {
                return "Number [num=" + num + "]";
            }
        }
    
    }
    
    

    线程开发工具类

    fork/join

    算法中有 快速排序,归并排序,外部排序用到fork/join 概念


    image.png

    RecursiveTask接受返回值;
    RecursiveAction不接受返回值;

    CountDownLatch

    控制器:控制main线程将会等待所有Woker结束后才能继续执行


    image.png
    public class UseCountDownLatch {
        
        static CountDownLatch latch = new CountDownLatch(6);
    
        /*初始化线程*/
        private static class InitThread implements Runnable{
    
            public void run() {
                System.out.println("Thread_"+Thread.currentThread().getId()
                        +" ready init work......");
                latch.countDown();
                for(int i =0;i<2;i++) {
                    System.out.println("Thread_"+Thread.currentThread().getId()
                            +" ........continue do its work");
                }
            }
        }
    
        /*业务线程等待latch的计数器为0完成*/
        private static class BusiThread implements Runnable{
    
            public void run() {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for(int i =0;i<3;i++) {
                    System.out.println("BusiThread_"+Thread.currentThread().getId()
                            +" do business-----");
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(new Runnable() {
                public void run() {
                    SleepTools.ms(1);
                    System.out.println("Thread_"+Thread.currentThread().getId()
                            +" ready init work step 1st......");
                    latch.countDown();
                    System.out.println("begin step 2nd.......");
                    SleepTools.ms(1);
                    System.out.println("Thread_"+Thread.currentThread().getId()
                            +" ready init work step 2nd......");
                    latch.countDown();
                }
            }).start();
            new Thread(new BusiThread()).start();
            for(int i=0;i<=3;i++){
                Thread thread = new Thread(new InitThread());
                thread.start();
            }
    
            latch.await();
            System.out.println("Main do ites work........");
        }
    }
    

    CyclicBarrier

    到达一个节点,然后在一起执行
    CountDownLatch 总数可以和线程数不一样
    CyclicBarrier 必须等于线程数


    image.png
    public class UseCyclicBarrier {
    
        private static CyclicBarrier barrier
                = new CyclicBarrier(4,new CollectThread());
    
        //存放子线程工作结果的容器
        private static ConcurrentHashMap<String,Long> resultMap
                = new ConcurrentHashMap<>();
    
        public static void main(String[] args) {
            for(int i=0;i<4;i++){
                Thread thread = new Thread(new SubThread());
                thread.start();
            }
    
        }
    
        /*汇总的任务*/
        private static class CollectThread implements Runnable{
    
            @Override
            public void run() {
                StringBuilder result = new StringBuilder();
                for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
                    result.append("["+workResult.getValue()+"]");
                }
                System.out.println(" the result = "+ result);
                System.out.println("do other business........");
            }
        }
    
        /*相互等待的子线程*/
        private static class SubThread implements Runnable{
    
            @Override
            public void run() {
                long id = Thread.currentThread().getId();
                resultMap.put(Thread.currentThread().getId()+"",id);
                try {
                        Thread.sleep(1000+id);
                        System.out.println("Thread_"+id+" ....do something ");
                        //汇总1次
                    barrier.await();
                    Thread.sleep(1000+id);
                    System.out.println("Thread_"+id+" ....do its business ");
                    //汇总2次
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            }
        }
    }
    

    Semaphore

    线程流控
    useful.release(); 可以凭空new出来一个连接 放进池子
    所以用两个 来控制


    image.png
    public class DBPoolSemaphore {
        
        private final static int POOL_SIZE = 10;
        //两个指示器,分别表示池子还有可用连接和已用连接
        private final Semaphore useful,useless;
        //存放数据库连接的容器
        private static LinkedList<Connection> pool = new LinkedList<Connection>();
        //初始化池
        static {
            for (int i = 0; i < POOL_SIZE; i++) {
                pool.addLast(SqlConnectImpl.fetchConnection());
            }
        }
        public DBPoolSemaphore() {
            this.useful = new Semaphore(10);
            this.useless = new Semaphore(0);
        }
        
        /*归还连接*/
        public void returnConnect(Connection connection) throws InterruptedException {
            if(connection!=null) {
                System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!"
                        +"可用连接数:"+useful.availablePermits());
                useless.acquire();
                synchronized (pool) {
                    pool.addLast(connection);
                }
                useful.release();
            }
        }
        
        /*从池子拿连接*/
        public Connection takeConnect() throws InterruptedException {
            useful.acquire();
            Connection connection;
            synchronized (pool) {
                connection = pool.removeFirst();
            }
            useless.release();
            return connection;
        }
        
    }
    

    Callable、Future和FutureTask

    image.png

    通过线程拿到返回结果

    public class UseFuture {
        
        
        /*实现Callable接口,允许有返回值*/
        private static class UseCallable implements Callable<Integer>{
            private int sum;
            @Override
            public Integer call() throws Exception {
                System.out.println("Callable子线程开始计算!");  
    //          Thread.sleep(1000);
                for(int i=0 ;i<5000;i++){
                    if(Thread.currentThread().isInterrupted()) {
                        System.out.println("Callable子线程计算任务中断!");
                        return null;
                    }
                    sum=sum+i;
                    System.out.println("sum="+sum);
                }  
                System.out.println("Callable子线程计算结束!结果为: "+sum);  
                return sum; 
            }
        }
        
        public static void main(String[] args) 
                throws InterruptedException, ExecutionException {
    
            UseCallable useCallable = new UseCallable();
            //包装
            FutureTask<Integer> futureTask = new FutureTask<>(useCallable);
            Random r = new Random();
            new Thread(futureTask).start();
    
            Thread.sleep(1);
            if(r.nextInt(100)>50){
                System.out.println("Get UseCallable result = "+futureTask.get());
            }else{
                System.out.println("Cancel................. ");
                futureTask.cancel(true);
            }
    
        }
    
    }
    
    

    乐观锁 ---- 成不成功无所谓,先执行;
    悲观锁 ---- 先占有 在执行, 经常造成死锁;

    原子操作CAS--无锁化编程

    CAS(Compare And Swap)
    类似事务,要么 全部完成,要么不成功;
    一般处理器 提供 CAS指令()
    CAS的原理
    利用了现代处理器都支持的CAS的指令,
    循环这个指令,直到成功为止
    CAS的问题
    ABA问题: 拿到 A1 去电脑内存中值为A3
    开销问题: 循环的开销
    只能保证一个共享变量的原子操作


    image.png

    Jdk中相关原子操作类的使用
    更新基本类型类:AtomicBoolean,AtomicInteger,AtomicLong
    更新数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
    更新引用类型:AtomicReference,AtomicMarkableReference,AtomicStampedReference
    原子更新字段类: AtomicReferenceFieldUpdater,AtomicIntegerFieldUpdater,AtomicLongFieldUpdater

    相关文章

      网友评论

          本文标题:多线程

          本文链接:https://www.haomeiwen.com/subject/vzwcghtx.html