Reduce the number of storage calls in lease manager (#357)

* Couple of improvements in Azure Lease Manager to reduce the number of storage calls.

* N/A as partition id

* Go with default timeout

* Moving to most recent AMQP release

* Fix flaky EPH test

* Adding 30 seconds default operation timeout back to tests.

* Reducing EPH to storage IO calls.

* Couple more fixes

* .

* Set token for owned leases.

* Refresh lease before acquiring in processor host.

* Fix metadata removal order during lease release.

* Update lease token only for already running pumps to avoid resetting receiver position data.

* FetchAttributesAsync of blob as part of GetAllLeasesAsync() call.

* Refresh lease before attempting to steal

* Don't retry if we already lost the lease during receiver open.

* Don't attempt to steal if owner has changed from the calculation time to refresh time.

* -

* Partition pump to close when hit ReceiverDisconnectedException since this is not recoverable.

* -

* Ignore any failure during releasing the lease

* Don't update pump token if token is empty

* Nullify the owner on the lease in case this host lost it.

* Increment ourLeaseCount when a lease is acquired.

* Correcting task list

* No need to assign pump lease token to downloaded lease.

* comment update

* comment update

* Clear ownership on partial acquisition.

* Clear ownership on partial acquisition.

* Make sure we don't leave the lease as owned if acquisition failed.

* Adding logs to debug lease corruption bug

* Adding logs to debug lease corruption bug

* Small fix at steal lease check

* Protect subject iterator variable during task creation in for loops.

* .

* Renew lease right after ChangeLease call

* Don't create pump if partition expired or already moved to some other host.

* Use refreshed lease while creating partition pump.

* Remove temporary debug logs.

* Addressing SJ's comments

* Remove obsolete
This commit is contained in:
Serkant Karaca 2019-02-11 13:34:24 -08:00 коммит произвёл GitHub
Родитель 1c4c4c7b51
Коммит cec369a9fa
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 396 добавлений и 224 удалений

Просмотреть файл

@ -9,6 +9,8 @@ namespace Microsoft.Azure.EventHubs.Processor
class AzureBlobLease : Lease
{
readonly bool isOwned;
// ctor needed for deserialization
internal AzureBlobLease()
{
@ -17,14 +19,23 @@ namespace Microsoft.Azure.EventHubs.Processor
internal AzureBlobLease(string partitionId, CloudBlockBlob blob) : base(partitionId)
{
this.Blob = blob;
}
this.isOwned = blob.Properties.LeaseState == LeaseState.Leased;
}
internal AzureBlobLease(AzureBlobLease source)
internal AzureBlobLease(string partitionId, string owner, CloudBlockBlob blob) : base(partitionId)
{
this.Blob = blob;
this.Owner = owner;
this.isOwned = blob.Properties.LeaseState == LeaseState.Leased;
}
internal AzureBlobLease(AzureBlobLease source)
: base(source)
{
this.Offset = source.Offset;
this.SequenceNumber = source.SequenceNumber;
this.Blob = source.Blob;
this.isOwned = source.isOwned;
}
internal AzureBlobLease(AzureBlobLease source, CloudBlockBlob blob) : base(source)
@ -32,17 +43,16 @@ namespace Microsoft.Azure.EventHubs.Processor
this.Offset = source.Offset;
this.SequenceNumber = source.SequenceNumber;
this.Blob = blob;
this.isOwned = blob.Properties.LeaseState == LeaseState.Leased;
}
// do not serialize
[JsonIgnore]
public CloudBlockBlob Blob { get; }
public override async Task<bool> IsExpired()
public override Task<bool> IsExpired()
{
await this.Blob.FetchAttributesAsync().ConfigureAwait(false); // Get the latest metadata
var currentState = this.Blob.Properties.LeaseState;
return currentState != LeaseState.Leased;
return Task.FromResult(!this.isOwned);
}
}
}

Просмотреть файл

@ -5,6 +5,7 @@ namespace Microsoft.Azure.EventHubs.Processor
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs.Primitives;
using Newtonsoft.Json;
@ -13,6 +14,8 @@ namespace Microsoft.Azure.EventHubs.Processor
class AzureStorageCheckpointLeaseManager : ICheckpointManager, ILeaseManager
{
static string MetaDataOwnerName = "OWNINGHOST";
EventProcessorHost host;
TimeSpan leaseDuration;
TimeSpan leaseRenewInterval;
@ -107,7 +110,9 @@ namespace Microsoft.Azure.EventHubs.Processor
public Task<bool> CreateCheckpointStoreIfNotExistsAsync()
{
return CreateLeaseStoreIfNotExistsAsync();
// Because we control the caller, we know that this method will only be called after createLeaseStoreIfNotExists.
// In this implementation, it's the same store, so the store will always exist if execution reaches here.
return Task.FromResult(true);
}
public async Task<Checkpoint> GetCheckpointAsync(string partitionId)
@ -126,19 +131,10 @@ namespace Microsoft.Azure.EventHubs.Processor
return checkpoint;
}
[Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)]
public Task UpdateCheckpointAsync(Checkpoint checkpoint)
{
throw new NotImplementedException();
}
public async Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionId)
public Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionId)
{
// Normally the lease will already be created, checkpoint store is initialized after lease store.
AzureBlobLease lease = (AzureBlobLease)await CreateLeaseIfNotExistsAsync(partitionId).ConfigureAwait(false);
Checkpoint checkpoint = new Checkpoint(partitionId, lease.Offset, lease.SequenceNumber);
return checkpoint;
return Task.FromResult<Checkpoint>(null);
}
public async Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint)
@ -228,26 +224,45 @@ namespace Microsoft.Azure.EventHubs.Processor
public async Task<Lease> GetLeaseAsync(string partitionId) // throws URISyntaxException, IOException, StorageException
{
AzureBlobLease retval = null;
CloudBlockBlob leaseBlob = GetBlockBlobReference(partitionId);
if (await leaseBlob.ExistsAsync(null, this.operationContext).ConfigureAwait(false))
{
retval = await DownloadLeaseAsync(partitionId, leaseBlob).ConfigureAwait(false);
}
await leaseBlob.FetchAttributesAsync().ConfigureAwait(false);
return retval;
return await DownloadLeaseAsync(partitionId, leaseBlob).ConfigureAwait(false);
}
public IEnumerable<Task<Lease>> GetAllLeases()
public async Task<IEnumerable<Lease>> GetAllLeasesAsync()
{
IEnumerable<string> partitionIds = this.host.PartitionManager.GetPartitionIdsAsync().WaitAndUnwrapException();
var leaseList = new List<Lease>();
BlobContinuationToken continuationToken = null;
foreach (string id in partitionIds)
do
{
yield return GetLeaseAsync(id);
}
var leaseBlobsResult = await this.consumerGroupDirectory.ListBlobsSegmentedAsync(
true,
BlobListingDetails.Metadata,
null,
continuationToken,
null,
this.operationContext);
foreach (CloudBlockBlob leaseBlob in leaseBlobsResult.Results)
{
// Try getting owner name from existing blob.
// This might return null when run on the existing lease after SDK upgrade.
leaseBlob.Metadata.TryGetValue(MetaDataOwnerName, out var owner);
// Discover partition id from URI path of the blob.
var partitionId = leaseBlob.Uri.AbsolutePath.Split('/').Last();
leaseList.Add(new AzureBlobLease(partitionId, owner, leaseBlob));
}
continuationToken = leaseBlobsResult.ContinuationToken;
} while (continuationToken != null);
return leaseList;
}
public async Task<Lease> CreateLeaseIfNotExistsAsync(string partitionId) // throws URISyntaxException, IOException, StorageException
@ -315,6 +330,7 @@ namespace Microsoft.Azure.EventHubs.Processor
string partitionId = lease.PartitionId;
try
{
bool renewLease = false;
string newToken;
await leaseBlob.FetchAttributesAsync(null, null, this.operationContext).ConfigureAwait(false);
if (leaseBlob.Properties.LeaseState == LeaseState.Leased)
@ -332,6 +348,7 @@ namespace Microsoft.Azure.EventHubs.Processor
}
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to ChangeLease");
renewLease = true;
newToken = await leaseBlob.ChangeLeaseAsync(
newLeaseId,
AccessCondition.GenerateLeaseCondition(lease.Token),
@ -341,29 +358,38 @@ namespace Microsoft.Azure.EventHubs.Processor
else
{
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to AcquireLease");
try
{
newToken = await leaseBlob.AcquireLeaseAsync(leaseDuration, newLeaseId, null, null, this.operationContext).ConfigureAwait(false);
}
catch (StorageException se)
when (se.RequestInformation != null
&& se.RequestInformation.ErrorCode.Equals(BlobErrorCodeStrings.LeaseAlreadyPresent, StringComparison.OrdinalIgnoreCase))
{
// Either some other host grabbed the lease or checkpoint call renewed it.
return false;
}
newToken = await leaseBlob.AcquireLeaseAsync(
leaseDuration,
newLeaseId,
null,
null,
this.operationContext).ConfigureAwait(false);
}
lease.Token = newToken;
lease.Owner = this.host.HostName;
lease.IncrementEpoch(); // Increment epoch each time lease is acquired or stolen by a new host
// Renew lease here if needed?
// ChangeLease doesn't renew so we should avoid lease expiring before next renew interval.
if (renewLease)
{
await this.RenewLeaseCoreAsync(lease).ConfigureAwait(false);
}
await leaseBlob.UploadTextAsync(
JsonConvert.SerializeObject(lease),
null,
AccessCondition.GenerateLeaseCondition(lease.Token),
null,
this.operationContext).ConfigureAwait(false);
// Update owner in the metadata.
lease.Blob.Metadata[MetaDataOwnerName] = lease.Owner;
await lease.Blob.SetMetadataAsync(
AccessCondition.GenerateLeaseCondition(lease.Token),
null,
this.operationContext).ConfigureAwait(false);
}
catch (StorageException se)
{
@ -418,6 +444,14 @@ namespace Microsoft.Azure.EventHubs.Processor
Token = string.Empty,
Owner = string.Empty
};
// Remove owner in the metadata.
leaseBlob.Metadata.Remove(MetaDataOwnerName);
await leaseBlob.SetMetadataAsync(
AccessCondition.GenerateLeaseCondition(leaseId),
null,
this.operationContext);
await leaseBlob.UploadTextAsync(
JsonConvert.SerializeObject(releasedCopy),
null,
@ -478,7 +512,7 @@ namespace Microsoft.Azure.EventHubs.Processor
return true;
}
async Task<AzureBlobLease> DownloadLeaseAsync(string partitionId, CloudBlockBlob blob) // throws StorageException, IOException
async Task<Lease> DownloadLeaseAsync(string partitionId, CloudBlockBlob blob) // throws StorageException, IOException
{
string jsonLease = await blob.DownloadTextAsync().ConfigureAwait(false);
@ -513,15 +547,7 @@ namespace Microsoft.Azure.EventHubs.Processor
CloudBlockBlob GetBlockBlobReference(string partitionId)
{
CloudBlockBlob leaseBlob = this.consumerGroupDirectory.GetBlockBlobReference(partitionId);
// Fixed, keeping workaround commented until full validation.
// GetBlockBlobReference creates a new ServiceClient thus resets options.
// Because of this we lose settings like MaximumExecutionTime on the client.
// Until storage addresses the issue we need to override it here once more.
// Tracking bug: https://github.com/Azure/azure-storage-net/issues/398
// leaseBlob.ServiceClient.DefaultRequestOptions = this.storageClient.DefaultRequestOptions;
return leaseBlob;
return this.consumerGroupDirectory.GetBlockBlobReference(partitionId);
}
}
}

Просмотреть файл

@ -35,6 +35,13 @@ namespace Microsoft.Azure.EventHubs.Processor
lastException = e;
ProcessorEventSource.Log.PartitionPumpWarning(
this.Host.HostName, this.PartitionContext.PartitionId, "Failure creating client or receiver, retrying", e.ToString());
// Don't retry if we already lost the lease.
if (e is ReceiverDisconnectedException)
{
break;
}
retryCount++;
}
}

Просмотреть файл

@ -5,8 +5,10 @@ namespace Microsoft.Azure.EventHubs.Processor
{
internal static class EventProcessorHostActionStrings
{
internal static readonly string DownloadingLeases = "Downloading Leases";
internal static readonly string CheckingLeases = "Checking Leases";
internal static readonly string RenewingLease = "Renewing Lease";
internal static readonly string ReleasingLease = "Releasing Lease";
internal static readonly string StealingLease = "Stealing Lease";
internal static readonly string CreatingLease = "Creating Lease";
internal static readonly string ClosingEventProcessor = "Closing Event Processor";

Просмотреть файл

@ -38,13 +38,6 @@ namespace Microsoft.Azure.EventHubs.Processor
/// <returns>Checkpoint info for the given partition, or null if none has been previously stored.</returns>
Task<Checkpoint> GetCheckpointAsync(string partitionId);
/// <summary>
/// Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint.
/// </summary>
/// <param name="checkpoint">offset/sequeceNumber to update the store with.</param>
[System.Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)]
Task UpdateCheckpointAsync(Checkpoint checkpoint);
/// <summary>
/// Create the checkpoint for the given partition if it doesn't exist. Do nothing if it does exist.
/// The offset/sequenceNumber for a freshly-created checkpoint should be set to StartOfStream/0.

Просмотреть файл

@ -65,7 +65,7 @@ namespace Microsoft.Azure.EventHubs.Processor
/// A typical implementation could just call GetLeaseAsync() on all partitions.
/// </summary>
/// <returns>list of lease info.</returns>
IEnumerable<Task<Lease>> GetAllLeases();
Task<IEnumerable<Lease>> GetAllLeasesAsync();
/// <summary>
/// Create in the store the lease info for the given partition, if it does not exist. Do nothing if it does exist

Просмотреть файл

@ -45,7 +45,7 @@
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
<PackageReference Include="WindowsAzure.Storage" Version="9.2.0" />
<PackageReference Include="WindowsAzure.Storage" Version="9.3.3" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net461' ">

Просмотреть файл

@ -92,6 +92,7 @@ namespace Microsoft.Azure.EventHubs.Processor
}
catch (Exception e)
{
// Ideally RunLoop should never throw.
ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception from partition manager main loop, shutting down", e.ToString());
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.PartitionManagerMainLoop);
}
@ -124,8 +125,8 @@ namespace Microsoft.Azure.EventHubs.Processor
// Now make sure the leases exist
foreach (string id in await this.GetPartitionIdsAsync().ConfigureAwait(false))
{
await RetryAsync(() => leaseManager.CreateLeaseIfNotExistsAsync(id), id, "Failure creating lease for partition, retrying",
"Out of retries creating lease for partition", EventProcessorHostActionStrings.CreatingLease, 5).ConfigureAwait(false);
await RetryAsync(() => leaseManager.CreateLeaseIfNotExistsAsync(id), id, $"Failure creating lease for partition {id}, retrying",
$"Out of retries creating lease for partition {id}", EventProcessorHostActionStrings.CreatingLease, 5).ConfigureAwait(false);
}
// Make sure the checkpoint store exists
@ -141,8 +142,8 @@ namespace Microsoft.Azure.EventHubs.Processor
// Now make sure the checkpoints exist
foreach (string id in await this.GetPartitionIdsAsync().ConfigureAwait(false))
{
await RetryAsync(() => checkpointManager.CreateCheckpointIfNotExistsAsync(id), id, "Failure creating checkpoint for partition, retrying",
"Out of retries creating checkpoint blob for partition", EventProcessorHostActionStrings.CreatingCheckpoint, 5).ConfigureAwait(false);
await RetryAsync(() => checkpointManager.CreateCheckpointIfNotExistsAsync(id), id, $"Failure creating checkpoint for partition {id}, retrying",
$"Out of retries creating checkpoint for partition {id}", EventProcessorHostActionStrings.CreatingCheckpoint, 5).ConfigureAwait(false);
}
}
@ -201,141 +202,232 @@ namespace Microsoft.Azure.EventHubs.Processor
loopStopwatch.Restart();
ILeaseManager leaseManager = this.host.LeaseManager;
Dictionary<string, Lease> allLeases = new Dictionary<string, Lease>();
var allLeases = new ConcurrentDictionary<string, Lease>();
var leasesOwnedByOthers = new ConcurrentDictionary<string, Lease>();
// Inspect all leases.
// Acquire any expired leases.
// Renew any leases that currently belong to us.
var gettingAllLeases = leaseManager.GetAllLeases();
var leasesOwnedByOthers = new List<Lease>();
IEnumerable<Lease> downloadedLeases;
var renewLeaseTasks = new List<Task>();
int ourLeaseCount = 0;
// First thing is first, renew owned leases.
foreach (Task<Lease> getLeaseTask in gettingAllLeases)
{
try
{
var lease = await getLeaseTask.ConfigureAwait(false);
allLeases[lease.PartitionId] = lease;
if (lease.Owner == this.host.HostName)
{
ourLeaseCount++;
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, lease.PartitionId, "Trying to renew lease.");
renewLeaseTasks.Add(leaseManager.RenewLeaseAsync(lease).ContinueWith(renewResult =>
{
if (renewResult.IsFaulted || !renewResult.WaitAndUnwrapException())
{
// Might have failed due to intermittent error or lease-lost.
// Just log here, expired leases will be picked by same or another host anyway.
ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, lease.PartitionId, "Failed to renew lease.", renewResult.Exception?.Message);
this.host.EventProcessorOptions.NotifyOfException(
this.host.HostName,
lease.PartitionId,
renewResult.Exception,
EventProcessorHostActionStrings.RenewingLease);
}
}, cancellationToken));
}
else
{
leasesOwnedByOthers.Add(lease);
}
}
catch (Exception e)
{
ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Failure during checking lease.", e.ToString());
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.CheckingLeases);
}
}
// Wait until we are done with renewing our own leases here.
// In theory, this should never throw, error are logged and notified in the renew tasks.
await Task.WhenAll(renewLeaseTasks).ConfigureAwait(false);
ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Lease renewal is finished.");
// Check any expired leases that we can grab here.
var checkLeaseTasks = new List<Task>();
foreach (var possibleLease in allLeases.Values.Where(lease => lease.Owner != this.host.HostName))
{
checkLeaseTasks.Add(Task.Run(async () =>
{
try
{
if (await possibleLease.IsExpired().ConfigureAwait(false))
{
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, possibleLease.PartitionId, "Trying to acquire lease.");
if (await leaseManager.AcquireLeaseAsync(possibleLease).ConfigureAwait(false))
{
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, possibleLease.PartitionId, "Acquired lease.");
}
}
}
catch (Exception e)
{
ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, possibleLease.PartitionId, "Failure during acquiring lease", e.ToString());
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, possibleLease.PartitionId, e, EventProcessorHostActionStrings.CheckingLeases);
}
}, cancellationToken));
}
await Task.WhenAll(checkLeaseTasks);
// Grab more leases if available and needed for load balancing
if (leasesOwnedByOthers.Count > 0)
{
Lease stealThisLease = WhichLeaseToSteal(leasesOwnedByOthers, ourLeaseCount);
if (stealThisLease != null)
{
try
{
ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.HostName, stealThisLease.PartitionId);
if (await leaseManager.AcquireLeaseAsync(stealThisLease).ConfigureAwait(false))
{
// Succeeded in stealing lease
ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.HostName, stealThisLease.PartitionId);
}
else
{
ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName,
"Failed to steal lease for partition " + stealThisLease.PartitionId, null);
}
}
catch (Exception e)
{
ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName,
"Exception during stealing lease for partition " + stealThisLease.PartitionId, e.ToString());
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName,
stealThisLease.PartitionId, e, EventProcessorHostActionStrings.StealingLease);
}
}
}
// Update pump with new state of leases.
foreach (string partitionId in allLeases.Keys)
{
try
{
Lease updatedLease = allLeases[partitionId];
ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Lease on partition {updatedLease.PartitionId} owned by {updatedLease.Owner}");
if (updatedLease.Owner == this.host.HostName)
{
await this.CheckAndAddPumpAsync(partitionId, updatedLease).ConfigureAwait(false);
}
else
{
await this.RemovePumpAsync(partitionId, CloseReason.LeaseLost).ConfigureAwait(false);
}
}
catch (Exception e)
{
ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, $"Exception during add/remove pump on partition {partitionId}", e.Message);
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, partitionId, e, EventProcessorHostActionStrings.PartitionPumpManagement);
}
}
try
{
{
try
{
downloadedLeases = await leaseManager.GetAllLeasesAsync();
}
catch (Exception e)
{
ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception during downloading leases", e.Message);
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.DownloadingLeases);
// Avoid tight spin if getallleases call keeps failing.
await Task.Delay(1000);
continue;
}
// First things first, renew owned leases.
foreach (var lease in downloadedLeases)
{
var subjectLease = lease;
try
{
allLeases[subjectLease.PartitionId] = subjectLease;
if (subjectLease.Owner == this.host.HostName)
{
ourLeaseCount++;
// Get lease from partition since we need the token at this point.
if (!this.partitionPumps.TryGetValue(subjectLease.PartitionId, out var capturedPump))
{
continue;
}
var capturedLease = capturedPump.Lease;
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, capturedLease.PartitionId, "Trying to renew lease.");
renewLeaseTasks.Add(leaseManager.RenewLeaseAsync(capturedLease).ContinueWith(renewResult =>
{
if (renewResult.IsFaulted)
{
// Might have failed due to intermittent error or lease-lost.
// Just log here, expired leases will be picked by same or another host anyway.
ProcessorEventSource.Log.PartitionPumpError(
this.host.HostName,
capturedLease.PartitionId,
"Failed to renew lease.",
renewResult.Exception?.Message);
this.host.EventProcessorOptions.NotifyOfException(
this.host.HostName,
capturedLease.PartitionId,
renewResult.Exception,
EventProcessorHostActionStrings.RenewingLease);
// Nullify the owner on the lease in case this host lost it.
// This helps to remove pump earlier reducing duplicate receives.
if (renewResult.Exception?.GetBaseException() is LeaseLostException)
{
allLeases[capturedLease.PartitionId].Owner = null;
}
}
}, cancellationToken));
}
else if (!await subjectLease.IsExpired().ConfigureAwait(false))
{
leasesOwnedByOthers[subjectLease.PartitionId] = subjectLease;
}
}
catch (Exception e)
{
ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Failure during checking lease.", e.ToString());
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.CheckingLeases);
}
}
// Wait until we are done with renewing our own leases here.
// In theory, this should never throw, error are logged and notified in the renew tasks.
await Task.WhenAll(renewLeaseTasks).ConfigureAwait(false);
ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Lease renewal is finished.");
// Check any expired leases that we can grab here.
var checkLeaseTasks = new List<Task>();
foreach (var possibleLease in allLeases.Values.Where(lease => lease.Owner != this.host.HostName))
{
var subjectLease = possibleLease;
checkLeaseTasks.Add(Task.Run(async () =>
{
try
{
if (await subjectLease.IsExpired().ConfigureAwait(false))
{
// Get fresh content of lease subject to acquire.
var downloadedLease = await leaseManager.GetLeaseAsync(subjectLease.PartitionId).ConfigureAwait(false);
allLeases[subjectLease.PartitionId] = downloadedLease;
// Check expiry once more here in case another host has already leased this since we populated the list.
if (await downloadedLease.IsExpired().ConfigureAwait(false))
{
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.PartitionId, "Trying to acquire lease.");
if (await leaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false))
{
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.PartitionId, "Acquired lease.");
leasesOwnedByOthers.TryRemove(downloadedLease.PartitionId, out var removedLease);
Interlocked.Increment(ref ourLeaseCount);
}
else
{
// Acquisition failed. Make sure we don't leave the lease as owned.
allLeases[subjectLease.PartitionId].Owner = null;
ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName,
"Failed to acquire lease for partition " + downloadedLease.PartitionId, null);
}
}
}
}
catch (Exception e)
{
ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, subjectLease.PartitionId, "Failure during acquiring lease", e.ToString());
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectLease.PartitionId, e, EventProcessorHostActionStrings.CheckingLeases);
// Acquisition failed. Make sure we don't leave the lease as owned.
allLeases[subjectLease.PartitionId].Owner = null;
}
}, cancellationToken));
}
await Task.WhenAll(checkLeaseTasks).ConfigureAwait(false);
ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Expired lease check is finished.");
// Grab more leases if available and needed for load balancing
if (leasesOwnedByOthers.Count > 0)
{
Lease stealThisLease = WhichLeaseToSteal(leasesOwnedByOthers.Values, ourLeaseCount);
// Don't attempt to steal the lease if current host has a pump for this partition id
// This is possible when current pump is in failed state due to lease moved to some other host.
if (stealThisLease != null && !this.partitionPumps.ContainsKey(stealThisLease.PartitionId))
{
try
{
// Get fresh content of lease subject to acquire.
var downloadedLease = await leaseManager.GetLeaseAsync(stealThisLease.PartitionId).ConfigureAwait(false);
allLeases[stealThisLease.PartitionId] = downloadedLease;
// Don't attempt to steal if lease is already expired.
// Expired leases are picked up by other hosts quickly.
// Don't attempt to steal if owner has changed from the calculation time to refresh time.
if (!await downloadedLease.IsExpired().ConfigureAwait(false)
&& downloadedLease.Owner == stealThisLease.Owner)
{
ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.HostName, downloadedLease.PartitionId);
if (await leaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false))
{
// Succeeded in stealing lease
ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.HostName, downloadedLease.PartitionId);
ourLeaseCount++;
}
else
{
// Acquisition failed. Make sure we don't leave the lease as owned.
allLeases[stealThisLease.PartitionId].Owner = null;
ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName,
"Failed to steal lease for partition " + downloadedLease.PartitionId, null);
}
}
}
catch (Exception e)
{
ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName,
"Exception during stealing lease for partition " + stealThisLease.PartitionId, e.ToString());
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName,
stealThisLease.PartitionId, e, EventProcessorHostActionStrings.StealingLease);
// Acquisition failed. Make sure we don't leave the lease as owned.
allLeases[stealThisLease.PartitionId].Owner = null;
}
}
}
// Update pump with new state of leases on owned partitions in parallel.
var createRemovePumpTasks = new List<Task>();
foreach (string partitionId in allLeases.Keys)
{
var subjectPartitionId = partitionId;
createRemovePumpTasks.Add(Task.Run(async () =>
{
try
{
Lease updatedLease = allLeases[subjectPartitionId];
ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Lease on partition {updatedLease.PartitionId} owned by {updatedLease.Owner}");
if (updatedLease.Owner == this.host.HostName)
{
await this.CheckAndAddPumpAsync(subjectPartitionId, updatedLease).ConfigureAwait(false);
}
else
{
await this.TryRemovePumpAsync(subjectPartitionId, CloseReason.LeaseLost).ConfigureAwait(false);
}
}
catch (Exception e)
{
ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, $"Exception during add/remove pump on partition {subjectPartitionId}", e.Message);
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectPartitionId, e, EventProcessorHostActionStrings.PartitionPumpManagement);
}
}, cancellationToken));
}
await Task.WhenAll(createRemovePumpTasks).ConfigureAwait(false);
ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Pump update is finished.");
// Consider reducing the wait time with last lease-walkthrough's time taken.
var elapsedTime = loopStopwatch.Elapsed;
if (leaseManager.LeaseRenewInterval > elapsedTime)
@ -343,9 +435,17 @@ namespace Microsoft.Azure.EventHubs.Processor
await Task.Delay(leaseManager.LeaseRenewInterval.Subtract(elapsedTime), cancellationToken).ConfigureAwait(false);
}
}
catch (TaskCanceledException)
catch (Exception e)
{
// Bail on the async work if we are canceled.
// TaskCanceledException is expected during host unregister.
if (e is TaskCanceledException)
{
continue;
}
// Loop should not exit unless signalled via cancellation token. Log any failures and continue.
ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception from partition manager main loop, continuing", e.Message);
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.PartitionPumpManagement);
}
}
}
@ -359,31 +459,50 @@ namespace Microsoft.Azure.EventHubs.Processor
if (capturedPump.PumpStatus == PartitionPumpStatus.Errored || capturedPump.IsClosing)
{
// The existing pump is bad. Remove it.
await RemovePumpAsync(partitionId, CloseReason.Shutdown).ConfigureAwait(false);
await TryRemovePumpAsync(partitionId, CloseReason.Shutdown).ConfigureAwait(false);
}
else
{
// Pump is working, just replace the lease.
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Updating lease for pump");
capturedPump.SetLease(lease);
// Lease token can show up empty here if lease content download has failed or not recently acquired.
// Don't update the token if so.
if (!string.IsNullOrWhiteSpace(lease.Token))
{
// Pump is working, just replace the lease token.
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Updating lease token for pump");
capturedPump.SetLeaseToken(lease.Token);
}
else
{
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Skipping to update lease token for pump");
}
}
}
else
{
// No existing pump, create a new one.
await CreateNewPumpAsync(partitionId, lease).ConfigureAwait(false);
await CreateNewPumpAsync(partitionId).ConfigureAwait(false);
}
}
async Task CreateNewPumpAsync(string partitionId, Lease lease)
async Task CreateNewPumpAsync(string partitionId)
{
PartitionPump newPartitionPump = new EventHubPartitionPump(this.host, lease);
// Refresh lease content and do last minute check to reduce partition moves.
var refreshedLease = await this.host.LeaseManager.GetLeaseAsync(partitionId);
if (refreshedLease.Owner != this.host.HostName || await refreshedLease.IsExpired().ConfigureAwait(false))
{
// Partition moved to some other node after lease acquisition.
// Return w/o creating the pump.
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, $"Partition moved to another host or expired after acquisition.");
return;
}
PartitionPump newPartitionPump = new EventHubPartitionPump(this.host, refreshedLease);
await newPartitionPump.OpenAsync().ConfigureAwait(false);
this.partitionPumps.TryAdd(partitionId, newPartitionPump); // do the put after start, if the start fails then put doesn't happen
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Created new PartitionPump");
}
async Task RemovePumpAsync(string partitionId, CloseReason reason)
async Task TryRemovePumpAsync(string partitionId, CloseReason reason)
{
PartitionPump capturedPump;
if (this.partitionPumps.TryRemove(partitionId, out capturedPump))
@ -394,12 +513,6 @@ namespace Microsoft.Azure.EventHubs.Processor
}
// else, pump is already closing/closed, don't need to try to shut it down again
}
else
{
// PartitionManager main loop tries to remove pump for every partition that the host does not own, just to be sure.
// Not finding a pump for a partition is normal and expected most of the time.
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "No pump found to remove for this partition");
}
}
Task RemoveAllPumpsAsync(CloseReason reason)
@ -408,15 +521,22 @@ namespace Microsoft.Azure.EventHubs.Processor
var keys = new List<string>(this.partitionPumps.Keys);
foreach (string partitionId in keys)
{
tasks.Add(this.RemovePumpAsync(partitionId, reason));
tasks.Add(this.TryRemovePumpAsync(partitionId, reason));
}
return Task.WhenAll(tasks);
}
Lease WhichLeaseToSteal(List<Lease> stealableLeases, int haveLeaseCount)
Lease WhichLeaseToSteal(IEnumerable<Lease> stealableLeases, int haveLeaseCount)
{
IDictionary<string, int> countsByOwner = CountLeasesByOwner(stealableLeases);
// All leases might already be released, in which case the returned counts map has no entries.
if (countsByOwner.Count == 0)
{
return null;
}
var biggestOwner = countsByOwner.OrderByDescending(o => o.Value).First();
Lease stealThisLease = null;
@ -450,7 +570,7 @@ namespace Microsoft.Azure.EventHubs.Processor
Dictionary<string, int> CountLeasesByOwner(IEnumerable<Lease> leases)
{
var counts = leases.GroupBy(lease => lease.Owner).Select(group => new
var counts = leases.Where(lease => lease.Owner != null).GroupBy(lease => lease.Owner).Select(group => new
{
Owner = group.Key,
Count = group.Count()

Просмотреть файл

@ -23,7 +23,7 @@ namespace Microsoft.Azure.EventHubs.Processor
protected EventProcessorHost Host { get; }
protected Lease Lease { get; }
protected internal Lease Lease { get; }
protected IEventProcessor Processor { get; private set; }
@ -31,9 +31,9 @@ namespace Microsoft.Azure.EventHubs.Processor
protected AsyncLock ProcessingAsyncLock { get; }
// Lease mutators for the pump's PartitionContext; both sides of the change are shown.
// SetLease replaces the whole Lease object held by the PartitionContext.
// SetLeaseToken keeps the existing Lease object and overwrites only its Token.
internal void SetLease(Lease newLease)
internal void SetLeaseToken(string newToken)
{
this.PartitionContext.Lease = newLease;
this.PartitionContext.Lease.Token = newToken;
}
public async Task OpenAsync()
@ -124,12 +124,15 @@ namespace Microsoft.Azure.EventHubs.Processor
if (reason != CloseReason.LeaseLost)
{
// Since this pump is dead, release the lease.
// Ignore LeaseLostException
try
{
await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false);
}
catch (LeaseLostException) { }
catch (Exception e)
{
// Log and ignore any failure since the expired lease will be picked up by another host.
this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, EventProcessorHostActionStrings.ReleasingLease);
}
}
this.PumpStatus = PartitionPumpStatus.Closed;

Просмотреть файл

@ -312,6 +312,12 @@ namespace Microsoft.Azure.EventHubs.Amqp
}
catch { }
// ReceiverDisconnectedException is a special case where we know we cannot recover the pump.
if (e is ReceiverDisconnectedException)
{
break;
}
continue;
}

Просмотреть файл

@ -68,7 +68,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Amqp" Version="2.3.5" />
<PackageReference Include="Microsoft.Azure.Amqp" Version="2.3.7" />
<PackageReference Include="Microsoft.Azure.Services.AppAuthentication" Version="1.0.3" />
<PackageReference Include="Microsoft.IdentityModel.Clients.ActiveDirectory" Version="3.19.8" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.5.0" />

Просмотреть файл

@ -137,7 +137,7 @@ namespace Microsoft.Azure.EventHubs.Tests.Client
// We will send a thousand messages where each message is 1K.
var totalSent = 0;
var rnd = new Random();
TestUtility.Log($"Starting to send.");
TestUtility.Log("Starting to send.");
do
{
// Send random body size.

Просмотреть файл

@ -150,7 +150,7 @@ namespace Microsoft.Azure.EventHubs.Tests.Client
foreach (var invalidPartitionId in invalidPartitions)
{
await Assert.ThrowsAsync<InvalidOperationException>(async () =>
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () =>
{
TestUtility.Log($"Getting partition information from invalid partition {invalidPartitionId}");
await this.EventHubClient.GetPartitionRuntimeInformationAsync(invalidPartitionId);

Просмотреть файл

@ -152,7 +152,9 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor
// Prepare host trackers.
var hostReceiveEvents = new ConcurrentDictionary<string, AsyncAutoResetEvent>();
var containerName = Guid.NewGuid().ToString();
var hosts = new List<EventProcessorHost>();
try
{
for (int hostId = 0; hostId < hostCount; hostId++)
@ -163,11 +165,11 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor
TestUtility.Log("Creating EventProcessorHost");
var eventProcessorHost = new EventProcessorHost(
thisHostName,
string.Empty, // Passing empty as entity path here rsince path is already in EH connection string.
string.Empty, // Passing empty as entity path here since path is already in EH connection string.
PartitionReceiver.DefaultConsumerGroupName,
TestUtility.EventHubsConnectionString,
TestUtility.StorageConnectionString,
Guid.NewGuid().ToString());
containerName);
hosts.Add(eventProcessorHost);
TestUtility.Log($"Calling RegisterEventProcessorAsync");
var processorOptions = new EventProcessorOptions
@ -673,7 +675,9 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor
};
};
await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory);
var epo = EventProcessorOptions.DefaultOptions;
epo.ReceiveTimeout = TimeSpan.FromSeconds(10);
await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, epo);
// Wait 15 seconds then create a new epoch receiver.
// This will trigger ReceiverDisconnectedException in the host.
@ -685,7 +689,7 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor
targetPartition, EventPosition.FromStart(), 2);
await externalReceiver.ReceiveAsync(100, TimeSpan.FromSeconds(5));
// Give another 1 minute for host to recover then do the validatins.
// Give another 1 minute for host to recover then do the validations.
await Task.Delay(60000);
TestUtility.Log("Verifying that host was able to receive ReceiverDisconnectedException");

Просмотреть файл

@ -38,6 +38,7 @@ namespace Microsoft.Azure.EventHubs.Tests
// Update operation timeout on ConnectionStringBuilder.
ehCsb.OperationTimeout = TimeSpan.FromSeconds(30);
EventHubsConnectionString = ehCsb.ToString();
}
@ -62,7 +63,7 @@ namespace Microsoft.Azure.EventHubs.Tests
internal static async Task SendToPartitionAsync(EventHubClient ehClient, string partitionId, EventData eventData, int numberOfMessages = 1)
{
TestUtility.Log($"Starting to send {numberOfMessages} to partition {partitionId}.");
TestUtility.Log($"Starting to send {numberOfMessages} messages to partition {partitionId}.");
var partitionSender = ehClient.CreatePartitionSender(partitionId);
for (int i = 0; i < numberOfMessages; i++)