前言
1、什么是管道模式
管道模式不属于我们常说的23种设计模式,它可以看成是责任链模式的一种变体。所谓管道模式,用技术的话来说,就是把数据传递给一个任务队列,由任务队列按次序依次对数据进行加工处理。
image.png
2、什么样的场景适合用管道模式
当业务流程比较复杂,需要拆分成多个子步骤,且每个子步骤可以自由组合、替换、新增、删除时,适合使用管道模式。
实现管道的一般套路
1、封装管道数据透传上下文
/**
 * Key/value context handed to every handler of a pipeline run.
 *
 * <p>The context is itself a {@link ConcurrentHashMap}, so handlers exchange intermediate
 * results with ordinary {@code put}/{@code get} calls. The instance for the current thread is
 * kept in {@link #CHAIN_CONTEXT} and is created on first access from {@link #contextClass}.
 */
public class ChannelHandlerContext extends ConcurrentHashMap<String,Object> {

    /** Key under which the request being processed is stored in the context. */
    public static final String CHANNEL_HANDLER_REQUEST_KEY = "channelHandlerRequest";

    /** Concrete context type to instantiate; replaceable through {@link #setContextClass(Class)}. */
    protected static Class<? extends ChannelHandlerContext> contextClass = ChannelHandlerContext.class;

    /** Thread-local holder; the initial value is a new instance of {@link #contextClass}. */
    protected static final TransmittableThreadLocal<? extends ChannelHandlerContext> CHAIN_CONTEXT = new TransmittableThreadLocal<ChannelHandlerContext>() {
        @Override
        protected ChannelHandlerContext initialValue() {
            try {
                return contextClass.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                // Only reflection failures are wrapped; Errors and runtime exceptions propagate untouched.
                throw new IllegalStateException(
                        "Cannot instantiate pipeline context " + contextClass.getName(), e);
            }
        }
    };

    /**
     * Overrides the default pipeline context type.
     *
     * @param clazz context class to instantiate from now on; it is created through its
     *              no-argument constructor
     */
    public static void setContextClass(Class<? extends ChannelHandlerContext> clazz) {
        contextClass = clazz;
    }

    /**
     * Returns the pipeline context bound to the current thread.
     *
     * @return the current thread's context, created on first access
     */
    public static final ChannelHandlerContext getCurrentContext() {
        return CHAIN_CONTEXT.get();
    }

    /**
     * Releases the context: clears every entry of this map and removes the current thread's
     * value from {@link #CHAIN_CONTEXT}.
     */
    public void release() {
        this.clear();
        CHAIN_CONTEXT.remove();
    }

    /**
     * Looks up a value, falling back to a default when the key has no mapping.
     *
     * @param key          key to look up
     * @param defaultValue value returned when nothing is stored under {@code key}; may be {@code null}
     * @return the stored value, or {@code defaultValue} when absent
     */
    public Object getDefault(String key, Object defaultValue) {
        Object value = get(key);
        return value != null ? value : defaultValue;
    }

    /**
     * Returns the request stored under {@link #CHANNEL_HANDLER_REQUEST_KEY}.
     *
     * @return the stored request, or a freshly built empty request when none is stored
     */
    public ChannelHandlerRequest getChannelHandlerRequest() {
        Object request = this.getDefault(CHANNEL_HANDLER_REQUEST_KEY, null);
        if (request != null) {
            return (ChannelHandlerRequest) request;
        }
        // Build the empty fallback only when it is actually needed.
        return ChannelHandlerRequest.builder().build();
    }
}
2、定义管道抽象执行器
/**
 * Base type for one processing step of a pipeline.
 *
 * <p>Subclasses implement {@link #handler(ChannelHandlerContext)}; the name is optional metadata
 * that can be assigned through {@link #setChannelHandlerName(String)}.
 */
public abstract class AbstactChannelHandler {

    /** Optional name of this handler; {@code null} until one is assigned. */
    private String channelHandlerName;

    /**
     * Executes this step.
     *
     * @param chx context shared between the steps of the current run
     * @return the outcome of the step as a success flag
     */
    public abstract boolean handler(ChannelHandlerContext chx);

    /**
     * Assigns the handler name.
     *
     * @param channelHandlerName name to store
     */
    public void setChannelHandlerName(String channelHandlerName) {
        this.channelHandlerName = channelHandlerName;
    }

    /**
     * Returns the handler name.
     *
     * @return the stored name, or {@code null} when none was assigned
     */
    public String getChannelHandlerName() {
        return this.channelHandlerName;
    }
}
3、定义管道
@Slf4j
public class ChannelPipeline {
private LinkedBlockingDeque<AbstactChannelHandler> channelHandlers = new LinkedBlockingDeque();
private ChannelHandlerContext handlerContext;
public ChannelPipeline addFirst(AbstactChannelHandler channelHandler){
return addFirst(null,channelHandler);
}
public ChannelPipeline addLast(AbstactChannelHandler channelHandler){
return addLast(null,channelHandler);
}
public ChannelPipeline addFirst(String channelHandlerName,AbstactChannelHandler channelHandler){
if(StringUtils.isNotBlank(channelHandlerName)){
channelHandler.setChannelHandlerName(channelHandlerName);
}
channelHandlers.addFirst(channelHandler);
return this;
}
public ChannelPipeline addLast(String channelHandlerName,AbstactChannelHandler channelHandler){
if(org.apache.commons.lang3.StringUtils.isNotBlank(channelHandlerName)){
channelHandler.setChannelHandlerName(channelHandlerName);
}
channelHandlers.addLast(channelHandler);
return this;
}
public void setChannelHandlers(LinkedBlockingDeque<AbstactChannelHandler> channelHandlers) {
this.channelHandlers = channelHandlers;
}
public ChannelHandlerContext getHandlerContext() {
return handlerContext;
}
public void setHandlerContext(ChannelHandlerContext handlerContext) {
this.handlerContext = handlerContext;
}
public boolean start(ChannelHandlerRequest channelHandlerRequest){
if(channelHandlers.isEmpty()){
log.warn("channelHandlers is empty");
return false;
}
return handler(channelHandlerRequest);
}
private boolean handler(ChannelHandlerRequest channelHandlerRequest) {
if(StringUtils.isBlank(channelHandlerRequest.getRequestId())){
channelHandlerRequest.setRequestId(String.valueOf(SnowflakeUtils.getNextId()));
}
handlerContext.put(ChannelHandlerContext.CHANNEL_HANDLER_REQUEST_KEY,channelHandlerRequest);
boolean isSuccess = true;
try {
for (AbstactChannelHandler channelHandler : channelHandlers) {
isSuccess = channelHandler.handler(handlerContext);
if(!isSuccess){
break;
}
}
if(!isSuccess){
channelHandlers.clear();
}
} catch (Exception e) {
log.error("{}",e.getMessage());
isSuccess = false;
} finally {
handlerContext.release();
}
return isSuccess;
}
}
4、根据业务的复杂度拆分不同子任务管道执行器
/**
 * Step one: validates the user carried by the request.
 *
 * <p>The step succeeds only when the request parameter is a {@code User} whose full name is
 * not blank.
 */
@Slf4j
public class UserCheckChannelHandler extends AbstactChannelHandler {

    /**
     * @param chx context holding the current request
     * @return {@code true} when the parameter is a {@code User} with a non-blank full name
     */
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------步骤一:用户数据校验【"+request.getRequestId()+"】");
        Object payload = request.getParams();
        if (!(payload instanceof User)) {
            return false;
        }
        boolean fullnameMissing = StringUtils.isBlank(((User) payload).getFullname());
        if (fullnameMissing) {
            log.error("用户名不能为空");
        }
        return !fullnameMissing;
    }
}
/**
 * Step two: derives the username and e-mail address from the user's full name.
 *
 * <p>The full name is converted to toneless pinyin; that string becomes the username and,
 * with {@code @qq.com} appended, the e-mail address.
 */
@Slf4j
public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {

    /**
     * @param chx context holding the current request
     * @return {@code true} when the parameter is a {@code User} and both fields were filled,
     *         {@code false} for any other parameter type
     */
    @SneakyThrows
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------步骤二:用户名以及邮箱填充【将汉语转成拼音填充】【"+request.getRequestId()+"】");
        Object payload = request.getParams();
        if (!(payload instanceof User)) {
            return false;
        }
        User user = (User) payload;

        // Pinyin output without tone marks.
        HanyuPinyinOutputFormat tonelessFormat = new HanyuPinyinOutputFormat();
        tonelessFormat.setToneType(HanyuPinyinToneType.WITHOUT_TONE);

        String pinyinName = PinyinHelper.toHanYuPinyinString(user.getFullname(), tonelessFormat);
        user.setUsername(pinyinName);
        user.setEmail(pinyinName + "@qq.com");
        return true;
    }
}
/**
 * Step three: replaces the user's plain-text password with its SHA-256 hex digest.
 *
 * <p>NOTE(review): a bare, unsalted SHA-256 digest is not a suitable password hash for
 * production use; consider a dedicated password-hashing scheme (bcrypt/scrypt/argon2).
 */
public class UserPwdEncryptChannelHandler extends AbstactChannelHandler {

    /**
     * @param chx context holding the current request
     * @return {@code true} when the parameter is a {@code User} and its password was replaced,
     *         {@code false} for any other parameter type
     */
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------步骤三:用户密码明文转密文【"+request.getRequestId()+"】");
        Object payload = request.getParams();
        if (!(payload instanceof User)) {
            return false;
        }
        User user = (User) payload;
        user.setPassword(DigestUtil.sha256Hex(user.getPassword()));
        return true;
    }
}
/**
 * Step four: simulates persisting the user.
 *
 * <p>Instead of a database write, the user is placed in a map keyed by username and that map
 * is stored in the context under {@code "userMap"}.
 */
public class UserMockSaveChannelHandler extends AbstactChannelHandler {

    /**
     * @param chx context holding the current request; receives the {@code "userMap"} entry
     * @return {@code true} when the parameter is a {@code User} and it was stored,
     *         {@code false} for any other parameter type
     */
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------步骤四:模拟用户数据落库【"+request.getRequestId()+"】");
        Object payload = request.getParams();
        if (!(payload instanceof User)) {
            return false;
        }
        User user = (User) payload;
        Map<String, User> savedUsers = new HashMap<>();
        savedUsers.put(user.getUsername(), user);
        chx.put("userMap", savedUsers);
        return true;
    }
}
/**
 * Step five: prints the user that an earlier step stored in the context.
 *
 * <p>Reads the {@code "userMap"} context entry and prints the value stored under the
 * current user's username.
 */
public class UserPrintChannleHandler extends AbstactChannelHandler {

    /**
     * @param chx context holding the current request and the {@code "userMap"} entry
     * @return {@code true} when the parameter is a {@code User}, {@code "userMap"} is a map and
     *         it contains the user's username; {@code false} otherwise
     */
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------步骤五:打印用户数据【"+request.getRequestId()+"】");
        Object payload = request.getParams();
        if (!(payload instanceof User)) {
            return false;
        }
        Object stored = chx.get("userMap");
        if (!(stored instanceof Map)) {
            return false;
        }
        Map<?, ?> savedUsers = (Map<?, ?>) stored;
        User user = (User) payload;
        if (!savedUsers.containsKey(user.getUsername())) {
            return false;
        }
        System.out.println(savedUsers.get(user.getUsername()));
        return true;
    }
}
5、对各个子任务进行编排组合
/**
 * {@link UserService} implementation that assembles the user-save pipeline by hand.
 */
@Service
public class UserServiceImpl implements UserService {

    /**
     * Runs the five user-save steps (check, fill username/e-mail, encrypt password, mock save,
     * print) in that order.
     *
     * @param user user to process; passed to the pipeline as the request parameter
     * @return the result reported by the pipeline
     */
    @Override
    public boolean save(User user) {
        ChannelPipeline savePipeline = ChannelPipelineExecutor.pipeline()
                .addLast(new UserCheckChannelHandler())
                .addLast(new UserFillUsernameAndEmailChannelHandler())
                .addLast(new UserPwdEncryptChannelHandler())
                .addLast(new UserMockSaveChannelHandler())
                .addLast(new UserPrintChannleHandler());
        ChannelHandlerRequest request = ChannelHandlerRequest.builder().params(user).build();
        return savePipeline.start(request);
    }
}
6、测试
// Build a user with a generated Chinese-locale name and phone number, then save it.
Faker faker = Faker.instance(Locale.CHINA);
String generatedName = faker.name().fullName();
String generatedMobile = faker.phoneNumber().phoneNumber();
User user = User.builder()
        .age(20)
        .fullname(generatedName)
        .mobile(generatedMobile)
        .password("123456")
        .build();
userService.save(user);
查看控制台
(此处原为控制台输出截图,图片上传失败,未能显示)
思考一下:上述实现的管道模式,有没有优化的空间?
在步骤5对各个子任务进行编排组合时,假设子业务存在N个步骤,我们就需要addLast N次,感觉有点硬编码了。因此我们可以做如下改造:
改造
1、定义管道注解
/**
 * Marks a handler class as one step of an annotation-driven pipeline and ties it to the
 * business method it serves.
 *
 * <p>The annotation is meta-annotated with {@code @Component}, is retained at runtime and
 * can only be placed on types.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Component
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public @interface Pipeline {
/** Service type that owns the business method this handler is attached to. */
// NOTE(review): raw Class; narrowing to Class<?> could break callers not visible here.
Class consumePipelinesService();
/** Name of the business method on {@link #consumePipelinesService()}. */
String consumePipelinesMethod();
/** Presumably the parameter types of the business method (used to pick an overload) - TODO confirm. Defaults to none. */
Class[] args() default {};
/** Ordering value of this handler among the handlers attached to the same method. */
int order();
}
2、定义管道扫描器
/**
 * Classpath scanner that accepts interfaces as candidates and re-targets every scanned bean
 * definition to {@code ComsumePipelineFactoryBean}.
 */
public class PipelineClassPathBeanDefinitionScanner extends ClassPathBeanDefinitionScanner {

    /**
     * @param registry registry that receives the scanned bean definitions
     */
    public PipelineClassPathBeanDefinitionScanner(BeanDefinitionRegistry registry) {
        super(registry);
    }

    /**
     * Only interfaces qualify as candidate components.
     *
     * @param beanDefinition definition under consideration
     * @return {@code true} when the underlying type is an interface
     */
    @Override
    protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
        return beanDefinition.getMetadata().isInterface();
    }

    /**
     * Scans the packages and rewrites each resulting definition so that the bean is produced
     * by the factory bean.
     *
     * @param basePackages packages to scan
     * @return the holders returned by the parent scan, with their definitions rewritten
     */
    @Override
    protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
        Set<BeanDefinitionHolder> scanned = super.doScan(basePackages);
        scanned.forEach(holder -> redirectToFactoryBean((GenericBeanDefinition) holder.getBeanDefinition()));
        return scanned;
    }

    /** Records the scanned class name as a property, then swaps the bean class for the factory bean. */
    private void redirectToFactoryBean(GenericBeanDefinition definition) {
        // The original class name must be captured before the bean class is replaced.
        String scannedClassName = definition.getBeanClassName();
        definition.getPropertyValues().addPropertyValue("pipelineServiceClz", scannedClassName);
        definition.setBeanClass(ComsumePipelineFactoryBean.class);
    }
}
3、定义管道注册器
/**
 * Registrar imported by {@code @EnabledPipeline}: scans the configured base packages for
 * {@code @FunctionalInterface}-annotated types and registers them through
 * {@link PipelineClassPathBeanDefinitionScanner}.
 */
public class PipelineImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {

    /**
     * Scans the resolved base packages and registers the resulting bean definitions.
     *
     * @param importingClassMetadata metadata of the class that imported this registrar
     * @param registry               registry receiving the bean definitions
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        PipelineClassPathBeanDefinitionScanner scanner = new PipelineClassPathBeanDefinitionScanner(registry);
        scanner.addIncludeFilter(new AnnotationTypeFilter(FunctionalInterface.class));
        Set<String> basePackages = getBasePackages(importingClassMetadata);
        scanner.scan(basePackages.toArray(new String[0]));
    }

    /**
     * Resolves the packages to scan.
     *
     * @param importingClassMetadata metadata of the class that imported this registrar
     * @return the non-blank {@code basePackages} declared on {@code @EnabledPipeline}; when the
     *         annotation is absent or declares none, the package of the importing class
     */
    protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
        Map<String, Object> attributes = importingClassMetadata.getAnnotationAttributes(EnabledPipeline.class.getCanonicalName());
        Set<String> basePackages = new HashSet<>();
        // The attribute map is null when the importing class is not annotated with
        // @EnabledPipeline (e.g. the registrar was imported directly); treat that as "none declared".
        Object declared = attributes == null ? null : attributes.get("basePackages");
        if (declared instanceof String[]) {
            for (String pkg : (String[]) declared) {
                if (StringUtils.hasText(pkg)) {
                    basePackages.add(pkg);
                }
            }
        }
        if (basePackages.isEmpty()) {
            basePackages.add(
                    ClassUtils.getPackageName(importingClassMetadata.getClassName()));
        }
        return basePackages;
    }
}
4、定义EnableXXX注解
/**
 * Enables annotation-driven pipelines by importing
 * {@code PipelineImportBeanDefinitionRegistrar}.
 *
 * <p>Retained at runtime and applicable to types only.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(PipelineImportBeanDefinitionRegistrar.class)
public @interface EnabledPipeline {
/** Packages to scan; empty by default. */
String[] basePackages() default {};
}
注:此外还需定义管道代理和管道FactoryBean,因为篇幅原因就不贴了,感兴趣的朋友可以查看文末的demo链接。
5、将原有的管道任务执行器,改造成如下
/**
 * Step one of the annotation-driven {@code UserService.save} pipeline: validates the user.
 *
 * <p>The request parameters are serialized to JSON and parsed back as a list of {@code User};
 * the first element must have a non-blank full name.
 */
@Slf4j
@Pipeline(consumePipelinesService = UserService.class,consumePipelinesMethod = "save",args = {User.class},order = 1)
public class UserCheckChannelHandler extends AbstactChannelHandler {

    /**
     * @param chx context holding the current request
     * @return {@code true} when the parsed list is non-empty and its first user has a
     *         non-blank full name
     */
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------步骤一:用户数据校验【"+request.getRequestId()+"】");
        List<User> parsedUsers = JSON.parseArray(JSON.toJSONString(request.getParams()), User.class);
        boolean valid = !CollectionUtil.isEmpty(parsedUsers)
                && !StringUtils.isBlank(parsedUsers.get(0).getFullname());
        if (!valid) {
            log.error("用户名不能为空");
        }
        return valid;
    }
}
/**
 * Step two of the annotation-driven {@code UserService.save} pipeline: derives username and
 * e-mail from the full name of the first user parsed from the request parameters.
 *
 * <p>NOTE(review): the user modified here is an object parsed from a JSON copy of the request
 * parameters and is not written back to the request or the context in this block, so the
 * filled values may not reach later steps - confirm against the remaining handlers.
 */
@Slf4j
@Pipeline(consumePipelinesService = UserService.class,consumePipelinesMethod = "save",args = {User.class},order = 2)
public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {

    /**
     * @param chx context holding the current request
     * @return {@code true} when at least one user could be parsed and was filled,
     *         {@code false} when the parsed list is empty
     */
    @SneakyThrows
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------步骤二:用户名以及邮箱填充【将汉语转成拼音填充】【"+request.getRequestId()+"】");
        List<User> parsedUsers = JSON.parseArray(JSON.toJSONString(request.getParams()), User.class);
        if (!CollectionUtil.isNotEmpty(parsedUsers)) {
            return false;
        }
        User user = parsedUsers.get(0);

        // Pinyin output without tone marks.
        HanyuPinyinOutputFormat tonelessFormat = new HanyuPinyinOutputFormat();
        tonelessFormat.setToneType(HanyuPinyinToneType.WITHOUT_TONE);

        String pinyinName = PinyinHelper.toHanYuPinyinString(user.getFullname(), tonelessFormat);
        user.setUsername(pinyinName);
        user.setEmail(pinyinName + "@qq.com");
        return true;
    }
}
。。。省略剩余管道任务执行器
6、原来的步骤编排,仅需写接口即可
/**
 * User service contract with a single {@code save} operation.
 */
@FunctionalInterface
public interface UserService {
/**
 * Saves the given user.
 *
 * @param user user to save
 * @return success flag of the operation
 */
boolean save(User user);
}
仅需这样即可进行编排
7、测试
在启动类上加上@EnabledPipeline注解。示例如下
/**
 * Spring Boot entry point of the pipeline demo; enables pipeline scanning for the test package.
 */
@SpringBootApplication
@EnabledPipeline(basePackages = "com.github.lybgeek.pipeline.spring.test")
public class SpringPipelineApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Forward args so command-line options (e.g. --server.port=...) are not silently dropped.
        SpringApplication.run(SpringPipelineApplication.class, args);
    }
}
/** Saving the user through the service must report success. */
@Test
public void testPipeline(){
    Assert.assertTrue(userService.save(user));
}
image.png
编排的效果和之前的一样
总结
本文主要实现了2种不同形式的管道模式:一种基于注解,编排步骤通过注解直接写在执行器上,再通过执行器去定位业务执行方法;另外一种是在业务方法里面自己组合调用执行器。通过注解这种方式虽然避免了业务方法自己去编排执行器,但当执行器一多时,就需要翻看每个执行器类来确认它的执行顺序,这样可能会因为顺序问题而达不到我们想要的组合效果。基于这个问题,我将在下篇文章再介绍其他2种实现方式。
demo链接
https://github.com/lyb-geek/springboot-learning/tree/master/springboot-pipeline
网友评论