Skip to content

Instantly share code, notes, and snippets.

@victorusachev
Created March 12, 2024 19:58
Show Gist options
  • Save victorusachev/556d95338179f6c40d97dc0560524382 to your computer and use it in GitHub Desktop.
Save victorusachev/556d95338179f6c40d97dc0560524382 to your computer and use it in GitHub Desktop.
apscheduler.BlockingScheduler with retry
import functools
from typing import Callable, Any
from loguru import logger
from integration.utils import retry
def run_something(f: Callable, *args: Any, **kwargs: Any) -> None:
    """
    Run ``f(*args, **kwargs)`` through ``retry`` and log the final failure.

    The call is bound into a zero-argument callable and handed to ``retry``
    with up to 5 tries, an initial delay of 1, a delay cap of 60 and a
    backoff multiplier of 2. Any ``Exception`` that still escapes is logged
    at error level and is not propagated to the caller.

    :param f: the callable to execute.
    :param args: positional arguments bound to ``f``.
    :param kwargs: keyword arguments bound to ``f``.
    """
    try:
        # The partial is built inside the try block so that a failure while
        # binding the arguments is logged the same way as a failed job.
        retry(
            functools.partial(f, *args, **kwargs),
            exceptions=Exception,
            logger=logger,
            tries=5,
            delay=1,
            max_delay=60,
            backoff=2,
        )
    except Exception as exc:
        logger.error(f'An error has occurred: {exc}')
def run_one() -> None:
    """Run ``do_work`` for the job named 'ONE' via ``run_something``."""
    run_something(do_work, job_name='ONE')
def run_two() -> None:
    """Run ``do_work`` for the job named 'TWO' via ``run_something``."""
    run_something(do_work, job_name='TWO')
def do_work(job_name: str) -> None:
    """
    Simulate a job: sleep for a randomly chosen duration, then maybe fail.

    A fresh UUID identifies the run in the log output. The duration is picked
    from 10, 15, 30 or 65 seconds, and the failure flag is picked from two
    ``False`` values and one ``True``.

    :param job_name: label included in every log line and in the error text.
    :raises Exception: when the randomly chosen failure flag is set.
    """
    import random
    import time
    import uuid

    task_id = uuid.uuid4()
    logger.info('{} - {}: start task ...', job_name, task_id)

    # The names ``seconds`` and ``has_error`` appear verbatim in the debug
    # line through the ``{name=}`` f-string form.
    seconds = random.choice([10, 15, 30, 65])
    has_error = random.choice([False, False, True])
    # The doubled braces survive the f-string as ``{}`` placeholders, which
    # the logger then fills with job_name and task_id.
    logger.debug(f'{{}} - {{}}: {seconds=}, {has_error=}', job_name, task_id)

    time.sleep(seconds)

    if not has_error:
        logger.info('{} - {}: completed in {}', job_name, task_id, seconds)
        return

    raise Exception(f'An error occurred while executing the task {job_name} with ID {task_id}')
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from loguru import logger
# Job definitions read by main(). Each entry holds:
#   func            - string reference to the callable, passed to add_job()
#   cron_expression - crontab expression, turned into a CronTrigger
#   anything else   - forwarded to add_job() as keyword arguments
# NOTE(review): the job functions visible elsewhere in this source are named
# run_one/run_two -- confirm that application.jobs really exposes
# job_one/job_two.
TASKS_CONFIGURATION = [
    dict(
        func='application.jobs:job_one',
        cron_expression='*/1 * * * *',
        max_instances=1,
    ),
    dict(
        func='application.jobs:job_two',
        cron_expression='*/1 * * * *',
        max_instances=1,
    ),
]
def main() -> None:
    """
    Register every job from ``TASKS_CONFIGURATION`` and start the scheduler.

    A ``BlockingScheduler`` is created with a scheduler-wide job default of
    ``max_instances=6``. For each configuration entry, ``'func'`` and
    ``'cron_expression'`` are taken out and the remaining keys are passed to
    ``add_job`` as keyword arguments. ``scheduler.start()`` is called once
    all jobs are registered.

    :raises KeyError: if an entry lacks ``'func'`` or ``'cron_expression'``.
    """
    scheduler = BlockingScheduler(job_defaults={'max_instances': 6})
    for task in TASKS_CONFIGURATION:
        # Work on a private copy of the entry: list.copy() is shallow, so
        # popping from the original dicts would strip 'func' and
        # 'cron_expression' from the module-level configuration and make a
        # second call to main() fail with KeyError.
        job_options = dict(task)
        func = job_options.pop('func')
        trigger = CronTrigger.from_crontab(job_options.pop('cron_expression'))
        scheduler.add_job(func=func, trigger=trigger, **job_options)
    scheduler.start()
# Script entry point: start the scheduler only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    main()
import logging
import random
import time
from typing import Callable, Type
def retry(
func: Callable[[], None],
exceptions: Type[Exception] | tuple[Type[Exception], ...],
tries: int = -1,
delay: int = 0,
max_delay: int | None = None,
backoff: int = 1,
jitter: int = 0,
logger: logging.Logger | None = None,
):
"""
Executes a function and retries it if it failed.
:param func: the function to execute.
:param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
:param tries: the maximum number of attempts. default: -1 (infinite).
:param delay: initial delay between attempts. default: 0.
:param max_delay: the maximum value of delay. default: None (no limit).
:param backoff: multiplier applied to delay between attempts. default: 1 (no backoff).
:param jitter: extra seconds added to delay between attempts. default: 0.
fixed if a number, random if a range tuple (min, max)
:param logger: logger.warning(fmt, error, delay) will be called on failed attempts.
default: retry.logging_logger. if None, logging is disabled.
:returns: the result of the f function.
"""
while tries:
try:
return func()
except exceptions as e:
tries -= 1
if not tries:
raise
if logger is not None:
logger.warning(f'{e}, retrying in {delay} seconds...')
time.sleep(delay)
delay *= backoff
if isinstance(jitter, tuple):
delay += random.uniform(*jitter)
else:
delay += jitter
if max_delay is not None:
delay = min(delay, max_delay)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment