默认情况下,`$AIRFLOW_HOME/plugins` 目录用于存放自定义的插件(plugin)组件,可以在其中自定义 operator、hook 等。我们希望直接沿用这种方式来定义机器学习中的算子。下面定义了一个简单的加法算子。
# -*- coding: UTF-8 -*-
# !/usr/bin/env python
from airflow.plugins_manager import AirflowPlugin
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
# Will show up under airflow.operators.plus_plugin.PlusOperator
class PlusOperator(BaseOperator):
    """Add the two operands ``a`` and ``b`` taken from the task's ``params``.

    Example::

        PlusOperator(task_id='plus_task', params={'a': 1, 'b': 2}, dag=dag)

    :param op_args: stored as ``self.op_args``; not used by ``execute``
        (kept for interface compatibility).
    :param params: mapping that must contain the keys ``'a'`` and ``'b'``.
        It is forwarded to ``BaseOperator`` rather than intercepted here.
    :param provide_context: when True, the operands are merged into the
        Airflow context before being read, so the context exposes them too.
    :param set_context: stored as ``self.set_context``; not used by
        ``execute``.
    """

    @apply_defaults
    def __init__(self, op_args=None, params=None, provide_context=False,
                 set_context=False, *args, **kwargs):
        # Let BaseOperator initialise self.params, as it does for any other
        # operator, instead of swallowing the argument here.
        super(PlusOperator, self).__init__(*args, params=params, **kwargs)
        self.op_args = op_args or []
        # Previously this flag was accepted but never stored, so execute()
        # failed with AttributeError on self.provide_context.
        self.provide_context = provide_context
        self.set_context = set_context

    def execute(self, context):
        """Return ``a + b``.

        The return value is what the original code tried to push manually
        under the XCom key 'return_value'. Airflow already stores an
        operator's return value under that key (default XCom push behaviour),
        so no explicit push is done here.

        :param context: the Airflow task context mapping.
        :raises KeyError: if ``'a'`` or ``'b'`` is missing from the operands.
        """
        operands = self.params or {}
        if self.provide_context:
            # Expose the operands through the context as well, then read from
            # the merged mapping (params override same-named context keys).
            context.update(operands)
            operands = context
        a = operands['a']
        b = operands['b']
        # Single parenthesised argument: prints the same on Python 2 and 3.
        print("a = %s, b = %s" % (a, b))
        return a + b
# Defining the plugin class
class PlusPlugin(AirflowPlugin):
    """Register PlusOperator with Airflow's plugin manager as 'plus_plugin'."""

    # Plugin name; the usage below imports from airflow.operators.plus_plugin.
    name = 'plus_plugin'
    # Operator classes exposed by this plugin.
    operators = [
        PlusOperator,
    ]
在 DAG 中的使用示例如下:
from airflow.operators.plus_plugin import PlusOperator
# NOTE(review): `dag` must be a DAG object defined earlier in the DAG file.
plus_task = PlusOperator(
    task_id='plus_task',
    provide_context=True,
    params={'a': 1, 'b': 2},
    dag=dag,
)
网友评论