Created
December 6, 2018 14:45
-
-
Save trusch/0b807ecc84b7fd4a4f2b49e5622f70e0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package events | |
import ( | |
"io" | |
"github.com/joomcode/errorx" | |
"github.com/nats-io/go-nats-streaming" | |
uuid "github.com/satori/go.uuid" | |
) | |
type controller struct { | |
natsClient stan.Conn | |
} | |
// NewController creates a new event controller | |
func NewController(natsAddr, clusterID string) (ControllerServer, error) { | |
cli, err := stan.Connect(clusterID, uuid.NewV4().String(), stan.NatsURL(natsAddr)) | |
if err != nil { | |
return nil, errorx.Decorate(err, "failed to connect to nats") | |
} | |
return &controller{cli}, nil | |
} | |
// Subscribe subscribes a client to a stream | |
func (c *controller) Subscribe(req *SubscribeRequest, resp Controller_SubscribeServer) error { | |
done := make(chan error, 1) | |
s, err := c.natsClient.Subscribe(req.Channel, func(m *stan.Msg) { | |
evt := &Event{ | |
Seq: m.Sequence, | |
Data: m.Data, | |
} | |
if err := resp.Send(evt); err != nil { | |
if err == io.EOF { | |
done <- nil | |
} else { | |
done <- err | |
} | |
return | |
} | |
}, stan.StartAtSequence(req.Seq)) | |
if err != nil { | |
return err | |
} | |
defer s.Close() | |
select { | |
case <-resp.Context().Done(): | |
return resp.Context().Err() | |
case err := <-done: | |
return err | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment