Skip to content

Instantly share code, notes, and snippets.

@cnolanminich
Created September 11, 2024 02:50
Show Gist options
  • Save cnolanminich/9106cff7e9f5eec1a0f8d43aca6a781e to your computer and use it in GitHub Desktop.
Save cnolanminich/9106cff7e9f5eec1a0f8d43aca6a781e to your computer and use it in GitHub Desktop.
Schedule that only runs if the upstream asset is fresh
import dagster as dg
from datetime import timedelta
# Upstream asset; the schedule further down only launches its job while this
# asset's freshness check is not in a FAILED state.
@dg.asset
def upstream_asset(context: dg.AssetExecutionContext) -> None:
    """Materialize the upstream asset (the body only emits a log line)."""
    logger = context.log
    logger.info("Upstream asset is being computed")
@dg.asset(deps=[upstream_asset])
def downstream_asset(context: dg.AssetExecutionContext) -> None:
    """Materialize the downstream asset (the body only emits a log line).

    ``upstream_asset`` is declared as a dependency so the asset graph records
    the upstream -> downstream lineage, and runs that select both assets
    materialize ``upstream_asset`` before this one.
    """
    context.log.info("downstream asset is being computed")
# Freshness check(s) for upstream_asset: the asset is expected to have been
# updated within the last 2 hours.
# NOTE(review): schedule_that_checks looks this check up by the name
# "freshness_check" — presumably the name this builder assigns; confirm if the
# builder or its arguments change.
upstream_asset_freshness_checks = dg.build_last_update_freshness_checks(
    lower_bound_delta=timedelta(hours=2),
    assets=[upstream_asset],
)
# Sensor that evaluates the freshness checks defined above.
# NOTE(review): no interval is passed, so the evaluation cadence (believed to
# be roughly every 30 seconds) is Dagster's default — confirm against the
# Dagster version in use, and check whether the sensor must be turned on in
# the UI to start evaluating.
freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=upstream_asset_freshness_checks,
)
# Asset job that materializes both assets; launched by schedule_that_checks.
# NOTE(review): upstream_asset is part of this job's selection, yet the
# schedule skips the job while upstream_asset's freshness check is FAILED.
# Unless something else materializes upstream_asset, a stale upstream can keep
# this job from ever running again — confirm this is intended.
scheduled_job = dg.define_asset_job(
    name="my_job",
    selection=[upstream_asset, downstream_asset],
)
# Schedule that ticks once a minute. If the latest execution of upstream_asset's
# freshness check FAILED, the tick is skipped; otherwise a run of scheduled_job
# (upstream_asset and downstream_asset) is requested.
@dg.schedule(job=scheduled_job, cron_schedule="* * * * *")
def schedule_that_checks(context: dg.ScheduleEvaluationContext):
    """Gate ``scheduled_job`` on the latest freshness-check result.

    Reads the summary record of the ``freshness_check`` asset check on
    ``upstream_asset`` from the instance's event log storage.

    Returns:
        ``dg.SkipReason`` when the most recent check execution has status
        ``FAILED``; otherwise ``dg.RunRequest()``. When no summary or no
        execution record exists yet (e.g. the check has never been evaluated),
        the run is requested.
    """
    # Look up the most recent execution of the freshness check on upstream_asset.
    asset_key = dg.AssetKey("upstream_asset")
    check_key = dg.AssetCheckKey(asset_key=asset_key, name="freshness_check")
    summary_records = context.instance.event_log_storage.get_asset_check_summary_records(
        [check_key]
    )
    context.log.info(f"Summary records: {summary_records}")

    # .get() may return None when storage has no summary for this check; treat
    # that the same as "no execution recorded" instead of raising AttributeError.
    summary_record = summary_records.get(check_key)
    last_check_execution_record = (
        summary_record.last_check_execution_record if summary_record is not None else None
    )
    context.log.info(f"Last check execution record: {last_check_execution_record}")

    if last_check_execution_record and last_check_execution_record.status.value == "FAILED":
        return dg.SkipReason("Upstream asset is not fresh")
    return dg.RunRequest()
# Register the assets, the freshness checks, the sensor that evaluates them and
# the gating schedule. scheduled_job is not listed under jobs=; it is referenced
# by schedule_that_checks via job=scheduled_job.
defs = dg.Definitions(
    assets=[upstream_asset, downstream_asset],
    asset_checks=upstream_asset_freshness_checks,
    sensors=[freshness_checks_sensor],
    schedules=[schedule_that_checks],
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment