Skip to content

Instantly share code, notes, and snippets.

@BasPH
Created July 23, 2019 13:55
Show Gist options
  • Save BasPH/750fe3d7e63bd15392b74282fe04c085 to your computer and use it in GitHub Desktop.
Save BasPH/750fe3d7e63bd15392b74282fe04c085 to your computer and use it in GitHub Desktop.
"""Test the validity of all DAGs."""
import glob
from os import path
import pytest
from airflow import models as airflow_models
DAG_PATHS = glob.glob(path.join(path.dirname(__file__), "..", "..", "dags", "*.py"))
@pytest.mark.parametrize("dag_path", DAG_PATHS)
def test_dag_integrity(dag_path):
"""Import DAG files and check for a valid DAG instance."""
dag_name = path.basename(dag_path)
module = _import_file(dag_name, dag_path)
# Validate if there is at least 1 DAG object in the file
assert any(isinstance(var, airflow_models.DAG) for var in vars(module).values())
# For every DAG object, test for cycles
for dag in [var for var in vars(module).values() if isinstance(var, airflow_models.DAG)]:
dag.test_cycle()
def _import_file(module_name, module_path):
import importlib.util
spec = importlib.util.spec_from_file_location(module_name, str(module_path))
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment