钉钉开发者文档
https://ding-doc.dingtalk.com/doc#/serverapi2/qf2nxq
1、创建 自定义机器人,获取webhook。我这里安全设置选了关键词方式。
![](https://img.haomeiwen.com/i7869465/d50711254f7a8aea.png)
![](https://img.haomeiwen.com/i7869465/c18810fd112a3b30.png)
![](https://img.haomeiwen.com/i7869465/8a310480e724b881.png)
至此我们获取webhook, https://oapi.dingtalk.com/robot/send?access_token=14c405e24d249e6f43a4b0e4d70f59dd0bb92be349a9e597a42a93cbaxxxxxxx
2、创建airflow的连接
![](https://img.haomeiwen.com/i7869465/55d94f917dd2cdce.png)
![](https://img.haomeiwen.com/i7869465/d07aaf5736de358e.png)
3、编写dag.py
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.dingding_operator import DingdingOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['xxx@163.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(
'dingding-test',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval='12 * * * *',
)
dd=DingdingOperator(
task_id='dingding',
dingding_conn_id='dingding_default',
message_type='text',
message='DingTalk airflow dingding test yanxin',
at_mobiles=['钉钉手机号'],
dag=dag,
)
ninecho = BashOperator(
task_id='ninecho',
bash_command='echo the-9 赛高',
dag=dag,
)
ninecho >> dd
![](https://img.haomeiwen.com/i7869465/43401f230c0e3656.png)
网友评论