美文网首页
多线程-基础

多线程-基础

作者: 麦大大吃不胖 | 来源:发表于2020-12-04 10:40 被阅读0次

    by shihang.mai

    1. 创建线程&停止线程

    1.1 创建线程

    1. extends Thread->new Class().start();
    2. implements Runnable->new Thread(new Class()).start();
    3. Lambda表达式
    4. 线程池new ThreadPoolExecutor()
    public class Testmsh {
    
        public static void main(String[] args) {
    
            //1. 继承Thread
            new MyThread().start();
            //2. 实现Runnable接口
            new Thread(new MyRun()).start();
            //3. lambda表达式
            new Thread(()->{
                System.out.println("i am lambda");
            }).start();
            //4. 线程池
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
            threadPoolExecutor.submit(new MyRun());
        }
    
        static class MyRun implements Runnable {
            @Override
            public void run() {
                System.out.println("Hello MyRun!");
            }
        }
    
        static class MyThread extends Thread {
            @Override
            public void run() {
                System.out.println("Hello MyThread!");
            }
        }
    }
    

    1.2 停止线程

    1. interrupt
    2. 标志位

    2. 线程状态

    线程状态迁移
    1. new Thread()进入NEW状态
    2. start()进入RUNNABLE状态,而RUNNABLE状态包括READY和RUNNING状态,它们之间通过cpu调度切换,或者RUNNING状态的线程主动掉用yield()进入READY状态
    3. 线程执行完毕就进入TEMINATED销毁状态
    4. RUNNABLE状态可通过
    • Thread.sleep(time),o.wait(time),t.join(time)变为TIMEWAITING状态,时间结束回到RUNNABLE状态
    • o.wait(),t.join()变为WAITING状态,o.notify(),o.notifyAll()回到RUNNABLE状态
    • 等待锁变为BLOCKED状态,获得锁回到RUNNABLE状态

    3. 多线程的某写类用法

    3.1 ReentrantLock

    基本和sync一样,写sync的地方直接用lock即可.注意必须手动unlock释放锁。

    public class T02_ReentrantLock2 {
        Lock lock = new ReentrantLock();
    
        void m1() {
            try {
                lock.lock(); //synchronized(this)
                for (int i = 0; i < 10; i++) {
                    TimeUnit.SECONDS.sleep(1);
    
                    System.out.println(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        void m2() {
            try {
                lock.lock();
                System.out.println("m2 ...");
            } finally {
                lock.unlock();
            }
    
        }
    
        public static void main(String[] args) {
            T02_ReentrantLock2 rl = new T02_ReentrantLock2();
            new Thread(rl::m1).start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(rl::m2).start();
        }
    }
    

    特点:

    1. 有尝试锁操作

      lock.tryLock(5, TimeUnit.SECONDS);
      
    2. 可被打断的锁

      Thread t2 = new Thread(()->{
               try {
                   //lock.lock();
                   lock.lockInterruptibly(); //可以对interrupt()方法做出响应
                   System.out.println("t2 start");
                   TimeUnit.SECONDS.sleep(5);
                   System.out.println("t2 end");
               } catch (InterruptedException e) {
                   System.out.println("interrupted!");
               } finally {
                   lock.unlock();
               }
           });
           t2.start();
           
           try {
               TimeUnit.SECONDS.sleep(1);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           t2.interrupt(); 
      
    3. 可设置公平锁

    ReentrantLock lock=new ReentrantLock(true);
    

    公平锁:去麦当劳买早餐

    • 如果没人排队,直接去买
    • 如果有人排队,自己排在后面

    非公平锁:去麦当劳买早餐

    • 如果没人排队,直接去买
    • 就算有人排队,我先去插一下队
      • 如果插到,就买
      • 如果插不到,自己排在队后

    3.2 CountDownLatch

    倒数器

    CountDownLatch latch = new CountDownLatch(100);
    //减1
    latch.countDown();
    //阻塞,等待减到为0,代码继续向下走
    latch.await();
    ...............
    

    3.3 CyclicBarrier

    栅栏

    CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人"));
    barrier.await();
    

    3.4 Phaser

    栅栏组

    public class T09_TestPhaser2 {
        static Random r = new Random();
        static MarriagePhaser phaser = new MarriagePhaser();
    
    
        static void milliSleep(int milli) {
            try {
                TimeUnit.MILLISECONDS.sleep(milli);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
    
            phaser.bulkRegister(7);
    
            for(int i=0; i<5; i++) {
    
                new Thread(new Person("p" + i)).start();
            }
    
            new Thread(new Person("新郎")).start();
            new Thread(new Person("新娘")).start();
    
        }
    
    
    
        static class MarriagePhaser extends Phaser {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
    
                switch (phase) {
                    case 0:
                        System.out.println("所有人到齐了!" + registeredParties);
                        System.out.println();
                        return false;
                    case 1:
                        System.out.println("所有人吃完了!" + registeredParties);
                        System.out.println();
                        return false;
                    case 2:
                        System.out.println("所有人离开了!" + registeredParties);
                        System.out.println();
                        return false;
                    case 3:
                        System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);
                        return true;
                    default:
                        return true;
                }
            }
        }
    
    
        static class Person implements Runnable {
            String name;
    
            public Person(String name) {
                this.name = name;
            }
    
            public void arrive() {
    
                milliSleep(r.nextInt(1000));
                System.out.printf("%s 到达现场!\n", name);
                phaser.arriveAndAwaitAdvance();
            }
    
            public void eat() {
                milliSleep(r.nextInt(1000));
                System.out.printf("%s 吃完!\n", name);
                phaser.arriveAndAwaitAdvance();
            }
    
            public void leave() {
                milliSleep(r.nextInt(1000));
                System.out.printf("%s 离开!\n", name);
    
    
                phaser.arriveAndAwaitAdvance();
            }
    
            private void hug() {
                if(name.equals("新郎") || name.equals("新娘")) {
                    milliSleep(r.nextInt(1000));
                    System.out.printf("%s 洞房!\n", name);
                    phaser.arriveAndAwaitAdvance();
                } else {
                    phaser.arriveAndDeregister();
                    //phaser.register()
                }
            }
    
            @Override
            public void run() {
                arrive();
    
    
                eat();
    
    
                leave();
    
    
                hug();
    
            }
        }
    }
    

    3.5 ReadWriteLock

    readLock-共享锁:当是读锁时,其他读线程可以读

    writeLock-互斥锁,排他锁:当是写锁时,其他的写和读都不能进行

    读必须加锁,如果读不加锁的话,那么在读期间就可能进行写,那么读就会发生"脏读"

    public class T10_TestReadWriteLock {
        static Lock lock = new ReentrantLock();
        private static int value;
    
        static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        static Lock readLock = readWriteLock.readLock();
        static Lock writeLock = readWriteLock.writeLock();
    
        public static void read(Lock lock) {
            try {
                lock.lock();
                Thread.sleep(1000);
                System.out.println("read over!");
                //模拟读取操作
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public static void write(Lock lock, int v) {
            try {
                lock.lock();
                Thread.sleep(1000);
                value = v;
                System.out.println("write over!");
                //模拟写操作
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
    
    
    
    
        public static void main(String[] args) {
            //Runnable readR = ()-> read(lock);
            Runnable readR = ()-> read(readLock);
    
            //Runnable writeR = ()->write(lock, new Random().nextInt());
            Runnable writeR = ()->write(writeLock, new Random().nextInt());
    
            for(int i=0; i<18; i++) new Thread(readR).start();
            for(int i=0; i<2; i++) new Thread(writeR).start();
    
    
        }
    

    3.6 Semaphore

    信号量,限流用

    public class T11_TestSemaphore {
        public static void main(String[] args) {
            //Semaphore s = new Semaphore(2);
            Semaphore s = new Semaphore(2, true);
            //允许一个线程同时执行
            //Semaphore s = new Semaphore(1);
    
            new Thread(()->{
                try {
                    s.acquire();
    
                    System.out.println("T1 running...");
                    Thread.sleep(200);
                    System.out.println("T1 running...");
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    s.release();
                }
            }).start();
    
            new Thread(()->{
                try {
                    s.acquire();
    
                    System.out.println("T2 running...");
                    Thread.sleep(200);
                    System.out.println("T2 running...");
    
                    s.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
    

    3.7 Exchanger

    交换器,线程间交换值

    public class T12_TestExchanger {
    
        static Exchanger<String> exchanger = new Exchanger<>();
    
        public static void main(String[] args) {
            new Thread(()->{
                String s = "T1";
                try {
                    s = exchanger.exchange(s);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " " + s);
    
            }, "t1").start();
    
    
            new Thread(()->{
                String s = "T2";
                try {
                    s = exchanger.exchange(s);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " " + s);
    
            }, "t2").start();
    
    
        }
    }
    

    3.8 LockSupport

    //停止当前线程
    LockSupport.park()
    //放行线程
    LockSupport.unpark(线程对象) 
    

    4. 多线程使用

    问题1:
    实现一个容器,提供两个方法,add,size
    写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束

    //有问题的解法
    public class T03_NotifyHoldingLock { //wait notify
    
        //添加volatile,使t2能够得到通知
      //-----volatile一般别修饰引用类型,因为修改引用里面的值,并不可见------
        volatile List lists = new ArrayList();
    
        public void add(Object o) {
            lists.add(o);
        }
    
        public int size() {
            return lists.size();
        }
        
        public static void main(String[] args) {
            T03_NotifyHoldingLock c = new T03_NotifyHoldingLock();
            
            final Object lock = new Object();
            
            new Thread(() -> {
                synchronized(lock) {
                    System.out.println("t2启动");
                    if(c.size() != 5) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("t2 结束");
                }
                
            }, "t2").start();
            
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
    
            new Thread(() -> {
                System.out.println("t1启动");
                synchronized(lock) {
                    for(int i=0; i<10; i++) {
                        c.add(new Object());
                        System.out.println("add " + i);
                        
                        if(c.size() == 5) {
                            lock.notify();
                        }
                        
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "t1").start();
            
            
        }
    }
    /*t2启动
    t1启动
    add 0
    add 1
    add 2
    add 3
    add 4
    add 5
    add 6
    add 7
    add 8
    add 9
    t2 结束
    */
    

    上面代码不行,因为notify()并不会释放所,而lock.wait()后必须重新获取锁才能继续执行

    /**
     * 问题1: wait()& notify()
     * 实现一个容器,提供两个方法,add,size
     * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
     */
    public class Question1 {
    
        private List list = new ArrayList();
    
        public void add(Object o){
            list.add(o);
        }
    
        public int size(){
            return list.size();
        }
    
        public static void main(String[] args) {
            Question1 question1= new Question1();
    
            final Object lock =new Object();
    
            Thread thread1 =new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (lock){
                        for (int i = 0; i < 10; i++) {
                            question1.add(i);
                            System.out.println("线程1加入"+i);
                            if(question1.size()==5){
                                lock.notify();
                                try {
                                    lock.wait();
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    }
                }
            });
    
    
    
            Thread thread2 =new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (lock){
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("线程2结束");
                        lock.notify();
                    }
                }
            });
    
            thread2.start();
            thread1.start();
    
        }
    
    }
    
    /**
     * 问题1: CountDownLatch
     * 实现一个容器,提供两个方法,add,size
     * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
     */
    public class Question1_2 {
    
        private List list = new ArrayList();
    
        public void add(Object o){
            list.add(o);
        }
    
        public int size(){
            return list.size();
        }
    
        public static void main(String[] args) {
            Question1_2 question1_2= new Question1_2();
            CountDownLatch lock1 = new CountDownLatch(1);
            CountDownLatch lock2 = new CountDownLatch(1);
    
            Thread thread1 =new Thread(new Runnable() {
                @Override
                public void run() {
                        for (int i = 0; i < 10; i++) {
                            question1_2.add(i);
                            System.out.println("线程1加入"+i);
                            if(question1_2.size()==5){
                                lock1.countDown();
                                try {
                                    lock2.await();
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    }
            });
    
    
    
            Thread thread2 =new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock1.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程2结束");
                    lock2.countDown();
                }
            });
    
            thread2.start();
            thread1.start();
    
        }
    
    
    
    }
    
    /**
     * 问题1: LockSupport
     * 实现一个容器,提供两个方法,add,size
     * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
     */
    public class Question1_3 {
    
        private List list = new ArrayList();
    
        public void add(Object o){
            list.add(o);
        }
    
        public int size(){
            return list.size();
        }
    
        static Thread thread1,thread2 = null;
    
    
        public static void main(String[] args) {
            Question1_3 question1_3= new Question1_3();
    
            thread1 =new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 10; i++) {
                        question1_3.add(i);
                        System.out.println("线程1加入"+i);
                        if(question1_3.size()==5){
                            LockSupport.unpark(thread2);
                            LockSupport.park();
                        }
                    }
                }
            });
    
    
    
            thread2 =new Thread(new Runnable() {
                @Override
                public void run() {
                    LockSupport.park();
                    System.out.println("线程2结束");
                    LockSupport.unpark(thread1);
                }
            });
    
            thread2.start();
            thread1.start();
    
    
    
        }
    }
    

    问题2:
    写一个固定容量同步容器,拥有put和get方法,以及getCount方法
    能够支持2个生产者线程以及10个消费者线程的阻塞调用

    /**
     * 问题2: synchronized
     * 写一个固定容量同步容器,拥有put和get方法,以及getCount方法
     * 能够支持2个生产者线程以及10个消费者线程的阻塞调用
     * 1. 看put方法。因为,当用if时,生产线程A进入判断条件,调用this.wait()让出,自己等待
     * 2. 消费线程C取走一个数据,数量降为9,叫醒线程,生产线程B向里面加入值,达到10
     * 3. 线程A继续向下走,数据已经超过规定的10个,出问题。
     */
    public class Question2<T> {
        private LinkedList<T> list = new LinkedList();
    
        private int max= 10;
    
        public Question2(){}
    
        public Question2(int capcity){
            max = capcity;
        }
    
    
        public synchronized void put(T t) throws InterruptedException {
            while(list.size()==max){
                System.out.println("生产者:"+Thread.currentThread().getName()+"阻塞,让出锁");
                this.wait();
            }
            list.add(t);
            System.out.println("生产者:"+Thread.currentThread().getName()+"向容器加入值"+t);
            this.notifyAll();
        }
    
        public synchronized T get() throws InterruptedException {
            while(list.size()==0){
                System.out.println("消费者:"+Thread.currentThread().getName()+"阻塞,让出锁");
                this.wait();
            }
            T t = list.removeFirst();
            System.out.println("消费者:"+Thread.currentThread().getName()+"从容器获得值"+t);
            this.notifyAll();
            return t;
        }
    
        public static void main(String[] args) {
            Question2<String> c = new Question2<>();
            //启动消费者线程
            for (int i = 0; i < 10; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
    
                        try {
                            while (true){
                                c.get();
                                TimeUnit.SECONDS.sleep(1);
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
    
                    }
                }).start();
            }
    
            //启动生产者线程
            for (int i = 0; i < 2; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int j = 0; j < 25; j++) {
                            try {
                                c.put(String.valueOf(j));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start();
            }
        }
    
    
    }
    
    /**
     * 问题2: ReentrantLock&Condition
     * 写一个固定容量同步容器,拥有put和get方法,以及getCount方法
     * 能够支持2个生产者线程以及10个消费者线程的阻塞调用
     *
     * 用synchronized,在生产者满数据的情况下,也会叫醒另外的一个生产者。存在极端情况,CPU一直被两个生产者占用,导致消费者消费不了
     * 用ReentrantLock&Condition,那么生产者满数据,叫醒的只是消费者,消费者没数据时,只叫醒生产者
     * Condition的本质是不同的等待队列。这个例子有两个等待队列,一个生产者等待队列,另一个是消费者等待队列
     */
    public class Question2_1<T> {
    
        private LinkedList<T> list = new LinkedList();
    
        private Lock lock = new ReentrantLock();
    
        private Condition producer = lock.newCondition();
    
        private Condition consumer = lock.newCondition();
    
        private int max= 10;
    
        public Question2_1(){}
    
        public Question2_1(int capcity){
            max = capcity;
        }
    
        public void put(T t) throws InterruptedException {
            try{
                lock.lock();
                while(list.size()==max){
                    System.out.println("生产者:"+Thread.currentThread().getName()+"阻塞,让出锁");
                    producer.await();
                }
                list.add(t);
                System.out.println("生产者:"+Thread.currentThread().getName()+"向容器加入值"+t);
                consumer.signalAll();
            }finally {
                lock.unlock();
            }
    
        }
    
        public T get() throws InterruptedException {
            try {
                lock.lock();
                while(list.size()==0){
                    System.out.println("消费者:"+Thread.currentThread().getName()+"阻塞,让出锁");
                    consumer.await();
                }
                T t = list.removeFirst();
                System.out.println("消费者:"+Thread.currentThread().getName()+"从容器获得值"+t);
                producer.signalAll();
                return t;
            }finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) {
            Question2_1<String> c = new Question2_1<>();
            //启动消费者线程
            for (int i = 0; i < 10; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
    
                        try {
                            while (true){
                                c.get();
                                TimeUnit.SECONDS.sleep(1);
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
    
                    }
                }).start();
            }
    
            //启动生产者线程
            for (int i = 0; i < 2; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int j = 0; j < 25; j++) {
                            try {
                                c.put(String.valueOf(j));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start();
            }
        }
    
    }
    

    5. 线程池最多能开线程数

    2 ^ 29 -1

    6. 多线程之间的通讯

    1. 共享内存

      运行时数区

    2. 消息传递

      通过queue,wait notify等等

    7. as-if-serial&happens-before

    1. as-if-serial语义保证单线程内程序的执行结果不被改变,happens-before关系保证正确同步的多线程程序的执行结果不被改变。

    2. as-if-serial语义给编写单线程程序的程序员创造了一个幻境:单线程程序是按程序的顺序来执行的。happens-before关系给编写正确同步的多线程程序的程序员创造了一个幻境:正确同步的多线程程序是按happens-before指定的顺序来执行的。

    3. as-if-serial语义和happens-before这么做的目的,都是为了在不改变程序执行结果的前提下,尽可能地提高程序执行的并行度

    相关文章

      网友评论

          本文标题:多线程-基础

          本文链接:https://www.haomeiwen.com/subject/akcziktx.html