Skip to content

Instantly share code, notes, and snippets.

@cnolanminich
Created July 26, 2024 16:34
Show Gist options
  • Save cnolanminich/2d35402147bbe94203f17843acc8be61 to your computer and use it in GitHub Desktop.
Save cnolanminich/2d35402147bbe94203f17843acc8be61 to your computer and use it in GitHub Desktop.
example using dlt to ingest s3
# Run this after scaffolding the source with `dlt init filesystem duckdb`.
from dagster import AssetExecutionContext
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
import dlt
from dlt_sources.filesystem_pipeline import s3_locations_data
from dlt_sources.filesystem import readers
from pathlib import Path
import os
# Directory in which the DuckDB database file is created. The snippet used
# DBT_PROJECT_DIR without defining or importing it, which raises NameError as
# soon as the module is imported (decorator arguments are evaluated at import
# time). Resolve it from the environment and fall back to the current working
# directory so the module is importable on its own.
DBT_PROJECT_DIR = os.environ.get("DBT_PROJECT_DIR", os.getcwd())


@dlt_assets(
    # NOTE(review): the whole `readers` source is passed with a "*.csv" glob;
    # confirm whether it should be narrowed to the CSV reader only.
    dlt_source=readers(
        bucket_url="s3://hooli-demo/embedded-elt/",
        file_glob="*.csv",
    ),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="locations_data_2",
        dataset_name="locations_2",
        # Use an explicit database file path instead of the bare "duckdb"
        # destination name.
        destination=dlt.destinations.duckdb(
            os.path.join(DBT_PROJECT_DIR, "example.duckdb")
        ),
        progress="log",
    ),
    name="locations_2",
    group_name="dlt_testing",
)
def dagster_s3_assets_2(context: AssetExecutionContext, dlt: DagsterDltResource):
    """Run the dlt pipeline configured in the decorator and yield its results.

    Args:
        context: Dagster execution context, forwarded to the dlt resource.
        dlt: The Dagster dlt resource. Inside this function the parameter
            shadows the module-level ``dlt`` import; the decorator arguments
            above still refer to the module because they are evaluated at
            module scope.

    Yields:
        Whatever ``dlt.run(context=context)`` produces.
    """
    yield from dlt.run(context=context)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment