-
BlockingQueue接口和生产者-消费者模式
(1) 生产者-消费者模式消除了生产者类和消费者类之间的代码依赖性: 生产者将数据放入队列; 消费者从队列中取数据
(2) 阻塞队列提供了可阻塞的put和take方法, 和支持定时的offer和poll方法
(3) 队列可以有界也可以无界(在构造函数中指定, 但其实无界的阻塞队列也有最大值Integer.MAX_VAL)
(4) BlockingQueue是一个接口, 几个实现是ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue。
其中, ArrayBlockingQueue和LinkedBlockingQueue分别与ArrayList和LinkedList相似, 是FIFO队列;
PriorityBlockingQueue是优先级队列;
SynchronousQueue比较特殊, 它维护一组线程,没有存储功能, 生产者和消费者的put和take会一直阻塞直到另一个线程准备好参与到交付过程中
(5) 示例
桌面搜索: 生产者线程负责找到所有文件, 消费者线程负责对文件建立索引
public class ProducerConsumer { static class FileCrawler implements Runnable { private final BlockingQueue<File> fileQueue; private final FileFilter fileFilter; private final File root; public FileCrawler(BlockingQueue<File> fileQueue, final FileFilter fileFilter, File root) { this.fileQueue = fileQueue; this.root = root; this.fileFilter = new FileFilter() { public boolean accept(File f) { return f.isDirectory() || fileFilter.accept(f); } }; } private boolean alreadyIndexed(File f) { return false; } public void run() { try { crawl(root); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void crawl(File root) throws InterruptedException { File[] entries = root.listFiles(fileFilter); if (entries != null) { for (File entry : entries) if (entry.isDirectory()) { crawl(entry); } else if (!alreadyIndexed(entry)) { fileQueue.put(entry); } } } } } static class Indexer implements Runnable { private final BlockingQueue<File> queue; public Indexer(BlockingQueue<File> queue) { this.queue = queue; } public void run() { try { while (true) { indexFile(queue.take()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public void indexFile(File file) { // Index the file... } } private static final int BOUND = 10; private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); public static void startIndexing(File[] roots) { BlockingQueue<File> queue = new LinkedBlockingQueue<>(BOUND); FileFilter filter = new FileFilter() { public boolean accept(File file) { return true; } }; for (File root : roots) { new Thread(new FileCrawler(queue, filter, root)).start(); } for (int i = 0; i < N_CONSUMERS; i++) { new Thread(new Indexer(queue)).start(); } } }
(6) 串行线程封闭
线程封闭对象只能由单个线程拥有, 但可以通过安全发布该对象转移所有权: 这种安全的发布确保了对象状态对于新的所有者可见, 并且由于最初的所有者不会访问它, 因此对象被封闭在新的线程内。
阻塞队列简化了这项工作
网友评论