@colm-mchugh
Created March 20, 2015 21:03
Multi-threaded query client. Starts multiple workers to process a stream of query requests.
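A sketch of one possible invocation (the file names and the worker, queue, CPU, and iteration values are illustrative; the flag names, the default engine URL, and the /query endpoint come from the code below):

    go run client.go -engine http://localhost:8093 -input queries.txt \
        -output results.csv -workers 8 -queue-size 20 -cpus 4 -iterations 2

The input file is read one query per line; each line is POSTed as text/plain to <engine>/query, and one summary line per query is written to the output (stdout by default).
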
package main

import (
    "bufio"
    "crypto/md5"
    "encoding/hex"
    "encoding/json"
    "flag"
    "fmt"
    "io/ioutil"
    "net/http"
    "os"
    "runtime"
    "strings"
    "time"
)

var queryURL = flag.String("engine", "http://localhost:8093", "URL to cbq-engine")
var numWorkers = flag.Int("workers", 1, "Number of worker threads")
var numCPUs = flag.Int("cpus", 4, "Number of cpus to use")
var workQueueSize = flag.Int("queue-size", 10, "Worker Queue Size")
var inputFile = flag.String("input", "", "File containing queries")
var outputFile = flag.String("output", "stdout", "Where to write output")
var iterations = flag.Int("iterations", 1, "Number of times to iterate over input")

const (
    QUERY_ENDPOINT = "/query"
)

func main() {
    flag.Parse()
    url := *queryURL + QUERY_ENDPOINT
    nCpus := availableCPUs(*numCPUs)
    output, err := getOutputHandle(*outputFile)
    if err != nil {
        fmt.Printf("Error opening output file: %s\n", err.Error())
        os.Exit(1)
    }
    fmt.Printf("Inputfile=%s, Outputfile=%s, query url=%s, #workers=%d, (queue size=%d), #CPUs=%d\n",
        *inputFile, *outputFile, url, *numWorkers, *workQueueSize, nCpus)
    jobs := balancer(*numWorkers, *workQueueSize)
    results := make(chan *queryResults)
    errors := make(chan error)
    startTime := time.Now()
    for i := 0; i < *iterations; i++ {
        query_channel, err := getQueryChannel(*inputFile)
        if err != nil {
            fmt.Printf("%s\n", err.Error())
            os.Exit(1)
        }
        // Dispatch every query read from the input; each query gets its own goroutine.
        numQueries := 0
        for query := range query_channel {
            go dispatchJob(jobs, url, query, results, errors)
            numQueries++
        }
        // Collect exactly one outcome (result or error) per dispatched query.
        // Reading both channels here avoids blocking forever when some queries fail.
        for numDone := 0; numDone < numQueries; numDone++ {
            select {
            case resp := <-results:
                resp.write(output)
            case e := <-errors:
                fmt.Printf("Error: %s\n", e.Error())
            }
        }
    }
    endTime := time.Now()
    fmt.Printf("Query execution took %v\n", endTime.Sub(startTime))
}

func dispatchJob(jobs chan *job, url, query string, results chan *queryResults, errors chan error) {
    j := MakeJob(url, query)
    jobs <- j
    select {
    // Put the job results on the results channel:
    case x := <-j.results:
        results <- x
    // Error => put the job error on the errors channel:
    case e := <-j.errors:
        errors <- e
    }
}

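// A job carries one query and the target URL, plus per-job channels on which
// the worker that runs it hands back either the parsed results or an error.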
type job struct {
    url     string
    query   string
    results chan *queryResults
    errors  chan error
}

func (this *job) writeResponse(r *http.Response) (*queryResults, error) {
    var response queryResults
    defer r.Body.Close()
    if body, err := ioutil.ReadAll(r.Body); err != nil {
        return nil, err
    } else {
        err := json.Unmarshal(body, &response)
        if err != nil {
            fmt.Printf("json.Unmarshal Error: %s. Body is %s, Status is %s\n", err.Error(), string(body), r.Status)
        }
        response.query = this.query
        return &response, err
    }
}

func MakeJob(url string, query string) *job {
    return &job{
        url:     url,
        query:   query,
        results: make(chan *queryResults),
        errors:  make(chan error),
    }
}

type Worker struct {
    jobs  chan *job // Worker's job queue
    count int       // Number of jobs in job queue
}

func MakeWorker(queueSize int) *Worker {
    return &Worker{
        jobs:  make(chan *job, queueSize),
        count: 0,
    }
}

func (this *Worker) doWork(done chan *Worker) {
    for {
        job := <-this.jobs
        if resp, err := http.Post(job.url, "text/plain", strings.NewReader(job.query)); err != nil {
            job.errors <- err
        } else {
            if r, err := job.writeResponse(resp); err != nil {
                job.errors <- err
            } else {
                job.results <- r
            }
        }
        done <- this
    }
}

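// balancer returns an unbuffered jobs channel and starts numWorkers workers,
// each with a job queue buffered to workerQueueSize. A routing goroutine hands
// each incoming job to the least-loaded worker; when every worker's queue is
// full, jobsource is left nil so the select blocks until some worker signals
// completion on the done channel.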
func balancer(numWorkers int, workerQueueSize int) chan *job {
    jobs := make(chan *job)
    done := make(chan *Worker)
    workers := make([]*Worker, numWorkers)
    // Create numWorkers workers and set them up to do work.
    for i := 0; i < numWorkers; i++ {
        workers[i] = MakeWorker(workerQueueSize)
        go workers[i].doWork(done)
    }
    go func() {
        for {
            var free *Worker
            min := workerQueueSize
            // Find the freest worker - the worker with the fewest jobs in its queue:
            for _, w := range workers {
                if w.count < min {
                    free = w
                    min = w.count
                }
            }
            var jobsource chan *job
            if free != nil {
                jobsource = jobs
            }
            // If there is a free worker, take a job from jobsource and hand it to that worker;
            // otherwise (jobsource is nil) wait for a worker to finish a job and decrement its count.
            select {
            case j := <-jobsource:
                free.jobs <- j
                free.count++
            case w := <-done:
                w.count--
            }
        }
    }()
    return jobs
}

type metric_record struct {
    ElapsedTime string `json:"elapsedTime"`
    ExecTime    string `json:"executionTime"`
    ResultCount int    `json:"resultCount"`
    ResultSize  int64  `json:"resultSize"`
    ErrorCount  int    `json:"errorCount"`
}

type queryResults struct {
    ID        string            `json:"requestID"`
    Signature json.RawMessage   `json:"signature"`
    Results   []json.RawMessage `json:"results"`
    Status    string            `json:"status"`
    Metrics   metric_record     `json:"metrics"`
    query     string
}

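// write emits one summary line per query: the MD5 hash of the query text, the
// status, the elapsed and execution times (as reported and in milliseconds),
// the result count, result size, and error count, and finally the query itself.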
func (this *queryResults) write(w *os.File) {
    elapsedTime, _ := time.ParseDuration(this.Metrics.ElapsedTime)
    execTime, _ := time.ParseDuration(this.Metrics.ExecTime)
    elapsedMillis := int64(elapsedTime / time.Millisecond)
    execMillis := int64(execTime / time.Millisecond)
    fmt.Fprintf(w, "%s, %s, %s, %d, %s, %d, %d, %d, %d, %s\n", GetMD5Hash(this.query), this.Status, this.Metrics.ElapsedTime, elapsedMillis,
        this.Metrics.ExecTime, execMillis, this.Metrics.ResultCount, this.Metrics.ResultSize,
        this.Metrics.ErrorCount, this.query)
}

func getQueryChannel(input string) (chan string, error) {
    fileHandle, err := os.Open(input)
    if err != nil {
        return nil, err
    }
    query_chan := make(chan string)
    go func() {
        defer fileHandle.Close()
        scanner := bufio.NewScanner(fileHandle)
        scanner.Split(bufio.ScanLines)
        for scanner.Scan() {
            query := scanner.Text()
            query_chan <- query
        }
        close(query_chan)
    }()
    return query_chan, nil
}

func getOutputHandle(outputFile string) (*os.File, error) {
    if outputFile == "stdout" {
        return os.Stdout, nil
    }
    return os.Create(outputFile)
}

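// availableCPUs sets GOMAXPROCS to the requested value and returns the number
// of CPUs the runtime will actually use, capped at the machine's CPU count.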
func availableCPUs(numCPUsRequested int) int {
    runtime.GOMAXPROCS(numCPUsRequested)
    maxProcs := runtime.GOMAXPROCS(0)
    numCPU := runtime.NumCPU()
    if maxProcs < numCPU {
        return maxProcs
    }
    return numCPU
}

func GetMD5Hash(text string) string {
    hasher := md5.New()
    hasher.Write([]byte(text))
    return hex.EncodeToString(hasher.Sum(nil))
}