Last active
July 14, 2017 04:04
-
-
Save yssharma/644789c424a0714bda91d02ed8d4d952 to your computer and use it in GitHub Desktop.
Airflow dag for range runs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Date window for the query, expressed as Jinja templates. They are plain
# strings at import time and are expected to be rendered by Airflow when the
# task runs (they reach the operator through its `hql` argument).
#
# If the run carries a conf with _MANUAL_OVERRIDE_START_DATE and/or
# _MANUAL_OVERRIDE_END_DATE, those values are used. Otherwise START falls
# back to the day before the execution date (`ds`) and END falls back to
# `ds` itself. The `if dag_run.conf else ...` guard covers runs that have
# no conf at all.
#
# NOTE(review): conf values are interpolated into the HQL as-is, with no
# validation -- confirm that only trusted users can trigger this DAG.
# NOTE(review): the original comment described this fallback as being "for
# daily runs", but the DAG in this file is scheduled hourly -- confirm which
# one is intended.
START = "{{dag_run.conf.get('_MANUAL_OVERRIDE_START_DATE', macros.ds_add(ds, -1)) if dag_run.conf else macros.ds_add(ds, -1)}}"
END = "{{dag_run.conf.get('_MANUAL_OVERRIDE_END_DATE', ds) if dag_run.conf else ds}}"

# str.format() only fills the {START}/{END} placeholders below; the Jinja
# "{{ ... }}" expressions inside the substituted values are inserted
# verbatim and are left for Airflow to render later.
query = """
select
day,
unix_time,
username,
product,
attribute
from huge_table
where attribute like '%interesting%'
and day >= '{START}' and day <= '{END}'
""".format(START=START, END=END)
# DAG definition: cron "10 * * * *" fires at minute 10 of every hour, and
# max_active_runs=1 allows only one active run of this DAG at a time.
#
# NOTE(review): no start_date is set on the DAG or on the task below, and no
# default_args are passed -- Airflow needs a start_date, so confirm it is
# supplied elsewhere.
# NOTE(review): this section relies on `airflow` already being imported in
# this module; no import is visible here.
dag = airflow.DAG("InterestingData", schedule_interval="10 * * * *", max_active_runs=1)

# This is my HiveOperator.
# It should work just as well if you use
# your own custom operator for Hive/Spark.
hive_operator = airflow.operators.HiveOperator(
    task_id='get_interesting_data',
    hql=query,
    hive_conn_id='emr_521_hive',
    dag=dag)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment