Skip to content

Instantly share code, notes, and snippets.

Last active September 28, 2023 13:12
Show Gist options
  • Save wtfzambo/190d4fb1f3b00f8f9a012de269420350 to your computer and use it in GitHub Desktop.
Save wtfzambo/190d4fb1f3b00f8f9a012de269420350 to your computer and use it in GitHub Desktop.
Easily format slack messages for pipelines.
import datetime
from abc import ABC, abstractmethod
from typing import Any, Literal, Sequence, cast, overload
from dlt.common.pipeline import LoadInfo
from dlt.common.storages.load_storage import LoadJobInfo
from slack_table import ColumnSpec, TRow
class BaseDataFormatter(ABC):
    """Base class that turns a ``LoadInfo`` into column specs plus string rows.

    Subclasses must assign ``self._COL_SPECS`` *before* calling
    ``super().__init__(data)`` and must implement ``_format_data`` so that it
    assigns ``self._formatted_data``.
    """

    _COL_SPECS: Sequence[ColumnSpec]
    _formatted_data: Sequence[TRow]
    # Upper bound applied to the measured content width of any column.
    _global_max_width = 50
    # Factor applied to the (capped) content width to get the final column width.
    _width_multiplier = 1.2

    def __init__(self, data: LoadInfo):
        """Store ``data`` for later formatting.

        Raises:
            NotImplementedError: if the subclass did not set ``_COL_SPECS``
                (or set it to an empty value) before calling this initializer.
        """
        if not getattr(self, "_COL_SPECS", None):
            raise NotImplementedError(
                f"_COL_SPECS must be set in the __init__ method of {self.__class__.__name__}"
            )
        # NOTE(review): the attribute name was truncated in the source
        # ("... = data"); `_data` is used consistently by the subclasses below.
        self._data = data
        # Set once `format_data` has built the rows and column widths.
        self._prepared = False

    @abstractmethod
    def _format_data(self) -> None:
        """Build the rows from ``self._data`` and assign ``self._formatted_data``."""

    def _update_max_col_width(self) -> None:
        """Set each column spec's ``width`` from its widest cell.

        The width starts from any preset ``width`` and the header title, grows
        to the longest cell, is capped at ``_global_max_width`` and is finally
        scaled by ``_width_multiplier``.
        """
        for col_spec in self._COL_SPECS:
            data_index = col_spec["data_index"]
            widest = max(
                col_spec.get("width", 0),
                # Include the title so a header is never cut shorter than its text
                # when every cell in the column is narrower than the title.
                len(col_spec["title"]),
                *(len(row[data_index]) for row in self._formatted_data),
            )
            capped = min(widest, self._global_max_width)
            col_spec["width"] = int(capped * self._width_multiplier)

    def format_data(self) -> tuple[Sequence[ColumnSpec], Sequence[TRow]]:
        """Return ``(column_specs, rows)``.

        Rows and widths are built on the first call only: the width update
        starts from the previously stored ``width``, so running it again would
        inflate the columns a second time.
        """
        if not self._prepared:
            self._format_data()
            self._update_max_col_width()
            self._prepared = True
        return self._COL_SPECS, self._formatted_data


class JobFailedDataFormatter(BaseDataFormatter):
    """Formats the failed jobs found in a ``LoadInfo``: table, start time, message."""

    def __init__(self, data: LoadInfo):
        # fmt: off
        self._COL_SPECS: Sequence[ColumnSpec] = [
            {"title": "Table", "data_index": "table_name", "base_path": "job_file_info"},
            {"title": "Started on", "data_index": "created_at"},
            {"title": "Message", "data_index": "failed_message"},
        ]
        # fmt: on
        super().__init__(data)
        self._failed_jobs = self._get_failed_jobs()

    def _get_failed_jobs(self) -> list[LoadJobInfo]:
        """Collect the ``"failed_jobs"`` entries of every load package.

        NOTE(review): `load_packages` and `package.jobs` were truncated in the
        source; they follow dlt's `LoadInfo` / `LoadPackageInfo` — confirm.
        """
        return [
            job
            for package in self._data.load_packages
            for job in package.jobs["failed_jobs"]
        ]

    def _format_obj(self, obj: Any) -> str:
        """Render ``obj`` as a single-line string.

        Datetimes become ``YYYY-MM-DD HH:MM:SS``, callables are called and
        their result is used, and newlines are replaced by spaces.
        """
        if isinstance(obj, datetime.datetime):
            obj = obj.strftime("%Y-%m-%d %H:%M:%S")
        elif callable(obj):
            obj = cast(str, obj())
        return str(obj).replace("\n", " ")

    def _get_nested_attr(self, obj: Any, attr_path: str) -> str:
        """Follow a dotted ``attr_path`` on ``obj`` and format the result.

        A missing attribute yields ``None`` at that step, so the final text is
        ``"None"`` rather than an ``AttributeError``.
        """
        for attr in attr_path.split("."):
            obj = getattr(obj, attr, None)
        return self._format_obj(obj)

    def _format_data(self) -> None:
        """Build one row per failed job, keyed by each column's ``data_index``."""
        formatted_data = []
        for job in self._failed_jobs:
            job_dict = {}
            for col_spec in self._COL_SPECS:
                base_path = col_spec.get("base_path")
                data_index = col_spec["data_index"]
                # `base_path` names the attribute that holds `data_index`.
                path = f"{base_path}.{data_index}" if base_path else data_index
                job_dict[data_index] = self._get_nested_attr(job, path)
            formatted_data.append(job_dict)
        self._formatted_data = formatted_data


class SchemaChangesFormatter(BaseDataFormatter):
    """Formats the schema updates found in a ``LoadInfo``: one row per column."""

    def __init__(self, data: LoadInfo):
        self._COL_SPECS: Sequence[ColumnSpec] = [
            {"title": "Table", "data_index": "table_name"},
            {"title": "Source", "data_index": "schema_name"},
            {"title": "Column", "data_index": "column"},
            {"title": "Data type", "data_index": "data_type"},
        ]
        super().__init__(data)

    def _format_data(self) -> None:
        """Build one row per column listed in each package's ``schema_update``.

        Table and schema names are only written on the first column of a
        table; the following rows leave them empty.
        """
        formatted_data = []
        # NOTE(review): `load_packages` was truncated in the source — confirm.
        for package in self._data.load_packages:
            if not package.schema_update:
                continue
            for table_name, table in package.schema_update.items():
                for i, (column_name, column) in enumerate(table["columns"].items()):
                    first = i == 0
                    formatted_data.append(
                        {
                            "table_name": table_name if first else "",
                            "schema_name": package.schema_name if first else "",
                            "column": column_name,
                            # .get: a column entry without a data type yields an
                            # empty cell instead of raising KeyError.
                            "data_type": str(column.get("data_type", "")),
                        }
                    )
        self._formatted_data = formatted_data
@overload
def get_data_formatter(
    alert_type: Literal["failed_jobs"], data: LoadInfo
) -> JobFailedDataFormatter: ...


@overload
def get_data_formatter(
    alert_type: Literal["schema_changes"], data: LoadInfo
) -> SchemaChangesFormatter: ...


def get_data_formatter(
    alert_type: Literal["failed_jobs", "schema_changes"], data: LoadInfo
) -> JobFailedDataFormatter | SchemaChangesFormatter:
    """Return the formatter matching ``alert_type``, built from ``data``.

    The two ``@overload`` signatures above let type checkers narrow the return
    type from the literal ``alert_type``.

    Raises:
        ValueError: if ``alert_type`` is neither ``"failed_jobs"`` nor
            ``"schema_changes"``.
    """
    if alert_type == "failed_jobs":
        return JobFailedDataFormatter(data)
    if alert_type == "schema_changes":
        return SchemaChangesFormatter(data)
    raise ValueError("Unsupported alert type")
import os
import traceback
from typing import Any, Callable, Sequence
import dlt
from dlt.common.pipeline import LoadInfo
from dlt.common.runtime.slack import send_slack_message
from loguru import logger
from formatters import get_data_formatter
from slack_table import slack_table
def _check_failed_jobs(load_info: LoadInfo) -> None:
    """Log and post a Slack table of failed jobs, if ``load_info`` has any.

    NOTE(review): the early ``return``, the ``slack_table`` arguments after the
    subtitle and the ``send_slack_message(`` call head were missing in the
    source and are reconstructed here — confirm against the original.
    """
    if not load_info.has_failed_jobs:
        return

    formatter = get_data_formatter("failed_jobs", load_info)
    columns, rows = formatter.format_data()
    slack_message = slack_table(
        "⚠️ Warning - the following jobs have failed",
        f"Dataset ⟶ `{load_info.pipeline.dataset_name}`",
        columns,
        rows,
    )
    # The code fences are replaced by newlines for the log output.
    logger.warning("\n" + slack_message.replace("```", "\n"))
    send_slack_message(
        load_info.pipeline.runtime_config.slack_incoming_hook, slack_message
    )
def _check_schema_changes(load_info: LoadInfo) -> None:
    """Log and post a Slack table of schema changes, if there are any.

    Nothing is sent when ``load_info.pipeline.first_run`` is true or when the
    formatter produces no rows.

    NOTE(review): both early ``return`` statements, the ``slack_table``
    arguments after the subtitle and the ``send_slack_message(`` call head were
    missing in the source and are reconstructed here — confirm.
    """
    if load_info.pipeline.first_run:
        return

    formatter = get_data_formatter("schema_changes", load_info)
    columns, rows = formatter.format_data()
    if not rows:
        return

    slack_message = slack_table(
        "🧩 Info - schema changes (new columns/tables) detected",
        f"Dataset ⟶ `{load_info.pipeline.dataset_name}`",
        columns,
        rows,
    )
    # The code fences are replaced by newlines for the log output.
    logger.warning("\n" + slack_message.replace("```", "\n"))
    send_slack_message(
        load_info.pipeline.runtime_config.slack_incoming_hook, slack_message
    )
def _alert_failure(exception: Exception, traceback: str) -> None:
    """Post a crash report (error repr plus traceback text) to Slack.

    Args:
        exception: the exception that stopped the pipeline.
        traceback: the already formatted traceback text. Inside this function
            the parameter shadows the ``traceback`` module.

    NOTE(review): ``CI_ENVIRONMENT`` is not defined in the visible part of the
    file; it is expected to be a module-level constant — confirm it exists.
    """
    # Adjacent literals are concatenated implicitly; the source began the
    # expression with a unary `+` on a string, which raises TypeError.
    slack_message = (
        f"*🔥 Error - pipeline crashed in `{CI_ENVIRONMENT}`*\n\n"
        f"\tError message ⟶ `{repr(exception)}`\n\n"
        f"```{traceback}```"
    )
    send_slack_message(dlt.config["runtime.slack_incoming_hook"], slack_message)
def pipeline_sentry(fn: Callable[[], Sequence[LoadInfo]]) -> Callable[[], None]:
    """Decorate a pipeline function so its results are checked and crashes alerted.

    The wrapped function must return a sequence of ``LoadInfo``. Each one is
    passed to the failed-jobs and schema-changes checks. Any exception raised
    by the pipeline or by the checks is reported through ``_alert_failure``.

    NOTE(review): the loop body and the ``logger.info(`` call head were missing
    in the source and are reconstructed here — confirm. The exception is not
    re-raised after alerting, so the caller does not see the failure; confirm
    that this is intended.
    """
    from functools import wraps

    @wraps(fn)  # keep the wrapped function's name, module and docstring
    def wrapper(*args: Any, **kwargs: Any) -> None:
        try:
            logger.info(f"Starting pipeline in {fn.__module__}")
            pipeline_load_infos = fn(*args, **kwargs)
            for load_info in pipeline_load_infos:
                _check_failed_jobs(load_info)
                _check_schema_changes(load_info)
        except Exception as e:
            tb = traceback.format_exc()
            _alert_failure(e, tb)

    return wrapper
# inspired by
from typing import Literal, NotRequired, Sequence, TypedDict, cast
class ColumnSpec(TypedDict):
title: str
data_index: str
base_path: NotRequired[str]
width: NotRequired[int]
align: NotRequired[str]
TRow = dict[str, str]
def _pad_left(text: str = "", max_length: int = 13) -> str:
    """Return ``text`` cut to ``max_length`` characters and right-aligned in that width."""
    # A negative width would slice characters off the END of the text; treat it as 0.
    width = max(max_length, 0)
    return text[:width].rjust(width)
def _pad_right(text: str = "", max_length: int = 13) -> str:
    """Return ``text`` cut to ``max_length`` characters and left-aligned in that width."""
    # A negative width would slice characters off the END of the text; treat it as 0.
    width = max(max_length, 0)
    return text[:width].ljust(width)
def _fill_dash(length: int) -> str:
    """Return a string of ``length`` dashes (empty when ``length`` is zero or negative)."""
    return "-" * length
def _get_lines(columns: Sequence[ColumnSpec]) -> str:
    """Return a dashed separator as wide as a rendered row of ``columns``.

    The length is the sum of the column widths (10 when a column has none)
    plus one separating space between each pair of neighbouring columns.
    """
    total_length = sum(col.get("width", 10) for col in columns) + len(columns) - 1
    return _fill_dash(total_length)
def _get_col_value(col: ColumnSpec, row: TRow) -> str:
    """Return the cell of ``row`` for column ``col``, padded or cut to the column width.

    A missing key renders as an empty cell. The width defaults to 10 and the
    alignment to left; only ``align == "right"`` right-aligns.
    """
    width = col.get("width", 10)
    pad = _pad_left if col.get("align", "left") == "right" else _pad_right
    value = str(row.get(col["data_index"], ""))
    return pad(value, width)
def _get_row(columns: Sequence[ColumnSpec], row: TRow | Literal["-"]) -> str:
    """Render one table line.

    The sentinel ``"-"`` produces the dashed separator; any other ``row`` is
    rendered as its cells joined by a single space.
    """
    if row == "-":
        return _get_lines(columns)
    return " ".join(_get_col_value(column, cast(TRow, row)) for column in columns)
def _get_header_col(col: ColumnSpec) -> str:
    """Return the title of ``col``, padded or cut to the column width.

    The width defaults to 10 and the alignment to left; only
    ``align == "right"`` right-aligns.
    """
    width = col.get("width", 10)
    pad = _pad_left if col.get("align", "left") == "right" else _pad_right
    return pad(col.get("title", ""), width)
def _get_header_row(columns: Sequence[ColumnSpec]) -> str:
    """Return the header line: every column title, joined by a single space."""
    return " ".join(_get_header_col(column) for column in columns)
# WARN: currently breaks formatting in slack if message longer than 4k characters
def slack_table(
    title: str = "",
    subtitle: str = "",
    columns: Sequence[ColumnSpec] = (),
    data_source: Sequence[TRow] = (),
) -> str:
    """Render a title, a subtitle and a fixed-width text table inside a code fence.

    Args:
        title: heading of the message.
        subtitle: line shown under the title, indented by a tab.
        columns: column specs, in display order.
        data_source: rows to render; cells are looked up by ``data_index``.

    Returns:
        The message text: header row, dashed separator, then one line per row.

    Note:
        Nothing is truncated here; messages longer than roughly 4k characters
        are reported to break the formatting in Slack.

    NOTE(review): the line that rendered ``title`` was missing in the source
    (the expression began with a unary ``+``). The bold ``*title*`` line
    mirrors the crash message built in ``_alert_failure`` — confirm.
    """
    # Immutable `()` defaults replace the shared mutable `[]` defaults.
    table_data = [
        _get_header_row(columns),
        _get_row(columns, "-"),
        *(_get_row(columns, row) for row in data_source),
    ]
    return (
        f"*{title}*\n\n"
        f"\t{subtitle}\n\n```"
        + "\n".join(table_data)
        + "\n```\n"
    )
Copy link


(Install loguru for hassle-free, out-of-the-box pretty logging.)

from dlt.common.pipeline import LoadInfo
from typing import Sequence

from monitoring import pipeline_sentry

def run_pipeline() -> Sequence[LoadInfo]:
    # Usage example for `pipeline_sentry`.
    # NOTE(review): `pipeline_sentry` is imported above but not applied in this
    # copy; presumably this function is meant to be decorated with it — confirm.
    # NOTE(review): the right-hand side of the assignment below is missing in
    # this copy of the example; it has to be restored before this can run.
    load_info =
    return load_info,  # <- Notice the comma: it makes the result a one-element tuple.
    # As is, the decorated function must return a sequence of `LoadInfo` for pipeline_sentry to work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment