美文网首页程序员技术干货基础知识
使用Java8新特性parallelStream遇到的坑

使用Java8新特性parallelStream遇到的坑

作者: GuangHui | 来源:发表于2018-12-19 20:17 被阅读17次

    1 问题测试代码

      public static void main(String[] args) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            List<Calendar> list = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                  Calendar startDay = new GregorianCalendar();
                  Calendar checkDay = new GregorianCalendar();
                  checkDay.setTime(startDay.getTime());//不污染入参
                  checkDay.add(checkDay.DATE,i);
                  list.add(checkDay);
                  checkDay = null;
                  startDay = null;
            }
    
            list.stream().forEach(day ->  System.out.println(sdf.format(day.getTime())));
            System.out.println("-----------------------");
            list.parallelStream().forEach(day ->  System.out.println(sdf.format(day.getTime())));
            System.out.println("-----------------------");
      }
    

    说明:

    (1) 使用stream().forEach(),就是单纯的串行遍历循环,和使用for循环得到的效果一样,只是这种方式可以使代码更精简;

    (2) 使用parallelStream().forEach(),是并行遍历循环,相当于是使用了多线程处理.这样可以在一定程度上提高执行效率.而程序在运行过程中具体会使用多少个线程进行处理,系统会根据运行服务器的资源占用情况自动进行分配.

    2 运行结果

    image.png

    3 原因排查

    网上搜索查询搜索到相关的文章如下:
    <<JAVA使用并行流(ParallelStream)时要注意的一些问题>>,
    <<java8的ParallelStream踩坑记录>>.

    这些文章中描述的问题归根结底都是同一类问题,那就是在使用parallelStream().forEach()时,都操作了线程不安全的对象(ArrayList).

    查看ArrayList的源码如下:

        transient Object[] elementData; // non-private to simplify nested  class access   
       /**
         * Appends the specified element to the end of this list.
         * @param e element to be appended to this list
         * @return <tt>true</tt> (as specified by {@link Collection#add})
         */
        public boolean add(E e) {
            ensureCapacityInternal(size + 1);  // Increments modCount!!
            elementData[size++] = e;
            return true;
        }
    

    通过查看源码可以看到,ArrayList本身底层是通过一个名为elementData的数组实现的,而add()方法上并没有加同步锁,可见在多线程并发情况下存在线程不安全的问题.

    这些文章最后的解决方案都是将操作ArrayList转化为一个同步的集合:

    Collections.synchronizedList(new ArrayList<>())
    

    这样并行流操作同一个ArrayList的对象中add()方法时,就都是同步串行操作的了,就不存在线程安全的问题了,也即是解决了文章中反馈的问题.

    那么出问题的原因就找到了,那就是在使用parallelStream().forEach()时,都操作了线程不安全的对象.

    4 结合自己的问题

    上面找到的出问题的原因,就是在parallelStream().forEach()中使用了线程不安全的对象.

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    ...
    list.parallelStream().forEach(**day** ->  System.out.println(sdf.format(day.getTime())));
    

    如上面代码所示,从list中遍历的day和day.getTime()肯定不会有线程安全问题.那么就只剩下SimpleDateFormat实例对象了.下面咱查看SimpleDateFormat对象的format()源码深挖得到如下信息:

    public abstract class DateFormat extends Format {
        /**
         * The {@link Calendar} instance used for calculating the date-time  fields
         * and the instant of time. This field is used for both formatting  and
         * parsing.
         *
         * <p>Subclasses should initialize this field to a {@link Calendar}
         * appropriate for the {@link Locale} associated with this
         * <code>DateFormat</code>.
         * @serial
         */
        protected Calendar calendar;   
        ...
     // Called from Format after creating a FieldDelegate
        private StringBuffer format(Date date, StringBuffer toAppendTo,
                                    FieldDelegate delegate) {
            // Convert input date to time field list
            calendar.setTime(date);
            boolean useDateFormatSymbols = useDateFormatSymbols();
            for (int i = 0; i < compiledPattern.length; ) {
                int tag = compiledPattern[i] >>> 8;
                int count = compiledPattern[i++] & 0xff;
                if (count == 255) {
                    count = compiledPattern[i++] << 16;
                    count |= compiledPattern[i++];
                }
                switch (tag) {
                case TAG_QUOTE_ASCII_CHAR:
                    toAppendTo.append((char)count);
                    break;
                case TAG_QUOTE_CHARS:
                    toAppendTo.append(compiledPattern, i, count);
                    i += count;
                    break;
                default:
                    subFormat(tag, count, delegate, toAppendTo,  useDateFormatSymbols);
                    break;
                }
            }
            return toAppendTo;
        }
    

    format()方法中操作了一个成员变量calendar,且该方法上未加同步锁,说明该方法在多线程并发访问时,存在线程安全问题.这就是上面测试代码中出现重复数据的根本原因.

    进一步查询得知,Java8以前的老版本中的日期和时间类全部都是线程不安全的,而在Java8新推出的日期类LocalDate和LocalDateTime非常友好的解决了上述问题.

    5 针对测试代码中问题的根本解决之道

    弃用Java8之前旧版本中的日期和时间类,改用新版本中的时间类.新修改后的代码如下:

          public static void main1(String[] args) {
                DateTimeFormatter fmt =  DateTimeFormatter.ofPattern("yyyy-MM-dd");
                LocalDate date = LocalDate.now();
                List<LocalDate> list = new ArrayList<>();
                for (int i = 0; i < 20; i++) {
                      LocalDate date1 = date.plusDays(i);
                      list.add(date1);
                }
                list.stream().forEach(day ->  System.out.println(day.format(fmt)));
                System.out.println("-----------------------");
                list.parallelStream().forEach(day ->  System.out.println(day.format(fmt)));
          }     
    
          public static void main2(String[] args) {
                DateTimeFormatter fmt =  DateTimeFormatter.ofPattern("yyyy-MM-dd");
                LocalDateTime date = LocalDateTime.now();
                List<LocalDateTime> list = new ArrayList<>();
                for (int i = 0; i < 20; i++) {
                      LocalDateTime date1 = date.plusDays(i);
                      list.add(date1);
                }
                list.stream().forEach(day ->  System.out.println(day.format(fmt)));
                System.out.println("-----------------------");
                list.parallelStream().forEach(day ->  System.out.println(day.format(fmt)));
          }
    

    看一下LocalDate和LocalDateTime的源码:通过查看源码,可以看到LocalDate和LocalDateTime类都是不可变和线程安全的.这样的下面的代码中的day每一次都是不同的对象

    list.parallelStream().forEach(day ->  System.out.println(day.format(fmt)));
    

    再来对比最初问题代码:并行操作时,在使用同一个sdf实例.

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    ...
    list.parallelStream().forEach(day ->  System.out.println(sdf.format(day.getTime())));
    

    LocalDate类源码:

    * @implSpec
    * This class is immutable and thread-safe.
    * @since 1.8
    */
    public **final** class LocalDate
            implements Temporal, TemporalAdjuster, ChronoLocalDate,  Serializable {
    ...
    

    LocalDateTime类源码:

    * @implSpec
    * This class is immutable and thread-safe.
    *
    * @since 1.8
    */
    public final class LocalDateTime
            implements Temporal, TemporalAdjuster,  ChronoLocalDateTime<LocalDate>, Serializable {
    

    至此,测试代码中出问题的根本原因找到,根本解决之道找到.OK!

    相关文章

      网友评论

        本文标题:使用Java8新特性parallelStream遇到的坑

        本文链接:https://www.haomeiwen.com/subject/sqdskqtx.html