-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathtest_subdag_operator.py
More file actions
60 lines (43 loc) · 1.46 KB
/
test_subdag_operator.py
File metadata and controls
60 lines (43 loc) · 1.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
# Default arguments shared by the parent DAG and the sub-DAG built in this
# file; both receive this same mapping as their ``default_args``.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    start_date=datetime(2020, 9, 7),
    email=["mikaela.pisani@rootstrap.com"],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
def hello(file):
    """Print a greeting followed by ``file``.

    Used as the ``python_callable`` of the tasks created in ``loop_files``,
    which supply ``file`` as a keyword argument through ``op_kwargs``.

    Args:
        file: Value to print after the greeting (the callers in this file
            pass a stringified loop index).
    """
    print('Hello!!!! ', file)
def loop_files(parent_dag_name, child_dag_name, args, num_tasks=5):
    """Build a sub-DAG containing ``num_tasks`` independent ``hello`` tasks.

    The sub-DAG id is ``<parent_dag_name>.<child_dag_name>``; SubDagOperator
    expects this ``parent.child`` naming, with ``child_dag_name`` equal to the
    operator's ``task_id``.

    Args:
        parent_dag_name: ``dag_id`` of the DAG that will host the sub-DAG.
        child_dag_name: Name of the sub-DAG, appended after the parent id.
        args: Mapping passed to the sub-DAG as ``default_args``.
        num_tasks: How many ``hello_world<i>`` tasks to create. Defaults to
            5, the count that was previously hard-coded.

    Returns:
        The populated sub-DAG.
    """
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@once",
    )

    # Passing dag= attaches each operator to the sub-DAG, so the operators
    # do not need to be collected in a local list.
    for i in range(num_tasks):
        PythonOperator(
            task_id='hello_world' + str(i),
            op_kwargs={'file': str(i)},
            python_callable=hello,
            dag=dag_subdag,
        )

    return dag_subdag
# Parent DAG: a start task followed by a sub-DAG that fans out the per-file
# tasks.
dag = DAG("subdagtest2", default_args=default_args, schedule_interval='@once')

start_op = BashOperator(
    task_id='bash_test',
    bash_command='echo "Starting TEST"',
    dag=dag,
)

# Bound to its own name so the loop_files() factory is not overwritten by the
# operator it produces. The parent id is read from the DAG object rather than
# repeated as a literal, and the child name must stay equal to task_id so the
# sub-DAG id is '<parent dag_id>.<task_id>'.
loop_files_op = SubDagOperator(
    task_id='loop_files',
    subdag=loop_files(dag.dag_id, 'loop_files', default_args),
    dag=dag,
)

start_op >> loop_files_op