Skip to content

Instantly share code, notes, and snippets.

@PercyPham
Last active March 7, 2021 02:54
Show Gist options
  • Save PercyPham/07ee75a90c1fa84b4c27018e6ffee4b9 to your computer and use it in GitHub Desktop.
Save PercyPham/07ee75a90c1fa84b4c27018e6ffee4b9 to your computer and use it in GitHub Desktop.
Implement mini PubSub
package pubsub
import (
"context"
"fmt"
"sync"
"time"
)
// Topic string
type Topic string
// Event interface
type Event interface {
ID() string // ID getter
Topic() Topic // Topic getter
Data() interface{} // Data getter
CreatedAt() time.Time // CreatedAt getter
String() string
}
type event struct {
id string
topic Topic
data interface{}
createdAt time.Time
}
func newEvent(topic Topic, data interface{}) *event {
now := time.Now().UTC()
return &event{
id: fmt.Sprintf("%d", now.Nanosecond()),
topic: topic,
data: data,
createdAt: now,
}
}
func (e *event) ID() string { return e.id }
func (e *event) Topic() Topic { return e.topic }
func (e *event) Data() interface{} { return e.data }
func (e *event) CreatedAt() time.Time { return e.createdAt }
func (e *event) String() string { return fmt.Sprintf("Event %s", e.topic) }
// PubSub interface for publish/subscribe pattern
type PubSub interface {
// Publish an event to topic with specified data
Publish(ctx context.Context, topic Topic, data interface{}) error
// Subscribe returns a channel to receive events from specific topic and an unsubscribe method
Subscribe(ctx context.Context, topic Topic) (c <-chan Event, unsubscribe func())
}
type pubsub struct {
eventQueue chan Event
topicChansMap map[Topic][]chan Event
mu sync.RWMutex
}
// New instance of PubSub implementation
func New() PubSub {
ps := &pubsub{
eventQueue: make(chan Event, 10000),
topicChansMap: make(map[Topic][]chan Event),
}
ps.run()
return ps
}
func (ps *pubsub) run() {
go func() {
for {
e := <-ps.eventQueue
topic := e.Topic()
if chans, ok := ps.topicChansMap[topic]; ok {
for i := range chans {
go func(c chan Event) {
c <- e
}(chans[i])
}
}
}
}()
}
func (ps *pubsub) Publish(ctx context.Context, topic Topic, data interface{}) error {
go func() {
ps.eventQueue <- newEvent(topic, data)
}()
return nil
}
func (ps *pubsub) Subscribe(ctx context.Context, topic Topic) (<-chan Event, func()) {
c := make(chan Event)
ps.mu.Lock()
if chans, ok := ps.topicChansMap[topic]; ok {
chans = append(ps.topicChansMap[topic], c)
ps.topicChansMap[topic] = chans
} else {
ps.topicChansMap[topic] = []chan Event{c}
}
ps.mu.Unlock()
unsubscribe := func() {
if chans, ok := ps.topicChansMap[topic]; ok {
for i := range chans {
if chans[i] == c {
chans = append(chans[:i], chans[i+1:]...) // remove chan at index i
ps.mu.Lock()
ps.topicChansMap[topic] = chans
ps.mu.Unlock()
break
}
}
}
}
return c, unsubscribe
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment