美文网首页
聊聊如何利用管道模式来进行业务编排(上篇)

聊聊如何利用管道模式来进行业务编排(上篇)

作者: linyb极客之路 | 来源:发表于2022-08-30 09:51 被阅读0次

    前言

    1、什么是管道模式

    管道模式不属于我们常说的23种设计模式中的一种,它可以看成是责任链模式的一种变体。所谓的管道模式用技术话来说,就是把数据传递给一个任务队列,由任务队列按次序依次对数据进行加工处理。


    image.png

    2、什么样的场景适合用管道模式

    当业务流程比较复杂时,需要拆分成多个子步骤,且每个子步骤可以自由组合,替换,新增,删除的场景

    实现管道的一般套路

    1、封装管道数据透传上下文

    public class ChannelHandlerContext extends ConcurrentHashMap<String,Object> {
    
        protected static Class<? extends ChannelHandlerContext> contextClass = ChannelHandlerContext.class;
    
        protected static final TransmittableThreadLocal<? extends ChannelHandlerContext> CHAIN_CONTEXT = new TransmittableThreadLocal<ChannelHandlerContext>() {
            @Override
            protected ChannelHandlerContext initialValue() {
                try {
                    return contextClass.getDeclaredConstructor().newInstance();
                } catch (Throwable e) {
                    throw new RuntimeException(e);
                }
            }
        };
    
        /**
         * 覆盖默认的管道上下文
         *
         * @param clazz
         */
        public static void setContextClass(Class<? extends ChannelHandlerContext> clazz) {
            contextClass = clazz;
        }
    
        /**
         * 获取当前管道上下文
         *
         *
         */
        public static final ChannelHandlerContext getCurrentContext() {
            return CHAIN_CONTEXT.get();
        }
    
        /**
         * 释放上下文资源
         *
         * @return
         */
        public void release() {
            this.clear();
            CHAIN_CONTEXT.remove();
        }
    
        /**
         *
         * 获取上下文默认值
         * @param key
         * @param defaultValue
         * @return
         */
        public Object getDefault(String key, Object defaultValue) {
            return Optional.ofNullable(get(key)).orElse(defaultValue);
        }
    
        public static final String CHANNEL_HANDLER_REQUEST_KEY = "channelHandlerRequest";
    
        public ChannelHandlerRequest getChannelHandlerRequest() {
            return (ChannelHandlerRequest) this.getDefault(CHANNEL_HANDLER_REQUEST_KEY,ChannelHandlerRequest.builder().build());
        }
    
    
    }
    
    

    2、定义管道抽象执行器

    public abstract class AbstactChannelHandler {
    
        private String channelHandlerName;
    
        public String getChannelHandlerName() {
            return channelHandlerName;
        }
    
        public void setChannelHandlerName(String channelHandlerName) {
            this.channelHandlerName = channelHandlerName;
        }
    
        public abstract boolean handler(ChannelHandlerContext chx);
    
    
    }
    
    

    3、定义管道

    @Slf4j
    public class ChannelPipeline {
    
        private LinkedBlockingDeque<AbstactChannelHandler> channelHandlers = new LinkedBlockingDeque();
    
        private ChannelHandlerContext handlerContext;
    
    
        public ChannelPipeline addFirst(AbstactChannelHandler channelHandler){
           return addFirst(null,channelHandler);
        }
    
        public ChannelPipeline addLast(AbstactChannelHandler channelHandler){
          return addLast(null,channelHandler);
        }
    
        public ChannelPipeline addFirst(String channelHandlerName,AbstactChannelHandler channelHandler){
            if(StringUtils.isNotBlank(channelHandlerName)){
                channelHandler.setChannelHandlerName(channelHandlerName);
            }
            channelHandlers.addFirst(channelHandler);
            return this;
        }
    
        public ChannelPipeline addLast(String channelHandlerName,AbstactChannelHandler channelHandler){
            if(org.apache.commons.lang3.StringUtils.isNotBlank(channelHandlerName)){
                channelHandler.setChannelHandlerName(channelHandlerName);
            }
            channelHandlers.addLast(channelHandler);
            return this;
        }
    
    
        public void setChannelHandlers(LinkedBlockingDeque<AbstactChannelHandler> channelHandlers) {
            this.channelHandlers = channelHandlers;
        }
    
        public ChannelHandlerContext getHandlerContext() {
            return handlerContext;
        }
    
        public void setHandlerContext(ChannelHandlerContext handlerContext) {
            this.handlerContext = handlerContext;
        }
    
        public boolean start(ChannelHandlerRequest channelHandlerRequest){
             if(channelHandlers.isEmpty()){
                 log.warn("channelHandlers is empty");
                 return false;
             }
    
            return handler(channelHandlerRequest);
        }
    
        private boolean handler(ChannelHandlerRequest channelHandlerRequest) {
            if(StringUtils.isBlank(channelHandlerRequest.getRequestId())){
                channelHandlerRequest.setRequestId(String.valueOf(SnowflakeUtils.getNextId()));
            }
            handlerContext.put(ChannelHandlerContext.CHANNEL_HANDLER_REQUEST_KEY,channelHandlerRequest);
            boolean isSuccess = true;
            try {
                for (AbstactChannelHandler channelHandler : channelHandlers) {
                      isSuccess = channelHandler.handler(handlerContext);
                    if(!isSuccess){
                        break;
                    }
                }
    
                if(!isSuccess){
                    channelHandlers.clear();
                }
            } catch (Exception e) {
                log.error("{}",e.getMessage());
                isSuccess = false;
            } finally {
                handlerContext.release();
            }
            return isSuccess;
        }
    
    }
    
    

    4、根据业务的复杂度拆分不同子任务管道执行器

    @Slf4j
    public class UserCheckChannelHandler extends AbstactChannelHandler {
    
        
        @Override
        public boolean handler(ChannelHandlerContext chx) {
            ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
            System.out.println("------------------------------------步骤一:用户数据校验【"+channelHandlerRequest.getRequestId()+"】");
            Object params = channelHandlerRequest.getParams();
            if(params instanceof User){
                User user = (User)params;
                if(StringUtils.isBlank(user.getFullname())){
                    log.error("用户名不能为空");
                    return false;
                }
                return true;
            }
    
    
            return false;
        }
    }
    
    @Slf4j
    public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {
        @SneakyThrows
        @Override
        public boolean handler(ChannelHandlerContext chx) {
            ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
            System.out.println("------------------------------------步骤二:用户名以及邮箱填充【将汉语转成拼音填充】【"+channelHandlerRequest.getRequestId()+"】");
            Object params = channelHandlerRequest.getParams();
            if(params instanceof User){
                User user = (User)params;
                String fullname = user.getFullname();
                HanyuPinyinOutputFormat hanyuPinyinOutputFormat = new HanyuPinyinOutputFormat();
                hanyuPinyinOutputFormat.setToneType(HanyuPinyinToneType.WITHOUT_TONE);
                String username = PinyinHelper.toHanYuPinyinString(fullname, hanyuPinyinOutputFormat);
                user.setUsername(username);
                user.setEmail(username + "@qq.com");
                return true;
            }
    
    
            return false;
        }
    }
    
    
    public class UserPwdEncryptChannelHandler extends AbstactChannelHandler {
        @Override
        public boolean handler(ChannelHandlerContext chx) {
            ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
            System.out.println("------------------------------------步骤三:用户密码明文转密文【"+channelHandlerRequest.getRequestId()+"】");
            Object params = channelHandlerRequest.getParams();
            if(params instanceof User){
                String encryptPwd = DigestUtil.sha256Hex(((User) params).getPassword());
                ((User) params).setPassword(encryptPwd);
                return true;
            }
    
            return false;
        }
    }
    
    
    public class UserMockSaveChannelHandler extends AbstactChannelHandler {
    
        @Override
        public boolean handler(ChannelHandlerContext chx) {
            ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
            System.out.println("------------------------------------步骤四:模拟用户数据落库【"+channelHandlerRequest.getRequestId()+"】");
            Object params = channelHandlerRequest.getParams();
            if(params instanceof User){
                Map<String, User> userMap = new HashMap<>();
                User user = (User)params;
                userMap.put(user.getUsername(),user);
                chx.put("userMap",userMap);
                return true;
            }
    
    
            return false;
        }
    }
    
    
    public class UserPrintChannleHandler extends AbstactChannelHandler {
        @Override
        public boolean handler(ChannelHandlerContext chx) {
            ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
            System.out.println("------------------------------------步骤五:打印用户数据【"+channelHandlerRequest.getRequestId()+"】");
            Object params = channelHandlerRequest.getParams();
            if(params instanceof User){
                Object userMap = chx.get("userMap");
                if(userMap instanceof Map){
                    Map map = (Map)userMap;
                    if(map.containsKey(((User) params).getUsername())){
                        System.out.println(map.get(((User) params).getUsername()));
                        return true;
                    }
                }
            }
    
            return false;
        }
    }
    
    

    5、对各个子任务进行编排组合

    @Service
    public class UserServiceImpl implements UserService {
    
        @Override
        public boolean save(User user) {
           return ChannelPipelineExecutor.pipeline()
                    .addLast(new UserCheckChannelHandler())
                    .addLast(new UserFillUsernameAndEmailChannelHandler())
                    .addLast(new UserPwdEncryptChannelHandler())
                    .addLast(new UserMockSaveChannelHandler())
                    .addLast(new UserPrintChannleHandler())
                    .start(ChannelHandlerRequest.builder().params(user).build());
        }
    }
    
    

    6、测试

      Faker faker = Faker.instance(Locale.CHINA);
            User user = User.builder().age(20)
                    .fullname(faker.name().fullName())
                    .mobile(faker.phoneNumber().phoneNumber())
                    .password("123456").build();
            userService.save(user);
    

    查看控制台
    [图片上传失败...(image-f910c2-1661824275785)]

    思考一下:上述实现的管道模式,有没有优化的空间?

    在步骤5对各个子任务进行编排组合,假设子业务存在N个步骤,我们需要addLast N次,感觉有点硬编码了。因此我们可以做如下改造

    改造

    1、定义管道注解

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Documented
    @Component
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public @interface Pipeline {
    
        Class consumePipelinesService();
    
        String consumePipelinesMethod();
    
        Class[] args() default {};
    
        int order();
    }
    
    

    2、定义管道扫描器

    public class PipelineClassPathBeanDefinitionScanner extends ClassPathBeanDefinitionScanner {
    
        public PipelineClassPathBeanDefinitionScanner(BeanDefinitionRegistry registry) {
            super(registry);
        }
    
    
        @Override
        protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
            Set<BeanDefinitionHolder> beanDefinitionHolders = super.doScan(basePackages);
            for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
                GenericBeanDefinition beanDefinition = (GenericBeanDefinition) beanDefinitionHolder.getBeanDefinition();
                String className = beanDefinition.getBeanClassName();
                beanDefinition.getPropertyValues().addPropertyValue("pipelineServiceClz",className);
                beanDefinition.setBeanClass(ComsumePipelineFactoryBean.class);
    
            }
    
            return beanDefinitionHolders;
    
        }
    
        @Override
        protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
            return beanDefinition.getMetadata().isInterface();
        }
    }
    
    

    3、定义管道注册器

    public class PipelineImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {
    
        @Override
        public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    
    
            PipelineClassPathBeanDefinitionScanner scanner = new PipelineClassPathBeanDefinitionScanner(registry);
            scanner.addIncludeFilter(new AnnotationTypeFilter(FunctionalInterface.class));
            Set<String> basePackages = getBasePackages(importingClassMetadata);
            String[] basePackageArr = {};
            scanner.scan(basePackages.toArray(basePackageArr));
    
        }
    
        protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
            Map<String, Object> attributes = importingClassMetadata.getAnnotationAttributes(EnabledPipeline.class.getCanonicalName());
    
            Set<String> basePackages = new HashSet<>();
    
            for (String pkg : (String[]) attributes.get("basePackages")) {
                if (StringUtils.hasText(pkg)) {
                    basePackages.add(pkg);
                }
            }
    
            if (basePackages.isEmpty()) {
                basePackages.add(
                        ClassUtils.getPackageName(importingClassMetadata.getClassName()));
            }
            return basePackages;
        }
    }
    
    

    4、定义EnableXXX注解

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Documented
    @Import(PipelineImportBeanDefinitionRegistrar.class)
    public @interface EnabledPipeline {
    
        String[] basePackages() default {};
    }
    
    

    注: 此外还需定义管道代理和管道factoryBean,因为篇幅就不贴了。感兴趣的朋友就查看文末的demo链接

    5、将原有的管道任务执行器,改造成如下

    @Slf4j
    @Pipeline(consumePipelinesService = UserService.class,consumePipelinesMethod = "save",args = {User.class},order = 1)
    public class UserCheckChannelHandler extends AbstactChannelHandler {
    
        
        @Override
        public boolean handler(ChannelHandlerContext chx) {
            ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
            System.out.println("------------------------------------步骤一:用户数据校验【"+channelHandlerRequest.getRequestId()+"】");
            String json = JSON.toJSONString(channelHandlerRequest.getParams());
            List<User> users = JSON.parseArray(json,User.class);
            if(CollectionUtil.isEmpty(users) || StringUtils.isBlank(users.get(0).getFullname())){
                log.error("用户名不能为空");
                return false;
            }
            return true;
    
    
        }
    }
    
    
    @Slf4j
    @Pipeline(consumePipelinesService = UserService.class,consumePipelinesMethod = "save",args = {User.class},order = 2)
    public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {
        @SneakyThrows
        @Override
        public boolean handler(ChannelHandlerContext chx) {
            ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
            System.out.println("------------------------------------步骤二:用户名以及邮箱填充【将汉语转成拼音填充】【"+channelHandlerRequest.getRequestId()+"】");
            String json = JSON.toJSONString(channelHandlerRequest.getParams());
            List<User> users = JSON.parseArray(json,User.class);
            if(CollectionUtil.isNotEmpty(users)){
                User user = users.get(0);
                String fullname = user.getFullname();
                HanyuPinyinOutputFormat hanyuPinyinOutputFormat = new HanyuPinyinOutputFormat();
                hanyuPinyinOutputFormat.setToneType(HanyuPinyinToneType.WITHOUT_TONE);
                String username = PinyinHelper.toHanYuPinyinString(fullname, hanyuPinyinOutputFormat);
                user.setUsername(username);
                user.setEmail(username + "@qq.com");
                return true;
    
            }
    
    
    
            return false;
        }
    }
    
    

    。。。省略剩余管道任务执行器

    6、原来的步骤编排,仅需写接口即可

    @FunctionalInterface
    public interface UserService {
    
        boolean save(User user);
    
    }
    
    

    仅需这样即可进行编排

    7、测试

    在启动类上加上@EnabledPipeline注解。示例如下

    @SpringBootApplication
    @EnabledPipeline(basePackages = "com.github.lybgeek.pipeline.spring.test")
    public class SpringPipelineApplication  {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringPipelineApplication.class);
        }
    
    }
    
     @Test
        public void testPipeline(){
            boolean isOk = userService.save(user);
            Assert.assertTrue(isOk);
    
        }
    
    image.png

    编排的效果和之前的一样

    总结

    本文主要实现2种不同形式的管道模式,一种基于注解,编排步骤通过注解直接写在了执行器上,通过执行器去定位业务执行方法。另外一种是业务方法里面自己组合调用执行器。通过注解这方式虽然避免了业务方法自己去编排执行器,但也存在当执行器一多的话,就需要翻每个执行器类,看他的执行器顺序,这样可能会出现执行器因为顺序问题,而达不到我们想要的组合效果。基于这个问题,我将在下篇文章,在介绍其他2种实现方式

    demo链接

    https://github.com/lyb-geek/springboot-learning/tree/master/springboot-pipeline

    相关文章

      网友评论

          本文标题:聊聊如何利用管道模式来进行业务编排(上篇)

          本文链接:https://www.haomeiwen.com/subject/ojdlnrtx.html