Skip to content

Instantly share code, notes, and snippets.

@meetchandan
Last active September 13, 2024 13:05
Show Gist options
  • Save meetchandan/523935cad74944855ca936e6f4357a15 to your computer and use it in GitHub Desktop.
Save meetchandan/523935cad74944855ca936e6f4357a15 to your computer and use it in GitHub Desktop.
func worker(events <-chan Event) {
ctx := context.Background()
client, err := bigquery.NewClient(ctx, ProjectID)
if err != nil {
panic(err)
}
u := client.Dataset(DataSet).Table(Table).Uploader()
buffer := []*Event{}
for e := range events {
buffer = append(buffer, &e)
if len(buffer) == BatchSize {
tmpBuffer := buffer
go flushBuffer(ctx, u, tmpBuffer)
buffer = []*Event{}
}
}
// Handle any remaining events in the buffer after the channel closes
if len(buffer) > 0 {
flushBuffer(ctx, u, buffer)
}
}
func flushBuffer(ctx context.Context, u *bigquery.Uploader, buffer []*Event) {
log.Printf("Flushing %d events to BQ", len(buffer))
if err := u.Put(ctx, buffer); err != nil {
log.Printf("Error flushing to BQ: %s", err)
} else {
log.Printf("Flush done")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment