MySqlOperator
研究Airflow的主意用途是寻找一个ETL调度工具,可以定时调度MySql的存储过程。
Airflow提供了MySqlOperator,可以执行SQL语句,具体使用样例如下:
def mysql_operator_test(self):
sql = """
CREATE TABLE IF NOT EXISTS test_airflow (dummy VARCHAR(50));
"""
t = operators.MySqlOperator(
task_id='basic_mysql',
sql=sql,
mysql_conn_id='airflow_db',
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def mysql_operator_test_multi(self):
sql = [
"TRUNCATE TABLE test_airflow",
"INSERT INTO test_airflow VALUES ('X')",
]
t = operators.MySqlOperator(
task_id='mysql_operator_test_multi',
mysql_conn_id='airflow_db',
sql=sql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_mysql_to_mysql(self):
sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES L 100;"
t = operators.GenericTransfer(
task_id='test_m2m',
preoperator=[
"DROP TABLE IF EXISTS test_mysql_to_mysql",
"CREATE TABLE IF NOT EXISTS "
"test_mysql_to_mysql LIK INFORMATION_SCHEMA.TABLES"
],
source_conn_id='airflow_db',
destination_conn_id='airflow_db',
destination_table="test_mysql_to_mysql",
sql=sql,
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_sql_sensor(self):
t = operators.SqlSensor(
task_id='sql_sensor_check',
conn_id='mysql_default',
sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_D force=True)
网友评论