Created
June 13, 2023 18:42
-
-
Save skrawcz/677daa5e72cba8b9c26d91728468f9e0 to your computer and use it in GitHub Desktop.
Shows how to write async-based functions in Hamilton, followed by a suggestion for another way to parallelize.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from hamilton.function_modifiers import extract_columns | |
import pandas as pd | |
async def _run_query(col_name: str, query_string: str) -> pd.DataFrame: | |
# this would go to the database -- ideally the client is passed in as a parameter | |
# the assumption here is that the database driver is asyncio based, else there's no | |
# value in doing this :) | |
return pd.DataFrame({col_name: [query_string]}) | |
# async Hamilton func for query #1 | |
async def sql_1(param1: str) -> pd.DataFrame: | |
query_string = "..." + param1 + "..." | |
return await _run_query("a", query_string) | |
# query #1 dataframe -> columns | |
@extract_columns("a") | |
def sql_1_extract(sql_1: pd.DataFrame) -> pd.DataFrame: | |
return sql_1 | |
# async Hamilton func for query #n | |
async def sql_n(param2: str) -> pd.DataFrame: | |
query_string = "..." + param2 + "..." | |
return await _run_query("b", query_string) | |
# query #n dataframe -> columns | |
@extract_columns("b") | |
def sql_n_extract(sql_n: pd.DataFrame) -> pd.DataFrame: | |
return sql_n |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from hamilton import base | |
from hamilton import driver | |
from hamilton.experimental import h_async | |
from hamilton.experimental import h_dask | |
import async_funcs | |
dr = h_async.AsyncDriver({}, async_funcs) | |
loop = asyncio.get_event_loop() | |
execute_coroutine = dr.execute(["a", "b"], inputs={"param1": "1", "param2": "2"}) | |
dr.visualize_execution(["a", "b"], "./example", {"format": "png"}, inputs={"param1": "1", "param2": "2"}) | |
result = loop.run_until_complete(execute_coroutine) | |
print(result) | |
# suggested way to do parallelization: | |
import loading_functions | |
import transforming_functions | |
# parallelized hamilton RUN -- get data in parallel | |
# suggestion - split loading functions into their own module | |
adapter = h_dask.SimplePythonGraphAdapter(base.DictResult()) | |
dr1 = driver.Driver({}, loading_functions, result_builder=adapter) | |
results = dr1.execute(["table1", "table2"]) | |
# now create next driver to run things sequentially | |
# suggestion - split transforming functions into their own module(s) | |
dr2 = driver.Driver({}, transforming_functions) | |
results = dr2.execute(["col1", "col2"], inputs=results) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment