Created
December 6, 2018 16:09
-
-
Save trusch/790af091d0496cf66def95d66de1886c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package http | |
import ( | |
"context" | |
"encoding/json" | |
"io" | |
"net/http" | |
"strconv" | |
"strings" | |
"time" | |
"github.com/contiamo/event-bus/pkg/events" | |
) | |
// LongPollingHandler is an http handler which serves event streams over websocket | |
type LongPollingHandler struct { | |
cli events.ControllerClient | |
} | |
func (handler *LongPollingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
channel := r.URL.Query()["channel"] | |
if channel == "" { | |
w.WriteHeader(http.StatusBadRequest) | |
return | |
} | |
seqStr := GetLongPollingIndex(r) | |
seq, err := strconv.ParseInt(seqStr, 10, 64) | |
if err != nil { | |
seq = 0 | |
} | |
pollDuration := GetLongPollDuration(r) | |
ctx, cancel := context.WithTimeout(r.Context(), pollDuration) | |
defer cancel() | |
stream, err := handler.cli.Subscribe(ctx, &events.SubscribeRequest{ | |
Channel: channel, | |
Seq: seq, | |
}) | |
if err != nil { | |
w.WriteHeader(http.StatusInternalServerError) | |
return | |
} | |
res := make([]*event.Event, 0) | |
for { | |
evt, err := stream.Recv() | |
if err != nil { | |
if err == io.EOF { | |
break | |
} | |
w.WriteHeader(http.StatusInternalServerError) | |
return | |
} | |
res = append(res, evt) | |
} | |
if len(res) > 0 { | |
SetLongPollingIndex(w, strconv.Itoa(res[len(res)-1].Seq)) | |
json.NewEncoder(w).Encode(res) | |
return | |
} | |
WriteLongPollingTimeout(w) | |
} | |
const (
	// LabsLongPollingHeaderKey is the name of the response header that carries
	// the long polling position index. The value stored under it is encoded;
	// use SetLongPollingIndex to write it rather than encoding it by hand.
	LabsLongPollingHeaderKey = "X-Polling-Index"
)
// GetLongPollingIndex extracts and decodes the long-polling index value from the | |
// hex encoded GET parameter. If the value is invalid or missing, this will return | |
// the empty string | |
func GetLongPollingIndex(r *http.Request) string { | |
// check the GET parameter and the Prefer header | |
index := r.URL.Query().Get("index") | |
if index != "" { | |
return MustDecodeString(index) | |
} | |
prefer := r.Header["Prefer"] | |
for _, x := range prefer { | |
x = strings.TrimSpace(x) | |
preferences := strings.Split(x, ";") | |
for _, p := range preferences { | |
p = strings.TrimSpace(p) | |
if !strings.HasPrefix(p, "index") { | |
continue | |
} | |
parts := strings.Split(p, "=") | |
if len(parts) > 1 { | |
index = parts[1] | |
return MustDecodeString(index) | |
} | |
} | |
} | |
return "" | |
} | |
// SetLongPollingIndex sets the `X-Polling-Index` and | |
// `X-Polling-Update-Strategy` headers on the ResponsWriter. | |
// If the index is the empty string, it is a no-op. | |
func SetLongPollingIndex(w http.ResponseWriter, index string) { | |
if index == "" { | |
return | |
} | |
w.Header().Set(LabsLongPollingHeaderKey, EncodeToString(index)) | |
} | |
// WriteLongPollingTimeout writes the timeout status code to the ResponseWriter.
// A poll that ends without new data is reported as 304 Not Modified; no body
// is written.
func WriteLongPollingTimeout(w http.ResponseWriter) {
	w.WriteHeader(http.StatusNotModified)
}
// GetLongPollDuration parses the long-polling Prefer header and returns the value | |
// as a duration. The return value will be bound between a min of 15 and max of 60 seconds. | |
func GetLongPollDuration(r *http.Request) time.Duration { | |
return time.Duration(GetRequestTimeout(r)) * time.Second | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment