Skip to content

Instantly share code, notes, and snippets.

@denarced
Created September 12, 2024 04:58
Show Gist options
  • Save denarced/e56af99bdb4f75b82dd958d007574d0b to your computer and use it in GitHub Desktop.
Save denarced/e56af99bdb4f75b82dd958d007574d0b to your computer and use it in GitHub Desktop.
Fetch all access logs from S3 bucket
// Package spala parses S3 access logs.
package spala
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os"
"strings"
"sync"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/denarced/s3-photo-access-log/shared"
)
// Parse downloads every object in the bucket named bucketName and writes all
// of their lines to "access.log" in the current working directory, creating or
// truncating it. Each line is whitespace-trimmed and newline-terminated.
//
// If client is nil, a client built by newRealS3Client is used. Objects are
// fetched concurrently (at most maxFetches at a time) and written as they
// arrive, so lines from different objects may appear in any order. A failure
// to fetch one object is reported on stderr and does not stop the run; a
// failure to create, write, flush or close the output file is returned.
func Parse(client S3Client, bucketName string) (err error) {
	// maxFetches bounds the number of in-flight FetchObject calls so a bucket
	// with many keys does not spawn an unbounded number of goroutines and
	// simultaneous requests.
	const maxFetches = 16

	if client == nil {
		if client, err = newRealS3Client(); err != nil {
			return err
		}
	}
	outputFile, err := os.Create("access.log")
	if err != nil {
		return err
	}
	defer func() {
		// A close error on a file we wrote can mean data never reached disk.
		if cerr := outputFile.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	keyCh := make(chan string)
	go client.FetchKeys(bucketName, keyCh)

	contentCh := make(chan string)
	// Dispatch fetches in the background so this goroutine can consume
	// contentCh while keys are still being listed. Otherwise every fetched
	// object would stay blocked on an unread channel until listing finished.
	go func() {
		var wg sync.WaitGroup
		sem := make(chan struct{}, maxFetches)
		for key := range keyCh {
			wg.Add(1)
			sem <- struct{}{}
			go func(key string) {
				defer func() { <-sem }()
				if ferr := client.FetchObject(bucketName, key, contentCh, &wg); ferr != nil {
					fmt.Fprintf(os.Stderr, "Error while fetching %q: \"%s\".\n", key, ferr)
				}
			}(key)
		}
		// keyCh is closed, so every wg.Add has already happened; waiting and
		// closing contentCh now cannot race with a later Add.
		closeChannel(contentCh, &wg)
	}()

	w := bufio.NewWriter(outputFile)
	for line := range contentCh {
		// bufio.Writer errors are sticky and surface from Flush below. Keep
		// draining after a failure so every fetch goroutine can finish sending.
		_, _ = w.WriteString(strings.TrimSpace(line) + "\n")
	}
	if err = w.Flush(); err != nil {
		return fmt.Errorf("writing access.log: %w", err)
	}
	return nil
}
// closeChannel blocks until every goroutine tracked by pending has called
// Done, then closes ch so that anyone ranging over it stops.
func closeChannel(ch chan string, pending *sync.WaitGroup) {
	// Closing before all producers finish would make a late send panic.
	pending.Wait()
	close(ch)
}
// S3Client is the set of S3 operations Parse depends on. Parse accepts any
// implementation; passing nil makes it build the default AWS-backed one.
type S3Client interface {
	// FetchKeys sends the key of every object in bucket to keys. Parse
	// ranges over keys, so implementations are expected to close it when
	// done.
	FetchKeys(bucket string, keys chan string)

	// FetchObject sends the content of one object to lines, one line per
	// send. Parse waits on done to learn when all fetches have finished, so
	// implementations are expected to call done.Done before returning.
	FetchObject(bucket, key string, lines chan string, done *sync.WaitGroup) error
}
// realS3Client implements S3Client on top of an AWS SDK v2 S3 client.
type realS3Client struct {
	client *s3.Client
}

// newRealS3Client returns an S3Client backed by the AWS SDK, configured from
// whatever config.LoadDefaultConfig resolves. Any error from loading that
// configuration is returned unchanged.
func newRealS3Client() (S3Client, error) {
	awsCfg, loadErr := config.LoadDefaultConfig(context.TODO())
	if loadErr != nil {
		return nil, loadErr
	}
	return &realS3Client{client: s3.NewFromConfig(awsCfg)}, nil
}
// FetchKeys lists every object key in the bucket named bucketName, sends each
// one to c, and closes c when finished.
//
// Listing stops at the first page that fails to load. The error is logged and
// the keys already sent are kept; the method signature offers no way to return
// it to the caller.
func (v *realS3Client) FetchKeys(bucketName string, c chan string) {
	// Close c on every exit path so a consumer ranging over it always
	// terminates.
	defer close(c)
	shared.Logger.Info("Fetch S3 keys.", "bucket", bucketName)
	paginator := s3.NewListObjectsV2Paginator(v.client, &s3.ListObjectsV2Input{
		Bucket: &bucketName,
	})
	shared.Logger.Debug("Paginator.", "paginator", paginator)
	for paginator.HasMorePages() {
		out, err := paginator.NextPage(context.TODO())
		if err != nil {
			shared.Logger.Error("List S3 keys.", "bucket", bucketName, "error", err)
			return
		}
		for _, each := range out.Contents {
			// Key is a pointer; skip a missing one rather than dereference nil.
			if each.Key == nil {
				continue
			}
			c <- *each.Key
		}
	}
}
// FetchObject downloads the object key from the bucket named bucketName and
// sends each of its lines, without the line terminator, to c. It calls
// wg.Done on return, whether or not it succeeded.
//
// Errors from getting, reading or splitting the object are returned wrapped
// with the object's location. Lines sent before a splitting error remain sent.
func (v *realS3Client) FetchObject(
	bucketName, key string,
	c chan string,
	wg *sync.WaitGroup,
) error {
	defer wg.Done()
	specifics := &s3.GetObjectInput{Bucket: &bucketName, Key: &key}
	out, err := v.client.GetObject(context.TODO(), specifics)
	if err != nil {
		return fmt.Errorf("getting s3://%s/%s: %w", bucketName, key, err)
	}
	defer out.Body.Close()
	b, err := io.ReadAll(out.Body)
	if err != nil {
		return fmt.Errorf("reading s3://%s/%s: %w", bucketName, key, err)
	}
	scanner := bufio.NewScanner(bytes.NewReader(b))
	// The whole object is already in memory, so allow a line as long as the
	// object itself. The default 64 KiB limit would otherwise stop scanning
	// at the first over-long line.
	scanner.Buffer(nil, len(b)+1)
	for scanner.Scan() {
		c <- scanner.Text()
	}
	if err = scanner.Err(); err != nil {
		return fmt.Errorf("splitting s3://%s/%s into lines: %w", bucketName, key, err)
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment