Created
April 13, 2017 14:49
-
-
Save jsokel/22dc8cd07ca9ee1c8c4af46f8db78ca8 to your computer and use it in GitHub Desktop.
A simple generic Go SQS publisher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package queue | |
import ( | |
"encoding/base64" | |
"encoding/json" | |
log "github.com/Sirupsen/logrus" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/service/sqs" | |
"github.com/aws/aws-sdk-go/service/sqs/sqsiface" | |
"reflect" | |
"strings" | |
) | |
// SqsPublisher is a common client for publishing messages to an SQS queue.
// Each published message carries a MsgType attribute, so several message
// types can be sent down the same queue. It is up to the consumer to
// unmarshal the body according to that message type.
type SqsPublisher interface {
	// Publish sends msg to the queue and reports any failure as an error.
	Publish(msg interface{}) error
}
type sqsPublisher struct { | |
client sqsiface.SQSAPI | |
queueUrl string | |
} | |
func NewSqsPublisher(sqsClient sqsiface.SQSAPI, queueUrl string) SqsPublisher { | |
return &sqsPublisher{ | |
client: sqsClient, | |
queueUrl: queueUrl, | |
} | |
} | |
func (p *sqsPublisher) Publish(msg interface{}) error { | |
log.Debugf("Publish message: %s to %s", msg, p.queueUrl) | |
payload, err := Encode(msg) | |
if err != nil { | |
log.WithError(err).Errorf("Message could not be encoded: %s", msg) | |
return err | |
} | |
input := &sqs.SendMessageInput{ | |
QueueUrl: &p.queueUrl, | |
MessageBody: &payload, | |
MessageAttributes: map[string]*sqs.MessageAttributeValue{ | |
MsgTypeKey: { | |
DataType: aws.String("String"), | |
StringValue: getMsgType(msg), | |
}, | |
}, | |
} | |
output, err := p.client.SendMessage(input) | |
if err != nil { | |
log.WithError(err).Errorf("Failed to publish message: %s", msg) | |
return err | |
} | |
log.Debugf("MessageId: %s", *output.MessageId) | |
return nil | |
} | |
// Encode marshals msg to JSON and returns the JSON bytes as a standard
// (padded) base64 string. If msg cannot be marshalled, it returns an empty
// string together with the json.Marshal error.
func Encode(msg interface{}) (string, error) {
	raw, err := json.Marshal(msg)
	if err != nil {
		return "", err
	}

	// Size the destination exactly, then encode into it.
	buf := make([]byte, base64.StdEncoding.EncodedLen(len(raw)))
	base64.StdEncoding.Encode(buf, raw)
	return string(buf), nil
}
// getMsgType uses reflection to get the short name of msg's type: the text
// after the first "." in the reflected type string. A value of type
// queue.OrderCreated or *queue.OrderCreated therefore yields "OrderCreated".
// Type strings that contain no "." (for example "int") are returned
// unchanged. A nil msg has no dynamic type and yields "<nil>".
func getMsgType(msg interface{}) *string {
	t := reflect.TypeOf(msg)
	if t == nil {
		// reflect.TypeOf returns a nil Type for a nil interface value;
		// calling String on it would panic.
		name := "<nil>"
		return &name
	}

	msgType := t.String()
	// Index returns -1 when there is no ".", which makes idx+1 zero and
	// keeps the whole string.
	idx := strings.Index(msgType, ".")
	msgType = msgType[idx+1:]
	return &msgType
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment