xds/cds: add env var for aggregated and DNS cluster (#4440)
This commit is contained in:
Родитель
50c071e9b5
Коммит
a12250e98f
|
@ -37,11 +37,13 @@ const (
|
|||
// and kept in variable BootstrapFileName.
|
||||
//
|
||||
// When both bootstrap FileName and FileContent are set, FileName is used.
|
||||
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
|
||||
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
|
||||
|
||||
circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"
|
||||
timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"
|
||||
faultInjectionSupportEnv = "GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"
|
||||
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
|
||||
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
|
||||
|
||||
c2pResolverSupportEnv = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
|
||||
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
|
||||
|
@ -60,6 +62,7 @@ var (
|
|||
//
|
||||
// When both bootstrap FileName and FileContent are set, FileName is used.
|
||||
BootstrapFileContent = os.Getenv(BootstrapFileContentEnv)
|
||||
|
||||
// CircuitBreakingSupport indicates whether circuit breaking support is
|
||||
// enabled, which can be disabled by setting the environment variable
|
||||
// "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "false".
|
||||
|
@ -71,10 +74,6 @@ var (
|
|||
// FaultInjectionSupport is used to control both fault injection and HTTP
|
||||
// filter support.
|
||||
FaultInjectionSupport = !strings.EqualFold(os.Getenv(faultInjectionSupportEnv), "false")
|
||||
// C2PResolverSupport indicates whether support for C2P resolver is enabled.
|
||||
// This can be enabled by setting the environment variable
|
||||
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
|
||||
C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
|
||||
// ClientSideSecuritySupport is used to control processing of security
|
||||
// configuration on the client-side.
|
||||
//
|
||||
|
@ -82,6 +81,17 @@ var (
|
|||
// have a brand new API on the server-side and users explicitly need to use
|
||||
// the new API to get security integration on the server.
|
||||
ClientSideSecuritySupport = strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "true")
|
||||
// AggregateAndDNSSupportEnv indicates whether processing of aggregated
|
||||
// cluster and DNS cluster is enabled, which can be enabled by setting the
|
||||
// environment variable
|
||||
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
|
||||
// "true".
|
||||
AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")
|
||||
|
||||
// C2PResolverSupport indicates whether support for C2P resolver is enabled.
|
||||
// This can be enabled by setting the environment variable
|
||||
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
|
||||
C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
|
||||
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
|
||||
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
|
||||
)
|
||||
|
|
|
@ -124,6 +124,9 @@ func (s) TestValidateCluster_Failure(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv
|
||||
env.AggregateAndDNSSupportEnv = true
|
||||
defer func() { env.CircuitBreakingSupport = oldAggregateAndDNSSupportEnv }()
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
if update, err := validateClusterAndConstructClusterUpdate(test.cluster); err == nil {
|
||||
|
@ -261,6 +264,9 @@ func (s) TestValidateCluster_Success(t *testing.T) {
|
|||
origCircuitBreakingSupport := env.CircuitBreakingSupport
|
||||
env.CircuitBreakingSupport = true
|
||||
defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()
|
||||
oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv
|
||||
env.AggregateAndDNSSupportEnv = true
|
||||
defer func() { env.CircuitBreakingSupport = oldAggregateAndDNSSupportEnv }()
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
update, err := validateClusterAndConstructClusterUpdate(test.cluster)
|
||||
|
|
|
@ -615,9 +615,15 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
|
|||
ret.EDSServiceName = cluster.GetEdsClusterConfig().GetServiceName()
|
||||
return ret, nil
|
||||
case cluster.GetType() == v3clusterpb.Cluster_LOGICAL_DNS:
|
||||
if !env.AggregateAndDNSSupportEnv {
|
||||
return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
|
||||
}
|
||||
ret.ClusterType = ClusterTypeLogicalDNS
|
||||
return ret, nil
|
||||
case cluster.GetClusterType() != nil && cluster.GetClusterType().Name == "envoy.clusters.aggregate":
|
||||
if !env.AggregateAndDNSSupportEnv {
|
||||
return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
|
||||
}
|
||||
clusters := &v3aggregateclusterpb.ClusterConfig{}
|
||||
if err := proto.Unmarshal(cluster.GetClusterType().GetTypedConfig().GetValue(), clusters); err != nil {
|
||||
return ClusterUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
|
||||
|
@ -626,7 +632,7 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
|
|||
ret.PrioritizedClusterNames = clusters.Clusters
|
||||
return ret, nil
|
||||
default:
|
||||
return ClusterUpdate{}, fmt.Errorf("unexpected cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
|
||||
return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче