Last active
July 3, 2023 18:23
-
-
Save SF-300/a4bb5fb7d190a094043d1be2961f1579 to your computer and use it in GitHub Desktop.
Declaratively describe delays - mostly for retry actions.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This module provides a declarative way to describe delay strategies that control the interval between repeated actions, such as retrying a network request.

There are two categories of strategies:

1. Root strategies: these provide the base logic for calculating the delay time.
   - Linear: increases the delay time linearly.
   - Exponential: increases the delay time exponentially.
2. Modifier strategies: these adjust the delays produced by another strategy.
   - Jittered: adds a random offset to each delay produced by another strategy.
   - Capped: limits the total number of attempts; it yields one delay fewer than the attempt count.

The Delays class applies a strategy: it sleeps (synchronously or asynchronously) between iterations according to the specified root or modifier strategy.
""" | |
import asyncio | |
import time | |
import random | |
import itertools | |
from typing import Iterable, Iterator, TypeAlias, AsyncIterator | |
# Public names exported by a star-import of this module.
__all__ = (
    "Strategy",
    "Linear",
    "Exponential",
    "Jittered",
    "Capped",
    "Delays",
)

# A strategy is any iterable of delay values; the classes below subclass it.
Strategy: TypeAlias = Iterable[float]
class Linear(Strategy):
    """Root strategy whose delay changes by a fixed increment on every step.

    Iterating yields ``start``, ``start + step``, ``start + 2 * step`` and so
    on without end. Any value that would fall below zero is reported as
    ``0.0`` instead. Every call to ``iter()`` restarts the sequence.

    Parameters
    ----------
    start
        First delay of the sequence; must be strictly positive.
    step
        Amount added after each yielded delay.

    Raises
    ------
    ValueError
        If ``start`` is zero or negative.
    """

    def __init__(self, start: float = 1, step: float = 1) -> None:
        if start <= 0:
            raise ValueError(f"'start' must be positive but {start} given")
        self._start, self._step = start, step

    def __iter__(self) -> Iterator[float]:
        # Clamp at zero so a negative step bottoms out instead of producing
        # negative delays.
        yield from (
            max(value, 0.0)
            for value in itertools.count(self._start, self._step)
        )
class Exponential(Strategy):
    """Root strategy whose delay is ``base`` raised to a steadily changing power.

    The exponent begins at ``start`` and changes by ``step`` after each
    yielded value, so iteration produces ``base ** start``,
    ``base ** (start + step)``, ... without end. Every call to ``iter()``
    restarts the sequence.

    Parameters
    ----------
    base
        Number that is raised to the current exponent. It is not validated.
    start
        First exponent; must be strictly positive.
    step
        Amount added to the exponent after each yielded delay.

    Raises
    ------
    ValueError
        If ``start`` is zero or negative.
    """

    def __init__(self, base: float, start: float = 1, step: float = 1) -> None:
        if start <= 0:
            raise ValueError(f"'start' must be positive but {start} given")
        self._base, self._start, self._step = base, start, step

    def __iter__(self) -> Iterator[float]:
        yield from (
            pow(self._base, exponent)
            for exponent in itertools.count(self._start, self._step)
        )
class Jittered(Strategy):
    """Modifier strategy that randomizes each delay of another strategy.

    For every delay produced by ``source`` a value is drawn uniformly from
    the range ``delay - delta`` .. ``delay + delta``. Both ends of that range
    are clamped at zero, so the result is never negative.

    Parameters
    ----------
    source
        Strategy whose delays are randomized.
    delta
        Maximum distance between a jittered delay and the source delay.
    """

    def __init__(self, source: Strategy, delta: float) -> None:
        self._source = source
        self._delta = delta

    def __iter__(self) -> Iterator[float]:
        for delay in self._source:
            lower = max(delay - self._delta, 0.0)
            # Clamp the upper bound as well: with a negative delta (or a
            # negative source delay) it can drop below zero, and a negative
            # delay would make the sleep call fail.
            upper = max(delay + self._delta, 0.0)
            yield random.uniform(lower, upper)
class Capped(Strategy):
    """Modifier strategy that bounds how many delays another strategy yields.

    Iteration passes through at most ``attempts - 1`` delays from ``source``.
    The ``Delays`` class in this module performs one more iteration after
    the delays are exhausted, which brings the total to ``attempts``.

    Parameters
    ----------
    source
        Strategy whose delays are passed through.
    attempts
        Total number of attempts to allow; must be strictly positive.

    Raises
    ------
    ValueError
        If ``attempts`` is zero or negative.
    """

    def __init__(self, source: Strategy, attempts: int) -> None:
        if attempts <= 0:
            raise ValueError(f"'attempts' must be positive but {attempts} given")
        self._source, self._attempts = source, attempts

    def __iter__(self) -> Iterator[float]:
        # One delay fewer than the number of attempts.
        delay_budget = self._attempts - 1
        return itertools.islice(self._source, delay_budget)
class Delays: | |
"""Each iteration is delayed (except for the first one) while yielding the next delay as float. | |
Examples | |
-------- | |
def do_something_useful(retry_strategy: Strategy = Capped(Jittered(Linear(), 0.5), 10)): | |
# 'for' can be replaced with 'async for' to get asynchronous delays instead of synchronous ones. | |
for next_delay in Delays(retry_strategy): | |
if next_delay is not None: | |
print(f"Next iteration will be delayed for {next_delay} seconds.") | |
else: | |
print("No more iterations will happen after this one.") | |
""" | |
def __init__(self, strategy: Strategy) -> None: | |
self._delays = iter(strategy) | |
def __iter__(self) -> Iterator[float | None]: | |
for next_delay in self._delays: | |
yield next_delay | |
time.sleep(next_delay) | |
yield None | |
async def __aiter__(self) -> AsyncIterator[float | None]: | |
for next_delay in self._delays: | |
yield next_delay | |
await asyncio.sleep(next_delay) | |
yield None |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment