Stored procedure to acquire reindex jobs (#1172)

This commit is contained in:
namadabu 2020-07-31 10:34:30 -07:00 коммит произвёл GitHub
Родитель 5bc6b74fbb
Коммит 607f4208f3
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
9 изменённых файлов: 457 добавлений и 22 удалений

Просмотреть файл

@ -11,6 +11,7 @@ using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Scripts;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Health.Abstractions.Exceptions;
@ -23,6 +24,7 @@ using Microsoft.Health.Fhir.CosmosDb.Configs;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Operations.Export;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Operations.Reindex;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.StoredProcedures.AcquireExportJobs;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.StoredProcedures.AcquireReindexJobs;
namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Operations
{
@ -42,6 +44,7 @@ namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Operations
private readonly ILogger _logger;
private static readonly AcquireExportJobs _acquireExportJobs = new AcquireExportJobs();
private static readonly AcquireReindexJobs _acquireReindexJobs = new AcquireReindexJobs();
/// <summary>
/// Initializes a new instance of the <see cref="CosmosFhirOperationDataStore"/> class.
@ -280,23 +283,28 @@ namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Operations
public async Task<IReadOnlyCollection<ReindexJobWrapper>> AcquireReindexJobsAsync(ushort maximumNumberOfConcurrentJobsAllowed, TimeSpan jobHeartbeatTimeoutThreshold, CancellationToken cancellationToken)
{
// TODO: Shell for testing
var query = _queryFactory.Create<CosmosReindexJobRecordWrapper>(
_containerScope.Value,
new CosmosQueryContext(
new QueryDefinition(CheckActiveJobsByStatusQuery),
new QueryRequestOptions { PartitionKey = new PartitionKey(CosmosDbReindexConstants.ReindexJobPartitionKey) }));
FeedResponse<CosmosReindexJobRecordWrapper> result = await query.ExecuteNextAsync();
var jobList = new List<ReindexJobWrapper>();
CosmosReindexJobRecordWrapper cosmosJob = result.FirstOrDefault();
if (cosmosJob != null)
try
{
jobList.Add(new ReindexJobWrapper(cosmosJob.JobRecord, WeakETag.FromVersionId(cosmosJob.ETag)));
StoredProcedureExecuteResponse<IReadOnlyCollection<CosmosReindexJobRecordWrapper>> response = await _retryExceptionPolicyFactory.CreateRetryPolicy().ExecuteAsync(
async ct => await _acquireReindexJobs.ExecuteAsync(
_containerScope.Value.Scripts,
maximumNumberOfConcurrentJobsAllowed,
(ushort)jobHeartbeatTimeoutThreshold.TotalSeconds,
ct),
cancellationToken);
return response.Resource.Select(cosmosReindexWrapper => new ReindexJobWrapper(cosmosReindexWrapper.JobRecord, WeakETag.FromVersionId(cosmosReindexWrapper.ETag))).ToList();
}
catch (CosmosException dce)
{
if (dce.GetSubStatusCode() == HttpStatusCode.RequestEntityTooLarge)
{
throw new RequestRateExceededException(null);
}
return jobList;
_logger.LogError(dce, "Failed to acquire reindex jobs.");
throw;
}
}
public async Task<bool> CheckActiveReindexJobsAsync(CancellationToken cancellationToken)

Просмотреть файл

@ -0,0 +1,33 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Microsoft.Azure.Cosmos.Scripts;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Operations.Reindex;
namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.StoredProcedures.AcquireReindexJobs
{
internal class AcquireReindexJobs : StoredProcedureBase
{
public async Task<StoredProcedureExecuteResponse<IReadOnlyCollection<CosmosReindexJobRecordWrapper>>> ExecuteAsync(
Scripts client,
ushort maximumNumberOfConcurrentJobsAllowed,
ushort jobHeartbeatTimeoutThresholdInSeconds,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(client, nameof(client));
return await ExecuteStoredProc<IReadOnlyCollection<CosmosReindexJobRecordWrapper>>(
client,
CosmosDbReindexConstants.ReindexJobPartitionKey,
cancellationToken,
maximumNumberOfConcurrentJobsAllowed,
jobHeartbeatTimeoutThresholdInSeconds);
}
}
}

Просмотреть файл

@ -0,0 +1,150 @@
/**
* This stored procedure acquires list of available reindex jobs.
*
* @constructor
* @param {string} maximumNumberOfConcurrentJobsAllowedInString - The maximum number of concurrent jobs allowed in string.
* @param {string} jobHeartbeatTimeoutThresholdInSecondsInString - The number of seconds allowed before the job is considered to be stale in string.
*/
function acquireReindexJobs(maximumNumberOfConcurrentJobsAllowedInString, jobHeartbeatTimeoutThresholdInSecondsInString) {
const collection = getContext().getCollection();
const collectionLink = collection.getSelfLink();
const response = getContext().getResponse();
// Validate input
if (!maximumNumberOfConcurrentJobsAllowedInString) {
throwArgumentValidationError(`The required parameter 'maximumNumberOfConcurrentJobsAllowedInString' is not specified.`);
}
let maximumNumberOfConcurrentJobsAllowed = parseInt(maximumNumberOfConcurrentJobsAllowedInString);
if (maximumNumberOfConcurrentJobsAllowed <= 0) {
throwArgumentValidationError(`The specified maximumNumberOfConcurrentJobsAllowedInString with value '${maximumNumberOfConcurrentJobsAllowedInString}' is invalid.`);
}
if (!jobHeartbeatTimeoutThresholdInSecondsInString) {
throwArgumentValidationError(`The required parameter 'jobHeartbeatTimeoutThresholdInSecondsInString' is not specified.`);
}
let jobHeartbeatTimeoutThresholdInSeconds = parseInt(jobHeartbeatTimeoutThresholdInSecondsInString);
if (jobHeartbeatTimeoutThresholdInSeconds <= 0) {
throwArgumentValidationError(`The specified jobHeartbeatTimeoutThresholdInSecondsInString with value '${jobHeartbeatTimeoutThresholdInSecondsInString}' is invalid.`);
}
// Calculate the expiration time in seconds where the job is considered to be stale.
let expirationTime = new Date().setMilliseconds(0) / 1000 - jobHeartbeatTimeoutThresholdInSeconds;
tryQueryRunningJobs();
function tryQueryRunningJobs() {
// Find list of active running jobs.
let query = {
query: `SELECT VALUE COUNT(1) FROM ROOT r WHERE r.jobRecord.status = 'Running' AND r._ts > ${expirationTime}`
};
let isQueryAccepted = collection.queryDocuments(
collectionLink,
query,
{},
function (err, resources) {
if (err) {
throw err;
}
let numberOfRunningJobs = resources[0];
// Based on list of running jobs, query for list of available jobs.
tryQueryAvailableJobs(numberOfRunningJobs);
});
if (!isQueryAccepted) {
// We ran out of time.
throwTooManyRequestsError();
}
}
function tryQueryAvailableJobs(numberOfRunningJobs, continuation) {
let limit = maximumNumberOfConcurrentJobsAllowed - numberOfRunningJobs;
if (limit < 0) {
limit = 0;
}
let query = {
query: `SELECT TOP ${limit} * FROM ROOT r WHERE (r.jobRecord.status = 'Queued' OR (r.jobRecord.status = 'Running' AND r._ts <= ${expirationTime})) ORDER BY r._ts ASC`
};
let requestOptions = {
continuation: continuation
};
let isQueryAccepted = collection.queryDocuments(
collectionLink,
query,
requestOptions,
function (err, documents, responseOptions) {
if (err) {
throw err;
}
if (documents.length > 0) {
// Update each documents.
tryAcquire(documents, 0);
} else if (responseOptions.continuation) {
// The query came back with empty result but has continuation token, follow the token.
tryQueryAvailableJobs(numberOfRunningJobs, responseOptions.continuation);
} else {
// We don't have any documents so we are done.
response.setBody([]);
}
});
if (!isQueryAccepted) {
// We ran out of time.
throwTooManyRequestsError();
}
}
function tryAcquire(documents, index) {
if (documents.length === index) {
// Finished acquiring all jobs.
response.setBody(documents);
} else {
let document = documents[index];
let requestOptions = {
etag: document._etag
};
// Update the state.
document.jobRecord.status = 'Running';
let isQueryAccepted = collection.replaceDocument(
document._self,
document,
requestOptions,
function (err, updatedDocument) {
if (err) {
throw err;
}
documents[index] = updatedDocument;
tryAcquire(documents, index + 1);
});
if (!isQueryAccepted) {
// We ran out of time.
throwTooManyRequestsError();
}
}
}
function throwArgumentValidationError(message) {
throw new Error(ErrorCodes.BadRequest, message);
}
function throwTooManyRequestsError() {
throw new Error(ErrorCodes.RequestEntityTooLarge, `The request could not be completed.`);
}
}

Просмотреть файл

@ -6,6 +6,7 @@
<ItemGroup>
<EmbeddedResource Include="Features\Storage\StoredProcedures\AcquireExportJobs\acquireExportJobs.js" />
<EmbeddedResource Include="Features\Storage\StoredProcedures\AcquireReindexJobs\acquireReindexJobs.js" />
<EmbeddedResource Include="Features\Storage\StoredProcedures\HardDelete\hardDelete.js" />
<EmbeddedResource Include="Features\Storage\StoredProcedures\Upsert\upsertWithHistory.js" />
</ItemGroup>

Просмотреть файл

@ -0,0 +1,219 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models;
using Microsoft.Health.Fhir.Tests.Common.FixtureParameters;
using Microsoft.Health.Fhir.Tests.Integration.Features.Operations;
using Microsoft.Health.Fhir.Tests.Integration.Persistence;
using Xunit;
namespace Microsoft.Health.Fhir.Shared.Tests.Integration.Features.Operations
{
[Collection(FhirOperationTestConstants.FhirOperationTests)]
[FhirStorageTestsFixtureArgumentSets(DataStore.CosmosDb)]
public class FhirOperationDataStoreReindexTests : IClassFixture<FhirStorageTestsFixture>, IAsyncLifetime
{
private readonly IFhirOperationDataStore _operationDataStore;
private readonly IFhirStorageTestHelper _testHelper;
public FhirOperationDataStoreReindexTests(FhirStorageTestsFixture fixture)
{
_operationDataStore = fixture.OperationDataStore;
_testHelper = fixture.TestHelper;
}
public async Task InitializeAsync()
{
await _testHelper.DeleteAllReindexJobRecordsAsync();
}
public Task DisposeAsync()
{
return Task.CompletedTask;
}
[Fact]
public async Task GivenThereIsNoRunningReindexJob_WhenAcquiringReindexJobs_ThenAvailableReindexJobsShouldBeReturned()
{
ReindexJobRecord jobRecord = await InsertNewReindexJobRecordAsync();
IReadOnlyCollection<ReindexJobWrapper> jobs = await AcquireReindexJobsAsync();
// The job should be marked as running now since it's acquired.
jobRecord.Status = OperationStatus.Running;
Assert.NotNull(jobs);
Assert.Collection(
jobs,
job => ValidateReindexJobRecord(jobRecord, job.JobRecord));
}
[Theory]
[InlineData(OperationStatus.Canceled)]
[InlineData(OperationStatus.Completed)]
[InlineData(OperationStatus.Failed)]
[InlineData(OperationStatus.Running)]
public async Task GivenNoReindexJobInQueuedState_WhenAcquiringReindexJobs_ThenNoReindexJobShouldBeReturned(OperationStatus operationStatus)
{
ReindexJobRecord jobRecord = await InsertNewReindexJobRecordAsync(jobRecord => jobRecord.Status = operationStatus);
IReadOnlyCollection<ReindexJobWrapper> jobs = await AcquireReindexJobsAsync();
Assert.NotNull(jobs);
Assert.Empty(jobs);
}
[Theory]
[InlineData(1, 0)]
[InlineData(2, 1)]
[InlineData(3, 2)]
public async Task GivenNumberOfRunningReindexJobs_WhenAcquiringReindexJobs_ThenAvailableReindexJobsShouldBeReturned(ushort limit, int expectedNumberOfJobsReturned)
{
await CreateRunningReindexJob();
ReindexJobRecord jobRecord1 = await InsertNewReindexJobRecordAsync(); // Queued
await InsertNewReindexJobRecordAsync(jr => jr.Status = OperationStatus.Canceled);
await InsertNewReindexJobRecordAsync(jr => jr.Status = OperationStatus.Completed);
ReindexJobRecord jobRecord2 = await InsertNewReindexJobRecordAsync(); // Queued
await InsertNewReindexJobRecordAsync(jr => jr.Status = OperationStatus.Failed);
// The jobs that are running or completed should not be acquired.
var expectedJobRecords = new List<ReindexJobRecord> { jobRecord1, jobRecord2 };
IReadOnlyCollection<ReindexJobWrapper> acquiredJobWrappers = await AcquireReindexJobsAsync(maximumNumberOfConcurrentJobAllowed: limit);
Assert.NotNull(acquiredJobWrappers);
Assert.Equal(expectedNumberOfJobsReturned, acquiredJobWrappers.Count);
foreach (ReindexJobWrapper acquiredJobWrapper in acquiredJobWrappers)
{
ReindexJobRecord acquiredJobRecord = acquiredJobWrapper.JobRecord;
ReindexJobRecord expectedJobRecord = expectedJobRecords.SingleOrDefault(job => job.Id == acquiredJobRecord.Id);
Assert.NotNull(expectedJobRecord);
// The job should be marked as running now since it's acquired.
expectedJobRecord.Status = OperationStatus.Running;
ValidateReindexJobRecord(expectedJobRecord, acquiredJobRecord);
}
}
[Fact]
public async Task GivenThereIsRunningReindexJobThatExpired_WhenAcquiringReindexJobs_ThenTheExpiredReindexJobShouldBeReturned()
{
ReindexJobWrapper jobWrapper = await CreateRunningReindexJob();
await Task.Delay(1200);
IReadOnlyCollection<ReindexJobWrapper> expiredJobs = await AcquireReindexJobsAsync(jobHeartbeatTimeoutThreshold: TimeSpan.FromSeconds(1));
Assert.NotNull(expiredJobs);
Assert.Collection(
expiredJobs,
expiredJobWrapper => ValidateReindexJobRecord(jobWrapper.JobRecord, expiredJobWrapper.JobRecord));
}
[Fact]
public async Task GivenThereAreQueuedReindexJobs_WhenSimultaneouslyAcquiringReindexJobs_ThenCorrectNumberOfReindexJobsShouldBeReturned()
{
ReindexJobRecord[] jobRecords = new[]
{
await InsertNewReindexJobRecordAsync(jr => jr.Status = OperationStatus.Queued),
await InsertNewReindexJobRecordAsync(jr => jr.Status = OperationStatus.Queued),
await InsertNewReindexJobRecordAsync(jr => jr.Status = OperationStatus.Queued),
await InsertNewReindexJobRecordAsync(jr => jr.Status = OperationStatus.Queued),
};
var completionSource = new TaskCompletionSource<bool>();
Task<IReadOnlyCollection<ReindexJobWrapper>>[] tasks = new[]
{
WaitAndAcquireReindexJobsAsync(),
WaitAndAcquireReindexJobsAsync(),
};
completionSource.SetResult(true);
await Task.WhenAll(tasks);
// Only 2 jobs should have been acquired in total.
Assert.Equal(2, tasks.Sum(task => task.Result.Count));
// Only 1 of the tasks should be fulfilled.
Assert.Equal(2, tasks[0].Result.Count ^ tasks[1].Result.Count);
async Task<IReadOnlyCollection<ReindexJobWrapper>> WaitAndAcquireReindexJobsAsync()
{
await completionSource.Task;
return await AcquireReindexJobsAsync(maximumNumberOfConcurrentJobAllowed: 2);
}
}
private async Task<ReindexJobWrapper> CreateRunningReindexJob()
{
// Create a queued job.
await InsertNewReindexJobRecordAsync();
// Acquire the job. This will timestamp it and set it to running.
IReadOnlyCollection<ReindexJobWrapper> jobWrappers = await AcquireReindexJobsAsync(maximumNumberOfConcurrentJobAllowed: 1);
Assert.NotNull(jobWrappers);
Assert.Equal(1, jobWrappers.Count);
ReindexJobWrapper jobWrapper = jobWrappers.FirstOrDefault();
Assert.NotNull(jobWrapper);
Assert.NotNull(jobWrapper.JobRecord);
Assert.Equal(OperationStatus.Running, jobWrapper.JobRecord.Status);
return jobWrapper;
}
private async Task<ReindexJobRecord> InsertNewReindexJobRecordAsync(Action<ReindexJobRecord> jobRecordCustomizer = null)
{
var jobRecord = new ReindexJobRecord("searchParamHash", maxiumumConcurrency: 1, scope: "all");
jobRecordCustomizer?.Invoke(jobRecord);
ReindexJobWrapper result = await _operationDataStore.CreateReindexJobAsync(jobRecord, CancellationToken.None);
return result.JobRecord;
}
private async Task<IReadOnlyCollection<ReindexJobWrapper>> AcquireReindexJobsAsync(
ushort maximumNumberOfConcurrentJobAllowed = 1,
TimeSpan? jobHeartbeatTimeoutThreshold = null)
{
if (jobHeartbeatTimeoutThreshold == null)
{
jobHeartbeatTimeoutThreshold = TimeSpan.FromMinutes(1);
}
return await _operationDataStore.AcquireReindexJobsAsync(
maximumNumberOfConcurrentJobAllowed,
jobHeartbeatTimeoutThreshold.Value,
CancellationToken.None);
}
private void ValidateReindexJobRecord(ReindexJobRecord expected, ReindexJobRecord actual)
{
Assert.Equal(expected.Id, actual.Id);
Assert.Equal(expected.CanceledTime, actual.CanceledTime);
Assert.Equal(expected.EndTime, actual.EndTime);
Assert.Equal(expected.Hash, actual.Hash);
Assert.Equal(expected.SchemaVersion, actual.SchemaVersion);
Assert.Equal(expected.StartTime, actual.StartTime);
Assert.Equal(expected.Status, actual.Status);
Assert.Equal(expected.QueuedTime, actual.QueuedTime);
}
}
}

Просмотреть файл

@ -9,6 +9,7 @@
<Import_RootNamespace>Microsoft.Health.Fhir.Shared.Tests.Integration</Import_RootNamespace>
</PropertyGroup>
<ItemGroup>
<Compile Include="$(MSBuildThisFileDirectory)Features\Operations\FhirOperationDataStoreReindexTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Features\Operations\FhirOperationDataStoreTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Features\Operations\Export\CreateExportRequestHandlerTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Features\Operations\FhirOperationTestConstants.cs" />

Просмотреть файл

@ -15,6 +15,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
public class CosmosDbFhirStorageTestHelper : IFhirStorageTestHelper
{
private const string ExportJobPartitionKey = "ExportJob";
private const string ReindexJobPartitionKey = "ReindexJob";
private readonly Container _documentClient;
private readonly string _databaseId;
@ -31,10 +32,25 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
}
public async Task DeleteAllExportJobRecordsAsync(CancellationToken cancellationToken = default)
{
await DeleteAllRecordsAsync(ExportJobPartitionKey, cancellationToken);
}
public async Task DeleteExportJobRecordAsync(string id, CancellationToken cancellationToken = default)
{
await _documentClient.DeleteItemStreamAsync(id, new PartitionKey(ExportJobPartitionKey), cancellationToken: cancellationToken);
}
public async Task DeleteAllReindexJobRecordsAsync(CancellationToken cancellationToken = default)
{
await DeleteAllRecordsAsync(ReindexJobPartitionKey, cancellationToken);
}
private async Task DeleteAllRecordsAsync(string partitionKey, CancellationToken cancellationToken)
{
var query = _documentClient.GetItemQueryIterator<JObject>(
new QueryDefinition("SELECT doc.id FROM doc"),
requestOptions: new QueryRequestOptions { PartitionKey = new PartitionKey(ExportJobPartitionKey), });
requestOptions: new QueryRequestOptions { PartitionKey = new PartitionKey(partitionKey), });
while (query.HasMoreResults)
{
@ -42,16 +58,11 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
foreach (dynamic doc in documents)
{
await _documentClient.DeleteItemStreamAsync((string)doc.id, new PartitionKey(ExportJobPartitionKey), cancellationToken: cancellationToken);
await _documentClient.DeleteItemStreamAsync((string)doc.id, new PartitionKey(partitionKey), cancellationToken: cancellationToken);
}
}
}
public async Task DeleteExportJobRecordAsync(string id, CancellationToken cancellationToken = default)
{
await _documentClient.DeleteItemStreamAsync(id, new PartitionKey(ExportJobPartitionKey), cancellationToken: cancellationToken);
}
async Task<object> IFhirStorageTestHelper.GetSnapshotToken()
{
var documentQuery = _documentClient.GetItemQueryIterator<Tuple<int>>(

Просмотреть файл

@ -25,6 +25,13 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
/// <returns>A task.</returns>
Task DeleteExportJobRecordAsync(string id, CancellationToken cancellationToken = default);
/// <summary>
/// Deletes all reindex job records from the database.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task.</returns>
Task DeleteAllReindexJobRecordsAsync(CancellationToken cancellationToken = default);
/// <summary>
/// Gets a token representing the state of the database.
/// </summary>

Просмотреть файл

@ -131,6 +131,11 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
}
}
public Task DeleteAllReindexJobRecordsAsync(CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
async Task<object> IFhirStorageTestHelper.GetSnapshotToken()
{
using (var connection = new SqlConnection(_connectionString))