Last active
September 26, 2023 03:13
-
-
Save victorouse/8f2ebe970949bd6c2af16b1ea35ff7ab to your computer and use it in GitHub Desktop.
Custom Slack operator for Airflow DAGs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# dags/operators/custom_slack_operator.py
from airflow.providers.slack.operators.slack import SlackAPIPostOperator | |
from utils.slack_message import SlackMessage, Attachment, Block | |
class CustomSlackOperator(SlackAPIPostOperator):
    """Post a colour-coded DAG status message to a Slack channel.

    The message is a single attachment that lists the repository link, the
    DAG id, the status, and (when provided) the task name and subscribers,
    followed by a "Logs" button linking to the DAG's graph view on the
    Airflow webserver.

    It can be used as a regular task, or through ``failure_notification``,
    which takes an Airflow context and sends the message immediately
    (presumably meant to be wired up as a failure callback -- confirm
    against the DAG definitions that use it).
    """

    # Target of the "Repository" link in every message.
    github_url = "https://github.com/your-org/your-repo"
    # Base URL used to build the "Logs" button link.
    airflow_webserver_url = "https://xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-dot-australia-southeast1.composer.googleusercontent.com"
    # Attachment side-bar colour per status; unknown statuses map to None.
    status_color_map = {
        "success": "#2EB67D",
        "running": "#ECB22E",
        "failed": "#E01E5A",
    }
    # Subscribers listed by ``failure_notification``; override in a subclass
    # to notify a different audience.
    failure_subscribers = ("@victor", "@data-crunchers")

    def __init__(
        self,
        channel="#data-ops",
        status=None,
        task_name=None,
        subscribers=None,
        *args,
        **kwargs,
    ):
        """Configure the notification.

        Args:
            channel: Slack channel the message is posted to.
            status: Status to report. When falsy, ``execute`` falls back to
                the state of the running task instance.
            task_name: Optional task id shown in a "Task" field.
            subscribers: Optional iterable of strings shown, comma-joined,
                in a "Subscribers" field.
            *args: Forwarded to ``SlackAPIPostOperator``.
            **kwargs: Forwarded to ``SlackAPIPostOperator``.
        """
        self.subscribers = subscribers
        self.status = status
        self.task_name = task_name
        # Default the connection id instead of forcing it, so a caller that
        # passes its own ``slack_conn_id`` no longer triggers a
        # duplicate-keyword TypeError.
        kwargs.setdefault("slack_conn_id", "slack_bot_token")
        super().__init__(*args, channel=channel, **kwargs)

    def execute(self, context):
        """Build the status attachment and post it through the parent operator.

        Args:
            context: Airflow context. ``context["dag"]`` is always read;
                ``context["ti"]`` is read only when no explicit status was
                given to the constructor.
        """
        status = self.status or context["ti"].state
        dag_name = context["dag"].dag_id

        # The body lives entirely in the attachment, so the plain text is blank.
        self.text = ""
        self.attachments = self.create_slack_message(
            dag_name=dag_name,
            task_name=self.task_name,
            status=status,
            color=self.status_color_map.get(status),
            fallback=f"DAG {dag_name} has {status}",
            subscribers=self.subscribers,
            log_url=f"{self.airflow_webserver_url}/dags/{dag_name}/graph",
        )
        # Pass the context by keyword: this satisfies a parent declared as
        # ``execute(self, context)`` (where a bare ``execute()`` raises
        # TypeError) and is equally accepted by ``execute(self, **kwargs)``.
        super().execute(context=context)

    @classmethod
    def failure_notification(cls, context):
        """Send a "failed" message for the DAG run found in ``context``.

        Instantiates the operator and runs it straight away. Only the first
        failed task instance (if any) is named in the message.

        Args:
            context: Airflow context providing ``dag_run`` and ``dag``.
        """
        failed_tis = context["dag_run"].get_task_instances(state="failed")
        task_name = failed_tis[0].task_id if failed_tis else None
        cls(
            task_id="slack_notification_on_failure",
            task_name=task_name,
            status="failed",
            subscribers=list(cls.failure_subscribers),
        ).execute(context)

    @staticmethod
    def create_slack_message(
        *,
        color,
        fallback,
        status,
        dag_name,
        task_name=None,
        subscribers=None,
        log_url=None,
    ):
        """Return the attachment list describing a DAG status.

        Args:
            color: Attachment colour (may be None).
            fallback: Fallback text stored on the attachment.
            status: Status string shown in the "Status" field.
            dag_name: DAG id shown in the "DAG" field.
            task_name: Optional task id; adds a "Task" field when truthy.
            subscribers: Optional iterable of strings; adds a comma-joined
                "Subscribers" field when truthy.
            log_url: Optional URL; adds a divider and a "Logs" button when
                truthy.

        Returns:
            The ``attachments`` list of the built ``SlackMessage`` (a list
            holding one attachment dict).
        """
        block = Block()
        block.add_field(
            "Repository",
            f"<{CustomSlackOperator.github_url}|your-org/your-repo>",
        )
        block.add_field("Event", "dag_run", code_format=True)
        block.add_field("DAG", dag_name, code_format=True)
        block.add_field("Status", status, code_format=True)
        if task_name:
            block.add_field("Task", task_name, code_format=True)
        if subscribers:
            block.add_field("Subscribers", ", ".join(subscribers))

        attachment = Attachment(color=color, fallback=fallback)
        attachment.add_block(block)

        if log_url:
            # Raw Block Kit dicts are appended directly because ``Block``
            # only models blocks made of fields.
            attachment.blocks.append({"type": "divider"})
            attachment.blocks.append(
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "Logs"},
                            "url": log_url,
                        }
                    ],
                }
            )

        message = SlackMessage()
        message.add_attachment(attachment)
        return message.attachments
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# dags/utils/slack_message.py
class Block:
    """A Slack layout block made of titled markdown fields."""

    def __init__(self, block_type="section"):
        """Start an empty block of the given type ("section" by default)."""
        self.block_type, self.fields = block_type, []

    def add_field(self, title, value, code_format=False):
        """Append a field rendered as a bold title above its value.

        Args:
            title: Field heading; wrapped in ``*`` so it renders bold.
            value: Field content, interpolated into the text as-is.
            code_format: When true, wrap the value in backticks.
        """
        rendered = f"`{value}`" if code_format else value
        entry = dict(type="mrkdwn", text=f"*{title}*\n{rendered}")
        self.fields.append(entry)

    def to_dict(self):
        """Return the block as a dict with ``type`` and ``fields`` keys."""
        payload = {"type": self.block_type}
        # The live list is exposed, not a copy, so later fields show up too.
        payload["fields"] = self.fields
        return payload
class Attachment:
    """A Slack message attachment: a colour, fallback text and a block list."""

    def __init__(self, color, fallback):
        """Store the colour and fallback text and start with no blocks."""
        self.color, self.fallback, self.blocks = color, fallback, []

    def add_block(self, block):
        """Append the dict form of ``block`` (anything with ``to_dict()``)."""
        serialized = block.to_dict()
        self.blocks.append(serialized)

    def to_dict(self):
        """Return the attachment as a ``color``/``fallback``/``blocks`` dict."""
        return dict(color=self.color, fallback=self.fallback, blocks=self.blocks)
class SlackMessage:
    """Collects attachments, stored in their dict form, for one Slack message."""

    def __init__(self):
        """Start with an empty attachment list."""
        self.attachments = []

    def add_attachment(self, attachment):
        """Append the dict form of ``attachment`` (anything with ``to_dict()``)."""
        serialized = attachment.to_dict()
        self.attachments.append(serialized)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment