Skip to content

Instantly share code, notes, and snippets.

@oleewere
Last active February 9, 2024 20:28
Show Gist options
  • Save oleewere/102f956aee537064a655f753aab50ecc to your computer and use it in GitHub Desktop.
Save oleewere/102f956aee537064a655f753aab50ecc to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/joho/godotenv"
"go.temporal.io/sdk/client"
"gopkg.in/yaml.v2"
)
type Plan struct {
Sleep int `yaml:"sleep"`
Fail bool `yaml:"fail"`
Parallel int `yaml:"parallel"`
Workflows int `yaml:"workflows"`
}
var plans map[string]Plan
func loadPlans() {
yamlFile, err := ioutil.ReadFile("plan.yaml")
if err != nil {
log.Fatalf("Error reading YAML file: %s\n", err)
}
err = yaml.Unmarshal(yamlFile, &plans)
if err != nil {
log.Fatalf("Error parsing YAML file: %s\n", err)
}
}
func runWorkflow(c client.Client, namespace, taskQueue, workflowID string, sleep int, fail bool) string {
ctx := context.Background()
options := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskQueue,
}
workflowOptions := client.ExecuteWorkflow(ctx, options, "testNodeWorkflow", sleep, fail)
err := workflowOptions.Get(ctx, nil)
if err != nil {
if _, ok := err.(*client.WorkflowExecutionError); ok {
return "failure"
}
log.Fatalf("Failed to execute workflow: %v", err)
}
return "success"
}
func executePlan(planName string) {
plan, ok := plans[planName]
if !ok {
log.Printf("Plan %s not found\n", planName)
return
}
c, err := client.Dial(client.Options{
HostPort: os.Getenv("TEMPORAL_ADDRESS"),
Namespace: os.Getenv("TEMPORAL_NAMESPACE"),
})
if err != nil {
log.Fatalf("Failed to create client: %v\n", err)
}
defer c.Close()
var wg sync.WaitGroup
results := make(chan string, plan.Workflows)
startTime := time.Now()
for i := 0; i < plan.Workflows; i++ {
wg.Add(1)
go func(workflowNum int) {
defer wg.Done()
workflowID := fmt.Sprintf("%s-%d", planName, workflowNum)
result := runWorkflow(c, os.Getenv("TEMPORAL_NAMESPACE"), os.Getenv("TEMPORAL_TASK_QUEUE"), workflowID, plan.Sleep, plan.Fail)
results <- result
}(i)
if (i+1)%plan.Parallel == 0 || i == plan.Workflows-1 {
wg.Wait() // Wait for this batch to finish
}
}
close(results)
successCount, failureCount := 0, 0
for result := range results {
if result == "success" {
successCount++
} else {
failureCount++
}
}
duration := time.Since(startTime)
log.Printf("Total workflows: %d, Success: %d, Failure: %d, Duration: %s\n", plan.Workflows, successCount, failureCount, duration)
}
func executeHandler(w http.ResponseWriter, r *http.Request) {
planName := r.URL.Query().Get("planName")
if planName == "" {
http.Error(w, "Missing planName parameter", http.StatusBadRequest)
return
}
go executePlan(planName)
fmt.Fprintf(w, "Execution of plan %s started\n", planName)
}
func main() {
err := godotenv.Load()
if err != nil {
log.Fatalf("Error loading .env file: %v", err)
}
loadPlans()
http.HandleFunc("/execute", executeHandler)
log.Fatal(http.ListenAndServe(":8000", nil))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment