0. Course link
- Install the official Docker Desktop on Windows
- Get the official Airflow docker-compose.yaml from the docs:
https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
- Looking at the yaml, we need three folders: dags, logs, and plugins. In VS Code, open a terminal with Ctrl + ` and create them:
mkdir dags logs plugins
- Start the stack with docker compose: run docker compose up airflow-init once to initialize the database, then docker compose up -d
- To shut everything down, use
docker compose down
- Create an admin user (the official compose file creates a default airflow/airflow account via the airflow-init service; an extra admin can be added with the airflow users create CLI command)
- Docker data takes up a lot of space on the C: drive; migrate it to another drive with WSL:
1. Shut down all WSL distributions: wsl --shutdown
2. Export the existing data to a target directory:
wsl --export docker-desktop-data D:\DockerDeskTopData\docker-desktop-data.tar
3. Unregister the current docker-desktop-data distribution:
wsl --unregister docker-desktop-data
4. Re-import it from the new location:
wsl --import docker-desktop-data D:\DockerDeskTopData\ D:\DockerDeskTopData\docker-desktop-data.tar --version 2
1. Getting to know Airflow
- What is Airflow: a framework for creating, scheduling, and monitoring workflows
- What is a DAG (= workflow)? D (Directed): tasks have clearly directed dependency relationships; A (Acyclic): no cycles or loops, so a finished task can never lead back to an earlier one; G (Graph): a structure made of nodes and edges.
Here is an example of a DAG: [image]
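Since the diagram is not reproduced here, below is a minimal sketch of the same idea in Airflow code; the task names and dag_id are made up for illustration.

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id='example_dag_shape',          # illustrative name
    start_date=datetime(2024, 2, 4),
    schedule_interval=None
) as dag:
    download = EmptyOperator(task_id='download')
    clean = EmptyOperator(task_id='clean')
    train = EmptyOperator(task_id='train')
    report = EmptyOperator(task_id='report')

    # Directed edges: each arrow points from a task to the tasks that depend on it,
    # and no path ever leads back to an earlier task (acyclic).
    download >> clean >> [train, report]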
- The architecture of Airflow: [image]
webserver: the user interface, used to manage and monitor tasks
metadata database: stores metadata about the workflows and the state of every task run
scheduler: schedules the workflows
executor: receives work from the scheduler and dispatches the tasks
workers: the machines that actually execute the tasks
2. The Airflow task lifecycle

- A happy workflow execution process: [image]
- The process of publishing a workflow: [image]
3. Creating the first DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

# Default arguments applied to every task in the DAG
_default_args = {
    'owner': 'lg',
    'retries': 5,
    'retry_delay': timedelta(minutes=3)
}

with DAG(
    dag_id='our_first_dag_lg_v5',
    default_args=_default_args,
    start_date=datetime(2024, 2, 3, 1),
    schedule_interval='@daily'
) as dag:
    task1 = BashOperator(
        task_id='1_task',
        bash_command='echo hello world!'
    )
    task2 = BashOperator(
        task_id='2_task',
        bash_command='echo I am task 2'
    )
    task3 = BashOperator(
        task_id='3_task',
        bash_command='echo I am task3'
    )

    # Run task1 alone:
    # task1
    # Run the tasks strictly one after another:
    # task1 >> task2 >> task3
    # Run task2 and task3 in parallel after task1:
    task1 >> [task2, task3]
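For reference, the same edges can also be declared with the operators' set_downstream/set_upstream methods; the fragment below is a sketch that could replace the last line inside the with DAG block above (not part of the original notes).

    # Equivalent to task1 >> [task2, task3]
    task1.set_downstream(task2)
    task1.set_downstream(task3)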
4. Running Python callables
- Without a return value
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

_default_args = {
    'owner': 'lg',
    'retries': 5,
    'retry_delay': timedelta(minutes=3)
}

def great(name, age):
    print(f"Hello World! My name is {name}, I am {age}")

with DAG(
    dag_id='dag_python_v4',
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    schedule_interval='@daily'
) as dag:
    task1 = PythonOperator(
        task_id='great',
        python_callable=great,
        # op_kwargs passes keyword arguments to the callable
        op_kwargs={'name': 'fxx', 'age': 20}
    )

    task1
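Keyword arguments are passed with op_kwargs; positional arguments can be passed with op_args instead. The fragment below is a sketch of the same task using op_args and would sit inside the with DAG block above (not part of the original notes).

    task1 = PythonOperator(
        task_id='great',
        python_callable=great,
        op_args=['fxx', 20]  # calls great('fxx', 20)
    )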
- With a return value and a task dependency
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

_default_args = {
    'owner': 'lg',
    'retries': 5,
    'retry_delay': timedelta(minutes=3)
}

def great(ti, age):
    # Pull the value returned by the get_name task from XCom
    name = ti.xcom_pull(task_ids='get_name')
    print(f"Hello World! My name is {name}, I am {age}")

def get_name():
    return 'FXX'

with DAG(
    dag_id='dag_python_v6',
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    schedule_interval='@daily'
) as dag:
    task1 = PythonOperator(
        task_id='great',
        python_callable=great,
        # op_kwargs passes the remaining keyword argument to the callable
        op_kwargs={'age': 20}
    )
    task2 = PythonOperator(
        task_id='get_name',
        python_callable=get_name
    )

    # task2 must finish before task1 runs
    task2 >> task1
- Passing multiple values with xcom_push as key/value pairs
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

_default_args = {
    'owner': 'lg',
    'retries': 5,
    'retry_delay': timedelta(minutes=3)
}

def great(ti, age):
    # Pull the two values pushed by the get_name task
    first_name = ti.xcom_pull(task_ids='get_name', key='first_name')
    last_name = ti.xcom_pull(task_ids='get_name', key='last_name')
    print(f"Hello World! My name is {first_name} {last_name}, I am {age}")

def get_name(ti):
    # Push several values to XCom as key/value pairs
    ti.xcom_push(key='first_name', value='Pjj')
    ti.xcom_push(key='last_name', value='zzz')

with DAG(
    dag_id='dag_python_v6',
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    schedule_interval='@daily'
) as dag:
    task1 = PythonOperator(
        task_id='great',
        python_callable=great,
        op_kwargs={'age': 20}
    )
    task2 = PythonOperator(
        task_id='get_name',
        python_callable=get_name
    )

    # task2 must finish before task1 runs
    task2 >> task1
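Worth noting: a callable's return value is pushed to XCom automatically under the key 'return_value', so the same result can be achieved without explicit xcom_push by returning a dict. A minimal sketch (not part of the original notes), reusing the DAG skeleton above:

def get_name():
    # The returned dict is stored in XCom as 'return_value'
    return {'first_name': 'Pjj', 'last_name': 'zzz'}

def great(ti, age):
    # Pulling without a key returns the task's 'return_value'
    name = ti.xcom_pull(task_ids='get_name')
    print(f"Hello World! My name is {name['first_name']} {name['last_name']}, I am {age}")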
5. Quick DAG creation with the TaskFlow API
from datetime import datetime, timedelta
from airflow.decorators import dag, task

_default_args = {
    'owner': 'lg',
    'retries': 5,
    'retry_delay': timedelta(minutes=3)
}

@dag(
    dag_id='dag_with_taskflow_01',
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    schedule_interval='@daily'
)
def hello_world_etl():
    @task()
    def get_name():
        return 'fxx'

    @task()
    def get_age():
        return 10

    @task()
    def greet(name, age):
        print(f'hello! My name is {name} and I am {age}.')

    # TaskFlow infers the dependencies from how the return values are used
    name = get_name()
    age = get_age()
    greet(name=name, age=age)

greet_dag = hello_world_etl()
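TaskFlow can also return several values from a single task: with multiple_outputs=True the returned dict is split into separate XCom entries. The sketch below builds on the example above (same imports and _default_args); the dag_id is illustrative and not part of the original notes.

@dag(
    dag_id='dag_with_taskflow_multiple_outputs',
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    schedule_interval='@daily'
)
def hello_world_etl_v2():
    @task(multiple_outputs=True)
    def get_person():
        # Each key becomes its own XCom entry
        return {'name': 'fxx', 'age': 10}

    @task()
    def greet(name, age):
        print(f'hello! My name is {name} and I am {age}.')

    person = get_person()
    greet(name=person['name'], age=person['age'])

greet_dag_v2 = hello_world_etl_v2()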
6. Run tasks in the past
Method 1: set the DAG parameter catchup=True
Method 2: use the airflow dags backfill CLI command
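A minimal sketch of method 1 (the dag_id is illustrative): with catchup=True and a start_date in the past, the scheduler creates a run for every missed interval once the DAG is unpaused. For method 2, the backfill is triggered from the Airflow CLI, for example inside the scheduler container, with something like airflow dags backfill -s 2024-02-01 -e 2024-02-03 <dag_id>.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='dag_with_catchup_v1',        # illustrative name
    start_date=datetime(2024, 2, 1),     # a date in the past
    schedule_interval='@daily',
    catchup=True                         # schedule all missed runs since start_date
) as dag:
    BashOperator(
        task_id='say_hello',
        bash_command='echo hello from the past'
    )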

7. Install Python packages in the Docker image
1. Create a requirements.txt file listing the packages to install; do not put spaces around the == signs
scikit-learn==0.24.2
matplotlib==3.3.3
2. Write a Dockerfile that extends the official image
FROM apache/airflow:2.8.1
COPY requirements.txt requirements.txt
RUN pip install --user --upgrade pip
RUN pip install -r requirements.txt
3. In the terminal, build a new image named extending_airflow:latest
docker build . --tag extending_airflow:latest
Then point the compose file at the new image (in the official docker-compose.yaml, replace apache/airflow:2.8.1 with extending_airflow:latest in the image line / AIRFLOW_IMAGE_NAME), otherwise the stack keeps using the old image.
4. Remove the old image that is no longer needed
docker rmi apache/airflow:2.8.1
5. Bring the services back up in the background
docker compose up -d
To build a differently named variant of the image (for example one with dbt dependencies), run the build again with another tag:
docker build . --tag extending_airflow_dbt:latest
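To confirm the packages were actually baked into the image, a small test DAG can import them and print their versions. This DAG is a sketch and not part of the original notes; the dag_id and task_id are made up.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def check_packages():
    import sklearn
    import matplotlib
    print(f"scikit-learn: {sklearn.__version__}")
    print(f"matplotlib: {matplotlib.__version__}")

with DAG(
    dag_id='dag_check_dependencies_v1',
    start_date=datetime(2024, 2, 4),
    schedule_interval='@daily',
    catchup=False
) as dag:
    PythonOperator(
        task_id='check_packages',
        python_callable=check_packages
    )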