Last active
September 20, 2021 23:40
lambda-to-kafka
# First I created a working directory
~$ mkdir s3-kafka
# Then I installed the dependency I needed into that directory with pip
~$ cd s3-kafka
~$ pip install kafka-python -t $(pwd)
# Then I put my code into a file called s3-kafka.py
~$ vi s3-kafka.py
# Then I zipped the contents of the directory into a zip file in my homedir (with a 'version' number for convenience)
~$ zip -r9 ~/s3-kafka-0.0.1.zip *
# Then I uploaded the zip to s3
~$ cd ~
~$ aws s3 cp s3-kafka-0.0.1.zip s3://examplebucket/
# Now when I created the lambda function, I could just specify the s3 location of the deployment package.
# The key to making it all work is the "Handler" field of the lambda function definition:
#
# "Handler": "s3-kafka.handler"
#
# The part before the . must match your filename without the .py extension (my file was called s3-kafka.py so it needed to be s3-kafka)
# ...and the part after the . must match your function name (my function that handles the code execution is called 'handler', as you can see in the code below)
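To make the "Handler" naming rule concrete, here is a minimal sketch of how a `module.function` string can be split and resolved in Python. The `resolve_handler` helper is hypothetical and written only for illustration; it is not the Lambda runtime's actual code, and it is demonstrated on the standard-library `json` module rather than on `s3-kafka.py`.

```python
import importlib


def resolve_handler(handler):
    """Split a 'module.function' handler string and return the callable."""
    # Split on the last dot: the left side names the module (the file name
    # without .py), the right side names the function inside that module.
    module_name, _, function_name = handler.rpartition(".")
    if not module_name or not function_name:
        raise ValueError("handler must look like 'module.function', got %r" % handler)
    module = importlib.import_module(module_name)
    return getattr(module, function_name)


if __name__ == "__main__":
    # Stand-in example: resolve the json module and its dumps function.
    func = resolve_handler("json.dumps")
    print(func({"ok": True}))
```

With the handler from this gist, the same split would give the module name `s3-kafka` and the function name `handler`, which is why both the file name and the function name have to line up with the "Handler" field.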
import json

import boto3
from kafka import KafkaProducer

# Created once at import time so warm invocations can reuse them.
s3_client = boto3.client('s3')
producer = KafkaProducer(bootstrap_servers='kafka1:9092,kafka2:9092,kafka3:9092')


def handler(event, context):
    # Each record describes one S3 object event.
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        record['s3_full_path'] = "s3://%s/%s" % (bucket, key)
        # kafka-python expects bytes unless a value_serializer is configured.
        producer.send('new-objects', json.dumps(record).encode('utf-8'))
    # Block until every queued message has been sent before the invocation ends.
    producer.flush()
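The message-building part of the handler can be tried locally without a Kafka cluster. The sketch below is an assumption-laden illustration: `build_message` and `sample_event` are hypothetical names, the event is a made-up, trimmed-down record containing only the fields the handler reads, and the key is URL-decoded because S3 event notifications deliver object keys URL-encoded (a step the handler above does not perform).

```python
import json
from urllib.parse import unquote_plus


def build_message(record):
    """Return the Kafka message bytes for one S3 event record."""
    bucket = record["s3"]["bucket"]["name"]
    # Keys arrive URL-encoded in S3 events (a space shows up as '+').
    key = unquote_plus(record["s3"]["object"]["key"])
    # Copy the record instead of mutating the caller's event.
    enriched = dict(record, s3_full_path="s3://%s/%s" % (bucket, key))
    return json.dumps(enriched).encode("utf-8")


# Made-up event with only the fields used above.
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "examplebucket"},
                "object": {"key": "logs/2021/file+one.txt"}}}
    ]
}

if __name__ == "__main__":
    for record in sample_event["Records"]:
        print(build_message(record))
```

Keeping the serialization in its own function means it can be checked with a sample event before the deployment package is zipped and uploaded.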