Created
March 20, 2015 21:03
-
-
Save colm-mchugh/63864cc4d98e974d90ed to your computer and use it in GitHub Desktop.
Multi-threaded query client: starts multiple workers to process a stream of requests against a query engine.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"crypto/md5" | |
"encoding/hex" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"io/ioutil" | |
"net/http" | |
"os" | |
"runtime" | |
"strings" | |
"time" | |
) | |
// Command-line configuration for the load generator.
var (
	queryURL      = flag.String("engine", "http://localhost:8093", "URL to cbq-engine")
	numWorkers    = flag.Int("workers", 1, "Number of worker threads")
	numCPUs       = flag.Int("cpus", 4, "Number of cpus to use")
	workQueueSize = flag.Int("queue-size", 10, "Worker Queue Size")
	inputFile     = flag.String("input", "", "File containing queries")
	outputFile    = flag.String("output", "stdout", "Where to write output")
	iterations    = flag.Int("iterations", 1, "Number of times to iterate over input")
)

// QUERY_ENDPOINT is the path appended to the engine URL when POSTing queries.
const QUERY_ENDPOINT = "/query"
func main() { | |
flag.Parse() | |
url := *queryURL + QUERY_ENDPOINT | |
nCpus := availableCPUs(*numCPUs) | |
output, err := getOutputHandle(*outputFile) | |
if err != nil { | |
fmt.Printf("Error opening outputfile: %s\n", err.Error()) | |
os.Exit(0) | |
} | |
fmt.Printf("Inputfile=%s, Outputfile=%s, query url=%s, #workers=%d, (queue size=%d), #CPUs=%d\n", *inputFile, *outputFile, url, *numWorkers, *workQueueSize, nCpus) | |
jobs := balancer(*numWorkers, *workQueueSize) | |
results := make(chan *queryResults) | |
errors := make(chan error) | |
startTime := time.Now() | |
for i := 0; i < *iterations; i++ { | |
query_channel, err := getQueryChannel(*inputFile) | |
if err != nil { | |
fmt.Printf("%s\n", err.Error()) | |
os.Exit(0) | |
} | |
numQueries := 0 | |
for { | |
query, ok := <-query_channel | |
if !ok { | |
break | |
} | |
go dispatchJob(jobs, url, query, results, errors) | |
numQueries++ | |
} | |
numResults := 0 | |
for resp := range results { | |
resp.write(output) | |
numResults++ | |
// responses for all requests read from results => stop reading results. | |
if numResults >= numQueries { | |
break | |
} | |
} | |
if numResults < numQueries { | |
// At least one query produced an error | |
numErrors := numQueries - numResults | |
for err := range errors { | |
fmt.Printf("Error: %s\n", err.Error()) | |
numErrors-- | |
if numErrors == 0 { | |
break | |
} | |
} | |
} | |
} | |
endTime := time.Now() | |
fmt.Printf("The query executing took %v to run\n", endTime.Sub(startTime)) | |
} | |
func dispatchJob(jobs chan *job, url, query string, results chan *queryResults, errors chan error) { | |
j := MakeJob(url, query) | |
jobs <- j | |
select { | |
// Put the job results on results channel: | |
case x := <-j.results: | |
results <- x | |
// Error => Put the job error on errors channel: | |
case e := <-j.errors: | |
errors <- e | |
} | |
} | |
// job pairs one query with the per-job channels a worker uses to hand back
// exactly one outcome: either a decoded result or an error, never both.
type job struct {
	url     string             // full query endpoint URL to POST to
	query   string             // query text sent as the POST body
	results chan *queryResults // carries the decoded response on success
	errors  chan error         // carries the POST or decode error on failure
}
func (this *job) writeResponse(r *http.Response) (*queryResults, error) { | |
var response queryResults | |
defer r.Body.Close() | |
if body, err := ioutil.ReadAll(r.Body); err != nil { | |
return nil, err | |
} else { | |
err := json.Unmarshal(body, &response) | |
if err != nil { | |
fmt.Printf("json.Unmarshal Error: %s. Body is %s, Status is %s\n", err.Error(), string(body), r.Status) | |
} | |
response.query = this.query | |
return &response, err | |
} | |
} | |
func MakeJob(url string, query string) *job { | |
return &job{ | |
url: url, | |
query: query, | |
results: make(chan *queryResults), | |
errors: make(chan error), | |
} | |
} | |
type Worker struct { | |
jobs chan *job // Worker's job queue | |
count int // Number of jobs in job queue | |
} | |
func MakeWorker(queueSize int) *Worker { | |
return &Worker{ | |
jobs: make(chan *job, queueSize), | |
count: 0, | |
} | |
} | |
func (this *Worker) doWork(done chan *Worker) { | |
for { | |
job := <-this.jobs | |
if resp, err := http.Post(job.url, "text/plain", strings.NewReader(job.query)); err != nil { | |
job.errors <- err | |
} else { | |
if r, err := job.writeResponse(resp); err != nil { | |
job.errors <- err | |
} else { | |
job.results <- r | |
} | |
} | |
done <- this | |
} | |
} | |
// balancer starts numWorkers workers and returns the channel on which
// callers submit jobs. A background goroutine forwards each incoming job
// to the least-loaded worker, tracking per-worker load via the done
// channel that workers signal on after finishing each job.
func balancer(numWorkers int, workerQueueSize int) chan *job {
	jobs := make(chan *job)
	done := make(chan *Worker)
	workers := make([]*Worker, numWorkers)
	// Create numWorkers workers and set them up to do work
	for i := 0; i < numWorkers; i++ {
		workers[i] = MakeWorker(workerQueueSize)
		go workers[i].doWork(done)
	}
	go func() {
		for {
			var free *Worker
			min := workerQueueSize
			// Find the free-est worker - worker with smallest #jobs in their queue:
			// (a worker whose queue is full, count == workerQueueSize, is never chosen,
			// so the buffered send to free.jobs below cannot block)
			for _, w := range workers {
				if w.count < min {
					free = w
					min = w.count
				}
			}
			var jobsource chan *job
			if free != nil {
				jobsource = jobs
			}
			// If there is a free worker, get a job from jobsource and give it to worker
			// otherwise (jobsource is nil) wait for a worker to be done and decrement its #jobs.
			// Receiving from a nil channel blocks forever, which disables that select arm.
			select {
			case j := <-jobsource:
				free.jobs <- j
				free.count++
			case w := <-done:
				w.count--
			}
		}
	}()
	return jobs
}
// metric_record mirrors the "metrics" object of a cbq-engine query response.
type metric_record struct {
	ElapsedTime string `json:"elapsedTime"`
	ExecTime    string `json:"executionTime"`
	ResultCount int    `json:"resultCount"`
	ResultSize  int64  `json:"resultSize"`
	// The original tag was missing its closing quote (`json:"errorCount`),
	// a malformed tag that encoding/json ignores, so the field marshaled
	// as "ErrorCount". go vet's structtag check flags this.
	ErrorCount int `json:"errorCount"`
}
// queryResults models the top-level JSON envelope returned by the query
// endpoint. Signature and Results stay as raw JSON because this tool only
// reports metrics, never the result contents.
type queryResults struct {
	ID        string            `json:"requestID"`
	Signature json.RawMessage   `json:"signature"`
	Results   []json.RawMessage `json:"results"`
	Status    string            `json:"status"`
	Metrics   metric_record     `json:"metrics"`
	// query holds the original query text; set locally after decoding,
	// never populated from the JSON body (unexported, so json ignores it).
	query string
}
func (this *queryResults) write(w *os.File) { | |
elapsedTime, _ := time.ParseDuration(this.Metrics.ElapsedTime) | |
execTime, _ := time.ParseDuration(this.Metrics.ExecTime) | |
elapsedMillis := int64(elapsedTime / time.Millisecond) | |
execMillis := int64(execTime / time.Millisecond) | |
fmt.Printf("%s, %s, %s, %d, %s, %d, %d, %d, %d, %s\n", GetMD5Hash(this.query), this.Status, this.Metrics.ElapsedTime, elapsedMillis, | |
this.Metrics.ExecTime, execMillis, this.Metrics.ResultCount, this.Metrics.ResultSize, | |
this.Metrics.ErrorCount, this.query) | |
} | |
// getQueryChannel opens input and returns a channel that yields the file's
// lines, one query per line. A goroutine feeds the channel and closes it
// when the file is exhausted; the file handle is closed on exit.
// Bug fix: the original ignored scanner.Err(), so a mid-file read error
// looked exactly like clean EOF; it is now reported on stderr.
func getQueryChannel(input string) (chan string, error) {
	fileHandle, err := os.Open(input)
	if err != nil {
		return nil, err
	}
	query_chan := make(chan string)
	go func() {
		defer fileHandle.Close()
		// bufio.Scanner splits on lines by default; the explicit
		// Split(bufio.ScanLines) call was redundant and is dropped.
		scanner := bufio.NewScanner(fileHandle)
		for scanner.Scan() {
			query_chan <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			fmt.Fprintf(os.Stderr, "Error reading %s: %s\n", input, err.Error())
		}
		close(query_chan)
	}()
	return query_chan, nil
}
// getOutputHandle resolves the -output flag value to an open *os.File: the
// literal name "stdout" maps to os.Stdout, anything else is created (or
// truncated) on disk via os.Create.
func getOutputHandle(outputFile string) (*os.File, error) {
	switch outputFile {
	case "stdout":
		return os.Stdout, nil
	default:
		return os.Create(outputFile)
	}
}
// availableCPUs sets GOMAXPROCS to the requested value and reports how many
// CPUs will actually be used: the smaller of the effective GOMAXPROCS value
// and the machine's physical CPU count.
func availableCPUs(numCPUsRequested int) int {
	runtime.GOMAXPROCS(numCPUsRequested)
	effective := runtime.GOMAXPROCS(0) // GOMAXPROCS(0) only queries
	cores := runtime.NumCPU()
	if cores <= effective {
		return cores
	}
	return effective
}
// GetMD5Hash returns the lowercase hex MD5 digest of text, used to give
// each query line a short, stable identifier in the output.
// Idiom: md5.Sum replaces the three-step md5.New/Write/Sum(nil) dance.
func GetMD5Hash(text string) string {
	sum := md5.Sum([]byte(text))
	return hex.EncodeToString(sum[:])
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment