Created
March 4, 2020 18:07
-
-
Save JCotton1123/35db668974fcdcd796efddfbe2acea1b to your computer and use it in GitHub Desktop.
CloudTrail Log Decompression Lambda
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"github.com/aws/aws-lambda-go/events" | |
"github.com/aws/aws-lambda-go/lambda" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/s3" | |
"github.com/aws/aws-sdk-go/service/s3/s3manager" | |
"github.com/pkg/errors" | |
"compress/gzip" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"path" | |
"strings" | |
) | |
// Handler receives and digests S3Events | |
func Handler(s3Event events.S3Event) error { | |
awsRegion := os.Getenv("AWS_REGION") | |
if awsRegion == "" { | |
awsRegion = "us-east-1" | |
} | |
sseAlgo := os.Getenv("SERVER_SIDE_ENCRYPTION") | |
if sseAlgo == "" { | |
sseAlgo = "AES256" | |
} | |
delSrcLog := os.Getenv("DELETE_SOURCE_LOG") == "true" | |
delTempFiles := os.Getenv("DELETE_TEMP_FILES") != "false" | |
sess, err := session.NewSession(&aws.Config{Region: aws.String(awsRegion)}) | |
if err != nil { | |
return errors.Wrap(err, "Failed to create AWS session") | |
} | |
svc := s3.New(sess) | |
downloader := s3manager.NewDownloader(sess) | |
uploader := s3manager.NewUploader(sess) | |
processRecord := func(record events.S3EventRecord) error { | |
bucket := record.S3.Bucket.Name | |
srcKey := record.S3.Object.Key | |
srcUrl := "s3://" + bucket + "/" + srcKey | |
if !strings.HasSuffix(srcKey, "json.gz") { | |
return fmt.Errorf("Event filter misconfigured, skipping non-log object: %s\n", srcUrl) | |
} | |
log.Printf("Processing log: %s\n", srcUrl) | |
dstKey := strings.Replace(srcKey, ".gz", "", 1) | |
dstUrl := "s3://" + bucket + "/" + dstKey | |
compFilePath := "/tmp/" + path.Base(srcKey) | |
uncompFilePath := "/tmp/" + path.Base(dstKey) | |
compFile, err := os.Create(compFilePath) | |
if err != nil { | |
return errors.Wrap(err, "Unable to create temporary file") | |
} | |
defer compFile.Close() | |
if delTempFiles { | |
defer os.Remove(compFile.Name()) | |
} | |
_, err = downloader.Download( | |
compFile, | |
&s3.GetObjectInput{ | |
Bucket: &bucket, | |
Key: &srcKey, | |
}, | |
) | |
if err != nil { | |
return errors.Wrap(err, "Failed to download log") | |
} | |
compFile.Seek(0, io.SeekStart) | |
uncompFile, err := os.Create(uncompFilePath) | |
if err != nil { | |
return errors.Wrap(err, "Failed to create temporary file") | |
} | |
defer uncompFile.Close() | |
if delTempFiles { | |
defer os.Remove(uncompFile.Name()) | |
} | |
gzipReader, err := gzip.NewReader(compFile) | |
if err != nil { | |
return errors.Wrap(err, "Failed to create gzip reader") | |
} | |
defer gzipReader.Close() | |
_, err = io.Copy(uncompFile, gzipReader) | |
if err != nil { | |
return errors.Wrap(err, "Failed to decompress log") | |
} | |
uncompFile.Seek(0, io.SeekStart) | |
_, err = uploader.Upload( | |
&s3manager.UploadInput{ | |
Bucket: &bucket, | |
Key: &dstKey, | |
Body: uncompFile, | |
ServerSideEncryption: &sseAlgo, | |
}, | |
) | |
if err != nil { | |
return errors.Wrap(err, "Failed to upload log") | |
} | |
if delSrcLog { | |
log.Printf("Deleting source log file: %s\n", srcUrl) | |
_, err = svc.DeleteObject( | |
&s3.DeleteObjectInput{ | |
Bucket: &bucket, | |
Key: &srcKey, | |
}, | |
) | |
if err != nil { | |
return errors.Wrap(err, "Failed to delete original log") | |
} | |
} | |
log.Printf("Successfully uploaded decompressed log: %s\n", dstUrl) | |
return nil | |
} | |
errs := []error{} | |
for _, record := range s3Event.Records { | |
err := processRecord(record) | |
if err != nil { | |
errs = append(errs, err) | |
log.Printf("Error: %s", err) | |
} | |
} | |
if len(errs) > 0 { | |
return fmt.Errorf("Encountered %v errors during this execution", len(errs)) | |
} | |
return nil | |
} | |
func main() { | |
lambda.Start(Handler) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"github.com/aws/aws-lambda-go/events" | |
"encoding/json" | |
"os" | |
"strings" | |
"testing" | |
) | |
const S3_EVENT = ` | |
{ | |
"Records": [ | |
{ | |
"eventVersion": "2.0", | |
"eventSource": "aws:s3", | |
"awsRegion": "us-east-1", | |
"eventTime": "1970-01-01T00:00:00.000Z", | |
"eventName": "ObjectCreated:Put", | |
"userIdentity": { | |
"principalId": "EXAMPLE" | |
}, | |
"requestParameters": { | |
"sourceIPAddress": "127.0.0.1" | |
}, | |
"responseElements": { | |
"x-amz-request-id": "EXAMPLE123456789", | |
"x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH" | |
}, | |
"s3": { | |
"s3SchemaVersion": "1.0", | |
"configurationId": "testConfigRule", | |
"bucket": { | |
"name": "{{bucket}}", | |
"ownerIdentity": { | |
"principalId": "EXAMPLE" | |
}, | |
"arn": "arn:aws:s3:::{{bucket}}" | |
}, | |
"object": { | |
"key": "{{object}}", | |
"size": 1024, | |
"eTag": "0123456789abcdef0123456789abcdef", | |
"sequencer": "0A1B2C3D4E5F678901" | |
} | |
} | |
} | |
] | |
} | |
` | |
func Test(t *testing.T) { | |
testBucket := os.Getenv("TEST_BUCKET") | |
if testBucket == "" { | |
t.Fatalf("TEST_BUCKET environment variable is not defined") | |
} | |
testLog := os.Getenv("TEST_LOG") | |
if testLog == "" { | |
t.Fatalf("TEST_LOG environment variable is not defined") | |
} | |
testEventJSON := S3_EVENT | |
testEventJSON = strings.ReplaceAll(testEventJSON, "{{bucket}}", testBucket) | |
testEventJSON = strings.ReplaceAll(testEventJSON, "{{object}}", testLog) | |
testEvent := events.S3Event{} | |
err := json.Unmarshal([]byte(testEventJSON), &testEvent) | |
if err != nil { | |
t.Fatalf("Failed to encode event: %s", err) | |
} | |
err = Handler(testEvent) | |
if err != nil { | |
t.Fatalf("Unexpected error: %s", err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment