Last active
December 19, 2017 02:15
-
-
Save oozliuoo/97edef57b6987b94fe5c10800960a54c to your computer and use it in GitHub Desktop.
CeleryPipelineDemo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"BROKER": "redis://localhost:6379/0", | |
"BACKEND": "redis://localhost:6379/1", | |
"TASK_CREATE_MISSING_QUEUE": true | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import time | |
from tasks import add, minus, mult | |
from kombu import Queue | |
from celery import Celery, chain | |
# Load the connection settings once at import time.  The context manager
# closes the file handle, which a bare ``json.load(open(...))`` leaves open.
with open('./config.json') as _config_file:
    CONFIG = json.load(_config_file)

# Values read from config.json and handed to the Celery constructor.
BROKER = CONFIG["BROKER"]
BACKEND = CONFIG["BACKEND"]
TASK_CREATE_MISSING_QUEUE = CONFIG["TASK_CREATE_MISSING_QUEUE"]

# Worker node names used as ``destination`` when assigning a queue consumer.
WORKERS = {
    "WORKER1": "worker1@localhost",
    "WORKER2": "worker2@localhost",
    "WORKER3": "worker3@localhost",
}

# Feature hashes; each one is used as both queue name and routing key.
HASHES = {
    "HASH1": "feature_hash1",
    "HASH2": "feature_hash2",
    "HASH3": "feature_hash3",
}
class CeleryApp:
    '''Factory holder for the Celery client used by the dispatcher.'''

    @staticmethod
    def connect_workers():
        '''Build the Celery object wired to the configured broker and backend.

        The instance starts with an empty ``task_queues`` tuple.

        Returns:
            Celery instance
        '''
        settings = {
            "broker": BROKER,
            "backend": BACKEND,
            "task_create_missing_queues": TASK_CREATE_MISSING_QUEUE,
            "task_queues": (),
        }
        return Celery('worker.tasks', **settings)
class MainApp:
    '''
    Dispatches the demo task chains, each onto the queue named by a feature hash.
    '''

    def __init__(self, celery_app):
        '''
        this class accepts a Celery application object (decorator pattern),
        which is used to configure routing and to control workers
        @param {Celery} celery_app - the Celery instance this class delegates to
        '''
        self._celery_app = celery_app

    def do_job1(self, feature_hash):
        '''
        mock job with a hardcoded worker policy: the queue is consumed by WORKER1
        this job is doing (2 + 2) * 3 - 6 == 6
        @param {string} feature_hash - the feature hash that representing a series of tasks
        @return whatever delay() on the chain returns (the async result handle)
        '''
        return self._submit(
            feature_hash,
            WORKERS["WORKER1"],
            add.s(2, 2),
            mult.s(3),
            minus.s(6),
        )

    def do_job2(self, feature_hash):
        '''
        mock job with a hardcoded worker policy: the queue is consumed by WORKER1
        this job is doing 2 * 5 * 3 - 10 + 18 == 38
        NOTE(review): the earlier description of this job said WORKER2, but the
        code has always targeted WORKER1; behaviour is kept as-is - confirm intent.
        @param {string} feature_hash - the feature hash that representing a series of tasks
        @return whatever delay() on the chain returns (the async result handle)
        '''
        return self._submit(
            feature_hash,
            WORKERS["WORKER1"],
            mult.s(2, 5),
            mult.s(3),
            minus.s(10),
            add.s(18),
        )

    def do_job3(self, feature_hash):
        '''
        mock job with a hardcoded worker policy: the queue is consumed by WORKER3
        this job currently only does 2 * 4 == 8 (a single mult step)
        @param {string} feature_hash - the feature hash that representing a series of tasks
        @return whatever delay() on the chain returns (the async result handle)
        '''
        return self._submit(
            feature_hash,
            WORKERS["WORKER3"],
            mult.s(2, 4),
        )

    def do_job4(self, feature_hash):
        '''
        mock job with a hardcoded worker policy: the queue is consumed by WORKER2
        this job is doing (2 * 4 - 10) * 7 == -14
        @param {string} feature_hash - the feature hash that representing a series of tasks
        @return whatever delay() on the chain returns (the async result handle)
        '''
        return self._submit(
            feature_hash,
            WORKERS["WORKER2"],
            mult.s(2, 4),
            minus.s(10),
            mult.s(7),
        )

    def _submit(self, feature_hash, worker, *signatures):
        '''
        Shared body of every do_job*: set up routing, pin each step of the
        chain to the feature queue, then send the chain
        @param {string} feature_hash - queue name and routing key for every step
        @param {string} worker - name (host) of worker that will consume the queue
        @param {Signature} signatures - task signatures, in execution order
        @return whatever delay() on the chain returns (the async result handle)
        '''
        self._configure_routing(feature_hash, worker)
        routed = [
            signature.set(queue=feature_hash, routing_key=feature_hash)
            for signature in signatures
        ]
        return chain(*routed).delay()

    def _configure_routing(self, feature_hash, worker):
        '''
        Configures routing at runtime, basically setting up new queues
        and assign a worker to that queue
        @param {string} feature_hash - the feature hash that representing a series of tasks
        @param {string} worker - name (host) of worker that will be consuming the queue created
        '''
        conf = self._celery_app.conf
        known_queues = tuple(conf.task_queues or ())

        # Register the queue only once: several jobs may share a feature hash,
        # and appending on every call would grow task_queues with duplicates.
        already_known = any(
            getattr(queue, "name", None) == feature_hash
            for queue in known_queues
        )
        if not already_known:
            conf.task_queues = known_queues + (
                Queue(feature_hash, routing_key=feature_hash),
            )

        # Always (re)issue the consumer request: a different worker may be
        # asked to consume a queue that is already registered.
        self._celery_app.control.add_consumer(
            feature_hash,
            reply=True,
            routing_key=feature_hash,
            destination=[worker]
        )
# Script entry: build the Celery client and wrap it in the dispatcher.
celery_app = CeleryApp.connect_workers()
main_app = MainApp(celery_app)

# Both active jobs are sent, in order, to the queue named by HASH1.
for _dispatch in (main_app.do_job1, main_app.do_job2):
    _dispatch(HASHES["HASH1"])

# Disabled jobs, kept for manual experiments:
# main_app.do_job3(HASHES["HASH3"])
# main_app.do_job4(HASHES["HASH1"])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import redis | |
from redlock import RedLock | |
from celery import Celery | |
from celery import task | |
# Celery application that every task in this module is registered on.
app = Celery(
    'worker.tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)

# Lock names, one per task kind, passed to RedLock.
MINUS_LOCK_KEY = "minus_lock"
MULT_LOCK_KEY = "mult_lock"
ADD_LOCK_KEY = "add_lock"

# Key suffixes per task kind (not referenced by the code visible here).
MINUS_REDIS_KEY_SUFFIX = ".tasks.minus"
MULT_REDIS_KEY_SUFFIX = ".tasks.mult"
ADD_REDIS_KEY_SUFFIX = ".tasks.add"
@app.task(bind=True)
def add(self, x, y):
    """
    Executing add job with redlock

    The task stays bound (``self``) so its signature is unchanged. The
    earlier per-call ``app.control.inspect().active()[hostname]`` lookup was
    removed: its result was never used, and it fails when the inspect call
    returns no reply or the hostname is missing from the reply.

    Decorators:
        app
    Arguments:
        x {int} -- First param to be added
        y {int} -- Second param to be added
    Returns:
        int -- sum of the two params
    """
    return execute_task_with_lock(_add, "add", ADD_LOCK_KEY, x, y)
def _add(x, y): | |
"""The acttual function doing add | |
Sleep for 5 secs, and then add up the two params | |
Arguments: | |
x {int} -- First param to be added | |
y {int} -- Second param to be added | |
Returns: | |
int -- sum of the two params | |
""" | |
time.sleep(5) | |
return x + y | |
@app.task(bind=True)
def mult(self, x, y):
    """
    Executing mult job with redlock

    The task stays bound (``self``) so its signature is unchanged. The
    earlier per-call ``app.control.inspect().active()[hostname]`` lookup was
    removed: its result was never used, and it fails when the inspect call
    returns no reply or the hostname is missing from the reply.

    Decorators:
        app
    Arguments:
        x {int} -- First param to be multed
        y {int} -- Second param to be multed
    Returns:
        int -- mult of the two params
    """
    return execute_task_with_lock(_mult, "mult", MULT_LOCK_KEY, x, y)
def _mult(x, y): | |
"""The acttual function doing mult | |
Sleep for 10 secs, and then multiply the two params | |
Arguments: | |
x {int} -- First param to be multed | |
y {int} -- Second param to be multed | |
Returns: | |
int -- mult of the two params | |
""" | |
time.sleep(10) | |
return x * y | |
@app.task(bind=True)
def minus(self, x, y):
    """
    Executing minus job with redlock

    The task stays bound (``self``) so its signature is unchanged. The
    earlier per-call ``app.control.inspect().active()[hostname]`` lookup was
    removed: its result was never used, and it fails when the inspect call
    returns no reply or the hostname is missing from the reply.

    Decorators:
        app
    Arguments:
        x {int} -- First param to be minused from
        y {int} -- Second param to be used to minus
    Returns:
        int -- diff of the two params
    """
    return execute_task_with_lock(_minus, "minus", MINUS_LOCK_KEY, x, y)
def _minus(x, y): | |
"""The acttual function doing minus | |
Sleep for 15 secs, and then minus the two params | |
Arguments: | |
x {int} -- First param to be minus from | |
y {int} -- Second param to be used to minus | |
Returns: | |
int -- difference of the two params | |
""" | |
time.sleep(15) | |
return x - y | |
def execute_task_with_lock(proc, task_name, lock_key, x, y):
    """Executing tasks with lock

    Executing the task, but only one at a time, meaning
    if there are the same tasks being processed, the incoming
    one will be stalled: while the lock cannot be acquired, a message is
    printed and acquisition is retried after 1 second.

    Only a failure to obtain the lock triggers a retry. An exception raised
    by ``proc`` itself propagates to the caller; the previous bare ``except``
    treated it as lock contention and re-ran the task forever.

    Arguments:
        proc {Function} -- Actual function performing the task
        task_name {string} -- Name of the task
        lock_key {string} -- Lock key for the task
        x {int} -- First argument of the task
        y {int} -- Second argument of the task
    Returns:
        int -- result of the task
    """
    # Local import keeps this block self-contained; RedLockError comes from
    # the same package as RedLock and is what its context manager raises
    # when the lock cannot be acquired.
    from redlock import RedLockError

    while True:
        try:
            # Returning from inside the ``with`` still releases the lock.
            with RedLock(lock_key):
                return proc(x, y)
        except RedLockError:
            print("waiting for the previous %s to complete, wait for 1 sec" % task_name)
            time.sleep(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment