Created
April 5, 2023 17:28
-
-
Save Vaansh/f996220be3ee83fd29857fd50311e7bf to your computer and use it in GitHub Desktop.
Many-to-one example: several publishers sending to a single subscriber over one RabbitMQ queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import annotations | |
import logging | |
import time | |
from abc import ABC, abstractmethod | |
from enum import Enum | |
from multiprocessing import Process | |
import pika | |
# Configure the root logger so records at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)
class Platform(Enum):
    """Roles a participant can be registered under.

    ``SUB`` selects the subscriber implementation; ``PUBA`` and ``PUBB``
    select the two publisher implementations.
    """

    # Values are assigned positionally: SUB=1, PUBA=2, PUBB=3.
    SUB, PUBA, PUBB = 1, 2, 3
class Publisher(ABC):
    """Abstract base class for publishers; subclasses must implement ``publish``."""

    @abstractmethod
    def publish(self):
        """Send this publisher's messages. The base implementation does nothing."""
class Subscriber(ABC):
    """Abstract base class for subscribers; subclasses must implement ``subscribe``."""

    @abstractmethod
    def subscribe(self, msg):
        """Begin receiving messages. The base implementation does nothing."""
class ConcretePublisherA(Publisher):
    """Publisher that sends a fixed series of "A" messages to one queue."""

    # How many messages one publish() call sends, and the pause (in seconds)
    # taken before each of them. Override on a subclass or instance to tune.
    MESSAGE_COUNT = 5
    INTERVAL_SECONDS = 2

    def __init__(self, identifier, mq):
        """Remember the message label and the destination queue.

        Args:
            identifier: Label embedded in every message body.
            mq: Queue name, used as the routing key when publishing.
        """
        self.identifier = identifier
        self.mq = mq

    def publish(self):
        """Publish ``MESSAGE_COUNT`` messages to ``self.mq``, then disconnect.

        Opens a blocking connection to the broker on ``localhost`` and sends
        each message through the default exchange (``exchange=""``) after
        sleeping ``INTERVAL_SECONDS``. The connection is closed even if
        channel setup or a publish fails part-way through.
        """
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host="localhost")
        )
        try:
            channel = connection.channel()
            body = "A {} Published".format(self.identifier)
            for _ in range(self.MESSAGE_COUNT):
                time.sleep(self.INTERVAL_SECONDS)
                channel.basic_publish(exchange="", routing_key=self.mq, body=body)
                logging.info("%s ->", body)
            logging.info("A Disconnected.")
        finally:
            # Closing an already-closed connection raises in pika, which would
            # mask the original error, so only close when it is still open.
            if connection.is_open:
                connection.close()
class ConcretePublisherB(Publisher):
    """Publisher that sends a fixed series of "B" messages to one queue."""

    # How many messages one publish() call sends, and the pause (in seconds)
    # taken before each of them. Override on a subclass or instance to tune.
    MESSAGE_COUNT = 3
    INTERVAL_SECONDS = 3

    def __init__(self, identifier, mq):
        """Remember the message label and the destination queue.

        Args:
            identifier: Label embedded in every message body.
            mq: Queue name, used as the routing key when publishing.
        """
        self.identifier = identifier
        self.mq = mq

    def publish(self):
        """Publish ``MESSAGE_COUNT`` messages to ``self.mq``, then disconnect.

        Opens a blocking connection to the broker on ``localhost`` and sends
        each message through the default exchange (``exchange=""``) after
        sleeping ``INTERVAL_SECONDS``. The connection is closed even if
        channel setup or a publish fails part-way through.
        """
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host="localhost")
        )
        try:
            channel = connection.channel()
            body = "B {} Published".format(self.identifier)
            for _ in range(self.MESSAGE_COUNT):
                time.sleep(self.INTERVAL_SECONDS)
                channel.basic_publish(exchange="", routing_key=self.mq, body=body)
                logging.info("%s ->", body)
            logging.info("B Disconnected.")
        finally:
            # Closing an already-closed connection raises in pika, which would
            # mask the original error, so only close when it is still open.
            if connection.is_open:
                connection.close()
class ConcreteSubscriber(Subscriber):
    """Subscriber that logs every message delivered to a queue."""

    def __init__(self, identifier):
        """Store the subscriber's identifier.

        Args:
            identifier: Label for this subscriber, kept as ``self.id``.
        """
        self.id = identifier

    def subscribe(self, mq):
        """Declare queue ``mq`` and consume from it until consuming stops.

        Opens a blocking connection to the broker on ``localhost``, registers
        a callback that logs each message body, and blocks in
        ``start_consuming``. The connection is closed when consuming ends or
        raises.

        Args:
            mq: Name of the queue to declare and listen on.
        """
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host="localhost")
        )
        try:
            channel = connection.channel()
            channel.queue_declare(queue=mq)

            def process(ch, method, properties, body):
                # The previous "..." % body had no placeholder, so it raised
                # TypeError on the first delivery. Bodies arrive as bytes;
                # decode them so the log shows readable text.
                if isinstance(body, bytes):
                    body = body.decode("utf-8", errors="replace")
                logging.info("Subscriber notified <- %s", body)

            channel.basic_consume(
                queue=mq, on_message_callback=process, auto_ack=True
            )
            logging.info("Subscriber Added. Listening on MQ ({}).".format(mq))
            channel.start_consuming()
        finally:
            # Only close a connection that is still open; closing a closed one
            # raises in pika and would mask the original error.
            if connection.is_open:
                connection.close()
class Config:
    """Wires one subscriber and any number of publishers to a shared queue.

    Call order matters: ``register_subscriber`` first (it names the queue and
    starts the consumer process), then ``register_publisher`` once per
    publisher, then ``run`` to start the publisher processes.
    """

    def __init__(self):
        """Start with nothing registered."""
        self.mq = None  # queue name, set by register_subscriber
        self.sub = None  # the registered subscriber instance
        self.pubs = []  # publishers that run() will start
        self.sub_platform = None  # platform passed to register_subscriber

    def run(self):
        """Start one process per registered publisher.

        Logs an error and returns without starting anything when no
        subscriber or no publisher has been registered.
        """
        if self.sub is None or not self.pubs:
            logging.error("Sub or Pub(s) not registered or configured incorrectly")
            return
        for pub in self.pubs:
            process = Process(target=pub.publish)
            process.start()

    def register_subscriber(self, platform, identifier):
        """Create the subscriber and start it listening in its own process.

        The queue name is ``identifier`` followed by the platform's name.
        An unsupported platform is logged and ignored, leaving the existing
        configuration untouched, instead of failing on a ``None`` subscriber.

        Args:
            platform: Must be ``Platform.SUB``.
            identifier: Subscriber label; also the prefix of the queue name.
        """
        if platform is not Platform.SUB:
            logging.error("Unsupported subscriber platform: %s", platform)
            return
        self.mq = identifier + platform.name
        self.sub_platform = platform
        self.sub = ConcreteSubscriber(identifier=identifier)
        process = Process(target=self.sub.subscribe, args=(self.mq,))
        process.start()

    def register_publisher(self, platform, identifier):
        """Create a publisher bound to the subscriber's queue and remember it.

        Requires a subscriber to have been registered first. An unsupported
        platform is logged and ignored rather than stored as ``None``, which
        would make ``run`` fail later.

        Args:
            platform: ``Platform.PUBA`` or ``Platform.PUBB``.
            identifier: Label embedded in the publisher's messages.
        """
        if not (self.sub and self.sub_platform and self.mq):
            logging.error("Sub not registered or configured incorrectly")
            return
        if platform is Platform.PUBA:
            pub = ConcretePublisherA(identifier=identifier, mq=self.mq)
        elif platform is Platform.PUBB:
            pub = ConcretePublisherB(identifier=identifier, mq=self.mq)
        else:
            logging.error("Unsupported publisher platform: %s", platform)
            return
        self.pubs.append(pub)
if __name__ == "__main__":
    # Demo wiring: one subscriber, then two publishers that share its queue.
    cfg = Config()
    cfg.register_subscriber(Platform.SUB, "abc")
    for pub_platform, pub_id in ((Platform.PUBA, "123"), (Platform.PUBB, "456")):
        cfg.register_publisher(pub_platform, pub_id)
    cfg.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment