Skip to content

Instantly share code, notes, and snippets.

@renierdbruyn
Forked from turtlemonvh/tasks.py
Created April 3, 2020 11:05
Show Gist options
  • Save renierdbruyn/0190908ee2b97e5c012ad383d9a7288a to your computer and use it in GitHub Desktop.
Save renierdbruyn/0190908ee2b97e5c012ad383d9a7288a to your computer and use it in GitHub Desktop.
Dynamically add celery tasks
## THIS IS UNTESTED
from worker.models import TaskType
from website.celery import app
import importlib
# Dynamically add registered tasks
# Celery._tasks is a task registry object
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/app/base.py#L162 - _tasks
# https://github.com/celery/celery/blob/524421a36dcc838a5f51e5cf122902aab774bad1/celery/app/registry.py#L22 - task registry object
# Autodiscover calls AppLoader which is a thin shell around BaseLoader
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/app/base.py#L341 - autodisover
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/loaders/base.py - base loader
# task function calls _task_from_fun, which adds tasks to registry
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/app/base.py#L227 - task; creates a new task
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/app/base.py#L269 - _task_from_fun
# task function is called as a decorator on a task function
# TL;DR
# - call app.task directly on functions you want to add
def add_tasks():
"""Grab task definitions out of database and add them to celery's registry"""
for task in TaskType.objects.all():
# Grab the reference to the function
# get function from call_string
# call string should be in the form: "module1.module2.module3.task_class"
module_path = task.call_string.split(".")
module = importlib.import_module(".%s" %(module_path[-2]),
package=".".join(module_path[0:-2]))
task_class = getattr(module, module_path[-1])
# The execute method is the callable
# Call the 'task' method to register
app.task(getattr(task_class, 'execute'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment