美文网首页javaWeb学习今日看点程序员
Java 多线程模型与并发设计

Java 多线程模型与并发设计

作者: forDream_12138 | 来源:发表于2016-11-04 14:12 被阅读581次

序言

上一次提到了Java 1.5中提供新的多线程模型,在大多数情况下,这已经能够满足日常开发的需要。但是偶尔也许觉得那一套模型还是觉得欠缺点什么,于是乎,Java 7/8中又提供了新的多线程模型。

Java 8中提供了并行流以及**ForkJoinPool **(FJP)和lambda(据说Java 8的lambda只是语法糖,没有深究过)

ForkJoinPool / ForkJoinTask

这一套工具是由Java 7提供的。要使用这种方法之前,应该有所了解函数式编程,如果有过JavaScript或者其他一个脚本语言的开发,应该对此不会陌生。另外,通过这种方法,比较难确定实际上是否使用了超过一个线程,因为这是由的具体实现决定的。最后,在默认情况下是通过ForkJoinPool.commonPool()实现并行的。这个通用池由JVM来管理,并且被JVM进程内的所有线程共享。(以下示例代码若非特别说明均要求Java 7及以上,部分代码出于简洁,使用了lambda表达式,因此需要Java 8及以上才可以运行。可以将lambda表达式用匿名内部类替代,即可在Java 7下编译通过)

多线程经常会伴随着并行计算(并行不等于并发),虽然并不绝对,但是通常与并行或多或少存在着联系。而并行计算的特点在于将较为复杂、庞大的任务,拆解成互不相干、较为简单、小型的任务,最后将各个小任务的结果汇总、分析、处理,得到原本大任务的结果。这样做的目的无非是提高效率,充分利用硬件计算资源,有效规避瓶颈效应。

接下来以计算y! - x!为例,展示代码:

// 计算y! - x!的值
class MyJob extends RecursiveTask<Integer> {

    private int y;
    private int x;

    public MyJob(int x, int y) {
        this.x = x;
        this.y = y;
    }

    @Override
    protected Integer compute() {
        if (x > 2) { // 先计算x的阶乘
            MyJob subJobX = new MyJob(x - 1, -1);
            subJobX.fork();
            x *= subJobX.join();

            if (y == -1) { // 判断是否为递归计算
                return x; // 递归计算则返回阶乘结果
            } else if (y > 2) { // 计算y的阶乘
                MyJob subJobY = new MyJob(y - 1, -1);
                subJobY.fork();
                y *= subJobY.join();

                return y - x; // 输出最后任务的结果
            } else { // 正常输入,不会进入这个分支
                System.err.println("Error.");
                return 0;
            }
        } else {
            return 2;
        }
    }
}

public class ForkJoinExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        MyJob job = new MyJob(4, 10);

        Future<Integer> result = forkJoinPool.submit(job);

        while (!result.isDone()) {
            System.err.println("Waiting for result");
            Thread.sleep(0, 1);
        }
        forkJoinPool.shutdown();
        System.err.println("The results is " + result.get());
    }
}

输出结果:

Waiting for result
Waiting for result
The results is 3628776

在这个例子中,我把计算n * (n - 1)作为最小的任务。因此先判断x是否大于2,如果不大于2,则fork出一个分支,计算(x - 1)!的值,y同理。最后得到x!与y!的值,相减,得出最后的结果。代码中,fork出来的分支中,参数y我作为一个标志,如果为-1则不是原始调用,而是fork的子任务,只需要负责计算传递进来x的阶乘即可。

这段代码中,你看不出任务在哪里完成(哪个线程)、由谁完成、什么时候完成。正如前面所述,甚至你很难看出来是否是大于一个线程在执行。

当然你可以尝试在compte方法中增加System.err.printf("%s is running.%n",Thread.currentThread().getName());的语句来查看输出,到底有几个线程在运行。不过根据我的实践来说,一般情况下,应该只有一个线程在运行。这不是说代码有问题,主要有两个因素:

  • 代码设计不合理,我的代码中,计算x的阶乘与y的阶乘是分开的,并没有一起fork,因此本质上并发性其实没有显示出来。我的代码其实相当于先计算x!,然后计算y!,最后计算y! - x!
  • 由于例子中代码的计算量很小,以当前CPU的计算能力有盈余。针对这种情况,可以尝试多开几个任务同时并行查看输出结果。

当然我觉得上面的例子不好,因此想了另一张场景,并用代码演示一下。

比如现在需要制作一个网络爬虫,爬什么呢,就爬简述首页推荐文章每篇文章的字数。代码中涉及网络请求和正则表达式的部分就不说明了,其中用了我自己写的一个小工具类Spider.javaHttpRequester.java

class JianshuSpiderJob extends RecursiveTask<List<String>> {
    private static final String HOST = "http://www.jianshu.com";
    private String url;

    public JianshuSpiderJob() {
        this(HOST);
    }

    protected JianshuSpiderJob(String url) {
        this.url = url;
    }

    protected List<String> requestHomepage() throws IOException {
        List<String> result = new ArrayList<>();

        Spider.newHost(new URL(this.url)).get((responseCode, responseHeaders, responseStream) -> { // 请求简书主页
            if (responseCode == 200) {
                Pattern indexPattern = Pattern.compile("(/p/[a-z0-9]+)\">([^<>]+)</a></h4>");
                Matcher indexMatcher = indexPattern.matcher(responseStream.toString());

                // 从这里开始派分子任务
                List<JianshuSpiderJob> subJobs = new ArrayList<>();
                while (indexMatcher.find()) {
                    String subUrl = indexMatcher.group(1);
                    String subTitle = indexMatcher.group(2);
                    JianshuSpiderJob subJob = new JianshuSpiderJob(subUrl);
                    subJob.fork();
                    subJobs.add(subJob);
                    result.add(String.format("%s=%s", subTitle, subUrl));
                }

                // 衔接子任务的结果
                for (JianshuSpiderJob job : subJobs) {
                    List<String> list = job.join();
                    if (list.size() > 0) {
                        String[] subResult = list.get(0).split("=");
                        for (int i = 0; i < result.size(); i++) {
                            if (result.get(i).indexOf(subResult[0]) > 0) {
                                String[] localResult = result.get(i).split("=");
                                result.remove(i);
                                result.add(String.format("%s=%s", localResult[0], subResult[1]));
                                break;
                            }
                        }
                    }
                }
            } else { // 网络错误
                System.err.println("There is an error when trying to get homepage.");
            }
            return responseCode;
        });

        return result; // 返回最终结果
    }

    protected List<String> requestSubPage() throws IOException {
        List<String> result = new ArrayList<>();
        Map<String, String> requestHeader = new HashMap<>();
        requestHeader.put("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36");
        // 获取文章页面的详细信息
        Spider.newHost(new URL(HOST + this.url))
                .setRequestHeaders(requestHeader)
                .get((responseCode, responseHeaders, responseStream) -> { // 请求具体的文章页
                    String html = responseStream.toString();
                    Pattern contextPattern = Pattern.compile("\"slug\":\"([a-z0-9]+)\".*?\"wordage\":(\\d+)");
                    Matcher contextMatcher = contextPattern.matcher(html);
                    if (contextMatcher.find())
                        result.add(String.format("%s=%s", contextMatcher.group(1), contextMatcher.group(2)));
                    return responseCode;
                });
        return result;
    }

    @Override
    protected List<String> compute() {
        try {
            System.err.printf("%s is running.%n", Thread.currentThread().getName()); // 显示当前工作线程
            if (HOST.equals(this.url)) {
                return this.requestHomepage();
            } else {
                return this.requestSubPage();
            }
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
}

public class ForkJoinExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        JianshuSpiderJob job = new JianshuSpiderJob();
        forkJoinPool.submit(job);

        List<String> result = job.get();

        // 输出结果
        System.err.println("Article\tWords");
        for (String str : result) {
            String[] s = str.split("=");
            if (s.length > 1)
                System.err.printf("%s\t%s%n", s[0], s[1]);
            else
                System.err.println("Err -> " + str);
        }
        forkJoinPool.shutdown();

    }
}

这里例子相比较之前那个计算阶乘比较有典型,因为网络请求本身是阻塞。一个请求可能几毫秒就可以返回,也可以几秒钟才返回,甚至等了几秒钟以后,连接被中断,请求失败。例子中,并没有考虑网络异常的情况下。

运行结果:

ForkJoinPool-1-worker-1 is running.
ForkJoinPool-1-worker-1 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-2 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-3 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-2 is running.
ForkJoinPool-1-worker-4 is running.
ForkJoinPool-1-worker-3 is running.
ForkJoinPool-1-worker-6 is running.
ForkJoinPool-1-worker-4 is running.
ForkJoinPool-1-worker-3 is running.
ForkJoinPool-1-worker-4 is running.
ForkJoinPool-1-worker-2 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-7 is running.
ForkJoinPool-1-worker-4 is running.
ForkJoinPool-1-worker-3 is running.
ForkJoinPool-1-worker-0 is running.
ForkJoinPool-1-worker-4 is running.
Article Words
又一年轻姑娘离世:请记住这些“1+1=死神”的药物!  2862
报告大王,新概念英语一至四册全套资源(视频、音频、电子书)已被我活捉! 861
模式学习|找到最适合你的学习“套路”  4009
我们女孩子真不容易,既要貌美如花,又要赚钱养家。    1603
二十岁出头的你,别急着想要出人头地   2787
别怕,谁的大学不迷茫  1649
这10年,多少人从郭敬明到咪蒙 3143
老实人浪起来,你我都招架不住  2165
如果没有回报,我会坚持写作多久 2595
简书早报161030——《不负责任吐槽,四本买完看完就后悔的畅销书》  2029
时而自信,时而自卑,如何改变这种双重人生?   2855
有哪些小乐器,是学习起来非常方便的?  4259
穷人,最可怕的是总说自己穷   1838
时光回去,只愿未曾遇到你(五十七)   3109
亲爱的,千万别把孩子养得“输不起”!  2580
你凭什么诋毁我的爱豆!向语言暴力Say No! 2177
史上最全36个虐腹动作:想要马甲线,人鱼线的朋友练起来 776
两只蜗牛的爱情 3382
比文招亲【星言夙驾,说于桑田】十二   2493
《简书历史月刊003·三千年来谁著史》上线   1118

从结果来看,整个过程最多的时候使用了8个线程来完成这个任务。每次运行具体使用的线程数都不一样,读者也可以将这段代码复制过去,并引用连接中的两个类,看看结果如何。(由于运行结果依赖于简书服务器返回的结果,随着时间推移,程序结果很可能不正确,望知悉)

在整个代码中,程序员并不知道具体任务是如何分配,程序员的关注点只在业务逻辑本身上,而不用关心有关于线程调度的问题。具体的调度交给里FJP。

ForkJoinTask 中抛出异常

而在ForkJoinTask,可能会引发Unchecked Exception,因此可以调用ForkJoinTask.isCompletedAbnormally()来判断是否任务在执行中出现异常。如果返回值为Throwable类型则表明在执行过程中出现Unchecked Exception;若返回值为CancellationException则表明任务在执行过程中被取消;如果任务还没有结束或者正常完成,没有异常,则返回null。

ForkJoinPool / ForkJoinTask 与 Executor 关联

从类继承图上可以看到,ForkJoinPool 间接继承了Executor,因此可以认为两者师出同门,只不过后者提供更加便捷API,使程序员将关注点更加集中在业务上。既然两者师承一派,那么很多地方是一样或类似的,这里着重说一下不同的地方。

区别 Executor ForkJoinPool
接受的对象 Runnable和Callable的实例 Runnable、Callable和ForkJoinTask的实例
调度模式 处于后面等待中的任务需要等待前面任务执行后才有机会被执行,是否被执行取决于具体的调度规则 采用work-stealing模式帮助其他线程执行任务,即ExcuteService解决的是并发问题,而ForkJoinPool解决的是并行问题。

对于了解类UNIX系统的人来说,对于fork这个词应该不会陌生。这里fork的含义基本相同,即一个大任务分支出多个小任务执行,而小任务的执行过程中可能还会分支出更小的任务,如此往复,直到分支出来的任务是原子任务。

而join是等待刚才fork出去的分支,返回结果。顺序与fork正好相反,执行结果不断的join向上,最后那个大任务的结果就出来了。

其实FJP中还有一个Actor模型,但是我没用过,就不介绍了,感兴趣的可以善用搜索引擎。

Java 8 中的Stream

这个Stream不同于OIO中的Stream,不是一种输出/输出流,其本身不包含任何数据,更像一种迭代器。这在后面的例子中会提现出来,这个Stream允许并行的对集合类型进行迭代操作,并且依托于lambda表达式,可以用极为简便的代码完成对集合的CRUD操作。而Stream之所以能够提供并行迭代的,是因为其内部使用了FJP的模型(以下代码若非特别说明均需要Java 8及以上)

一般来说,使用一个Stream的流程是:

  1. 取得一个数据源 source
  2. 数据转换
  3. CRUD操作
  4. 返回新的Stream

Stream不会改变数据源,每次都会返回一个新的数据集合。而数据源可以是这些来源:

  • Collection 对象
    • Collection.stream()
    • Collection.parallelStream()
  • 数组
    • Arrays.stream(T array)
    • Stream.of()
  • BufferedReader
    • BufferedReader.lines()
  • java.util.stream.IntStream.range()
  • java.nio.file.Files.walk()
  • java.util.Spliterator
  • Random.ints()
  • BitSet.stream()
  • Pattern.splitAsStream(java.lang.CharSequence)
  • JarFile.stream()

Stream的操作大致分为两大列:

  • Intermediate,一个Stream后面可以跟随任意个Intermediate操作。其主要目的过过滤、映射数据,值得一提的是intermediate是lazy的,因此只有调用相关方法后才会进行相关Stream的真正操作(例如打开文件等)
  • Terminal,一个Stream只能有一个Terminal。一旦执行操作后,这个Stream就已经结束了,因此Terminal一定是一个Stream的最后一个操作。Terminal的调用才会真正开始Stream的遍历,并且会产生一个结果。

Stream的使用方法

理论安利的半天,看看比较直观的代码,比如从随机数中找到大于x的值:(输出大于50的数字)

public class StreamExample {
    public static void main(String[] args) {
        IntStream stream = new Random().ints(0, 100).limit(50); // 构造Stream,生成50个[0,100)之间随机数,这行代码结束的时候,数字还没有生成

        stream.filter(value -> value > 50) // 此时随机数还没有生成
                .forEach(System.out::println); // 直到要输出的时候,才从数据源获取数据
    }
}

代码有没有很简洁?传统方法需要各种各样的for循环,这里全部没有了。首先要格外强调的是:

  • Stream是延迟操作的
  • Stream本身是不包含任何数据
  • Stream的数据均来自于数据源
  • Stream只有执行Terminal操作时,才从数据源上获取数据
  • Stream的(输入)数据源可以是无穷大的
  • Stream的输出不能是无穷的,必须是一个有限集合

这里举个例子,说明一下数据源可以是无限的。常规的集合,数组、列表等都是有限集合,集合可以是非常大(受限于硬件限制),但必定有限。什么是无限的集合?数学上有个概念,叫自然数,定义是所有正整数加上0的集合,而正整数这个子集合是无穷的。那么在Java中如何表示这个无限集合 自然数呢?

class NaturalNumber implements Supplier<BigInteger> {
    private BigInteger num;

    public NaturalNumber() {
        this.num = BigInteger.valueOf(-1);
    }

    @Override
    public BigInteger get() {
        this.num = this.num.add(BigInteger.ONE);
        return this.num;
    }
}

这样就构造了一个无限的自然数集合,通过Stream.generate()方法来构建与这个无限集合相关的Stream对象,Stream每次获取值或调用get方法,无穷无尽。另外还有一个更简便的无穷的自然数集合,只有一句话:

Stream.iterate(0, val -> val + 1);

不过实际上这个有有穷的集合,受限于Integer数据类型的限制,最大只能到Integer.MAX_VALUE

那么什么是输出不能是无穷的呢?有输入,就可以输出,为什么不能无限输出呢?以这个自然数发生器来看个例子:

public class StreamExample {
    public static void main(String[] args) {
        Stream.generate(new NaturalNumber()).forEach(System.err::println);
    }
}

编译没有问题,运行起来也没有问题。但是...似乎程序永远也不会停下来,因为Stream能够得到无穷的输入,那么就可以无尽的输出。永不停歇,大多数情况下,我们不希望程序会这样,同样以这个自然数发生器为例,可能我希望计算从m到n自然数的累加值。但是数据源是无限的,怎么办?

public class StreamExample {
    static class FinalFieldHelper<T> {
        private T obj;

        public FinalFieldHelper(T obj) {
            this.obj = obj;
        }

        public T value() {
            return this.obj;
        }

        public void value(T obj) {
            this.obj = obj;
        }
    }

    public static void main(String[] args) {
        final int m = 10000, n = 100000;
        final FinalFieldHelper<BigInteger> result = new FinalFieldHelper<>(BigInteger.ZERO);
        Stream.generate(new NaturalNumber()).limit(n).skip(m).forEach(bigInteger -> result.value(bigInteger.add(result.value())));

        System.err.printf("from %d to %d -> %s%n",m,n,result.value().toString());
    }
}

是的,正如你所见的那样,使用limit方法,将一个无限集合截取成有限集合,然后再进行操作。因为对于无限集合而言,调用任何一个Terminal操作都会导致程序挂起。(FinalFieldHelper是一个辅助类,因为内部类访问外部类的变量必须是final的,所以在这里我无法更新result的值,用了这么个类变通一下)

这里介绍一下Stream的一些常用方法

方法 用途
distinct 去除重复对象,其结果依赖于具体对象的equals方法
filter 过滤数据源中的结果,产生新的Stream,参数为过滤的方法
map 对于Stream中包含的元素使用给定的转换函数进行转换操作,新生成的Stream只包含转换生成的元素。这个方法有三个对于原始类型的变种方法,分别是:mapToInt,mapToLong和mapToDouble。这三个方法也比较好理解,比如mapToInt就是把原始Stream转换成一个新的Stream,这个新生成的Stream中的元素都是int类型。之所以会有这样三个变种方法,可以免除自动装箱/拆箱的额外消耗;
flatMap 和map类似,不同的是其每个元素转换得到的是Stream对象,会把子Stream中的元素压缩到父集合中;
peek 生成一个包含原Stream的所有元素的新Stream,同时会提供一个消费函数(Consumer实例),新Stream每个元素被消费的时候都会执行给定的消费函数;
limit 对一个Stream进行截断操作,获取其前N个元素,如果原Stream中包含的元素个数小于N,那就获取其所有的元素;
skip 返回一个丢弃原Stream的前N个元素后剩下元素组成的新Stream,如果原Stream中包含的元素个数小于N,那么返回空Stream;

下面就这些常用方法,写一些对应的例子

public class StreamTestCase {
    private final Object[] source = new Object[]{"a", "b", null, "c", new String[]{"d1", "d2", "d3"}, "e", "a", "b", "c", "f"};

    private void printForEach(String methodName, Stream stream) {
        if (methodName != null)
            System.err.printf("===%s Start===%n", methodName);
        System.err.print('[');
        stream.forEach(o -> {
            if (o == null)
                System.err.print(o);
            else if (o instanceof Stream)
                this.printForEach(null, (Stream) o);
            else if (o instanceof String || o instanceof Boolean)
                System.err.print(o);
            else {
                Object[] obj = (Object[]) o;
                System.err.print('[');
                for (Object oo : obj) {
                    System.err.print(oo);
                    System.err.print(' ');
                }
                System.err.print(']');
            }
            System.err.print(' ');
        });
        System.err.println(']');
        if (methodName != null)
            System.err.printf("===%s Finish===%n", methodName);
    }

    @Test
    public void distinctTest() {
        this.printForEach("distinctTest", Stream.of(source).distinct()); // 去掉重复元素,后面的abc就被去掉了
    }

    @Test
    public void peekTest() {
        this.printForEach("piikTest", Stream.of(source).peek(o -> System.err.println("Peek -> " + o))); // 一定要有终端方法,peek才会被调用
    }

    @Test
    public void filterTest() {
        this.printForEach("filterTest", Stream.of(source).filter(o -> o != null && o instanceof String)); // 过滤掉了null和数组
    }

    @Test
    public void limitTest() {
        this.printForEach("limitTest", Stream.of(source).limit(4)); // 截取前四个元素
    }

    @Test
    public void skipTest() {
        this.printForEach("skipTest", Stream.of(source).skip(4)); // 去掉前四个元素
    }

    @Test
    public void mapTest() {
        // 将源对象根据自定义规则进行类型转换
        // 我的转化规则是null保持不变
        // 其他元素非String的转换为True
        // String类型,首字母Ascii码为偶数的为True 其余false
        this.printForEach("mapTest", Stream.of(source).map(o -> {
            if (o == null) return null;
            if (o.getClass().isArray()) return Boolean.TRUE;
            return (o.toString().charAt(0) & 1) == 0;
        }));
    }

    @Test
    public void flatMapTest() {
        this.printForEach("flatMapTest", Stream.of(source).flatMap((Function<Object, Stream<?>>) o -> {
            if (o == null) return null;
            if (o.getClass().isArray()) return Stream.of(Boolean.TRUE);
            return Stream.of((o.toString().charAt(0) & 1) == 0);
        }));
    }
}

相关文章

网友评论

    本文标题:Java 多线程模型与并发设计

    本文链接:https://www.haomeiwen.com/subject/fixuuttx.html