美文网首页
python cron

python cron

作者: 青铜搬砖工 | 来源:发表于2019-03-05 23:01 被阅读0次

python 对于cron表达式支持不是太好,不支持秒级,?与年,在croniter上面封装一层支持标准


from croniter import croniter
from datetime import datetime,timedelta

week_day_dict = {
    1: 7,
    2: 1,
    3: 2,
    4: 3,
    5: 4,
    6: 5,
    7: 6
  }


    """计算定时任务下次运行时间
    sched str: 定时任务时间表达式
    timeFormat str: 格式为"%Y-%m-%d %H:%M"
    queryTimes int: 查询下次运行次数
    """
    try:
        now = datetime.datetime.now()
    except ValueError:
        raise
    else:
        # 以当前时间为基准开始计算
        cron = croniter.croniter(sched, now)
        return [cron.get_next(datetime.datetime).strftime(timeFormat) for i in range(queryTimes)]

class MyCroniter(croniter):
    def __init__(self,expr_format, start_time=None, ret_type=float,
                 day_or=True):
        #秒 分 时 日 月 星期 年
        expr_format = expr_format.strip()
        expr_format = self.get_year(expr_format)
        expr_format = self.get_second(expr_format)
        expr_format = self.get_week(expr_format)
        #如果以?结尾则设置day_or = false 不匹配星期
        if  expr_format.endswith('?'):
            expr_format=expr_format.replace("?","*")
            day_or=False
        else:
            #如果?不在结尾则说明是日期为?
            if "?" in expr_format:
                expr_format=expr_format.replace("?","*")

        self.next_date = None
        self.second_index = 0
        super(MyCroniter,self).__init__(" ".join(expr_format.split(" ")[1:]), datetime.now()  , ret_type,day_or)

    def get_next_second(self, ret_type=None):
        #如果秒数为*,则每次next加一秒,直到59秒调用self._get_next
        #否则直接在秒位加上 应该的秒数
        try:
            if len(self.second)==1 and self.second[0].isdigit():
                self.next_date = self._get_next(ret_type or self._ret_type, is_prev=False)
                self.next_date += + timedelta(seconds=int(self.second[0]))
            else:
                if not self.next_date:
                    self.next_date = self._get_next(ret_type or self._ret_type, is_prev=False)
                if "all" in self.second:
                    if self.next_date.strftime("%S") =='59':
                        self.next_date = self._get_next(ret_type or self._ret_type, is_prev=False)
                    else:
                        self.next_date = self.next_date + timedelta(seconds=1)
                else:
                    if int(self.next_date.strftime("%S")) >=int(self.second[-1]):
                        self.next_date = self._get_next(ret_type or self._ret_type, is_prev=False)
                        self.second_index = 0
                        # self.next_date = self.next_date + timedelta(seconds=int(self.second[self.second_index]))
                    # else:
                    if self.second_index ==0:
                        self.next_date = self.next_date + timedelta(seconds=int(self.second[self.second_index]))
                    else:
                        self.next_date = self.next_date-timedelta(seconds=int(self.second[self.second_index-1]))
                        self.next_date = self.next_date + timedelta(seconds=int(self.second[self.second_index]))
                    self.second_index +=1

        except Exception as e:
            print("格式错误 : {}".format(e))


    def get_year(self,expr_format):
        if len(expr_format.split(" "))==6: #如果缺少 年  就相当于年为*
            self.year = ["all"]
        elif expr_format.endswith('*')  :#任意年都可以
            expr_format = " ".join(expr_format.split(" ")[:-1])
            self.year = ["all"]
        else:
            self.year = expr_format.split(" ")[-1]
            expr_format = " ".join(expr_format.split(" ")[:-1])
            if "-" in self.year:
                temp_year = self.year.split("-")#例如 "2019-2021" 》》 [2019 ,2020, 2021]
                self.year = list()
                for year in range(int(temp_year[0]),int(temp_year[1])+1):
                    self.year.append(str(year))
            elif "," in self.year:#例如 "2019,2020 " 》》 [2019 ,2029]
                self.year = self.year.split(",")
            elif "/" in self.year:
                temp_year = self.year.split("/")#例如 "2019/3 " 》》 [2019 ,2021,2024,...]
                self.year = list()
                for year in range(int(temp_year[0]), 2100,int(temp_year[1])):
                    self.year.append(str(year))
            else :
                self.year = [self.year]

        return expr_format

    def get_second(self,expr_format):
        if expr_format.startswith('*')  :
            self.second = ["all"]
        else:
            self.second = expr_format.split(" ")[0]
            if "-" in self.second:
                temp_second = self.second.split("-")
                self.second = list()
                for second in range(int(temp_second[0]),int(temp_second[1])+1):
                    self.second.append(str(second))
            elif "," in self.second:
                self.second = self.second.split(",")
            elif "/" in self.second:
                temp_second = self.second.split("/")
                self.second = list()
                for second in range(int(temp_second[0]), 59,int(temp_second[1])):
                    self.second.append(str(second))
            else :
                self.second = [self.second]

        return expr_format

    def get_week(self,expr_format):
        global week_day_dict
        week = expr_format.split(" ")[-1]
        result =""
        for item in week:
            if item.isdigit():
                result +=str(week_day_dict[int(item)])
            else:
                result+=item
        temp_list = expr_format.split(" ")
        temp_list[-1] = result
        return " ".join(temp_list)




    def get_next(self,ret_type=None):
        self.get_next_second(ret_type)
        if "all" in self.year:
             return  self.next_date
        else:
            while True:
                if self.next_date.strftime("%Y") in self.year:
                    return self.next_date
                else:
                    if self.next_date.strftime("%Y")>self.year[-1]:
                        return None
                    else:
                        self.get_next_second(ret_type)
iter = None
CRON = ""
def cron_to_date(cron):
    # 0 0 12 ? * 3 : 秒 分 时 日 月 星期
    # tmp = cron.replace('?', '*')

    global iter
    global CRON
    if not iter or CRON !=cron:
        iter = MyCroniter(cron)
        CRON = cron
    next_time = iter.get_next(datetime)
    if next_time:
        return next_time
    else:
        return ''


if __name__ == '__main__':

    # for i in range(100):
    #
    #     if i >50:
    #         print(cron_to_date("0/6 * * * * ?"))
    #     else:
    #         print(cron_to_date("0/5 * * * * ?"))

    # print(cron_to_date("0 59 17 6 * ?"))
    for i in range(100):
        print(cron_to_date("56 0/2 20 6 * ?"))
    # print(cron_to_date("0/5 * * * * ?"))

相关文章

网友评论

      本文标题:python cron

      本文链接:https://www.haomeiwen.com/subject/ejoauqtx.html