AIRFLOW-SLEEK Installation


Author: 山猪打不过家猪 | Published 2024-02-03 05:14

0. Course link

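  • Installing Airflow on Windows with the official Docker setup
  1. Get Airflow's docker-compose yml file from the official site:
https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
  2. The yml mounts 3 folders: dags, logs, plugins. Open a terminal in VS Code (shortcut Ctrl + `) and create them:
mkdir dags
mkdir logs
mkdir plugins
  3. Start the services with Docker Compose (in the background):
docker compose up -d
  4. To shut the containers down, run:
docker compose down
  5. Create an admin user.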

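  • Moving Docker's data off the C: drive (it takes up a lot of space there)
  1. Shut down all WSL distributions: wsl --shutdown
  2. Export the existing data to the target directory:
wsl --export docker-desktop-data D:\DockerDeskTopData\docker-desktop-data.tar
  3. Unregister the current docker-desktop-data distribution:
wsl --unregister docker-desktop-data
  4. Re-import it into the target directory:
wsl --import docker-desktop-data D:\DockerDeskTopData\ D:\DockerDeskTopData\docker-desktop-data.tar --version 2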
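For the folder step in the Docker Compose setup above, here is a minimal cross-platform sketch in Python. It assumes you run it from the directory that holds the docker-compose yml; the helper name `create_airflow_dirs` is hypothetical. It behaves the same in PowerShell, cmd, or bash, and does nothing harmful if the folders already exist.

```python
# Minimal sketch: create the dags/, logs/ and plugins/ folders that the
# docker-compose yml mounts into the containers.
# create_airflow_dirs is a hypothetical helper, not part of Airflow.
from pathlib import Path


def create_airflow_dirs(project_dir="."):
    """Create dags/, logs/ and plugins/ under project_dir if they are missing."""
    created = []
    for name in ("dags", "logs", "plugins"):
        path = Path(project_dir) / name
        path.mkdir(parents=True, exist_ok=True)  # no error if it already exists
        created.append(path)
    return created
```

Usage: call `create_airflow_dirs()` from a Python shell in the project directory, then continue with `docker compose up -d`.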

1. Getting to know Airflow

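  • What is Airflow: a framework for authoring, scheduling, and monitoring workflows.
  • What is a DAG (= workflow): D (directed): the dependencies between tasks have an explicit direction; A (acyclic): no cycles or loops, so finishing a task can never lead back to where the workflow started; G (graph): a structure made of nodes (tasks) and edges (dependencies).
    An example DAG:
    [Figure: example DAG]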
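  • Airflow architecture
    [Figure: Airflow architecture]

    webserver: the user interface for managing and monitoring tasks
    metadata database: stores workflow metadata, including the serialized DAGs and the state of workflows and tasks
    scheduler: schedules workflows, triggering tasks once their dependencies are met
    executor: receives tasks from the scheduler and dispatches them to workers
    workers: the machines/processes that actually execute the tasks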
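To make the "directed" and "acyclic" parts of the DAG definition concrete, here is an illustrative sketch in plain Python (3.9+), not Airflow code. It assumes a workflow is modeled as a mapping from each task to the set of tasks it depends on, and uses the standard library's `graphlib` to either produce a valid run order or report a cycle. The function name `execution_order` is hypothetical.

```python
# Illustrative sketch (not Airflow code): model a workflow as
# task -> set of upstream tasks and let graphlib verify it is a DAG.
from graphlib import CycleError, TopologicalSorter


def execution_order(upstreams):
    """Return one valid run order, or raise CycleError if the graph has a loop."""
    return list(TopologicalSorter(upstreams).static_order())


# task2 and task3 both depend on task1 (the shape of task1 >> [task2, task3])
ok = {"task2": {"task1"}, "task3": {"task1"}}
print(execution_order(ok))

# Making task1 depend on task3 closes a loop, which a DAG forbids
bad = {"task2": {"task1"}, "task3": {"task1"}, "task1": {"task3"}}
try:
    execution_order(bad)
except CycleError as err:
    print("not a DAG, cycle:", err.args[1])  # args[1] holds the detected cycle
```

A cycle such as task1 → task3 → task1 has no valid start, which is exactly why Airflow requires the graph to be acyclic.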

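2. The Airflow lifecycle

[Figure: Airflow task lifecycle]
  • A happy workflow execution process
    [Figure: a happy workflow execution process]
  • How a workflow gets deployed
    [Figure: workflow deployment process]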
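The "happy" execution path in the figures can be written down as a tiny state machine. This is a toy model, not Airflow code: the state names follow the Airflow 2.x task-instance states (none, scheduled, queued, running, success), but the transition table is a simplification that leaves out failed, up_for_retry, skipped, and the other states.

```python
# Toy model of the happy path of an Airflow 2.x task instance:
# none -> scheduled -> queued -> running -> success.
# Simplified: real tasks can also end up failed, up_for_retry, skipped, etc.
HAPPY_PATH = {
    None: "scheduled",       # scheduler has found the dependencies are met
    "scheduled": "queued",   # task assigned to an executor, waiting for a worker
    "queued": "running",     # a worker has picked the task up
    "running": "success",    # the task finished without error
}


def walk_happy_path(state=None):
    """Yield each state from `state` until the terminal 'success' state."""
    yield state
    while state in HAPPY_PATH:
        state = HAPPY_PATH[state]
        yield state


print(" -> ".join(str(s) for s in walk_happy_path()))
# prints: None -> scheduled -> queued -> running -> success
```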

3. Creating your first DAG

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# Defaults applied to every task in this DAG
_default_args = {
    'owner': 'lg',
    'retries': 5,  # an int, not the string '5'
    'retry_delay': timedelta(minutes=3),
}

with DAG(
    dag_id='our_first_dag_lg_v5',
    default_args=_default_args,
    start_date=datetime(2024, 2, 3, 1),
    schedule='@daily',  # Airflow 2.4+; older versions use schedule_interval
) as dag:
    task1 = BashOperator(
        task_id='1_task',
        bash_command='echo hello world!',
    )

    task2 = BashOperator(
        task_id='2_task',
        bash_command='echo I am task 2',
    )

    task3 = BashOperator(
        task_id='3_task',
        bash_command='echo I am task 3',
    )

    # Alternative: run the three tasks one after another
    # task1 >> task2 >> task3
    # Here task2 and task3 both run after task1 has finished
    task1 >> [task2, task3]
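The `task1 >> [task2, task3]` line works because Python lets a class overload the `>>` operator. The sketch below imitates that idea with a hypothetical `ToyTask` class; it is not Airflow's `BaseOperator`, only a minimal model of how `a >> b`, `a >> [b, c]`, `[a, b] >> c` and chains like `a >> b >> c` can record downstream dependencies.

```python
# Toy sketch (hypothetical ToyTask, not Airflow's BaseOperator) of how
# `>>` can record dependencies through operator overloading.
class ToyTask:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()

    def __rshift__(self, other):
        # a >> b or a >> [b, c]: accept one task or a list of tasks
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.downstream.add(t.task_id)
        return other  # returning the right side makes a >> b >> c work

    def __rrshift__(self, other):
        # [a, b] >> c: lists have no >>, so Python calls c.__rrshift__([a, b])
        for t in other:
            t.downstream.add(self.task_id)
        return self


task1, task2, task3 = ToyTask("1_task"), ToyTask("2_task"), ToyTask("3_task")
task1 >> [task2, task3]
print(sorted(task1.downstream))  # ['2_task', '3_task']
```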

4. Running Python functions with PythonOperator

  • Without a return value
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

_default_args = {
    'owner': 'lg',
    'retries': 5,
    'retry_delay': timedelta(minutes=3),
}


def greet(name, age):
    print(f"Hello World! My name is {name}, I am {age} years old.")


with DAG(
    dag_id='dag_python_v4',
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    schedule='@daily',
) as dag:
    task1 = PythonOperator(
        task_id='greet',
        python_callable=greet,
        # op_kwargs are passed to greet() as keyword arguments
        op_kwargs={'name': 'fxx', 'age': 20},
    )

    task1
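PythonOperator ends up calling `python_callable(*op_args, **op_kwargs)`, so the callable can be checked locally before it goes into the dags folder. The sketch below imitates only that unpacking; `call_like_python_operator` is a hypothetical helper, and it ignores the task-context injection (such as `ti`) that Airflow also performs. This copy of `greet` additionally returns its message so the result can be inspected.

```python
# Sketch: imitate how PythonOperator unpacks op_args / op_kwargs into the
# callable, so it can be tested without Airflow.
# call_like_python_operator is hypothetical and skips context injection.
def greet(name, age):
    message = f"Hello World! My name is {name}, I am {age} years old."
    print(message)
    return message  # returned only so the result can be checked


def call_like_python_operator(python_callable, op_args=None, op_kwargs=None):
    return python_callable(*(op_args or []), **(op_kwargs or {}))


result = call_like_python_operator(greet, op_kwargs={"name": "fxx", "age": 20})
```

If the call works here with the same `op_kwargs` as in the DAG, a missing or misspelled argument will not be the cause of a failed task run.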
  • With a return value and a dependency between tasks
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

_default_args = {
    'owner': 'lg',
    'retries': 5,
    'retry_delay': timedelta(minutes=3),
}


def greet(ti, age):
    # Pull the value returned by the get_name task from XCom
    name = ti.xcom_pull(task_ids='get_name')
    print(f"Hello World! My name is {name}, I am {age} years old.")


def get_name():
    # The return value is pushed to XCom automatically
    return 'FXX'


with DAG(
    dag_id='dag_python_v6',
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    schedule='@daily',
) as dag:
    task1 = PythonOperator(
        task_id='greet',
        python_callable=greet,
        # ti is injected by Airflow from the task context; age comes from op_kwargs
        op_kwargs={'age': 20},
    )

    task2 = PythonOperator(
        task_id='get_name',
        python_callable=get_name,
    )

    # task1 runs only after task2 has finished
    task2 >> task1
  • Passing multiple values: push them as key/value pairs with xcom_push
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

_default_args = {
    'owner': 'lg',
    'retries': 5,
    'retry_delay': timedelta(minutes=3),
}


def greet(ti, age):
    # Pull each value pushed by the get_name task, selecting it by key
    first_name = ti.xcom_pull(task_ids='get_name', key='first_name')
    last_name = ti.xcom_pull(task_ids='get_name', key='last_name')
    print(f"Hello World! My name is {first_name} {last_name}, I am {age} years old.")


def get_name(ti):
    # Push several values, each under its own key
    ti.xcom_push(key='first_name', value='Pjj')
    ti.xcom_push(key='last_name', value='zzz')


with DAG(
    dag_id='dag_python_v7',  # distinct from the previous example's dag_id
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    schedule='@daily',
) as dag:
    task1 = PythonOperator(
        task_id='greet',
        python_callable=greet,
        op_kwargs={'age': 20},
    )

    task2 = PythonOperator(
        task_id='get_name',
        python_callable=get_name,
    )

    # task1 runs only after task2 has finished
    task2 >> task1
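The two XCom examples rely on the same addressing scheme: a value is found by the pushing task's id plus a key, and a callable's return value lands under the key `"return_value"`, which is what `xcom_pull` uses when no key is given. The sketch below is a hypothetical in-memory `ToyXCom` class, not Airflow's API; real XComs are also scoped to the dag_id and run_id, which this toy omits.

```python
# Toy in-memory stand-in for XCom (hypothetical ToyXCom, not Airflow's API).
# Values are addressed by (task_id, key); return values use "return_value".
# Real XComs are additionally scoped by dag_id and run_id.
RETURN_KEY = "return_value"


class ToyXCom:
    def __init__(self):
        self._store = {}

    def push(self, task_id, key, value):
        self._store[(task_id, key)] = value

    def pull(self, task_id, key=RETURN_KEY):
        return self._store.get((task_id, key))  # None if nothing was pushed


xcom = ToyXCom()
# get_name() returning 'FXX': stored under the default return key
xcom.push("get_name", RETURN_KEY, "FXX")
# get_name(ti) calling ti.xcom_push twice with explicit keys
xcom.push("get_name", "first_name", "Pjj")
xcom.push("get_name", "last_name", "zzz")

print(xcom.pull("get_name"))                # FXX
print(xcom.pull("get_name", "first_name"))  # Pjj
```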

5. Creating DAGs quickly with the TaskFlow API

from datetime import datetime, timedelta

from airflow.decorators import dag, task

_default_args = {
    'owner': 'lg',
    'retries': 5,
    'retry_delay': timedelta(minutes=3),
}


@dag(
    dag_id='dag_with_taskflow_01',
    default_args=_default_args,
    start_date=datetime(2024, 2, 4),
    schedule='@daily',
)
def hello_world_etl():

    @task()
    def get_name():
        return 'fxx'

    @task()
    def get_age():
        return 10

    @task()
    def greet(name, age):
        print(f'hello! My name is {name} and I am {age}.')

    # Passing task outputs as arguments wires the dependencies; values travel via XCom
    name = get_name()
    age = get_age()
    greet(name=name, age=age)


# Calling the decorated function creates the DAG
greet_dag = hello_world_etl()
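The key TaskFlow idea is that calling a `@task` function inside the DAG function does not run it; it returns a reference, and passing references as arguments records the dependencies. The toy model below (Python 3.9+) illustrates only that idea with hypothetical names (`Ref`, `toy_task`, `run_all`); it is not how Airflow implements TaskFlow internally, where the references are XComArgs resolved at run time by the scheduler and workers.

```python
# Toy model (hypothetical names, not Airflow internals) of the TaskFlow idea:
# calling a decorated function only returns a reference; run_all() later
# executes everything in dependency order, feeding outputs into inputs.
from graphlib import TopologicalSorter


class Ref:
    def __init__(self, func, args, kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs

    def upstream(self):
        values = list(self.args) + list(self.kwargs.values())
        return [v for v in values if isinstance(v, Ref)]


def toy_task(func):
    def wrapper(*args, **kwargs):
        return Ref(func, args, kwargs)  # defer execution
    return wrapper


def run_all(final):
    graph, stack = {}, [final]
    while stack:  # collect every Ref reachable from the final one
        ref = stack.pop()
        if ref not in graph:
            graph[ref] = set(ref.upstream())
            stack.extend(graph[ref])
    results = {}

    def resolve(value):
        return results[value] if isinstance(value, Ref) else value

    for ref in TopologicalSorter(graph).static_order():  # upstream first
        args = [resolve(a) for a in ref.args]
        kwargs = {k: resolve(v) for k, v in ref.kwargs.items()}
        results[ref] = ref.func(*args, **kwargs)
    return results[final]


@toy_task
def get_name():
    return "fxx"


@toy_task
def get_age():
    return 10


@toy_task
def greet(name, age):
    return f"hello! My name is {name} and I am {age}."


final = greet(name=get_name(), age=get_age())  # nothing has run yet
print(run_all(final))  # hello! My name is fxx and I am 10.
```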

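6. Running tasks in the past

Method 1: set catchup=True on the DAG (it is an argument of the DAG itself, not an entry in default_args).
Method 2: run a backfill with the Airflow CLI, e.g. inside the scheduler container:
airflow dags backfill -s START_DATE -e END_DATE DAG_ID

[Figure: running a backfill]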
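With catchup=True the scheduler creates a run for every elapsed schedule interval since start_date. The sketch below computes which runs that would be for an `@daily` schedule, using plain `datetime` arithmetic rather than Airflow code. It assumes the Airflow 2.x convention that a run's logical date is the start of its data interval and that the run is triggered once the interval has ended, and it ignores time zones, end_date, and max_active_runs. The function names are hypothetical.

```python
# Sketch (plain datetime arithmetic, not Airflow code): which @daily runs
# catchup=True would create between start_date and "now".
# Assumes naive datetimes and that a run triggers when its interval ends.
from datetime import datetime, timedelta


def first_midnight_at_or_after(dt):
    """@daily intervals start at midnight, so align start_date forward."""
    midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight if midnight == dt else midnight + timedelta(days=1)


def daily_logical_dates(start_date, now):
    """Logical dates of the @daily runs whose interval has fully elapsed."""
    dates = []
    interval_start = first_midnight_at_or_after(start_date)
    while interval_start + timedelta(days=1) <= now:
        dates.append(interval_start)
        interval_start += timedelta(days=1)
    return dates


# start_date from the PythonOperator examples; "now" chosen arbitrarily
for d in daily_logical_dates(datetime(2024, 2, 4), datetime(2024, 2, 7, 12)):
    print(d.date())  # 2024-02-04, 2024-02-05, 2024-02-06
```

A backfill covers the same kind of interval list, but for an explicit start and end date given on the command line.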

7. Installing Python packages in Docker

  1. Create a requirements.txt listing the packages to install. Write each pin as package==version, without spaces around ==:
scikit-learn==0.24.2
matplotlib==3.3.3

  2. Write a Dockerfile:

FROM apache/airflow:2.8.1
COPY requirements.txt requirements.txt
RUN pip install --user --upgrade pip
RUN pip install -r requirements.txt
  3. From the console, build a new image named extending_airflow:latest:
docker build . --tag extending_airflow:latest

  4. Delete the previous image:

docker rmi airflow

  5. Set the image used in docker-compose.yaml to extending_airflow:latest, then start the stack in the background:

docker compose up -d
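After rebuilding the image, it can help to confirm that the pinned packages really are installed at the pinned versions. Here is a minimal standard-library sketch (Python 3.8+, `importlib.metadata`); `parse_pins` and `check_pins` are hypothetical helpers that handle only exact `name==version` pins, and they tolerate stray spaces around `==`.

```python
# Sketch: compare requirements.txt pins against what is actually installed.
# parse_pins / check_pins are hypothetical helpers; only exact
# name==version pins are supported.
from importlib.metadata import PackageNotFoundError, version


def parse_pins(text):
    """Turn 'name==version' lines into a dict, skipping comments and blanks."""
    pins = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()
        if not line:
            continue
        name, sep, ver = line.partition("==")
        if not sep:
            raise ValueError(f"not an exact pin: {line!r}")
        pins[name.strip()] = ver.strip()
    return pins


def check_pins(pins):
    """Return {package: installed_version_or_None} for every mismatch."""
    problems = {}
    for name, wanted in pins.items():
        try:
            installed = version(name)
        except PackageNotFoundError:
            installed = None
        if installed != wanted:
            problems[name] = installed
    return problems
```

Usage: inside a running container, read requirements.txt and call `check_pins(parse_pins(text))`; an empty dict means every pin is satisfied.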
