美文网首页源码阅读
XXL-JOB阅读笔记(一):执行器注册到管理器实现方式

XXL-JOB阅读笔记(一):执行器注册到管理器实现方式

作者: 黎明_dba5 | 来源:发表于2019-11-01 09:50 被阅读0次

    这里不再对开源框架XXL-JOB做介绍,单纯介绍部分功能的实现原理。本篇记录执行器Executor如何注册到任务管理器Admin。
    本系列文章基于V2.1.0版本介绍,附github上架构图。

    image.png Executor将本执行器的ip地址及端口号注册到Admin,实际是保存在数据库表xxl_job_registry中,保存后地址如: image.png

    然后Admin再根据不同策略获取这些地址。

    整体大概流程

    image.png

    工程目录

    image.png

    以springboot为例

    初始化配置文件XxlJobConfig.java --> 创建XxlJobSpringExecutor.java(set 执行器和管理器各种信息) bean --> 指定init和destroy方法 --> XxlJobSpringExecutor执行start()

    /**
     * Builds the {@link XxlJobSpringExecutor} bean from the admin and executor settings held by
     * this configuration class (per the original note, these come from application.properties).
     * The {@code @Bean} attributes name {@code start} as the init method and {@code destroy} as
     * the destroy method.
     *
     * @return the configured executor instance
     */
    @Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");

        final XxlJobSpringExecutor executor = new XxlJobSpringExecutor();

        // Admin address, e.g. http://127.0.0.1:8080/xxl-job-admin
        executor.setAdminAddresses(adminAddresses);
        // Executor name, e.g. xxl-job-executor-sample
        executor.setAppName(appName);
        // Executor ip and port
        executor.setIp(ip);
        executor.setPort(port);
        // Access token used between executor and admin
        executor.setAccessToken(accessToken);
        // Directory where job logs are stored
        executor.setLogPath(logPath);
        // Number of days job logs are kept
        executor.setLogRetentionDays(logRetentionDays);

        return executor;
    }
    

    接着XxlJobSpringExecutor执行start方法

    public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {
        /**
         * Spring-aware start hook: initializes the job handler repository from the application
         * context, refreshes the glue factory, then delegates to the parent class's
         * {@code start()}.
         *
         * @throws Exception if any of the initialization steps fails
         */
        @Override
        public void start() throws Exception {
            // initialize the job handlers (tasks) hosted by this executor
            initJobHandlerRepository(applicationContext);
            // refresh GlueFactory
            GlueFactory.refreshInstance(1);
            // super start
            super.start();
        }
    

    接着父类XxlJobExecutor 执行start方法

    // ---------------------- start + stop ----------------------
        /**
         * Boots the executor in a fixed order: log path, admin clients, log-cleanup thread,
         * trigger-callback thread, then the RPC provider that performs the registration.
         *
         * @throws Exception if any of the startup steps fails
         */
        public void start() throws Exception {
            // Log path for job output.
            XxlJobFileAppender.initLogPath(logPath);

            // Admin client list, built from the admin addresses and the access token.
            initAdminBizList(adminAddresses, accessToken);

            // Log-file cleanup thread, parameterized by the retention days.
            JobLogFileCleanThread.getInstance().start(logRetentionDays);

            // Trigger-callback thread (per the original note, it reports job results back
            // and retries failed callbacks).
            TriggerCallbackThread.getInstance().start();

            // Fall back to an available port (search seeded with 9999) when none is configured.
            if (port <= 0) {
                port = NetUtil.findAvailablePort(9999);
            }
            // Fall back to the detected local ip when none is configured.
            if (ip == null || ip.trim().length() == 0) {
                ip = IpUtil.getIp();
            }

            // Start the RPC provider; this is where the executor gets registered.
            initRpcProvider(ip, port, appName, accessToken);
        }
    

    接着看上面的initAdminBizList(adminAddresses, accessToken)方法,这一步是初始化Admin的值,以及初始化执行器访问口令,下面看具体执行逻辑

    /**
     * Builds one {@link AdminBiz} client per configured admin address and stores them in
     * {@code adminBizList}. Also initializes the shared Hessian {@code serializer}.
     *
     * <p>Each comma-separated entry is trimmed before the endpoint URL is built, so a list
     * written as {@code "http://a, http://b"} no longer yields a URL with a leading space.
     * Blank entries are skipped.
     *
     * @param adminAddresses comma-separated admin base URLs; {@code null} or blank means no
     *                       admin client is created
     * @param accessToken    token passed to every admin client
     * @throws Exception if creating a client proxy fails
     */
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();

        if (adminAddresses == null || adminAddresses.trim().length() == 0) {
            return;
        }

        for (String address : adminAddresses.trim().split(",")) {
            // Trim once and use the trimmed value everywhere; the original checked the
            // trimmed form but concatenated the raw one.
            String trimmedAddress = address.trim();
            if (trimmedAddress.length() == 0) {
                continue;
            }

            String addressUrl = trimmedAddress.concat(AdminBiz.MAPPING);

            // getObject() returns a dynamic proxy: calls on it are not executed locally but
            // are delegated to the invoke method of its associated handler.
            AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
                    NetEnum.NETTY_HTTP,
                    serializer,
                    CallType.SYNC,
                    LoadBalance.ROUND,
                    AdminBiz.class,
                    null,
                    3000,   // NOTE(review): presumably the call timeout in ms, confirm against XxlRpcReferenceBean
                    addressUrl,
                    accessToken,
                    null,
                    null
            ).getObject();

            // The list is created lazily on the first valid address.
            if (adminBizList == null) {
                adminBizList = new ArrayList<AdminBiz>();
            }
            adminBizList.add(adminBiz);
        }
    }
    
    public Object getObject() {
                    //使用动态代理,通过此方法发送请求到Admin的/api接口,api接口收到请求后,解析出具体的方法和参数,获取到对应的Bean,通过反射执行具体的方法,最终实现调用AdminBizImpl.registry()
            return Proxy.newProxyInstance(Thread.currentThread()
                    .getContextClassLoader(), new Class[] { iface },
                    new InvocationHandler() {
                        @Override
                        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    

    接着看initRpcProvider(ip, port, appName, accessToken)方法:

    /**
     * Creates, configures and starts the RPC provider factory for this executor.
     *
     * @param ip          executor ip
     * @param port        executor port
     * @param appName     executor app name, passed to the registry as {@code "appName"}
     * @param accessToken access token handed to the provider factory
     * @throws Exception if configuring or starting the provider factory fails
     */
    private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
        // Parameters handed to the service registry: app name plus the ip:port address.
        Map<String, String> registryParams = new HashMap<String, String>();
        registryParams.put("appName", appName);
        registryParams.put("address", IpUtil.getIpPort(ip, port));

        xxlRpcProviderFactory = new XxlRpcProviderFactory();

        // ExecutorServiceRegistry is named as the registry class for this provider.
        xxlRpcProviderFactory.initConfig(
                NetEnum.NETTY_HTTP,
                Serializer.SerializeEnum.HESSIAN.getSerializer(),
                ip,
                port,
                accessToken,
                ExecutorServiceRegistry.class,
                registryParams);

        // Expose the executor-side service implementation.
        xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

        // Start the provider factory.
        xxlRpcProviderFactory.start();
    }
    

    接着是 xxlRpcProviderFactory.start()方法

    /**
     * Starts the provider: computes the service address from ip and port, instantiates the
     * server from {@code netType.serverClass}, installs started/stopped callbacks that manage
     * the service registry, and finally starts the server with this factory.
     *
     * @throws Exception if the server cannot be instantiated or started
     */
    public void start() throws Exception {
            // start server
            serviceAddress = IpUtil.getIpPort(this.ip, port);
            server = netType.serverClass.newInstance();
            server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
                @Override
                public void run() throws Exception {
                    // start registry (only when a registry class has been configured)
                    if (serviceRegistryClass != null) {
                        serviceRegistry = serviceRegistryClass.newInstance();
                                            // start the executor's registry class
                        serviceRegistry.start(serviceRegistryParam);
                        // register every known service key under this provider's address
                        if (serviceData.size() > 0) {
                            serviceRegistry.registry(serviceData.keySet(), serviceAddress);
                        }
                    }
                }
            });
            server.setStopedCallback(new BaseCallback() {       // serviceRegistry stoped
                @Override
                public void run() {
                    // stop registry
                    if (serviceRegistry != null) {
                        // NOTE(review): if remove(...) throws, stop() below is never reached
                        if (serviceData.size() > 0) {
                            serviceRegistry.remove(serviceData.keySet(), serviceAddress);
                        }
                        serviceRegistry.stop();
                        serviceRegistry = null;
                    }
                }
            });
            server.start(this);
        }
    

    回到ExecutorServiceRegistry的start方法:

    /**
     * {@link ServiceRegistry} implementation used by the executor. Only {@code start} and
     * {@code stop} do work, by driving {@link ExecutorRegistryThread}; the remaining
     * operations are stubs that return {@code false} or {@code null}.
     */
    public static class ExecutorServiceRegistry extends ServiceRegistry {

        /**
         * Starts the registry thread with the {@code "appName"} and {@code "address"} entries
         * of {@code param}. This is where the registration is kicked off.
         */
        @Override
        public void start(Map<String, String> param) {
            ExecutorRegistryThread registryThread = ExecutorRegistryThread.getInstance();
            registryThread.start(param.get("appName"), param.get("address"));
        }

        /** Asks the registry thread to stop. */
        @Override
        public void stop() {
            ExecutorRegistryThread.getInstance().toStop();
        }

        /** Stub: always returns {@code false}. */
        @Override
        public boolean registry(Set<String> keys, String value) {
            return false;
        }

        /** Stub: always returns {@code false}. */
        @Override
        public boolean remove(Set<String> keys, String value) {
            return false;
        }

        /** Stub: always returns {@code null}. */
        @Override
        public Map<String, TreeSet<String>> discovery(Set<String> keys) {
            return null;
        }

        /** Stub: always returns {@code null}. */
        @Override
        public TreeSet<String> discovery(String key) {
            return null;
        }
    }
    

    进入ExecutorRegistryThread.start()方法:

     registryThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    // registry
                    while (!toStop) {
                        try {
                            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                                try {//此处注册
                                    ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                    if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                        registryResult = ReturnT.SUCCESS;
                                        logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                        break;
                                    } else {
                                        logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    }
                                } catch (Exception e) {
                                    logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                                }
    
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
    
                        }
    

    Admin中接受Executor请求的入口

    /**
     * Entry point through which executors call the admin. The incoming request is handed to
     * {@code XxlJobScheduler.invokeAdminService}; per the article, the target method is then
     * resolved and invoked via reflection.
     * Created by xuxueli on 17/5/10.
     */
    @Controller
    public class JobApiController implements InitializingBean {
        // Intentionally empty: nothing to initialize after properties are set.
        @Override
        public void afterPropertiesSet() throws Exception {
        }
        // Entry point for executor-to-admin calls, mapped at AdminBiz.MAPPING.
        @RequestMapping(AdminBiz.MAPPING)
        @PermissionLimit(limit=false)
        public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {        
            XxlJobScheduler.invokeAdminService(request, response);
        }
    }
    

    XxlJobScheduler.invokeAdminService(request, response) ->servletServerHandler.handle(null, request, response)->xxlRpcProviderFactory.invokeService(xxlRpcRequest)

    /**
     * Handles one RPC request: looks up the service bean for the requested class and version,
     * validates the request's timestamp and access token, then invokes the requested method
     * reflectively.
     *
     * <p>Failures are never thrown to the caller; they are reported through the response's
     * error message.
     *
     * @param xxlRpcRequest the decoded request
     * @return a response carrying the request id and either the invocation result or an
     *         error message
     */
    public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
        XxlRpcResponse response = new XxlRpcResponse();
        response.setRequestId(xxlRpcRequest.getRequestId());

        // Find the bean registered for this class/version pair.
        String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
        Object target = serviceData.get(serviceKey);
        if (target == null) {
            response.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
            return response;
        }

        // Reject requests created more than three minutes before "now".
        if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
            response.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
            return response;
        }

        // The token is enforced only when one is configured on this side.
        boolean tokenConfigured = accessToken != null && accessToken.trim().length() > 0;
        if (tokenConfigured && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
            response.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
            return response;
        }

        try {
            Class<?> targetClass = target.getClass();
            String methodName = xxlRpcRequest.getMethodName();
            Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
            Object[] arguments = xxlRpcRequest.getParameters();

            // Resolve and invoke the concrete method via reflection.
            Method targetMethod = targetClass.getMethod(methodName, parameterTypes);
            targetMethod.setAccessible(true);
            response.setResult(targetMethod.invoke(target, arguments));
        } catch (Throwable t) {
            // Any failure is logged and returned as an error message.
            logger.error("xxl-rpc provider invokeService error.", t);
            response.setErrorMsg(ThrowableUtil.toString(t));
        }

        return response;
    }
    

    Admin中实现类

    /**
     * Persists an executor's registration. An update is attempted first; when no row was
     * updated, the entry is inserted and {@code freshGroupRegistryInfo} is called.
     *
     * @param registryParam registry group, key and value to store
     * @return always {@code ReturnT.SUCCESS}
     */
    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        int updatedRows = xxlJobRegistryDao.registryUpdate(
                registryParam.getRegistGroup(),
                registryParam.getRegistryKey(),
                registryParam.getRegistryValue());

        // An existing row was updated, so nothing else is needed.
        if (updatedRows >= 1) {
            return ReturnT.SUCCESS;
        }

        // First registration of this entry: insert it, then refresh the group info.
        xxlJobRegistryDao.registrySave(
                registryParam.getRegistGroup(),
                registryParam.getRegistryKey(),
                registryParam.getRegistryValue());
        freshGroupRegistryInfo(registryParam);

        return ReturnT.SUCCESS;
    }
    

    其中Executor动态代理AdminBiz接口和Admin的/api动态反射执行具体方法属于作者自研RPC框架部分,本篇只做了注册部分和解析部分的介绍,后续会单独介绍自研RPC框架部分。
    到此,执行器的地址就已经完全注册到管理器中。

    相关文章

      网友评论

        本文标题:XXL-JOB阅读笔记(一):执行器注册到管理器实现方式

        本文链接:https://www.haomeiwen.com/subject/pdzjvctx.html