Created
January 17, 2020 16:34
-
-
Save yardbirdsax/b9d181e7d262c8edb72a9d328a8cfe7f to your computer and use it in GitHub Desktop.
Set S3 Access Logging Across Multiple S3 Buckets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import boto3 | |
import botocore | |
import argparse | |
from pprint import pprint | |
import re | |
import logging | |
# Command-line interface for the script. Every option carries help text so
# that `--help` fully describes how a run can be scoped and rehearsed.
parser = argparse.ArgumentParser(
    description="Sets S3 Access Logging configuration on all buckets within an account matching a certain pattern.",
)
parser.add_argument(
    '--target-bucket',
    type=str,
    required=True,
    help="The name of the bucket to use for the logging target.",
)
parser.add_argument(
    '--pattern',
    type=str,
    default=".*",
    help="The (regex) pattern used to match against bucket names.",
)
parser.add_argument(
    '--target-prefix',
    type=str,
    default="/",
    help="The prefix to use when generating the log files.",
)
# Dry-run switch: the script only logs the configuration it would apply.
parser.add_argument(
    '--not-for-real',
    action="store_true",
    help="Dry run: log the logging configuration that would be set without changing any bucket.",
)
# Repeatable; argparse leaves `exclude_names` as None when the flag is absent.
parser.add_argument(
    '--exclude-name',
    action='append',
    dest='exclude_names',
    type=str,
    help="Exclude buckets with this name. Can be specified multiple times.",
)
def get_bucket_region(Bucket: str, s3_client: "botocore.client.BaseClient") -> str:
    """Return the region reported for an S3 bucket.

    Issues a HeadBucket request through ``s3_client`` and reads the
    ``x-amz-bucket-region`` header from the response metadata.

    Args:
        Bucket: Name of the bucket to look up.
        s3_client: An S3 client such as ``boto3.client('s3')``; only its
            ``head_bucket`` method is used.

    Returns:
        The value of the ``x-amz-bucket-region`` response header.

    Raises:
        KeyError: If the response does not carry the region header.
        Any exception raised by ``head_bucket`` propagates unchanged.
    """
    response = s3_client.head_bucket(Bucket=Bucket)
    http_headers = response['ResponseMetadata']['HTTPHeaders']
    return http_headers['x-amz-bucket-region']
# Configure the root logger: INFO and above, rendered as a padded level name
# followed by the message.
logging.basicConfig(
    format="%(levelname)-12s: %(message)s",
    level=logging.INFO,
)
# Keep botocore's own records out of the output unless they are warnings or worse.
logging.getLogger("botocore").setLevel(logging.WARNING)
# Module-level logger used for all of this script's messages.
logger = logging.getLogger(__name__)
# Script body: enable S3 server access logging, pointed at --target-bucket,
# on every bucket that matches --pattern, is not excluded, has no logging
# configuration yet, and is in the same region as the target bucket.
args = parser.parse_args()
# Compile once instead of handing the raw pattern to `re` for every bucket.
name_pattern = re.compile(args.pattern)
# argparse leaves exclude_names as None when --exclude-name is never passed.
excluded_names = set(args.exclude_names or ())

try:
    s3_client = boto3.client('s3')
    s3_resource = boto3.resource('s3')
    buckets = s3_client.list_buckets()
    target_bucket_region = get_bucket_region(args.target_bucket, s3_client)
except botocore.exceptions.ClientError as err:
    # Without the bucket list or the target's region nothing can be processed.
    logger.error(f"Boto3 client error occurred: {err}")
else:
    for bucket in buckets['Buckets']:
        bucket_name = bucket['Name']

        # The target bucket never logs to itself.
        if bucket_name == args.target_bucket:
            logger.warning(f"Bucket {bucket_name} is also the target bucket, skipping.")
            continue
        # re.match semantics: the pattern is anchored at the start of the name.
        if not name_pattern.match(bucket_name):
            continue
        # Check exclusions before touching the bucket so excluded buckets cost
        # no API call and never get a "Setting to target bucket" message.
        if bucket_name in excluded_names:
            logger.info(f"Bucket {bucket_name} is included in the list of excluded buckets and will be skipped.")
            continue

        # Errors are handled per bucket so that one failing bucket (for
        # example an access-denied response) does not abort the rest.
        try:
            bucket_logging = s3_resource.BucketLogging(bucket_name)
            bucket_logging.load()
            # Buckets that already have a logging configuration are left alone.
            if bucket_logging.logging_enabled is not None:
                continue
            logger.info(f"Bucket {bucket_name} does not have logging enabled. Setting to target bucket.")

            bucket_region = get_bucket_region(bucket_name, s3_client)
            if bucket_region != target_bucket_region:
                logger.warning(f"Bucket {bucket_name} ({bucket_region}) is not in the same region as target bucket {args.target_bucket} ({target_bucket_region}). Skipping.")
                continue

            bucket_logging_status = {
                "LoggingEnabled": {
                    "TargetBucket": args.target_bucket,
                    "TargetPrefix": args.target_prefix,
                }
            }
            if args.not_for_real:
                # Dry run: report what would be applied, change nothing.
                logger.info(f"Would have set bucket logging status to: {bucket_logging_status}.")
            else:
                bucket_logging.put(BucketLoggingStatus=bucket_logging_status)
        except botocore.exceptions.ClientError as err:
            logger.error(f"Boto3 client error occurred for bucket {bucket_name}: {err}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment