improve tracing of Azure Storage accesses

This commit is contained in:
Sebastian Burckhardt 2021-01-21 16:34:33 -08:00
Родитель 5075e9dd10
Коммит beb0a6cf6b
7 изменённых файлов: 104 добавлений и 44 удалений

Просмотреть файл

@ -95,7 +95,7 @@ namespace DurableTask.Netherite
: CloudStorageAccount.Parse(this.Settings.StorageConnectionString).Credentials.AccountName;
EtwSource.Log.OrchestrationServiceCreated(this.ServiceInstanceId, this.StorageAccountName, this.Settings.HubName, this.Settings.WorkerId, TraceUtils.ExtensionVersion);
this.Logger.LogInformation("NetheriteOrchestrationService created, workerId={workerId}, transport={transport}, storage={storage}", this.Settings.WorkerId, this.configuredTransport, this.configuredStorage);
this.Logger.LogInformation("NetheriteOrchestrationService created, workerId={workerId}, processorCount={processorCount}, transport={transport}, storage={storage}", Environment.ProcessorCount, this.Settings.WorkerId, this.configuredTransport, this.configuredStorage);
switch (this.configuredTransport)
{

Просмотреть файл

@ -84,6 +84,7 @@ namespace DurableTask.Netherite.Faster
BlobManager.AsynchronousStorageReadMaxConcurrency,
true,
"PageBlobDirectory.ListBlobsSegmentedAsync",
"RecoverDevice",
$"continuationToken={continuationToken}",
this.pageBlobDirectory.Prefix,
2000,
@ -96,6 +97,7 @@ namespace DurableTask.Netherite.Faster
currentToken: continuationToken,
options: BlobManager.BlobRequestOptionsWithRetry,
operationContext: null);
return response.Results.Count(); // not accurate, in terms of bytes, but still useful for tracing purposes
});
foreach (IListBlobItem item in response.Results)
@ -178,13 +180,15 @@ namespace DurableTask.Netherite.Faster
null,
this.underLease,
"CloudPageBlob.DeleteAsync",
"(RemoveSegment)",
"DeleteDeviceSegment",
"",
pageBlob.Name,
5000,
true,
async (numAttempts) =>
{
await pageBlob.DeleteAsync(cancellationToken: this.PartitionErrorHandler.Token);
return 1;
});
deletionTask.ContinueWith((Task t) => callback(result));
@ -204,11 +208,16 @@ namespace DurableTask.Netherite.Faster
BlobManager.AsynchronousStorageWriteMaxConcurrency,
this.underLease,
"CloudPageBlob.DeleteAsync",
"(DeleteAsync)",
"DeleteDevice",
"",
pageBlob.Name,
5000,
false,
(numAttempts) => pageBlob.DeleteAsync(cancellationToken: this.PartitionErrorHandler.Token));
async (numAttempts) =>
{
await pageBlob.DeleteAsync(cancellationToken: this.PartitionErrorHandler.Token);
return 1;
});
}
return Task.WhenAll(this.blobs.Values.Select(Delete).ToList());
@ -289,6 +298,7 @@ namespace DurableTask.Netherite.Faster
BlobManager.AsynchronousStorageWriteMaxConcurrency,
true,
"CloudPageBlob.WritePagesAsync",
"WriteToDevice",
$"length={length} destinationAddress={destinationAddress + offset}",
blob.Name,
1000 + (int)length / 1000,
@ -308,6 +318,8 @@ namespace DurableTask.Netherite.Faster
contentChecksum: null, accessCondition: null, options: blobRequestOptions, operationContext: null, cancellationToken: this.PartitionErrorHandler.Token)
.ConfigureAwait(false);
}
return (long) length;
});
}
}
@ -325,6 +337,7 @@ namespace DurableTask.Netherite.Faster
BlobManager.AsynchronousStorageReadMaxConcurrency,
true,
"CloudPageBlob.DownloadRangeToStreamAsync",
"ReadFromDevice",
$"readLength={readLength} sourceAddress={sourceAddress}",
blob.Name,
1000 + (int) readLength / 1000,
@ -350,6 +363,8 @@ namespace DurableTask.Netherite.Faster
{
throw new InvalidDataException($"wrong amount of data received from page blob, expected={readLength}, actual={stream.Position}");
}
return readLength;
});
}
}

Просмотреть файл

@ -67,18 +67,21 @@ namespace DurableTask.Netherite.Faster
BlobManager.AsynchronousStorageReadMaxConcurrency,
true,
"CloudPageBlob.CreateAsync",
"CreateDevice",
"",
pageBlob.Name,
3000,
true,
(numAttempts) =>
async (numAttempts) =>
{
return pageBlob.CreateAsync(
await pageBlob.CreateAsync(
size,
accessCondition: null,
options: BlobManager.BlobRequestOptionsDefault,
operationContext: null,
this.azureStorageDevice.PartitionErrorHandler.Token);
return 1;
});
// At this point the blob is fully created. After this line all consequent writers will write immediately. We just

Просмотреть файл

@ -425,7 +425,8 @@ namespace DurableTask.Netherite.Faster
null,
false,
"CloudBlockBlob.UploadFromByteArrayAsync",
"(Create commit blob)",
"CreateCommitLog",
"",
this.eventLogCommitBlob.Name,
2000,
true,
@ -440,6 +441,8 @@ namespace DurableTask.Netherite.Faster
// creation race, try from top
this.TraceHelper.LeaseProgress("Creation race observed, retrying");
}
return 1;
});
continue;
@ -460,7 +463,7 @@ namespace DurableTask.Netherite.Faster
else
{
TimeSpan nextRetryIn = BlobManager.GetDelayBetweenRetries(numAttempts);
this.TraceHelper.FasterPerfWarning($"Lease acquisition experienced transient error, retrying in {nextRetryIn}");
this.TraceHelper.FasterPerfWarning($"Lease acquisition failed transiently, retrying in {nextRetryIn}");
await Task.Delay(nextRetryIn);
}
continue;
@ -489,6 +492,11 @@ namespace DurableTask.Netherite.Faster
await this.eventLogCommitBlob.RenewLeaseAsync(acc, this.PartitionErrorHandler.Token)
.ConfigureAwait(false);
this.TraceHelper.LeaseProgress($"Renewed lease at {this.leaseTimer.Elapsed.TotalSeconds - this.LeaseDuration.TotalSeconds}s");
if (nextLeaseTimer.ElapsedMilliseconds > 2000)
{
this.TraceHelper.FasterPerfWarning($"RenewLeaseAsync took {nextLeaseTimer.Elapsed.TotalSeconds:F1}s, which is excessive; {this.leaseTimer.Elapsed.TotalSeconds - this.LeaseDuration.TotalSeconds}s past expiry");
}
}
this.leaseTimer = nextLeaseTimer;
@ -619,7 +627,8 @@ namespace DurableTask.Netherite.Faster
this.PerformWithRetries(
false,
"CloudBlockBlob.UploadFromByteArray",
"(commit log)",
"WriteCommitLogMetadata",
"",
this.eventLogCommitBlob.Name,
1000,
true,
@ -629,7 +638,7 @@ namespace DurableTask.Netherite.Faster
{
var blobRequestOptions = numAttempts > 2 ? BlobManager.BlobRequestOptionsDefault : BlobManager.BlobRequestOptionsAggressiveTimeout;
this.eventLogCommitBlob.UploadFromByteArray(commitMetadata, 0, commitMetadata.Length, acc, blobRequestOptions);
return true;
return (commitMetadata.Length, true);
}
catch (StorageException ex) when (BlobUtils.LeaseConflict(ex))
{
@ -645,7 +654,7 @@ namespace DurableTask.Netherite.Faster
this.TraceHelper.LeaseProgress("ILogCommitManager.Commit: wait for next renewal");
this.NextLeaseRenewalTask.Wait();
this.TraceHelper.LeaseProgress("ILogCommitManager.Commit: renewal complete");
return false;
return (commitMetadata.Length, false);
}
});
}
@ -666,7 +675,8 @@ namespace DurableTask.Netherite.Faster
this.PerformWithRetries(
false,
"CloudBlockBlob.DownloadToStream",
"(read commit log)",
"ReadCommitLogMetadata",
"",
this.eventLogCommitBlob.Name,
1000,
true,
@ -681,7 +691,7 @@ namespace DurableTask.Netherite.Faster
{
var blobRequestOptions = numAttempts > 2 ? BlobManager.BlobRequestOptionsDefault : BlobManager.BlobRequestOptionsAggressiveTimeout;
this.eventLogCommitBlob.DownloadToStream(stream, acc, blobRequestOptions);
return true;
return (stream.Position, true);
}
catch (StorageException ex) when (BlobUtils.LeaseConflict(ex))
{
@ -697,7 +707,7 @@ namespace DurableTask.Netherite.Faster
this.TraceHelper.LeaseProgress("ILogCommitManager.Commit: wait for next renewal");
this.NextLeaseRenewalTask.Wait();
this.TraceHelper.LeaseProgress("ILogCommitManager.Commit: renewal complete");
return false;
return (0, false);
}
});
@ -811,7 +821,8 @@ namespace DurableTask.Netherite.Faster
this.PerformWithRetries(
false,
"CloudBlockBlob.OpenWrite",
"(CommitIndexCheckpoint)",
"WriteIndexCheckpointMetadata",
$"token={indexToken} size={commitMetadata.Length}",
metaFileBlob.Name,
1000,
true,
@ -823,7 +834,7 @@ namespace DurableTask.Netherite.Faster
writer.Write(commitMetadata.Length);
writer.Write(commitMetadata);
writer.Flush();
return true;
return (commitMetadata.Length, true);
}
});
@ -841,7 +852,8 @@ namespace DurableTask.Netherite.Faster
this.PerformWithRetries(
false,
"CloudBlockBlob.OpenWrite",
"(CommitLogCheckpoint)",
"WriteHybridLogCheckpointMetadata",
$"token={logToken}",
metaFileBlob.Name,
1000,
true,
@ -853,7 +865,7 @@ namespace DurableTask.Netherite.Faster
writer.Write(commitMetadata.Length);
writer.Write(commitMetadata);
writer.Flush();
return true;
return (commitMetadata.Length + 4, true);
}
});
@ -872,7 +884,8 @@ namespace DurableTask.Netherite.Faster
this.PerformWithRetries(
false,
"CloudBlockBlob.OpenRead",
"(GetIndexCheckpointMetadata)",
"ReadIndexCheckpointMetadata",
"",
metaFileBlob.Name,
1000,
true,
@ -882,7 +895,7 @@ namespace DurableTask.Netherite.Faster
using var reader = new BinaryReader(blobstream);
var len = reader.ReadInt32();
result = reader.ReadBytes(len);
return true;
return (len + 4, true);
});
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexCommitMetadata Returned {result?.Length ?? null} bytes from {tag}, target={metaFileBlob.Name}");
@ -899,7 +912,8 @@ namespace DurableTask.Netherite.Faster
this.PerformWithRetries(
false,
"CloudBlockBlob.OpenRead",
"(GetLogCheckpointMetadata)",
"ReadLogCheckpointMetadata",
"",
metaFileBlob.Name,
1000,
true,
@ -909,7 +923,7 @@ namespace DurableTask.Netherite.Faster
using var reader = new BinaryReader(blobstream);
var len = reader.ReadInt32();
result = reader.ReadBytes(len);
return true;
return (len + 4, true);
});
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexCommitMetadata Returned {result?.Length ?? null} bytes from {tag}, target={metaFileBlob.Name}");
@ -971,11 +985,16 @@ namespace DurableTask.Netherite.Faster
BlobManager.AsynchronousStorageWriteMaxConcurrency,
true,
"CloudBlockBlob.UploadTextAsync",
"(finalize checkpoint)",
"WriteCheckpointMetadata",
"",
checkpointCompletedBlob.Name,
1000,
true,
(numAttempts) => checkpointCompletedBlob.UploadTextAsync(text));
async (numAttempts) =>
{
await checkpointCompletedBlob.UploadTextAsync(text);
return text.Length;
});
}
// Primary FKV

Просмотреть файл

@ -18,11 +18,12 @@ namespace DurableTask.Netherite.Faster
SemaphoreSlim semaphore,
bool requireLease,
string name,
string description,
string intent,
string data,
string target,
int expectedLatencyBound,
bool isCritical,
Func<int, Task> operation)
Func<int, Task<long>> operation)
{
try
{
@ -44,19 +45,21 @@ namespace DurableTask.Netherite.Faster
await this.ConfirmLeaseIsGoodForAWhileAsync().ConfigureAwait(false);
}
this.StorageTracer?.FasterStorageProgress($"starting {name} target={target} numAttempts={numAttempts} {description}");
this.StorageTracer?.FasterStorageProgress($"starting {name} ({intent}) target={target} numAttempts={numAttempts} {data}");
stopwatch.Restart();
await operation(numAttempts).ConfigureAwait(false);
long size = await operation(numAttempts).ConfigureAwait(false);
stopwatch.Stop();
this.StorageTracer?.FasterStorageProgress($"finished {name} target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {description} ");
this.StorageTracer?.FasterStorageProgress($"finished {name} ({intent}) target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {data} ");
if (stopwatch.ElapsedMilliseconds > expectedLatencyBound)
{
this.TraceHelper.FasterPerfWarning($"{name} took {stopwatch.Elapsed.TotalSeconds:F1}s, which is excessive; {description}");
this.TraceHelper.FasterPerfWarning($"{name} ({intent}) attempt {numAttempts} took {stopwatch.Elapsed.TotalSeconds:F1}s, which is excessive; {data}");
}
this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts);
return;
}
catch (StorageException e) when (BlobUtils.IsTransientStorageError(e, this.PartitionErrorHandler.Token) && numAttempts < BlobManager.MaxRetries)
@ -64,19 +67,19 @@ namespace DurableTask.Netherite.Faster
stopwatch.Stop();
if (BlobUtils.IsTimeout(e))
{
this.TraceHelper.FasterPerfWarning($"{name} timed out after {stopwatch.ElapsedMilliseconds:F1}ms, retrying now; target={target} {description}");
this.TraceHelper.FasterPerfWarning($"{name} ({intent}) attempt {numAttempts} timed out after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {data}");
}
else
{
TimeSpan nextRetryIn = BlobManager.GetDelayBetweenRetries(numAttempts);
this.HandleStorageError(name, $"storage operation {name} attempt {numAttempts} failed, retry in {nextRetryIn}s", target, e, false, true);
this.HandleStorageError(name, $"storage operation {name} ({intent}) attempt {numAttempts} failed transiently, retry in {nextRetryIn}s", target, e, false, true);
await Task.Delay(nextRetryIn);
}
continue;
}
catch (Exception exception) when (!Utils.IsFatal(exception))
{
this.HandleStorageError(name, $"storage operation {name} failed", target, exception, isCritical, this.PartitionErrorHandler.IsTerminated);
this.HandleStorageError(name, $"storage operation {name} ({intent}) failed", target, exception, isCritical, this.PartitionErrorHandler.IsTerminated);
throw;
}
}
@ -93,11 +96,12 @@ namespace DurableTask.Netherite.Faster
public void PerformWithRetries(
bool requireLease,
string name,
string description,
string intent,
string data,
string target,
int expectedLatencyBound,
bool isCritical,
Func<int,bool> operation)
Func<int,(long,bool)> operation)
{
Stopwatch stopwatch = new Stopwatch();
int numAttempts = 0;
@ -114,10 +118,10 @@ namespace DurableTask.Netherite.Faster
this.PartitionErrorHandler.Token.ThrowIfCancellationRequested();
this.StorageTracer?.FasterStorageProgress($"starting {name} target={target} numAttempts={numAttempts} {description}");
this.StorageTracer?.FasterStorageProgress($"starting {name} ({intent}) target={target} numAttempts={numAttempts} {data}");
stopwatch.Restart();
bool completed = operation(numAttempts);
(long size, bool completed) = operation(numAttempts);
if (!completed)
{
@ -125,11 +129,13 @@ namespace DurableTask.Netherite.Faster
}
stopwatch.Stop();
this.StorageTracer?.FasterStorageProgress($"finished {name} target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {description} ");
this.StorageTracer?.FasterStorageProgress($"finished {name} ({intent}) target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} size={size} {data} ");
this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts);
if (stopwatch.ElapsedMilliseconds > expectedLatencyBound)
{
this.TraceHelper.FasterPerfWarning($"{name} took {stopwatch.Elapsed.TotalSeconds:F1}s, which is excessive; {description}");
this.TraceHelper.FasterPerfWarning($"{name} ({intent}) attempt {numAttempts} took {stopwatch.Elapsed.TotalSeconds:F1}s, which is excessive; {data}");
}
return;
@ -140,19 +146,19 @@ namespace DurableTask.Netherite.Faster
stopwatch.Stop();
if (BlobUtils.IsTimeout(e))
{
this.TraceHelper.FasterPerfWarning($"{name} timed out after {stopwatch.ElapsedMilliseconds:F1}ms, retrying now; target={target} {description}");
this.TraceHelper.FasterPerfWarning($"{name} ({intent}) attempt {numAttempts} timed out after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {data}");
}
else
{
TimeSpan nextRetryIn = BlobManager.GetDelayBetweenRetries(numAttempts);
this.HandleStorageError(name, $"storage operation {name} attempt {numAttempts} failed, retry in {nextRetryIn}s", target, e, false, true);
this.HandleStorageError(name, $"storage operation {name} ({intent}) attempt {numAttempts} failed transiently, retry in {nextRetryIn}s", target, e, false, true);
Thread.Sleep(nextRetryIn);
}
continue;
}
catch (Exception exception) when (!Utils.IsFatal(exception))
{
this.HandleStorageError(name, $"storage operation {name} failed", target, exception, isCritical, this.PartitionErrorHandler.IsTerminated);
this.HandleStorageError(name, $"storage operation {name} ({intent}) failed", target, exception, isCritical, this.PartitionErrorHandler.IsTerminated);
throw;
}
}

Просмотреть файл

@ -132,6 +132,16 @@ namespace DurableTask.Netherite.Faster
}
}
public void FasterAzureStorageAccessCompleted(string intent, long size, string operation, string target, double latency, int attempt)
{
if (this.logLevelLimit <= LogLevel.Debug)
{
this.logger.LogDebug("Part{partition:D2} storage access completed intent={intent} size={size} operation={operation} target={target} latency={latency} attempt={attempt}",
this.partitionId, intent, size, operation, target, latency, attempt);
this.etwLogTrace?.FasterAzureStorageAccessCompleted(this.account, this.taskHub, this.partitionId, intent, size, operation, target, latency, attempt, TraceUtils.ExtensionVersion);
}
}
// ----- lease management events
public void LeaseAcquired()

Просмотреть файл

@ -327,11 +327,18 @@ namespace DurableTask.Netherite
this.WriteEvent(264, Account, TaskHub, PartitionId, Details, ExtensionVersion);
}
[Event(265, Level = EventLevel.Warning, Version = 1)]
[Event(265, Level = EventLevel.Verbose, Version = 1)]
public void FasterAzureStorageAccessCompleted(string Account, string TaskHub, int PartitionId, string Intent, long Size, string Operation, string Target, double Latency, int Attempt, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(265, Account, TaskHub, PartitionId, Intent, Size, Operation, Target, Latency, Attempt, ExtensionVersion);
}
[Event(266, Level = EventLevel.Warning, Version = 1)]
public void FasterPerfWarning(string Account, string TaskHub, int PartitionId, string Details, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(265, Account, TaskHub, PartitionId, Details, ExtensionVersion);
this.WriteEvent(266, Account, TaskHub, PartitionId, Details, ExtensionVersion);
}