Step 1: Define the Snowflake Resource
from dagster import (
Definitions,
AssetKey,
RunRequest,
SensorEvaluationContext,
AssetExecutionContext,
asset,
define_asset_job,
sensor,
SkipReason,
AssetSelection,
AssetSpec,
MetadataValue,
ObserveResult,
multi_observable_source_asset,
EventRecordsFilter,
DagsterEventType,
RunRequest,
asset_sensor,
AssetKey,
EventLogEntry,
AssetMaterialization,
ScheduleDefinition,
)
from datetime import datetime, timedelta, timezone
from dagster_snowflake import SnowflakeResource
# Shared Snowflake connection resource. All values below are placeholders and
# must be replaced with real connection settings before use.
snowflake_resource = SnowflakeResource(
    # Who connects.
    account="your_account",
    user="your_user",
    password="your_password",
    # Where queries run.
    warehouse="your_warehouse",
    database="your_database",
    schema="your_schema",
)
Step 2: Create an Observable Source Asset
# Schema name constant (not referenced elsewhere in the visible code).
TABLE_SCHEMA = "PUBLIC"

# Name of the observed table; also used as the asset key of the source asset.
table_name = "my_table"

# Spec describing the externally managed table as a Dagster asset.
asset_spec = AssetSpec(key=table_name)
@multi_observable_source_asset(specs=[asset_spec])
def my_table(snowflake: SnowflakeResource):
    """Observe the source table and report when it was last loaded.

    Runs ``SELECT MAX(loaded_at)`` against ``table_name`` through the
    ``snowflake`` resource (the key under which the resource is registered in
    ``Definitions``) and yields one ``ObserveResult`` for the table.

    Yields:
        ObserveResult: carries ``dagster/last_updated_timestamp`` metadata when
        the table has at least one non-NULL ``loaded_at`` value; otherwise the
        observation is emitted without that metadata entry.
    """
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        try:
            # table_name is a module-level constant, not user input.
            cursor.execute(f"SELECT MAX(loaded_at) FROM {table_name}")
            row = cursor.fetchone()
        finally:
            cursor.close()

    # MAX() over an empty table is NULL; do not build a timestamp from None.
    last_updated = row[0] if row else None
    metadata = {}
    if last_updated is not None:
        metadata["dagster/last_updated_timestamp"] = MetadataValue.timestamp(
            last_updated
        )

    yield ObserveResult(asset_key=table_name, metadata=metadata)
# Downstream asset that depends on the observed source table.
# It performs no real work here and simply returns the constant 1.
@asset(deps=[AssetKey([table_name])])
def downstream_table():
    result = 1
    return result
# Job that materializes only the downstream asset.
downstream_job = define_asset_job(
    name="downstream_job",
    selection=AssetSelection.assets(downstream_table),
)
# Job that runs the observation of the source table.
observation_job = define_asset_job(
    name="observation_job",
    selection=AssetSelection.assets(my_table),
)
# Run the observation job at minute 0 of every hour.
observation_schedule = ScheduleDefinition(
    job=observation_job,
    cron_schedule="0 * * * *",
    name="hourly_observation_schedule",
)
Step 3: Define the Sensor
@sensor(jobs=[downstream_job])
def my_asset_observation_sensor(context):
    """Request a downstream run when the observed table reports newer data.

    The sensor reads the most recent ASSET_OBSERVATION event for ``table_name``
    and compares its ``dagster/last_updated_timestamp`` metadata with the value
    seen on the previous evaluation.

    The cursor is a JSON object with two fields:
      * ``storage_id``   - event-log storage id of the last observation read,
                           used as ``after_cursor`` when querying events.
      * ``last_updated`` - Unix timestamp (seconds) from that observation, used
                           to decide whether the table has actually advanced.
    A cursor that is a bare integer is treated as a storage id only.

    Args:
        context: the sensor evaluation context supplied by Dagster.

    Yields:
        RunRequest when the observed timestamp is newer than the stored one,
        otherwise a SkipReason explaining why no run was requested.
    """
    # Local import so the block does not rely on file-level imports.
    import json

    asset_key = AssetKey(table_name)

    # Decode the cursor. Storage id and timestamp are kept separately so that
    # one is never mistaken for the other.
    last_storage_id = None
    last_seen_ts = None
    if context.cursor:
        try:
            state = json.loads(context.cursor)
        except ValueError:
            state = None
        if isinstance(state, dict):
            last_storage_id = state.get("storage_id")
            last_seen_ts = state.get("last_updated")
        elif isinstance(state, int) and not isinstance(state, bool):
            last_storage_id = state

    # Fetch the latest observation event recorded after the stored position.
    records = context.instance.get_event_records(
        event_records_filter=EventRecordsFilter(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            asset_key=asset_key,
            after_cursor=last_storage_id,
        ),
        limit=1,
    )
    if not records:
        yield SkipReason(f"No new observations for {asset_key.to_user_string()}.")
        return

    latest_record = records[0]
    latest_observation = latest_record.asset_observation
    metadata = latest_observation.metadata if latest_observation is not None else {}
    last_updated_metadata = metadata.get("dagster/last_updated_timestamp")

    if (
        not isinstance(last_updated_metadata, MetadataValue)
        or last_updated_metadata.value is None
    ):
        # Advance past this record so it is not re-read on every tick.
        context.update_cursor(
            json.dumps(
                {"storage_id": latest_record.storage_id, "last_updated": last_seen_ts}
            )
        )
        yield SkipReason(
            "Latest observation has no dagster/last_updated_timestamp metadata."
        )
        return

    observed_ts = float(last_updated_metadata.value)

    if last_seen_ts is not None and observed_ts <= last_seen_ts:
        # Newer observation event, but the table itself has not advanced.
        context.update_cursor(
            json.dumps(
                {"storage_id": latest_record.storage_id, "last_updated": last_seen_ts}
            )
        )
        yield SkipReason(
            f"{asset_key.to_user_string()} has not been updated since the last run request."
        )
        return

    last_updated_timestamp = datetime.fromtimestamp(observed_ts, tz=timezone.utc)
    context.update_cursor(
        json.dumps({"storage_id": latest_record.storage_id, "last_updated": observed_ts})
    )

    # No run_config is passed: the "snowflake" resource is expected to be bound
    # in Definitions, and repeating credentials here would copy them into the
    # stored config of every run.
    yield RunRequest(
        run_key=f"run_for_{asset_key.to_user_string()}_{last_updated_timestamp}",
    )
Step 4: Set up the Downstream Job
from dagster import define_asset_job
# Job that materializes the downstream asset. The selection must reference
# `downstream_table`, the asset defined in this file; `source_table` does not
# exist and raised a NameError.
downstream_job = define_asset_job(
    "downstream_job",
    selection=AssetSelection.assets(downstream_table),
)
Step 5: Combine Everything in Definitions
from dagster import Definitions
# Code-location entry point: registers every asset, job, schedule, sensor and
# resource defined above so Dagster can load them together.
defs = Definitions(
    assets=[downstream_table, my_table],
    resources={"snowflake": snowflake_resource},
    jobs=[downstream_job, observation_job],
    schedules=[observation_schedule],
    sensors=[my_asset_observation_sensor],
)