Last active
January 20, 2019 20:16
-
-
Save artemrys/4e2984b0266d99f45200f3d92790c3a0 to your computer and use it in GitHub Desktop.
Celery External Call using RabbitMQ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json

from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue
# Queue named "input.queue", bound to the "default" exchange with the
# routing key "input.key".
queue = Queue(
    "input.queue",
    exchange=Exchange("default"),
    routing_key="input.key",
)

# Celery application; the broker is given as the bare "amqp://" URL.
app = Celery(broker="amqp://")
# Declaring the general input message handler
class InputMessageHandler(object):
    """Dispatch an incoming message to the handler matching its ``type``.

    Only the ``"ETL"`` type is dispatched (to ``ETLMessageHandler``);
    messages of any other type are ignored.
    """

    def handle(self, body):
        """Decode *body* when necessary and dispatch on its ``"type"`` key.

        Args:
            body: The message payload. Either raw JSON text (``str``,
                ``bytes`` or ``bytearray``) or an object that has already
                been deserialized, such as a ``dict``.

        Raises:
            ValueError: If a raw payload is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
            KeyError: If the decoded payload has no ``"type"`` key.
        """
        # json.loads() raises TypeError for anything that is not text or
        # bytes, so only raw payloads are decoded; an already-deserialized
        # payload is used as-is.
        if isinstance(body, (str, bytes, bytearray)):
            body_json = json.loads(body)
        else:
            body_json = body

        _type = body_json["type"]
        if _type == "ETL":
            ETLMessageHandler().handle(body_json)
# Declaring the ETL message handler
class ETLMessageHandler(object):
    """Handler for ETL messages."""

    def handle(self, body):
        """Print a progress line that includes *body*.

        Args:
            body: The message payload; it is only formatted into the
                printed line.
        """
        print("Working on ETL for message: {0}".format(body))
        # Calling out your Celery tasks here
# Declaring the bootstep for our purposes
class InputMessageConsumerStep(bootsteps.ConsumerStep):
    """Consumer bootstep that passes messages from ``queue`` to
    ``InputMessageHandler``.
    """

    def get_consumers(self, channel):
        """Return a single-element list with the consumer for *channel*.

        The consumer reads from the module-level ``queue``, accepts only
        the ``"json"`` content type and invokes ``handle_message`` as its
        callback.
        """
        return [
            Consumer(
                channel,
                queues=[queue],
                callbacks=[self.handle_message],
                accept=["json"],
            )
        ]

    def handle_message(self, body, message):
        """Hand *body* to ``InputMessageHandler`` and then ack *message*.

        The acknowledgement is sent only after the handler returns; if the
        handler raises, ``message.ack()`` is not reached.
        """
        InputMessageHandler().handle(body)
        message.ack()
# Register the custom step in the app's "consumer" step set.
app.steps["consumer"].add(InputMessageConsumerStep)

if __name__ == "__main__":
    # Runs only when the file is executed as a script, not when imported.
    app.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment