Skip to content

Instantly share code, notes, and snippets.

@mreiferson
Last active August 29, 2015 14:00
Show Gist options
  • Save mreiferson/11403980 to your computer and use it in GitHub Desktop.
Save mreiferson/11403980 to your computer and use it in GitHub Desktop.
new go-nsq API
func loop(inChan chan *nsq.Message) {
for _, msg := range inChan {
err := doWork(msg)
if err != nil {
msg.Requeue(-1)
continue
}
msg.Finish()
}
}
func main() {
inChan := make(chan *nsq.Message)
conf := nsq.NewConfig()
conf.Set("max_in_flight", 1000)
conf.Set("tls_v1", true)
r, err := nsq.NewReader(*topic, *channel, conf)
if err != nil {
log.Fatalf(err.Error())
}
r.SetLogger(log.New(os.Stdout, "", log.LstdFlags), nsq.LogLevelInfo)
r.SetConcurrentHandlers(nsq.HandlerFunc(func(m *nsq.Message) error {
if doLater {
m.RequeueWithoutBackoff(300 * time.Second)
return nil
}
m.DisableAutoResponse()
inChan <- m
return nil
}), 10)
err := r.ConnectToNSQLookupd(nsqlookupdHTTPAddr)
if err != nil {
log.Fatalf(err.Error())
}
go loop()
<-r.StopChan
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment