美文网首页
Java线程池理解(1)

Java线程池理解(1)

作者: AlexSun1995 | 来源:发表于2017-12-27 20:05 被阅读0次

    介绍

    什么是线程池?
    线程的创建开销很大,如果每一次线程使用后就丢弃,等下次需要使用时再重新创建,带来的浪费就很严重。线程池解决了线程复用的问题:线程在执行完任务以后并不会立即销毁,而是继续保留,可以在后续执行其他任务。

    概览

    实践1

    异步读写文件 使用singleThreadPool
    任务描述: 现在有一个很大的csv文件(21G), 内容是1990-2016年的专利数据,希望将2012年及以后的数据提取出来,写入一个新的csv文件中。关键点是:

    • 如果读一行写一行,面对总计1000万行以上的数据,耗时将会很长。解决:将读取到的行先写入cache中,当cache满了以后,把cache中的数据交给写文件线程,然后清空cache,继续读取文件。
    • write线程虽然和read线程是独立的,但是write线程必须始终只有一个。如果多个的话写文件的顺序会乱掉,多个线程竞争IO,也未必会带来性能的提升。(一个引出问题:多线程写文件,线程锁的问题)
    • 用singleThreadPool控制写文件操作,能够保证写文件线程始终是同一个。如果新的任务提交时旧的任务还没有完成,新任务会在singleThreadPool内部的任务队列中排队,等前面的任务完成后再按提交顺序执行(如何实现的?)

    master, 核心部分

    package Thread.multi_rw;
    import java.io.*;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * 对大文件(21G csv)在多线程环境下读写
     * 将年份时间是在2012以后的按照行写入新的文件
     */
    /**
     * Reads a large CSV file line by line and forwards the lines whose year is
     * 2012 or later to a background writer.
     *
     * <p>Matching lines are buffered in a {@link LineCache}. Whenever the cache
     * reports that it is full, its contents are wrapped in a {@link WriteFileTask}
     * and submitted to a single-thread executor; the cache is then reset and
     * reading continues. A single-thread executor runs its tasks sequentially, so
     * batches are written one at a time.
     */
    public class WriteAndReadMaster {
        /** Index (after splitting on {@code ,"}) of the field that starts with the year. */
        private static final int DATE_FIELD_INDEX = 7;
        /** Lines whose year is at least this value are kept. */
        private static final int MIN_YEAR = 2012;

        private File inputFile = null;
        private String outputFileName = null;
        private LineCache cache;
        /** Number of input lines skipped because they could not be parsed. */
        private int readExceptionCount = 0;

        /**
         * @param sourcePath  path of the CSV file to read
         * @param outfileName path of the file the selected lines are appended to
         */
        public WriteAndReadMaster(String sourcePath, String outfileName) {
            inputFile = new File(sourcePath);
            outputFileName = outfileName;
            this.cache = new LineCache();
        }

        /**
         * Scans the input file and submits the selected lines for writing.
         *
         * <p>The method returns once the whole input has been read and the last
         * batch has been submitted; already submitted write tasks may still be
         * running at that point. The executor is shut down before returning, which
         * lets queued tasks finish but allows its worker thread to terminate
         * afterwards. I/O errors are reported on {@code System.err} and are not
         * propagated to the caller.
         */
        public void execute() {
            System.out.println("Start to execute...");
            ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
            // try-with-resources closes the reader on every exit path.
            try (BufferedReader reader = new BufferedReader(new FileReader(inputFile))) {
                String line;
                // Lines are read one at a time; reading several per call might be worth exploring.
                // readLine() returning null is the end-of-file signal; ready() only says
                // whether a read would block, so it is not used as a loop condition.
                while ((line = reader.readLine()) != null) {
                    if (!isSelected(line)) {
                        continue;
                    }
                    cache.push(line);
                    if (cache.isCacheFull()) {
                        submitCache(singleThreadPool);
                    }
                }
                // Hand over whatever is left in the cache after the last full batch.
                submitCache(singleThreadPool);
            } catch (FileNotFoundException e) {
                System.err.println("file not found error");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Without shutdown() the executor's worker thread stays alive after the
                // last task; shutdown() still lets already submitted tasks complete.
                singleThreadPool.shutdown();
                System.err.println("total read exception:" + this.readExceptionCount);
            }
        }

        /**
         * Decides whether a line should be kept, i.e. whether the first four
         * characters of its date field parse to a year of {@code MIN_YEAR} or later.
         * Lines that cannot be parsed are counted in {@code readExceptionCount}
         * and rejected.
         */
        private boolean isSelected(String line) {
            String[] contents = line.split(",\"");
            if (contents.length <= DATE_FIELD_INDEX) {
                System.err.println("index exception, ignore it");
                this.readExceptionCount += 1;
                return false;
            }
            try {
                int year = Integer.parseInt(contents[DATE_FIELD_INDEX].substring(0, 4));
                return year >= MIN_YEAR;
            } catch (IndexOutOfBoundsException | NumberFormatException e) {
                // Field shorter than four characters, or not a number.
                System.err.println("line error, ignore it..");
                this.readExceptionCount += 1;
                return false;
            }
        }

        /** Submits the current cache contents as one write task and resets the cache. */
        private void submitCache(ExecutorService pool) {
            WriteFileTask writeFileTask = new WriteFileTask(outputFileName, cache.getData());
            pool.submit(writeFileTask);
            // cleanUp() is called right after the hand-over so that subsequent
            // lines are buffered for the next batch.
            cache.cleanUp();
        }
    }
    

    cache 缓存类

    package Thread.multi_rw;
    
    
    import java.util.ArrayList;
    
    /**
     * Growable buffer of text lines with a fixed "full" threshold.
     *
     * <p>Lines are appended with {@link #push(String)}; {@link #isCacheFull()}
     * reports when the threshold has been reached, {@link #getData()} exposes the
     * current backing list and {@link #cleanUp()} swaps in a fresh, empty list.
     */
    public class LineCache {
        /** Number of buffered lines at which the cache counts as full. */
        private static final int MAX_CACHE_LINE = 10000;

        // Reset to zero together with the buffer; nothing in this class reads it.
        private volatile int cnt = 0;

        // Replaced (not cleared) on cleanUp(), so a list handed out earlier is untouched.
        private ArrayList<String> data = newBuffer();

        /** Creates an empty cache. */
        public LineCache() {
        }

        /**
         * Discards the current buffer by installing a new empty one.
         *
         * @return this cache, to allow call chaining
         */
        public LineCache cleanUp() {
            this.data = newBuffer();
            this.cnt = 0;
            return this;
        }

        /** @return {@code true} once at least {@code MAX_CACHE_LINE} lines are buffered */
        public boolean isCacheFull() {
            return data.size() >= MAX_CACHE_LINE;
        }

        /** Appends one line to the buffer. */
        public void push(String line) {
            data.add(line);
        }

        /** @return the current backing list itself (not a copy) */
        public synchronized ArrayList<String> getData() {
            return data;
        }

        /** Allocates an empty list pre-sized to the full threshold. */
        private static ArrayList<String> newBuffer() {
            return new ArrayList<>(MAX_CACHE_LINE);
        }
    }
    

    负责写文件的Runnable 对象

    package Thread.multi_rw;
    
    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.ArrayList;
    
    /**
     * Runnable that appends a batch of lines to a text file.
     *
     * <p>Each line is written followed by a single {@code '\n'}. The file is
     * opened in append mode, so successive tasks targeting the same file extend
     * it. Static counters track the number of lines written and the number of
     * failed runs across all tasks.
     */
    public class WriteFileTask implements Runnable {

        private String fileName = null;
        /** Lines written by all tasks so far. */
        private static volatile int totalWriteLines = 0;
        private ArrayList<String> writeData = null;
        /** Number of runs, across all tasks, that ended with an I/O error. */
        public volatile static int exceptionCount = 0;

        /**
         * @param fileName path of the file to append to
         * @param data     lines to write; the list is used as-is, not copied
         */
        public WriteFileTask(String fileName, ArrayList<String> data) {
            this.fileName = fileName;
            this.writeData = data;
            System.err.println("data size: " + data.size());
        }

        /**
         * Appends every line of the batch to the target file and prints a summary.
         * I/O errors are reported on {@code System.err} and counted in
         * {@link #exceptionCount}; they are not rethrown.
         */
        @Override
        public void run() {
            int cnt = 0;
            // try-with-resources flushes and closes the writer. Without closing, the
            // buffered tail of the batch would never reach the file and the file
            // handle would leak.
            try (BufferedWriter writer = new BufferedWriter(new FileWriter(fileName, true))) {
                for (String line : this.writeData) {
                    writer.write(line);
                    writer.write('\n');
                    totalWriteLines += 1;
                    cnt += 1;
                }
            } catch (IOException e) {
                System.err.println("Write Exception");
                exceptionCount += 1;
                e.printStackTrace();
            } finally {
                System.out.println("线程: " + Thread.currentThread() +
                        " 写入: " + cnt + "行,总计写入: " + totalWriteLines + "行");
                System.err.println("总计写入错误: " + exceptionCount);
            }
        }
    }
    
    

    参考文章

    深入理解java之线程池

    相关文章

      网友评论

          本文标题:Java线程池理解(1)

          本文链接:https://www.haomeiwen.com/subject/hixugxtx.html