对Java NIO的WatchService做了简单封装,支持:
- 监控多个目录
- 设置监控类型(增、删、改)
- 设置线程池
- 设置多个过滤器
- 设置多个监听器(串行执行)
- 轻量级,不依赖第三方包
- fluent api
- 使用方法
import java.io.IOException;
import java.nio.file.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Usage example for {@link FileWatcher}.
 *
 * <p>Watches two directories for create and delete events, keeps only entries whose
 * file name contains "test" and ends with ".txt", and prints every surviving entry
 * from two listeners.
 * <p> 2018/11/04
 *
 * @author ssj
 * @version 1.0.0
 */
public class Test {
    /**
     * Starts the watcher, lets it run for five minutes, then stops it.
     *
     * @param args unused
     * @throws IOException          kept for signature compatibility
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            new FileWatcher.FileWatcherBuilder()
                    .addWatchPath("/home/ssj/Downloads/test")
                    .addWatchPath("/home/ssj/Downloads/test/新建文件夹")
                    .addWatchKind(StandardWatchEventKinds.ENTRY_CREATE)
                    .addWatchKind(StandardWatchEventKinds.ENTRY_DELETE)
                    .setExecutor(pool)
                    .addFilter((p, k) -> p.getFileName().toString().contains("test"))
                    .addFilter((p, k) -> p.getFileName().toString().endsWith(".txt"))
                    .addListener((p, k) -> {
                        System.out.println("listener1:" + p.getFileName());
                    })
                    .addListener((p, k) -> {
                        System.out.println("listener2:" + p.getFileName());
                    })
                    .build()
                    .startWatch();
            TimeUnit.MINUTES.sleep(5);
        } finally {
            // The pool's worker threads are non-daemon and block inside the watch loop;
            // without interrupting them the JVM would never exit once main returns.
            pool.shutdownNow();
        }
    }
}
- 工具类
import java.io.IOException;
import java.nio.file.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* <p> description: 监控文件变化,监控一层,不带递归
* 可监控多个地址,增删改,设置多个回调(串行执行)
* <p> 2018/11/07
*
* @author ssj
* @version 1.0.0
*/
/**
 * Thin wrapper around the NIO {@link WatchService}: watches one or more directories
 * (one level only, not recursive) for the configured event kinds, runs every event
 * through all filters, and passes the survivors to all listeners in registration
 * order.
 *
 * <p>Each watched directory gets its own {@link WatchService} and occupies one task
 * on the supplied executor for as long as it is watched, so the executor must be able
 * to run as many tasks concurrently as there are watched paths. Interrupting a
 * watching thread stops that directory's watch.
 *
 * <p>The {@link Path} handed to filters and listeners is the event context as
 * delivered by the watch service, i.e. the entry name relative to the watched
 * directory.
 */
public class FileWatcher {
    private static final Logger logger = Logger.getGlobal();

    private final Set<Path> paths;
    private final Set<WatchEvent.Kind> kinds;
    private final List<BiConsumer<Path, WatchEvent.Kind>> listeners;
    private final List<BiPredicate<Path, WatchEvent.Kind>> filters;
    private final Executor pool;
    /** All listeners chained into one consumer, invoked in registration order. */
    private final BiConsumer<Path, WatchEvent.Kind> consumer;
    /** All filters AND-ed together; accepts everything when no filter is configured. */
    private final BiPredicate<Path, WatchEvent.Kind> predicate;
    private boolean started = false;

    /**
     * Copies the builder's collections so that later changes to the builder cannot
     * leak into an already built watcher.
     */
    private FileWatcher(Set<Path> paths, Set<WatchEvent.Kind> kinds,
                        List<BiConsumer<Path, WatchEvent.Kind>> listeners, Executor pool,
                        List<BiPredicate<Path, WatchEvent.Kind>> filters) {
        this.paths = new HashSet<>(paths);
        this.kinds = new HashSet<>(kinds);
        this.listeners = new ArrayList<>(listeners);
        this.pool = pool;
        this.filters = filters == null ? new ArrayList<>() : new ArrayList<>(filters);
        this.consumer = chainListeners(this.listeners);
        this.predicate = combineFilters(this.filters);
    }

    /** Chains the listeners so they run one after another in list order. */
    private static BiConsumer<Path, WatchEvent.Kind> chainListeners(
            List<BiConsumer<Path, WatchEvent.Kind>> listeners) {
        BiConsumer<Path, WatchEvent.Kind> chained = (p, k) -> { };
        for (BiConsumer<Path, WatchEvent.Kind> listener : listeners) {
            chained = chained.andThen(listener);
        }
        return chained;
    }

    /** AND-combines the filters; an empty list yields an accept-all predicate. */
    private static BiPredicate<Path, WatchEvent.Kind> combineFilters(
            List<BiPredicate<Path, WatchEvent.Kind>> filters) {
        BiPredicate<Path, WatchEvent.Kind> combined = (p, k) -> true;
        for (BiPredicate<Path, WatchEvent.Kind> filter : filters) {
            combined = combined.and(filter);
        }
        return combined;
    }

    /**
     * Blocks the calling thread watching a single directory until the thread is
     * interrupted or the watch key can no longer be reset.
     *
     * @param path directory to watch
     * @throws IOException if the watch service cannot be created or closed, or the
     *                     directory cannot be registered
     */
    private void startWatch(Path path) throws IOException {
        WatchEvent.Kind[] kindArray = kinds.toArray(new WatchEvent.Kind[0]);
        // try-with-resources: the service is released even when registration fails.
        try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
            path.register(watchService, kindArray);
            for (; ; ) {
                WatchKey key;
                try {
                    key = watchService.take();
                } catch (InterruptedException e) {
                    // Restore the flag so the executor / caller can still observe it.
                    Thread.currentThread().interrupt();
                    logger.log(Level.SEVERE, "监听中断", e);
                    break;
                }
                for (WatchEvent<?> watchEvent : key.pollEvents()) {
                    dispatch(watchEvent);
                }
                if (!key.reset()) {
                    logger.severe("watchkey重置失败,停止监听");
                    break;
                }
            }
        }
    }

    /** Runs one event through the filters and, if accepted, through the listeners. */
    private void dispatch(WatchEvent<?> watchEvent) {
        WatchEvent.Kind k = watchEvent.kind();
        // OVERFLOW is delivered even when it was not registered and carries no path;
        // forwarding it would hand a null Path to filters that never asked for it.
        if (k == StandardWatchEventKinds.OVERFLOW && !kinds.contains(k)) {
            logger.warning("事件溢出,部分文件变化可能丢失");
            return;
        }
        Path p = (Path) watchEvent.context();
        try {
            if (predicate.test(p, k)) {
                consumer.accept(p, k);
            }
        } catch (RuntimeException e) {
            // A failing filter or listener must not silently end the whole watch.
            logger.log(Level.SEVERE, "监听回调执行异常: " + p, e);
        }
    }

    /**
     * Submits one watch task per configured path to the executor and returns.
     * Calling it again on the same instance only logs an error.
     */
    public void startWatch() {
        if (started) {
            logger.severe("监听已经开始, 无法重复监听");
            return;
        }
        started = true;
        for (Path path : paths) {
            pool.execute(() -> {
                try {
                    startWatch(path);
                } catch (IOException e) {
                    logger.log(Level.SEVERE, "监听失败: " + path, e);
                }
            });
        }
        logger.info("监听开始...");
    }

    /** Fluent builder for {@link FileWatcher}. Filters are optional; everything else is required. */
    public static class FileWatcherBuilder {
        private Set<Path> paths = new HashSet<>();
        private Set<WatchEvent.Kind> kinds = new HashSet<>();
        private List<BiConsumer<Path, WatchEvent.Kind>> listeners = new ArrayList<>();
        private List<BiPredicate<Path, WatchEvent.Kind>> filters = new ArrayList<>();
        private Executor pool;

        /** Adds a directory to watch. */
        public FileWatcherBuilder addWatchPath(String path) {
            paths.add(Paths.get(path));
            return this;
        }

        /** Adds an event kind to watch for. */
        public FileWatcherBuilder addWatchKind(WatchEvent.Kind kind) {
            kinds.add(kind);
            return this;
        }

        /** Adds a listener; listeners run serially in the order they were added. */
        public FileWatcherBuilder addListener(BiConsumer<Path, WatchEvent.Kind> listener) {
            listeners.add(listener);
            return this;
        }

        /** Adds a filter; an event reaches the listeners only if every filter accepts it. */
        public FileWatcherBuilder addFilter(BiPredicate<Path, WatchEvent.Kind> filter) {
            filters.add(filter);
            return this;
        }

        /** Sets the executor that runs the blocking watch loops. */
        public FileWatcherBuilder setExecutor(Executor pool) {
            this.pool = pool;
            return this;
        }

        /**
         * Validates the configuration and creates the watcher.
         *
         * @return a watcher that has not been started yet
         * @throws IllegalArgumentException if no path, no event kind, no listener or
         *                                  no executor has been configured
         */
        public FileWatcher build() {
            if (paths == null || paths.isEmpty()) {
                throw new IllegalArgumentException("还未设置监听路径");
            }
            if (kinds == null || kinds.isEmpty()) {
                throw new IllegalArgumentException("还未设置监听类型");
            }
            if (listeners == null || listeners.isEmpty()) {
                throw new IllegalArgumentException("还未设置监听回调");
            }
            if (this.pool == null) {
                throw new IllegalArgumentException("未设置监控线程池");
            }
            return new FileWatcher(this.paths, this.kinds, this.listeners, this.pool, this.filters);
        }
    }
}
网友评论