美文网首页
删除azkaban的执行历史

删除azkaban的执行历史

作者: 飞有飞言 | 来源:发表于2021-06-12 22:37 被阅读0次
    image.png

    azkaban是一款工作流调度工具,由Linkedin开发并开源给社区。

    azkaban保留了task,flow的执行历史。每个任务的标准输出(stdout)、标准错误输出(stderr)都会先暂时存到日志文件里,同时也存储到后端数据库中,一般是mysql。

    azkaban通常用在大数据任务调度场景,把任务提交之后,如果任务是spark,hive,hadoop,flink等任务的话,会产生大量的日志输出。这些日志如果长时间存在mysql中,会让mysql的数据库过大,占用大量的磁盘空间。
    所以就需要能够定时清除mysql中存储的执行历史的功能。

    为此单独开发一个python脚本来清除执行历史。azkaban中有3个表和执行历史有关execution_logs, execution_jobs, execution_flows

    因为最近的日志还是要看的,在任务执行有问题的时候还需要查看,所以保留最近执行的历史。

    mysql的连接信息在azkaban的properties文件中保存的有,所以直接读了azkaban的properties文件。

    #!/usr/bin/env python
    # coding:utf-8
    
    
    import sys
    import pymysql
    import time
    
    
    # 清理30天之前的,每次清理3000条
    sql_find_max_exec_id = '''select exec_id, from_unixtime(cast(submit_time/1000 as int)) from execution_flows 
    where submit_time < unix_timestamp(date_sub(current_timestamp(), interval 30 day)) * 1000
    order by exec_id asc limit 1 offset 3000
    '''
    
    tables_to_clear = ['execution_logs', 'execution_jobs', 'execution_flows']
    
    sql_clear_execution = 'delete from %s where exec_id < %s'
    
    def read_props(filepath):
        d = {}
        with open(filepath) as fp:
            for line in fp:
                line = line.strip()
                if not line or line.startswith('#'): continue
                t = line.split('=')
                d[t[0].strip()] = t[1].strip()
    
        return d
    
    
    '''
    mysql.port=3306
    mysql.host=xxxx
    mysql.database=azkaban
    mysql.user=azkaban
    mysql.password=aaaaaaaaaa
    '''
    props = read_props(sys.argv[1])
    
    
    dbconn = pymysql.connect(host=props['mysql.host'], user=props['mysql.user'], 
        password=props['mysql.password'], database=props['mysql.database'],
        port=int(props.get('mysql.port', 3306)))
    
    
    cursor = dbconn.cursor()
    print('\n\n%s start' % time.strftime('%Y-%m-%d %H:%M'))
    
    
    try:
        cursor.execute(sql_find_max_exec_id)
        max_exec_id, submit_time = cursor.fetchone()
    
        print('clear logs, max exec_id %d submit_time %s' % (max_exec_id, submit_time))
    
        for tbl in tables_to_clear:
            sql = sql_clear_execution % (tbl, max_exec_id)
            affected = cursor.execute(sql)
            print('delete from table %20s, %8d records deleted' % (tbl, affected))
        dbconn.commit()
    except Exception, e:
        dbconn.rollback()
        raise e
    finally:
        cursor.close()
        dbconn.close()
    
    

    相关文章

      网友评论

          本文标题:删除azkaban的执行历史

          本文链接:https://www.haomeiwen.com/subject/pwzieltx.html