Last active
May 2, 2024 13:46
-
-
Save benc-uk/40fe0185dea7a14730e358cee46b9efb to your computer and use it in GitHub Desktop.
Simple FastAPI web app to subscribe & receive from an SNS Topic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import functools
import os
from urllib.parse import urlparse

import requests
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, Request
# Load variables from a .env file (if present) before reading the environment
load_dotenv()

app = FastAPI()

# The SNS_TOPIC_ARN env var must be set; a missing value raises KeyError at
# import time so a misconfigured deployment fails immediately.
allowed_topics = [os.environ['SNS_TOPIC_ARN']]
# Upper bound (seconds) on the outbound call that confirms a subscription
_SUBSCRIBE_TIMEOUT_SECONDS = 10


def _is_trusted_subscribe_url(url):
    """Return True if *url* is an HTTPS URL whose host is under an AWS domain.

    The SubscribeURL arrives in an unauthenticated request body, so it is
    checked before the server issues a GET to it; otherwise any caller who
    knows the topic ARN could make this service fetch arbitrary URLs.
    """
    if not isinstance(url, str):
        return False
    try:
        parsed = urlparse(url)
        host = (parsed.hostname or "").lower()
    except ValueError:
        # urlparse rejects some malformed netlocs (e.g. broken IPv6 literals)
        return False
    return parsed.scheme == "https" and host.endswith(
        (".amazonaws.com", ".amazonaws.com.cn")
    )


# Main endpoint to receive SNS messages
@app.post("/sns/subscription")
async def index(request: Request):
    """Receive an SNS HTTP(S) delivery: confirm subscriptions, accept messages.

    Requests without the ``x-amz-sns-message-type`` header, with a body that
    is not a JSON object, or for a topic not listed in ``allowed_topics`` are
    rejected. ``SubscriptionConfirmation`` messages are confirmed by calling
    the ``SubscribeURL``; any other message type is treated as a notification.

    Returns:
        A dict with a human-readable ``message`` and a ``status`` code.
        NOTE: ``status`` is part of the JSON payload only; the HTTP response
        code itself is not changed by this handler.
    """
    message_type = request.headers.get('x-amz-sns-message-type')
    if message_type is None:
        print('A regular HTTP request, will be ignored!')
        return {"message": "Request filtered out", "status": 403}

    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError (and bad encodings) are ValueError subclasses
        return {"message": "Invalid JSON body", "status": 400}
    if not isinstance(body, dict):
        return {"message": "Invalid JSON body", "status": 400}

    # .get() so that a payload without TopicArn is rejected, not a crash
    if body.get("TopicArn") not in allowed_topics:
        return {"message": "Topic not allowed", "status": 403}

    # More checks can be added here, like checking the signature of the message
    # Anyhow, this is just a simple example

    if message_type == 'SubscriptionConfirmation':
        print('Subscription confirmation in progress...')
        subscribe_url = body.get('SubscribeURL')
        if not _is_trusted_subscribe_url(subscribe_url):
            print('SubscribeURL is missing or is not an AWS HTTPS URL')
            return {"message": "Failed to subscribe to the topic", "status": 500}

        # Confirm the subscription by sending a GET request to the SubscribeURL.
        # requests is blocking, so run it in the default executor to keep the
        # event loop free, and bound it with a timeout so it cannot hang.
        confirm = functools.partial(
            requests.get, subscribe_url, timeout=_SUBSCRIBE_TIMEOUT_SECONDS
        )
        try:
            response = await asyncio.get_running_loop().run_in_executor(
                None, confirm
            )
        except requests.RequestException as err:
            print(f'Subscription confirmation request failed: {err}')
            return {"message": "Failed to subscribe to the topic", "status": 500}

        if response.status_code == 200:
            return {"message": "Successfully subscribed to the topic", "status": 200}
        return {"message": "Failed to subscribe to the topic", "status": 500}

    # `or ''` also covers an explicit JSON null for Message
    sns_message = body.get('Message') or ''
    print(f'SNS message received, length: {len(sns_message)} bytes')

    # ============================================
    # Your message processing logic goes here!
    # ============================================

    return {"message": "Message received", "status": 200}
# Health check endpoint for Front Door and other load balancing probes.
# NOTE: this function reuses the name `index`; both routes still work because
# each decorator registers its handler at definition time.
@app.get("/healthz")
async def index(request: Request):
    """Return a fixed OK payload so probes can tell the app is serving."""
    health = {"status": "OK"}
    return health
# Script entry point: start uvicorn when this file is executed directly.
if __name__ == '__main__':
    # 'main:app' is an import string, so this expects the module to be main.py
    uvicorn.run(
        'main:app',
        host='0.0.0.0',
        port=8000,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment