Created
March 31, 2023 03:59
-
-
Save froop/1a24a0d3953020e774f8d4b43dace2bf to your computer and use it in GitHub Desktop.
[Python][AWS] FilterLogEvents of CloudWatch Logs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import boto3 | |
import time | |
import logging | |
# Constants | |
LOG_GROUP_NAME = '/aws/lambda/test-filter-log-events' # Log group name of CloudWatch Logs | |
NUM_CHARS_TO_PRINT = 10 # Number of characters to print from log message | |
DELAY_OF_START_MS = 5000 # Initial delay before starting the log fetching process | |
MAX_RETRY_SLEEP_SECS = 8 # Maximum sleep time in seconds between retries | |
# Logging configuration | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') | |
logging.getLogger('boto3').setLevel(logging.WARNING) | |
logging.getLogger('botocore').setLevel(logging.WARNING) | |
logging.getLogger('urllib3').setLevel(logging.WARNING) | |
# Function to print formatted log event | |
def print_event(event): | |
message = event['message'] | |
truncated_message = message[:NUM_CHARS_TO_PRINT] | |
num_chars_truncated = len(message) - len(truncated_message) | |
if num_chars_truncated > 0: | |
truncated_message = "%s... %d more" % (truncated_message, num_chars_truncated) | |
print("%s %s" % (event['timestamp'], truncated_message)) | |
# Function to process the log events from the response | |
def process_events(response, latest_event_info): | |
response_size = 0 | |
for event in response['events']: | |
if latest_event_info['timestamp'] != event['timestamp']: | |
latest_event_info['timestamp'] = event['timestamp'] | |
latest_event_info['events'] = set() | |
if event['eventId'] not in latest_event_info['events']: | |
print_event(event) | |
latest_event_info['events'].add(event['eventId']) | |
response_size += len(event['message']) | |
return latest_event_info, response_size | |
# Function to update the request parameters for the next request | |
def update_request_params(response, latest_event_info, response_size, request_params): | |
if 'nextToken' not in response: | |
# if 'nextToken' not in response or response_size == 0: | |
request_params['startTime'] = latest_event_info['timestamp'] | |
if 'nextToken' in request_params: | |
del request_params['nextToken'] | |
else: | |
request_params['nextToken'] = response['nextToken'] | |
return request_params | |
# Function to handle retries in case of no log events | |
def handle_retry(response_size, retry_count): | |
if response_size > 0: | |
return 0 | |
sleep_secs = min(2**retry_count, MAX_RETRY_SLEEP_SECS) | |
logging.debug('sleep: retry_count=%d, sleep_secs=%d' % (retry_count, sleep_secs)) | |
time.sleep(sleep_secs) | |
return retry_count + 1 | |
# Function to truncate nextToken for logging output | |
def truncate_token(nextToken): | |
if nextToken is None: | |
return '' | |
return "%s...%s" % (nextToken[:4], nextToken[-4:]) | |
############### | |
# Main function | |
############### | |
client = boto3.client('logs') | |
request_params = { | |
'logGroupName': LOG_GROUP_NAME, | |
'startTime': int(time.time()) * 1000 - DELAY_OF_START_MS | |
} | |
latest_event_info = { | |
'timestamp': request_params['startTime'], | |
'events': set() | |
} | |
retry_count = 0 | |
# Main loop to continuously fetch log events | |
while True: | |
# Request to CloudWatch Logs API | |
logging.info('request: startTime=%s nextToken=%s' % | |
(request_params.get('startTime'), truncate_token(request_params.get('nextToken')))) | |
response = client.filter_log_events(**request_params) | |
latest_event_info, response_size = process_events(response, latest_event_info) | |
logging.info('response: latest=%d, size=%d, nextToken=%s' % | |
(latest_event_info.get('timestamp'), response_size, truncate_token(response.get('nextToken')))) | |
# Prepare for the next request | |
request_params = update_request_params(response, latest_event_info, response_size, request_params) | |
retry_count = handle_retry(response_size, retry_count) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment