Last active
December 30, 2021 22:32
-
-
Save rosmo/a6469ea80cf1375142cca0a7d6e89442 to your computer and use it in GitHub Desktop.
gcs2bq wrapper in golang
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# To build and deploy: | |
# | |
# docker build -t gcr.io/YOUR-PROJECT/gcs2bq-cloudrun:latest .
# docker push gcr.io/YOUR-PROJECT/gcs2bq-cloudrun:latest
#
# gcloud run deploy gcs2bq --image=gcr.io/YOUR-PROJECT/gcs2bq-cloudrun:latest \
# --service-account=your-sa@YOUR-PROJECT.iam.gserviceaccount.com \ | |
# --timeout=60m --set-env-vars=GCS2BQ_PROJECT=YOUR-PROJECT,... \ | |
# --region=europe-west4 | |
# | |
# Test invocation with: | |
# curl -NH "Authorization: Bearer $(gcloud auth print-identity-token)" https://your-cloud-run-function.run.app | |
# Build stage: fetch and compile the upstream gcs2bq tool, then compile the
# HTTP wrapper from main.go.
FROM golang:1.17-alpine3.15 AS build
RUN go get github.com/GoogleCloudPlatform/professional-services/tools/gcs2bq
COPY main.go /
RUN go build -o /main /main.go

# Runtime stage: the wrapper shells out to gsutil and bq, which this image is
# presumably chosen to provide.
FROM google/cloud-sdk:alpine
COPY --from=build /go/bin/gcs2bq /bin/gcs2bq
COPY --from=build /main /bin/gcs2bq-entrypoint
# -f makes curl fail the build on an HTTP error instead of silently saving the
# error page as the schema file.
RUN curl -fLso /bigquery.schema https://github.com/GoogleCloudPlatform/professional-services/raw/main/tools/gcs2bq/bigquery.schema
RUN curl -fLso /gcs2bq.avsc https://raw.githubusercontent.com/GoogleCloudPlatform/professional-services/main/tools/gcs2bq/gcs2bq.avsc
EXPOSE 8080
# Exec form starts the wrapper directly instead of through "/bin/sh -c", so the
# server process itself receives stop signals from the container runtime.
ENTRYPOINT ["/bin/gcs2bq-entrypoint"]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is the old version, which does not stream command output into the HTTP response.
package main | |
import ( | |
"bytes" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"net/http" | |
"os" | |
"os/exec" | |
"path" | |
"syscall" | |
) | |
func main() { | |
log.Print("Starting function...") | |
http.HandleFunc("/", handler) | |
// Determine port for HTTP service. | |
port := os.Getenv("PORT") | |
if port == "" { | |
port = "8080" | |
log.Printf("Defaulting to port %s", port) | |
} | |
// Start HTTP server. | |
log.Printf("Listening on port %s", port) | |
if err := http.ListenAndServe(":"+port, nil); err != nil { | |
log.Fatal(err) | |
} | |
} | |
// runCommand executes name with args, buffering the command's stdout and
// stderr in memory.
//
// A successful run reports nothing. On failure the buffered output is logged
// and then:
//   - if canFail is set, the error is logged as a warning and the function returns;
//   - if the process exited with a status listed in allowedExitCodes, the
//     function returns;
//   - otherwise the whole program is terminated via log.Fatalln.
func runCommand(canFail bool, allowedExitCodes []int, name string, args ...string) {
	log.Printf("Running: %s %+q", name, args)

	var outBuf, errBuf bytes.Buffer
	proc := exec.Command(name, args...)
	proc.Stdout, proc.Stderr = &outBuf, &errBuf

	runErr := proc.Run()
	if runErr == nil {
		return
	}

	// The captured streams are only surfaced when something went wrong.
	if captured := outBuf.String(); captured != "" {
		log.Println("stdout:", captured)
	}
	if captured := errBuf.String(); captured != "" {
		log.Println("stderr:", captured)
	}

	if canFail {
		log.Println("Warning:", runErr)
		return
	}

	if exitErr, isExit := runErr.(*exec.ExitError); isExit {
		if ws, isWait := exitErr.Sys().(syscall.WaitStatus); isWait {
			code := ws.ExitStatus()
			for i := range allowedExitCodes {
				if allowedExitCodes[i] == code {
					return
				}
			}
			log.Printf("Process exited with status code: %d", code)
		}
	}
	log.Fatalln("Error:", runErr)
}
func handler(w http.ResponseWriter, r *http.Request) { | |
binary := "/bin/gcs2bq" | |
projectId := os.Getenv("GCS2BQ_PROJECT") | |
bigqueryDataset := os.Getenv("GCS2BQ_DATASET") | |
bigqueryTable := os.Getenv("GCS2BQ_TABLE") | |
bucket := os.Getenv("GCS2BQ_BUCKET") | |
location := os.Getenv("GCS2BQ_LOCATION") | |
file, err := ioutil.TempFile("/tmp", "gcs2bq") | |
if err != nil { | |
log.Fatal(err) | |
} | |
// Remove the file right away since gcs2bq excepts it not to be there | |
os.Remove(file.Name()) | |
filename := fmt.Sprintf("%s.avro", file.Name()) | |
args := []string{"-logtostderr", "-file", filename} | |
if os.Getenv("GCS2BQ_VERSIONS") != "" { | |
args = append(args, "-versions") | |
} | |
runCommand(false, []int{0, 2, 3}, binary, args...) | |
defer os.Remove(file.Name()) | |
mkBucketArgs := []string{"mb", "-p", projectId, "-c", "standard", "-l", location, "-b", "on", fmt.Sprintf("gs://%s", bucket)} | |
runCommand(true, []int{0}, "gsutil", mkBucketArgs...) | |
baseFilename := path.Base(filename) | |
copyFileArgs := []string{"cp", filename, fmt.Sprintf("gs://%s/%s", bucket, baseFilename)} | |
runCommand(false, []int{0}, "gsutil", copyFileArgs...) | |
bqMakeDatasetArgs := []string{"mk", "--project_id", projectId, "--location", location, bigqueryDataset} | |
runCommand(true, []int{0}, "bq", bqMakeDatasetArgs...) | |
bqLoadArgs := []string{"load", fmt.Sprintf("--project_id=%s", projectId), fmt.Sprintf("--location=%s", location), "--schema=bigquery.schema", "--source_format=AVRO", "--use_avro_logical_types", "--replace=true", fmt.Sprintf("%s.%s", bigqueryDataset, bigqueryTable), fmt.Sprintf("gs://%s/%s", bucket, baseFilename)} | |
runCommand(false, []int{0}, "bq", bqLoadArgs...) | |
removeFileArgs := []string{"rm", fmt.Sprintf("gs://%s/%s", bucket, baseFilename)} | |
runCommand(true, []int{0}, "gsutil", removeFileArgs...) | |
log.Println("All tasks finished.") | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is the new version, which streams the commands' stdout and stderr into
// the HTTP response, so prefer this one.
package main | |
import ( | |
"bufio" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"net/http" | |
"os" | |
"os/exec" | |
"path" | |
"syscall" | |
"time" | |
) | |
func main() { | |
log.Print("Starting function...") | |
http.HandleFunc("/", handler) | |
// Determine port for HTTP service. | |
port := os.Getenv("PORT") | |
if port == "" { | |
port = "8080" | |
log.Printf("Defaulting to port %s", port) | |
} | |
// Start HTTP server. | |
log.Printf("Listening on port %s", port) | |
if err := http.ListenAndServe(":"+port, nil); err != nil { | |
log.Fatal(err) | |
} | |
} | |
func writeProgress(w http.ResponseWriter, flusher http.Flusher, message string) { | |
fmt.Fprintf(w, "%s\n", message) | |
flusher.Flush() | |
} | |
func runCommand(w http.ResponseWriter, flusher http.Flusher, canFail bool, allowedExitCodes []int, name string, args ...string) { | |
log.Printf("Running: %s %+q", name, args) | |
writeProgress(w, flusher, fmt.Sprintf("Running command: %s", name)) | |
cmd := exec.Command(name, args...) | |
stdout, err := cmd.StdoutPipe() | |
if err != nil { | |
log.Fatalln("Error getting stdout pipe:", err) | |
} | |
stdoutBuf := bufio.NewScanner(stdout) | |
stderr, err := cmd.StderrPipe() | |
if err != nil { | |
log.Fatalln("Error getting stderr pipe:", err) | |
} | |
stderrBuf := bufio.NewScanner(stderr) | |
if err := cmd.Start(); err != nil { | |
log.Fatalln("Error starting command:", err) | |
return | |
} | |
done := make(chan error) | |
output := make(chan string) | |
go func() { done <- cmd.Wait() }() | |
go func() { | |
for stdoutBuf.Scan() { | |
text := stdoutBuf.Text() | |
output <- text | |
} | |
}() | |
go func() { | |
for stderrBuf.Scan() { | |
text := stderrBuf.Text() | |
output <- text | |
} | |
}() | |
pollTimer := time.After(1 * time.Second) | |
for { | |
select { | |
case line := <-output: | |
writeProgress(w, flusher, line) | |
case <-pollTimer: | |
writeProgress(w, flusher, fmt.Sprintf("Still waiting to complete: %s", name)) | |
pollTimer = time.After(1 * time.Second) | |
case err := <-done: | |
if err != nil { | |
if canFail { | |
log.Println("Warning:", err) | |
return | |
} | |
if exiterr, ok := err.(*exec.ExitError); ok { | |
if status, ok := exiterr.Sys().(syscall.WaitStatus); ok { | |
for _, exitcode := range allowedExitCodes { | |
if status.ExitStatus() == exitcode { | |
return | |
} | |
} | |
log.Printf("Process exited with status code: %d", status.ExitStatus()) | |
} | |
} | |
log.Fatalln("Error:", err) | |
} else { | |
writeProgress(w, flusher, fmt.Sprintf("Command complete: %s", name)) | |
return | |
} | |
} | |
} | |
} | |
func handler(w http.ResponseWriter, r *http.Request) { | |
flusher, ok := w.(http.Flusher) | |
if !ok { | |
http.NotFound(w, r) | |
return | |
} | |
w.Header().Set("Transfer-Encoding", "chunked") | |
w.WriteHeader(http.StatusOK) | |
flusher.Flush() | |
binary := "/bin/gcs2bq" | |
projectId := os.Getenv("GCS2BQ_PROJECT") | |
bigqueryDataset := os.Getenv("GCS2BQ_DATASET") | |
bigqueryTable := os.Getenv("GCS2BQ_TABLE") | |
bucket := os.Getenv("GCS2BQ_BUCKET") | |
location := os.Getenv("GCS2BQ_LOCATION") | |
file, err := ioutil.TempFile("/tmp", "gcs2bq") | |
if err != nil { | |
log.Fatal(err) | |
} | |
// Remove the file right away since gcs2bq excepts it not to be there | |
os.Remove(file.Name()) | |
filename := fmt.Sprintf("%s.avro", file.Name()) | |
args := []string{"-logtostderr", "-file", filename} | |
if os.Getenv("GCS2BQ_LOCATION") != "" { | |
args = append(args, "-versions") | |
} | |
runCommand(w, flusher, false, []int{0, 2, 3}, binary, args...) | |
defer os.Remove(file.Name()) | |
mkBucketArgs := []string{"mb", "-p", projectId, "-c", "standard", "-l", location, "-b", "on", fmt.Sprintf("gs://%s", bucket)} | |
runCommand(w, flusher, true, []int{0}, "gsutil", mkBucketArgs...) | |
baseFilename := path.Base(filename) | |
copyFileArgs := []string{"cp", filename, fmt.Sprintf("gs://%s/%s", bucket, baseFilename)} | |
runCommand(w, flusher, false, []int{0}, "gsutil", copyFileArgs...) | |
bqMakeDatasetArgs := []string{"mk", "--project_id", projectId, "--location", location, bigqueryDataset} | |
runCommand(w, flusher, true, []int{0}, "bq", bqMakeDatasetArgs...) | |
bqLoadArgs := []string{"load", fmt.Sprintf("--project_id=%s", projectId), fmt.Sprintf("--location=%s", location), "--schema=bigquery.schema", "--source_format=AVRO", "--use_avro_logical_types", "--replace=true", fmt.Sprintf("%s.%s", bigqueryDataset, bigqueryTable), fmt.Sprintf("gs://%s/%s", bucket, baseFilename)} | |
runCommand(w, flusher, false, []int{0}, "bq", bqLoadArgs...) | |
removeFileArgs := []string{"rm", fmt.Sprintf("gs://%s/%s", bucket, baseFilename)} | |
runCommand(w, flusher, true, []int{0}, "gsutil", removeFileArgs...) | |
writeProgress(w, flusher, "All tasks finished.") | |
log.Println("All tasks finished.") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment