美文网首页
Java 多线程——线程安全、线程池

Java 多线程——线程安全、线程池

作者: BitterOutsider | 来源:发表于2020-11-25 21:55 被阅读0次

线程不安全的表现

  1. 操作共享数据,导致数据错误(非原子操作)
  • i++ (Java 多线程原理内有详细代码例子)
  • if-then-do 从以下结果的代码来看,相同的数字被加入map了不止一次。
class Main {
    private static final Map<Integer, Integer> map = new HashMap<>();

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            new Thread(Main::putARandomNumberIntoMap).start();
        }
    }

    public static void putARandomNumberIntoMap() {
        int n = new Random().nextInt(10);
        if (!map.containsKey(n)) {
            map.put(n, n);
            System.out.println("put " + n);
        }
    }
}
put 8 put 1 put 9 put 0 put 5 put 0 put 6 put 6 put 2 put 4 put 5 put 4 put 0 put 3 put 7 put 6 put 1 put 5
  1. 死锁
  • 重现死锁
    以下代码是一个死锁的例子。首先先对synchronized关键词作出解释:synchronized能够保证同一时刻最多只有一个线程执行该段代码。它就像一把锁,只有获得了某项资源,该线程才能执行synchronized所包的代码块。
    在代码中,Thread1获得了lock1后,睡了500毫秒,然后申请获得lock2。而Thread1获得lock2后,只睡了100毫秒,申请lock1。于是Thread1在等待被Thread2占用的lock2,同样的Thread2在等待被Thread1占用的lock1,无限等待,造成了死锁。
class Main {
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    public static void main(String[] args) {
        new Thread1().start();
        new Thread2().start();
    }

    static class Thread1 extends Thread {
        @Override
        public void run() {
            synchronized (lock1) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock2) {
                }
            }
        }
    }

    static class Thread2 extends Thread {
        @Override
        public void run() {
            synchronized (lock2) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock1) {
                }
            }
        }
    }
}
  • 死锁问题的排查
    使用jps命令查看 java 所有进程,可以得到相应的进程编号。
    使用jstack 编号查看进程状态。

线程安全

实现线程安全的基本手段

  1. synchronized 同步块
  • synchronized(⼀个对象),把这个对象当成锁。
class Main {
    private static final Object lock = new Object();
    static int i = 0;

    public static void main(String[] args) {
        for (int j = 0; j < 1000; j++) {
            new Thread(Main::addOne).start();
        }
    }

    static void addOne() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        synchronized (lock) {
            i++;
            System.out.println(i);
        }
    }
}
  • Static synchronized⽅法,把Class对象当成锁。 如下例子中的实例main。这里简单介绍下什么是Class。Class则用于存放每个类的类型信息,可以简单的理解为类的说明书。
static synchronized void addOne() {
    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    i++;
    System.out.println(i);
}
  • 实例的synchronnized⽅法把该实例当成锁。
class Main {
    private int i = 0;

    public static void main(String[] args) {
        final Main main = new Main();
        for (int j = 0; j < 1000; j++) {
            new Thread(main::addOne).start();
        }
    }

    synchronized void addOne() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        i++;
        System.out.println(i);
    }
}
  1. Collections.synchronized
  2. JUC包
  • ConcurrentHashMap
    任何使⽤HashMap有线程安全问题的地⽅,都⽆脑地使⽤ConcurrentHashMap替换即可。
  • AtomicInteger
    对于Integer的操作将变成原子的。同样的还有AtomicBooleanAtomicLong
class Main {
    private final AtomicInteger i = new AtomicInteger(0);

    public static void main(String[] args) {
        final Main main = new Main();
        for (int j = 0; j < 1000; j++) {
            new Thread(main::addOne).start();
        }
    }

   public void addOne() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(i.addAndGet(1));
    }
}
  • ReentrantLock
    首先解释什么是可重入锁。aaa拿到锁后执行bbb,发现需要同一把锁。可重入锁就是同一把锁你可以重新进入一次。
private synchronized void aaa {
    bbb()
}
private synchronized void bbb { }

下面是ReentrantLock官方示例:

class X {
    private final ReentrantLock lock = new ReentrantLock();

    public void m() {
      lock.lock();  // block until condition holds
      try {
        // ... method body
      } finally {
        lock.unlock()
      }
    }
}

Object类里的线程方法

为什么Java中的所有对象都可以成为锁?Java从⼀开始就把线程作为语⾔特性,提供语⾔级的⽀持。Object.wait()/notify()/notifyAll()⽅法,实现了我们自由地调度线程(synchronized称为同步,这三个则实现协同)。总的来说:wait()使线程停止运行,notify()使停止运行的线程继续运行。再举一个直观的例子:

• 线程:⼀个⼯⼈
• 代码:⼀份说明书
• 同步对象:⼀个印章
• wait():拿到印章,把⾃⼰的名字加⼊等待队列,然后把印章
放回去
• notify():拿到印章,挑⼀个等待队列的⼈通知⼀下
• notifyAll():拿到印章,把等待队列的⼈全通知⼀下

  • Object.wait()
    作用是使当前执行代码的线程进行等待,并且wait()所在的代码处停止执行,直到被notify换新。在调用wait()之前,线程必须获得该对象的锁,因此只能在同步方法/同步代码块中调用wait()方法。即:
private void aaa() {
    sysynchronized (lock) {
        lock.wait()
    }
}
  • Object.notify()
    作用是,如果有多个线程等待,那么JVM随机挑选出一个wait的线程,对其发出通知notify(),并等待notify()方法的线程释放锁。notify()也要在同步块中调用。
  • Object.notifyAll()
    作用是唤醒所有线程,让所有线程竞争锁。

三种方法实现多线程的经典问题——⽣产者/消费者模型

具体需求是这样的:开启两个线程,生产者生产一个数字,消费者消费这个数字。生产者必须等待消费者消费完这一个数字才能生产,消费者必须等待生产者生产出一个数字才能消费。也就是说放数字的容器内最多只有一个待消费的数字。

  1. wait/notify/notifyAll
    可以参考Object.wait文档里的写法
synchronized (obj) {
   while (condition does not hold)
       obj.wait();
         ... // Perform action appropriate to condition
}

容器:我们选择Optional,Optional 类是一个可以为null的容器对象。如果值存在则isPresent()方法会返回true,调用get()方法会返回该对象。
生产者:如果容器内还有值,则等待。反之,则生产一个数字,加入容器,并唤醒其他线程。
消费者:如果容器为空,则等待。反之,消费一个数字,并唤醒其他线程。

class Main {
    public static void main(String[] args) {
        final Container container = new Container();
        final Object lock = new Object();
        new Producer(container, lock).start();
        new Consumer(container, lock).start();
    }

    static class Container {
        private Optional<Integer> value = Optional.empty();

        public Optional<Integer> getValue() {
            return value;
        }

        public void setValue(Optional<Integer> value) {
            this.value = value;
        }
    }

    static class Producer extends Thread {
        private final Container container;
        private final Object lock;

        Producer(Container container, Object lock) {
            this.container = container;
            this.lock = lock;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock) {
                   while (container.getValue().isPresent()) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    int randomNumber = new Random().nextInt();
                    System.out.println("producing:" + randomNumber);
                    container.setValue(Optional.of(randomNumber));
                    lock.notify();
                }
            }
        }
    }

    static class Consumer extends Thread {
        private final Container container;
        private final Object lock;

        public Consumer(Container container, Object lock) {
            this.container = container;
            this.lock = lock;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock) {
                    while (!container.value.isPresent()) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("consuming:" + container.getValue().get());
                    container.setValue(Optional.empty());
                    lock.notify();
                }
            }
        }
    }
}
  1. Lock/Condition
    Condition 在管程模型中代表的就是等待的条件。ConditionLock的基础上使用,在原来Lock的基础上实现了可以基于多种条件来让线程实现同步的效果增加了多个条件后,我们可以更有针对性,也更灵活的协调多种条件下的线程协调。同样的,我们参考Condition文档中给的例子(由于篇幅原因,这里就不给出了),来完成我们的代码。
class Main {
    public static void main(String[] args) {
        final ReentrantLock lock = new ReentrantLock();
        final Container container = new Container(lock);
        new Producer(container, lock).start();
        new Consumer(container, lock).start();
    }

    static class Container {
        private final Condition notConsumeYet;
        private final Condition notProduceYet;
        private Optional<Integer> value = Optional.empty();

        public Container(ReentrantLock lock) {
            this.notConsumeYet = lock.newCondition();
            this.notProduceYet = lock.newCondition();
        }

        public Condition getNotConsumeYet() {
            return notConsumeYet;
        }

        public Condition getNotProduceYet() {
            return notProduceYet;
        }

        public Optional<Integer> getValue() {
            return value;
        }

        public void setValue(Optional<Integer> value) {
            this.value = value;
        }
    }

    static class Producer extends Thread {
        private final Container container;
        private final ReentrantLock lock;

        Producer(Container container, ReentrantLock lock) {
            this.container = container;
            this.lock = lock;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    while (container.getValue().isPresent()) {
                        try {
                            container.getNotConsumeYet().await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    int randomNumber = new Random().nextInt();
                    System.out.println("producing:" + randomNumber);
                    container.setValue(Optional.of(randomNumber));
                    container.getNotProduceYet().signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    static class Consumer extends Thread {
        private final Container container;
        private final ReentrantLock lock;

        public Consumer(Container container, ReentrantLock lock) {
            this.container = container;
            this.lock = lock;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    while (!container.value.isPresent()) {
                        try {
                            container.getNotProduceYet().await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("consuming:" + container.getValue().get());
                    container.setValue(Optional.empty());
                    container.getNotConsumeYet().signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}
  1. BlockingQueue
    java.util.concurrent.BlockingQueue接口(Queue的子接口),它的主要作用是作为线程同步的工具。下面代码中加入signal的原因是为了防止连续输出两个producing xxx。如果没有控制信号,则会立刻开始下一次循环,再打印一次producing xxx
class Main {
    public static void main(String[] args) {
        LinkedBlockingDeque<Integer> blockingQueue = new LinkedBlockingDeque<>(1);
        LinkedBlockingDeque<Integer> signal = new LinkedBlockingDeque<>(1);
        new Producer(blockingQueue, signal).start();
        new Consumer(blockingQueue, signal).start();
    }

    static class Producer extends Thread {
        private final BlockingQueue<Integer> queue;
        private final BlockingQueue<Integer> signal;

        public Producer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signal) {
            this.queue = queue;
            this.signal = signal;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    final int number = new Random().nextInt();
                    System.out.println("producing:" + number);
                    queue.put(number);
                    signal.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer extends Thread {
        private final BlockingQueue<Integer> queue;
        private final BlockingQueue<Integer> signal;

        public Consumer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signal) {
            this.queue = queue;
            this.signal = signal;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("consuming:" + queue.take());
                    signal.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

线程池

线程池就是预先定义好的若⼲个线程。线程是昂贵的(Java线程模型的缺陷),线程池的出现实现了线程的重用。java中的线程池是由Executors创建的,Executors是一个工厂、工具方法。


我们用Executors.newFixedThreadPool(int)创建了一个线程池,以下是线程池提供的一些方法。注意到submit方法的参数有RunnableCallable<T>两种。Runnable最大的问题就是它没有返回值,那么我们就必须操作共享变量,这是一个很烦人的事情。不同的是,Callable可以有<T>类型的返回值,也可以抛出异常。 值得注意的是,这两种submit方法返回的都是Future类型的值。Future表示的是异步返回的结果,根据他的名字,我们也可以理解为“未来返回的结果”。

以下是一个简单的线程池使用示例和运行结果:
class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final ExecutorService threadPool = Executors.newFixedThreadPool(10);

        final Future<Integer> future1 = threadPool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return 1;
            }
        });

        final Future<String> future2 = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Ben";
            }
        });

        final Future<Object> future3 = threadPool.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                throw new RuntimeException();
            }
        });

        System.out.println(future1.get());
        System.out.println(future2.get());
        System.out.println(future3.get());
    }
}

当然我们也可以写成Lamda表达式:

final Future<Integer> future1 = threadPool.submit(() -> 1);
final Future<String> future2 = threadPool.submit(() -> "Ben");
final Future<Object> future3 = threadPool.submit(() -> { throw new RuntimeException(); });

练习

特别地,下面是我练习多线程的两个程序:

  1. 多线程练习程序——统计文件中各单词数量
  2. 多线程实战——200行代码实现多线程网络聊天室

相关文章

  • JAVA 多线程与锁

    JAVA 多线程与锁 线程与线程池 线程安全可能出现的场景 共享变量资源多线程间的操作。 依赖时序的操作。 不同数...

  • JAVA开发-常见面试题

    一、java多线程 线程池的原理,为什么要创建线程池? 线程的生命周期,什么时候会出现僵死进程? 什么是线程安全,...

  • Java:线程池Executors.newFixedThread

    摘要:Java,多线程,线程池 多线程编程和线程池概述 (1)多线程程序: 计算机可以实现多任务 ( multit...

  • 10.3多线程详解

    Java高级-多线程 多线程创建 多线程通讯 线程池 1.多线程创建 thread/runnable图:继承Thr...

  • 后端架构师技术图谱(三)-并发、锁、设计模式(二)

    并发 多线程 《40个Java多线程问题总结》 线程安全 《Java并发编程——线程安全及解决机制简介》 一致性、...

  • 线程

    Java 并发编程:线程池的使用 Java 并发编程:线程池的使用java 多线程核心技术梳理 (附源码) 本文对...

  • 多线程juc锁

    java_basic 1 线程安全 在Java多线程编程当中,实现线程安全: 内部锁(Synchronized...

  • Java源码-线程池

    一、线程池实现原理 Java支持多线程,多线程可以提高任务的执行效率。但是Java里面的线程跟操作系统的线程是一一...

  • ConcurrentHashMap

    总结 HashMap在多线程中不安全,java提供了线程安全的ConcurrentHashMap 类,保证在多线程...

  • Thread

    队列 线程锁 多线程,线程池 队列 多线程爬虫示例 多线程 自定义线程 线程池

网友评论

      本文标题:Java 多线程——线程安全、线程池

      本文链接:https://www.haomeiwen.com/subject/jnmniktx.html