XXL-JOB Scheduled Tasks - Registration Process

Author: 佛爷石 | Published: 2020-03-25 17:18

I. Enabling

  1. Configure XxlJobConfig

  2. Example code:

    @Configuration
    public class XxlJobConfig {
        /** Configuration parameters needed to instantiate XxlJobSpringExecutor **/
        @Value("${xxl.job.admin.addresses}")
        private String adminAddresses;
        ......
        /** XxlJobSpringExecutor must be instantiated to kick off the rest of the xxl-job startup **/
        @Bean(initMethod = "start", destroyMethod = "destroy")
        public XxlJobSpringExecutor xxlJobExecutor() {
            ......
        }
    }
    
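  3. Note the initMethod and destroyMethod attributes of @Bean

    Points to keep in mind:

    Singleton bean: the object is created when the container starts

    Prototype bean: a new object is created each time the bean is requested

    Initialization:

    The init method is called once the object has been created and its properties have been populated

    Destruction:

    Singleton: the destroy method is called when the container closes

    Prototype: the container does not destroy it; the destroy method has to be called manually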
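
The lifecycle rules above can be illustrated with a toy container. This is a minimal sketch in plain Java, not Spring itself: `ToyContainer` and `Executor` are hypothetical names, and the only thing the sketch mirrors is the order of calls (create, init, destroy) for singleton and prototype instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Toy container, NOT Spring: it only mimics the init/destroy ordering described above. */
class LifecycleSketch {

    /** Hypothetical bean exposing the method names that initMethod/destroyMethod point at. */
    static class Executor {
        private final List<String> events;
        Executor(List<String> events) { this.events = events; events.add("created"); }
        void start()   { events.add("start"); }
        void destroy() { events.add("destroy"); }
    }

    static class ToyContainer implements AutoCloseable {
        private final Executor singleton;
        private final Supplier<Executor> prototypeFactory;

        ToyContainer(Supplier<Executor> singletonFactory, Supplier<Executor> prototypeFactory) {
            // Singleton: created and initialized eagerly, when the container starts.
            this.singleton = singletonFactory.get();
            this.singleton.start();
            this.prototypeFactory = prototypeFactory;
        }

        /** Prototype: a new instance is created and initialized on every lookup. */
        Executor getPrototype() {
            Executor instance = prototypeFactory.get();
            instance.start();
            return instance;
        }

        /** Only the singleton is destroyed; prototypes are left to the caller. */
        @Override
        public void close() { singleton.destroy(); }
    }

    public static void main(String[] args) {
        List<String> singletonEvents = new ArrayList<>();
        List<String> prototypeEvents = new ArrayList<>();
        try (ToyContainer container = new ToyContainer(
                () -> new Executor(singletonEvents), () -> new Executor(prototypeEvents))) {
            container.getPrototype();
        }
        System.out.println(singletonEvents); // [created, start, destroy]
        System.out.println(prototypeEvents); // [created, start]
    }
}
```

The prototype instance never receives `destroy`, which is why such beans need a manual cleanup call.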

  4. Now step into the class XxlJobSpringExecutor

    public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {
        @Override
        public void start() throws Exception {
            /** Focus on this method **/
            // init JobHandler Repository
            initJobHandlerRepository(applicationContext);
            // refresh GlueFactory
            GlueFactory.refreshInstance(1);
            /** Then follow the parent class method **/
            // super start
            super.start();
        }
        /** Initialize the local JobHandler repository **/
        private void initJobHandlerRepository(ApplicationContext applicationContext){
            if (applicationContext == null) {
                return;
            }
            /** Use the Spring context to fetch the beans annotated with @JobHandler **/
            Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
    
            if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    if (serviceBean instanceof IJobHandler){
                        /** Read the name given to our custom JobHandler **/
                        String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
                        IJobHandler handler = (IJobHandler) serviceBean;
                        /** Check whether the name about to be registered already exists in the local cache **/
                        if (loadJobHandler(name) != null) {
                            throw new RuntimeException("xxl-job jobhandler naming conflicts.");
                        }
                        /** Call the parent class (XxlJobExecutor) method to cache the handler locally **/
                        registJobHandler(name, handler);
                    }
                }
            }
        }
        
        
        /** Holds the Spring application context **/
        private static ApplicationContext applicationContext;
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }
    
  5. The parent class XxlJobExecutor (the class is long, so we go through it piece by piece)

    /** Map used to cache JobHandlers **/
    private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
    /** Cache a JobHandler in the local repository **/
    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
        logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
        return jobHandlerRepository.put(name, jobHandler);
    }
    /** Look up a JobHandler instance by its custom name **/
    public static IJobHandler loadJobHandler(String name){
        return jobHandlerRepository.get(name);
    }
    

    With that, the handlers are cached locally and the enabling phase is complete. Next we follow what happens afterwards: registration.
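
To make the scan-and-cache step concrete, here is a minimal, self-contained sketch of the same idea. It is not the xxl-job source: `Handler` and `Job` are hypothetical stand-ins for `@JobHandler` and `IJobHandler`, and the sketch uses `putIfAbsent` so that the conflict check and the insert happen in one atomic step, where the original does a separate lookup followed by `put`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class HandlerRegistrySketch {

    /** Stand-in for @JobHandler; RUNTIME retention makes it visible to reflection. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Handler { String value(); }

    /** Stand-in for IJobHandler. */
    interface Job { String execute(String param); }

    @Handler("demoJob")
    static class DemoJob implements Job {
        @Override
        public String execute(String param) { return "done:" + param; }
    }

    private final Map<String, Job> repository = new ConcurrentHashMap<>();

    /** Registers every annotated Job among the given beans; other objects are skipped. */
    void registerAll(List<?> beans) {
        for (Object bean : beans) {
            Handler meta = bean.getClass().getAnnotation(Handler.class);
            if (meta == null || !(bean instanceof Job)) {
                continue;
            }
            // putIfAbsent returns the previous value, so non-null means a name clash.
            if (repository.putIfAbsent(meta.value(), (Job) bean) != null) {
                throw new IllegalStateException("handler naming conflict: " + meta.value());
            }
        }
    }

    Job load(String name) { return repository.get(name); }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.registerAll(List.of(new DemoJob()));
        System.out.println(registry.load("demoJob").execute("x")); // done:x
    }
}
```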

II. Registration

First, look at the start() method of XxlJobExecutor (destroy() sits in the same "start + stop" block):

// ---------------------- start + stop ----------------------
    public void start() throws Exception {
        // init logpath - logPath may be left empty and is not part of the main flow discussed here, so we skip it
        XxlJobFileAppender.initLogPath(logPath);
        // init admin-client 
        initAdminBizList(adminAddresses, accessToken);
        // init JobLogFileCleanThread
        JobLogFileCleanThread.getInstance().start(logRetentionDays);
        // init TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();
        // init executor-server
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
        initRpcProvider(ip, port, appName, accessToken);
    }

The initAdminBizList call made from start():

/** Admin clients, one proxy per configured admin address **/
private static List<AdminBiz> adminBizList;
/** adminAddresses: http://192.168.1.33:8099/xxl-job-admin **/
/** accessToken: not used here **/
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    if (adminAddresses!=null && adminAddresses.trim().length()>0) {
        for (String address: adminAddresses.trim().split(",")) {
            if (address!=null && address.trim().length()>0) {
                String addressUrl = address.concat(AdminBiz.MAPPING);
                /** Build the AdminBiz instance; a dynamic proxy is used here **/
                AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,AdminBiz.class, null, 10000, addressUrl, accessToken, null).getObject();
                if (adminBizList == null) {
                    adminBizList = new ArrayList<AdminBiz>();
                }
                adminBizList.add(adminBiz);
            }
        }
    }
}
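
The address handling in that method is easy to try out in isolation. The sketch below is an illustration, not the library code: `MAPPING` is assumed to be `/api` (inferred from the `.../xxl-job-admin/api` address shown in this article), and each entry is trimmed before the suffix is appended, whereas the quoted source concatenates the untrimmed `address`.

```java
import java.util.ArrayList;
import java.util.List;

class AdminAddressSketch {

    /** Assumed value of AdminBiz.MAPPING, inferred from the API address shown in the article. */
    static final String MAPPING = "/api";

    /** Splits a comma-separated address list and appends the API mapping to each entry. */
    static List<String> toApiUrls(String adminAddresses) {
        List<String> urls = new ArrayList<>();
        if (adminAddresses == null) {
            return urls;
        }
        for (String address : adminAddresses.split(",")) {
            String trimmed = address.trim();
            if (!trimmed.isEmpty()) {
                urls.add(trimmed + MAPPING);
            }
        }
        return urls;
    }

    public static void main(String[] args) {
        System.out.println(toApiUrls("http://192.168.1.33:8099/xxl-job-admin"));
        // [http://192.168.1.33:8099/xxl-job-admin/api]
    }
}
```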

Fields of the XxlRpcReferenceBean behind each AdminBiz proxy, with the values passed in above:

private NetEnum netType; - JETTY
private Serializer serializer; - HESSIAN
private CallType callType; - SYNC
private Class<?> iface; - AdminBiz
private String version; - null
private long timeout; - 10000
private String address; - http://192.168.1.33:8099/xxl-job-admin/api
private String accessToken; - not used
private XxlRpcInvokeCallback invokeCallback; - null
Client client; - the transport client
The method to focus on is initRpcProvider():

private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
        // init invoker factory -- not used for now
        xxlRpcInvokerFactory = new XxlRpcInvokerFactory();
        // init, provider factory
        String address = IpUtil.getIpPort(ip, port);
        Map<String, String> serviceRegistryParam = new HashMap<String, String>();
        serviceRegistryParam.put("appName", appName);
        serviceRegistryParam.put("address", address);

        xxlRpcProviderFactory = new XxlRpcProviderFactory();
        xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);

        // add services
        xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

        // start - the provider factory is started here
       xxlRpcProviderFactory.start();

    }

Class: XxlRpcProviderFactory

public void start() throws Exception {
        // start server
        server = netType.serverClass.newInstance();
        server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
            @Override
            public void run() throws Exception {
                // start registry
                if (serviceRegistryClass != null) {
                    serviceRegistry = serviceRegistryClass.newInstance();
                      /** This calls ExecutorServiceRegistry.start(),
                      which in turn calls ExecutorRegistryThread.start()
                      **/
                    serviceRegistry.start(serviceRegistryParam);

                    if (serviceData.size() > 0) {
                        String ipPort = IpUtil.getIpPort(ip, port);
                        for (String serviceKey :serviceData.keySet()) {
                            serviceRegistry.registry(serviceKey, ipPort);
                        }
                    }
                }
            }
        });
        server.setStopedCallback(new BaseCallback() {       // serviceRegistry stoped
            @Override
            public void run() {
                // stop registry
                if (serviceRegistry != null) {
                    if (serviceData.size() > 0) {
                        String ipPort = IpUtil.getIpPort(ip, port);
                        for (String serviceKey :serviceData.keySet()) {
                            serviceRegistry.remove(serviceKey, ipPort);
                        }
                    }
                    serviceRegistry.stop();
                    serviceRegistry = null;
                }
            }
        });
        server.start(this);
    }
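
The pattern in this method is that the server knows nothing about the registry; it only fires a callback once it has started and another once it has stopped. A minimal sketch of that wiring follows, with a hypothetical `ToyServer` and a plain `Set<String>` standing in for the service registry (neither is part of xxl-rpc).

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class ProviderCallbackSketch {

    /** Hypothetical server: it only fires callbacks around its own lifecycle. */
    static class ToyServer {
        private Runnable startedCallback = () -> {};
        private Runnable stoppedCallback = () -> {};

        void setStartedCallback(Runnable callback) { this.startedCallback = callback; }
        void setStoppedCallback(Runnable callback) { this.stoppedCallback = callback; }

        void start() { /* a real server would bind its port here */ startedCallback.run(); }
        void stop()  { /* a real server would release its port here */ stoppedCallback.run(); }
    }

    /** Wires a server so that its services are registered on start and removed on stop. */
    static ToyServer wire(Map<String, Object> serviceData, String ipPort, Set<String> registry) {
        ToyServer server = new ToyServer();
        server.setStartedCallback(() -> {
            for (String serviceKey : serviceData.keySet()) {
                registry.add(serviceKey + "@" + ipPort);
            }
        });
        server.setStoppedCallback(() -> {
            for (String serviceKey : serviceData.keySet()) {
                registry.remove(serviceKey + "@" + ipPort);
            }
        });
        return server;
    }

    public static void main(String[] args) {
        Set<String> registry = new TreeSet<>();
        ToyServer server = wire(Map.of("ExecutorBiz", new Object()), "127.0.0.1:9999", registry);
        server.start();
        System.out.println(registry); // [ExecutorBiz@127.0.0.1:9999]
        server.stop();
        System.out.println(registry); // []
    }
}
```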

Next, ExecutorRegistryThread - start(): the registration message sent every 30 s is produced in this method

public void start(final String appName, final String address){
        // valid
        if (appName==null || appName.trim().length()==0) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appName is null.");
            return;
        }
        if (XxlJobExecutor.getAdminBizList() == null) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
            return;
        }

        registryThread = new Thread(new Runnable() {
            @Override
            public void run() {

                // registry
                while (!toStop) {
                    try {
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                            try {
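                                /** Key call: adminBiz is the dynamic proxy that XxlRpcReferenceBean.getObject() created in initAdminBizList, so adminBiz.registry() is routed through the proxy's InvocationHandler.invoke(); getObject() is shown below **/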
                                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                    registryResult = ReturnT.SUCCESS;
                                    logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    break;
                                } else {
                                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                }
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                            }

                        }
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }

                    try {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                    }
                }

                // registry remove
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                        }

                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");

            }
        });
        registryThread.setDaemon(true);
        registryThread.start();
    }
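
Stripped of logging and RPC details, the thread above is a heartbeat loop: register, sleep, repeat until a stop flag is set, then deregister once. The following sketch shows that shape under stated assumptions: the counters stand in for `adminBiz.registry(...)` and `adminBiz.registryRemove(...)`, and `stop()` and `awaitFirstBeat()` are hypothetical helpers added so the loop can be driven from a test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatSketch {

    private volatile boolean toStop = false;          // written by stop(), read by the loop
    private final CountDownLatch firstBeat = new CountDownLatch(1);
    private Thread thread;

    final AtomicInteger beats = new AtomicInteger();     // stands in for registry calls
    final AtomicInteger removals = new AtomicInteger();  // stands in for registryRemove calls

    void start(long intervalMillis) {
        thread = new Thread(() -> {
            while (!toStop) {
                beats.incrementAndGet();
                firstBeat.countDown();
                try {
                    TimeUnit.MILLISECONDS.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // Woken up by stop(); the loop condition is re-checked right away.
                }
            }
            removals.incrementAndGet();               // deregister exactly once on exit
        });
        thread.setDaemon(true);                       // do not keep the JVM alive
        thread.start();
    }

    /** Waits until the loop has sent at least one heartbeat. */
    boolean awaitFirstBeat(long timeoutMillis) {
        try {
            return firstBeat.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Sets the flag, interrupts the sleep, and waits for the thread to finish. */
    void stop() {
        toStop = true;
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        HeartbeatSketch heartbeat = new HeartbeatSketch();
        heartbeat.start(100);
        heartbeat.awaitFirstBeat(2000);
        heartbeat.stop();
        System.out.println("removals=" + heartbeat.removals.get()); // removals=1
    }
}
```

Interrupting the thread in `stop()` matters: without it, shutdown would wait for the current sleep interval to finish before the removal is sent.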

XxlRpcReferenceBean - getObject():

public Object getObject() {
        return Proxy.newProxyInstance(Thread.currentThread()
                .getContextClassLoader(), new Class[] { iface },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        String className = method.getDeclaringClass().getName();

                        // filter method like "Object.toString()"
                        if (Object.class.getName().equals(className)) {
                            logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());
                            throw new XxlRpcException("xxl-rpc proxy class-method not support");
                        }

                        // address
                        String address = routeAddress();
                        if (address==null || address.trim().length()==0) {
                            throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
                        }

                        // request
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                        xxlRpcRequest.setAccessToken(accessToken);
                        xxlRpcRequest.setClassName(className);
                        xxlRpcRequest.setMethodName(method.getName());
                        xxlRpcRequest.setParameterTypes(method.getParameterTypes());
                        xxlRpcRequest.setParameters(args);
                        /** SYNC is the call type used by default, so only this branch is shown **/
                        // send
                        if (CallType.SYNC == callType) {
                            try {
                                // future set
                                XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);
                                /** Key step: this performs the remote call; see the communication and serialization section next **/
                                // do invoke 
                                client.asyncSend(address, xxlRpcRequest);
                                // future get
                                XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
                                if (xxlRpcResponse.getErrorMsg() != null) {
                                    throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                                }
                                return xxlRpcResponse.getResult();
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            } finally{
                                // remove-InvokerFuture
                                XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
                            }
                        }
                        // branches for the other call types are omitted here
                        ......
                    }
                });
    }
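
The essence of getObject() is a JDK dynamic proxy whose handler turns a method call into a request, sends it asynchronously, and blocks on a future until the response arrives. Below is a minimal sketch of that idea. It is not xxl-rpc code: `Registry`, `Request`, and the `PENDING` map are hypothetical, the transport is a plain `Consumer`, and `CompletableFuture` is used in place of `XxlRpcFutureResponse`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class RpcProxySketch {

    /** Hypothetical service interface standing in for AdminBiz. */
    interface Registry { String registry(String appName); }

    /** Simplified request: just enough to identify the call. */
    static final class Request {
        final String id;
        final String methodName;
        final Object[] args;
        Request(String id, String methodName, Object[] args) {
            this.id = id;
            this.methodName = methodName;
            this.args = args;
        }
    }

    /** Pending calls keyed by request id; completed when the response arrives. */
    static final Map<String, CompletableFuture<Object>> PENDING = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, Consumer<Request> transport, long timeoutMillis) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods inherited from Object (toString, hashCode, ...) are not remote calls.
            if (method.getDeclaringClass() == Object.class) {
                throw new UnsupportedOperationException("not supported: " + method.getName());
            }
            Request request = new Request(UUID.randomUUID().toString(), method.getName(), args);
            CompletableFuture<Object> future = new CompletableFuture<>();
            PENDING.put(request.id, future);
            try {
                transport.accept(request);                                 // asynchronous send
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);   // synchronous wait
            } catch (Exception e) {
                throw new IllegalStateException("invoke failed: " + method.getName(), e);
            } finally {
                PENDING.remove(request.id);                                // always clean up
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] { iface }, handler);
    }

    public static void main(String[] args) {
        // The fake transport answers immediately by completing the pending future.
        Registry admin = createProxy(Registry.class,
                req -> PENDING.get(req.id).complete("registered:" + req.args[0]), 1000);
        System.out.println(admin.registry("demo-app")); // registered:demo-app
    }
}
```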

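III. Communication & Serialization

Everything so far has been about the registration flow. For communication and serialization, Jetty is the default transport, so we pull out the Jetty part and look at it: JettyClient - postRequestAsync()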

private void postRequestAsync(String address, XxlRpcRequest xxlRpcRequest) throws Exception {
        // reqURL
        String reqURL = address;
        if (!address.toLowerCase().startsWith("http")) {
            reqURL = "http://" + address;   // IP:PORT, need parse to url
        }
        /** The framework serializes with Hessian by default **/
        // serialize request
        byte[] requestBytes = xxlRpcReferenceBean.getSerializer().serialize(xxlRpcRequest);

        // httpclient
        HttpClient httpClient = getJettyHttpClient();

        // request
        Request request = httpClient.newRequest(reqURL);
        request.method(HttpMethod.POST);
        request.timeout(xxlRpcReferenceBean.getTimeout() + 500, TimeUnit.MILLISECONDS);     // async, not need timeout
        request.content(new BytesContentProvider(requestBytes));
        /** send() posts the message to the remote server to register; the flow ends here **/
        // invoke
        request.send(new BufferingResponseListener() {
            /** The result of the call is handled here **/
            @Override
            public void onComplete(Result result) {
                try {
                    ......
                } catch (Exception e){
                    ......
                }
            }
        });
    }
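
Two small steps in this method can be tried without Jetty: normalizing the address into a URL, and turning the request object into bytes and back. The sketch below is only an illustration. It swaps in Java's built-in object serialization for Hessian, which produces a different wire format, and `WireSketch` and its method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class WireSketch {

    /** Same rule as in postRequestAsync: a bare IP:PORT gets an "http://" prefix. */
    static String toRequestUrl(String address) {
        return address.toLowerCase().startsWith("http") ? address : "http://" + address;
    }

    /** JDK serialization used as a stand-in for the Hessian serializer. */
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();   // read after close, so everything is flushed
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialize failed", e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> param = new ArrayList<>(List.of("EXECUTOR", "demo-app", "127.0.0.1:9999"));
        byte[] wire = serialize(param);
        System.out.println(toRequestUrl("127.0.0.1:9999")); // http://127.0.0.1:9999
        System.out.println(deserialize(wire));              // [EXECUTOR, demo-app, 127.0.0.1:9999]
    }
}
```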

Closing remarks:

  1. Definition and parsing of annotations
  2. RPC framework support
  3. Use of dynamic proxies
  4. Use of the factory pattern
