Created
September 12, 2024 04:58
-
-
Save denarced/e56af99bdb4f75b82dd958d007574d0b to your computer and use it in GitHub Desktop.
Fetch all access logs from S3 bucket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package spala parses S3 access logs.
package spala | |
import ( | |
"bufio" | |
"bytes" | |
"context" | |
"fmt" | |
"io" | |
"os" | |
"strings" | |
"sync" | |
"github.com/aws/aws-sdk-go-v2/config" | |
"github.com/aws/aws-sdk-go-v2/service/s3" | |
"github.com/denarced/s3-photo-access-log/shared" | |
) | |
// Parse logs, write all to "access.log". | |
func Parse(client S3Client, bucketName string) error { | |
if client == nil { | |
var err error | |
client, err = newRealS3Client() | |
if err != nil { | |
return err | |
} | |
} | |
outputFile, err := os.Create("access.log") | |
if err != nil { | |
return err | |
} | |
defer outputFile.Close() | |
keyCh := make(chan string) | |
go client.FetchKeys(bucketName, keyCh) | |
contentCh := make(chan string) | |
var wg sync.WaitGroup | |
for key := range keyCh { | |
wg.Add(1) | |
go client.FetchObject(bucketName, key, contentCh, &wg) | |
} | |
go closeChannel(contentCh, &wg) | |
for line := range contentCh { | |
_, err := outputFile.WriteString(strings.TrimSpace(line) + "\n") | |
if err != nil { | |
fmt.Fprintf(os.Stderr, "Error while writing: \"%s\".\n", err) | |
} | |
} | |
return nil | |
} | |
// closeChannel blocks until every goroutine tracked by wg has called Done,
// then closes c so that receivers ranging over it terminate.
func closeChannel(c chan string, wg *sync.WaitGroup) {
	defer close(c)
	wg.Wait()
}
// S3Client abstracts the S3 operations used by Parse. Passing a nil
// S3Client to Parse selects the AWS-backed implementation.
type S3Client interface {
	// FetchObject sends the contents of object key in bucketName to c, one
	// line per message, and calls wg.Done when it returns. Parse calls
	// wg.Add(1) before each call and runs it in its own goroutine.
	FetchObject(bucketName string, key string, c chan string, wg *sync.WaitGroup) error
	// FetchKeys sends every object key in bucketName to c. Implementations
	// must close c when finished: Parse ranges over c and would otherwise
	// block forever.
	FetchKeys(bucketName string, c chan string)
}
func newRealS3Client() (S3Client, error) { | |
cfg, err := config.LoadDefaultConfig(context.TODO()) | |
if err != nil { | |
return nil, err | |
} | |
client := s3.NewFromConfig(cfg) | |
return &realS3Client{client: client}, nil | |
} | |
type realS3Client struct { | |
client *s3.Client | |
} | |
// FetchKeys . | |
func (v *realS3Client) FetchKeys(bucketName string, c chan string) { | |
shared.Logger.Info("Fetch S3 keys.", "bucket", bucketName) | |
paginator := s3.NewListObjectsV2Paginator(v.client, &s3.ListObjectsV2Input{ | |
Bucket: &bucketName, | |
}) | |
shared.Logger.Debug("Paginator.", "paginator", paginator) | |
for paginator.HasMorePages() { | |
out, err := paginator.NextPage(context.TODO()) | |
if err != nil { | |
break | |
} | |
for _, each := range out.Contents { | |
c <- *each.Key | |
} | |
} | |
close(c) | |
} | |
// FetchObject fetches an S3 object. | |
func (v *realS3Client) FetchObject( | |
bucketName, key string, | |
c chan string, | |
wg *sync.WaitGroup, | |
) error { | |
defer wg.Done() | |
specifics := &s3.GetObjectInput{Bucket: &bucketName, Key: &key} | |
out, err := v.client.GetObject(context.TODO(), specifics) | |
if err != nil { | |
return err | |
} | |
defer out.Body.Close() | |
b, err := io.ReadAll(out.Body) | |
if err != nil { | |
return err | |
} | |
scanner := bufio.NewScanner(bytes.NewReader(b)) | |
for scanner.Scan() { | |
c <- scanner.Text() | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment