今天看到KafkaProperties.java中对参数的设置使用了PropertyMapper,很好奇.就来看一看PropertyMapper的源码.
备注: 大多数说明都写在了注释里.
KafkaProperties.java中的PropertyMapper
- 首先通过PropertyMapper.get()方法获取PropertyMapper中的静态实例变量(两个构造参数都为null).
private static final PropertyMapper INSTANCE = new PropertyMapper((PropertyMapper)null, (PropertyMapper.SourceOperator)null);
public static PropertyMapper get() {
return INSTANCE;
}
- 然后使用alwaysApplyingWhenNonNull()方法创建一个新的PropertyMapper(以INSTANCE为parent,whenNonNull方法为SourceOperator),SourceOperator的作用是在from生成Source时追加处理返回的Source,后面会讲到.
public PropertyMapper alwaysApplyingWhenNonNull() {
//把whenNonNull方法当成SourceOperator并作为参数调用alwaysApplying(SourceOperator operator)方法
return alwaysApplying(this::whenNonNull);
}
// 对source执行whenNonNull()方法并返回,可用来当做SourceOperator接口作为上面方法的参数
private <T> Source<T> whenNonNull(Source<T> source) {
return source.whenNonNull();
}
// 返回一个新的PropertyMapper实例,使用了给定 SourceOperator
public PropertyMapper alwaysApplying(SourceOperator operator) {
Assert.notNull(operator, "Operator must not be null");
return new PropertyMapper(this, operator);
}
public static final class Source<T> {
private final Supplier<T> supplier;
private final Predicate<T> predicate;
private Source(Supplier<T> supplier, Predicate<T> predicate) {
Assert.notNull(predicate, "Predicate must not be null");
this.supplier = supplier;
this.predicate = predicate;
}
// Source的whenNonNull方法返回一个新生成的Source,
//对原来的supplier包装一个不会抛出空指针异常的supplier(避免source中的supplier为Null),
//然后predicate(to时使用)则使用Object::nonNull.
public Source<T> whenNonNull() {
return new Source<>(new NullPointerExceptionSafeSupplier<>(this.supplier),Objects::nonNull);
}
//......略
}
//对传入的supplier封装一层,调用get时如果有空指针异常则进行捕获并return null
private static class NullPointerExceptionSafeSupplier<T> implements Supplier<T> {
private final Supplier<T> supplier;
NullPointerExceptionSafeSupplier(Supplier<T> supplier) {
this.supplier = supplier;
}
@Override
public T get() {
try {
return this.supplier.get();
}catch (NullPointerException ex) {
return null;
}
}
}
- 接着调用from方法构建PropertyMapper.Source :通过from方法传入一个Supplier接口,如果PropertyMapper没有parent(此处应该用了装饰者),则直接new出一个带CachingSupplier的Source.我们通过INSTANCE生成的PropertyMapper实例是以INSTANCE为实例的,故此处调用的是INSTANCE的from方法(其实最终还是走到了CachingSupplier)
private static final Predicate<?> ALWAYS = (t) -> {
return true;
};
/**
* 先对supplier封装一个CachingSupplier (见getSource方法)
* 然后又用sourceOperator封装一个NullPointerExceptionSafeSupplier(调用alwaysApplyingWhenNonNull方法返回的PropertyMapper进行此操作).
* 此处返回的Source对supplier进行了两层封装.
*/
public <T> PropertyMapper.Source<T> from(Supplier<T> supplier) {
Assert.notNull(supplier, "Supplier must not be null");
//getSource方法见下,包装一层带缓存的CachingSupplier
PropertyMapper.Source<T> source = this.getSource(supplier);
if (this.sourceOperator != null) {
//此处使用了sourceOperator,默认情况下就是上文传入的source.whenNonNull方法.
//故此处新返回source里的supplier是NullPointerExceptionSafeSupplier
source = this.sourceOperator.apply(source);
}
//默认情况下,return的Source中的Supplier先后包装了CachingSupplier和NullPointerExceptionSafeSupplier
return source;
}
private <T> PropertyMapper.Source<T> getSource(Supplier<T> supplier) {
return this.parent != null ?
//如果存在parent则使用parent的from方法.此处使用了INSTANCE的from方法.但是最终还是走到了下面的CachingSupplier
this.parent.from(supplier) :
//对传入的Supplier包装了一层带缓存(第一次调用会缓存)的Supplier,
//第二个是校验方法,使用的是默认的静态变量(无论如何都返回true,即不校验)
new PropertyMapper.Source(new PropertyMapper.CachingSupplier(supplier), ALWAYS);
}
- 倒数两步是Source的as方法对结果的修改操作和to方法返回结果. 这两步就比较简单了,调用Function对结果进行修改(包装了层新的Supplier返回了新的Source)然后to调用Source的Supplier(supplier层层调用)返回结果.
public <R> Source<R> as(Function<T, R> adapter) {
Assert.notNull(adapter, "Adapter must not be null");
Supplier<Boolean> test = () -> this.predicate.test(this.supplier.get());
Predicate<R> predicate = (t) -> test.get();
Supplier<R> supplier = () -> {
if (test.get()) {
return adapter.apply(this.supplier.get());
}
return null;
};
//
return new Source<>(supplier, predicate);
}
public void to(Consumer<T> consumer) {
Assert.notNull(consumer, "Consumer must not be null");
//这个supplier层层封装.
T value = this.supplier.get();
//符合predicate就执行consumer
if (this.predicate.test(value)) {
consumer.accept(value);
}
}
- 测试
private static void sourceTest() {
StringReturnTest origin = new StringReturnTest(), lastResult = new StringReturnTest();
origin.setMsg("first set");
PropertyMapper.Source<String> source = PropertyMapper.get().alwaysApplyingWhenNonNull().from(origin::getMsg);
origin.setMsg("second set");
//执行to的时候才真正调用origin::getmsg
source.as(s -> s).to(lastResult::setMsg);
//输出 "second set"
log.info("result:{}", lastResult.getMsg());
//由于CacheSupplier的关系,此处返回依旧不变仍是"second set"
origin.setMsg("third set");
source.to(lastResult::setMsg);
//输出 "second set"
log.info("result:{}", lastResult.getMsg());
}
-
未经as的source的supplier可以debug时看到.
两层supplier
网友评论