Created August 4, 2022 23:13
-
-
Save seanlindo/a096af16dc34df4f4e6f31be3c2c5bae to your computer and use it in GitHub Desktop.
Dagster Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dagster import IOManager, io_manager | |
class MyIOManager(IOManager):
    """In-memory IO manager that keeps every handled output in a dict.

    Outputs are stored under ``(context.step_key, context.name)`` of the step
    that produced them; inputs are looked up by the same pair taken from
    ``context.upstream_output``.

    NOTE(review): values live only in this instance's ``storage_dict``, so an
    input is loadable only if ``handle_output`` ran on the same instance
    (presumably the same run and process — confirm for the executor in use).
    """

    def __init__(self):
        # (step_key, output_name) -> stored object
        self.storage_dict = {}

    def handle_output(self, context, obj):
        """Store ``obj`` under the producing step's key and output name."""
        self.storage_dict[(context.step_key, context.name)] = obj

    def load_input(self, context):
        """Return the object stored for the upstream output feeding this input.

        Raises:
            KeyError: if nothing was stored for the upstream output. This is
                presumably the case for a ``SourceAsset`` such as
                ``raw_users``, which nothing in this example writes through
                ``handle_output``.
        """
        upstream = context.upstream_output
        key = (upstream.step_key, upstream.name)
        try:
            return self.storage_dict[key]
        except KeyError:
            # Same exception type as before, but say what was missing instead
            # of echoing a bare tuple.
            raise KeyError(
                f"No stored output for step_key={key[0]!r}, name={key[1]!r}; "
                f"stored keys: {list(self.storage_dict)}"
            ) from None
@io_manager
def my_io_manager(_):
    """Resource factory returning a fresh ``MyIOManager``.

    The init context Dagster passes in is ignored. Every call constructs a new
    manager, which starts with its own empty ``storage_dict``.
    """
    manager = MyIOManager()
    return manager
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dagster import asset, with_resources, repository, define_asset_job, SourceAsset | |
from example_io_manager import my_io_manager | |
# Declared as a SourceAsset: nothing in this file computes "raw_users"; it is
# only loaded through the "test_io_manager" resource.
raw_users = SourceAsset(
    io_manager_key="test_io_manager",
    key="raw_users",
)
@asset(io_manager_key="test_io_manager")
def upstream_asset(raw_users):
    """Derive ``upstream_asset`` from the ``raw_users`` input.

    Currently an identity pass-through: the input is returned unchanged. Dagster
    presumably matches the parameter name to the upstream asset key, so keep it
    named ``raw_users``.
    """
    # TODO: apply the real transformations here.
    transformed_users = raw_users
    return transformed_users
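# TODO: What if there are many transformation steps? Does each one have to be
# an asset, or can they be separate ops that operate on a single asset (possibly
# a graph-backed asset via AssetsDefinition.from_graph — verify in the Dagster docs)?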
@repository
def repo():
    """Return the repository contents: both assets bound to their IO manager, plus a job.

    ``with_resources`` attaches ``my_io_manager`` under the ``"test_io_manager"``
    key that both assets reference. ``define_asset_job("process_users")`` is
    created with no explicit selection (presumably all assets — confirm).

    NOTE(review): ``raw_users`` is a SourceAsset that nothing here stores in the
    in-memory IO manager, so loading it for ``upstream_asset`` will presumably
    fail unless something else populates it.
    """
    io_resources = {"test_io_manager": my_io_manager}
    bound_assets = with_resources(
        [raw_users, upstream_asset],
        resource_defs=io_resources,
    )
    users_job = define_asset_job("process_users")
    return [*bound_assets, users_job]
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.