Skip to content

Instantly share code, notes, and snippets.

@ar2pi
Last active July 24, 2024 05:17
Show Gist options
  • Save ar2pi/b0e0353f39405cdac09c6a21f1b9d72b to your computer and use it in GitHub Desktop.
Save ar2pi/b0e0353f39405cdac09c6a21f1b9d72b to your computer and use it in GitHub Desktop.
Retry with exponential backoff
#!/usr/bin/env python3
import asyncio
import functools
import logging
import math
import signal
import time
from contextlib import asynccontextmanager, contextmanager
class TimeoutException(Exception):
    """Raised when an operation exceeds its allotted time budget."""
class NonRetryableException(Exception):
    """Raised when all retries are exhausted and the operation is abandoned."""
@contextmanager
def timeout(seconds):
    """Context manager that raises TimeoutException after `seconds` seconds.

    Implemented with SIGALRM, so it only works on Unix and only in the
    main thread. `seconds` must be an int (signal.alarm takes whole seconds).

    Args:
        seconds: Whole seconds the managed block is allowed to run.

    Raises:
        TimeoutException: if the managed block runs longer than `seconds`.
    """
    def signal_handler(signum, frame):
        raise TimeoutException(f"Timed out after {seconds} seconds")

    # Remember the handler that was installed before us so it can be
    # restored on exit; the original code left our handler installed for
    # the remainder of the process.
    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)  # disarm any pending alarm
        signal.signal(signal.SIGALRM, previous_handler)
@asynccontextmanager
async def async_timeout(seconds):
    """Async context manager that cancels the current task after `seconds`.

    A cancellation caused by this timer is translated into TimeoutException.
    A CancelledError from any *other* source (e.g. the caller cancelling the
    task) is re-raised unchanged — the original code mislabeled every
    cancellation as a timeout.

    Args:
        seconds: Seconds (may be fractional) the managed block is allowed to run.

    Raises:
        TimeoutException: if the managed block runs longer than `seconds`.
    """
    task = asyncio.current_task()
    timer_fired = False

    def _cancel_task():
        # Record that *we* cancelled the task so the except clause below
        # can distinguish a timeout from an external cancellation.
        nonlocal timer_fired
        timer_fired = True
        task.cancel()

    # get_running_loop() is the supported way to reach the loop from inside
    # a coroutine; get_event_loop() is deprecated here.
    timeout_handle = asyncio.get_running_loop().call_later(seconds, _cancel_task)
    try:
        yield
    except asyncio.CancelledError:
        if not timer_fired:
            raise  # cancelled by someone else — not a timeout
        raise TimeoutException(f"Task timed out after {seconds} seconds")
    finally:
        timeout_handle.cancel()
def retry(
    wait_exponential_multiplier=1,
    wait_exponential_max=60,
    stop_max_delay=240,
    stop_max_attempt_number=math.inf,
):
    """
    ## Description
    Retry with exponential backoff maxed at `wait_exponential_max`
    until either `stop_max_delay` or `stop_max_attempt_number` is reached.
    Works on both sync functions and coroutine functions; the appropriate
    wrapper is chosen at decoration time.
    ## Args
    - `wait_exponential_multiplier`: Multiplier by which to increase the delay between retries.
    - `wait_exponential_max`: Maximum delay in seconds.
    - `stop_max_delay`: Maximum total time in seconds (must be an int for the
      sync path, which relies on the SIGALRM-based `timeout`).
    - `stop_max_attempt_number`: Maximum number of attempts.
    ## Raises
    - `NonRetryableException`: when the deadline or the attempt limit is
      reached; its message carries the last underlying error.
    ## Usage
    ```python
    @retry()
    def some_unreliable_func():
        pass
    ```
    """

    def decorator(func):
        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            last_error = None
            time_start = time.time()
            time_since_start = 0
            retry_count = 0
            try:
                # The SIGALRM-based timeout bounds the *total* time spent,
                # including the sleeps between attempts.
                with timeout(stop_max_delay):
                    while retry_count < stop_max_attempt_number:
                        try:
                            result = func(*args, **kwargs)
                        except TimeoutException:
                            # Deadline fired while func() itself was running:
                            # stop retrying and report failure below.
                            break
                        except Exception as e:
                            last_error = e
                            retry_count += 1
                            # Exponential backoff: 1, 2, 4, ... capped at
                            # wait_exponential_max.
                            delay = min(
                                wait_exponential_max,
                                wait_exponential_multiplier * 2 ** (retry_count - 1),
                            )
                            logging.error(
                                f"{func.__name__}{args, kwargs} Error: {e}. Retrying in {delay} seconds..."
                            )
                            time.sleep(delay)
                        else:
                            return result
            except TimeoutException:
                # Deadline fired during time.sleep() between attempts.
                pass
            time_since_start = time.time() - time_start
            raise NonRetryableException(
                f"{func.__name__}{args, kwargs} Failed after {round(time_since_start, 2)}s, {retry_count} retries. Last error: {last_error}"
            )

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            last_error = None
            time_start = time.time()
            time_since_start = 0
            retry_count = 0
            try:
                # async_timeout cancels this task at the deadline; the
                # resulting CancelledError surfaces as TimeoutException from
                # the context manager (caught below).
                async with async_timeout(stop_max_delay):
                    while retry_count < stop_max_attempt_number:
                        try:
                            result = await func(*args, **kwargs)
                        except Exception as e:
                            # NOTE: CancelledError is a BaseException (3.8+),
                            # so a deadline cancellation is NOT caught here —
                            # it propagates into async_timeout as intended.
                            last_error = e
                            retry_count += 1
                            delay = min(
                                wait_exponential_max,
                                wait_exponential_multiplier * 2 ** (retry_count - 1),
                            )
                            logging.error(
                                f"{func.__name__}{args, kwargs} Error: {e}. Retrying in {delay} seconds..."
                            )
                            await asyncio.sleep(delay)
                        else:
                            return result
            except TimeoutException:
                pass
            time_since_start = time.time() - time_start
            raise NonRetryableException(
                f"{func.__name__}{args, kwargs} Failed after {round(time_since_start, 2)}s, {retry_count} retries. Last error: {last_error}"
            )

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment