xds/resolver: support inline RDS resource from LDS response (#4299)
This commit is contained in:
Родитель
0028242dbb
Коммит
c72e1c8f75
|
@ -195,7 +195,15 @@ type UpdateMetadata struct {
|
|||
type ListenerUpdate struct {
|
||||
// RouteConfigName is the route configuration name corresponding to the
|
||||
// target which is being watched through LDS.
|
||||
//
|
||||
// Only one of RouteConfigName and InlineRouteConfig is set.
|
||||
RouteConfigName string
|
||||
// InlineRouteConfig is the inline route configuration (RDS response)
|
||||
// returned inside LDS.
|
||||
//
|
||||
// Only one of RouteConfigName and InlineRouteConfig is set.
|
||||
InlineRouteConfig *RouteConfigUpdate
|
||||
|
||||
// MaxStreamDuration contains the HTTP connection manager's
|
||||
// common_http_protocol_options.max_stream_duration field, or zero if
|
||||
// unset.
|
||||
|
|
|
@ -54,6 +54,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
|
|||
v3LDSTarget = "lds.target.good:3333"
|
||||
v2RouteConfigName = "v2RouteConfig"
|
||||
v3RouteConfigName = "v3RouteConfig"
|
||||
routeName = "routeName"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -132,6 +133,39 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
|
|||
ConfigType: &v3httppb.HttpFilter_TypedConfig{TypedConfig: unknownFilterConfig},
|
||||
IsOptional: true,
|
||||
}
|
||||
v3LisWithInlineRoute = &anypb.Any{
|
||||
TypeUrl: version.V3ListenerURL,
|
||||
Value: func() []byte {
|
||||
hcm := &v3httppb.HttpConnectionManager{
|
||||
RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
|
||||
RouteConfig: &v3routepb.RouteConfiguration{
|
||||
Name: routeName,
|
||||
VirtualHosts: []*v3routepb.VirtualHost{{
|
||||
Domains: []string{v3LDSTarget},
|
||||
Routes: []*v3routepb.Route{{
|
||||
Match: &v3routepb.RouteMatch{
|
||||
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
|
||||
},
|
||||
Action: &v3routepb.Route_Route{
|
||||
Route: &v3routepb.RouteAction{
|
||||
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
|
||||
}}}}}}},
|
||||
},
|
||||
CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
|
||||
MaxStreamDuration: durationpb.New(time.Second),
|
||||
},
|
||||
}
|
||||
mcm := marshalAny(hcm)
|
||||
lis := &v3listenerpb.Listener{
|
||||
Name: v3LDSTarget,
|
||||
ApiListener: &v3listenerpb.ApiListener{
|
||||
ApiListener: mcm,
|
||||
},
|
||||
}
|
||||
mLis, _ := proto.Marshal(lis)
|
||||
return mLis
|
||||
}(),
|
||||
}
|
||||
v3LisWithFilters = func(fs ...*v3httppb.HttpFilter) *anypb.Any {
|
||||
hcm := &v3httppb.HttpConnectionManager{
|
||||
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
|
||||
|
@ -650,6 +684,25 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
|
|||
Version: testVersion,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "v3 listener with inline route configuration",
|
||||
resources: []*anypb.Any{v3LisWithInlineRoute},
|
||||
wantUpdate: map[string]ListenerUpdate{
|
||||
v3LDSTarget: {
|
||||
InlineRouteConfig: &RouteConfigUpdate{
|
||||
VirtualHosts: []*VirtualHost{{
|
||||
Domains: []string{v3LDSTarget},
|
||||
Routes: []*Route{{Prefix: newStringP("/"), WeightedClusters: map[string]WeightedCluster{clusterName: {Weight: 1}}}},
|
||||
}}},
|
||||
MaxStreamDuration: time.Second,
|
||||
Raw: v3LisWithInlineRoute,
|
||||
},
|
||||
},
|
||||
wantMD: UpdateMetadata{
|
||||
Status: ServiceStatusACKed,
|
||||
Version: testVersion,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "multiple listener resources",
|
||||
resources: []*anypb.Any{v2Lis, v3LisWithFilters()},
|
||||
|
|
|
@ -72,7 +72,7 @@ func unmarshalListenerResource(r *anypb.Any, logger *grpclog.PrefixLogger) (stri
|
|||
}
|
||||
logger.Infof("Resource with name: %v, type: %T, contains: %v", lis.GetName(), lis, lis)
|
||||
|
||||
lu, err := processListener(lis, v2)
|
||||
lu, err := processListener(lis, logger, v2)
|
||||
if err != nil {
|
||||
return lis.GetName(), ListenerUpdate{}, err
|
||||
}
|
||||
|
@ -80,16 +80,16 @@ func unmarshalListenerResource(r *anypb.Any, logger *grpclog.PrefixLogger) (stri
|
|||
return lis.GetName(), *lu, nil
|
||||
}
|
||||
|
||||
func processListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUpdate, error) {
|
||||
func processListener(lis *v3listenerpb.Listener, logger *grpclog.PrefixLogger, v2 bool) (*ListenerUpdate, error) {
|
||||
if lis.GetApiListener() != nil {
|
||||
return processClientSideListener(lis, v2)
|
||||
return processClientSideListener(lis, logger, v2)
|
||||
}
|
||||
return processServerSideListener(lis)
|
||||
}
|
||||
|
||||
// processClientSideListener checks if the provided Listener proto meets
|
||||
// the expected criteria. If so, it returns a non-empty routeConfigName.
|
||||
func processClientSideListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUpdate, error) {
|
||||
func processClientSideListener(lis *v3listenerpb.Listener, logger *grpclog.PrefixLogger, v2 bool) (*ListenerUpdate, error) {
|
||||
update := &ListenerUpdate{}
|
||||
|
||||
apiLisAny := lis.GetApiListener().GetApiListener()
|
||||
|
@ -112,9 +112,11 @@ func processClientSideListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUp
|
|||
}
|
||||
update.RouteConfigName = name
|
||||
case *v3httppb.HttpConnectionManager_RouteConfig:
|
||||
// TODO: Add support for specifying the RouteConfiguration inline
|
||||
// in the LDS response.
|
||||
return nil, fmt.Errorf("LDS response contains RDS config inline. Not supported for now: %+v", apiLis)
|
||||
routeU, err := generateRDSUpdateFromRouteConfiguration(apiLis.GetRouteConfig(), logger, v2)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse inline RDS resp: %v", err)
|
||||
}
|
||||
update.InlineRouteConfig = &routeU
|
||||
case nil:
|
||||
return nil, fmt.Errorf("no RouteSpecifier: %+v", apiLis)
|
||||
default:
|
||||
|
|
|
@ -110,6 +110,22 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er
|
|||
httpFilterConfig: update.HTTPFilters,
|
||||
}
|
||||
|
||||
if update.InlineRouteConfig != nil {
|
||||
// If there was an RDS watch, cancel it.
|
||||
w.rdsName = ""
|
||||
if w.rdsCancel != nil {
|
||||
w.rdsCancel()
|
||||
w.rdsCancel = nil
|
||||
}
|
||||
|
||||
// Handle the inline RDS update as if it's from an RDS watch.
|
||||
w.updateVirtualHostsFromRDS(*update.InlineRouteConfig)
|
||||
return
|
||||
}
|
||||
|
||||
// RDS name from update is not an empty string, need RDS to fetch the
|
||||
// routes.
|
||||
|
||||
if w.rdsName == update.RouteConfigName {
|
||||
// If the new RouteConfigName is same as the previous, don't cancel and
|
||||
// restart the RDS watch.
|
||||
|
@ -126,6 +142,18 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er
|
|||
w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp)
|
||||
}
|
||||
|
||||
func (w *serviceUpdateWatcher) updateVirtualHostsFromRDS(update xdsclient.RouteConfigUpdate) {
|
||||
matchVh := findBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
|
||||
if matchVh == nil {
|
||||
// No matching virtual host found.
|
||||
w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
|
||||
return
|
||||
}
|
||||
|
||||
w.lastUpdate.virtualHost = matchVh
|
||||
w.serviceCb(w.lastUpdate, nil)
|
||||
}
|
||||
|
||||
func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate, err error) {
|
||||
w.logger.Infof("received RDS update: %+v, err: %v", update, err)
|
||||
w.mu.Lock()
|
||||
|
@ -142,16 +170,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate,
|
|||
w.serviceCb(serviceUpdate{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
matchVh := findBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
|
||||
if matchVh == nil {
|
||||
// No matching virtual host found.
|
||||
w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
|
||||
return
|
||||
}
|
||||
|
||||
w.lastUpdate.virtualHost = matchVh
|
||||
w.serviceCb(w.lastUpdate, nil)
|
||||
w.updateVirtualHostsFromRDS(update)
|
||||
}
|
||||
|
||||
func (w *serviceUpdateWatcher) close() {
|
||||
|
|
|
@ -356,3 +356,81 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
|
|||
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestServiceWatchInlineRDS covers the cases switching between:
|
||||
// - LDS update contains RDS name to watch
|
||||
// - LDS update contains inline RDS resource
|
||||
func (s) TestServiceWatchInlineRDS(t *testing.T) {
|
||||
serviceUpdateCh := testutils.NewChannel()
|
||||
xdsC := fakeclient.NewClient()
|
||||
cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) {
|
||||
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
|
||||
}, nil)
|
||||
defer cancelWatch()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
|
||||
// First LDS update is LDS with RDS name to watch.
|
||||
waitForWatchListener(ctx, t, xdsC, targetStr)
|
||||
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
|
||||
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
|
||||
wantUpdate := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}}}
|
||||
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
|
||||
VirtualHosts: []*xdsclient.VirtualHost{
|
||||
{
|
||||
Domains: []string{targetStr},
|
||||
Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}},
|
||||
},
|
||||
},
|
||||
}, nil)
|
||||
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Switch LDS resp to a LDS with inline RDS resource
|
||||
wantVirtualHosts2 := &xdsclient.VirtualHost{Domains: []string{"target"},
|
||||
Routes: []*xdsclient.Route{{
|
||||
Path: newStringP(""),
|
||||
WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}},
|
||||
}},
|
||||
}
|
||||
wantUpdate2 := serviceUpdate{virtualHost: wantVirtualHosts2}
|
||||
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{InlineRouteConfig: &xdsclient.RouteConfigUpdate{
|
||||
VirtualHosts: []*xdsclient.VirtualHost{wantVirtualHosts2},
|
||||
}}, nil)
|
||||
// This inline RDS resource should cause the RDS watch to be canceled.
|
||||
if err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil {
|
||||
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
|
||||
}
|
||||
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Switch LDS update back to LDS with RDS name to watch.
|
||||
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
|
||||
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
|
||||
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
|
||||
VirtualHosts: []*xdsclient.VirtualHost{
|
||||
{
|
||||
Domains: []string{targetStr},
|
||||
Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}},
|
||||
},
|
||||
},
|
||||
}, nil)
|
||||
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Switch LDS resp to a LDS with inline RDS resource again.
|
||||
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{InlineRouteConfig: &xdsclient.RouteConfigUpdate{
|
||||
VirtualHosts: []*xdsclient.VirtualHost{wantVirtualHosts2},
|
||||
}}, nil)
|
||||
// This inline RDS resource should cause the RDS watch to be canceled.
|
||||
if err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil {
|
||||
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
|
||||
}
|
||||
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче