xds: split callback in xds_client to avoid type assertions (#3450)
This commit is contained in:
Родитель
ba34a8c58b
Коммит
71f583e958
|
@ -71,7 +71,7 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
|
|||
err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp)
|
||||
}
|
||||
wi.stopTimer()
|
||||
wi.callback.(cdsCallback)(returnUpdate, err)
|
||||
wi.cdsCallback(returnUpdate, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -205,7 +205,7 @@ func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error {
|
|||
|
||||
if returnUpdate != nil {
|
||||
wi.stopTimer()
|
||||
wi.callback.(edsCallback)(returnUpdate, nil)
|
||||
wi.edsCallback(returnUpdate, nil)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -68,7 +68,7 @@ func (v2c *v2Client) handleLDSResponse(resp *xdspb.DiscoveryResponse) error {
|
|||
err = fmt.Errorf("xds: LDS target %s not found in received response %+v", wi.target, resp)
|
||||
}
|
||||
wi.stopTimer()
|
||||
wi.callback.(ldsCallback)(ldsUpdate{routeName: routeName}, err)
|
||||
wi.ldsCallback(ldsUpdate{routeName: routeName}, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -87,7 +87,7 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error {
|
|||
// that we are watching for in this response does not mean that the
|
||||
// server does not know about it.
|
||||
wi.stopTimer()
|
||||
wi.callback.(rdsCallback)(rdsUpdate{clusterName: returnCluster}, nil)
|
||||
wi.rdsCallback(rdsUpdate{clusterName: returnCluster}, nil)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -36,10 +36,10 @@ type watchHandleTestcase struct {
|
|||
|
||||
// Only one of the following should be non-nil. The one corresponding with
|
||||
// typeURL will be called.
|
||||
ldsWatch func(target string, ldsCb ldsCallback) (cancel func())
|
||||
rdsWatch func(routeName string, rdsCb rdsCallback) (cancel func())
|
||||
cdsWatch func(clusterName string, cdsCb cdsCallback) (cancel func())
|
||||
edsWatch func(clusterName string, edsCb edsCallback) (cancel func())
|
||||
ldsWatch func(target string, ldsCb ldsCallbackFunc) (cancel func())
|
||||
rdsWatch func(routeName string, rdsCb rdsCallbackFunc) (cancel func())
|
||||
cdsWatch func(clusterName string, cdsCb cdsCallbackFunc) (cancel func())
|
||||
edsWatch func(clusterName string, edsCb edsCallbackFunc) (cancel func())
|
||||
watchReqChan *testutils.Channel // The request sent for watch will be sent to this channel.
|
||||
handleXDSResp func(response *xdspb.DiscoveryResponse) error
|
||||
}
|
||||
|
|
|
@ -48,7 +48,10 @@ type watchInfo struct {
|
|||
target []string
|
||||
state watchState
|
||||
|
||||
callback interface{}
|
||||
ldsCallback ldsCallbackFunc
|
||||
rdsCallback rdsCallbackFunc
|
||||
cdsCallback cdsCallbackFunc
|
||||
edsCallback edsCallbackFunc
|
||||
expiryTimer *time.Timer
|
||||
}
|
||||
|
||||
|
@ -76,12 +79,12 @@ type ackInfo struct {
|
|||
type ldsUpdate struct {
|
||||
routeName string
|
||||
}
|
||||
type ldsCallback func(ldsUpdate, error)
|
||||
type ldsCallbackFunc func(ldsUpdate, error)
|
||||
|
||||
type rdsUpdate struct {
|
||||
clusterName string
|
||||
}
|
||||
type rdsCallback func(rdsUpdate, error)
|
||||
type rdsCallbackFunc func(rdsUpdate, error)
|
||||
|
||||
// CDSUpdate contains information from a received CDS response, which is of
|
||||
// interest to the registered CDS watcher.
|
||||
|
@ -92,6 +95,6 @@ type CDSUpdate struct {
|
|||
// EnableLRS indicates whether or not load should be reported through LRS.
|
||||
EnableLRS bool
|
||||
}
|
||||
type cdsCallback func(CDSUpdate, error)
|
||||
type cdsCallbackFunc func(CDSUpdate, error)
|
||||
|
||||
type edsCallback func(*EDSUpdate, error)
|
||||
type edsCallbackFunc func(*EDSUpdate, error)
|
||||
|
|
|
@ -401,11 +401,11 @@ func (v2c *v2Client) recv(stream adsStream) bool {
|
|||
// function.
|
||||
// The provided callback should not block or perform any expensive operations
|
||||
// or call other methods of the v2Client object.
|
||||
func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func()) {
|
||||
func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallbackFunc) (cancel func()) {
|
||||
return v2c.watch(&watchInfo{
|
||||
typeURL: ldsURL,
|
||||
target: []string{target},
|
||||
callback: ldsCb,
|
||||
typeURL: ldsURL,
|
||||
target: []string{target},
|
||||
ldsCallback: ldsCb,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -415,11 +415,11 @@ func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func())
|
|||
// function.
|
||||
// The provided callback should not block or perform any expensive operations
|
||||
// or call other methods of the v2Client object.
|
||||
func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func()) {
|
||||
func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallbackFunc) (cancel func()) {
|
||||
return v2c.watch(&watchInfo{
|
||||
typeURL: rdsURL,
|
||||
target: []string{routeName},
|
||||
callback: rdsCb,
|
||||
typeURL: rdsURL,
|
||||
target: []string{routeName},
|
||||
rdsCallback: rdsCb,
|
||||
})
|
||||
// TODO: Once a registered RDS watch is cancelled, we should send an RDS
|
||||
// request with no resources. This will let the server know that we are no
|
||||
|
@ -432,11 +432,11 @@ func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func(
|
|||
// function.
|
||||
// The provided callback should not block or perform any expensive operations
|
||||
// or call other methods of the v2Client object.
|
||||
func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) {
|
||||
func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallbackFunc) (cancel func()) {
|
||||
return v2c.watch(&watchInfo{
|
||||
typeURL: cdsURL,
|
||||
target: []string{clusterName},
|
||||
callback: cdsCb,
|
||||
typeURL: cdsURL,
|
||||
target: []string{clusterName},
|
||||
cdsCallback: cdsCb,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -446,11 +446,11 @@ func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel fun
|
|||
// function.
|
||||
// The provided callback should not block or perform any expensive operations
|
||||
// or call other methods of the v2Client object.
|
||||
func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) {
|
||||
func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallbackFunc) (cancel func()) {
|
||||
return v2c.watch(&watchInfo{
|
||||
typeURL: edsURL,
|
||||
target: []string{clusterName},
|
||||
callback: edsCb,
|
||||
typeURL: edsURL,
|
||||
target: []string{clusterName},
|
||||
edsCallback: edsCb,
|
||||
})
|
||||
// TODO: Once a registered EDS watch is cancelled, we should send an EDS
|
||||
// request with no resources. This will let the server know that we are no
|
||||
|
@ -491,7 +491,7 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
|
|||
case ldsURL:
|
||||
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
|
||||
v2c.mu.Lock()
|
||||
wi.callback.(ldsCallback)(ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found, watcher timeout", wi.target))
|
||||
wi.ldsCallback(ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found, watcher timeout", wi.target))
|
||||
v2c.mu.Unlock()
|
||||
})
|
||||
case rdsURL:
|
||||
|
@ -503,14 +503,14 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
|
|||
err = fmt.Errorf("xds: no LDS watcher found when handling RDS watch for route {%v} from cache", routeName)
|
||||
}
|
||||
v2c.logger.Infof("Resource with name %v, type %v found in cache", routeName, wi.typeURL)
|
||||
wi.callback.(rdsCallback)(rdsUpdate{clusterName: cluster}, err)
|
||||
wi.rdsCallback(rdsUpdate{clusterName: cluster}, err)
|
||||
return
|
||||
}
|
||||
// Add the watch expiry timer only for new watches we don't find in
|
||||
// the cache, and return from here.
|
||||
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
|
||||
v2c.mu.Lock()
|
||||
wi.callback.(rdsCallback)(rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found, watcher timeout", wi.target))
|
||||
wi.rdsCallback(rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found, watcher timeout", wi.target))
|
||||
v2c.mu.Unlock()
|
||||
})
|
||||
case cdsURL:
|
||||
|
@ -521,18 +521,18 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
|
|||
err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName)
|
||||
}
|
||||
v2c.logger.Infof("Resource with name %v, type %v found in cache", clusterName, wi.typeURL)
|
||||
wi.callback.(cdsCallback)(update, err)
|
||||
wi.cdsCallback(update, err)
|
||||
return
|
||||
}
|
||||
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
|
||||
v2c.mu.Lock()
|
||||
wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found, watcher timeout", wi.target))
|
||||
wi.cdsCallback(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found, watcher timeout", wi.target))
|
||||
v2c.mu.Unlock()
|
||||
})
|
||||
case edsURL:
|
||||
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
|
||||
v2c.mu.Lock()
|
||||
wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found, watcher timeout", wi.target))
|
||||
wi.edsCallback(nil, fmt.Errorf("xds: EDS target %s not found, watcher timeout", wi.target))
|
||||
v2c.mu.Unlock()
|
||||
})
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче