Last active
July 26, 2022 16:18
-
-
Save pssolanki111/d7ef5d8cee93ca74e80ef8d6f39b9268 to your computer and use it in GitHub Desktop.
A Rate limiter for async applications. Implements the Leaky Bucket Algorithm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import contextlib | |
from collections import OrderedDict | |
import time | |
class AsyncLeakyBucket(contextlib.AbstractAsyncContextManager): | |
def __init__(self, max_rate: float, time_period: float = 60, loop=None): | |
self._loop, self._max_level = loop, max_rate | |
self._rate_per_sec, self._level = max_rate / time_period, 0.0 | |
self._last_check, self._waiters = 0.0, OrderedDict() | |
def _leak(self): | |
if self._level: | |
elapsed = time.time() - self._last_check | |
decrement = elapsed * self._rate_per_sec | |
self._level = max(self._level - decrement, 0) | |
self._last_check = time.time() | |
def has_capacity(self, amount: float = 1) -> bool: | |
self._leak() | |
requested = self._level + amount | |
if requested < self._max_level: | |
for fut in self._waiters.values(): | |
if not fut.done(): | |
fut.set_result(True) | |
break | |
return self._level + amount <= self._max_level | |
async def acquire(self, amount: float = 1): | |
if amount > self._max_level: | |
raise ValueError("Can't acquire more than the bucket capacity") | |
loop = self._loop or asyncio.get_event_loop() | |
task = asyncio.current_task(loop) | |
assert task is not None | |
while not self.has_capacity(amount): | |
fut = loop.create_future() | |
self._waiters[task] = fut | |
try: | |
await asyncio.wait_for(asyncio.shield(fut), 1 / self._rate_per_sec * amount, loop=loop) | |
except asyncio.TimeoutError: | |
pass | |
fut.cancel() | |
self._waiters.pop(task, None) | |
self._level += amount | |
async def __aenter__(self): | |
await self.acquire() | |
return None | |
async def __aexit__(self, exc_type, exc, tb) -> None: | |
return None |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment