美文网首页Java 之旅
中级09 - Java多线程初步

中级09 - Java多线程初步

作者: 晓风残月1994 | 来源:发表于2019-11-25 00:17 被阅读0次

    中级09 - Java多线程初步

    介绍多线程带来的问题,以及基本解决方案。

    • 竞争条件带来的数据错误问题
    • 死锁的原理、排查与防范
    • 线程安全:同步方法、并发工具包

    一 、线程不安全的表现 竞争条件带来的数据错误

    • i++
    • if-then-do

    如果最终结果不正确,那么程序跑地再快也没有意义。

    二、死锁的产生、排查和防范

    1. 产生

    Java 中某个锁只能同时被一个线程持有,当两个线程互相在等待对方持有的锁时就形成了死锁。

    为了便于理解,想象一手交钱一手交货的问题:

    A 持有现金,B 持有粉,但为了掩人耳目,只能通过一个狭小的窗口进行交易,无法同时一手交钱一手交货,,俩人都持有对方需要的资源,并且也都在等待对方手上的资源,但谁也不愿先给出自己的资源。
    此时便形成了死锁。

    类似的还有 哲学家就餐问题

    public class DeadlockExample {
    
        private static final Object lock1 = new Object();
        private static final Object lock2 = new Object();
    
        public static void main(String[] args) {
            new Thread1().start();
            new Thread2().start();
        }
    
        static class Thread1 extends Thread {
            @Override
            public void run() {
                synchronized (lock1) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    synchronized (lock2) {
                        System.out.println("看不到我");
                    }
                }
            }
        }
    
    
        static class Thread2 extends Thread {
            @Override
            public void run() {
                synchronized (lock2) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    synchronized (lock1) {
                        System.out.println("还是看不到我");
                    }
                }
            }
        }
    
    }
    

    2. 排查

    使用jps列出当前 Java 进程 ID:

    jps
    
    17876 Jps
    17928 RemoteMavenServer36
    9224 Launcher
    19260
    9164 DeadlockExample
    

    jstack 打印给定 Java 进程中的所有栈信息进行排查:

    jstack 9164
    
    2019-09-29 17:01:49
    # ...
    # 省略
    # ...
    Found one Java-level deadlock:
    =============================
    "Thread-1":
      waiting to lock monitor 0x0000000002d8b648 (object 0x00000000d61a4cb0, a java.lang.Object),
      which is held by "Thread-0"
    "Thread-0":
      waiting to lock monitor 0x0000000002d8a1a8 (object 0x00000000d61a4cc0, a java.lang.Object),
      which is held by "Thread-1"
    
    Java stack information for the threads listed above:
    ===================================================
    "Thread-1":
            at com.github.hcsp.calculation.DeadlockExample$Thread2.run(DeadlockExample.java:42)
            - waiting to lock <0x00000000d61a4cb0> (a java.lang.Object)
            - locked <0x00000000d61a4cc0> (a java.lang.Object)
    "Thread-0":
            at com.github.hcsp.calculation.DeadlockExample$Thread1.run(DeadlockExample.java:24)
            - waiting to lock <0x00000000d61a4cc0> (a java.lang.Object)
            - locked <0x00000000d61a4cb0> (a java.lang.Object)
    
    Found 1 deadlock.
    

    3. 防范

    所有的线程都按照相同的顺序获得资源的锁。

    三、实现线程安全的基本手段

    1. 不可变类 Integer/String 等

    2. 同步方法

    2.1 synchronized

    • synchronized (一个对象) 把这个对象当成锁
    • static synchronized方法 把 Class 对象当成锁
    • 实例的 synchronized方法 把该实例当成锁(等价于 synchronized (this) {})

    还是之前多个线程同时修改共享变量时的问题,现在采用 synchronizedd 同步块,使得同一时刻只能有一个线程持有锁,去执行同步块中的代码:

    public class Test {
        private static int i = 0;
        private static final Object lock = new Object(); // 定义一个锁
    
        public static void main(String[] args) {
            for (int j = 0; j < 1000; j++) {
                new Thread(Test::modifySharedVariable).start();
            }
        }
    
        private static void modifySharedVariable() {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (lock) { // 使用同步块
                i++;
                System.out.println("i = " + i);
            }
        }
    }
    

    除了使用 synchronized 同步块,还可以使用 synchronized 关键字:

    private synchronized static void modifySharedVariable() {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            i++;
            System.out.println("i = " + i);
        }
    

    2.2 Collections.synchronized*

    如 Collections.synchronizedMap 等,基本只是把原来的 map 相关方法使用 synchronized 同步块包装了一遍。
    但是注意,使用 Collections.synchronized* 时,如果存在非集合本身的,未经 synchronized 的操作,那么并不能保证线程安全,还是需要 synchronized 同步块。

    3. 并发工具包

    3.1 ConcurrentHashMap

    下图中的 Collection 和 Map 都是线程不安全的:
    [图片上传失败...(image-96a1e3-1575509314567)]

    多个线程并发访问 map 是不安全的:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    
    public class Main {
        private static final Map<Integer, Integer> map = new HashMap<>();
    
        public static void main(String[] args) {
            for (int j = 0; j < 1000; j++) {
                new Thread(Main::concurrentlyAccess).start();
            }
        }
    
        private static void concurrentlyAccess() {
            Integer r = new Random().nextInt();
            map.put(r, r);
            for (Integer i : map.keySet()) {
                System.out.println(i);
            }
        }
    }
    

    报错如下:

    java.util.ConcurrentModificationException
        at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
        at java.util.HashMap$KeyIterator.next(HashMap.java:1469)
        at com.github.hcsp.calculation.Main.concurrentlyAccess(Main.java:25)
        at java.lang.Thread.run(Thread.java:748)
    

    解决办法除了使用刚才提到的 synchronized,还可以无脑地使用 JUC 并发工具包提供的线程安全的 ConcurrentHashMap 替换原本涉及到线程安全问题时的 HashMap。

    3.2 Atomic*

    可以创建具有原子性操作的数据,如 AtomicInteger。

    3.3 ReentrantLock 可重入锁

    ReentrantLock 通过一个双向链表来实现锁机制,每个节点代表一条线程,head 节点拥有
    锁,其他节点均被挂起等待唤醒。等到当前锁释放成功后,会唤醒链表中的下一个节点,并将其更新为新的 head 节点。

    可重入锁是指:当线程请求一个由其它线程持有的锁时,该线程会阻塞,而当线程请求由自己持有的锁时,如果该锁是可重入锁,请求就会成功,否则阻塞。

    即当一个线程获取了某个对象锁后,还可以再次获得该对象锁。

    ReentrantLock 作为一个可重入锁,内部是通过 state 作为计数器来统计总共执行了多少次 lock 方法,如果同一个锁 lock 两次,state 为 2,那么只调用一次 unlock,state 为 1,锁尚未释放,再 unlock 一次,state 为 0,锁成功释放。

    ReentrantLock 和 synchronized 方法/声明具有相似的行为和语义,但是更灵活强大:

    import java.util.concurrent.locks.ReentrantLock;
    
    public class ReentrantLockTest {
    
        private static int i = 0;
        private static final ReentrantLock lock = new ReentrantLock();
    
        public static void main(String[] args) {
            for (int i = 0; i < 1000; i++) {
                new Thread(ReentrantLockTest::concurrentlyAccess).start();
            }
        }
    
        private static void concurrentlyAccess() {
            lock.lock();  // block until condition holds
            // 这里可以掩饰 lock.lock();  // 
            try {
                i = i + 1;
                System.out.println("i = " + i);
            } finally {
                lock.unlock();
            }
        }
    
    }
    

    可重入锁的作用就是为了避免死锁,因为 Java 中某个锁只能同时被一个线程持有,如下例所示,如果 synchronized 锁不可重入,不可再次获得,将会造成死锁,该线程一直等待进入方法 b 需要的这把对象锁,而该锁明明又被自己占用着。

    所以 synchronized 作为可重入锁,进入 a 方法时持有当前对象锁,紧接着进入 b 方法时又要请求相同的锁,此时可重复持有当前对象锁:

    private synchronized void a() {
        b();
    }
    private synchronized void b() {
    }
    

    四、线程的状态与Object类中的线程方法

    1. 线程的状态

    Java中线程的状态分为 6 种:

    1. 初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
    2. 运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。

    线程对象创建后,其他线程(比如main线程)调用了该对象的 start() 方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取 CPU 的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得 CPU 时间片后变为运行中状态(running)。

    1. 阻塞(BLOCKED):线程因为需要等待一个锁时被阻塞的状态,拿不到锁,锁在别的线程手里。

    2. 等待(WAITING):前提是线程已经拥有锁了,然后进入该状态,等待其他线程做出一些特定动作(通知或中断)

    3. 超时等待(TIMED_WAITING):该状态不同于 WAITING,它可以在指定的时间后自行返回。

    4. 终止(TERMINATED):表示该线程已经执行完毕。

    2. Obect 类中的线程方法

    为什么 Java 中所有的对象都可以作为锁?因为所有的对象都继承自 Object 类,而 Object 类提供了线程相关的方法。

    2.1 wait

    只有持有某个对象锁的线程(假设是 A)才能调用该对象的 wait() 方法,调用后 A 会释放锁并进入waiting 状态,直到其他某个线程(B)抢到了对象锁,调用了该对象锁的 notify()/notifyAll() 方法后,被 wait 的 A 线程才能重新获得锁的所有权并继续执行代码。

    2.2 notify

    B 线程调用 notify() 唤醒一个正在等待该锁的线程 A,若有多个线程都在等待,那么也只会根据 JVM 内部机制唤醒其中一个。

    2.3 notifyAll

    notifyAll() 唤醒所有正在等待该对象锁的线程。

    notify()/notifyAll() 方法所在的 synchronized 方法或块结束后,线程 B 释放该对象锁,然后这些被唤醒的线程会和其他线程一起按照常规自由竞争该锁。

    五、生产者和消费者模型(多线程经典问题)

    每当条件满足时,一个线程被唤醒,做完事情后,让自己继续沉睡的同时顺便唤醒别的线程。

    请实现一个生产者/消费者模型,其中:
    生产者生产10个随机的整数供消费者使用(随机数可以通过new Random().nextInt()获得)
    使得标准输出依次输出它们,例如:

    Producing 42
    Consuming 42
    Producing -1
    Consuming -1
    ...
    Producing 10086
    Consuming 10086
    Producing -12345678
    Consuming -12345678
    

    下面演示三种实现方式:

    • wait/notify/notifyAll
    • Lock/Condition
    • BlockingQueue

    1. wait/notify/notifyAll

    使用最基本的 Object 类中的线程方法和 synchronized。
    Boss,整体调度:

    package com.github.hcsp.multithread;
    
    public class Boss {
        public static void main(String[] args) throws InterruptedException {
            Object lock = new Object();
            Container container = new Container();
    
            Producer producer = new Producer(container, lock);
            Consumer consumer = new Consumer(container, lock);
    
            producer.start();
            consumer.start();
    
            producer.join();
            producer.join();
        }
    }
    

    Container,盛放制造品的容器:

    package com.github.hcsp.multithread;
    
    import java.util.Optional;
    
    public class Container {
        private Optional<Integer> value = Optional.empty();
    
        public Optional<Integer> getValue() {
            return value;
        }
    
        public void setValue(Optional<Integer> value) {
            this.value = value;
        }
    }
    

    Producerr,生产者:

    package com.github.hcsp.multithread;
    
    import java.util.Optional;
    import java.util.Random;
    
    public class Producer extends Thread {
        private Container container;
        private Object lock;
    
        public Producer(Container container, Object lock) {
            this.container = container;
            this.lock = lock;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock) {
                    while (container.getValue().isPresent()) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Integer r = new Random().nextInt();
                    container.setValue(Optional.of(r));
                    System.out.println("Producing " + r);
                    lock.notify();
                }
    
            }
        }
    }
    

    Consumer,消费者:

    package com.github.hcsp.multithread;
    
    import java.util.Optional;
    
    public class Consumer extends Thread {
        private Object lock;
        private Container container;
    
        public Consumer(Container container, Object lock) {
            this.container = container;
            this.lock = lock;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock) {
                    while (!container.getValue().isPresent()) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Integer v = container.getValue().get();
                    container.setValue(Optional.empty());
                    System.out.println("Consuming " + v);
                    lock.notify();
                }
    
            }
        }
    }
    

    2. Lock/Condition

    使用 ReentrantLock 和 Condition。

    ReentrantLock 比 synchronized 自动加解锁机制更灵活。

    Condition 是个接口,在 Java1.5 中出现,用来替代传统的 Object Monitor Methods 中 wait/notify/notifyAll() 的 是 await/signal/signalAll(),这种方式使线程间协作更加安全和高效,推荐使用。
    Condition 依赖于 Lock 接口,生成一个 Condition 的基本代码是 lock.newCondition()。
    Condition 抽离出了线程调用机制,可以和任意的锁结合在一起,使得同一个对象可以有多个等待队列(一个锁可以 newCondition() 多次)。
    使得在下面的例子中,当容器满了或者空了时,同一时间内只需要通知一条对应的等待中的线程。

    Boss2:

    package com.github.hcsp.multithread;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Boss2 {
        public static void main(String[] args) throws InterruptedException {
            ReentrantLock lock = new ReentrantLock();
            Container2 container = new Container2(lock);
    
            Producer2 producer = new Producer2(container, lock);
            Consumer2 consumer = new Consumer2(container, lock);
    
            consumer.start();
            producer.start();
    
            producer.join();
            producer.join();
        }
    }
    

    Container2:

    package com.github.hcsp.multithread;
    
    import java.util.Optional;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Container2 {
    
        private Condition notConsumedYet; // 尚未被消费掉
        private Condition notProducedYet; // 尚未被生产出来
        private Optional<Integer> value = Optional.empty();
    
        public Container2(ReentrantLock lock) {
            this.notConsumedYet = lock.newCondition();
            this.notProducedYet = lock.newCondition();
        }
    
        public Condition getNotConsumedYet() {
            return notConsumedYet;
        }
    
        public Condition getNotProducedYet() {
            return notProducedYet;
        }
    
        public Optional<Integer> getValue() {
            return value;
        }
    
        public void setValue(Optional<Integer> value) {
            this.value = value;
        }
    }
    

    Producer2:

    package com.github.hcsp.multithread;
    
    import java.util.Optional;
    import java.util.Random;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Producer2 extends Thread {
        private Container2 container;
        private ReentrantLock lock;
    
        public Producer2(Container2 container, ReentrantLock lock) {
            this.container = container;
            this.lock = lock;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    while (container.getValue().isPresent()) {
                        try {
                            container.getNotProducedYet().await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Integer r = new Random().nextInt();
                    container.setValue(Optional.of(r));
                    System.out.println("Producing " + r);
                    container.getNotConsumedYet().signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
    

    Consumer2:

    package com.github.hcsp.multithread;
    
    import java.util.Optional;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Consumer2 extends Thread {
        private Container2 container;
        private ReentrantLock lock;
    
        public Consumer2(Container2 container, ReentrantLock lock) {
            this.container = container;
            this.lock = lock;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    while (!container.getValue().isPresent()) {
                        try {
                            container.getNotConsumedYet().await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Integer v = container.getValue().get();
                    container.setValue(Optional.empty());
                    System.out.println("Consuming " + v);
                    container.getNotProducedYet().signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
    

    另外说一下个人对以上这 2 种生产者和消费者模型实现方式的理解:

    设置条件判断是因为需要先生产后才能消费, 未消费前也不能再生产, 所以当生产或消费条件不满足时, 需要令当前线程进入 waiting。
    另一方面,是为了当前线程被中断和假唤醒时,要继续进入 waiting, 并且要用循环一直"盯"着。
    另外,在 for 循环中,synchronized 和 ReentrantLock 作为可重入锁可以重复加锁, 条件不成熟时也可重新进入锁,故也需要条件判断。

    3. BlockingQueue

    BlockingQueue 取回元素时要等待队列非空,存储元素时要等待队列非满,可以方便的实现生产者和消费者模型,创建两条 BlockingQueue,一条负责管理产品,一条负责管理调度信号,不再需要手动创建容器。

    Boss3:

    package com.github.hcsp.multithread;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class Boss3 {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue queue = new ArrayBlockingQueue(1);
            BlockingQueue signalQueue = new ArrayBlockingQueue(1);
    
            Producer3 producer = new Producer3(queue, signalQueue);
            Consumer3 consumer = new Consumer3(queue, signalQueue);
    
            producer.start();
            consumer.start();
    
            producer.join();
            producer.join();
        }
    }
    

    Producer3:

    package com.github.hcsp.multithread;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    
    public class Producer3 extends Thread {
        BlockingQueue<Integer> queue;
        BlockingQueue<Integer> signalQueue;
    
        public Producer3(BlockingQueue<Integer> queue, BlockingQueue<Integer> signalQueue) {
            this.queue = queue;
            this.signalQueue = signalQueue;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                int r = new Random().nextInt();
                System.out.println("Producing " + r);
                try {
                    queue.put(r);
                    signalQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    Consumer3:

    package com.github.hcsp.multithread;
    
    import java.util.concurrent.BlockingQueue;
    
    public class Consumer3 extends Thread {
        BlockingQueue<Integer> queue;
        BlockingQueue<Integer> signalQueue;
    
        public Consumer3(BlockingQueue<Integer> queue, BlockingQueue<Integer> signalQueue) {
            this.queue = queue;
            this.signalQueue = signalQueue;
        }
    
            @Override
            public void run () {
                for (int i = 0; i < 10; i++) {
                    try {
                        System.out.println("Consuming " + queue.take());
                        signalQueue.put(0);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    

    六、线程池与 Callable/Future

    1. Callable/Future

    1.1 Callable

    Callable 接口引入自 Java1.5,和 Runnable 一样,它们的实例设计用于在其他线程中执行,但不同的是, Callable 可以返回值,也能抛异常。

    1.2 Future

    Future 接口代表⼀个“未来才会返回的结果”,调用 get() 方法会等待直到拿到计算结果。

    2. 什么是线程池

    2.1 定义

    Java 的线程调度完全依赖于操作系统的线程调度,线程是昂贵的,不能无节制地开辟线程,以免将操作系统的资源耗尽,所以线程池是预先定义好的若干个线程。

    2.2 Java 中的线程池

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 简单场景下为方便起见,直接使用 Executors 工具类快速创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
    
        // submit 方法会立刻返回一个 Future(类似于 JS 中的 Promise)
        // 提交的任务会异步执行,不会阻塞当前线程
        Future<Integer> future1 = threadPool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(3000);
                return 0;
            }
        });
    
        Future<String> future2 = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return "ojbk";
            }
        });
    
        Future<Object> future3 = threadPool.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                throw new RuntimeException();
            }
        });
    
        System.out.println(future1.get());
        System.out.println(future2.get());
        System.out.println(future3.get());
    }
    

    2.3 线程池的构造函数(有坑待填)

    3. 实战:多线程的 WordCount

    创建 WordCount 对象时传入文件列表,可统计这些文件中的单词数。

    使用方法:

    List<File> files = xxx;
    WordCount wordCount = new WordCount(10);
    Map<String, Integer> countResult = wordCount.count(files);
    System.out.println(countResult);
    

    WordCount 类的实现:

    package com.github.hcsp.multithread;
    
    import org.apache.commons.io.FileUtils;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class WordCount {
    
        private final int threadNum;
        private ExecutorService threadPool;
    
        public WordCount(int threadNum) {
            threadPool = Executors.newFixedThreadPool(threadNum);
            this.threadNum = threadNum;
        }
    
        /**
         * 统计文件中各单词的数量
         *
         * @param files 文件列表
         * @return 单词统计结果
         * @throws IOException          文件读写出错时抛出
         * @throws ExecutionException   Future 取回结果出错时抛出
         * @throws InterruptedException 线程被中断时抛出
         */
        public Map<String, Integer> count(List<File> files) throws IOException, ExecutionException, InterruptedException {
            BufferedReader reader = new BufferedReader(new FileReader(mergeFilesIntoSingleFile(files)));
            List<Future<Map<String, Integer>>> futures = new ArrayList<>();
            // 开辟若干个线程,每个线程读取文件的一行内容,并将单词统计结果返回
            // 最后主线程将工作线程返回的结果汇总在一起
            for (int i = 0; i < threadNum; i++) {
                futures.add(threadPool.submit(new WorkerJob(reader)));
            }
            // 最终结果集
            Map<String, Integer> finalResult = new HashMap<>();
            // 将futures中的每个子结果集合并到终集中
            for (Future<Map<String, Integer>> future : futures) {
                Map<String, Integer> resultFromWorker = future.get();
                mergeWorkerResultIntoFinalResult(resultFromWorker, finalResult);
            }
            threadPool.shutdown();
            return finalResult;
        }
    
        /**
         * 将文件列表中的文件合并为一个文件
         *
         * @param files 文件列表
         * @return 结果文件
         * @throws IOException 文件读写出错时抛出
         */
        private File mergeFilesIntoSingleFile(List<File> files) throws IOException {
            File result = File.createTempFile("tmp", "");
            for (File file : files) {
                String encoding = "UTF-8";
                FileUtils.write(result, FileUtils.readFileToString(file, encoding), encoding, true);
            }
            return result;
        }
    
        /**
         * 将子集合并到终集中
         *
         * @param resultFromWorker 终集
         * @param finalResult      子集
         */
        private void mergeWorkerResultIntoFinalResult(Map<String, Integer> resultFromWorker,
                                                      Map<String, Integer> finalResult) {
            for (Map.Entry<String, Integer> entry : resultFromWorker.entrySet()) {
                String word = entry.getKey();
                int mergedResult = finalResult.getOrDefault(word, 0) + entry.getValue();
                finalResult.put(word, mergedResult);
            }
        }
    
        static class WorkerJob implements Callable<Map<String, Integer>> {
            private BufferedReader reader;
    
            private WorkerJob(BufferedReader reader) {
                this.reader = reader;
            }
    
            @Override
            public Map<String, Integer> call() throws Exception {
                String line;
                Map<String, Integer> result = new HashMap<>();
                while ((line = reader.readLine()) != null) {
                    String[] words = line.split(" ");
    //                System.out.println(Thread.currentThread().getName());
    //                System.out.println(line);
    //                System.out.println();
                    for (String word : words) {
                        result.put(word, result.getOrDefault(word, 0) + 1);
                    }
                }
                return result;
            }
        }
    }
    

    简单删除一些代码就可以实现单文件统计(略)。


    参考:

    1. Java线程的6种状态及切换(透彻讲解)

    相关文章

      网友评论

        本文标题:中级09 - Java多线程初步

        本文链接:https://www.haomeiwen.com/subject/jkhtpctx.html