* start

* Fix bytes

* dump resource ids before

* do not go to the database

* SuccessOnRetry

* fixed tests

* Moved wait logic request handler

* Removed cancel request from job exec exception

* plus

* test correction

* Adding ability to raise SQL exceptions in E2E tests

* Temp

* Users/sergal/importtests (#3797)

* Adding ability to raise SQL exceptions in E2E tests

* error handling

* removed console

* job hosting tests

* limit to in proc

* no retriable in export

* correct wait

* retriable back in export

* rest

* minus using

* remove pragma

* Remove retriable from export

* tests

* retriable obsolete

* polly retries and simpler tests

* Addressed comments

* After merge fixes plus get import request handler tests

* removed not applicable orch tests

* second place to hide message

* time wait limit

* new retriable error

* Fixes after merge

* using

* using

* test adjusted for grouped

* []

* Removed task cancelled exception logic
This commit is contained in:
SergeyGaluzo 2024-04-09 16:41:45 -07:00 коммит произвёл GitHub
Родитель fc296dc9c1
Коммит ff6b6e381c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
14 изменённых файлов: 259 добавлений и 1420 удалений

Просмотреть файл

@ -43,151 +43,127 @@ namespace Microsoft.Health.Fhir.Core.UnitTests.Features.Operations.BulkImport
}
[Fact]
public async Task GivenAFhirMediator_WhenGettingAnExistingBulkImportJobWithCompletedStatus_ThenHttpResponseCodeShouldBeOk()
public async Task WhenGettingCompletedJob_ThenResponseCodeShouldBeOk()
{
var coordResult = new ImportOrchestratorJobResult()
{
Request = "Request",
};
var coordResult = new ImportOrchestratorJobResult() { Request = "Request" };
var coord = new JobInfo() { Status = JobStatus.Completed, Result = JsonConvert.SerializeObject(coordResult), Definition = JsonConvert.SerializeObject(new ImportOrchestratorJobDefinition()) };
var workerResult = new ImportProcessingJobResult() { SucceededResources = 1, FailedResources = 1, ErrorLogLocation = "http://xyz" };
var worker = new JobInfo() { Id = 1, Status = JobStatus.Completed, Result = JsonConvert.SerializeObject(workerResult), Definition = JsonConvert.SerializeObject(new ImportProcessingJobDefinition() { ResourceLocation = "http://xyz" }) };
var orchestratorJob = new JobInfo()
{
Id = 0,
GroupId = 0,
Status = JobStatus.Completed,
Result = JsonConvert.SerializeObject(coordResult),
Definition = JsonConvert.SerializeObject(new ImportOrchestratorJobDefinition()),
};
var processingJobResult = new ImportProcessingJobResult()
{
SucceededResources = 1,
FailedResources = 1,
ErrorLogLocation = "http://ResourceErrorLogLocation",
};
var processingJob = new JobInfo()
{
Id = 1,
GroupId = 0,
Status = JobStatus.Completed,
Result = JsonConvert.SerializeObject(processingJobResult),
Definition = JsonConvert.SerializeObject(new ImportProcessingJobDefinition() { ResourceLocation = "http://ResourceLocation" }),
};
GetImportResponse result = await SetupAndExecuteGetBulkImportJobByIdAsync(orchestratorJob, new List<JobInfo>() { processingJob });
var result = await SetupAndExecuteGetBulkImportJobByIdAsync(coord, [worker]);
Assert.Equal(HttpStatusCode.OK, result.StatusCode);
Assert.Single(result.JobResult.Output);
Assert.Single(result.JobResult.Error);
}
[Fact]
public async Task GivenAFhirMediator_WhenGettingAnCompletedImportJobWithFailure_ThenHttpResponseCodeShouldBeExpected()
[Theory]
[InlineData(HttpStatusCode.BadRequest)]
[InlineData(HttpStatusCode.InternalServerError)]
[InlineData((HttpStatusCode)0)]
public async Task WhenGettingFailedJob_ThenExecptionIsTrownWithCorrectResponseCode(HttpStatusCode statusCode)
{
var orchestratorJobResult = new ImportJobErrorResult()
{
HttpStatusCode = HttpStatusCode.BadRequest,
ErrorMessage = "error",
};
var coord = new JobInfo() { Status = JobStatus.Completed };
var workerResult = new ImportJobErrorResult() { ErrorMessage = "Error", HttpStatusCode = statusCode };
var worker = new JobInfo() { Id = 1, Status = JobStatus.Failed, Result = JsonConvert.SerializeObject(workerResult) };
var orchestratorJob = new JobInfo()
{
Status = JobStatus.Failed,
Result = JsonConvert.SerializeObject(orchestratorJobResult),
};
var ofe = await Assert.ThrowsAsync<OperationFailedException>(() => SetupAndExecuteGetBulkImportJobByIdAsync(coord, [worker]));
OperationFailedException ofe = await Assert.ThrowsAsync<OperationFailedException>(() => SetupAndExecuteGetBulkImportJobByIdAsync(orchestratorJob, new List<JobInfo>()));
Assert.Equal(HttpStatusCode.BadRequest, ofe.ResponseStatusCode);
Assert.NotNull(ofe.Message);
Assert.Equal(statusCode == 0 ? HttpStatusCode.InternalServerError : statusCode, ofe.ResponseStatusCode);
Assert.Equal(string.Format(Core.Resources.OperationFailed, OperationsConstants.Import, ofe.ResponseStatusCode == HttpStatusCode.InternalServerError ? HttpStatusCode.InternalServerError : "Error"), ofe.Message);
}
[Fact]
public async Task GivenAFhirMediator_WhenGettingAnExistingBulkImportJobThatWasCanceled_ThenOperationFailedExceptionIsThrownWithBadRequestHttpResponseCode()
public async Task WhenGettingFailedJob_WithGenericException_ThenExecptionIsTrownWithCorrectResponseCode()
{
var orchestratorJob = new JobInfo()
{
Status = JobStatus.Cancelled,
};
OperationFailedException ofe = await Assert.ThrowsAsync<OperationFailedException>(() => SetupAndExecuteGetBulkImportJobByIdAsync(orchestratorJob, new List<JobInfo>()));
var coord = new JobInfo() { Status = JobStatus.Completed };
object workerResult = new { message = "Error", stackTrace = "Trace" };
var worker = new JobInfo() { Id = 1, Status = JobStatus.Failed, Result = JsonConvert.SerializeObject(workerResult) };
var ofe = await Assert.ThrowsAsync<OperationFailedException>(() => SetupAndExecuteGetBulkImportJobByIdAsync(coord, [worker]));
Assert.Equal(HttpStatusCode.InternalServerError, ofe.ResponseStatusCode);
Assert.Equal(string.Format(Core.Resources.OperationFailed, OperationsConstants.Import, HttpStatusCode.InternalServerError), ofe.Message);
}
[Fact]
public async Task WhenGettingImpprtWithCancelledOrchestratorJob_ThenExceptionIsThrownWithBadResponseCode()
{
var coord = new JobInfo() { Status = JobStatus.Cancelled };
var ofe = await Assert.ThrowsAsync<OperationFailedException>(() => SetupAndExecuteGetBulkImportJobByIdAsync(coord, []));
Assert.Equal(HttpStatusCode.BadRequest, ofe.ResponseStatusCode);
}
[Fact]
public async Task GivenAFhirMediator_WhenGettingAnExistingBulkImportJobWithNotCompletedStatus_ThenHttpResponseCodeShouldBeAccepted()
public async Task WhenGettingImportWithCancelledWorkerJob_ThenExceptionIsThrownWithBadResponseCode()
{
var orchestratorJobResult = new ImportOrchestratorJobResult()
{
Request = "Request",
};
var coord = new JobInfo() { Status = JobStatus.Completed };
var worker = new JobInfo() { Id = 1, Status = JobStatus.Cancelled };
var ofe = await Assert.ThrowsAsync<OperationFailedException>(() => SetupAndExecuteGetBulkImportJobByIdAsync(coord, [worker]));
Assert.Equal(HttpStatusCode.BadRequest, ofe.ResponseStatusCode);
}
var orchestratorJob = new JobInfo()
[Fact]
public async Task WhenGettingInFlightJob_ThenResponseCodeShouldBeAccepted()
{
var coordResult = new ImportOrchestratorJobResult() { Request = "Request" };
var coord = new JobInfo() { Status = JobStatus.Completed, Result = JsonConvert.SerializeObject(coordResult), Definition = JsonConvert.SerializeObject(new ImportOrchestratorJobDefinition()) };
var workerResult = new ImportProcessingJobResult() { SucceededResources = 1, FailedResources = 1, ErrorLogLocation = "http://xyz" };
// jobs 1 and 2 are created for the same input file, they are grouped together in the results
var worker1 = new JobInfo()
{
Id = 1,
GroupId = 1,
Status = JobStatus.Running,
Result = JsonConvert.SerializeObject(orchestratorJobResult),
Definition = JsonConvert.SerializeObject(new ImportOrchestratorJobDefinition()),
Status = JobStatus.Completed,
Result = JsonConvert.SerializeObject(workerResult),
Definition = JsonConvert.SerializeObject(new ImportProcessingJobDefinition() { ResourceLocation = "http://xyz" }),
};
var processingJobResult = new ImportProcessingJobResult()
{
SucceededResources = 1,
FailedResources = 1,
ErrorLogLocation = "http://ResourceErrorLogLocation",
};
var processingJob1 = new JobInfo()
var worker2 = new JobInfo()
{
Id = 2,
GroupId = 1,
Status = JobStatus.Completed,
Result = JsonConvert.SerializeObject(processingJobResult),
Definition = JsonConvert.SerializeObject(new ImportProcessingJobDefinition() { ResourceLocation = "http://ResourceLocation" }),
Result = JsonConvert.SerializeObject(workerResult),
Definition = JsonConvert.SerializeObject(new ImportProcessingJobDefinition() { ResourceLocation = "http://xyz" }),
};
var processingJob2 = new JobInfo()
var worker3 = new JobInfo()
{
Id = 3,
GroupId = 1,
Status = JobStatus.Completed,
Result = JsonConvert.SerializeObject(processingJobResult),
Definition = JsonConvert.SerializeObject(new ImportProcessingJobDefinition() { ResourceLocation = "http://ResourceLocation" }),
Result = JsonConvert.SerializeObject(workerResult),
Definition = JsonConvert.SerializeObject(new ImportProcessingJobDefinition() { ResourceLocation = "http://xyz2" }),
};
var processingJob3 = new JobInfo()
var worker4 = new JobInfo()
{
Id = 4,
GroupId = 1,
Status = JobStatus.Running,
Result = JsonConvert.SerializeObject(processingJobResult),
Definition = JsonConvert.SerializeObject(new ImportProcessingJobDefinition() { ResourceLocation = "http://ResourceLocation" }),
};
GetImportResponse result = await SetupAndExecuteGetBulkImportJobByIdAsync(orchestratorJob, new List<JobInfo>() { processingJob1, processingJob2, processingJob3 });
var result = await SetupAndExecuteGetBulkImportJobByIdAsync(coord, [worker1, worker2, worker3, worker4]);
Assert.Equal(HttpStatusCode.Accepted, result.StatusCode);
Assert.Equal(2, result.JobResult.Output.Count);
Assert.Equal(2, result.JobResult.Error.Count);
Assert.Equal(3, result.JobResult.Error.Count);
}
[Fact]
public async Task GivenAFhirMediator_WhenGettingWithNotExistJob_ThenNotFoundShouldBeReturned()
public async Task WhenGettingANotExistingJob_ThenNotFoundShouldBeReturned()
{
await Assert.ThrowsAsync<ResourceNotFoundException>(async () => await _mediator.GetImportStatusAsync(1, CancellationToken.None));
}
private async Task<GetImportResponse> SetupAndExecuteGetBulkImportJobByIdAsync(JobInfo orchestratorJobInfo, List<JobInfo> processingJobInfos)
private async Task<GetImportResponse> SetupAndExecuteGetBulkImportJobByIdAsync(JobInfo coord, List<JobInfo> workers)
{
_queueClient.GetJobByIdAsync(Arg.Any<byte>(), Arg.Any<long>(), Arg.Any<bool>(), Arg.Any<CancellationToken>()).Returns(orchestratorJobInfo);
_queueClient.GetJobByIdAsync(Arg.Any<byte>(), Arg.Any<long>(), Arg.Any<bool>(), Arg.Any<CancellationToken>()).Returns(coord);
var allJobs = new List<JobInfo>(processingJobInfos);
allJobs.Add(orchestratorJobInfo);
var allJobs = new List<JobInfo>(workers);
allJobs.Add(coord);
_queueClient.GetJobByGroupIdAsync(Arg.Any<byte>(), Arg.Any<long>(), Arg.Any<bool>(), Arg.Any<CancellationToken>()).Returns(allJobs);
return await _mediator.GetImportStatusAsync(orchestratorJobInfo.Id, CancellationToken.None);
return await _mediator.GetImportStatusAsync(coord.Id, CancellationToken.None);
}
}
}

Просмотреть файл

@ -45,93 +45,90 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations.Import
throw new UnauthorizedFhirActionException();
}
JobInfo coordInfo = await _queueClient.GetJobByIdAsync(QueueType.Import, request.JobId, false, cancellationToken);
if (coordInfo == null || coordInfo.Status == JobStatus.Archived)
var coord = await _queueClient.GetJobByIdAsync(QueueType.Import, request.JobId, false, cancellationToken);
if (coord == null || coord.Status == JobStatus.Archived)
{
throw new ResourceNotFoundException(string.Format(Core.Resources.ImportJobNotFound, request.JobId));
}
if (coordInfo.Status == JobStatus.Created)
else if (coord.Status == JobStatus.Created || coord.Status == JobStatus.Running)
{
return new GetImportResponse(HttpStatusCode.Accepted);
}
else if (coordInfo.Status == JobStatus.Running)
else if (coord.Status == JobStatus.Cancelled)
{
if (string.IsNullOrEmpty(coordInfo.Result))
throw new OperationFailedException(Core.Resources.UserRequestedCancellation, HttpStatusCode.BadRequest);
}
else if (coord.Status == JobStatus.Failed)
{
var errorResult = JsonConvert.DeserializeObject<ImportJobErrorResult>(coord.Result);
if (errorResult.HttpStatusCode == 0)
{
return new GetImportResponse(HttpStatusCode.Accepted);
errorResult.HttpStatusCode = HttpStatusCode.InternalServerError;
}
ImportOrchestratorJobResult orchestratorJobResult = JsonConvert.DeserializeObject<ImportOrchestratorJobResult>(coordInfo.Result);
(List<ImportOperationOutcome> completedOperationOutcome, List<ImportFailedOperationOutcome> failedOperationOutcome)
= await GetProcessingResultAsync(coordInfo.GroupId, cancellationToken);
var result = new ImportJobResult()
{
Request = orchestratorJobResult.Request,
TransactionTime = coordInfo.CreateDate,
Output = completedOperationOutcome,
Error = failedOperationOutcome,
};
return new GetImportResponse(HttpStatusCode.Accepted, result);
}
else if (coordInfo.Status == JobStatus.Completed)
{
ImportOrchestratorJobResult orchestratorJobResult = JsonConvert.DeserializeObject<ImportOrchestratorJobResult>(coordInfo.Result);
(List<ImportOperationOutcome> completedOperationOutcome, List<ImportFailedOperationOutcome> failedOperationOutcome)
= await GetProcessingResultAsync(coordInfo.GroupId, cancellationToken);
var result = new ImportJobResult()
{
Request = orchestratorJobResult.Request,
TransactionTime = coordInfo.CreateDate,
Output = completedOperationOutcome,
Error = failedOperationOutcome,
};
return new GetImportResponse(HttpStatusCode.OK, result);
}
else if (coordInfo.Status == JobStatus.Failed)
{
var errorResult = JsonConvert.DeserializeObject<ImportJobErrorResult>(coordInfo.Result);
//// do not show error message for InternalServerError
// hide error message for InternalServerError
var failureReason = errorResult.HttpStatusCode == HttpStatusCode.InternalServerError ? HttpStatusCode.InternalServerError.ToString() : errorResult.ErrorMessage;
throw new OperationFailedException(string.Format(Core.Resources.OperationFailed, OperationsConstants.Import, failureReason), errorResult.HttpStatusCode);
}
else if (coordInfo.Status == JobStatus.Cancelled)
else if (coord.Status == JobStatus.Completed)
{
throw new OperationFailedException(Core.Resources.UserRequestedCancellation, HttpStatusCode.BadRequest);
var start = Stopwatch.StartNew();
var jobs = (await _queueClient.GetJobByGroupIdAsync(QueueType.Import, coord.GroupId, true, cancellationToken)).Where(x => x.Id != coord.Id).ToList();
var results = GetProcessingResultAsync(jobs);
await Task.Delay(TimeSpan.FromSeconds(start.Elapsed.TotalSeconds > 6 ? 60 : start.Elapsed.TotalSeconds * 10), cancellationToken); // throttle to avoid misuse.
var inFlightJobsExist = jobs.Any(x => x.Status == JobStatus.Running || x.Status == JobStatus.Created);
var cancelledJobsExist = jobs.Any(x => x.Status == JobStatus.Cancelled || (x.Status == JobStatus.Running && x.CancelRequested));
var failedJobsExist = jobs.Any(x => x.Status == JobStatus.Failed);
if (cancelledJobsExist && !failedJobsExist)
{
throw new OperationFailedException(Core.Resources.UserRequestedCancellation, HttpStatusCode.BadRequest);
}
else if (failedJobsExist)
{
var failed = jobs.First(x => x.Status == JobStatus.Failed);
var errorResult = JsonConvert.DeserializeObject<ImportJobErrorResult>(failed.Result);
if (errorResult.HttpStatusCode == 0)
{
errorResult.HttpStatusCode = HttpStatusCode.InternalServerError;
}
// hide error message for InternalServerError
var failureReason = errorResult.HttpStatusCode == HttpStatusCode.InternalServerError ? HttpStatusCode.InternalServerError.ToString() : errorResult.ErrorMessage;
throw new OperationFailedException(string.Format(Core.Resources.OperationFailed, OperationsConstants.Import, failureReason), errorResult.HttpStatusCode);
}
else // no failures here
{
var coordResult = JsonConvert.DeserializeObject<ImportOrchestratorJobResult>(coord.Result);
var result = new ImportJobResult() { Request = coordResult.Request, TransactionTime = coord.CreateDate, Output = results.Completed, Error = results.Failed };
return new GetImportResponse(!inFlightJobsExist ? HttpStatusCode.OK : HttpStatusCode.Accepted, result);
}
}
else
{
throw new OperationFailedException(Core.Resources.UnknownError, HttpStatusCode.InternalServerError);
}
}
private async Task<(List<ImportOperationOutcome> completedOperationOutcome, List<ImportFailedOperationOutcome> failedOperationOutcome)> GetProcessingResultAsync(long groupId, CancellationToken cancellationToken)
{
var start = Stopwatch.StartNew();
var jobs = await _queueClient.GetJobByGroupIdAsync(QueueType.Import, groupId, true, cancellationToken);
var duration = start.Elapsed.TotalSeconds;
var completedOperationOutcome = new List<ImportOperationOutcome>();
var failedOperationOutcome = new List<ImportFailedOperationOutcome>();
foreach (var job in jobs.Where(_ => _.Id != groupId && _.Status == JobStatus.Completed)) // ignore coordinator && not completed
static (List<ImportOperationOutcome> Completed, List<ImportFailedOperationOutcome> Failed) GetProcessingResultAsync(IList<JobInfo> jobs)
{
var definition = JsonConvert.DeserializeObject<ImportProcessingJobDefinition>(job.Definition);
var result = JsonConvert.DeserializeObject<ImportProcessingJobResult>(job.Result);
completedOperationOutcome.Add(new ImportOperationOutcome() { Type = definition.ResourceType, Count = result.SucceededResources, InputUrl = new Uri(definition.ResourceLocation) });
if (result.FailedResources > 0)
var completed = new List<ImportOperationOutcome>();
var failed = new List<ImportFailedOperationOutcome>();
foreach (var job in jobs.Where(_ => _.Status == JobStatus.Completed))
{
failedOperationOutcome.Add(new ImportFailedOperationOutcome() { Type = definition.ResourceType, Count = result.FailedResources, InputUrl = new Uri(definition.ResourceLocation), Url = result.ErrorLogLocation });
var definition = JsonConvert.DeserializeObject<ImportProcessingJobDefinition>(job.Definition);
var result = JsonConvert.DeserializeObject<ImportProcessingJobResult>(job.Result);
completed.Add(new ImportOperationOutcome() { Type = definition.ResourceType, Count = result.SucceededResources, InputUrl = new Uri(definition.ResourceLocation) });
if (result.FailedResources > 0)
{
failed.Add(new ImportFailedOperationOutcome() { Type = definition.ResourceType, Count = result.FailedResources, InputUrl = new Uri(definition.ResourceLocation), Url = result.ErrorLogLocation });
}
}
// group success results by url
var groupped = completed.GroupBy(o => o.InputUrl).Select(g => new ImportOperationOutcome() { Type = g.First().Type, Count = g.Sum(_ => _.Count), InputUrl = g.Key }).ToList();
return (groupped, failed);
}
await Task.Delay(TimeSpan.FromSeconds(duration * 10), cancellationToken); // throttle to avoid misuse.
return (completedOperationOutcome, failedOperationOutcome);
}
}
}

Просмотреть файл

@ -31,10 +31,5 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations.Import
/// Critical error during data processing.
/// </summary>
public string ErrorDetails { get; set; }
/// <summary>
/// Current index for last checkpoint
/// </summary>
public long CurrentIndex { get; set; }
}
}

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -10,10 +10,13 @@ using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Audit;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations.BulkDelete;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.SqlServer.Features.Operations.Import;
@ -104,6 +107,9 @@ namespace Microsoft.Health.Fhir.SqlServer.UnitTests.Features.Operations.Import
IImportErrorStoreFactory importErrorStoreFactory = Substitute.For<IImportErrorStoreFactory>();
RequestContextAccessor<IFhirRequestContext> contextAccessor = Substitute.For<RequestContextAccessor<IFhirRequestContext>>();
ILoggerFactory loggerFactory = new NullLoggerFactory();
IMediator mediator = Substitute.For<IMediator>();
IAuditLogger auditLogger = Substitute.For<IAuditLogger>();
IQueueClient queueClient = Substitute.For<IQueueClient>();
loader.LoadResources(Arg.Any<string>(), Arg.Any<long>(), Arg.Any<int>(), Arg.Any<string>(), Arg.Any<ImportMode>(), Arg.Any<CancellationToken>())
.Returns(callInfo =>
@ -133,11 +139,14 @@ namespace Microsoft.Health.Fhir.SqlServer.UnitTests.Features.Operations.Import
});
ImportProcessingJob job = new ImportProcessingJob(
mediator,
queueClient,
loader,
importer,
importErrorStoreFactory,
contextAccessor,
loggerFactory);
loggerFactory,
auditLogger);
await Assert.ThrowsAsync<JobExecutionException>(() => job.ExecuteAsync(GetJobInfo(inputData, result), CancellationToken.None));
}
@ -154,6 +163,9 @@ namespace Microsoft.Health.Fhir.SqlServer.UnitTests.Features.Operations.Import
IImportErrorStoreFactory importErrorStoreFactory = Substitute.For<IImportErrorStoreFactory>();
RequestContextAccessor<IFhirRequestContext> contextAccessor = Substitute.For<RequestContextAccessor<IFhirRequestContext>>();
ILoggerFactory loggerFactory = new NullLoggerFactory();
IMediator mediator = Substitute.For<IMediator>();
IAuditLogger auditLogger = Substitute.For<IAuditLogger>();
IQueueClient queueClient = Substitute.For<IQueueClient>();
importer.Import(Arg.Any<Channel<ImportResource>>(), Arg.Any<IImportErrorStore>(), Arg.Any<ImportMode>(), Arg.Any<CancellationToken>())
.Returns(callInfo =>
@ -167,11 +179,14 @@ namespace Microsoft.Health.Fhir.SqlServer.UnitTests.Features.Operations.Import
});
ImportProcessingJob job = new ImportProcessingJob(
mediator,
queueClient,
loader,
importer,
importErrorStoreFactory,
contextAccessor,
loggerFactory);
loggerFactory,
auditLogger);
await Assert.ThrowsAsync<JobExecutionException>(() => job.ExecuteAsync(GetJobInfo(inputData, result), CancellationToken.None));
}
@ -187,6 +202,9 @@ namespace Microsoft.Health.Fhir.SqlServer.UnitTests.Features.Operations.Import
IImportErrorStoreFactory importErrorStoreFactory = Substitute.For<IImportErrorStoreFactory>();
RequestContextAccessor<IFhirRequestContext> contextAccessor = Substitute.For<RequestContextAccessor<IFhirRequestContext>>();
ILoggerFactory loggerFactory = new NullLoggerFactory();
IMediator mediator = Substitute.For<IMediator>();
IAuditLogger auditLogger = Substitute.For<IAuditLogger>();
IQueueClient queueClient = Substitute.For<IQueueClient>();
loader.LoadResources(Arg.Any<string>(), Arg.Any<long>(), Arg.Any<int>(), Arg.Any<string>(), Arg.Any<ImportMode>(), Arg.Any<CancellationToken>())
.Returns(callInfo =>
@ -236,7 +254,7 @@ namespace Microsoft.Health.Fhir.SqlServer.UnitTests.Features.Operations.Import
return progress;
});
var job = new ImportProcessingJob(loader, importer, importErrorStoreFactory, contextAccessor, loggerFactory);
var job = new ImportProcessingJob(mediator, queueClient, loader, importer, importErrorStoreFactory, contextAccessor, loggerFactory, auditLogger);
string resultString = await job.ExecuteAsync(GetJobInfo(inputData, currentResult), CancellationToken.None);
ImportProcessingJobResult result = JsonConvert.DeserializeObject<ImportProcessingJobResult>(resultString);

Просмотреть файл

@ -42,7 +42,8 @@ namespace Microsoft.Health.Fhir.SqlServer.Features
|| str.Contains("connection timeout expired", StringComparison.OrdinalIgnoreCase)
|| str.Contains("existing connection was forcibly closed by the remote host", StringComparison.OrdinalIgnoreCase)
|| str.Contains("connection was recovered and rowcount in the first query is not available", StringComparison.OrdinalIgnoreCase)
|| str.Contains("connection was successfully established with the server, but then an error occurred during the login process", StringComparison.OrdinalIgnoreCase);
|| str.Contains("connection was successfully established with the server, but then an error occurred during the login process", StringComparison.OrdinalIgnoreCase)
|| str.Contains("server provided routing information, but timeout already expired", StringComparison.OrdinalIgnoreCase);
////A severe error occurred on the current command. The results, if any, should be discarded.
////Meaning:

Просмотреть файл

@ -85,9 +85,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancellationToken)
{
ImportOrchestratorJobDefinition inputData = jobInfo.DeserializeDefinition<ImportOrchestratorJobDefinition>();
ImportOrchestratorJobResult currentResult = string.IsNullOrEmpty(jobInfo.Result) ? new ImportOrchestratorJobResult() : jobInfo.DeserializeResult<ImportOrchestratorJobResult>();
var inputData = jobInfo.DeserializeDefinition<ImportOrchestratorJobDefinition>();
var fhirRequestContext = new FhirRequestContext(
method: "Import",
uriString: inputData.RequestUri.ToString(),
@ -98,73 +96,57 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
{
IsBackgroundTask = true,
};
_contextAccessor.RequestContext = fhirRequestContext;
currentResult.Request = inputData.RequestUri.ToString();
var result = new ImportOrchestratorJobResult();
result.Request = inputData.RequestUri.ToString();
ImportJobErrorResult errorResult = null;
try
{
cancellationToken.ThrowIfCancellationRequested();
await ValidateResourcesAsync(inputData, cancellationToken);
_logger.LogJobInformation(jobInfo, "Input Resources Validated.");
_logger.LogJobInformation(jobInfo, "Input resources validated.");
await ExecuteImportProcessingJobAsync(jobInfo, inputData, currentResult, cancellationToken);
_logger.LogJobInformation(jobInfo, "SubJobs Completed.");
await EnqueueProcessingJobsAsync(jobInfo, inputData, result, cancellationToken);
_logger.LogJobInformation(jobInfo, "Registration of processing jobs completed.");
}
catch (TaskCanceledException taskCanceledEx)
catch (OperationCanceledException ex)
{
_logger.LogJobInformation(taskCanceledEx, jobInfo, "Import job canceled. {Message}", taskCanceledEx.Message);
errorResult = new ImportJobErrorResult()
{
HttpStatusCode = HttpStatusCode.BadRequest,
ErrorMessage = taskCanceledEx.Message,
};
await WaitCancelledJobCompletedAsync(jobInfo);
await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult, inputData.ImportMode, fhirRequestContext);
_logger.LogJobInformation(ex, jobInfo, "Import job canceled. {Message}", ex.Message);
errorResult = new ImportJobErrorResult() { ErrorMessage = ex.Message, HttpStatusCode = HttpStatusCode.BadRequest };
await SendNotification(JobStatus.Cancelled, jobInfo, 0, 0, result.TotalBytes, inputData.ImportMode, fhirRequestContext, _logger, _auditLogger, _mediator);
}
catch (OperationCanceledException canceledEx)
catch (IntegrationDataStoreException ex)
{
_logger.LogJobInformation(canceledEx, jobInfo, "Import job canceled. {Message}", canceledEx.Message);
errorResult = new ImportJobErrorResult()
{
HttpStatusCode = HttpStatusCode.BadRequest,
ErrorMessage = canceledEx.Message,
};
await WaitCancelledJobCompletedAsync(jobInfo);
await SendImportMetricsNotification(JobStatus.Cancelled, jobInfo, currentResult, inputData.ImportMode, fhirRequestContext);
}
catch (IntegrationDataStoreException integrationDataStoreEx)
{
_logger.LogJobInformation(integrationDataStoreEx, jobInfo, "Failed to access input files.");
errorResult = new ImportJobErrorResult()
{
HttpStatusCode = integrationDataStoreEx.StatusCode,
ErrorMessage = integrationDataStoreEx.Message,
};
await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode, fhirRequestContext);
_logger.LogJobInformation(ex, jobInfo, "Failed to access input files.");
errorResult = new ImportJobErrorResult() { ErrorMessage = ex.Message, HttpStatusCode = ex.StatusCode };
await SendNotification(JobStatus.Failed, jobInfo, 0, 0, result.TotalBytes, inputData.ImportMode, fhirRequestContext, _logger, _auditLogger, _mediator);
}
catch (JobExecutionException ex)
{
_logger.LogJobInformation(ex, jobInfo, "Failed to process input resources.");
errorResult = ex.Error != null ? (ImportJobErrorResult)ex.Error : new ImportJobErrorResult() { ErrorMessage = ex.Message, ErrorDetails = ex.ToString() };
if (errorResult.HttpStatusCode == 0)
{
errorResult.HttpStatusCode = HttpStatusCode.InternalServerError;
}
await CancelProcessingJobsAsync(jobInfo);
await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode, fhirRequestContext);
if (errorResult.HttpStatusCode == HttpStatusCode.InternalServerError)
{
_logger.LogJobError(ex, jobInfo, "Failed to register processing jobs.");
}
else
{
_logger.LogJobInformation(ex, jobInfo, "Failed to register processing jobs.");
}
await SendNotification(JobStatus.Failed, jobInfo, 0, 0, result.TotalBytes, inputData.ImportMode, fhirRequestContext, _logger, _auditLogger, _mediator);
}
catch (Exception ex)
{
_logger.LogJobInformation(ex, jobInfo, "Failed to import data.");
_logger.LogJobError(ex, jobInfo, "Failed to register processing jobs.");
errorResult = new ImportJobErrorResult() { ErrorMessage = ex.Message, ErrorDetails = ex.ToString(), HttpStatusCode = HttpStatusCode.InternalServerError };
await CancelProcessingJobsAsync(jobInfo);
await SendImportMetricsNotification(JobStatus.Failed, jobInfo, currentResult, inputData.ImportMode, fhirRequestContext);
await SendNotification(JobStatus.Failed, jobInfo, 0, 0, result.TotalBytes, inputData.ImportMode, fhirRequestContext, _logger, _auditLogger, _mediator);
}
if (errorResult != null)
@ -172,8 +154,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
throw new JobExecutionException(errorResult.ErrorMessage, errorResult);
}
await SendImportMetricsNotification(JobStatus.Completed, jobInfo, currentResult, inputData.ImportMode, fhirRequestContext);
return JsonConvert.SerializeObject(currentResult);
return JsonConvert.SerializeObject(result);
}
private async Task ValidateResourcesAsync(ImportOrchestratorJobDefinition inputData, CancellationToken cancellationToken)
@ -193,24 +174,24 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
});
}
private async Task SendImportMetricsNotification(JobStatus jobStatus, JobInfo jobInfo, ImportOrchestratorJobResult currentResult, ImportMode importMode, FhirRequestContext fhirRequestContext)
internal static async Task SendNotification<T>(JobStatus status, JobInfo info, long succeeded, long failed, long bytes, ImportMode importMode, FhirRequestContext context, ILogger<T> logger, IAuditLogger auditLogger, IMediator mediator)
{
_logger.LogJobInformation(jobInfo, "SucceededResources {SucceededResources} and FailedResources {FailedResources} in Import", currentResult.SucceededResources, currentResult.FailedResources);
logger.LogJobInformation(info, "SucceededResources {SucceededResources} and FailedResources {FailedResources} in Import", succeeded, failed);
if (importMode == ImportMode.IncrementalLoad)
{
var incrementalImportProperties = new Dictionary<string, string>();
incrementalImportProperties["JobId"] = jobInfo.Id.ToString();
incrementalImportProperties["SucceededResources"] = currentResult.SucceededResources.ToString();
incrementalImportProperties["FailedResources"] = currentResult.FailedResources.ToString();
incrementalImportProperties["JobId"] = info.Id.ToString();
incrementalImportProperties["SucceededResources"] = succeeded.ToString();
incrementalImportProperties["FailedResources"] = failed.ToString();
_auditLogger.LogAudit(
auditLogger.LogAudit(
AuditAction.Executed,
operation: "import/" + ImportMode.IncrementalLoad.ToString(),
resourceType: string.Empty,
requestUri: fhirRequestContext.Uri,
requestUri: context.Uri,
statusCode: HttpStatusCode.Accepted,
correlationId: fhirRequestContext.CorrelationId,
correlationId: context.CorrelationId,
callerIpAddress: null,
callerClaims: null,
customHeaders: null,
@ -218,34 +199,30 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
callerAgent: DefaultCallerAgent,
additionalProperties: incrementalImportProperties);
_logger.LogJobInformation(jobInfo, "Audit logs for incremental import are added.");
logger.LogJobInformation(info, "Audit logs for incremental import are added.");
}
var importJobMetricsNotification = new ImportJobMetricsNotification(
jobInfo.Id.ToString(),
jobStatus.ToString(),
jobInfo.CreateDate,
info.Id.ToString(),
status.ToString(),
info.CreateDate,
Clock.UtcNow,
currentResult.TotalBytes,
currentResult.SucceededResources,
currentResult.FailedResources,
bytes,
succeeded,
failed,
importMode);
await _mediator.Publish(importJobMetricsNotification, CancellationToken.None);
await mediator.Publish(importJobMetricsNotification, CancellationToken.None);
}
private async Task ExecuteImportProcessingJobAsync(JobInfo coord, ImportOrchestratorJobDefinition coordDefinition, ImportOrchestratorJobResult currentResult, CancellationToken cancellationToken)
private async Task EnqueueProcessingJobsAsync(JobInfo coord, ImportOrchestratorJobDefinition coordDefinition, ImportOrchestratorJobResult result, CancellationToken cancellationToken)
{
currentResult.TotalBytes = 0;
currentResult.FailedResources = 0;
currentResult.SucceededResources = 0;
// split blobs by size
var inputs = new List<InputResource>();
await Parallel.ForEachAsync(coordDefinition.Input, new ParallelOptions { MaxDegreeOfParallelism = 16 }, async (input, cancel) =>
{
var blobLength = (long)(await _integrationDataStoreClient.GetPropertiesAsync(input.Url, cancellationToken))[IntegrationDataStoreClientConstants.BlobPropertyLength];
currentResult.TotalBytes += blobLength;
result.TotalBytes += blobLength;
foreach (var offset in GetOffsets(blobLength, BytesToRead))
{
var newInput = input.Clone();
@ -260,9 +237,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
var jobIds = await EnqueueProcessingJobsAsync(inputs, coord.GroupId, coordDefinition, cancellationToken);
currentResult.CreatedJobs = jobIds.Count;
await WaitCompletion(coord, jobIds, currentResult, cancellationToken);
result.CreatedJobs = jobIds.Count;
}
internal static IEnumerable<long> GetOffsets(long blobLength, int bytesToRead)
@ -275,81 +250,6 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
}
}
private async Task WaitCompletion(JobInfo orchestratorInfo, IList<long> jobIds, ImportOrchestratorJobResult currentResult, CancellationToken cancellationToken)
{
_logger.LogJobInformation(orchestratorInfo, "Waiting for other workers to pull work from the queue");
await Task.Delay(TimeSpan.FromSeconds(PollingPeriodSec), cancellationToken); // there is no sense in checking right away as workers are polling queue on the same interval
do
{
var completedJobIds = new HashSet<long>();
var jobIdsToCheck = jobIds.Take(20).ToList();
var jobInfos = new List<JobInfo>();
double duration;
try
{
var start = Stopwatch.StartNew();
jobInfos.AddRange(await _timeoutRetries.ExecuteAsync(async () => await _queueClient.GetJobsByIdsAsync(QueueType.Import, jobIdsToCheck.ToArray(), false, cancellationToken)));
duration = start.Elapsed.TotalSeconds;
}
catch (SqlException ex)
{
_logger.LogJobError(ex, orchestratorInfo, "Failed to get running jobs.");
throw new JobExecutionException(ex.Message, ex);
}
foreach (var jobInfo in jobInfos)
{
if (jobInfo.Status != JobStatus.Created && jobInfo.Status != JobStatus.Running)
{
if (jobInfo.Status == JobStatus.Completed)
{
var procesingJobResult = jobInfo.DeserializeResult<ImportProcessingJobResult>();
currentResult.SucceededResources += procesingJobResult.SucceededResources;
currentResult.FailedResources += procesingJobResult.FailedResources;
currentResult.ProcessedBytes += procesingJobResult.ProcessedBytes;
}
else if (jobInfo.Status == JobStatus.Failed)
{
var procesingJobResult = jobInfo.DeserializeResult<ImportJobErrorResult>();
_logger.LogJobError(jobInfo, "Job is set to 'Failed'. Message: {Message}.", procesingJobResult.ErrorMessage);
throw new JobExecutionException(procesingJobResult.ErrorMessage, procesingJobResult);
}
else if (jobInfo.Status == JobStatus.Cancelled)
{
const string message = "Import operation cancelled by customer.";
_logger.LogJobError(jobInfo, message);
throw new OperationCanceledException(message);
}
completedJobIds.Add(jobInfo.Id);
_logger.LogJobInformation(jobInfo, "Job with id: {JobId} and group id: {GroupId} completed.", jobInfo.Id, jobInfo.GroupId);
}
}
if (completedJobIds.Count > 0)
{
foreach (var jobId in completedJobIds)
{
jobIds.Remove(jobId);
}
currentResult.CompletedJobs += completedJobIds.Count;
orchestratorInfo.Result = JsonConvert.SerializeObject(currentResult);
await _queueClient.PutJobHeartbeatAsync(orchestratorInfo, cancellationToken); // remove when progress is reported by selecting results of children.
_logger.LogJobInformation(orchestratorInfo, "Throttle to avoid high database utilization.");
await Task.Delay(TimeSpan.FromSeconds(duration), cancellationToken); // throttle to avoid high database utilization.
}
else
{
_logger.LogJobInformation(orchestratorInfo, "Waiting for child jobs to finish.");
await Task.Delay(TimeSpan.FromSeconds(PollingPeriodSec), cancellationToken);
}
}
while (jobIds.Count > 0);
}
private async Task<IList<long>> EnqueueProcessingJobsAsync(IEnumerable<InputResource> inputs, long groupId, ImportOrchestratorJobDefinition coordDefinition, CancellationToken cancellationToken)
{
var definitions = new List<ImportProcessingJobDefinition>();
@ -390,44 +290,5 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
throw new JobExecutionException("Failed to enqueue jobs.", ex);
}
}
private async Task CancelProcessingJobsAsync(JobInfo jobInfo)
{
try
{
_logger.LogJobInformation(jobInfo, "Cancelling job.", jobInfo.Id, jobInfo.GroupId);
await _queueClient.CancelJobByGroupIdAsync(jobInfo.QueueType, jobInfo.GroupId, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogJobWarning(ex, jobInfo, "Failed to cancel job.");
}
await WaitCancelledJobCompletedAsync(jobInfo);
}
private async Task WaitCancelledJobCompletedAsync(JobInfo jobInfo)
{
while (true)
{
try
{
_logger.LogJobInformation(jobInfo, nameof(WaitCancelledJobCompletedAsync));
var jobInfos = await _timeoutRetries.ExecuteAsync(async () => await _queueClient.GetJobByGroupIdAsync(QueueType.Import, jobInfo.GroupId, false, CancellationToken.None));
if (jobInfos.All(t => (t.Status != JobStatus.Created && t.Status != JobStatus.Running) || !t.CancelRequested || t.Id == jobInfo.Id))
{
break;
}
}
catch (SqlException ex)
{
_logger.LogJobWarning(ex, jobInfo, "Failed to get jobs by groupId {GroupId}.", jobInfo.GroupId);
throw new JobExecutionException(ex.Message, ex);
}
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
}
}

Просмотреть файл

@ -11,9 +11,13 @@ using System.Threading.Channels;
using System.Threading.Tasks;
using Azure;
using EnsureThat;
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Microsoft.Health.Core.Features.Audit;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Extensions;
using Microsoft.Health.Fhir.Core.Features.Audit;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;
@ -26,25 +30,35 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
public class ImportProcessingJob : IJob
{
private const string CancelledErrorMessage = "Import processing job is canceled.";
internal const string DefaultCallerAgent = "Microsoft.Health.Fhir.Server";
private readonly IMediator _mediator;
private readonly IQueueClient _queueClient;
private readonly IImportResourceLoader _importResourceLoader;
private readonly IImporter _importer;
private readonly IImportErrorStoreFactory _importErrorStoreFactory;
private readonly RequestContextAccessor<IFhirRequestContext> _contextAccessor;
private readonly ILogger<ImportProcessingJob> _logger;
private readonly IAuditLogger _auditLogger;
public ImportProcessingJob(
IMediator mediator,
IQueueClient queueClient,
IImportResourceLoader importResourceLoader,
IImporter importer,
IImportErrorStoreFactory importErrorStoreFactory,
RequestContextAccessor<IFhirRequestContext> contextAccessor,
ILoggerFactory loggerFactory)
ILoggerFactory loggerFactory,
IAuditLogger auditLogger)
{
_mediator = EnsureArg.IsNotNull(mediator, nameof(mediator));
_queueClient = EnsureArg.IsNotNull(queueClient, nameof(queueClient));
_importResourceLoader = EnsureArg.IsNotNull(importResourceLoader, nameof(importResourceLoader));
_importer = EnsureArg.IsNotNull(importer, nameof(importer));
_importErrorStoreFactory = EnsureArg.IsNotNull(importErrorStoreFactory, nameof(importErrorStoreFactory));
_contextAccessor = EnsureArg.IsNotNull(contextAccessor, nameof(contextAccessor));
_logger = EnsureArg.IsNotNull(loggerFactory, nameof(loggerFactory)).CreateLogger<ImportProcessingJob>();
_auditLogger = EnsureArg.IsNotNull(auditLogger, nameof(auditLogger));
}
public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancellationToken)
@ -52,7 +66,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
EnsureArg.IsNotNull(jobInfo, nameof(jobInfo));
var definition = jobInfo.DeserializeDefinition<ImportProcessingJobDefinition>();
var currentResult = new ImportProcessingJobResult();
var result = new ImportProcessingJobResult();
var fhirRequestContext = new FhirRequestContext(
method: "Import",
@ -71,32 +85,30 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
{
cancellationToken.ThrowIfCancellationRequested();
// Initialize error store
// Design of error writes is too complex. We do not need separate init and writes. Also, it leads to adding duplicate error records on job restart.
IImportErrorStore importErrorStore = await _importErrorStoreFactory.InitializeAsync(GetErrorFileName(definition.ResourceType, jobInfo.GroupId, jobInfo.Id), cancellationToken);
currentResult.ErrorLogLocation = importErrorStore.ErrorFileLocation;
result.ErrorLogLocation = importErrorStore.ErrorFileLocation;
// Load and parse resource from bulk resource
// Design of resource loader is too complex. There is no need to have any channel and separate load task.
// This design was driven from assumption that worker/processing job deals with entire large file.
// This is not true anymore, as worker deals with just small portion of file accessing it by offset.
// We should just open reader and walk through all needed records in a single thread.
(Channel<ImportResource> importResourceChannel, Task loadTask) = _importResourceLoader.LoadResources(definition.ResourceLocation, definition.Offset, definition.BytesToRead, definition.ResourceType, definition.ImportMode, cancellationToken);
// Import to data store
var importProgress = await _importer.Import(importResourceChannel, importErrorStore, definition.ImportMode, cancellationToken);
currentResult.SucceededResources = importProgress.SucceededResources;
currentResult.FailedResources = importProgress.FailedResources;
currentResult.ErrorLogLocation = importErrorStore.ErrorFileLocation;
currentResult.ProcessedBytes = importProgress.ProcessedBytes;
result.SucceededResources = importProgress.SucceededResources;
result.FailedResources = importProgress.FailedResources;
result.ErrorLogLocation = importErrorStore.ErrorFileLocation;
result.ProcessedBytes = importProgress.ProcessedBytes;
_logger.LogJobInformation(jobInfo, "Import Job {JobId} progress: succeed {SucceedCount}, failed: {FailedCount}", jobInfo.Id, currentResult.SucceededResources, currentResult.FailedResources);
_logger.LogJobInformation(jobInfo, "Import Job {JobId} progress: succeed {SucceedCount}, failed: {FailedCount}", jobInfo.Id, result.SucceededResources, result.FailedResources);
try
{
await loadTask;
}
catch (TaskCanceledException tce)
{
_logger.LogJobWarning(tce, jobInfo, nameof(TaskCanceledException));
throw;
}
catch (OperationCanceledException oce)
{
_logger.LogJobWarning(oce, jobInfo, nameof(OperationCanceledException));
@ -116,7 +128,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
}
catch (IntegrationDataStoreException ex)
{
_logger.LogJobInformation(ex, jobInfo, "Failed to access input files.");
_logger.LogJobWarning(ex, jobInfo, "Failed to access input files.");
var error = new ImportJobErrorResult() { ErrorMessage = ex.Message, HttpStatusCode = ex.StatusCode };
throw new JobExecutionException(ex.Message, error, ex);
}
@ -127,14 +139,12 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
throw new JobExecutionException(ex.Message, error, ex);
}
jobInfo.Data = currentResult.SucceededResources + currentResult.FailedResources;
return JsonConvert.SerializeObject(currentResult);
}
catch (TaskCanceledException canceledEx)
{
_logger.LogJobInformation(canceledEx, jobInfo, CancelledErrorMessage);
var error = new ImportJobErrorResult() { ErrorMessage = CancelledErrorMessage };
throw new JobExecutionException(canceledEx.Message, error, canceledEx);
jobInfo.Data = result.SucceededResources + result.FailedResources;
// jobs are small, send on success only
await ImportOrchestratorJob.SendNotification(JobStatus.Completed, jobInfo, result.SucceededResources, result.FailedResources, result.ProcessedBytes, definition.ImportMode, fhirRequestContext, _logger, _auditLogger, _mediator);
return JsonConvert.SerializeObject(result);
}
catch (OperationCanceledException canceledEx)
{
@ -144,7 +154,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
}
catch (Exception ex)
{
_logger.LogJobInformation(ex, jobInfo, "Critical error in import processing job.");
_logger.LogJobError(ex, jobInfo, "Critical error in import processing job.");
var error = new ImportJobErrorResult() { ErrorMessage = ex.Message, ErrorDetails = ex.ToString() };
throw new JobExecutionException(ex.Message, error, ex);
}

Просмотреть файл

@ -55,30 +55,30 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
long succeededCount = 0;
long processedBytes = 0;
long currentIndex = -1;
var importErrorBuffer = new List<string>();
var resourceBuffer = new List<ImportResource>();
var errors = new List<string>();
var resourceBatch = new List<ImportResource>();
await foreach (ImportResource resource in inputChannel.Reader.ReadAllAsync(cancellationToken))
{
cancellationToken.ThrowIfCancellationRequested();
currentIndex = resource.Index;
resourceBuffer.Add(resource);
if (resourceBuffer.Count < _importTaskConfiguration.TransactionSize)
resourceBatch.Add(resource);
if (resourceBatch.Count < _importTaskConfiguration.TransactionSize)
{
continue;
}
var resultInt = await ImportResourcesInBuffer(resourceBuffer, importErrorBuffer, importMode, cancellationToken);
var resultInt = await ImportResourcesInBuffer(resourceBatch, errors, importMode, cancellationToken);
succeededCount += resultInt.LoadedCount;
processedBytes += resultInt.ProcessedBytes;
}
var result = await ImportResourcesInBuffer(resourceBuffer, importErrorBuffer, importMode, cancellationToken);
var result = await ImportResourcesInBuffer(resourceBatch, errors, importMode, cancellationToken);
succeededCount += result.LoadedCount;
processedBytes += result.ProcessedBytes;
return await UploadImportErrorsAsync(importErrorStore, succeededCount, importErrorBuffer.Count, importErrorBuffer.ToArray(), currentIndex, processedBytes, cancellationToken);
return await UploadImportErrorsAsync(importErrorStore, succeededCount, errors.Count, errors.ToArray(), currentIndex, processedBytes, cancellationToken);
}
finally
{
@ -93,8 +93,9 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
var validResources = resources.Where(r => string.IsNullOrEmpty(r.ImportError)).ToList();
var newErrors = await _store.ImportResourcesAsync(validResources, importMode, cancellationToken);
errors.AddRange(newErrors);
var totalBytes = resources.Sum(_ => (long)_.Length);
resources.Clear();
return (validResources.Count - newErrors.Count, resources.Sum(_ => (long)_.Length));
return (validResources.Count - newErrors.Count, totalBytes);
}
private async Task<ImportProcessingProgress> UploadImportErrorsAsync(IImportErrorStore importErrorStore, long succeededCount, long failedCount, string[] importErrors, long lastIndex, long processedBytes, CancellationToken cancellationToken)

Просмотреть файл

@ -275,7 +275,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
await action(sqlCommand, cancellationToken);
if (retry > 0)
{
await TryLogEvent($"Retry:{sqlCommand.CommandText}", "Warn", $"retries={retry} error={lastException}", start, cancellationToken);
await TryLogEvent($"SuccessOnRetry:{sqlCommand.CommandText}", "Warn", $"retries={retry} error={lastException}", start, cancellationToken);
}
return;

Просмотреть файл

@ -380,6 +380,11 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
internal async Task<IReadOnlyList<string>> ImportResourcesAsync(IReadOnlyList<ImportResource> resources, ImportMode importMode, CancellationToken cancellationToken)
{
if (resources.Count == 0) // do not go to the database
{
return new List<string>();
}
(List<ImportResource> Loaded, List<ImportResource> Conflicts) results;
var retries = 0;
while (true)

Просмотреть файл

@ -180,7 +180,7 @@ namespace Microsoft.Health.JobManagement
return;
}
catch (Exception ex) when (ex is OperationCanceledException || ex is TaskCanceledException)
catch (OperationCanceledException ex)
{
_logger.LogWarning(ex, "Job with id: {JobId} and group id: {GroupId} of type: {JobType} canceled.", jobInfo.Id, jobInfo.GroupId, jobInfo.QueueType);
jobInfo.Status = JobStatus.Cancelled;

Просмотреть файл

@ -845,12 +845,17 @@ IF (SELECT count(*) FROM EventLog WHERE Process = 'MergeResourcesCommitTransacti
{
var resourceCount = Regex.Matches(patientNdJsonResource, "{\"resourceType\":").Count * 2;
var notificationList = _metricHandler.NotificationMapping[typeof(ImportJobMetricsNotification)];
Assert.Single(notificationList);
var notification = notificationList.First() as ImportJobMetricsNotification;
Assert.Equal(JobStatus.Completed.ToString(), notification.Status);
Assert.NotNull(notification.DataSize);
Assert.Equal(resourceCount, notification.SucceededCount);
Assert.Equal(0, notification.FailedCount);
Assert.Equal(2, notificationList.Count);
var succeeded = 0L;
foreach (var notification in notificationList.Select(_ => (ImportJobMetricsNotification)_))
{
Assert.Equal(JobStatus.Completed.ToString(), notification.Status);
Assert.NotNull(notification.DataSize);
succeeded += notification.SucceededCount.Value;
Assert.Equal(0, notification.FailedCount);
}
Assert.Equal(resourceCount, succeeded);
}
}

Просмотреть файл

@ -57,6 +57,8 @@ namespace Microsoft.Health.Internal.Fhir.PerfTester
_sqlRetryService = SqlRetryService.GetInstance(iSqlConnectionBuilder);
_store = new SqlStoreClient<SqlServerFhirDataStore>(_sqlRetryService, NullLogger<SqlServerFhirDataStore>.Instance);
DumpResourceIds();
if (_callType == "GetDate" || _callType == "LogEvent")
{
Console.WriteLine($"Start at {DateTime.UtcNow.ToString("s")}");
@ -85,8 +87,6 @@ namespace Microsoft.Health.Internal.Fhir.PerfTester
return;
}
DumpResourceIds();
var resourceIds = GetRandomIds();
SwitchToResourceTable();
ExecuteParallelCalls(resourceIds); // compare this