Skip to content

Commit

Permalink
xds: keep ads flow control local to xdsclient/transport package (#7578)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Sep 3, 2024
1 parent 535bdce commit 92111dc
Show file tree
Hide file tree
Showing 30 changed files with 306 additions and 337 deletions.
66 changes: 33 additions & 33 deletions xds/csds/csds_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,50 +70,50 @@ func Test(t *testing.T) {

type nopListenerWatcher struct{}

func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopRouteConfigWatcher struct{}

func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopClusterWatcher struct{}

func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopEndpointsWatcher struct{}

func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.OnDone()
func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
onDone()
}

// This watcher writes the onDone callback on to a channel for the test to
Expand All @@ -126,31 +126,31 @@ func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifie
// for ADS stream level flow control), and was causing CSDS to not receive any
// updates from the xDS client.
type blockingListenerWatcher struct {
testCtxDone <-chan struct{} // Closed when the test is done.
onDoneCh chan xdsresource.DoneNotifier // Channel to write the onDone callback to.
testCtxDone <-chan struct{} // Closed when the test is done.
onDoneCh chan xdsresource.OnDoneFunc // Channel to write the onDone callback to.
}

func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWatcher {
return &blockingListenerWatcher{
testCtxDone: testCtxDone,
onDoneCh: make(chan xdsresource.DoneNotifier, 1),
onDoneCh: make(chan xdsresource.OnDoneFunc, 1),
}
}

func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) {
func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}

// writeOnDone attempts to writes the onDone callback on the onDone channel. It
// returns when it can successfully write to the channel or when the test is
// done, which is signalled by testCtxDone being closed.
func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.DoneNotifier, onDone xdsresource.DoneNotifier) {
func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.OnDoneFunc, onDone xdsresource.OnDoneFunc) {
select {
case <-testCtxDone:
case onDoneCh <- onDone:
Expand Down Expand Up @@ -545,7 +545,7 @@ func (s) TestCSDS_NACK(t *testing.T) {
case <-ctx.Done():
t.Fatal("Timed out waiting for watch callback")
case onDone := <-watcher2.onDoneCh:
onDone.OnDone()
onDone()
}

// Update the second resource with an empty ApiListener field which is
Expand All @@ -564,7 +564,7 @@ func (s) TestCSDS_NACK(t *testing.T) {
case <-ctx.Done():
t.Fatal("Timed out waiting for watch callback")
case onDone := <-watcher2.onDoneCh:
onDone.OnDone()
onDone()
}

// Verify that the xDS client reports the first listener resource as being
Expand Down
18 changes: 9 additions & 9 deletions xds/internal/balancer/cdsbalancer/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@ type clusterWatcher struct {
parent *cdsBalancer
}

func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) {
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone.OnDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone)
func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (cw *clusterWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone.OnDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone.OnDone)
func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
}

func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone.OnDone() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone)
func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

// watcherState groups the state associated with a clusterWatcher.
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/clusterresolver/clusterresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (b *clusterResolverBalancer) handleResourceUpdate(update *resourceUpdate) {
b.updateChildConfig()

if update.onDone != nil {
update.onDone.OnDone()
update.onDone()
}
}

Expand Down
18 changes: 9 additions & 9 deletions xds/internal/balancer/clusterresolver/resource_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type resourceUpdate struct {
priorities []priorityConfig
// To be invoked once the update is completely processed, or is dropped in
// favor of a newer update.
onDone xdsresource.DoneNotifier
onDone xdsresource.OnDoneFunc
}

// topLevelResolver is used by concrete endpointsResolver implementations for
Expand All @@ -49,7 +49,7 @@ type topLevelResolver interface {
// endpointsResolver implementation. The onDone callback is to be invoked
// once the update is completely processed, or is dropped in favor of a
// newer update.
onUpdate(onDone xdsresource.DoneNotifier)
onUpdate(onDone xdsresource.OnDoneFunc)
}

// endpointsResolver wraps the functionality to resolve a given resource name to
Expand Down Expand Up @@ -215,7 +215,7 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
}
// Regenerate even if there's no change in discovery mechanism, in case
// priority order changed.
rr.generateLocked(xdsresource.NopDoneNotifier{})
rr.generateLocked(func() {})
}

// resolveNow is typically called to trigger re-resolve of DNS. The EDS
Expand Down Expand Up @@ -264,7 +264,7 @@ func (rr *resourceResolver) stop(closing bool) {
select {
case ru := <-rr.updateChannel:
if ru.onDone != nil {
ru.onDone.OnDone()
ru.onDone()
}
default:
}
Expand All @@ -281,14 +281,14 @@ func (rr *resourceResolver) stop(closing bool) {
// clusterresolver LB policy.
//
// Caller must hold rr.mu.
func (rr *resourceResolver) generateLocked(onDone xdsresource.DoneNotifier) {
func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) {
var ret []priorityConfig
for _, rDM := range rr.children {
u, ok := rDM.r.lastUpdate()
if !ok {
// Don't send updates to parent until all resolvers have update to
// send.
onDone.OnDone()
onDone()
return
}
switch uu := u.(type) {
Expand All @@ -304,18 +304,18 @@ func (rr *resourceResolver) generateLocked(onDone xdsresource.DoneNotifier) {
// receive path.
case ru := <-rr.updateChannel:
if ru.onDone != nil {
ru.onDone.OnDone()
ru.onDone()
}
default:
}
rr.updateChannel <- &resourceUpdate{priorities: ret, onDone: onDone}
}

func (rr *resourceResolver) onUpdate(onDone xdsresource.DoneNotifier) {
func (rr *resourceResolver) onUpdate(onDone xdsresource.OnDoneFunc) {
handleUpdate := func(context.Context) {
rr.mu.Lock()
rr.generateLocked(onDone)
rr.mu.Unlock()
}
rr.serializer.ScheduleOr(handleUpdate, func() { onDone.OnDone() })
rr.serializer.ScheduleOr(handleUpdate, func() { onDone() })
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

var (
Expand Down Expand Up @@ -80,7 +79,7 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr
ret.logger.Infof("Failed to parse dns hostname %q in clusterresolver LB policy", target)
}
ret.updateReceived = true
ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
ret.topLevelResolver.onUpdate(func() {})
return ret
}

Expand All @@ -90,7 +89,7 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr
ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err)
}
ret.updateReceived = true
ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
ret.topLevelResolver.onUpdate(func() {})
return ret
}
ret.dnsR = r
Expand Down Expand Up @@ -154,7 +153,7 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error {
dr.updateReceived = true
dr.mu.Unlock()

dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
dr.topLevelResolver.onUpdate(func() {})
return nil
}

Expand All @@ -177,7 +176,7 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) {
dr.updateReceived = true
dr.mu.Unlock()

dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
dr.topLevelResolver.onUpdate(func() {})
}

func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) {
Expand Down
14 changes: 7 additions & 7 deletions xds/internal/balancer/clusterresolver/resource_resolver_eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR
}

// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) {
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone.OnDone()
onDone()
return
}

Expand All @@ -89,9 +89,9 @@ func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceD
er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotifier) {
func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone.OnDone()
onDone()
return
}

Expand All @@ -104,7 +104,7 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotif
// Continue using a previously received good configuration if one
// exists.
er.mu.Unlock()
onDone.OnDone()
onDone()
return
}

Expand All @@ -120,9 +120,9 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotif
er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone.OnDone()
onDone()
return
}

Expand Down
36 changes: 18 additions & 18 deletions xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
return lw
}

func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) {
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone.OnDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone)
func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (l *listenerWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone.OnDone() }
l.parent.serializer.ScheduleOr(handleError, onDone.OnDone)
func (l *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() }
l.parent.serializer.ScheduleOr(handleError, onDone)
}

func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone.OnDone() }
l.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone)
func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() }
l.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

func (l *listenerWatcher) stop() {
Expand All @@ -68,22 +68,22 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
return rw
}

func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) {
func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
handleUpdate := func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource)
onDone.OnDone()
onDone()
}
r.parent.serializer.ScheduleOr(handleUpdate, onDone.OnDone)
r.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone.OnDone() }
r.parent.serializer.ScheduleOr(handleError, onDone.OnDone)
func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() }
r.parent.serializer.ScheduleOr(handleError, onDone)
}

func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone.OnDone() }
r.parent.serializer.ScheduleOr(handleNotFound, onDone.OnDone)
func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() }
r.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

func (r *routeConfigWatcher) stop() {
Expand Down
Loading

0 comments on commit 92111dc

Please sign in to comment.