This commit is contained in:
Abhijeet Mohanty 2024-09-18 22:45:19 -04:00
Родитель b43e1e0398
Коммит 1c17033cc4
2 изменённых файлов: 128 добавлений и 133 удалений

Просмотреть файл

@ -37,7 +37,7 @@ namespace Microsoft.Azure.Cosmos.Routing
private readonly TimeSpan MinTimeBetweenAccountRefresh = TimeSpan.FromSeconds(15);
private readonly int backgroundRefreshLocationTimeIntervalInMS = GlobalEndpointManager.DefaultBackgroundRefreshLocationTimeIntervalInMS;
private readonly object backgroundAccountRefreshLock = new object();
private readonly object isAccountRefreshInProgressLock = new object();
private readonly object isAccountRefreshInProgressLock = new object();
private readonly ReaderWriterLockSlim locationCacheDatabaseAccountReadWriteLock = new ReaderWriterLockSlim();
private bool isAccountRefreshInProgress = false;
private bool isBackgroundAccountRefreshActive = false;
@ -90,18 +90,18 @@ namespace Microsoft.Azure.Cosmos.Routing
}
}
public ReadOnlyCollection<Uri> ReadEndpoints => this.locationCache.ReadEndpoints;
public ReadOnlyCollection<Uri> ReadEndpoints => this.locationCache.ReadEndpoints;
public ReadOnlyCollection<Uri> AccountReadEndpoints => this.locationCache.AccountReadEndpoints;
public ReadOnlyCollection<Uri> WriteEndpoints => this.locationCache.WriteEndpoints;
public ReadOnlyCollection<Uri> WriteEndpoints => this.locationCache.WriteEndpoints;
public int PreferredLocationCount
{
get
{
Collection<string> effectivePreferredLocations = this.GetEffectivePreferredLocations();
return effectivePreferredLocations.Count;
}
}
@ -113,8 +113,8 @@ namespace Microsoft.Azure.Cosmos.Routing
public Uri GetHubUri()
{
return this.locationCache.GetHubUri();
}
return this.locationCache.GetHubUri();
}
/// <summary>
/// This will get the account information.
@ -125,20 +125,20 @@ namespace Microsoft.Azure.Cosmos.Routing
/// </summary>
public static async Task<AccountProperties> GetDatabaseAccountFromAnyLocationsAsync(
Uri defaultEndpoint,
IList<string>? locations,
IList<string>? locations,
IList<Uri>? accountInitializationCustomEndpoints,
Func<Uri, Task<AccountProperties>> getDatabaseAccountFn,
CancellationToken cancellationToken,
CancellationToken cancellationToken,
ReaderWriterLockSlim accountPropertiesReaderWriterLock)
{
{
using (GetAccountPropertiesHelper threadSafeGetAccountHelper = new GetAccountPropertiesHelper(
defaultEndpoint,
locations,
locations,
accountInitializationCustomEndpoints,
getDatabaseAccountFn,
cancellationToken))
{
return await threadSafeGetAccountHelper.GetAccountPropertiesAsync(accountPropertiesReaderWriterLock);
cancellationToken))
{
return await threadSafeGetAccountHelper.GetAccountPropertiesAsync(accountPropertiesReaderWriterLock);
}
}
@ -149,30 +149,30 @@ namespace Microsoft.Azure.Cosmos.Routing
{
private readonly CancellationTokenSource CancellationTokenSource;
private readonly Uri DefaultEndpoint;
private readonly bool LimitToGlobalEndpointOnly;
private readonly bool LimitToGlobalEndpointOnly;
private readonly IEnumerator<Uri> ServiceEndpointEnumerator;
private readonly Func<Uri, Task<AccountProperties>> GetDatabaseAccountFn;
private readonly List<Exception> TransientExceptions = new List<Exception>();
private AccountProperties? AccountProperties = null;
private Exception? NonRetriableException = null;
private Exception? NonRetriableException = null;
private int disposeCounter = 0;
public GetAccountPropertiesHelper(
Uri defaultEndpoint,
IList<string>? locations,
IList<string>? locations,
IList<Uri>? accountInitializationCustomEndpoints,
Func<Uri, Task<AccountProperties>> getDatabaseAccountFn,
CancellationToken cancellationToken)
{
this.DefaultEndpoint = defaultEndpoint;
this.LimitToGlobalEndpointOnly = (locations == null || locations.Count == 0) && (accountInitializationCustomEndpoints == null || accountInitializationCustomEndpoints.Count == 0);
this.GetDatabaseAccountFn = getDatabaseAccountFn;
this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
this.ServiceEndpointEnumerator = GetAccountPropertiesHelper
.GetServiceEndpoints(
defaultEndpoint,
locations,
accountInitializationCustomEndpoints)
this.GetDatabaseAccountFn = getDatabaseAccountFn;
this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
this.ServiceEndpointEnumerator = GetAccountPropertiesHelper
.GetServiceEndpoints(
defaultEndpoint,
locations,
accountInitializationCustomEndpoints)
.GetEnumerator();
}
@ -183,7 +183,7 @@ namespace Microsoft.Azure.Cosmos.Routing
{
return await this.GetOnlyGlobalEndpointAsync(readerWriterLock);
}
Task globalEndpointTask = this.GetAndUpdateAccountPropertiesAsync(this.DefaultEndpoint, readerWriterLock);
// Start a timer to start secondary requests in parallel.
@ -273,7 +273,7 @@ namespace Microsoft.Azure.Cosmos.Routing
/// This is done in a thread safe way to allow multiple tasks to iterate over the list of service endpoints.
/// </summary>
private async Task TryGetAccountPropertiesFromAllLocationsAsync(ReaderWriterLockSlim readerWriterLock)
{
{
while (this.TryMoveNextServiceEndpointhreadSafe(
out Uri? serviceEndpoint))
{
@ -283,20 +283,20 @@ namespace Microsoft.Azure.Cosmos.Routing
return;
}
await this.GetAndUpdateAccountPropertiesAsync(
endpoint: serviceEndpoint,
await this.GetAndUpdateAccountPropertiesAsync(
endpoint: serviceEndpoint,
readerWriterLock);
}
}
/// <summary>
/// We first iterate through all the private endpoints to fetch the account information.
/// If all the attempt fails to fetch the metadata from the private endpoints, we will
/// attempt to retrieve the account information from the regional endpoints constructed
/// using the preferred regions list.
/// </summary>
/// <param name="serviceEndpoint">An instance of <see cref="Uri"/> that will contain the service endpoint.</param>
/// <returns>A boolean flag indicating if the <see cref="ServiceEndpointEnumerator"/> was advanced in a thread safe manner.</returns>
}
/// <summary>
/// We first iterate through all the private endpoints to fetch the account information.
/// If all the attempt fails to fetch the metadata from the private endpoints, we will
/// attempt to retrieve the account information from the regional endpoints constructed
/// using the preferred regions list.
/// </summary>
/// <param name="serviceEndpoint">An instance of <see cref="Uri"/> that will contain the service endpoint.</param>
/// <returns>A boolean flag indicating if the <see cref="ServiceEndpointEnumerator"/> was advanced in a thread safe manner.</returns>
private bool TryMoveNextServiceEndpointhreadSafe(
out Uri? serviceEndpoint)
{
@ -317,7 +317,7 @@ namespace Microsoft.Azure.Cosmos.Routing
serviceEndpoint = this.ServiceEndpointEnumerator.Current;
return true;
}
}
}
private async Task GetAndUpdateAccountPropertiesAsync(Uri endpoint, ReaderWriterLockSlim readerWriterLock)
{
@ -338,7 +338,7 @@ namespace Microsoft.Azure.Cosmos.Routing
if (databaseAccount != null)
{
readerWriterLock.EnterWriteLock();
try
{
this.AccountProperties = databaseAccount;
@ -378,81 +378,81 @@ namespace Microsoft.Azure.Cosmos.Routing
}
return false;
}
/// <summary>
/// Returns an instance of <see cref="IEnumerable{Uri}"/> containing the private and regional service endpoints to iterate over.
/// </summary>
/// <param name="defaultEndpoint">An instance of <see cref="Uri"/> containing the default global endpoint.</param>
/// <param name="locations">An instance of <see cref="IList{T}"/> containing the preferred serviceEndpoint names.</param>
/// <param name="accountInitializationCustomEndpoints">An instance of <see cref="IList{T}"/> containing the custom private endpoints.</param>
/// <returns>An instance of <see cref="IEnumerator{T}"/> containing the service endpoints.</returns>
private static IEnumerable<Uri> GetServiceEndpoints(
Uri defaultEndpoint,
IList<string>? locations,
IList<Uri>? accountInitializationCustomEndpoints)
{
// We first iterate over all the private endpoints and yield return them.
if (accountInitializationCustomEndpoints?.Count > 0)
{
foreach (Uri customEndpoint in accountInitializationCustomEndpoints)
{
// Yield return all of the custom private endpoints first.
yield return customEndpoint;
}
}
// The next step is to iterate over the preferred locations, construct and yield return the regional endpoints one by one.
// The regional endpoints will be constructed by appending the preferred region name as a suffix to the default global endpoint.
if (locations?.Count > 0)
{
foreach (string location in locations)
{
// Yield return all of the regional endpoints once the private custom endpoints are visited.
yield return LocationHelper.GetLocationEndpoint(defaultEndpoint, location);
}
}
}
public void Dispose()
{
if (Interlocked.Increment(ref this.disposeCounter) == 1)
{
this.CancellationTokenSource?.Cancel();
this.CancellationTokenSource?.Dispose();
}
}
}
/// <summary>
/// Returns an instance of <see cref="IEnumerable{Uri}"/> containing the private and regional service endpoints to iterate over.
/// </summary>
/// <param name="defaultEndpoint">An instance of <see cref="Uri"/> containing the default global endpoint.</param>
/// <param name="locations">An instance of <see cref="IList{T}"/> containing the preferred serviceEndpoint names.</param>
/// <param name="accountInitializationCustomEndpoints">An instance of <see cref="IList{T}"/> containing the custom private endpoints.</param>
/// <returns>An instance of <see cref="IEnumerator{T}"/> containing the service endpoints.</returns>
private static IEnumerable<Uri> GetServiceEndpoints(
Uri defaultEndpoint,
IList<string>? locations,
IList<Uri>? accountInitializationCustomEndpoints)
{
// We first iterate over all the private endpoints and yield return them.
if (accountInitializationCustomEndpoints?.Count > 0)
{
foreach (Uri customEndpoint in accountInitializationCustomEndpoints)
{
// Yield return all of the custom private endpoints first.
yield return customEndpoint;
}
}
// The next step is to iterate over the preferred locations, construct and yield return the regional endpoints one by one.
// The regional endpoints will be constructed by appending the preferred region name as a suffix to the default global endpoint.
if (locations?.Count > 0)
{
foreach (string location in locations)
{
// Yield return all of the regional endpoints once the private custom endpoints are visited.
yield return LocationHelper.GetLocationEndpoint(defaultEndpoint, location);
}
}
}
public void Dispose()
{
if (Interlocked.Increment(ref this.disposeCounter) == 1)
{
this.CancellationTokenSource?.Cancel();
this.CancellationTokenSource?.Dispose();
}
}
}
public virtual Uri ResolveServiceEndpoint(DocumentServiceRequest request)
{
return this.locationCache.ResolveServiceEndpoint(request);
}
/// <summary>
/// Gets the default endpoint of the account
/// </summary>
/// <returns>the default endpoint.</returns>
public Uri GetDefaultEndpoint()
{
return this.locationCache.GetDefaultEndpoint();
}
/// <summary>
/// Gets the mapping of available write region names to the respective endpoints
/// </summary>
public ReadOnlyDictionary<string, Uri> GetAvailableWriteEndpointsByLocation()
{
return this.locationCache.GetAvailableWriteEndpointsByLocation();
}
/// <summary>
/// Gets the mapping of available read region names to the respective endpoints
/// </summary>
public ReadOnlyDictionary<string, Uri> GetAvailableReadEndpointsByLocation()
{
return this.locationCache.GetAvailableReadEndpointsByLocation();
}
}
/// <summary>
/// Gets the default endpoint of the account
/// </summary>
/// <returns>the default endpoint.</returns>
public Uri GetDefaultEndpoint()
{
return this.locationCache.GetDefaultEndpoint();
}
/// <summary>
/// Gets the mapping of available write region names to the respective endpoints
/// </summary>
public ReadOnlyDictionary<string, Uri> GetAvailableWriteEndpointsByLocation()
{
return this.locationCache.GetAvailableWriteEndpointsByLocation();
}
/// <summary>
/// Gets the mapping of available read region names to the respective endpoints
/// </summary>
public ReadOnlyDictionary<string, Uri> GetAvailableReadEndpointsByLocation()
{
return this.locationCache.GetAvailableReadEndpointsByLocation();
}
/// <summary>
/// Returns serviceEndpoint corresponding to the endpoint
@ -467,11 +467,11 @@ namespace Microsoft.Azure.Cosmos.Routing
{
return this.locationCache.GetApplicableEndpoints(request, isReadRequest);
}
public ReadOnlyCollection<string> GetApplicableRegions(IEnumerable<string> excludeRegions, bool isReadRequest)
{
return this.locationCache.GetApplicableRegions(excludeRegions, isReadRequest);
}
}
public bool TryGetLocationForGatewayDiagnostics(Uri endpoint, out string regionName)
{
@ -515,9 +515,10 @@ namespace Microsoft.Azure.Cosmos.Routing
if (this.cancellationTokenSource.IsCancellationRequested)
{
return;
}
}
this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock();
this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock();
try
{
this.locationCache.OnDatabaseAccountRead(databaseAccount);
@ -526,7 +527,7 @@ namespace Microsoft.Azure.Cosmos.Routing
{
this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock();
}
if (this.isBackgroundAccountRefreshActive)
{
return;
@ -675,9 +676,8 @@ namespace Microsoft.Azure.Cosmos.Routing
try
{
this.LastBackgroundRefreshUtc = DateTime.UtcNow;
AccountProperties accountProperties = await this.GetDatabaseAccountAsync(true);
this.locationCacheDatabaseAccountReadWriteLock.EnterWriteLock();
try
@ -688,12 +688,12 @@ namespace Microsoft.Azure.Cosmos.Routing
{
this.locationCacheDatabaseAccountReadWriteLock.ExitWriteLock();
}
}
catch (Exception ex)
{
DefaultTrace.TraceWarning("Failed to refresh database account with exception: {0}. Activity Id: '{1}'",
ex,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
}
catch (Exception ex)
{
DefaultTrace.TraceWarning("Failed to refresh database account with exception: {0}. Activity Id: '{1}'",
ex,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
}
finally
{
@ -711,10 +711,10 @@ namespace Microsoft.Azure.Cosmos.Routing
obsoleteValue: null,
singleValueInitFunc: () => GlobalEndpointManager.GetDatabaseAccountFromAnyLocationsAsync(
this.defaultEndpoint,
this.GetEffectivePreferredLocations(),
this.GetEffectivePreferredLocations(),
this.connectionPolicy.AccountInitializationCustomEndpoints,
this.GetDatabaseAccountAsync,
this.cancellationTokenSource.Token,
this.cancellationTokenSource.Token,
this.locationCacheDatabaseAccountReadWriteLock),
cancellationToken: this.cancellationTokenSource.Token,
forceRefresh: forceRefresh);
@ -730,8 +730,8 @@ namespace Microsoft.Azure.Cosmos.Routing
TimeSpan timeSinceLastRefresh = DateTime.UtcNow - this.LastBackgroundRefreshUtc;
return (this.isAccountRefreshInProgress || this.MinTimeBetweenAccountRefresh > timeSinceLastRefresh)
&& !forceRefresh;
}
}
public Collection<string> GetEffectivePreferredLocations()
{
if (this.connectionPolicy.PreferredLocations != null && this.connectionPolicy.PreferredLocations.Count > 0)

Просмотреть файл

@ -275,11 +275,6 @@ namespace Microsoft.Azure.Cosmos.Routing
public ReadOnlyCollection<string> GetAvailableWriteLocations()
{
return this.locationInfo.AvailableWriteLocations;
}
public ReadOnlyCollection<string> GetAvailableWriteLocations()
{
return this.locationInfo.AvailableWriteLocations;
}
/// <summary>