Last active
October 13, 2023 16:14
-
-
Save josemarcosrf/6ce2037ae927f1c519ef4c96b7a069b2 to your computer and use it in GitHub Desktop.
Python helper functions, decorators and context managers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import signal | |
from contextlib import contextmanager | |
from time import perf_counter | |
@contextmanager
def timeout(duration: int):
    """Abort the enclosed ``with`` block if it runs longer than ``duration`` seconds.

    Implemented with ``SIGALRM``, so it only works on Unix and only in the main
    thread (``signal.signal`` raises ``ValueError`` in other threads). The alarm
    has whole-second resolution and ``duration=0`` disables it. Whatever SIGALRM
    handler was installed before entering is restored on exit.

    Args:
        duration: Maximum number of seconds the block may run.

    Raises:
        TimeoutError: If the block is still running when the alarm fires.
            ``TimeoutError`` subclasses ``Exception``, so existing
            ``except Exception`` handlers keep catching it.
    """

    def timeout_handler(signum, frame):
        raise TimeoutError(f"Block timed out after {duration} seconds")

    # Keep the previous handler so it can be reinstated; leaving ours behind
    # would turn a later, unrelated SIGALRM into a bogus timeout.
    previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
    if previous_handler is None:
        # The old handler was not installed from Python and cannot be passed
        # back to signal.signal(); fall back to the default disposition.
        previous_handler = signal.SIG_DFL
    signal.alarm(duration)
    try:
        yield
    finally:
        # Cancel the pending alarm before uninstalling our handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous_handler)
class catchtime:
    """Context manager that measures the time spent inside its ``with`` block.

    On exit the instance exposes:
        time: elapsed seconds as a float (``perf_counter`` based).
        readout: a human-readable summary, which is also printed.

    Exceptions raised inside the block are not suppressed.

    Example:
        with catchtime() as t:
            work()
        print(t.time)
    """

    def __enter__(self):
        # While the block runs, ``time`` holds the start timestamp; __exit__
        # replaces it with the elapsed duration.
        self.time = perf_counter()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.time = perf_counter() - self.time
        self.readout = f"⏳️ Done in: {self.time:.3f} seconds"
        # Builtin print: this module does not import any ``rprint`` helper,
        # so referencing one here would raise NameError on every exit.
        print(self.readout)
        # Implicitly returns None (falsy) so block exceptions propagate.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
import re | |
import requests | |
import yaml | |
import numpy as np | |
from concurrent.futures.thread import ThreadPoolExecutor | |
from requests.adapters import HTTPAdapter | |
from requests.packages.urllib3.util.retry import Retry # noqa | |
from itertools import chain | |
from typing import Any | |
from typing import Dict | |
from typing import Callable | |
from typing import List | |
from typing import Text | |
from typing import Tuple | |
from typing import Union | |
REPL_1 = re.compile('(.)([A-Z][a-z]+)')
REPL_2 = re.compile('([a-z0-9])([A-Z])')


def camel_to_snake(name):
    """Convert a CamelCase / camelCase identifier to snake_case.

    Two regex passes: the first inserts ``_`` before every capitalised word
    (``HTTPResponse`` -> ``HTTP_Response``); the second inserts ``_`` between a
    lowercase letter or digit and a following capital (``getHTTP`` ->
    ``get_HTTP``). The result is lower-cased.

    >>> camel_to_snake("getHTTPResponseCode")
    'get_http_response_code'
    """
    words_split = REPL_1.sub(r'\1_\2', name)
    fully_split = REPL_2.sub(r'\1_\2', words_split)
    return fully_split.lower()
def read_config(conf_file: Text) -> Dict[Any, Any]:
    """Load a YAML config file, resolving ``$VAR`` strings from the environment.

    Every string value (at any depth inside dicts and lists) that starts with
    ``$`` is replaced by the value of the environment variable named by the
    rest of the string, or ``None`` if that variable is unset. All other
    values are returned unchanged. A falsy top-level document (e.g. an empty
    file) yields ``{}``.

    Args:
        conf_file: Path to the YAML file.

    Returns:
        The parsed configuration with environment references substituted.
    """

    def env_replace(elem: Any) -> Any:
        if isinstance(elem, dict):
            return {k: env_replace(v) for k, v in elem.items()}
        if isinstance(elem, list):
            return [env_replace(v) for v in elem]
        if isinstance(elem, str) and elem.startswith("$"):
            # Unset variables deliberately become None instead of raising.
            return os.getenv(elem[1:])
        return elem

    # Binary mode lets PyYAML detect the stream encoding itself (UTF-8, or
    # UTF-16 via BOM) instead of decoding with the platform's locale default,
    # which mangles non-ASCII configs on e.g. Windows.
    with open(conf_file, "rb") as f:
        config = yaml.safe_load(f) or {}
    return env_replace(config)
def configure_request_session(
    total: int = 10, connect: int = 3, backoff: int = 1
) -> requests.Session:
    """Build a ``requests.Session`` whose requests are retried on failure.

    One retrying ``HTTPAdapter`` instance is mounted for both ``https://`` and
    ``http://`` URLs.

    Args:
        total: Overall retry budget, passed as ``Retry(total=...)``.
        connect: Retry budget for connection errors, ``Retry(connect=...)``.
        backoff: Passed as ``Retry(backoff_factor=...)``.

    Returns:
        The configured session.
    """
    retry_policy = Retry(total=total, connect=connect, backoff_factor=backoff)
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)
    session = requests.Session()
    for scheme in ("https://", "http://"):
        session.mount(scheme, retrying_adapter)
    return session
async def parallel_requests(
    func_calls: List[Tuple[Union[Text, int], Callable, Tuple[Any, ...]]],
    max_workers: int = 10,
) -> Dict[Union[Text, int], Any]:
    """Run a series of blocking calls concurrently in a thread pool.

    Args:
        func_calls: ``(call_id, function, positional_args)`` tuples; each
            ``function(*positional_args)`` runs in a worker thread.
        max_workers: Maximum number of threads used at once. Defaults to 10.

    Returns:
        Mapping ``call_id -> return value``. If several calls share an id,
        the one appearing last in ``func_calls`` wins.

    Raises:
        Whatever exception the first failing call raised (propagated by
        ``asyncio.gather``). In that case leaving the executor's ``with``
        block waits for the calls still running in other threads.
    """
    # Materialise once: the calls are iterated twice below.
    calls = list(func_calls)
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            loop.run_in_executor(executor, func, *f_params)
            for _, func, f_params in calls
        ]
        # gather returns results in input order, so ids can be zipped back.
        outputs = await asyncio.gather(*futures)
    return {f_id: output for (f_id, _, _), output in zip(calls, outputs)}
def flatten(lol: List[List[Any]]) -> List[Any]:
    """Remove one level of nesting, e.g. ``[[1, 2], [3]] -> [1, 2, 3]``."""
    return [item for sublist in lol for item in sublist]
def as_batch(x: Union[List, np.ndarray]) -> np.ndarray:
    """Return ``x`` as a NumPy array with at least two dimensions.

    A 1-D input of length N becomes shape ``(1, N)``; a scalar becomes
    ``(1, 1)``; inputs that are already 2-D or higher keep their shape.
    The result never shares memory with an ndarray passed in.
    """
    # np.array (not asarray) copies the input; atleast_2d then prepends
    # length-1 axes until ndim >= 2, which also covers 0-d inputs that a
    # single expand_dims would only lift to 1-D.
    return np.atleast_2d(np.array(x))
class dotdict(dict):
    """A ``dict`` whose keys can also be read, set and deleted as attributes.

    ``d.key`` behaves like ``d.get("key")``, so a missing key reads as ``None``
    instead of raising. ``d.key = v`` stores ``d["key"] = v``. ``del d.key``
    removes the key and raises ``KeyError`` if it is absent.
    """

    def __getattr__(self, name):
        # Only consulted when normal lookup fails, so real dict methods
        # (items, keys, ...) still take precedence over same-named keys.
        return dict.get(self, name)

    def __setattr__(self, name, value):
        dict.__setitem__(self, name, value)

    def __delattr__(self, name):
        dict.__delitem__(self, name)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import time | |
import signal | |
import numpy as np | |
from functools import wraps | |
from typing import Callable | |
from pstats import SortKey | |
from pstats import Stats | |
def profiled(*filters):
    """Decorator factory: profile every call of the wrapped function with cProfile.

    After each call the collected statistics are printed, with directory
    prefixes stripped from file names and rows sorted by cumulative time.
    Built-in functions are not profiled.

    Args:
        *filters: Restrictions forwarded to ``pstats.Stats.print_stats``.
            Examples are an int (first N rows), a float between 0.0 and 1.0
            (that fraction of rows), or a regex string matched against the
            printed function name. With no filters, every row is printed.

    Returns:
        A decorator; the decorated function returns the original result.

    Example:
        @profiled(0.1)  # top 10% of rows by cumulative time
        def work(): ...
    """
    # Imported here because the module's top-level imports do not provide
    # ``Profile``; without this the first call fails with NameError.
    from cProfile import Profile

    def wrapper(func: Callable) -> Callable:
        @wraps(func)
        def profile_wrapper(*args, **kwargs):
            # https://docs.python.org/3/library/profile.html#module-cProfile
            with Profile(builtins=False) as profile:
                res = func(*args, **kwargs)
            (
                Stats(profile)
                .strip_dirs()
                .sort_stats(SortKey.CUMULATIVE)
                .print_stats(*filters)
            )
            return res

        return profile_wrapper

    return wrapper
def force_async(fn):
    """Decorator: make a blocking function awaitable by running it in a thread.

    Each decorated function gets its own ``ThreadPoolExecutor``. Calling the
    wrapper immediately submits ``fn(*args, **kwargs)`` to that pool and
    returns an asyncio future wrapping the pool's future, so callers can
    ``await`` the result.
    """
    from concurrent.futures import ThreadPoolExecutor
    import asyncio

    executor = ThreadPoolExecutor()

    @wraps(fn)
    def awaitable_call(*args, **kwargs):
        # wrap_future bridges the concurrent.futures.Future into asyncio.
        return asyncio.wrap_future(executor.submit(fn, *args, **kwargs))

    return awaitable_call
def force_sync(fn):
    """Decorator: make an async function callable from synchronous code.

    The wrapper calls ``fn``. If that returns a coroutine, the coroutine is
    driven to completion on this thread's current event loop and its result
    is returned; any other return value is passed through unchanged.

    The thread's current loop is reused across calls. If the thread has no
    current loop, or that loop is closed, a new loop is created and
    installed as current. The no-loop case covers a worker thread, a thread
    where ``asyncio.run`` has cleared it, and Python 3.14+ where
    ``get_event_loop`` no longer creates one.

    Raises:
        RuntimeError: If called while an event loop is already running in
            this thread (``run_until_complete`` cannot be nested).
    """
    import asyncio

    def _current_or_new_loop():
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop

    @wraps(fn)
    def wrapper(*args, **kwargs):
        res = fn(*args, **kwargs)
        if asyncio.iscoroutine(res):
            return _current_or_new_loop().run_until_complete(res)
        return res

    return wrapper
def fire_and_forget(f):
    """Decorator: schedule ``f`` on the event loop's default executor.

    Calling the wrapper submits ``f(*args, **kwargs)`` to the loop's default
    thread pool and returns an ``asyncio.Future`` immediately. Callers may
    ignore it ("fire and forget") or await it to get the result.
    """
    from functools import partial

    @wraps(f)
    def wrapped(*args, **kwargs):
        # run_in_executor only forwards positional arguments, so keyword
        # arguments are bound with partial; unpacking them with ``*`` would
        # pass the keyword *names* as extra positional arguments.
        call = partial(f, *args, **kwargs)
        return asyncio.get_event_loop().run_in_executor(None, call)

    return wrapped
def as_numpy_array(func, dtype=np.float32):
    """Decorator for methods: convert the return value to a NumPy array.

    A return value whose type name is ``ndarray``, ``EagerTensor``, ``Tensor``
    or ``list`` is converted with ``np.array(value, dtype)``. Types are
    compared by class name, so those classes need not be imported here.

    Raises:
        TypeError: If the method returns a value of any other type.
    """
    accepted_type_names = {"ndarray", "EagerTensor", "Tensor", "list"}

    @wraps(func)
    def arg_wrapper(self, *args, **kwargs):
        result = func(self, *args, **kwargs)
        type_name = type(result).__name__
        if type_name not in accepted_type_names:
            raise TypeError("unrecognized type {}: {}".format(type_name, type(result)))
        return np.array(result, dtype)

    return arg_wrapper
def as_numpy_batch(func, dtype=np.float32):
    """Decorator for methods: convert the return value to a batched NumPy array.

    A return value whose type name is ``ndarray``, ``EagerTensor``, ``Tensor``
    or ``list`` is converted with ``np.array(value, dtype)``. It is then given
    at least two dimensions: a 1-D result of length N becomes ``(1, N)``, a
    scalar becomes ``(1, 1)``, and results with two or more dimensions keep
    their shape.

    Raises:
        TypeError: If the method returns a value of any other type.
    """
    accepted_type_names = {"ndarray", "EagerTensor", "Tensor", "list"}

    @wraps(func)
    def arg_wrapper(self, *args, **kwargs):
        r = func(self, *args, **kwargs)
        r_type = type(r).__name__
        if r_type not in accepted_type_names:
            raise TypeError("unrecognized type {}: {}".format(r_type, type(r)))
        # atleast_2d prepends length-1 axes until ndim >= 2; unlike a single
        # expand_dims it also lifts a 0-d (scalar) result to shape (1, 1).
        return np.atleast_2d(np.array(r, dtype))

    return arg_wrapper
def batched(batch_size):
    """Decorator factory: call the wrapped function on consecutive slices of its input.

    The first positional argument is cut into slices
    ``first_arg[start:start + batch_size]``. The wrapped function is called as
    ``func(chunk, *other_args, **kwargs)`` for each slice, in order. A line
    ``sending from {start} to {end}`` is printed before every call.

    Args:
        batch_size: Maximum number of items per chunk; must be positive.

    Returns:
        A decorator. The decorated function returns a list holding the result
        of each per-chunk call (empty when the input is empty).

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    def wrapper(func):
        @wraps(func)
        def batcher(*args, **kwargs):
            long_list = args[0]
            rest = args[1:]
            results = []
            # Step by index rather than looping `while chunk:` so sliceable
            # inputs with an ambiguous truth value (e.g. NumPy arrays) work.
            for start in range(0, len(long_list), batch_size):
                end = start + batch_size
                print(f"sending from {start} to {end}")
                results.append(func(long_list[start:end], *rest, **kwargs))
            return results

        return batcher

    return wrapper
def f_timeout(timeout_secs: int):
    """Decorator factory: abort the wrapped function after ``timeout_secs`` seconds.

    Implemented with ``SIGALRM``, which brings several constraints. It works
    on Unix only, and only in the main thread (``signal.signal`` raises
    ``ValueError`` in other threads). Resolution is whole seconds, and
    ``timeout_secs=0`` disables the limit. The SIGALRM handler that was
    installed before each call is restored afterwards.

    Raises:
        TimeoutError: If the call is still running when the alarm fires.
            It subclasses ``Exception``, so existing ``except Exception``
            handlers still catch it.
    """

    def wrapper(func):
        @wraps(func)
        def time_limited(*args, **kwargs):
            def handler(signum, frame):
                raise TimeoutError(f"Timeout for function '{func.__name__}'")

            previous_handler = signal.signal(signal.SIGALRM, handler)
            if previous_handler is None:
                # Not installed from Python, so it cannot be handed back to
                # signal.signal(); fall back to the default disposition.
                previous_handler = signal.SIG_DFL
            signal.alarm(timeout_secs)
            try:
                return func(*args, **kwargs)
            finally:
                # Cancel the alarm first, then uninstall our handler.
                signal.alarm(0)
                signal.signal(signal.SIGALRM, previous_handler)

        return time_limited

    return wrapper
def timeit(func: Callable) -> Callable:
    """Decorator: make the wrapped function also report its run time.

    The decorated function returns ``(result, elapsed_seconds)`` instead of
    just ``result``.
    """

    @wraps(func)
    def timed_wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution; time.time() follows
        # the wall clock, which can be adjusted mid-call and skew the delta.
        start = time.perf_counter()
        res = func(*args, **kwargs)
        delta = time.perf_counter() - start
        return res, delta

    return timed_wrapper
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Added a timeout wrapper to interrupt a function's execution