美文网首页
删除azkaban的执行历史

删除azkaban的执行历史

作者: 飞有飞言 | 来源:发表于2021-06-12 22:37 被阅读0次
image.png

azkaban是一款工作流调度工具,由Linkedin开发并开源给社区。

azkaban保留了task,flow的执行历史。每个任务的标准输出(stdout)、标准错误输出(stderr)都会先暂时存到日志文件里,同时也存储到后端数据库中,一般是mysql。

azkaban通常用在大数据任务调度场景,把任务提交之后,如果任务是spark,hive,hadoop,flink等任务的话,会产生大量的日志输出。这些日志如果长时间存在mysql中,会让mysql的数据库过大,占用大量的磁盘空间。
所以就需要能够定时清除mysql中存储的执行历史的功能。

为此单独开发一个python脚本来清除执行历史。azkaban中有3个表和执行历史有关execution_logs, execution_jobs, execution_flows

因为最近的日志还是要看的,在任务执行有问题的时候还需要查看,所以保留最近执行的历史。

mysql的连接信息在azkaban的properties文件中保存的有,所以直接读了azkaban的properties文件。

#!/usr/bin/env python
# coding:utf-8


import sys
import pymysql
import time


# 清理30天之前的,每次清理3000条
sql_find_max_exec_id = '''select exec_id, from_unixtime(cast(submit_time/1000 as int)) from execution_flows 
where submit_time < unix_timestamp(date_sub(current_timestamp(), interval 30 day)) * 1000
order by exec_id asc limit 1 offset 3000
'''

tables_to_clear = ['execution_logs', 'execution_jobs', 'execution_flows']

sql_clear_execution = 'delete from %s where exec_id < %s'

def read_props(filepath):
    d = {}
    with open(filepath) as fp:
        for line in fp:
            line = line.strip()
            if not line or line.startswith('#'): continue
            t = line.split('=')
            d[t[0].strip()] = t[1].strip()

    return d


'''
mysql.port=3306
mysql.host=xxxx
mysql.database=azkaban
mysql.user=azkaban
mysql.password=aaaaaaaaaa
'''
props = read_props(sys.argv[1])


dbconn = pymysql.connect(host=props['mysql.host'], user=props['mysql.user'], 
    password=props['mysql.password'], database=props['mysql.database'],
    port=int(props.get('mysql.port', 3306)))


cursor = dbconn.cursor()
print('\n\n%s start' % time.strftime('%Y-%m-%d %H:%M'))


try:
    cursor.execute(sql_find_max_exec_id)
    max_exec_id, submit_time = cursor.fetchone()

    print('clear logs, max exec_id %d submit_time %s' % (max_exec_id, submit_time))

    for tbl in tables_to_clear:
        sql = sql_clear_execution % (tbl, max_exec_id)
        affected = cursor.execute(sql)
        print('delete from table %20s, %8d records deleted' % (tbl, affected))
    dbconn.commit()
except Exception, e:
    dbconn.rollback()
    raise e
finally:
    cursor.close()
    dbconn.close()

相关文章

  • 删除azkaban的执行历史

    azkaban是一款工作流调度工具,由Linkedin开发并开源给社区。 azkaban保留了task,flow的...

  • 9.Azkaban2.5安装部署

    1 安装前准备 1.1 将Azkaban Web服务器、Azkaban执行服务器、Azkaban的sql执行...

  • AZKABAN(二)使用

    1、启动azkaban 启动azkaban server。进入executor,执行: bin/ azkaban-...

  • Azkaban安装部署

    安装前准备 将Azkaban Web服务器、Azkaban执行服务器、Azkaban的sql执行脚本及MySQL安...

  • Azkaban安装配置

    1.1 安装前准备 1.将Azkaban Web服务器、Azkaban执行服务器、Azkaban的sql执行脚本及...

  • 二Azkaban安装部署

    2.1 安装前准备1)将Azkaban Web服务器、Azkaban执行服务器、Azkaban的sql执行脚本及M...

  • 2018-07-09

    工作情况: 1、关于Azkaban 设定执行时间的问题,如何设置按奇偶小时执行(拓展:Azkaban的定时任务的详...

  • 五、Azkaban源码总体分析

    一、概述 Azkaban的执行代码主要分为四大模块: (1)azkaban-webserver:主要提供web界面...

  • 数仓--Azkaban3-mysql中元数据库

    面试被问到Azkaban的job历史信息存在那张表中 概述 Azkaban3 共 15 张表,分别用于存储任务调度...

  • 利用Azkaban来完成大数据的任务调度

    Azkaban简介Azkaban的架构Azkaban做什么Azkaban安装mysqlCreate a datab...

网友评论

      本文标题:删除azkaban的执行历史

      本文链接:https://www.haomeiwen.com/subject/pwzieltx.html