Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save hermes-pimentel/e10cef2fc4e0af54c8dc01367bcbd99f to your computer and use it in GitHub Desktop.
Save hermes-pimentel/e10cef2fc4e0af54c8dc01367bcbd99f to your computer and use it in GitHub Desktop.
msk-lambda-firehose-transform-logs-to-opensearch.py
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import base64
import json
import logging
import re
from datetime import datetime
from functools import partial

from dateutil import parser
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Expected raw log layout: "[YYYY-MM-DD HH:MM:SS,mmm] LEVEL message".
# Compiled once at import time instead of on every call (one call per record).
# Note: "." does not match newlines, so "message" stops at the first newline.
_LOG_LINE_PATTERN = re.compile(r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\] (\w+) (.*)')
# First "/segment/" of the cluster ARN, used as the cluster name.
_CLUSTER_NAME_PATTERN = re.compile(r"/([^/]+)/")
# strptime format matching the timestamp group captured by _LOG_LINE_PATTERN.
_LOG_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def parse_log_line(rawlog, log_lines):
    """Parse one raw log line into a flat document for indexing.

    Args:
        rawlog: The raw log line text.
        log_lines: Decoded record metadata; must provide ``account_id``,
            ``cluster_arn`` and ``broker_id``.

    Returns:
        A dict with ``@timestamp`` (ISO 8601, naive - no timezone),
        ``account_id``, ``cluster_arn``, ``cluster_name``, ``broker_id``,
        ``log_level`` and ``message``; or ``None`` when ``rawlog`` does not
        have the ``[timestamp] LEVEL message`` shape or its timestamp is not
        a valid date/time.

    Raises:
        KeyError: if the line parses but a required metadata key is missing
            from ``log_lines``.
    """
    match = _LOG_LINE_PATTERN.match(rawlog)
    if match is None:
        return None
    timestamp, log_level, message = match.groups()

    try:
        # The regex already pins the exact layout, so parse it exactly
        # instead of relying on a heuristic date parser.
        date_time = datetime.strptime(timestamp, _LOG_TIMESTAMP_FORMAT)
    except ValueError:
        # Regex-shaped but impossible values (e.g. month 13) are treated as
        # unparseable lines rather than crashing the caller.
        return None

    cluster_arn = log_lines['cluster_arn']
    cluster_match = _CLUSTER_NAME_PATTERN.search(cluster_arn)
    # Fall back to the full ARN when it contains no "/name/" segment.
    cluster_name = cluster_match.group(1) if cluster_match else cluster_arn

    return {
        '@timestamp': date_time.isoformat(),
        'account_id': log_lines['account_id'],
        'cluster_arn': cluster_arn,
        'cluster_name': cluster_name,
        'broker_id': log_lines['broker_id'],
        'log_level': log_level,
        'message': message,
    }
def transform_record(record):
    """Transform one Firehose record into a JSON document for OpenSearch.

    The record's ``data`` field is base64-decoded and parsed as JSON. The
    ``raw_log`` field of that JSON is then parsed by ``parse_log_line``,
    which also receives the whole JSON object for its metadata fields.

    Args:
        record: One element of the invocation event's ``records`` list; must
            contain ``recordId`` and ``data`` (base64 text).

    Returns:
        A dict with ``recordId``, ``result`` and ``data`` keys. On success,
        ``result`` is ``'Ok'`` and ``data`` is the base64 *string* of the
        JSON-serialised parsed document. When the payload cannot be decoded
        or parsed, ``result`` is ``'ProcessingFailed'`` and ``data`` is the
        original payload, unchanged.
    """
    logger.info("Starting transform_record function")
    record_id = record['recordId']
    encoded_data = record['data']

    try:
        payload = json.loads(base64.b64decode(encoded_data).decode())
        # Look up raw_log before calling the parser so a missing key is
        # handled as a per-record failure below.
        raw_log = payload['raw_log']
        parsed_log_line = parse_log_line(raw_log, payload)
    except (ValueError, KeyError, TypeError):
        # ValueError covers bad base64 (binascii.Error), non-UTF-8 bytes
        # (UnicodeDecodeError), malformed JSON (JSONDecodeError) and parser
        # failures. KeyError/TypeError cover payloads missing expected fields
        # or not shaped as a JSON object. Fail only this record instead of
        # letting the exception escape lambda_handler and fail the whole
        # invocation.
        logger.exception("Could not decode or parse record %s", record_id)
        parsed_log_line = None

    if parsed_log_line is None:
        logger.warning("Finished transform_record function - Some transformations fail")
        return {
            'recordId': record_id,
            'result': 'ProcessingFailed',
            'data': encoded_data,
        }

    logger.info("Finished transform_record function - Success")
    document = json.dumps(parsed_log_line).encode()
    return {
        'recordId': record_id,
        'result': 'Ok',
        # b64encode returns bytes; the Lambda response is serialised as JSON,
        # which cannot encode bytes, so return text instead.
        'data': base64.b64encode(document).decode('ascii'),
    }
def lambda_handler(event, context):
    """Lambda entry point: transform every record in the invocation event.

    Args:
        event: Invocation event; its ``records`` list is transformed one
            record at a time with ``transform_record``.
        context: Lambda context object (not used).

    Returns:
        ``{'records': [...]}`` with one transformed entry per input record,
        in input order.

    Raises:
        Any exception raised while reading the event or transforming a
        record; it is logged with its traceback, then re-raised unchanged.
    """
    logger.info("Starting Lambda Function")
    try:
        # Read inside the try so a malformed event is logged like any other
        # failure.
        records = event['records']
        return {'records': [transform_record(record) for record in records]}
    except Exception:
        logger.exception("Lambda Failed to execute:")
        # A bare raise re-raises the active exception with its original
        # traceback, without adding this frame as a new raise point.
        raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment