unsetting read only on retries (#3671)
This commit is contained in:
Родитель
4ae7bec855
Коммит
3b31406553
|
@ -94,7 +94,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
|
|||
{
|
||||
loaded = new List<ImportResource>();
|
||||
conflicts = new List<ImportResource>();
|
||||
ImportResourcesInBufferInternal(resources, loaded, conflicts, importMode, cancellationToken).Wait();
|
||||
ImportResourcesInBufferInternal(resources, loaded, conflicts, importMode, retries == 0, cancellationToken).Wait();
|
||||
break;
|
||||
}
|
||||
catch (Exception e)
|
||||
|
@ -126,7 +126,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
|
|||
resources.Clear();
|
||||
}
|
||||
|
||||
private async Task ImportResourcesInBufferInternal(List<ImportResource> resources, List<ImportResource> loaded, List<ImportResource> conflicts, ImportMode importMode, CancellationToken cancellationToken)
|
||||
private async Task ImportResourcesInBufferInternal(List<ImportResource> resources, List<ImportResource> loaded, List<ImportResource> conflicts, ImportMode importMode, bool useReplicasForReads, CancellationToken cancellationToken)
|
||||
{
|
||||
var goodResources = resources.Where(r => string.IsNullOrEmpty(r.ImportError)).ToList();
|
||||
if (importMode == ImportMode.InitialLoad)
|
||||
|
@ -134,7 +134,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
|
|||
var inputDedupped = goodResources.GroupBy(_ => _.ResourceWrapper.ToResourceKey(true)).Select(_ => _.OrderBy(_ => _.ResourceWrapper.LastModified).First()).ToList();
|
||||
var current = new HashSet<ResourceKey>((await _store.GetAsync(inputDedupped.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).Select(_ => _.ToResourceKey(true)));
|
||||
loaded.AddRange(inputDedupped.Where(i => !current.TryGetValue(i.ResourceWrapper.ToResourceKey(true), out _)));
|
||||
await MergeResourcesAsync(loaded, cancellationToken);
|
||||
await MergeResourcesAsync(loaded, useReplicasForReads, cancellationToken);
|
||||
}
|
||||
else if (importMode == ImportMode.IncrementalLoad)
|
||||
{
|
||||
|
@ -144,12 +144,12 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
|
|||
.Select(_ => _.First())
|
||||
.ToList();
|
||||
|
||||
await HandleIncrementalVersionedImport(inputDedupped);
|
||||
await HandleIncrementalVersionedImport(inputDedupped, useReplicasForReads);
|
||||
|
||||
await HandleIncrementalUnversionedImport(inputDedupped);
|
||||
await HandleIncrementalUnversionedImport(inputDedupped, useReplicasForReads);
|
||||
}
|
||||
|
||||
async Task HandleIncrementalVersionedImport(List<ImportResource> inputDedupped)
|
||||
async Task HandleIncrementalVersionedImport(List<ImportResource> inputDedupped, bool useReplicasForReads)
|
||||
{
|
||||
// Dedup by version via ToResourceKey - only keep first occurance of a version in this batch.
|
||||
var inputDeduppedWithVersions = inputDedupped.Where(_ => _.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey()).Select(_ => _.First()).ToList();
|
||||
|
@ -159,10 +159,10 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
|
|||
|
||||
// Import resource versions that don't exist in the db. Sorting is used in merge to set isHistory - don't change it without updating that method!
|
||||
loaded.AddRange(inputDeduppedWithVersions.Where(i => !currentResourceKeysInDb.TryGetValue(i.ResourceWrapper.ToResourceKey(), out _)).OrderBy(_ => _.ResourceWrapper.ResourceId).ThenByDescending(_ => _.ResourceWrapper.LastModified));
|
||||
await MergeResourcesAsync(loaded, cancellationToken);
|
||||
await MergeResourcesAsync(loaded, useReplicasForReads, cancellationToken);
|
||||
}
|
||||
|
||||
async Task HandleIncrementalUnversionedImport(List<ImportResource> inputDedupped)
|
||||
async Task HandleIncrementalUnversionedImport(List<ImportResource> inputDedupped, bool useReplicasForReads)
|
||||
{
|
||||
// Dedup by resource id - only keep first occurance of an unversioned resource. This method is run in many parallel workers - we cannot guarantee processing order across parallel file streams. Taking the first resource avoids conflicts.
|
||||
var inputDeduppedNoVersion = inputDedupped.Where(_ => !_.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).Select(_ => _.First()).ToList();
|
||||
|
@ -200,18 +200,18 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
|
|||
|
||||
// Finally merge the resources to the db.
|
||||
var inputDeduppedNoVersionNoConflict = inputDeduppedNoVersion.Except(conflicts).ToList(); // some resources might get version assigned
|
||||
await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion).ToList(), cancellationToken);
|
||||
await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion).ToList(), cancellationToken);
|
||||
await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion).ToList(), useReplicasForReads, cancellationToken);
|
||||
await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion).ToList(), useReplicasForReads, cancellationToken);
|
||||
loaded.AddRange(inputDeduppedNoVersionNoConflict);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task MergeResourcesAsync(IList<ImportResource> resources, CancellationToken cancellationToken)
|
||||
private async Task MergeResourcesAsync(IList<ImportResource> resources, bool useReplicasForReads, CancellationToken cancellationToken)
|
||||
{
|
||||
var input = resources.Where(_ => _.KeepLastUpdated).Select(_ => new ResourceWrapperOperation(_.ResourceWrapper, true, true, null, requireETagOnUpdate: false, keepVersion: _.KeepVersion, bundleResourceContext: null)).ToList();
|
||||
await _store.MergeInternalAsync(input, true, true, false, cancellationToken);
|
||||
await _store.MergeInternalAsync(input, true, true, false, useReplicasForReads, cancellationToken);
|
||||
input = resources.Where(_ => !_.KeepLastUpdated).Select(_ => new ResourceWrapperOperation(_.ResourceWrapper, true, true, null, requireETagOnUpdate: false, keepVersion: _.KeepVersion, bundleResourceContext: null)).ToList();
|
||||
await _store.MergeInternalAsync(input, false, true, false, cancellationToken);
|
||||
await _store.MergeInternalAsync(input, false, true, false, useReplicasForReads, cancellationToken);
|
||||
}
|
||||
|
||||
private void AppendErrorsToBuffer(IEnumerable<ImportResource> dups, IEnumerable<ImportResource> conflicts, List<string> importErrorBuffer)
|
||||
|
|
|
@ -124,7 +124,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
|
|||
|
||||
try
|
||||
{
|
||||
var results = await MergeInternalAsync(resources, false, false, mergeOptions.EnlistInTransaction, cancellationToken); // TODO: Pass correct retries value once we start supporting retries
|
||||
var results = await MergeInternalAsync(resources, false, false, mergeOptions.EnlistInTransaction, retries == 0, cancellationToken); // TODO: Pass correct retries value once we start supporting retries
|
||||
return results;
|
||||
}
|
||||
catch (Exception e)
|
||||
|
@ -148,7 +148,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
|
|||
}
|
||||
|
||||
// Split in a separate method to allow special logic in $import.
|
||||
internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationOutcome>> MergeInternalAsync(IReadOnlyList<ResourceWrapperOperation> resources, bool keepLastUpdated, bool keepAllDeleted, bool enlistInTransaction, CancellationToken cancellationToken)
|
||||
internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationOutcome>> MergeInternalAsync(IReadOnlyList<ResourceWrapperOperation> resources, bool keepLastUpdated, bool keepAllDeleted, bool enlistInTransaction, bool useReplicasForReads, CancellationToken cancellationToken)
|
||||
{
|
||||
var results = new Dictionary<DataStoreOperationIdentifier, DataStoreOperationOutcome>();
|
||||
if (resources == null || resources.Count == 0)
|
||||
|
@ -158,7 +158,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
|
|||
|
||||
// Ignore input resource version to get latest version from the store.
|
||||
// Include invisible records (true parameter), so version is correctly determined in case only invisible is left in store.
|
||||
var existingResources = (await GetAsync(resources.Select(r => r.Wrapper.ToResourceKey(true)).Distinct().ToList(), true, cancellationToken)).ToDictionary(r => r.ToResourceKey(true), r => r);
|
||||
var existingResources = (await GetAsync(resources.Select(r => r.Wrapper.ToResourceKey(true)).Distinct().ToList(), true, useReplicasForReads, cancellationToken)).ToDictionary(r => r.ToResourceKey(true), r => r);
|
||||
|
||||
// Assume that most likely case is that all resources should be updated.
|
||||
(var transactionId, var minSequenceId) = await StoreClient.MergeResourcesBeginTransactionAsync(resources.Count, cancellationToken);
|
||||
|
@ -410,12 +410,12 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
|
|||
|
||||
public async Task<IReadOnlyList<ResourceWrapper>> GetAsync(IReadOnlyList<ResourceKey> keys, CancellationToken cancellationToken)
|
||||
{
|
||||
return await GetAsync(keys, false, cancellationToken); // do not return invisible records in public interface
|
||||
return await GetAsync(keys, false, true, cancellationToken); // do not return invisible records in public interface
|
||||
}
|
||||
|
||||
private async Task<IReadOnlyList<ResourceWrapper>> GetAsync(IReadOnlyList<ResourceKey> keys, bool includeInvisible, CancellationToken cancellationToken)
|
||||
private async Task<IReadOnlyList<ResourceWrapper>> GetAsync(IReadOnlyList<ResourceKey> keys, bool includeInvisible, bool isReadOnly, CancellationToken cancellationToken)
|
||||
{
|
||||
return await _sqlStoreClient.GetAsync(keys, _model.GetResourceTypeId, _compressedRawResourceConverter.ReadCompressedRawResource, _model.GetResourceTypeName, cancellationToken, includeInvisible);
|
||||
return await _sqlStoreClient.GetAsync(keys, _model.GetResourceTypeId, _compressedRawResourceConverter.ReadCompressedRawResource, _model.GetResourceTypeName, isReadOnly, cancellationToken, includeInvisible);
|
||||
}
|
||||
|
||||
public async Task<ResourceWrapper> GetAsync(ResourceKey key, CancellationToken cancellationToken)
|
||||
|
|
|
@ -53,12 +53,12 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
|
|||
await _sqlRetryService.TryLogEvent(process, status, text, startDate, cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<ResourceWrapper>> GetAsync(IReadOnlyList<ResourceKey> keys, Func<string, short> getResourceTypeId, Func<MemoryStream, string> decompress, Func<short, string> getResourceTypeName, CancellationToken cancellationToken, bool includeInvisible = false)
|
||||
public async Task<IReadOnlyList<ResourceWrapper>> GetAsync(IReadOnlyList<ResourceKey> keys, Func<string, short> getResourceTypeId, Func<MemoryStream, string> decompress, Func<short, string> getResourceTypeName, bool isReadOnly, CancellationToken cancellationToken, bool includeInvisible = false)
|
||||
{
|
||||
return await GetAsync(keys.Select(_ => new ResourceDateKey(getResourceTypeId(_.ResourceType), _.Id, 0, _.VersionId)).ToList(), decompress, getResourceTypeName, cancellationToken, includeInvisible);
|
||||
return await GetAsync(keys.Select(_ => new ResourceDateKey(getResourceTypeId(_.ResourceType), _.Id, 0, _.VersionId)).ToList(), decompress, getResourceTypeName, isReadOnly, cancellationToken, includeInvisible);
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<ResourceWrapper>> GetAsync(IReadOnlyList<ResourceDateKey> keys, Func<MemoryStream, string> decompress, Func<short, string> getResourceTypeName, CancellationToken cancellationToken, bool includeInvisible = false)
|
||||
public async Task<IReadOnlyList<ResourceWrapper>> GetAsync(IReadOnlyList<ResourceDateKey> keys, Func<MemoryStream, string> decompress, Func<short, string> getResourceTypeName, bool isReadOnly, CancellationToken cancellationToken, bool includeInvisible = false)
|
||||
{
|
||||
if (keys == null || keys.Count == 0)
|
||||
{
|
||||
|
@ -74,7 +74,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
|
|||
{
|
||||
try
|
||||
{
|
||||
return (await cmd.ExecuteReaderAsync(_sqlRetryService, (reader) => { return ReadResourceWrapper(reader, false, decompress, getResourceTypeName); }, _logger, cancellationToken, isReadOnly: true)).Where(_ => includeInvisible || _.RawResource.Data != _invisibleResource).ToList();
|
||||
return (await cmd.ExecuteReaderAsync(_sqlRetryService, (reader) => { return ReadResourceWrapper(reader, false, decompress, getResourceTypeName); }, _logger, cancellationToken, isReadOnly: isReadOnly)).Where(_ => includeInvisible || _.RawResource.Data != _invisibleResource).ToList();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
|
|
|
@ -278,7 +278,7 @@ namespace Microsoft.Health.Internal.Fhir.PerfTester
|
|||
{
|
||||
var typeId = resourceId.Item2.First().ResourceTypeId;
|
||||
var id = resourceId.Item2.First().ResourceId;
|
||||
var first = _store.GetAsync(new[] { new ResourceDateKey(typeId, id, 0, null) }, (s) => "xyz", (i) => typeId.ToString(), CancellationToken.None).Result.FirstOrDefault();
|
||||
var first = _store.GetAsync(new[] { new ResourceDateKey(typeId, id, 0, null) }, (s) => "xyz", (i) => typeId.ToString(), true, CancellationToken.None).Result.FirstOrDefault();
|
||||
if (first == null)
|
||||
{
|
||||
Interlocked.Increment(ref errors);
|
||||
|
|
Загрузка…
Ссылка в новой задаче