Created
December 2, 2021 01:10
-
-
Save jpeach/128b5199a564928d396545f92f7fc3a8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git pkg/server/sotw/v3/server.go pkg/server/sotw/v3/server.go | |
index b217b6cf..19d90612 100644 | |
--- pkg/server/sotw/v3/server.go | |
+++ pkg/server/sotw/v3/server.go | |
@@ -124,14 +124,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq | |
return out.Nonce, str.Send(out) | |
} | |
- open := func(w *watch, req *discovery.DiscoveryRequest, responder chan cache.Response) { | |
- w.cancel = s.cache.CreateWatch(req, streamState, responder) | |
- watches.cases[w.index] = reflect.SelectCase{ | |
- Dir: reflect.SelectRecv, | |
- Chan: reflect.ValueOf(responder), | |
- } | |
- } | |
- | |
if s.callbacks != nil { | |
if err := s.callbacks.OnStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil { | |
return err | |
@@ -206,25 +198,31 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq | |
if w.nonce == "" || w.nonce == nonce { | |
w.Cancel() | |
- open(w, req, responder) | |
+ watches.addWatch(typeURL, &watch{ | |
+ cancel: s.cache.CreateWatch(req, streamState, responder), | |
+ nonce: w.nonce, | |
+ responseChan: responder, | |
+ }) | |
} | |
} else { | |
// No pre-existing watch exists, let's create one. | |
- // We need to precompute the watches first then open a watch in the cache. | |
- watches.responders[typeURL] = &watch{} | |
- w = watches.responders[typeURL] | |
- watches.RecomputeWatches(s.ctx, reqCh) | |
- | |
- open(w, req, responder) | |
+ watches.addWatch(typeURL, &watch{ | |
+ cancel: s.cache.CreateWatch(req, streamState, responder), | |
+ responseChan: responder, | |
+ }) | |
} | |
+ | |
+ // Recompute the dynamic select cases. | |
+ watches.RecomputeWatches(s.ctx, reqCh) | |
default: | |
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL | |
if !ok { | |
- return status.Errorf(codes.Unavailable, "resource watch failed") | |
+ // Receiver channel was closed. TODO(jpeach) probably cancel the watch or something? | |
+ return status.Errorf(codes.Unavailable, "resource watch %d failed", index) | |
} | |
res := value.Interface().(cache.Response) | |
- nonce, err := send(value.Interface().(cache.Response)) | |
+ nonce, err := send(res) | |
if err != nil { | |
return err | |
} | |
diff --git pkg/server/sotw/v3/watches.go pkg/server/sotw/v3/watches.go | |
index 2c40022e..4d7c7fc8 100644 | |
--- pkg/server/sotw/v3/watches.go | |
+++ pkg/server/sotw/v3/watches.go | |
@@ -6,6 +6,7 @@ import ( | |
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" | |
"github.com/envoyproxy/go-control-plane/pkg/cache/types" | |
+ "github.com/envoyproxy/go-control-plane/pkg/cache/v3" | |
) | |
// watches for all xDS resource types | |
@@ -31,38 +32,40 @@ func (w *watches) Cancel() { | |
} | |
} | |
+func (w *watches) addWatch(typeURL string, watch *watch) { | |
+ w.responders[typeURL] = watch | |
+} | |
+ | |
// recomputeWatches rebuilds the known list of dynamic channels if needed | |
func (w *watches) RecomputeWatches(ctx context.Context, req <-chan *discovery.DiscoveryRequest) { | |
- cases := []reflect.SelectCase{ | |
- { | |
+ w.cases = w.cases[:0] // Clear while retaining capacity. | |
+ | |
+ w.cases = append(w.cases, | |
+ reflect.SelectCase{ | |
Dir: reflect.SelectRecv, | |
Chan: reflect.ValueOf(ctx.Done()), | |
}, | |
- { | |
+ reflect.SelectCase{ | |
Dir: reflect.SelectRecv, | |
Chan: reflect.ValueOf(req), | |
}, | |
- } | |
+ ) | |
- index := len(cases) | |
- for _, watch := range w.responders { | |
- cases = append(cases, w.cases[watch.index]) | |
- watch.index = index | |
- index++ | |
+ for _, r := range w.responders { | |
+ w.cases = append(w.cases, | |
+ reflect.SelectCase{ | |
+ Dir: reflect.SelectRecv, | |
+ Chan: reflect.ValueOf(r.responseChan), | |
+ }, | |
+ ) | |
} | |
- | |
- w.cases = cases | |
} | |
// watch contains the necessary modifiables for receiving resource responses | |
type watch struct { | |
- cancel func() | |
- nonce string | |
- | |
- // Index is used to track the location of this channel in watches. This allows us | |
- // to update the channel used at this slot without recomputing the entire list of select | |
- // statements. | |
- index int | |
+ cancel func() | |
+ nonce string | |
+ responseChan chan cache.Response | |
} | |
// Cancel calls terminate and cancel |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment