Skip to content

Instantly share code, notes, and snippets.

@KevinYang21
Created May 24, 2018 09:19
Show Gist options
  • Save KevinYang21/a89e37b82b5fc58a2e189d8ee23e61e4 to your computer and use it in GitHub Desktop.
Save KevinYang21/a89e37b82b5fc58a2e189d8ee23e61e4 to your computer and use it in GitHub Desktop.
test DAG
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import timedelta
from airflow.hooks.hive_hooks import HiveCliHook, HiveServer2Hook
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.operators.python_operator import PythonOperator
import airflow
# Default task arguments shared by every task in this DAG.
args = {
    'owner': 'airflow',
    'retries': 100,
    'start_date': airflow.utils.dates.days_ago(2),
}

# Daily-scheduled test DAG used to verify Airflow context propagation
# into Hive sessions and bash environments.
dag = DAG(
    dag_id='kevin_hive_s2_test',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
)

# HQL that echoes back the airflow.ctx.* variables injected into the
# Hive session, one `set` statement per line.
hql = (
    "set airflow.ctx.dag_id;\n"
    "set airflow.ctx.dag_run_id;\n"
    "set airflow.ctx.task_id;\n"
    "set airflow.ctx.execution_date;\n"
)
def dummy_hive_callback(**kwargs):
    """Print the Airflow context variables as seen from inside Hive.

    Runs the same four ``set airflow.ctx.*`` probes twice — first over a
    HiveServer2 connection, then through the Hive CLI — and prints every
    result so the task log shows whether the context reached Hive.
    """
    # The four context variables this test DAG probes, in fixed order.
    probes = (
        'set airflow.ctx.dag_id',
        'set airflow.ctx.dag_run_id',
        'set airflow.ctx.task_id',
        'set airflow.ctx.execution_date',
    )

    # HiveServer2 path: run all probes first, then print the results,
    # matching the original query-then-print ordering.
    server2 = HiveServer2Hook('hiveserver2_silver')
    s2_results = [server2.get_records(hql=statement) for statement in probes]
    for record in s2_results:
        print(record)

    # Hive CLI path: identical probes through the CLI hook.
    cli = HiveCliHook('hive_cli_default')
    cli_results = [cli.run_cli(hql=statement) for statement in probes]
    for output in cli_results:
        print(output)
# Task that exercises the Hive hooks from inside a Python callable.
PythonOperator(
    python_callable=dummy_hive_callback,
    task_id='hive_in_python_op',
    dag=dag,
)
# Echo the AIRFLOW_CTX_* environment variables Airflow exports to every
# task subprocess, so the log shows whether context propagation works.
BashOperator(
    task_id='echo_env_vars',
    dag=dag,
    # Fixed typo: was $AIRFLOW_CTX_DAD_ID, a variable Airflow never sets,
    # so the probe this DAG exists for would have printed an empty line.
    bash_command="echo $AIRFLOW_CTX_DAG_ID;"
                 "echo $AIRFLOW_CTX_TASK_ID;"
                 "echo $AIRFLOW_CTX_EXECUTION_DATE;"
                 "echo $AIRFLOW_CTX_DAG_RUN_ID;"
                 "echo $AIRFLOW_HOME"
)
# Run the context-probe HQL through the Hive CLI connection.
HiveOperator(
    hql=hql,
    hive_cli_conn_id="hive_cli_default",
    provide_context=True,
    task_id='hive_cli',
    dag=dag,
)
# Run the same context-probe HQL through a beeline (HiveServer2) connection.
HiveOperator(
    hql=hql,
    hive_cli_conn_id="hiveserver2_bronze_alejandro",
    provide_context=True,
    task_id='beeline',
    dag=dag,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment