event包
EventUtils
事件绑定和监听的工具类
//给事件源绑定监听器
public static <L> void addEventListener(final Object eventSource, final Class<L> listenerType, final L listener)
//绑定事件到某个对象的方法,即发生事件后,可以执行某个对象的方法
public static <L> void bindEventsToMethod(final Object target, final String methodName, final Object eventSource,
final Class<L> listenerType, final String... eventTypes)
concurrent包
AtomicInitializer
用于初始化对象,调用get()方法后,要不只获得一个单例,要不重新出事化一个对象。
AtomicSafeInitializer
用于初始化对象,但是它能保证只初始化一次。
BackgroundInitializer
用于在后台初始化资源,调用start()方法后,可以做其他事情,直到调用get()方法,如果已经初始化完毕,则可以马上获取,否则线程会被阻塞
CallableBackgroundInitializer
带返回值的初始化
// a Callable that performs a complex computation
Callable<Integer> computationCallable = new MyComputationCallable();
// setup the background initializer
CallableBackgroundInitializer<Integer> initializer =
new CallableBackgroundInitializer(computationCallable);
initializer.start();
// Now do some other things. Initialization runs in a parallel thread
...
// Wait for the end of initialization and access the result
Integer result = initializer.get();
BasicThreadFactory
线程工厂类
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("workerthread-%d")
.daemon(true)
.priority(Thread.MAX_PRIORITY)
.build();
ExecutorService exec = Executors.newSingleThreadExecutor(factory);
ThresholdCircuitBreaker
阈值电子回路。传递一个增量值,如果到达阈值,则表示开启
long threshold = 10L;
ThresholdCircuitBreaker breaker = new ThresholdCircuitBreaker(10L);
...
public void handleRequest(Request request) {
long memoryUsed = estimateMemoryUsage(request);
if (breaker.incrementAndCheckState(memoryUsed)) {
// actually handle this request
} else {
// do something else, e.g. send an error code
}
}
AbstractCircuitBreaker
抽象电子回路。提供一些基础实现,另外实现了事件监听,即到达阈值后会告诉监听方。关键代码如下
//添加监听器
public void addChangeListener(final PropertyChangeListener listener) {
changeSupport.addPropertyChangeListener(listener);
}
//状态变更,告诉监听方
protected void changeState(final State newState) {
if (state.compareAndSet(newState.oppositeState(), newState)) {
changeSupport.firePropertyChange(PROPERTY_NAME, !isOpen(newState), isOpen(newState));
}
}
ConcurrentUtils
并发工具类。提供一些方法将检查型异常转为非检查型异常。
//不存在则初始化并放入map
public static <K, V> V createIfAbsent(final ConcurrentMap<K, V> map, final K key,
final ConcurrentInitializer<V> init) throws ConcurrentException {
if (map == null || init == null) {
return null;
}
final V value = map.get(key);
if (value == null) {
return putIfAbsent(map, key, init.get());
}
return value;
}
ConstantInitializer
调用get()方法,总会返回同一个对象。不用使用同步。
Memoizer
记忆器? 为什么拼写不正确?
主要功能是缓存一个计算结果,下次来取,直接获取结果,不用重新计算
public O compute(final I arg) throws InterruptedException {
while (true) {
Future<O> future = cache.get(arg);
if (future == null) {
final Callable<O> eval = () -> computable.compute(arg);
final FutureTask<O> futureTask = new FutureTask<>(eval);
future = cache.putIfAbsent(arg, futureTask);
//如果future是null,表示第一次执行
if (future == null) {
future = futureTask;
//第一次执行
futureTask.run();
}
}
try {
return future.get();
} catch (final CancellationException e) {
cache.remove(arg, future);
} catch (final ExecutionException e) {
if (recalculate) {
cache.remove(arg, future);
}
throw launderException(e.getCause());
}
}
}
MultiBackgroundInitializer
名字叫多重后台初始化器。
protected MultiBackgroundInitializerResults initialize() throws Exception {
Map<String, BackgroundInitializer<?>> inits;
synchronized (this) {
// create a snapshot to operate on
inits = new HashMap<>(
childInitializers);
}
// start the child initializers
final ExecutorService exec = getActiveExecutor();
for (final BackgroundInitializer<?> bi : inits.values()) {
if (bi.getExternalExecutor() == null) {
// share the executor service if necessary
bi.setExternalExecutor(exec);
}
// 异步提交,提交完就会返回
bi.start();
}
// collect the results
final Map<String, Object> results = new HashMap<>();
final Map<String, ConcurrentException> excepts = new HashMap<>();
for (final Map.Entry<String, BackgroundInitializer<?>> e : inits.entrySet()) {
try {
//可是在e.getValue().get()的时候会阻塞,其中一个没执行完,就会等待执行完,然后等待下一个执行完
results.put(e.getKey(), e.getValue().get());
} catch (final ConcurrentException cex) {
excepts.put(e.getKey(), cex);
}
}
return new MultiBackgroundInitializerResults(inits, results, excepts);
}
TimedSemaphore
时间的信号量。超过时间,会自动恢复信号量。不用调用release()来释放信号量
TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10);
StatisticsThread thread = new StatisticsThread(sem);
thread.start();
public class StatisticsThread extends Thread {
//信号量用来限制数据库负载
private final TimedSemaphore semaphore;
public StatisticsThread(TimedSemaphore timedSemaphore) {
semaphore = timedSemaphore;
}
public void run() {
try {
while (true) {
//获取信号量
semaphore.acquire(); // limit database load
//执行查询
performQuery(); // issue a query
}
} catch(InterruptedException) {
// fall through
}
}
...
}
实现逻辑是:起一个定时器,定时重置acquireCount变量(表示已被使用的信号量)为0,然后其他线程就可以拿到信号量,不用一直被阻塞住(因为之前在acquire()方法里调用了wait()方法)
//起一个定时器,定时执行endOfPeriod
protected ScheduledFuture<?> startTimer() {
return getExecutorService().scheduleAtFixedRate(() -> endOfPeriod(), getPeriod(), getPeriod(), getUnit());
}
//使用synchronized来保证acquireCount的同步访问
synchronized void endOfPeriod() {
lastCallsPerPeriod = acquireCount;
totalAcquireCount += acquireCount;
periodCount++;
acquireCount = 0;
//唤醒其他已阻塞的线程
notifyAll();
}
网友评论