Skip to content

Instantly share code, notes, and snippets.

@chenhan1218
Last active March 17, 2022 04:39
Show Gist options
  • Save chenhan1218/9a65d7cbcae643607238de60963a3569 to your computer and use it in GitHub Desktop.
Save chenhan1218/9a65d7cbcae643607238de60963a3569 to your computer and use it in GitHub Desktop.
import base64
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
default_args = {
'start_date': datetime(2022, 3, 1),
'retries': 0
}
def _decode_message(task_name, ti):
message = ti.xcom_pull(task_ids=task_name)
ret = base64.b64decode(message).decode().rstrip()
return ret
with DAG(dag_id='sshoperator_xcom',
schedule_interval='0 * * * *',
default_args=default_args,
catchup=False) as dag:
command = SSHOperator(task_id="command",
command="date",
ssh_conn_id="conn_id",
do_xcom_push=True)
decode = PythonOperator(
task_id='decode',
python_callable=_decode_message,
op_args = [
'command'
]
)
print = BashOperator(
task_id='print',
bash_command="echo {{ ti.xcom_pull(task_ids='decode') }}"
)
command >> decode >> print
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment