Skip to content

Instantly share code, notes, and snippets.

@mark-mishyn
Created February 29, 2024 07:49
Show Gist options
  • Save mark-mishyn/89f8dbd09f24d28f99fc993e9ce577be to your computer and use it in GitHub Desktop.
Save mark-mishyn/89f8dbd09f24d28f99fc993e9ce577be to your computer and use it in GitHub Desktop.
SQS message processing using long polling; it is a very lightweight alternative to Celery
import json
import threading
import boto3
from configurations.config_store import Config
from src.routes.frontapp.tasks import (
add_tags_to_conversation,
reply_to_front_message,
)
from src.utils.aws import get_queue_url_by_name
# Module-wide SQS client, created once at import time for the configured region
# and used by every function below.
sqs = boto3.client("sqs", region_name=Config.AWS_REGION)

# Dispatch table: task function name -> task function. The key is each
# function's own ``__name__``; the "handler_function" field of an incoming
# message body is looked up here to pick the callable to run.
message_handler_map = {
    handler.__name__: handler
    for handler in (reply_to_front_message, add_tags_to_conversation)
}
def process_and_delete_message(message: dict, queue_url: str):
    """Run the handler named in an SQS message, then delete the message.

    The message's ``"Body"`` is parsed as JSON and must contain two keys:
    ``"handler_function"`` (a key of ``message_handler_map``) and
    ``"payload"`` (passed to the handler as its single argument).

    The message is deleted only after the handler returns. If anything
    raises first, the delete call is never reached and the message stays
    on the queue.

    Args:
        message: A received SQS message; ``"Body"`` and ``"ReceiptHandle"``
            are read from it.
        queue_url: URL of the queue the message was received from.

    Raises:
        json.JSONDecodeError: If ``"Body"`` is not valid JSON.
        KeyError: If a required key is missing or the handler name is not
            registered in ``message_handler_map``.
    """
    envelope = json.loads(message["Body"])
    handler_name, handler_argument = envelope["handler_function"], envelope["payload"]

    # Actual processing: resolve the callable and hand it the payload.
    handler = message_handler_map[handler_name]
    handler(handler_argument)

    # Reached only on success, so a failed message is never acknowledged.
    receipt_handle = message["ReceiptHandle"]
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
def long_poll_sqs(max_in_flight: int = 32):
    """Poll the front-app SQS queue forever, handling each message in a thread.

    Each loop iteration long-polls the queue (up to 20 seconds) and starts
    one thread per received message, running ``process_and_delete_message``.

    The number of concurrently running handler threads is capped at
    ``max_in_flight``. Without a cap, every received message starts a new
    thread, so slow handlers or a backlog make the thread count grow without
    limit. Worker slots are reserved *before* polling and the batch size is
    limited to the reserved slots, so a message is never fetched unless a
    worker is free to start on it immediately.

    Args:
        max_in_flight: Maximum number of messages processed at the same
            time. Must be at least 1.

    Raises:
        ValueError: If ``max_in_flight`` is smaller than 1.

    This function does not return; it exits only if polling raises.
    """
    if max_in_flight < 1:
        raise ValueError("max_in_flight must be at least 1")

    # Largest batch requested per poll (same value the loop always used).
    batch_limit = 10

    queue_url = get_queue_url_by_name(Config.FRONTAPP_QUEUE, sqs)
    free_slots = threading.BoundedSemaphore(max_in_flight)

    def _process_then_release(message: dict) -> None:
        # Give the slot back even when the handler raises; the exception
        # still propagates to the thread's default exception hook.
        try:
            process_and_delete_message(message, queue_url)
        finally:
            free_slots.release()

    while True:
        # Wait until at least one worker slot is free, then take any other
        # free slots without blocking, up to the batch limit.
        free_slots.acquire()
        reserved = 1
        while reserved < batch_limit and free_slots.acquire(blocking=False):
            reserved += 1

        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=reserved,
            WaitTimeSeconds=20,
        )
        messages = response.get("Messages", [])

        # Hand back the slots this poll did not fill.
        for _ in range(reserved - len(messages)):
            free_slots.release()

        for message in messages:
            threading.Thread(
                target=_process_then_release, args=(message,)
            ).start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment