-
-
Save oleewere/102f956aee537064a655f753aab50ecc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"net/http" | |
"os" | |
"sync" | |
"time" | |
"github.com/joho/godotenv" | |
"go.temporal.io/sdk/client" | |
"gopkg.in/yaml.v2" | |
) | |
type Plan struct { | |
Sleep int `yaml:"sleep"` | |
Fail bool `yaml:"fail"` | |
Parallel int `yaml:"parallel"` | |
Workflows int `yaml:"workflows"` | |
} | |
var plans map[string]Plan | |
func loadPlans() { | |
yamlFile, err := ioutil.ReadFile("plan.yaml") | |
if err != nil { | |
log.Fatalf("Error reading YAML file: %s\n", err) | |
} | |
err = yaml.Unmarshal(yamlFile, &plans) | |
if err != nil { | |
log.Fatalf("Error parsing YAML file: %s\n", err) | |
} | |
} | |
func runWorkflow(c client.Client, namespace, taskQueue, workflowID string, sleep int, fail bool) string { | |
ctx := context.Background() | |
options := client.StartWorkflowOptions{ | |
ID: workflowID, | |
TaskQueue: taskQueue, | |
} | |
workflowOptions := client.ExecuteWorkflow(ctx, options, "testNodeWorkflow", sleep, fail) | |
err := workflowOptions.Get(ctx, nil) | |
if err != nil { | |
if _, ok := err.(*client.WorkflowExecutionError); ok { | |
return "failure" | |
} | |
log.Fatalf("Failed to execute workflow: %v", err) | |
} | |
return "success" | |
} | |
func executePlan(planName string) { | |
plan, ok := plans[planName] | |
if !ok { | |
log.Printf("Plan %s not found\n", planName) | |
return | |
} | |
c, err := client.Dial(client.Options{ | |
HostPort: os.Getenv("TEMPORAL_ADDRESS"), | |
Namespace: os.Getenv("TEMPORAL_NAMESPACE"), | |
}) | |
if err != nil { | |
log.Fatalf("Failed to create client: %v\n", err) | |
} | |
defer c.Close() | |
var wg sync.WaitGroup | |
results := make(chan string, plan.Workflows) | |
startTime := time.Now() | |
for i := 0; i < plan.Workflows; i++ { | |
wg.Add(1) | |
go func(workflowNum int) { | |
defer wg.Done() | |
workflowID := fmt.Sprintf("%s-%d", planName, workflowNum) | |
result := runWorkflow(c, os.Getenv("TEMPORAL_NAMESPACE"), os.Getenv("TEMPORAL_TASK_QUEUE"), workflowID, plan.Sleep, plan.Fail) | |
results <- result | |
}(i) | |
if (i+1)%plan.Parallel == 0 || i == plan.Workflows-1 { | |
wg.Wait() // Wait for this batch to finish | |
} | |
} | |
close(results) | |
successCount, failureCount := 0, 0 | |
for result := range results { | |
if result == "success" { | |
successCount++ | |
} else { | |
failureCount++ | |
} | |
} | |
duration := time.Since(startTime) | |
log.Printf("Total workflows: %d, Success: %d, Failure: %d, Duration: %s\n", plan.Workflows, successCount, failureCount, duration) | |
} | |
func executeHandler(w http.ResponseWriter, r *http.Request) { | |
planName := r.URL.Query().Get("planName") | |
if planName == "" { | |
http.Error(w, "Missing planName parameter", http.StatusBadRequest) | |
return | |
} | |
go executePlan(planName) | |
fmt.Fprintf(w, "Execution of plan %s started\n", planName) | |
} | |
func main() { | |
err := godotenv.Load() | |
if err != nil { | |
log.Fatalf("Error loading .env file: %v", err) | |
} | |
loadPlans() | |
http.HandleFunc("/execute", executeHandler) | |
log.Fatal(http.ListenAndServe(":8000", nil)) | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment