Last active
May 25, 2023 16:20
-
-
Save crscheid/10dbf291779667831b856fab9cc9745b to your computer and use it in GitHub Desktop.
Sample code for renaming flows in Prefect 2.0 at runtime
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import prefect | |
from prefect import flow, task, get_run_logger | |
import requests | |
import json | |
import time | |
from random import randint, random | |
def rename_flow_run(new_name: str): | |
"""Renames the current flow""" | |
flow_id = str(prefect.context.get_run_context().flow_run.dict()['id']) | |
response = requests.patch(f'http://<ip>:<port>/api/flow_runs/{flow_id}', headers={'Content-Type': 'application/json'}, data=json.dumps({'name': new_name}), allow_redirects=False) | |
@task | |
def determine_type(content_id: str): | |
# Return a random type | |
if randint(0, 100) > 50: | |
return "a" | |
else: | |
return "b" | |
@flow | |
def handle_a_case(content_id: str): | |
logger = get_run_logger() | |
logger.info(f"handle_a_case: checking {content_id}") | |
rename_flow_run(content_id) | |
# Do work | |
logger.info("handle_a_case: Doing work") | |
@flow | |
def handle_b_case(content_id: str): | |
logger = get_run_logger() | |
logger.info(f"handle_b_case: checking {content_id}") | |
rename_flow_run(content_id) | |
# Do work | |
logger.info("handle_b_case: Doing work") | |
@flow | |
def handle_object(object_id: str): | |
logger = get_run_logger() | |
logger.info(f"handle_content: checking {object_id}") | |
rename_flow_run(object_id) | |
# Simulate checking type | |
content_type = determine_type(object_id) | |
if content_type == "a": | |
logger.info(f"handle_object: sending {object_id} to a sub flow") | |
handle_a_case(object_id) | |
elif content_type == "b": | |
logger.info(f"handle_object: sending {object_id} to b sub flow") | |
handle_b_case(object_id) | |
else: | |
logger.info( | |
"handle_object: completing, other type of content received") | |
if __name__ == "__main__": | |
handle_object("test_id_678910") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment