Add new logs to recovery path (#337)
* add simple log * add detail column to FasterAzureStorageAccessCompleted * refactor read range log * pass just the position parameter instead of printing full details for FasterStorageAccessCompleted * put position and id information into details --------- Co-authored-by: sebastianburckhardt <sburckha@microsoft.com>
This commit is contained in:
Родитель
66762a381d
Коммит
0abc16d3e8
|
@ -295,7 +295,7 @@ namespace DurableTask.Netherite.Faster
|
|||
this.underLease,
|
||||
"BlobBaseClient.DeleteAsync",
|
||||
"DeleteDeviceSegment",
|
||||
"",
|
||||
$"id={id}",
|
||||
entry.PageBlob.Default.Name,
|
||||
5000,
|
||||
true,
|
||||
|
@ -448,13 +448,14 @@ namespace DurableTask.Netherite.Faster
|
|||
{
|
||||
using (stream)
|
||||
{
|
||||
var position = destinationAddress + offset;
|
||||
long originalStreamPosition = stream.Position;
|
||||
await this.BlobManager.PerformWithRetriesAsync(
|
||||
BlobManager.AsynchronousStorageWriteMaxConcurrency,
|
||||
true,
|
||||
"PageBlobClient.UploadPagesAsync",
|
||||
"WriteToDevice",
|
||||
$"id={id} length={length} destinationAddress={destinationAddress + offset}",
|
||||
$"id={id} position={position} length={length}",
|
||||
blobEntry.PageBlob.Default.Name,
|
||||
1000 + (int)length / 1000,
|
||||
true,
|
||||
|
@ -498,11 +499,15 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobUtilsV12.PageBlobClients blob, long sourceAddress, uint readLength, long id)
|
||||
{
|
||||
long readRangeStart = sourceAddress;
|
||||
long readRangeEnd = readRangeStart + readLength;
|
||||
string operationReadRange = $"[{readRangeStart}, {readRangeEnd}]";
|
||||
using (stream)
|
||||
{
|
||||
long offset = 0;
|
||||
while (readLength > 0)
|
||||
{
|
||||
var position = sourceAddress + offset;
|
||||
var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE);
|
||||
|
||||
await this.BlobManager.PerformWithRetriesAsync(
|
||||
|
@ -510,7 +515,7 @@ namespace DurableTask.Netherite.Faster
|
|||
true,
|
||||
"PageBlobClient.DownloadStreamingAsync",
|
||||
"ReadFromDevice",
|
||||
$"id={id} readLength={length} sourceAddress={sourceAddress + offset}",
|
||||
$"id={id} position={position} length={length} operationReadRange={operationReadRange}",
|
||||
blob.Default.Name,
|
||||
1000 + (int)length / 1000,
|
||||
true,
|
||||
|
|
|
@ -1116,23 +1116,23 @@ namespace DurableTask.Netherite.Faster
|
|||
var metaFileBlob = partDir.GetBlockBlobClient(this.GetIndexCheckpointMetaBlobName(indexToken));
|
||||
|
||||
this.PerformWithRetries(
|
||||
false,
|
||||
"BlockBlobClient.OpenWrite",
|
||||
"WriteIndexCheckpointMetadata",
|
||||
$"token={indexToken} size={commitMetadata.Length}",
|
||||
metaFileBlob.Name,
|
||||
1000,
|
||||
true,
|
||||
(numAttempts) =>
|
||||
{
|
||||
var client = metaFileBlob.WithRetries;
|
||||
using var blobStream = client.OpenWrite(overwrite: true);
|
||||
using var writer = new BinaryWriter(blobStream);
|
||||
writer.Write(commitMetadata.Length);
|
||||
writer.Write(commitMetadata);
|
||||
writer.Flush();
|
||||
return (commitMetadata.Length, true);
|
||||
});
|
||||
false,
|
||||
"BlockBlobClient.OpenWrite",
|
||||
"WriteIndexCheckpointMetadata",
|
||||
$"token={indexToken} size={commitMetadata.Length}",
|
||||
metaFileBlob.Name,
|
||||
1000,
|
||||
true,
|
||||
(numAttempts) =>
|
||||
{
|
||||
var client = metaFileBlob.WithRetries;
|
||||
using var blobStream = client.OpenWrite(overwrite: true);
|
||||
using var writer = new BinaryWriter(blobStream);
|
||||
writer.Write(commitMetadata.Length);
|
||||
writer.Write(commitMetadata);
|
||||
writer.Flush();
|
||||
return (commitMetadata.Length, true);
|
||||
});
|
||||
|
||||
this.CheckpointInfo.IndexToken = indexToken;
|
||||
this.StorageTracer?.FasterStorageProgress($"StorageOpReturned ICheckpointManager.CommitIndexCheckpoint, target={metaFileBlob.Name}");
|
||||
|
|
|
@ -21,7 +21,7 @@ namespace DurableTask.Netherite.Faster
|
|||
bool requireLease,
|
||||
string name,
|
||||
string intent,
|
||||
string data,
|
||||
string details,
|
||||
string target,
|
||||
int expectedLatencyBound,
|
||||
bool isCritical,
|
||||
|
@ -59,7 +59,7 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
this.PartitionErrorHandler.Token.ThrowIfCancellationRequested();
|
||||
|
||||
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) started attempt {numAttempts}; target={target} {data}");
|
||||
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) started attempt {numAttempts}; target={target} {details}");
|
||||
|
||||
stopwatch.Restart();
|
||||
|
||||
|
@ -68,14 +68,14 @@ namespace DurableTask.Netherite.Faster
|
|||
long size = await operationAsync(numAttempts).ConfigureAwait(false);
|
||||
|
||||
stopwatch.Stop();
|
||||
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) succeeded on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {data}");
|
||||
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) succeeded on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {details}");
|
||||
|
||||
if (stopwatch.ElapsedMilliseconds > expectedLatencyBound)
|
||||
{
|
||||
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) took {stopwatch.Elapsed.TotalSeconds:F1}s on attempt {numAttempts}, which is excessive; {data}");
|
||||
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) took {stopwatch.Elapsed.TotalSeconds:F1}s on attempt {numAttempts}, which is excessive; {details}");
|
||||
}
|
||||
|
||||
this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts);
|
||||
this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, details, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts);
|
||||
|
||||
return;
|
||||
}
|
||||
|
@ -91,7 +91,7 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
if (BlobUtils.IsTimeout(e))
|
||||
{
|
||||
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {data}");
|
||||
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {details}");
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -103,7 +103,7 @@ namespace DurableTask.Netherite.Faster
|
|||
}
|
||||
catch (Azure.RequestFailedException ex) when (BlobUtilsV12.PreconditionFailed(ex) && readETagAsync != null)
|
||||
{
|
||||
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) failed precondition on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {data}");
|
||||
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) failed precondition on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {details}");
|
||||
mustReadETagFirst = true;
|
||||
continue;
|
||||
}
|
||||
|
@ -134,7 +134,7 @@ namespace DurableTask.Netherite.Faster
|
|||
bool requireLease,
|
||||
string name,
|
||||
string intent,
|
||||
string data,
|
||||
string details,
|
||||
string target,
|
||||
int expectedLatencyBound,
|
||||
bool isCritical,
|
||||
|
@ -156,7 +156,7 @@ namespace DurableTask.Netherite.Faster
|
|||
|
||||
this.PartitionErrorHandler.Token.ThrowIfCancellationRequested();
|
||||
|
||||
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) started attempt {numAttempts}; target={target} {data}");
|
||||
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) started attempt {numAttempts}; target={target} {details}");
|
||||
stopwatch.Restart();
|
||||
|
||||
this.FaultInjector?.StorageAccess(this, name, intent, target);
|
||||
|
@ -169,13 +169,13 @@ namespace DurableTask.Netherite.Faster
|
|||
}
|
||||
|
||||
stopwatch.Stop();
|
||||
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) succeeded on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} size={size} {data} ");
|
||||
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) succeeded on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} size={size} {details} ");
|
||||
|
||||
this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts);
|
||||
this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, details, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts);
|
||||
|
||||
if (stopwatch.ElapsedMilliseconds > expectedLatencyBound)
|
||||
{
|
||||
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) took {stopwatch.Elapsed.TotalSeconds:F1}s on attempt {numAttempts}, which is excessive; {data}");
|
||||
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) took {stopwatch.Elapsed.TotalSeconds:F1}s on attempt {numAttempts}, which is excessive; {details}");
|
||||
}
|
||||
|
||||
return;
|
||||
|
@ -191,7 +191,7 @@ namespace DurableTask.Netherite.Faster
|
|||
stopwatch.Stop();
|
||||
if (BlobUtils.IsTimeout(e))
|
||||
{
|
||||
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {data}");
|
||||
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {details}");
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -256,8 +256,9 @@ namespace DurableTask.Netherite.Faster
|
|||
}
|
||||
|
||||
// recover Faster
|
||||
this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV");
|
||||
this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV - Entering fht.RecoverAsync");
|
||||
await this.fht.RecoverAsync(this.partition.Settings.FasterTuningParameters?.NumPagesToPreload ?? 1, true, -1, this.terminationToken);
|
||||
this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV - Returned from fht.RecoverAsync");
|
||||
this.mainSession = this.CreateASession($"main-{this.blobManager.IncarnationTimestamp:o}", false);
|
||||
for (int i = 0; i < this.querySessions.Length; i++)
|
||||
{
|
||||
|
|
|
@ -146,13 +146,13 @@ namespace DurableTask.Netherite.Faster
|
|||
}
|
||||
}
|
||||
|
||||
public void FasterAzureStorageAccessCompleted(string intent, long size, string operation, string target, double latency, int attempt)
|
||||
public void FasterAzureStorageAccessCompleted(string intent, long size, string operation, string details, string target, double latency, int attempt)
|
||||
{
|
||||
if (this.logLevelLimit <= LogLevel.Debug)
|
||||
{
|
||||
this.logger.LogDebug("Part{partition:D2} storage access completed intent={intent} size={size} operation={operation} target={target} latency={latency} attempt={attempt}",
|
||||
this.partitionId, intent, size, operation, target, latency, attempt);
|
||||
EtwSource.Log.FasterAzureStorageAccessCompleted(this.account, this.taskHub, this.partitionId, intent, size, operation, target, latency, attempt, TraceUtils.AppName, TraceUtils.ExtensionVersion);
|
||||
this.logger.LogDebug("Part{partition:D2} storage access completed intent={intent} size={size} operation={operation} {details} target={target} latency={latency} attempt={attempt}",
|
||||
this.partitionId, intent, size, operation, details, target, latency, attempt);
|
||||
EtwSource.Log.FasterAzureStorageAccessCompleted(this.account, this.taskHub, this.partitionId, intent, size, operation, details, target, latency, attempt, TraceUtils.AppName, TraceUtils.ExtensionVersion);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -378,11 +378,11 @@ namespace DurableTask.Netherite
|
|||
this.WriteEvent(265, Account, TaskHub, PartitionId, Details, AppName, ExtensionVersion);
|
||||
}
|
||||
|
||||
[Event(266, Level = EventLevel.Verbose, Version = 1)]
|
||||
public void FasterAzureStorageAccessCompleted(string Account, string TaskHub, int PartitionId, string Intent, long Size, string Operation, string Target, double Latency, int Attempt, string AppName, string ExtensionVersion)
|
||||
[Event(266, Level = EventLevel.Verbose, Version = 3)]
|
||||
public void FasterAzureStorageAccessCompleted(string Account, string TaskHub, int PartitionId, string Intent,long Size, string Operation, string Details, string Target, double Latency, int Attempt, string AppName, string ExtensionVersion)
|
||||
{
|
||||
SetCurrentThreadActivityId(serviceInstanceId);
|
||||
this.WriteEvent(266, Account, TaskHub, PartitionId, Intent, Size, Operation, Target, Latency, Attempt, AppName, ExtensionVersion);
|
||||
this.WriteEvent(266, Account, TaskHub, PartitionId, Intent, Size, Operation, Details, Target, Latency, Attempt, AppName, ExtensionVersion);
|
||||
}
|
||||
|
||||
[Event(267, Level = EventLevel.Warning, Version = 1)]
|
||||
|
|
Загрузка…
Ссылка в новой задаче