[Internal] LocalRegionRequest: Adds the ability to pass location header during Address Resolution (#2318)
Co-authored-by: j82w <j82w@users.noreply.github.com>
This commit is contained in:
Родитель
2caa20c8a6
Коммит
f16153b986
|
@ -4,7 +4,7 @@
|
|||
<ClientOfficialVersion>3.17.0</ClientOfficialVersion>
|
||||
<ClientPreviewVersion>3.18.0</ClientPreviewVersion>
|
||||
<ClientPreviewSuffixVersion>preview</ClientPreviewSuffixVersion>
|
||||
<DirectVersion>3.17.1</DirectVersion>
|
||||
<DirectVersion>3.17.2</DirectVersion>
|
||||
<EncryptionVersion>1.0.0-previewV12</EncryptionVersion>
|
||||
<HybridRowVersion>1.1.0-preview1</HybridRowVersion>
|
||||
<AboveDirBuildProps>$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))</AboveDirBuildProps>
|
||||
|
|
|
@ -70,6 +70,7 @@ namespace Microsoft.Azure.Cosmos
|
|||
request.RequestContext.TargetIdentity = result.TargetServiceIdentity;
|
||||
request.RequestContext.ResolvedPartitionKeyRange = result.TargetPartitionKeyRange;
|
||||
request.RequestContext.RegionName = this.location;
|
||||
request.RequestContext.LocalRegionRequest = result.Addresses.IsLocalRegion;
|
||||
|
||||
await this.requestSigner.SignRequestAsync(request, cancellationToken);
|
||||
|
||||
|
|
|
@ -96,7 +96,7 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
IReadOnlyList<PartitionKeyRangeIdentity> partitionKeyRangeIdentities,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
List<Task<FeedResource<Address>>> tasks = new List<Task<FeedResource<Address>>>();
|
||||
List<Task<DocumentServiceResponse>> tasks = new List<Task<DocumentServiceResponse>>();
|
||||
int batchSize = GatewayAddressCache.DefaultBatchSize;
|
||||
|
||||
#if !(NETSTANDARD15 || NETSTANDARD16)
|
||||
|
@ -133,18 +133,25 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
}
|
||||
}
|
||||
|
||||
foreach (FeedResource<Address> response in await Task.WhenAll(tasks))
|
||||
foreach (DocumentServiceResponse response in await Task.WhenAll(tasks))
|
||||
{
|
||||
IEnumerable<Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation>> addressInfos =
|
||||
response.Where(addressInfo => ProtocolFromString(addressInfo.Protocol) == this.protocol)
|
||||
.GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal)
|
||||
.Select(group => this.ToPartitionAddressAndRange(collection.ResourceId, @group.ToList()));
|
||||
|
||||
foreach (Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo in addressInfos)
|
||||
using (response)
|
||||
{
|
||||
this.serverPartitionAddressCache.Set(
|
||||
new PartitionKeyRangeIdentity(collection.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
|
||||
addressInfo.Item2);
|
||||
FeedResource<Address> addressFeed = response.GetResource<FeedResource<Address>>();
|
||||
|
||||
bool inNetworkRequest = this.IsInNetworkRequest(response);
|
||||
|
||||
IEnumerable<Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation>> addressInfos =
|
||||
addressFeed.Where(addressInfo => ProtocolFromString(addressInfo.Protocol) == this.protocol)
|
||||
.GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal)
|
||||
.Select(group => this.ToPartitionAddressAndRange(collection.ResourceId, @group.ToList(), inNetworkRequest));
|
||||
|
||||
foreach (Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo in addressInfos)
|
||||
{
|
||||
this.serverPartitionAddressCache.Set(
|
||||
new PartitionKeyRangeIdentity(collection.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
|
||||
addressInfo.Item2);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -322,17 +329,22 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
|
||||
try
|
||||
{
|
||||
FeedResource<Address> masterAddresses = await this.GetMasterAddressesViaGatewayAsync(
|
||||
using (DocumentServiceResponse response = await this.GetMasterAddressesViaGatewayAsync(
|
||||
request,
|
||||
ResourceType.Database,
|
||||
null,
|
||||
entryUrl,
|
||||
forceRefresh,
|
||||
false);
|
||||
false))
|
||||
{
|
||||
FeedResource<Address> masterAddresses = response.GetResource<FeedResource<Address>>();
|
||||
|
||||
masterAddressAndRange = this.ToPartitionAddressAndRange(string.Empty, masterAddresses.ToList());
|
||||
this.masterPartitionAddressCache = masterAddressAndRange;
|
||||
this.suboptimalMasterPartitionTimestamp = DateTime.MaxValue;
|
||||
bool inNetworkRequest = this.IsInNetworkRequest(response);
|
||||
|
||||
masterAddressAndRange = this.ToPartitionAddressAndRange(string.Empty, masterAddresses.ToList(), inNetworkRequest);
|
||||
this.masterPartitionAddressCache = masterAddressAndRange;
|
||||
this.suboptimalMasterPartitionTimestamp = DateTime.MaxValue;
|
||||
}
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
|
@ -355,33 +367,38 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
string partitionKeyRangeId,
|
||||
bool forceRefresh)
|
||||
{
|
||||
FeedResource<Address> response =
|
||||
await this.GetServerAddressesViaGatewayAsync(request, collectionRid, new[] { partitionKeyRangeId }, forceRefresh);
|
||||
|
||||
IEnumerable<Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation>> addressInfos =
|
||||
response.Where(addressInfo => ProtocolFromString(addressInfo.Protocol) == this.protocol)
|
||||
.GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal)
|
||||
.Select(group => this.ToPartitionAddressAndRange(collectionRid, @group.ToList()));
|
||||
|
||||
Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> result =
|
||||
addressInfos.SingleOrDefault(
|
||||
addressInfo => StringComparer.Ordinal.Equals(addressInfo.Item1.PartitionKeyRangeId, partitionKeyRangeId));
|
||||
|
||||
if (result == null)
|
||||
using (DocumentServiceResponse response =
|
||||
await this.GetServerAddressesViaGatewayAsync(request, collectionRid, new[] { partitionKeyRangeId }, forceRefresh))
|
||||
{
|
||||
string errorMessage = string.Format(
|
||||
CultureInfo.InvariantCulture,
|
||||
RMResources.PartitionKeyRangeNotFound,
|
||||
partitionKeyRangeId,
|
||||
collectionRid);
|
||||
FeedResource<Address> addressFeed = response.GetResource<FeedResource<Address>>();
|
||||
|
||||
throw new PartitionKeyRangeGoneException(errorMessage) { ResourceAddress = collectionRid };
|
||||
bool inNetworkRequest = this.IsInNetworkRequest(response);
|
||||
|
||||
IEnumerable<Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation>> addressInfos =
|
||||
addressFeed.Where(addressInfo => ProtocolFromString(addressInfo.Protocol) == this.protocol)
|
||||
.GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal)
|
||||
.Select(group => this.ToPartitionAddressAndRange(collectionRid, @group.ToList(), inNetworkRequest));
|
||||
|
||||
Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> result =
|
||||
addressInfos.SingleOrDefault(
|
||||
addressInfo => StringComparer.Ordinal.Equals(addressInfo.Item1.PartitionKeyRangeId, partitionKeyRangeId));
|
||||
|
||||
if (result == null)
|
||||
{
|
||||
string errorMessage = string.Format(
|
||||
CultureInfo.InvariantCulture,
|
||||
RMResources.PartitionKeyRangeNotFound,
|
||||
partitionKeyRangeId,
|
||||
collectionRid);
|
||||
|
||||
throw new PartitionKeyRangeGoneException(errorMessage) { ResourceAddress = collectionRid };
|
||||
}
|
||||
|
||||
return result.Item2;
|
||||
}
|
||||
|
||||
return result.Item2;
|
||||
}
|
||||
|
||||
private async Task<FeedResource<Address>> GetMasterAddressesViaGatewayAsync(
|
||||
private async Task<DocumentServiceResponse> GetMasterAddressesViaGatewayAsync(
|
||||
DocumentServiceRequest request,
|
||||
ResourceType resourceType,
|
||||
string resourceAddress,
|
||||
|
@ -435,16 +452,13 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
trace: NoOpTrace.Singleton,
|
||||
cancellationToken: default))
|
||||
{
|
||||
using (DocumentServiceResponse documentServiceResponse =
|
||||
await ClientExtensions.ParseResponseAsync(httpResponseMessage))
|
||||
{
|
||||
GatewayAddressCache.LogAddressResolutionEnd(request, identifier);
|
||||
return documentServiceResponse.GetResource<FeedResource<Address>>();
|
||||
}
|
||||
DocumentServiceResponse documentServiceResponse = await ClientExtensions.ParseResponseAsync(httpResponseMessage);
|
||||
GatewayAddressCache.LogAddressResolutionEnd(request, identifier);
|
||||
return documentServiceResponse;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<FeedResource<Address>> GetServerAddressesViaGatewayAsync(
|
||||
private async Task<DocumentServiceResponse> GetServerAddressesViaGatewayAsync(
|
||||
DocumentServiceRequest request,
|
||||
string collectionRid,
|
||||
IEnumerable<string> partitionKeyRangeIds,
|
||||
|
@ -513,17 +527,13 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
trace: NoOpTrace.Singleton,
|
||||
cancellationToken: default))
|
||||
{
|
||||
using (DocumentServiceResponse documentServiceResponse =
|
||||
await ClientExtensions.ParseResponseAsync(httpResponseMessage))
|
||||
{
|
||||
GatewayAddressCache.LogAddressResolutionEnd(request, identifier);
|
||||
|
||||
return documentServiceResponse.GetResource<FeedResource<Address>>();
|
||||
}
|
||||
DocumentServiceResponse documentServiceResponse = await ClientExtensions.ParseResponseAsync(httpResponseMessage);
|
||||
GatewayAddressCache.LogAddressResolutionEnd(request, identifier);
|
||||
return documentServiceResponse;
|
||||
}
|
||||
}
|
||||
|
||||
internal Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> ToPartitionAddressAndRange(string collectionRid, IList<Address> addresses)
|
||||
internal Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> ToPartitionAddressAndRange(string collectionRid, IList<Address> addresses, bool inNetworkRequest)
|
||||
{
|
||||
Address address = addresses.First();
|
||||
|
||||
|
@ -561,7 +571,19 @@ namespace Microsoft.Azure.Cosmos.Routing
|
|||
|
||||
return Tuple.Create(
|
||||
partitionKeyRangeIdentity,
|
||||
new PartitionAddressInformation(addressInfos));
|
||||
new PartitionAddressInformation(addressInfos, inNetworkRequest));
|
||||
}
|
||||
|
||||
private bool IsInNetworkRequest(DocumentServiceResponse documentServiceResponse)
|
||||
{
|
||||
bool inNetworkRequest = false;
|
||||
string inNetworkHeader = documentServiceResponse.ResponseHeaders.Get(HttpConstants.HttpHeaders.LocalRegionRequest);
|
||||
if (!string.IsNullOrEmpty(inNetworkHeader))
|
||||
{
|
||||
bool.TryParse(inNetworkHeader, out inNetworkRequest);
|
||||
}
|
||||
|
||||
return inNetworkRequest;
|
||||
}
|
||||
|
||||
private static string LogAddressResolutionStart(DocumentServiceRequest request, Uri targetEndpoint)
|
||||
|
|
|
@ -165,15 +165,66 @@ namespace Microsoft.Azure.Cosmos
|
|||
}
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
[Owner("aysarkar")]
|
||||
public async Task GatewayAddressCacheInNetworkRequestTestAsync()
|
||||
{
|
||||
FakeMessageHandler messageHandler = new FakeMessageHandler();
|
||||
HttpClient httpClient = new HttpClient(messageHandler);
|
||||
httpClient.Timeout = TimeSpan.FromSeconds(120);
|
||||
GatewayAddressCache cache = new GatewayAddressCache(
|
||||
new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint),
|
||||
Documents.Client.Protocol.Https,
|
||||
this.mockTokenProvider.Object,
|
||||
this.mockServiceConfigReader.Object,
|
||||
MockCosmosUtil.CreateCosmosHttpClient(() => httpClient),
|
||||
suboptimalPartitionForceRefreshIntervalInSeconds: 2,
|
||||
enableTcpConnectionEndpointRediscovery: true);
|
||||
|
||||
// No header should be present.
|
||||
PartitionAddressInformation legacyRequest = await cache.TryGetAddressesAsync(
|
||||
DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid),
|
||||
this.testPartitionKeyRangeIdentity,
|
||||
this.serviceIdentity,
|
||||
false,
|
||||
CancellationToken.None);
|
||||
|
||||
|
||||
Assert.IsFalse(legacyRequest.IsLocalRegion);
|
||||
|
||||
// Header indicates the request is from the same azure region.
|
||||
messageHandler.Headers[HttpConstants.HttpHeaders.LocalRegionRequest] = "true";
|
||||
PartitionAddressInformation inNetworkAddresses = await cache.TryGetAddressesAsync(
|
||||
DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid),
|
||||
this.testPartitionKeyRangeIdentity,
|
||||
this.serviceIdentity,
|
||||
true,
|
||||
CancellationToken.None);
|
||||
Assert.IsTrue(inNetworkAddresses.IsLocalRegion);
|
||||
|
||||
// Header indicates the request is not from the same azure region.
|
||||
messageHandler.Headers[HttpConstants.HttpHeaders.LocalRegionRequest] = "false";
|
||||
PartitionAddressInformation outOfNetworkAddresses = await cache.TryGetAddressesAsync(
|
||||
DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid),
|
||||
this.testPartitionKeyRangeIdentity,
|
||||
this.serviceIdentity,
|
||||
true,
|
||||
CancellationToken.None);
|
||||
Assert.IsFalse(outOfNetworkAddresses.IsLocalRegion);
|
||||
}
|
||||
|
||||
private class FakeMessageHandler : HttpMessageHandler
|
||||
{
|
||||
private bool returnFullReplicaSet;
|
||||
private bool returnUpdatedAddresses;
|
||||
|
||||
public Dictionary<string, string> Headers { get; set; }
|
||||
|
||||
public FakeMessageHandler()
|
||||
{
|
||||
this.returnFullReplicaSet = false;
|
||||
this.returnUpdatedAddresses = false;
|
||||
this.Headers = new Dictionary<string, string>();
|
||||
}
|
||||
|
||||
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
|
||||
|
@ -224,6 +275,14 @@ namespace Microsoft.Azure.Cosmos
|
|||
Content = content,
|
||||
};
|
||||
|
||||
if (this.Headers != null)
|
||||
{
|
||||
foreach (KeyValuePair<string, string> headerPair in this.Headers)
|
||||
{
|
||||
responseMessage.Headers.Add(headerPair.Key, headerPair.Value);
|
||||
}
|
||||
}
|
||||
|
||||
return Task.FromResult<HttpResponseMessage>(responseMessage);
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче