diff --git a/xds/internal/client/cds.go b/xds/internal/client/cds.go index 76437e2d..aa1ffd89 100644 --- a/xds/internal/client/cds.go +++ b/xds/internal/client/cds.go @@ -71,7 +71,7 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp) } wi.stopTimer() - wi.callback.(cdsCallback)(returnUpdate, err) + wi.cdsCallback(returnUpdate, err) return nil } diff --git a/xds/internal/client/eds.go b/xds/internal/client/eds.go index c7566f31..5deeddbd 100644 --- a/xds/internal/client/eds.go +++ b/xds/internal/client/eds.go @@ -205,7 +205,7 @@ func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error { if returnUpdate != nil { wi.stopTimer() - wi.callback.(edsCallback)(returnUpdate, nil) + wi.edsCallback(returnUpdate, nil) } return nil diff --git a/xds/internal/client/lds.go b/xds/internal/client/lds.go index 4ebf4dc8..a67cde20 100644 --- a/xds/internal/client/lds.go +++ b/xds/internal/client/lds.go @@ -68,7 +68,7 @@ func (v2c *v2Client) handleLDSResponse(resp *xdspb.DiscoveryResponse) error { err = fmt.Errorf("xds: LDS target %s not found in received response %+v", wi.target, resp) } wi.stopTimer() - wi.callback.(ldsCallback)(ldsUpdate{routeName: routeName}, err) + wi.ldsCallback(ldsUpdate{routeName: routeName}, err) return nil } diff --git a/xds/internal/client/rds.go b/xds/internal/client/rds.go index ae7d44f3..0594a9cd 100644 --- a/xds/internal/client/rds.go +++ b/xds/internal/client/rds.go @@ -87,7 +87,7 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error { // that we are watching for in this response does not mean that the // server does not know about it. wi.stopTimer() - wi.callback.(rdsCallback)(rdsUpdate{clusterName: returnCluster}, nil) + wi.rdsCallback(rdsUpdate{clusterName: returnCluster}, nil) } return nil } diff --git a/xds/internal/client/testutil_test.go b/xds/internal/client/testutil_test.go index 2fbc7183..436c4be6 100644 --- a/xds/internal/client/testutil_test.go +++ b/xds/internal/client/testutil_test.go @@ -36,10 +36,10 @@ type watchHandleTestcase struct { // Only one of the following should be non-nil. The one corresponding with // typeURL will be called. - ldsWatch func(target string, ldsCb ldsCallback) (cancel func()) - rdsWatch func(routeName string, rdsCb rdsCallback) (cancel func()) - cdsWatch func(clusterName string, cdsCb cdsCallback) (cancel func()) - edsWatch func(clusterName string, edsCb edsCallback) (cancel func()) + ldsWatch func(target string, ldsCb ldsCallbackFunc) (cancel func()) + rdsWatch func(routeName string, rdsCb rdsCallbackFunc) (cancel func()) + cdsWatch func(clusterName string, cdsCb cdsCallbackFunc) (cancel func()) + edsWatch func(clusterName string, edsCb edsCallbackFunc) (cancel func()) watchReqChan *testutils.Channel // The request sent for watch will be sent to this channel. handleXDSResp func(response *xdspb.DiscoveryResponse) error } diff --git a/xds/internal/client/types.go b/xds/internal/client/types.go index 6162d7e9..be9a366f 100644 --- a/xds/internal/client/types.go +++ b/xds/internal/client/types.go @@ -48,7 +48,10 @@ type watchInfo struct { target []string state watchState - callback interface{} + ldsCallback ldsCallbackFunc + rdsCallback rdsCallbackFunc + cdsCallback cdsCallbackFunc + edsCallback edsCallbackFunc expiryTimer *time.Timer } @@ -76,12 +79,12 @@ type ackInfo struct { type ldsUpdate struct { routeName string } -type ldsCallback func(ldsUpdate, error) +type ldsCallbackFunc func(ldsUpdate, error) type rdsUpdate struct { clusterName string } -type rdsCallback func(rdsUpdate, error) +type rdsCallbackFunc func(rdsUpdate, error) // CDSUpdate contains information from a received CDS response, which is of // interest to the registered CDS watcher. @@ -92,6 +95,6 @@ type CDSUpdate struct { // EnableLRS indicates whether or not load should be reported through LRS. EnableLRS bool } -type cdsCallback func(CDSUpdate, error) +type cdsCallbackFunc func(CDSUpdate, error) -type edsCallback func(*EDSUpdate, error) +type edsCallbackFunc func(*EDSUpdate, error) diff --git a/xds/internal/client/v2client.go b/xds/internal/client/v2client.go index 61c4643d..faaa0e19 100644 --- a/xds/internal/client/v2client.go +++ b/xds/internal/client/v2client.go @@ -401,11 +401,11 @@ func (v2c *v2Client) recv(stream adsStream) bool { // function. // The provided callback should not block or perform any expensive operations // or call other methods of the v2Client object. -func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func()) { +func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallbackFunc) (cancel func()) { return v2c.watch(&watchInfo{ - typeURL: ldsURL, - target: []string{target}, - callback: ldsCb, + typeURL: ldsURL, + target: []string{target}, + ldsCallback: ldsCb, }) } @@ -415,11 +415,11 @@ func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func()) // function. // The provided callback should not block or perform any expensive operations // or call other methods of the v2Client object. -func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func()) { +func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallbackFunc) (cancel func()) { return v2c.watch(&watchInfo{ - typeURL: rdsURL, - target: []string{routeName}, - callback: rdsCb, + typeURL: rdsURL, + target: []string{routeName}, + rdsCallback: rdsCb, }) // TODO: Once a registered RDS watch is cancelled, we should send an RDS // request with no resources. This will let the server know that we are no @@ -432,11 +432,11 @@ func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func( // function. // The provided callback should not block or perform any expensive operations // or call other methods of the v2Client object. -func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) { +func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallbackFunc) (cancel func()) { return v2c.watch(&watchInfo{ - typeURL: cdsURL, - target: []string{clusterName}, - callback: cdsCb, + typeURL: cdsURL, + target: []string{clusterName}, + cdsCallback: cdsCb, }) } @@ -446,11 +446,11 @@ func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel fun // function. // The provided callback should not block or perform any expensive operations // or call other methods of the v2Client object. -func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) { +func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallbackFunc) (cancel func()) { return v2c.watch(&watchInfo{ - typeURL: edsURL, - target: []string{clusterName}, - callback: edsCb, + typeURL: edsURL, + target: []string{clusterName}, + edsCallback: edsCb, }) // TODO: Once a registered EDS watch is cancelled, we should send an EDS // request with no resources. This will let the server know that we are no @@ -491,7 +491,7 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { case ldsURL: wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { v2c.mu.Lock() - wi.callback.(ldsCallback)(ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found, watcher timeout", wi.target)) + wi.ldsCallback(ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found, watcher timeout", wi.target)) v2c.mu.Unlock() }) case rdsURL: @@ -503,14 +503,14 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { err = fmt.Errorf("xds: no LDS watcher found when handling RDS watch for route {%v} from cache", routeName) } v2c.logger.Infof("Resource with name %v, type %v found in cache", routeName, wi.typeURL) - wi.callback.(rdsCallback)(rdsUpdate{clusterName: cluster}, err) + wi.rdsCallback(rdsUpdate{clusterName: cluster}, err) return } // Add the watch expiry timer only for new watches we don't find in // the cache, and return from here. wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { v2c.mu.Lock() - wi.callback.(rdsCallback)(rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found, watcher timeout", wi.target)) + wi.rdsCallback(rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found, watcher timeout", wi.target)) v2c.mu.Unlock() }) case cdsURL: @@ -521,18 +521,18 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName) } v2c.logger.Infof("Resource with name %v, type %v found in cache", clusterName, wi.typeURL) - wi.callback.(cdsCallback)(update, err) + wi.cdsCallback(update, err) return } wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { v2c.mu.Lock() - wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found, watcher timeout", wi.target)) + wi.cdsCallback(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found, watcher timeout", wi.target)) v2c.mu.Unlock() }) case edsURL: wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { v2c.mu.Lock() - wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found, watcher timeout", wi.target)) + wi.edsCallback(nil, fmt.Errorf("xds: EDS target %s not found, watcher timeout", wi.target)) v2c.mu.Unlock() }) }