美文网首页
大熊猫分布式组件开发系列教程(三)

大熊猫分布式组件开发系列教程(三)

作者: 蛋皮皮652 | 来源:发表于2020-11-19 20:26 被阅读0次

    今天我们来看看springboot定时任务如何做成分布式组件来供项目集成依赖使用,接下来就跟着大熊猫一起做crontask分布式组件开发。

    首先我们先创建一个crontask模块

    其实这个定时任务组件最主要的操作就是定时任务记录,以及定时任务日志这两张表

    接着就是一些工厂类的封装,监听类的实现

    编写JpJob实现Job的excute方法,以及编写执行之前的方法,执行后的方法。

    package com.panda.common.crontask.web.schedule;

    import com.panda.common.crontask.common.DateUtil;

    import com.panda.common.crontask.service.api.QuartzConfigService;

    import com.panda.common.crontask.service.api.QuartzLogService;

    import com.panda.common.crontask.service.api.dto.QuartzConfigDto;

    import com.panda.common.crontask.service.api.dto.QuartzLogDto;

    import org.quartz.Job;

    import org.quartz.JobExecutionContext;

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

    import org.springframework.beans.factory.annotation.Autowired;

    import java.io.Serializable;

    import java.time.Duration;

    import java.time.LocalDateTime;

    public abstract class JpJobimplements Job, Serializable {

    private static Loggerlogger = LoggerFactory.getLogger(JpJob.class);

        @Autowired

        private QuartzLogServicequartzLogService;

        @Autowired

        private QuartzConfigServicequartzConfigService;

        //任务开始时间

        private ThreadLocalbeforTime =new ThreadLocal<>();

        //日志Id

        private ThreadLocallogId =new ThreadLocal<>();

        public abstract void runJob();

        public void beforeRun(){

    String clazz =this.getClass().getName();

            QuartzLogDto quartzLogDto =new QuartzLogDto();

            quartzLogDto.setClazz(clazz);

            QuartzConfigDto quartzConfigDto =quartzConfigService.findByClazz(clazz);

            if(quartzConfigDto !=null){

    quartzLogDto.setQuartzId(quartzConfigDto.getId());

                quartzLogDto.setName(quartzConfigDto.getName());

            }

    quartzLogDto =quartzLogService.add(quartzLogDto);

            beforTime.set(LocalDateTime.now());

            logId.set(quartzLogDto.getId());

        }

    public void error(Exception e){

    QuartzLogDto quartzLogDto =quartzLogService.get(logId.get());

            Duration between = Duration.between(beforTime.get(), LocalDateTime.now());

            quartzLogDto.setSpendTime(between.toMillis());

            if(e !=null && e.getMessage() !=null)

    quartzLogDto.setExceptionMessage(e.getMessage().length() >500 ? e.getMessage().substring(0, 499) : e.getMessage());

            quartzLogDto.setResult(0);

            quartzLogService.update(logId.get(), quartzLogDto);

        }

    public void afterRun(){

    QuartzLogDto quartzLogDto =quartzLogService.get(logId.get());

            Duration between = Duration.between(beforTime.get(), LocalDateTime.now());

            quartzLogDto.setSpendTime(between.toMillis());

            quartzLogDto.setResult(1);

            quartzLogService.update(logId.get(), quartzLogDto);

        }

    @Override

        public void execute(JobExecutionContext jobExecutionContext) {

    String clazz =this.getClass().getName();

            logger.info("==== 定时任务 "+clazz+" ====> 开启 " + DateUtil.getStringToday());

            beforeRun();

            try {

    runJob();

            }catch (Exception e) {

    logger.info("==== 定时任务 "+clazz+" ====> 异常 "+e.getMessage());

                error(e);

            }finally {

    logger.info("==== 定时任务 "+clazz+" ====> 结束 " + DateUtil.getStringToday());

                afterRun();

            }

    }

    }

    编写JpJobFactory继承从而创建单例Job进行注入。

    package com.panda.common.crontask.web.schedule;

    import org.quartz.spi.TriggerFiredBundle;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.beans.factory.config.AutowireCapableBeanFactory;

    import org.springframework.scheduling.quartz.AdaptableJobFactory;

    import org.springframework.stereotype.Component;

    @Component

    public class JpJobFactoryextends AdaptableJobFactory {

    @Autowired

        private AutowireCapableBeanFactorycapableBeanFactory;

        @Override

        protected ObjectcreateJobInstance(TriggerFiredBundle bundle)throws Exception {

    // 调用父类的方法

            Object jobInstance =super.createJobInstance(bundle);

            // 进行注入

            capableBeanFactory.autowireBean(jobInstance);

            return jobInstance;

        }

    }

    调度工厂类实现任务配置读取服务,项目启动激活所有定时任务,任务暂停,恢复,以及执行一次任务的方法。

    package com.panda.common.crontask.web.schedule;

    import com.panda.base.service.api.exception.ServiceException;

    import com.panda.common.crontask.service.api.QuartzConfigService;

    import com.panda.common.crontask.service.api.dto.QuartzConfigDto;

    import org.quartz.*;

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.scheduling.quartz.SchedulerFactoryBean;

    import org.springframework.stereotype.Component;

    import org.springframework.stereotype.Service;

    import java.util.List;

    /**

    * 调度工厂类

    *

    */

    @Service

    @Component

    public class JpSchedulerFactory {

    private static Loggerlogger = LoggerFactory.getLogger(JpSchedulerFactory.class);

        @Autowired

        SchedulerFactoryBeanschedulerFactoryBean;

        // 任务配置读取服务

        @Autowired

        private QuartzConfigServicequartzConfigService;

        public void scheduleJobs() {

    Scheduler scheduler = getScheduler();

            startJob(scheduler);

        }

    // 获取scheduler

        private SchedulergetScheduler(){

    return schedulerFactoryBean.getScheduler();

        }

    // 项目启动,开启所有激活的任务

        private void startJob(Scheduler scheduler)  {

    try {

    // 获取所有激活的任务

                List jobList =quartzConfigService.findByStatus(1);

                for (QuartzConfigDto config : jobList) {

    Class clazz = (Class) Class.forName(config.getClazz());

                    JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(config.getId()).build();

                    CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(config.getCron());

                    CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(config.getId())

    .withSchedule(scheduleBuilder).build();

                    scheduler.scheduleJob(jobDetail, cronTrigger);

                }

    }catch (ClassNotFoundException e) {

    e.printStackTrace();

            }catch (SchedulerException e) {

    e.printStackTrace();

            }

    }

    // 任务暂停

        public void pauseJob(String id)throws SchedulerException {

    QuartzConfigDto quartzConfigDto =quartzConfigService.get(id);

            if(quartzConfigDto ==null)throw new ServiceException(502,"不存在的任务");

            JobKey jobKey = JobKey.jobKey(id);

            Scheduler scheduler = getScheduler();

            scheduler.deleteJob(jobKey);

        }

    // 任务恢复

        public void resumeJob(String id)throws SchedulerException, ClassNotFoundException {

    QuartzConfigDto quartzConfigDto =quartzConfigService.get(id);

            if(quartzConfigDto ==null)throw new ServiceException(502,"不存在的任务");

            JobKey jobKey = JobKey.jobKey(id);

            Scheduler scheduler = getScheduler();

            Class clazz = (Class) Class.forName(quartzConfigDto.getClazz());

            JobDetail detail = scheduler.getJobDetail(jobKey);

            if (detail ==null){

    JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(id).build();

                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(quartzConfigDto.getCron());

                CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(id).withSchedule(scheduleBuilder).build();

                scheduler.scheduleJob(jobDetail, cronTrigger);

            }else {

    scheduler.resumeJob(jobKey);

            }

    }

    // 执行一次任务

        public void runJob(String id)throws SchedulerException {

    QuartzConfigDto quartzConfigDto =quartzConfigService.get(id);

            if(quartzConfigDto ==null)throw new ServiceException(502,"不存在的任务");

            JobKey jobKey = JobKey.jobKey(id);

            Scheduler scheduler = getScheduler();

            scheduler.triggerJob(jobKey);

        }

    }

    定时任务运行工厂类用来springboot启动监听和注入SchedulerFactoryBean

    package com.panda.common.crontask.web.schedule;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.context.ApplicationListener;

    import org.springframework.context.annotation.Bean;

    import org.springframework.context.annotation.Configuration;

    import org.springframework.context.event.ContextRefreshedEvent;

    import org.springframework.scheduling.quartz.SchedulerFactoryBean;

    /**

    * 定时任务运行工厂类

    *

    */

    @Configuration

    public class StartSchedulerListenerimplements ApplicationListener {

    @Autowired

        public com.panda.common.crontask.web.schedule.JpSchedulerFactoryjpSchedulerFactory;

        @Autowired

        private com.panda.common.crontask.web.schedule.JpJobFactoryjpJobFactory;

        // springboot 启动监听

        @Override

        public void onApplicationEvent(ContextRefreshedEvent event) {

    jpSchedulerFactory.scheduleJobs();

        }

    //注入SchedulerFactoryBean

        @Bean

        public SchedulerFactoryBeanschedulerFactoryBean() {

    SchedulerFactoryBean schedulerFactoryBean =new SchedulerFactoryBean();

            schedulerFactoryBean.setJobFactory(jpJobFactory);

            return schedulerFactoryBean;

        }

    }

    下面贴出api层代码

    package com.panda.common.crontask.web.api;

    import com.panda.auth.client.Authorization;

    import com.panda.base.service.api.exception.ServiceException;

    import com.panda.base.web.api.jersey.IResponse;

    import com.panda.base.web.api.jersey.security.ISecurityContext;

    import com.panda.base.web.api.response.WebApiResponse;

    import com.panda.common.crontask.common.ApplicationConstants;

    import com.panda.common.crontask.service.api.QuartzConfigService;

    import com.panda.common.crontask.service.api.QuartzLogService;

    import com.panda.common.crontask.service.api.dto.QuartzConfigDto;

    import com.panda.common.crontask.service.api.dto.QuartzLogDto;

    import com.panda.common.crontask.web.schedule.JpSchedulerFactory;

    import io.swagger.annotations.Api;

    import io.swagger.annotations.ApiOperation;

    import io.swagger.annotations.ApiParam;

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.data.domain.Page;

    import org.springframework.data.domain.PageRequest;

    import org.springframework.data.domain.Pageable;

    import org.springframework.data.domain.Sort;

    import org.springframework.stereotype.Component;

    import org.springframework.transaction.TransactionStatus;

    import org.springframework.transaction.support.TransactionCallback;

    import org.springframework.transaction.support.TransactionTemplate;

    import javax.annotation.security.RolesAllowed;

    import javax.ws.rs.*;

    import javax.ws.rs.core.Context;

    import javax.ws.rs.core.MediaType;

    import javax.ws.rs.core.SecurityContext;

    import java.time.LocalDateTime;@Path("quartz")

    @Component

    @Produces(MediaType.APPLICATION_JSON)

    @Consumes(MediaType.APPLICATION_JSON)

    @Api(tags ="【公共组件-定时任务】")@RolesAllowed("*")@Authorization(permission ="quartz:*")

    public class QuartzApiimplements ISecurityContext, IResponse {

    private final static Loggerlog = LoggerFactory.getLogger(QuartzApi.class);

        @Autowired

        QuartzConfigServicequartzConfigService;

        @Autowired

        QuartzLogServicequartzLogService;

        //事务模版

        @Autowired

        private TransactionTemplatetransactionTemplate;

        @Autowired

        JpSchedulerFactoryJpSchedulerFactory;

        @ApiOperation(value ="获取任务分页")

    @GET

        public WebApiResponse>getQuartzConfigPage(@Context SecurityContext context,// 如果@Authorization存在,则不可缺少次参数。

    @ApiParam(value ="页码(从0开始),默认0")@QueryParam("pageIndex") Integer pageIndex,

    @ApiParam(value ="页大小,默认10")@QueryParam("pageSize") Integer pageSize,

    @ApiParam(value ="任务名称")@QueryParam("name") String name,

    @ApiParam(value ="任务状态 1-正常,0-停止")@QueryParam("status") Integer status) {

    if (pageIndex ==null) pageIndex =0;

            if (pageSize ==null) pageSize =10;

            Pageable pageable = PageRequest.of(pageIndex, pageSize,Sort.by(Sort.Direction.DESC, "updateTime"));

            return response(quartzConfigService.findQuartzConfigPage(status,name,pageable));

        }

    @ApiOperation(value ="通过Id获取任务详情")

    @GET

        @Path("{id}")

    public WebApiResponsegetQuartzConfigById(@Context SecurityContext context,@ApiParam(required =true, value ="quartzConfig Id")@PathParam("id") String id) {

    return response(quartzConfigService.get(id));

        }

    @ApiOperation(value ="通过类名获取任务详情")

    @GET

        @Path("getByClazz/{clazz}")

    public WebApiResponsegetQuartzConfigByClazz(@Context SecurityContext context,@ApiParam(required =true, value ="clazz")@PathParam("clazz") String clazz ) {

    return response(quartzConfigService.findByClazz(clazz));

        }

    @ApiOperation(value ="添加任务")

    @POST

        @Path("add")

    public WebApiResponseaddQuartzConfig(QuartzConfigDto quartzConfigDto, @Context SecurityContext context) {

    return response(quartzConfigService.addOrUpdateQuartzConfig(quartzConfigDto));

        }

    @ApiOperation(value ="删除任务")

    @POST

        @Path("delete/{id}")

    public WebApiResponsedeleteQuartzConfig(@Context SecurityContext context, @ApiParam(required =true, value ="quartzConfig Id")@PathParam("id") String id) {

    return transactionTemplate.execute(new TransactionCallback() {

    @Override

                public WebApiResponsedoInTransaction(TransactionStatus transactionStatus) {

    try {

    JpSchedulerFactory.pauseJob(id);

                        quartzConfigService.delete(id);

                    }catch (Exception e) {

    log.error("删除任务任务失败!", e);

                        transactionStatus.setRollbackOnly();

                        throw new ServiceException(502,"删除任务失败");

                    }

    log.info("删除任务成功!");

                    return done();

                }

    });

        }

    @ApiOperation(value ="更新任务")

    @POST

        public WebApiResponseupdateQuartzConfig(QuartzConfigDto quartzConfigDto, @Context SecurityContext context) {

    try {

    JpSchedulerFactory.pauseJob(quartzConfigDto.getId());

                quartzConfigDto =quartzConfigService.updateQuartzConfig(quartzConfigDto);

                JpSchedulerFactory.resumeJob(quartzConfigDto.getId());

            }catch (Exception e) {

    log.error("更新任务失败!", e);

                throw new ServiceException(502,"暂停任务失败");

            }

    log.info("更新任务成功!");

            return response(quartzConfigDto);

        }

    @ApiOperation(value ="暂停任务")

    @GET

        @Path("pause/{id}")

    public WebApiResponsepauseQuartzConfig(@Context SecurityContext context, @ApiParam(required =true, value ="quartzConfig Id")@PathParam("id") String id) {

    return transactionTemplate.execute(new TransactionCallback() {

    @Override

                public WebApiResponsedoInTransaction(TransactionStatus transactionStatus) {

    try {

    JpSchedulerFactory.pauseJob(id);

                        quartzConfigService.updateJobStatus(id, ApplicationConstants.PAUSE_CODE);

                    }catch (Exception e) {

    log.error("暂停任务失败!", e);

                        transactionStatus.setRollbackOnly();

                        throw new ServiceException(502,"暂停任务失败");

                    }

    log.info("暂停任务成功!");

                    return done();

                }

    });

        }

    @ApiOperation(value ="恢复任务")

    @GET

        @Path("resume/{id}")

    public WebApiResponseresumeQuartzConfig(@Context SecurityContext context, @ApiParam(required =true, value ="quartzConfig Id")@PathParam("id") String id) {

    return transactionTemplate.execute(new TransactionCallback() {

    @Override

                public WebApiResponsedoInTransaction(TransactionStatus transactionStatus) {

    try {

    JpSchedulerFactory.resumeJob(id);

                        quartzConfigService.updateJobStatus(id,ApplicationConstants.ACTIVE_CODE);

                    }catch (Exception e) {

    log.error("恢复任务失败!", e);

                        transactionStatus.setRollbackOnly();

                        throw new ServiceException(502,"恢复任务失败");

                    }

    log.info("恢复任务成功!");

                    return done();

                }

    });

        }

    @ApiOperation(value ="执行一次任务")

    @GET

        @Path("run/{id}")

    public WebApiResponserunQuartzConfig(@Context SecurityContext context, @ApiParam(required =true, value ="quartzConfig Id")@PathParam("id") String id) {

    try {

    JpSchedulerFactory.runJob(id);

            }catch (Exception e) {

    log.error("执行任务失败!", e);

                throw new ServiceException(502,"执行任务失败");

            }

    log.info("执行任务成功!");

            return done();

        }

    @ApiOperation(value ="获取任务日志分页")

    @GET

        @Path("log")

    public WebApiResponse>getQuartzLogPage(@Context SecurityContext context,@ApiParam(required =false, value ="页码(从0开始),默认0")@QueryParam("pageIndex") Integer pageIndex,@ApiParam(required =false, value ="页大小,默认10")@QueryParam("pageSize") Integer pageSize,@ApiParam(value ="任务Id")@QueryParam("quartzId") String quartzId,@ApiParam(value ="任务名称")@QueryParam("name") String name,@ApiParam(value ="执行结果 1-成功,0-失败")@QueryParam("result") Integer result,@ApiParam(value ="开始时间 yyyy-MM-dd 00:00:00")@QueryParam("startTime") LocalDateTime startTime,

    @ApiParam(value ="结束时间 yyyy-MM-dd 00:00:00")@QueryParam("endTime") LocalDateTime endTime) {

    if (pageIndex ==null) pageIndex =0;

            if (pageSize ==null) pageSize =10;

            Pageable pageable = PageRequest.of(pageIndex, pageSize, Sort.by(Sort.Direction.DESC, "createTime"));

            return response(quartzLogService.findQuartzLogPage(quartzId,name,result,startTime,endTime,pageable));

        }

    }

    执行实例

    package com.panda.common.crontask.web.task;

    import com.panda.common.crontask.web.schedule.JpJob;

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

    import org.springframework.scheduling.annotation.EnableScheduling;

    import org.springframework.stereotype.Component;

    import java.io.Serializable;

    /**

    * 定时任务实现类

    */

    @Component

    @EnableScheduling

    public class ScheduleTaskTestJobextends JpJobimplements Serializable {

    private static Loggerlogger = LoggerFactory.getLogger(ScheduleTaskTestJob.class);

        /**

        * 执行任务

        *

        * @throws RuntimeException

    */

        @Override

        public void runJob()throws RuntimeException {

    //在这里写定时任务执行的内容

            logger.info("==== 定时任务 ScheduleTaskTestJob ====> 执行中...... ");

        }

    }

    以上就是实现定时任务分布式组件的主要代码,写一个run模块集成一下数据库,做一个启动类就可以运行了,然后上传到neuxs做成依赖就可以供其他项目使用了,想看源码可以关注“蛋皮皮”公众号找大熊猫给你源码。

    相关文章

      网友评论

          本文标题:大熊猫分布式组件开发系列教程(三)

          本文链接:https://www.haomeiwen.com/subject/dcnliktx.html