@cnolanminich
cnolanminich / jenkins.file
Last active September 13, 2024 16:29
Jenkinsfile for hybrid
pipeline {
    agent any
    environment {
        // Construct the IMAGE_TAG using Jenkins environment variables
        IMAGE_TAG = "${env.GIT_COMMIT}-${env.BUILD_ID}"
        AWS_ACCESS_KEY_ID = credentials('aws-access-key-id') // Reference to the AWS access key ID secret
        AWS_SECRET_ACCESS_KEY = credentials('aws-secret-access-key') // Reference to the AWS secret access key secret
        AWS_REGION = 'us-west-2' // Set your AWS region
    }
cnolanminich / conditional_schedule.py
Created September 11, 2024 02:50
Schedule that only runs if the upstream asset is fresh
import dagster as dg
from datetime import timedelta
# Upstream asset that should be fresh
@dg.asset
def upstream_asset(context: dg.AssetExecutionContext) -> None:
    context.log.info("Upstream asset is being computed")
@dg.asset
def downstream_asset(context: dg.AssetExecutionContext) -> None:
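The preview above is cut off before the schedule itself, so the following is only a minimal sketch of the freshness decision that such a schedule would need, written in plain Python rather than against Dagster's API. The names `is_fresh` and `decide` and the one-hour threshold are hypothetical; in a real schedule the last materialization time would come from Dagster's event log.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_fresh(last_materialized: Optional[datetime], now: datetime, max_age: timedelta) -> bool:
    """Return True when the upstream asset was materialized within max_age of now."""
    if last_materialized is None:
        # Never materialized: treat the upstream asset as stale.
        return False
    return now - last_materialized <= max_age


def decide(
    last_materialized: Optional[datetime],
    now: datetime,
    max_age: timedelta = timedelta(hours=1),  # hypothetical threshold
) -> str:
    """Mimic one schedule tick: request a run only if the upstream asset is fresh."""
    return "run" if is_fresh(last_materialized, now, max_age) else "skip"
```

In a schedule function, the "run" branch would correspond to returning a run request and the "skip" branch to returning a skip reason.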
cnolanminich / snowflake_sensor.py
Created August 23, 2024 21:13
sensor that checks multiple snowflake tables and ensures they're all
from dagster import (
    sensor,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    SkipReason,
)
from datetime import datetime
# this would be your job
cnolanminich / dlt_assets.py
Created July 26, 2024 16:34
example using dlt to ingest s3
# after running dlt init filesystem duckdb
from dagster import AssetExecutionContext
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
import dlt
from dlt_sources.filesystem_pipeline import s3_locations_data
from dlt_sources.filesystem import readers
from pathlib import Path
import os
cnolanminich / .env
Last active July 26, 2024 16:32
Use dlt open api codegen to create Dagster pipeline
LOCATIONS_DATA_2__SOURCES__GITHUB_FROM_OPENAPI__BASE_URL=https://api.github.com
LOCATIONS_DATA_2__SOURCES__GITHUB_FROM_OPENAPI__ACCESS_TOKEN={your_token_here}
cnolanminich / definitions.py
Last active July 26, 2024 14:01
Example dlt loading from s3
from dagster import (
    Definitions,
)
from .assets.dlt_assets import dagster_s3_assets
defs = Definitions(
    assets=[dagster_s3_assets])
cnolanminich / example_cluster_role.yaml
Created June 28, 2024 00:50
Permissions needed for cross-namespace
# Give the created service account cross-namespace permissions (cluster role)
extraManifests:
  - kind: ClusterRole
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
      name: extra-cluster-role
    rules:
      - apiGroups: ["batch"]
        resources: ["jobs", "jobs/status"]
        verbs: ["*"]
cnolanminich / asset_group1.py
Last active June 18, 2024 19:54
Example of assets that take deps from a set of other assets
# Assets you want to put in the initial asset group
from dagster import (
    asset,
)
# Define your assets and group them
@asset
def asset1():
    pass
cnolanminich / example_snowflake_sensor.md
Last active June 12, 2024 01:39
Execute a Snowflake job based on whether a table has been updated
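
The core decision behind this kind of sensor can be sketched in plain Python, independent of Dagster and Snowflake: compare the table's last-altered timestamp with a stored cursor and trigger only when it has advanced. This is a sketch under assumptions, not the gist's implementation; `evaluate_tick` is a hypothetical helper, and storing the cursor as an ISO 8601 string is an illustrative choice.

```python
from datetime import datetime
from typing import Optional, Tuple


def evaluate_tick(last_altered: datetime, cursor: Optional[str]) -> Tuple[bool, str]:
    """Return (should_run, new_cursor) for one sensor evaluation.

    last_altered: when the table was last changed (assumed to be queried elsewhere).
    cursor: ISO 8601 timestamp saved by the previous evaluation, or None on the first tick.
    """
    previous = datetime.fromisoformat(cursor) if cursor else None
    # Trigger on the first tick, or when the table changed after the stored cursor.
    should_run = previous is None or last_altered > previous
    # Advance the cursor only when a run is requested.
    new_cursor = last_altered.isoformat() if should_run else cursor
    return should_run, new_cursor
```

Advancing the cursor only when a run is requested keeps repeated evaluations of an unchanged table from launching duplicate runs.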

Step 1: Define the Snowflake Resource

from dagster import (
    Definitions,
    AssetKey,
    RunRequest,
    SensorEvaluationContext,
    AssetExecutionContext,
cnolanminich / example_singular_test.sql
Created April 11, 2024 23:30
singular dbt test with more than 1 ref with config needed
{{ config(
    severity = 'warn',
    meta = {
        'dagster': {
            'ref': {
                'name': 'orders_cleaned'
            }
        }
    }
)