Implements reindex job management for SQL.
Adds ability to update search indices to support new search parameters.

Refs AB#78973
+semver: feature
This commit is contained in:
Robin Todd 2021-04-01 22:01:44 -07:00 коммит произвёл GitHub
Родитель 0f1582abf3
Коммит 458b27cf6c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
48 изменённых файлов: 6272 добавлений и 248 удалений

Просмотреть файл

@ -50,7 +50,7 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations
Task<ExportJobOutcome> UpdateExportJobAsync(ExportJobRecord jobRecord, WeakETag eTag, CancellationToken cancellationToken);
/// <summary>
/// Commits a new reindex job record to the data store
/// Commits a new reindex job record to the data store.
/// </summary>
/// <param name="jobRecord">The reindex job record</param>
/// <param name="cancellationToken">The cancellation token</param>
@ -85,7 +85,7 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations
Task<IReadOnlyCollection<ReindexJobWrapper>> AcquireReindexJobsAsync(ushort maximumNumberOfConcurrentJobsAllowed, TimeSpan jobHeartbeatTimeoutThreshold, CancellationToken cancellationToken);
/// <summary>
/// Gets a reindex job by id
/// Gets a reindex job by id.
/// </summary>
/// <param name="jobId">The id of the job.</param>
/// <param name="cancellationToken">The cancellation token</param>

Просмотреть файл

@ -22,8 +22,6 @@ namespace Microsoft.Health.Fhir.Core.Features.Persistence
Task HardDeleteAsync(ResourceKey key, CancellationToken cancellationToken);
Task UpdateSearchParameterHashBatchAsync(IReadOnlyCollection<ResourceWrapper> resources, CancellationToken cancellationToken);
Task UpdateSearchParameterIndicesBatchAsync(IReadOnlyCollection<ResourceWrapper> resources, CancellationToken cancellationToken);
Task<ResourceWrapper> UpdateSearchIndexForResourceAsync(ResourceWrapper resourceWrapper, WeakETag weakETag, CancellationToken cancellationToken);

20
src/Microsoft.Health.Fhir.Core/Resources.Designer.cs сгенерированный
Просмотреть файл

@ -19,7 +19,7 @@ namespace Microsoft.Health.Fhir.Core {
// class via a tool like ResGen or Visual Studio.
// To add or remove a member, edit your .ResX file then rerun ResGen
// with the /str option, or rebuild your VS project.
[global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "16.0.0.0")]
[global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "4.0.0.0")]
[global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
[global::System.Runtime.CompilerServices.CompilerGeneratedAttribute()]
internal class Resources {
@ -754,6 +754,24 @@ namespace Microsoft.Health.Fhir.Core {
}
}
/// <summary>
/// Looks up a localized string similar to One or more resources to be reindexed were not found, indicating that they have been deleted since the reindex job kicked off..
/// </summary>
internal static string ReindexingResourceNotFound {
get {
return ResourceManager.GetString("ReindexingResourceNotFound", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to The version of one or more resources did not match, indicating that they have been updated since the reindex job kicked off..
/// </summary>
internal static string ReindexingResourceVersionConflict {
get {
return ResourceManager.GetString("ReindexingResourceVersionConflict", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to Reindex job id {0} is in state {1} and cannot be cancelled..
/// </summary>

Просмотреть файл

@ -1,17 +1,17 @@
<?xml version="1.0" encoding="utf-8"?>
<root>
<!--
Microsoft ResX Schema
<!--
Microsoft ResX Schema
Version 2.0
The primary goals of this format is to allow a simple XML format
that is mostly human readable. The generation and parsing of the
various data types are done through the TypeConverter classes
The primary goals of this format is to allow a simple XML format
that is mostly human readable. The generation and parsing of the
various data types are done through the TypeConverter classes
associated with the data types.
Example:
... ado.net/XML headers & schema ...
<resheader name="resmimetype">text/microsoft-resx</resheader>
<resheader name="version">2.0</resheader>
@ -26,36 +26,36 @@
<value>[base64 mime encoded string representing a byte array form of the .NET Framework object]</value>
<comment>This is a comment</comment>
</data>
There are any number of "resheader" rows that contain simple
There are any number of "resheader" rows that contain simple
name/value pairs.
Each data row contains a name, and value. The row also contains a
type or mimetype. Type corresponds to a .NET class that support
text/value conversion through the TypeConverter architecture.
Classes that don't support this are serialized and stored with the
Each data row contains a name, and value. The row also contains a
type or mimetype. Type corresponds to a .NET class that support
text/value conversion through the TypeConverter architecture.
Classes that don't support this are serialized and stored with the
mimetype set.
The mimetype is used for serialized objects, and tells the
ResXResourceReader how to depersist the object. This is currently not
The mimetype is used for serialized objects, and tells the
ResXResourceReader how to depersist the object. This is currently not
extensible. For a given mimetype the value must be set accordingly:
Note - application/x-microsoft.net.object.binary.base64 is the format
that the ResXResourceWriter will generate, however the reader can
Note - application/x-microsoft.net.object.binary.base64 is the format
that the ResXResourceWriter will generate, however the reader can
read any of the formats listed below.
mimetype: application/x-microsoft.net.object.binary.base64
value : The object must be serialized with
value : The object must be serialized with
: System.Runtime.Serialization.Formatters.Binary.BinaryFormatter
: and then encoded with base64 encoding.
mimetype: application/x-microsoft.net.object.soap.base64
value : The object must be serialized with
value : The object must be serialized with
: System.Runtime.Serialization.Formatters.Soap.SoapFormatter
: and then encoded with base64 encoding.
mimetype: application/x-microsoft.net.object.bytearray.base64
value : The object must be serialized into a byte array
value : The object must be serialized into a byte array
: using a System.ComponentModel.TypeConverter
: and then encoded with base64 encoding.
-->
@ -538,4 +538,10 @@
<value>The 'handling' value '{0}' is invalid. The supported values are: {1}.</value>
<comment>{0}: the invalid handling value. {1}: the supported handling values.</comment>
</data>
</root>
<data name="ReindexingResourceVersionConflict" xml:space="preserve">
<value>The version of one or more resources did not match, indicating that they have been updated since the reindex job kicked off.</value>
</data>
<data name="ReindexingResourceNotFound" xml:space="preserve">
<value>One or more resources to be reindexed were not found, indicating that they have been deleted since the reindex job kicked off.</value>
</data>
</root>

Просмотреть файл

@ -306,16 +306,6 @@ namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage
}
}
public async Task UpdateSearchParameterHashBatchAsync(IReadOnlyCollection<ResourceWrapper> resources, CancellationToken cancellationToken)
{
// TODO: use batch command to update both hash values and search index values for list updateSearchIndices
// this is a place holder update until we batch update resources
foreach (var resource in resources)
{
await UpdateSearchIndexForResourceAsync(resource, WeakETag.FromVersionId(resource.Version), cancellationToken);
}
}
public async Task UpdateSearchParameterIndicesBatchAsync(IReadOnlyCollection<ResourceWrapper> resources, CancellationToken cancellationToken)
{
// TODO: use batch command to update both hash values and search index values for list updateSearchIndices

Просмотреть файл

@ -19,7 +19,7 @@
"SupportsUI": false,
"SupportsXml": true,
"SupportsAnonymizedExport": false
},
"CoreFeatures": {
"SupportsBatch": true,

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -17,5 +17,6 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Schema
V5 = 5,
V6 = 6,
V7 = 7,
V8 = 8,
}
}

Просмотреть файл

@ -8,8 +8,9 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Schema
public static class SchemaVersionConstants
{
public const int Min = (int)SchemaVersion.V4;
public const int Max = (int)SchemaVersion.V7;
public const int Max = (int)SchemaVersion.V8;
public const int SearchParameterStatusSchemaVersion = (int)SchemaVersion.V6;
public const int SupportForReferencesWithMissingTypeVersion = (int)SchemaVersion.V7;
public const int SearchParameterHashSchemaVersion = (int)SchemaVersion.V8;
}
}

Просмотреть файл

@ -11,6 +11,7 @@ using EnsureThat;
using Microsoft.Health.Fhir.Core.Features;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Features.Search.Expressions;
using Microsoft.Health.Fhir.SqlServer.Features.Schema;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
using Microsoft.Health.Fhir.SqlServer.Features.Storage;
using Microsoft.Health.SqlServer;
@ -29,14 +30,21 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search.Expressions.Visitors.Q
private List<string> _includeFromCteIds;
private int _curFromCteIndex = -1;
private readonly bool _isHistorySearch;
private readonly SqlSearchType _searchType;
private readonly string _searchParameterHash;
private int _tableExpressionCounter = -1;
private SqlRootExpression _rootExpression;
private readonly SchemaInformation _schemaInfo;
private bool _sortVisited = false;
private HashSet<int> _cteToLimit = new HashSet<int>();
public SqlQueryGenerator(IndentedStringBuilder sb, SqlQueryParameterManager parameters, SqlServerFhirModel model, bool isHistorySearch, SchemaInformation schemaInfo)
public SqlQueryGenerator(
IndentedStringBuilder sb,
SqlQueryParameterManager parameters,
SqlServerFhirModel model,
SqlSearchType searchType,
SchemaInformation schemaInfo,
string searchParameterHash)
{
EnsureArg.IsNotNull(sb, nameof(sb));
EnsureArg.IsNotNull(parameters, nameof(parameters));
@ -46,8 +54,9 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search.Expressions.Visitors.Q
StringBuilder = sb;
Parameters = parameters;
Model = model;
_isHistorySearch = isHistorySearch;
_searchType = searchType;
_schemaInfo = schemaInfo;
_searchParameterHash = searchParameterHash;
}
public IndentedStringBuilder StringBuilder { get; }
@ -132,14 +141,11 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search.Expressions.Visitors.Q
StringBuilder.Append(expression.SearchParamTableExpressions.Count > 0 ? "CAST(IsMatch AS bit) AS IsMatch, " : "CAST(1 AS bit) AS IsMatch, ");
StringBuilder.Append(expression.SearchParamTableExpressions.Count > 0 ? "CAST(IsPartial AS bit) AS IsPartial, " : "CAST(0 AS bit) AS IsPartial, ");
if (_schemaInfo.Current > 3)
StringBuilder.Append(VLatest.Resource.IsRawResourceMetaSet, resourceTableAlias).Append(", ");
if (_schemaInfo.Current >= SchemaVersionConstants.SearchParameterHashSchemaVersion)
{
// IsRawResourceMetaSet column was added in V4
StringBuilder.Append(VLatest.Resource.IsRawResourceMetaSet, resourceTableAlias).Append(", ");
}
else
{
StringBuilder.Append("CAST(0 AS bit) AS IsRawResourceMetaSet, ");
StringBuilder.Append(VLatest.Resource.SearchParamHash, resourceTableAlias).Append(", ");
}
StringBuilder.Append(VLatest.Resource.RawResource, resourceTableAlias);
@ -156,7 +162,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search.Expressions.Visitors.Q
StringBuilder.Append("FROM ").Append(VLatest.Resource).Append(" ").Append(resourceTableAlias);
if (expression.SearchParamTableExpressions.Count == 0 &&
!_isHistorySearch &&
!_searchType.HasFlag(SqlSearchType.History) &&
expression.ResourceTableExpressions.Any(e => e.AcceptVisitor(ExpressionContainsParameterVisitor.Instance, SearchParameterNames.ResourceType)) &&
!expression.ResourceTableExpressions.Any(e => e.AcceptVisitor(ExpressionContainsParameterVisitor.Instance, SearchParameterNames.Id)))
{
@ -187,6 +193,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search.Expressions.Visitors.Q
{
AppendHistoryClause(delimitedClause);
AppendDeletedClause(delimitedClause);
AppendSearchParameterHashClause(delimitedClause);
}
}
@ -830,7 +837,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search.Expressions.Visitors.Q
private void AppendDeletedClause(in IndentedStringBuilder.DelimitedScope delimited, string tableAlias = null)
{
if (!_isHistorySearch)
if (!_searchType.HasFlag(SqlSearchType.History))
{
delimited.BeginDelimitedElement().Append(VLatest.Resource.IsDeleted, tableAlias).Append(" = 0");
}
@ -838,7 +845,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search.Expressions.Visitors.Q
private void AppendHistoryClause(in IndentedStringBuilder.DelimitedScope delimited, string tableAlias = null)
{
if (!_isHistorySearch)
if (!_searchType.HasFlag(SqlSearchType.History))
{
delimited.BeginDelimitedElement();
@ -846,6 +853,16 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search.Expressions.Visitors.Q
}
}
private void AppendSearchParameterHashClause(in IndentedStringBuilder.DelimitedScope delimited, string tableAlias = null)
{
if (_searchType.HasFlag(SqlSearchType.Reindex))
{
delimited.BeginDelimitedElement();
StringBuilder.Append("(").Append(VLatest.Resource.SearchParamHash, tableAlias).Append(" != ").Append(Parameters.AddParameter(_searchParameterHash)).Append(" OR ").Append(VLatest.Resource.SearchParamHash, tableAlias).Append(" IS NULL)");
}
}
private void AddIncludeLimitCte(string resourceType, string cte)
{
_includeLimitCtesByResourceType ??= new Dictionary<string, List<string>>();

Просмотреть файл

@ -0,0 +1,28 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using System;
namespace Microsoft.Health.Fhir.SqlServer.Features.Search
{
/// <summary>
/// Enum to specify the type of search to carry out.
/// </summary>
/// <remarks>
/// With the exception of None, the flags attribute requires enumeration constants to be powers of two, that is, 1, 2, 4, 8.
/// </remarks>
[Flags]
internal enum SqlSearchType
{
// Set if we do not need to consider history or reindexing
Default = 0,
// Set if we are including previous resource versions or deleted resources
History = 1 << 0,
// Set if the search parameter hash value needs to be considered in a search
Reindex = 1 << 1,
}
}

Просмотреть файл

@ -21,6 +21,7 @@ using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Features.Search.Expressions;
using Microsoft.Health.Fhir.Core.Models;
using Microsoft.Health.Fhir.SqlServer.Features.Schema;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
using Microsoft.Health.Fhir.SqlServer.Features.Search.Expressions;
using Microsoft.Health.Fhir.SqlServer.Features.Search.Expressions.Visitors;
@ -51,6 +52,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search
private readonly SchemaInformation _schemaInformation;
private readonly ISortingValidator _sortingValidator;
private readonly IFhirRequestContextAccessor _requestContextAccessor;
private const int _resourceTableColumnCount = 10;
public SqlServerSearchService(
ISearchOptionsFactory searchOptionsFactory,
@ -93,7 +95,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search
// If we should include the total count of matching search results
if (searchOptions.IncludeTotal == TotalType.Accurate && !searchOptions.CountOnly)
{
searchResult = await SearchImpl(searchOptions, false, cancellationToken);
searchResult = await SearchImpl(searchOptions, SqlSearchType.Default, null, cancellationToken);
// If this is the first page and there aren't any more pages
if (searchOptions.ContinuationToken == null && searchResult.ContinuationToken == null)
@ -109,7 +111,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search
searchOptions.CountOnly = true;
// And perform a second read.
var countOnlySearchResult = await SearchImpl(searchOptions, false, cancellationToken);
var countOnlySearchResult = await SearchImpl(searchOptions, SqlSearchType.Default, null, cancellationToken);
searchResult.TotalCount = countOnlySearchResult.TotalCount;
}
@ -122,7 +124,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search
}
else
{
searchResult = await SearchImpl(searchOptions, false, cancellationToken);
searchResult = await SearchImpl(searchOptions, SqlSearchType.Default, null, cancellationToken);
}
return searchResult;
@ -130,10 +132,10 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search
protected override async Task<SearchResult> SearchHistoryInternalAsync(SearchOptions searchOptions, CancellationToken cancellationToken)
{
return await SearchImpl(searchOptions, true, cancellationToken);
return await SearchImpl(searchOptions, SqlSearchType.History, null, cancellationToken);
}
private async Task<SearchResult> SearchImpl(SearchOptions searchOptions, bool historySearch, CancellationToken cancellationToken)
private async Task<SearchResult> SearchImpl(SearchOptions searchOptions, SqlSearchType searchType, string currentSearchParameterHash, CancellationToken cancellationToken)
{
Expression searchExpression = searchOptions.Expression;
@ -195,7 +197,13 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search
EnableTimeAndIoMessageLogging(stringBuilder, sqlConnectionWrapper);
var queryGenerator = new SqlQueryGenerator(stringBuilder, new SqlQueryParameterManager(sqlCommandWrapper.Parameters), _model, historySearch, _schemaInformation);
var queryGenerator = new SqlQueryGenerator(
stringBuilder,
new SqlQueryParameterManager(sqlCommandWrapper.Parameters),
_model,
searchType,
_schemaInformation,
currentSearchParameterHash);
expression.AcceptVisitor(queryGenerator, searchOptions);
@ -228,27 +236,19 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search
while (await reader.ReadAsync(cancellationToken))
{
(short resourceTypeId,
string resourceId,
int version,
bool isDeleted,
long resourceSurrogateId,
string requestMethod,
bool isMatch,
bool isPartialEntry,
bool isRawResourceMetaSet,
Stream rawResourceStream) =
reader.ReadRow(
VLatest.Resource.ResourceTypeId,
VLatest.Resource.ResourceId,
VLatest.Resource.Version,
VLatest.Resource.IsDeleted,
VLatest.Resource.ResourceSurrogateId,
VLatest.Resource.RequestMethod,
_isMatch,
_isPartial,
VLatest.Resource.IsRawResourceMetaSet,
VLatest.Resource.RawResource);
PopulateResourceTableColumnsToRead(
reader,
out short resourceTypeId,
out string resourceId,
out int version,
out bool isDeleted,
out long resourceSurrogateId,
out string requestMethod,
out bool isMatch,
out bool isPartialEntry,
out bool isRawResourceMetaSet,
out string searchParameterHash,
out Stream rawResourceStream);
// If we get to this point, we know there are more results so we need a continuation token
// Additionally, this resource shouldn't be included in the results
@ -272,7 +272,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search
// Keep track of sort value if this is the last row.
// if we have more than 10 columns, it means sort expressions were added.
if (matchCount == searchOptions.MaxItemCount - 1 && reader.FieldCount > 10)
if (matchCount == searchOptions.MaxItemCount - 1 && reader.FieldCount > _resourceTableColumnCount + 1)
{
sortValue = reader.GetValue(SortValueColumnName) as DateTime?;
}
@ -295,7 +295,8 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search
isDeleted,
null,
null,
null),
null,
searchParameterHash),
isMatch ? SearchEntryMode.Match : SearchEntryMode.Include));
}
@ -337,6 +338,55 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search
}
}
private void PopulateResourceTableColumnsToRead(
SqlDataReader reader,
out short resourceTypeId,
out string resourceId,
out int version,
out bool isDeleted,
out long resourceSurrogateId,
out string requestMethod,
out bool isMatch,
out bool isPartialEntry,
out bool isRawResourceMetaSet,
out string searchParameterHash,
out Stream rawResourceStream)
{
searchParameterHash = null;
if (_schemaInformation.Current >= SchemaVersionConstants.SearchParameterHashSchemaVersion)
{
(resourceTypeId, resourceId, version, isDeleted, resourceSurrogateId, requestMethod, isMatch, isPartialEntry,
isRawResourceMetaSet, searchParameterHash, rawResourceStream) = reader.ReadRow(
VLatest.Resource.ResourceTypeId,
VLatest.Resource.ResourceId,
VLatest.Resource.Version,
VLatest.Resource.IsDeleted,
VLatest.Resource.ResourceSurrogateId,
VLatest.Resource.RequestMethod,
_isMatch,
_isPartial,
VLatest.Resource.IsRawResourceMetaSet,
VLatest.Resource.SearchParamHash,
VLatest.Resource.RawResource);
}
else
{
(resourceTypeId, resourceId, version, isDeleted, resourceSurrogateId, requestMethod, isMatch, isPartialEntry,
isRawResourceMetaSet, rawResourceStream) = reader.ReadRow(
VLatest.Resource.ResourceTypeId,
VLatest.Resource.ResourceId,
VLatest.Resource.Version,
VLatest.Resource.IsDeleted,
VLatest.Resource.ResourceSurrogateId,
VLatest.Resource.RequestMethod,
_isMatch,
_isPartial,
VLatest.Resource.IsRawResourceMetaSet,
VLatest.Resource.RawResource);
}
}
[Conditional("DEBUG")]
private void EnableTimeAndIoMessageLogging(IndentedStringBuilder stringBuilder, SqlConnectionWrapper sqlConnectionWrapper)
{
@ -372,9 +422,9 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Search
_logger.LogInformation(sb.ToString());
}
protected override Task<SearchResult> SearchForReindexInternalAsync(SearchOptions searchOptions, string searchParameterHash, CancellationToken cancellationToken)
protected async override Task<SearchResult> SearchForReindexInternalAsync(SearchOptions searchOptions, string searchParameterHash, CancellationToken cancellationToken)
{
throw new NotImplementedException();
return await SearchImpl(searchOptions, SqlSearchType.Reindex, searchParameterHash, cancellationToken);
}
}
}

Просмотреть файл

@ -30,22 +30,26 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.Registry
private readonly VLatest.UpsertSearchParamsTvpGenerator<List<ResourceSearchParameterStatus>> _updateSearchParamsTvpGenerator;
private readonly ISearchParameterStatusDataStore _filebasedSearchParameterStatusDataStore;
private readonly SchemaInformation _schemaInformation;
private readonly SqlServerFhirModel _fhirModel;
public SqlServerSearchParameterStatusDataStore(
Func<IScoped<SqlConnectionWrapperFactory>> scopedSqlConnectionWrapperFactory,
VLatest.UpsertSearchParamsTvpGenerator<List<ResourceSearchParameterStatus>> updateSearchParamsTvpGenerator,
FilebasedSearchParameterStatusDataStore.Resolver filebasedRegistry,
SchemaInformation schemaInformation)
SchemaInformation schemaInformation,
SqlServerFhirModel fhirModel)
{
EnsureArg.IsNotNull(scopedSqlConnectionWrapperFactory, nameof(scopedSqlConnectionWrapperFactory));
EnsureArg.IsNotNull(updateSearchParamsTvpGenerator, nameof(updateSearchParamsTvpGenerator));
EnsureArg.IsNotNull(filebasedRegistry, nameof(filebasedRegistry));
EnsureArg.IsNotNull(schemaInformation, nameof(schemaInformation));
EnsureArg.IsNotNull(fhirModel, nameof(fhirModel));
_scopedSqlConnectionWrapperFactory = scopedSqlConnectionWrapperFactory;
_updateSearchParamsTvpGenerator = updateSearchParamsTvpGenerator;
_filebasedSearchParameterStatusDataStore = filebasedRegistry.Invoke();
_schemaInformation = schemaInformation;
_fhirModel = fhirModel;
}
// TODO: Make cancellation token an input.
@ -131,7 +135,17 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.Registry
{
VLatest.UpsertSearchParams.PopulateCommand(sqlCommandWrapper, _updateSearchParamsTvpGenerator.Generate(statuses));
await sqlCommandWrapper.ExecuteNonQueryAsync(CancellationToken.None);
using (SqlDataReader sqlDataReader = await sqlCommandWrapper.ExecuteReaderAsync(CommandBehavior.SequentialAccess, CancellationToken.None))
{
while (await sqlDataReader.ReadAsync())
{
// The upsert procedure returns the search parameters that were new.
(short searchParamId, string searchParamUri) = sqlDataReader.ReadRow(VLatest.SearchParam.SearchParamId, VLatest.SearchParam.Uri);
// Add the new search parameters to the FHIR model dictionary.
_fhirModel.AddSearchParamIdToUriMapping(searchParamUri, searchParamId);
}
}
}
}
}

Просмотреть файл

@ -37,11 +37,13 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
/// </summary>
internal class SqlServerFhirDataStore : IFhirDataStore, IProvideCapability
{
private readonly SqlServerDataStoreConfiguration _configuration;
private readonly SqlServerFhirModel _model;
private readonly SearchParameterToSearchValueTypeMap _searchParameterTypeMap;
private readonly V6.UpsertResourceTvpGenerator<ResourceMetadata> _upsertResourceTvpGeneratorV6;
private readonly VLatest.UpsertResourceTvpGenerator<ResourceMetadata> _upsertResourceTvpGeneratorVLatest;
private readonly V7.UpsertResourceTvpGenerator<ResourceMetadata> _upsertResourceTvpGeneratorV7;
private readonly VLatest.UpsertResourceTvpGenerator<IReadOnlyList<ResourceWrapper>> _upsertResourceTvpGeneratorVLatest;
private readonly VLatest.ReindexResourceTvpGenerator<IReadOnlyList<ResourceWrapper>> _reindexResourceTvpGeneratorVLatest;
private readonly VLatest.BulkReindexResourcesTvpGenerator<IReadOnlyList<ResourceWrapper>> _bulkReindexResourcesTvpGeneratorVLatest;
private readonly RecyclableMemoryStreamManager _memoryStreamManager;
private readonly CoreFeatureConfiguration _coreFeatures;
private readonly SqlConnectionWrapperFactory _sqlConnectionWrapperFactory;
@ -49,31 +51,37 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
private readonly SchemaInformation _schemaInformation;
public SqlServerFhirDataStore(
SqlServerDataStoreConfiguration configuration,
SqlServerFhirModel model,
SearchParameterToSearchValueTypeMap searchParameterTypeMap,
V6.UpsertResourceTvpGenerator<ResourceMetadata> upsertResourceTvpGeneratorV6,
VLatest.UpsertResourceTvpGenerator<ResourceMetadata> upsertResourceTvpGeneratorVLatest,
V7.UpsertResourceTvpGenerator<ResourceMetadata> upsertResourceTvpGeneratorV7,
VLatest.UpsertResourceTvpGenerator<IReadOnlyList<ResourceWrapper>> upsertResourceTvpGeneratorVLatest,
VLatest.ReindexResourceTvpGenerator<IReadOnlyList<ResourceWrapper>> reindexResourceTvpGeneratorVLatest,
VLatest.BulkReindexResourcesTvpGenerator<IReadOnlyList<ResourceWrapper>> bulkReindexResourcesTvpGeneratorVLatest,
IOptions<CoreFeatureConfiguration> coreFeatures,
SqlConnectionWrapperFactory sqlConnectionWrapperFactory,
ILogger<SqlServerFhirDataStore> logger,
SchemaInformation schemaInformation)
{
EnsureArg.IsNotNull(configuration, nameof(configuration));
EnsureArg.IsNotNull(model, nameof(model));
EnsureArg.IsNotNull(searchParameterTypeMap, nameof(searchParameterTypeMap));
EnsureArg.IsNotNull(upsertResourceTvpGeneratorV6, nameof(upsertResourceTvpGeneratorV6));
EnsureArg.IsNotNull(upsertResourceTvpGeneratorV7, nameof(upsertResourceTvpGeneratorV7));
EnsureArg.IsNotNull(upsertResourceTvpGeneratorVLatest, nameof(upsertResourceTvpGeneratorVLatest));
EnsureArg.IsNotNull(reindexResourceTvpGeneratorVLatest, nameof(reindexResourceTvpGeneratorVLatest));
EnsureArg.IsNotNull(bulkReindexResourcesTvpGeneratorVLatest, nameof(bulkReindexResourcesTvpGeneratorVLatest));
EnsureArg.IsNotNull(coreFeatures, nameof(coreFeatures));
EnsureArg.IsNotNull(sqlConnectionWrapperFactory, nameof(sqlConnectionWrapperFactory));
EnsureArg.IsNotNull(logger, nameof(logger));
EnsureArg.IsNotNull(schemaInformation, nameof(schemaInformation));
_configuration = configuration;
_model = model;
_searchParameterTypeMap = searchParameterTypeMap;
_upsertResourceTvpGeneratorV6 = upsertResourceTvpGeneratorV6;
_upsertResourceTvpGeneratorV7 = upsertResourceTvpGeneratorV7;
_upsertResourceTvpGeneratorVLatest = upsertResourceTvpGeneratorVLatest;
_reindexResourceTvpGeneratorVLatest = reindexResourceTvpGeneratorVLatest;
_bulkReindexResourcesTvpGeneratorVLatest = bulkReindexResourcesTvpGeneratorVLatest;
_coreFeatures = coreFeatures.Value;
_sqlConnectionWrapperFactory = sqlConnectionWrapperFactory;
_logger = logger;
@ -155,9 +163,25 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
long baseResourceSurrogateId = ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(resource.LastModified.UtcDateTime);
short resourceTypeId = _model.GetResourceTypeId(resource.ResourceTypeName);
if (_schemaInformation.Current >= SchemaVersionConstants.SupportForReferencesWithMissingTypeVersion)
if (_schemaInformation.Current >= SchemaVersionConstants.SearchParameterHashSchemaVersion)
{
VLatest.UpsertResource.PopulateCommand(
sqlCommandWrapper,
baseResourceSurrogateId: ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(resource.LastModified.UtcDateTime),
resourceTypeId: _model.GetResourceTypeId(resource.ResourceTypeName),
resourceId: resource.ResourceId,
eTag: eTag,
allowCreate: allowCreate,
isDeleted: resource.IsDeleted,
keepHistory: keepHistory,
requestMethod: resource.Request.Method,
searchParamHash: resource.SearchParameterHash,
rawResource: stream,
tableValuedParameters: _upsertResourceTvpGeneratorVLatest.Generate(new List<ResourceWrapper> { resource }));
}
else if (_schemaInformation.Current >= SchemaVersionConstants.SupportForReferencesWithMissingTypeVersion)
{
V7.UpsertResource.PopulateCommand(
sqlCommandWrapper,
baseResourceSurrogateId: baseResourceSurrogateId,
resourceTypeId: resourceTypeId,
@ -168,7 +192,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
keepHistory: keepHistory,
requestMethod: resource.Request.Method,
rawResource: stream,
tableValuedParameters: _upsertResourceTvpGeneratorVLatest.Generate(resourceMetadata));
tableValuedParameters: _upsertResourceTvpGeneratorV7.Generate(resourceMetadata));
}
else
{
@ -232,11 +256,13 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
rawResource = await CompressedRawResourceConverter.ReadCompressedRawResource(rawResourceStream);
}
bool isRawResourceMetaSet = false;
var isRawResourceMetaSet = sqlDataReader.Read(resourceTable.IsRawResourceMetaSet, 5);
if (_schemaInformation.Current >= 4)
string searchParamHash = null;
if (_schemaInformation.Current >= SchemaVersionConstants.SearchParameterHashSchemaVersion)
{
isRawResourceMetaSet = sqlDataReader.Read(resourceTable.IsRawResourceMetaSet, 5);
searchParamHash = sqlDataReader.Read(resourceTable.SearchParamHash, 6);
}
return new ResourceWrapper(
@ -249,7 +275,8 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
isDeleted,
searchIndices: null,
compartmentIndices: null,
lastModifiedClaims: null)
lastModifiedClaims: null,
searchParamHash)
{
IsHistory = isHistory,
};
@ -269,24 +296,33 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
}
}
public async Task UpdateSearchParameterHashBatchAsync(IReadOnlyCollection<ResourceWrapper> resources, CancellationToken cancellationToken)
{
// TODO: use bach command to update only hash values for list updateHashValueOnly
// this is a place holder update until we batch update resources
foreach (var resource in resources)
{
await UpsertAsync(resource, WeakETag.FromVersionId(resource.Version), false, true, cancellationToken);
}
}
public async Task UpdateSearchParameterIndicesBatchAsync(IReadOnlyCollection<ResourceWrapper> resources, CancellationToken cancellationToken)
{
// TODO: use batch command to update both hash values and search index values for list updateSearchIndices
// this is a place holder update until we batch update resources
foreach (var resource in resources)
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
using (SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateSqlCommand())
{
await UpsertAsync(resource, WeakETag.FromVersionId(resource.Version), false, true, cancellationToken);
VLatest.BulkReindexResources.PopulateCommand(
sqlCommandWrapper,
_bulkReindexResourcesTvpGeneratorVLatest.Generate(resources.ToList()));
try
{
await sqlCommandWrapper.ExecuteNonQueryAsync(cancellationToken);
}
catch (SqlException e)
{
switch (e.Number)
{
// TODO: we should attempt to reindex resources that failed to be reindexed
case SqlErrorCodes.PreconditionFailed:
throw new PreconditionFailedException(string.Format(Core.Resources.ReindexingResourceVersionConflict));
case SqlErrorCodes.NotFound:
throw new ResourceNotFoundException(string.Format(Core.Resources.ReindexingResourceNotFound));
default:
_logger.LogError(e, "Error from SQL database on reindex");
throw;
}
}
}
}
@ -311,9 +347,44 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
}
}
public Task<ResourceWrapper> UpdateSearchIndexForResourceAsync(ResourceWrapper resourceWrapper, WeakETag weakETag, CancellationToken cancellationToken)
public async Task<ResourceWrapper> UpdateSearchIndexForResourceAsync(ResourceWrapper resource, WeakETag weakETag, CancellationToken cancellationToken)
{
throw new NotImplementedException();
int? eTag = weakETag == null
? (int?)null
: (int.TryParse(weakETag.VersionId, out var parsedETag) ? parsedETag : -1); // Set the etag to a sentinel value to enable expected failure paths when updating with both existing and nonexistent resources.
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
using (SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateSqlCommand())
{
VLatest.ReindexResource.PopulateCommand(
sqlCommandWrapper,
resourceTypeId: _model.GetResourceTypeId(resource.ResourceTypeName),
resourceId: resource.ResourceId,
eTag,
searchParamHash: resource.SearchParameterHash,
tableValuedParameters: _reindexResourceTvpGeneratorVLatest.Generate(new List<ResourceWrapper> { resource }));
try
{
await sqlCommandWrapper.ExecuteScalarAsync(cancellationToken);
return resource;
}
catch (SqlException e)
{
switch (e.Number)
{
case SqlErrorCodes.PreconditionFailed:
// TODO: we should attempt to reindex the resource
throw new PreconditionFailedException(string.Format(Core.Resources.ReindexingResourceVersionConflict));
case SqlErrorCodes.NotFound:
throw new ResourceNotFoundException(string.Format(Core.Resources.ReindexingResourceNotFound));
default:
_logger.LogError(e, "Error from SQL database on reindex");
throw;
}
}
}
}
public async Task<int?> GetProvisionedDataStoreCapacityAsync(CancellationToken cancellationToken = default)

Просмотреть файл

@ -107,6 +107,20 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
return _searchParamUriToId[searchParamUri];
}
public void AddSearchParamIdToUriMapping(string searchParamUri, short searchParamId)
{
ThrowIfNotInitialized();
_searchParamUriToId.Add(new Uri(searchParamUri), searchParamId);
}
public void RemoveSearchParamIdToUriMapping(string searchParamUri)
{
ThrowIfNotInitialized();
_searchParamUriToId.Remove(new Uri(searchParamUri));
}
public byte GetCompartmentTypeId(string compartmentType)
{
ThrowIfNotInitialized();

Просмотреть файл

@ -6,6 +6,7 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
@ -22,7 +23,6 @@ using Microsoft.Health.SqlServer;
using Microsoft.Health.SqlServer.Features.Client;
using Microsoft.Health.SqlServer.Features.Storage;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
{
@ -44,7 +44,6 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
_jsonSerializerSettings = new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver(),
Converters = new List<JsonConverter>
{
new EnumLiteralJsonConverter(),
@ -195,9 +194,147 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
}
}
public Task<IReadOnlyCollection<ReindexJobWrapper>> AcquireReindexJobsAsync(ushort maximumNumberOfConcurrentJobsAllowed, TimeSpan jobHeartbeatTimeoutThreshold, CancellationToken cancellationToken)
public async Task<ReindexJobWrapper> CreateReindexJobAsync(ReindexJobRecord jobRecord, CancellationToken cancellationToken)
{
throw new NotImplementedException();
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
using (SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateSqlCommand())
{
VLatest.CreateReindexJob.PopulateCommand(
sqlCommandWrapper,
jobRecord.Id,
jobRecord.Status.ToString(),
JsonConvert.SerializeObject(jobRecord, _jsonSerializerSettings));
var rowVersion = (int?)await sqlCommandWrapper.ExecuteScalarAsync(cancellationToken);
if (rowVersion == null)
{
throw new OperationFailedException(string.Format(Core.Resources.OperationFailed, OperationsConstants.Reindex, "Failed to create reindex job because no row version was returned."), HttpStatusCode.InternalServerError);
}
return new ReindexJobWrapper(jobRecord, WeakETag.FromVersionId(rowVersion.ToString()));
}
}
public async Task<ReindexJobWrapper> GetReindexJobByIdAsync(string id, CancellationToken cancellationToken)
{
EnsureArg.IsNotNullOrWhiteSpace(id, nameof(id));
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
using (SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateSqlCommand())
{
VLatest.GetReindexJobById.PopulateCommand(sqlCommandWrapper, id);
using (SqlDataReader sqlDataReader = await sqlCommandWrapper.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken))
{
if (!sqlDataReader.Read())
{
throw new JobNotFoundException(string.Format(Core.Resources.JobNotFound, id));
}
(string rawJobRecord, byte[] rowVersion) = sqlDataReader.ReadRow(VLatest.ReindexJob.RawJobRecord, VLatest.ReindexJob.JobVersion);
return CreateReindexJobWrapper(rawJobRecord, rowVersion);
}
}
}
public async Task<ReindexJobWrapper> UpdateReindexJobAsync(ReindexJobRecord jobRecord, WeakETag eTag, CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(jobRecord, nameof(jobRecord));
byte[] rowVersionAsBytes = GetRowVersionAsBytes(eTag);
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
using (SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateSqlCommand())
{
VLatest.UpdateReindexJob.PopulateCommand(
sqlCommandWrapper,
jobRecord.Id,
jobRecord.Status.ToString(),
JsonConvert.SerializeObject(jobRecord, _jsonSerializerSettings),
rowVersionAsBytes);
try
{
var rowVersion = (byte[])await sqlCommandWrapper.ExecuteScalarAsync(cancellationToken);
if (rowVersion.NullIfEmpty() == null)
{
throw new OperationFailedException(string.Format(Core.Resources.OperationFailed, OperationsConstants.Reindex, "Failed to create reindex job because no row version was returned."), HttpStatusCode.InternalServerError);
}
return new ReindexJobWrapper(jobRecord, GetRowVersionAsEtag(rowVersion));
}
catch (SqlException e)
{
if (e.Number == SqlErrorCodes.PreconditionFailed)
{
throw new JobConflictException();
}
else if (e.Number == SqlErrorCodes.NotFound)
{
throw new JobNotFoundException(string.Format(Core.Resources.JobNotFound, jobRecord.Id));
}
else
{
_logger.LogError(e, "Error from SQL database on reindex job update.");
throw;
}
}
}
}
public async Task<IReadOnlyCollection<ReindexJobWrapper>> AcquireReindexJobsAsync(ushort maximumNumberOfConcurrentJobsAllowed, TimeSpan jobHeartbeatTimeoutThreshold, CancellationToken cancellationToken)
{
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
using (SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateSqlCommand())
{
var jobHeartbeatTimeoutThresholdInSeconds = Convert.ToInt64(jobHeartbeatTimeoutThreshold.TotalSeconds);
VLatest.AcquireReindexJobs.PopulateCommand(
sqlCommandWrapper,
jobHeartbeatTimeoutThresholdInSeconds,
maximumNumberOfConcurrentJobsAllowed);
var acquiredJobs = new List<ReindexJobWrapper>();
using (SqlDataReader sqlDataReader = await sqlCommandWrapper.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken))
{
while (await sqlDataReader.ReadAsync(cancellationToken))
{
(string rawJobRecord, byte[] rowVersion) = sqlDataReader.ReadRow(VLatest.ReindexJob.RawJobRecord, VLatest.ReindexJob.JobVersion);
acquiredJobs.Add(CreateReindexJobWrapper(rawJobRecord, rowVersion));
}
}
return acquiredJobs;
}
}
public async Task<(bool found, string id)> CheckActiveReindexJobsAsync(CancellationToken cancellationToken)
{
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
using (SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateSqlCommand())
{
VLatest.CheckActiveReindexJobs.PopulateCommand(sqlCommandWrapper);
var activeJobs = new List<string>();
using (SqlDataReader sqlDataReader = await sqlCommandWrapper.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken))
{
while (await sqlDataReader.ReadAsync(cancellationToken))
{
string id = sqlDataReader.ReadRow(VLatest.ReindexJob.Id);
activeJobs.Add(id);
}
}
// Currently, there can only be one active reindex job at a time.
return (activeJobs.Count > 0, activeJobs.Count > 0 ? activeJobs.FirstOrDefault() : string.Empty);
}
}
private ExportJobOutcome CreateExportJobOutcome(string rawJobRecord, byte[] rowVersionAsBytes)
@ -236,24 +373,13 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
return versionAsBytes;
}
public Task<ReindexJobWrapper> CreateReindexJobAsync(ReindexJobRecord jobRecord, CancellationToken cancellationToken)
private ReindexJobWrapper CreateReindexJobWrapper(string rawJobRecord, byte[] rowVersionAsBytes)
{
throw new NotImplementedException();
}
var reindexJobRecord = JsonConvert.DeserializeObject<ReindexJobRecord>(rawJobRecord, _jsonSerializerSettings);
public Task<ReindexJobWrapper> UpdateReindexJobAsync(ReindexJobRecord jobRecord, WeakETag eTag, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
WeakETag etag = GetRowVersionAsEtag(rowVersionAsBytes);
public Task<ReindexJobWrapper> GetReindexJobByIdAsync(string jobId, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public Task<(bool found, string id)> CheckActiveReindexJobsAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
return new ReindexJobWrapper(reindexJobRecord, etag);
}
}
}

Просмотреть файл

@ -0,0 +1,114 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using EnsureThat;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Models;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
using Microsoft.Health.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkCompartmentAssignmentV1RowGenerator : ITableValuedParameterRowGenerator<IReadOnlyList<ResourceWrapper>, BulkCompartmentAssignmentTableTypeV1Row>
{
private readonly SqlServerFhirModel _model;
private readonly SearchParameterToSearchValueTypeMap _searchParameterTypeMap;
private bool _initialized;
private byte _patientCompartmentId;
private byte _encounterCompartmentId;
private byte _relatedPersonCompartmentId;
private byte _practitionerCompartmentId;
private byte _deviceCompartmentId;
public BulkCompartmentAssignmentV1RowGenerator(SqlServerFhirModel model, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
{
EnsureArg.IsNotNull(model, nameof(model));
EnsureArg.IsNotNull(searchParameterTypeMap, nameof(searchParameterTypeMap));
_model = model;
_searchParameterTypeMap = searchParameterTypeMap;
}
public IEnumerable<BulkCompartmentAssignmentTableTypeV1Row> GenerateRows(IReadOnlyList<ResourceWrapper> resources)
{
EnsureInitialized();
for (var index = 0; index < resources.Count; index++)
{
ResourceWrapper resource = resources[index];
var resourceMetadata = new ResourceMetadata(
resource.CompartmentIndices,
resource.SearchIndices?.ToLookup(e => _searchParameterTypeMap.GetSearchValueType(e)),
resource.LastModifiedClaims);
CompartmentIndices compartments = resourceMetadata.Compartments;
if (compartments == null)
{
yield break;
}
if (compartments.PatientCompartmentEntry != null)
{
foreach (var entry in compartments.PatientCompartmentEntry)
{
yield return new BulkCompartmentAssignmentTableTypeV1Row(index, _patientCompartmentId, entry);
}
}
if (compartments.EncounterCompartmentEntry != null)
{
foreach (var entry in compartments.EncounterCompartmentEntry)
{
yield return new BulkCompartmentAssignmentTableTypeV1Row(index, _encounterCompartmentId, entry);
}
}
if (compartments.RelatedPersonCompartmentEntry != null)
{
foreach (var entry in compartments.RelatedPersonCompartmentEntry)
{
yield return new BulkCompartmentAssignmentTableTypeV1Row(index, _relatedPersonCompartmentId, entry);
}
}
if (compartments.PractitionerCompartmentEntry != null)
{
foreach (var entry in compartments.PractitionerCompartmentEntry)
{
yield return new BulkCompartmentAssignmentTableTypeV1Row(index, _practitionerCompartmentId, entry);
}
}
if (compartments.DeviceCompartmentEntry != null)
{
foreach (var entry in compartments.DeviceCompartmentEntry)
{
yield return new BulkCompartmentAssignmentTableTypeV1Row(index, _deviceCompartmentId, entry);
}
}
}
}
private void EnsureInitialized()
{
if (Volatile.Read(ref _initialized))
{
return;
}
_patientCompartmentId = _model.GetCompartmentTypeId(KnownCompartmentTypes.Patient);
_encounterCompartmentId = _model.GetCompartmentTypeId(KnownCompartmentTypes.Encounter);
_relatedPersonCompartmentId = _model.GetCompartmentTypeId(KnownCompartmentTypes.RelatedPerson);
_practitionerCompartmentId = _model.GetCompartmentTypeId(KnownCompartmentTypes.Practitioner);
_deviceCompartmentId = _model.GetCompartmentTypeId(KnownCompartmentTypes.Device);
Volatile.Write(ref _initialized, true);
}
}
}

Просмотреть файл

@ -0,0 +1,83 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using Microsoft.Health.Core.Extensions;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using LinqExpression = System.Linq.Expressions.Expression;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal abstract class BulkCompositeSearchParameterRowGenerator<TSearchValue, TRow> : BulkSearchParameterRowGenerator<TSearchValue, TRow>
where TSearchValue : ITuple
where TRow : struct
{
private readonly Func<EnumeratorWrapper<ISearchValue>, TSearchValue> _converter = CreateConverterFunc();
protected BulkCompositeSearchParameterRowGenerator(SqlServerFhirModel model, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
}
protected override IEnumerable<TSearchValue> ConvertSearchValue(SearchIndexEntry entry)
{
var compositeSearchValue = (CompositeSearchValue)entry.Value;
foreach (var components in compositeSearchValue.Components.CartesianProduct())
{
using (IEnumerator<ISearchValue> enumerator = components.GetEnumerator())
{
yield return _converter(new EnumeratorWrapper<ISearchValue>(enumerator));
}
}
}
/// <summary>
/// Creates a function that takes the components of a composite search parameter as an
/// enumerator and creates a ValueTuple with fields for each component.
/// </summary>
/// <returns>The generated function.</returns>
private static Func<EnumeratorWrapper<ISearchValue>, TSearchValue> CreateConverterFunc()
{
var parameter = LinqExpression.Parameter(typeof(EnumeratorWrapper<ISearchValue>));
MethodInfo nextValueMethod = parameter.Type.GetMethod(nameof(EnumeratorWrapper<ISearchValue>.NextValue));
ConstructorInfo constructorInfo = typeof(TSearchValue).GetConstructors().Single();
return LinqExpression.Lambda<Func<EnumeratorWrapper<ISearchValue>, TSearchValue>>(
LinqExpression.New(
constructorInfo,
constructorInfo.GetParameters().Select(p => LinqExpression.Convert(
LinqExpression.Call(parameter, nextValueMethod),
p.ParameterType))),
parameter).Compile();
}
/// <summary>
/// Helper class to make the generated code in <see cref="CreateConverterFunc"/>
/// a little simpler.
/// </summary>
/// <typeparam name="T">The element type</typeparam>
private struct EnumeratorWrapper<T>
{
private readonly IEnumerator<T> _enumerator;
public EnumeratorWrapper(IEnumerator<T> enumerator)
{
_enumerator = enumerator;
}
public T NextValue()
{
_enumerator.MoveNext();
return _enumerator.Current;
}
}
}
}

Просмотреть файл

@ -0,0 +1,46 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using System;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkDateTimeSearchParameterV1RowGenerator : BulkSearchParameterRowGenerator<DateTimeSearchValue, BulkDateTimeSearchParamTableTypeV1Row>
{
private short _lastUpdatedSearchParamId;
public BulkDateTimeSearchParameterV1RowGenerator(SqlServerFhirModel model, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
}
internal override bool TryGenerateRow(int offset, short searchParamId, DateTimeSearchValue searchValue, out BulkDateTimeSearchParamTableTypeV1Row row)
{
if (searchParamId == _lastUpdatedSearchParamId)
{
// this value is already stored on the Resource table.
row = default;
return false;
}
row = new BulkDateTimeSearchParamTableTypeV1Row(
offset,
searchParamId,
searchValue.Start,
searchValue.End,
(searchValue.Start - searchValue.End).Ticks > TimeSpan.TicksPerDay);
return true;
}
protected override void Initialize()
{
_lastUpdatedSearchParamId = Model.GetSearchParamId(SearchParameterNames.LastUpdatedUri);
}
}
}

Просмотреть файл

@ -0,0 +1,32 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkNumberSearchParameterV1RowGenerator : BulkSearchParameterRowGenerator<NumberSearchValue, BulkNumberSearchParamTableTypeV1Row>
{
public BulkNumberSearchParameterV1RowGenerator(SqlServerFhirModel model, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
}
internal override bool TryGenerateRow(int offset, short searchParamId, NumberSearchValue searchValue, out BulkNumberSearchParamTableTypeV1Row row)
{
bool isSingleValue = searchValue.Low == searchValue.High;
row = new BulkNumberSearchParamTableTypeV1Row(
offset,
searchParamId,
isSingleValue ? searchValue.Low : null,
isSingleValue ? null : searchValue.Low ?? (decimal?)VLatest.NumberSearchParam.LowValue.MinValue,
isSingleValue ? null : searchValue.High ?? (decimal?)VLatest.NumberSearchParam.HighValue.MaxValue);
return true;
}
}
}

Просмотреть файл

@ -0,0 +1,34 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkQuantitySearchParameterV1RowGenerator : BulkSearchParameterRowGenerator<QuantitySearchValue, BulkQuantitySearchParamTableTypeV1Row>
{
public BulkQuantitySearchParameterV1RowGenerator(SqlServerFhirModel model, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
}
internal override bool TryGenerateRow(int offset, short searchParamId, QuantitySearchValue searchValue, out BulkQuantitySearchParamTableTypeV1Row row)
{
bool isSingleValue = searchValue.Low == searchValue.High;
row = new BulkQuantitySearchParamTableTypeV1Row(
offset,
searchParamId,
string.IsNullOrWhiteSpace(searchValue.System) ? default(int?) : Model.GetSystemId(searchValue.System),
string.IsNullOrWhiteSpace(searchValue.Code) ? default(int?) : Model.GetQuantityCodeId(searchValue.Code),
isSingleValue ? searchValue.Low : null,
isSingleValue ? null : searchValue.Low ?? (decimal?)VLatest.QuantitySearchParam.LowValue.MinValue,
isSingleValue ? null : searchValue.High ?? (decimal?)VLatest.QuantitySearchParam.HighValue.MaxValue);
return true;
}
}
}

Просмотреть файл

@ -0,0 +1,31 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkReferenceSearchParameterV1RowGenerator : BulkSearchParameterRowGenerator<ReferenceSearchValue, BulkReferenceSearchParamTableTypeV1Row>
{
public BulkReferenceSearchParameterV1RowGenerator(SqlServerFhirModel model, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
}
internal override bool TryGenerateRow(int offset, short searchParamId, ReferenceSearchValue searchValue, out BulkReferenceSearchParamTableTypeV1Row row)
{
row = new BulkReferenceSearchParamTableTypeV1Row(
offset,
searchParamId,
searchValue.BaseUri?.ToString(),
searchValue.ResourceType == null ? (short?)null : Model.GetResourceTypeId(searchValue.ResourceType),
searchValue.ResourceId,
ReferenceResourceVersion: null);
return true;
}
}
}

Просмотреть файл

@ -0,0 +1,49 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkReferenceTokenCompositeSearchParameterV1RowGenerator : BulkCompositeSearchParameterRowGenerator<(ReferenceSearchValue component1, TokenSearchValue component2), BulkReferenceTokenCompositeSearchParamTableTypeV1Row>
{
private readonly BulkReferenceSearchParameterV1RowGenerator _referenceRowGenerator;
private readonly BulkTokenSearchParameterV1RowGenerator _tokenRowGenerator;
public BulkReferenceTokenCompositeSearchParameterV1RowGenerator(
SqlServerFhirModel model,
BulkReferenceSearchParameterV1RowGenerator referenceRowGenerator,
BulkTokenSearchParameterV1RowGenerator tokenRowGenerator,
SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
_referenceRowGenerator = referenceRowGenerator;
_tokenRowGenerator = tokenRowGenerator;
}
internal override bool TryGenerateRow(int offset, short searchParamId, (ReferenceSearchValue component1, TokenSearchValue component2) searchValue, out BulkReferenceTokenCompositeSearchParamTableTypeV1Row row)
{
if (_referenceRowGenerator.TryGenerateRow(offset, searchParamId, searchValue.component1, out var reference1Row) &&
_tokenRowGenerator.TryGenerateRow(offset, searchParamId, searchValue.component2, out var token2Row))
{
row = new BulkReferenceTokenCompositeSearchParamTableTypeV1Row(
offset,
searchParamId,
reference1Row.BaseUri,
reference1Row.ReferenceResourceTypeId,
reference1Row.ReferenceResourceId,
reference1Row.ReferenceResourceVersion,
token2Row.SystemId,
token2Row.Code);
return true;
}
row = default;
return false;
}
}
}

Просмотреть файл

@ -0,0 +1,48 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using System.Collections.Generic;
using EnsureThat;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
using Microsoft.Health.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkReindexResourceV1RowGenerator : ITableValuedParameterRowGenerator<IReadOnlyList<ResourceWrapper>, BulkReindexResourceTableTypeV1Row>
{
private readonly SqlServerFhirModel _model;
public BulkReindexResourceV1RowGenerator(SqlServerFhirModel model)
{
EnsureArg.IsNotNull(model, nameof(model));
_model = model;
}
public IEnumerable<BulkReindexResourceTableTypeV1Row> GenerateRows(IReadOnlyList<ResourceWrapper> input)
{
for (var index = 0; index < input.Count; index++)
{
ResourceWrapper resource = input[index];
var resourceTypeId = _model.GetResourceTypeId(resource.ResourceTypeName);
var resourceId = resource.ResourceId;
int etag = 0;
if (resource.Version != null && !int.TryParse(resource.Version, out etag))
{
// Set the etag to a sentinel value to enable expected failure paths when updating with both existing and nonexistent resources.
etag = -1;
}
yield return new BulkReindexResourceTableTypeV1Row(
index,
resourceTypeId,
resourceId,
resource.Version == null ? null : (int?)etag,
resource.SearchParameterHash);
}
}
}
}

Просмотреть файл

@ -0,0 +1,53 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using System.Collections.Generic;
using System.Linq;
using EnsureThat;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
using Microsoft.Health.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkResourceWriteClaimV1RowGenerator : ITableValuedParameterRowGenerator<IReadOnlyList<ResourceWrapper>, BulkResourceWriteClaimTableTypeV1Row>
{
private readonly SqlServerFhirModel _model;
private readonly SearchParameterToSearchValueTypeMap _searchParameterTypeMap;
public BulkResourceWriteClaimV1RowGenerator(SqlServerFhirModel model, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
{
EnsureArg.IsNotNull(model, nameof(model));
EnsureArg.IsNotNull(searchParameterTypeMap, nameof(searchParameterTypeMap));
_model = model;
_searchParameterTypeMap = searchParameterTypeMap;
}
public IEnumerable<BulkResourceWriteClaimTableTypeV1Row> GenerateRows(IReadOnlyList<ResourceWrapper> resources)
{
for (var index = 0; index < resources.Count; index++)
{
ResourceWrapper resource = resources[index];
var resourceMetadata = new ResourceMetadata(
resource.CompartmentIndices,
resource.SearchIndices?.ToLookup(e => _searchParameterTypeMap.GetSearchValueType(e)),
resource.LastModifiedClaims);
IReadOnlyCollection<KeyValuePair<string, string>> writeClaims = resourceMetadata.WriteClaims;
if (writeClaims == null)
{
yield break;
}
foreach (var claim in writeClaims)
{
yield return new BulkResourceWriteClaimTableTypeV1Row(index, _model.GetClaimTypeId(claim.Key), claim.Value);
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,95 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using EnsureThat;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal abstract class BulkSearchParameterRowGenerator<TSearchValue, TRow> : ITableValuedParameterRowGenerator<IReadOnlyList<ResourceWrapper>, TRow>
where TRow : struct
{
private readonly SearchParameterToSearchValueTypeMap _searchParameterTypeMap;
private readonly bool _isConvertSearchValueOverridden;
private bool _isInitialized;
protected BulkSearchParameterRowGenerator(SqlServerFhirModel model, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
{
EnsureArg.IsNotNull(model, nameof(model));
EnsureArg.IsNotNull(searchParameterTypeMap, nameof(searchParameterTypeMap));
Model = model;
_searchParameterTypeMap = searchParameterTypeMap;
_isConvertSearchValueOverridden = GetType().GetMethod(nameof(ConvertSearchValue), BindingFlags.Instance | BindingFlags.NonPublic).DeclaringType != typeof(SearchParameterRowGenerator<TSearchValue, TRow>);
}
protected SqlServerFhirModel Model { get; }
public virtual IEnumerable<TRow> GenerateRows(IReadOnlyList<ResourceWrapper> input)
{
EnsureInitialized();
for (var index = 0; index < input.Count; index++)
{
ResourceWrapper resource = input[index];
var resourceMetadata = new ResourceMetadata(
resource.CompartmentIndices,
resource.SearchIndices?.ToLookup(e => _searchParameterTypeMap.GetSearchValueType(e)),
resource.LastModifiedClaims);
foreach (SearchIndexEntry v in resourceMetadata.GetSearchIndexEntriesByType(typeof(TSearchValue)))
{
short searchParamId = Model.GetSearchParamId(v.SearchParameter.Url);
if (!_isConvertSearchValueOverridden)
{
// save an array allocation
if (TryGenerateRow(index, searchParamId, (TSearchValue)v.Value, out TRow row))
{
yield return row;
}
}
else
{
foreach (var searchValue in ConvertSearchValue(v))
{
if (TryGenerateRow(index, searchParamId, searchValue, out TRow row))
{
yield return row;
}
}
}
}
}
}
private void EnsureInitialized()
{
if (Volatile.Read(ref _isInitialized))
{
return;
}
Initialize();
Volatile.Write(ref _isInitialized, true);
}
protected virtual IEnumerable<TSearchValue> ConvertSearchValue(SearchIndexEntry entry) => new[] { (TSearchValue)entry.Value };
protected virtual void Initialize()
{
}
internal abstract bool TryGenerateRow(int offset, short searchParamId, TSearchValue searchValue, out TRow row);
}
}

Просмотреть файл

@ -0,0 +1,40 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkStringSearchParameterV1RowGenerator : BulkSearchParameterRowGenerator<StringSearchValue, BulkStringSearchParamTableTypeV1Row>
{
private readonly int _indexedTextMaxLength = (int)VLatest.StringSearchParam.Text.Metadata.MaxLength;
public BulkStringSearchParameterV1RowGenerator(SqlServerFhirModel model, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
}
internal override bool TryGenerateRow(int offset, short searchParamId, StringSearchValue searchValue, out BulkStringSearchParamTableTypeV1Row row)
{
string indexedPrefix;
string overflow;
if (searchValue.String.Length > _indexedTextMaxLength)
{
// TODO: this truncation can break apart grapheme clusters.
indexedPrefix = searchValue.String.Substring(0, _indexedTextMaxLength);
overflow = searchValue.String;
}
else
{
indexedPrefix = searchValue.String;
overflow = null;
}
row = new BulkStringSearchParamTableTypeV1Row(offset, searchParamId, indexedPrefix, overflow);
return true;
}
}
}

Просмотреть файл

@ -0,0 +1,52 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkTokenDateTimeCompositeSearchParameterV1RowGenerator : BulkCompositeSearchParameterRowGenerator<(TokenSearchValue component1, DateTimeSearchValue component2), BulkTokenDateTimeCompositeSearchParamTableTypeV1Row>
{
private readonly BulkTokenSearchParameterV1RowGenerator _tokenRowGenerator;
private readonly BulkDateTimeSearchParameterV1RowGenerator _dateTimeV1RowGenerator;
public BulkTokenDateTimeCompositeSearchParameterV1RowGenerator(
SqlServerFhirModel model,
BulkTokenSearchParameterV1RowGenerator tokenRowGenerator,
BulkDateTimeSearchParameterV1RowGenerator dateTimeV1RowGenerator,
SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
_tokenRowGenerator = tokenRowGenerator;
_dateTimeV1RowGenerator = dateTimeV1RowGenerator;
}
internal override bool TryGenerateRow(
int offset,
short searchParamId,
(TokenSearchValue component1, DateTimeSearchValue component2) searchValue,
out BulkTokenDateTimeCompositeSearchParamTableTypeV1Row row)
{
if (_tokenRowGenerator.TryGenerateRow(offset, default, searchValue.component1, out var token1Row) &&
_dateTimeV1RowGenerator.TryGenerateRow(offset, default, searchValue.component2, out var token2Row))
{
row = new BulkTokenDateTimeCompositeSearchParamTableTypeV1Row(
offset,
searchParamId,
token1Row.SystemId,
token1Row.Code,
token2Row.StartDateTime,
token2Row.EndDateTime,
token2Row.IsLongerThanADay);
return true;
}
row = default;
return false;
}
}
}

Просмотреть файл

@ -0,0 +1,54 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkTokenNumberNumberCompositeSearchParameterV1RowGenerator : BulkCompositeSearchParameterRowGenerator<(TokenSearchValue component1, NumberSearchValue component2, NumberSearchValue component3), BulkTokenNumberNumberCompositeSearchParamTableTypeV1Row>
{
private readonly BulkTokenSearchParameterV1RowGenerator _tokenRowGenerator;
private readonly BulkNumberSearchParameterV1RowGenerator _numberV1RowGenerator;
public BulkTokenNumberNumberCompositeSearchParameterV1RowGenerator(SqlServerFhirModel model, BulkTokenSearchParameterV1RowGenerator tokenRowGenerator, BulkNumberSearchParameterV1RowGenerator numberV1RowGenerator, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
_tokenRowGenerator = tokenRowGenerator;
_numberV1RowGenerator = numberV1RowGenerator;
}
internal override bool TryGenerateRow(
int offset,
short searchParamId,
(TokenSearchValue component1, NumberSearchValue component2, NumberSearchValue component3) searchValue,
out BulkTokenNumberNumberCompositeSearchParamTableTypeV1Row row)
{
if (_tokenRowGenerator.TryGenerateRow(default, default, searchValue.component1, out var token1Row) &&
_numberV1RowGenerator.TryGenerateRow(default, default, searchValue.component2, out var token2Row) &&
_numberV1RowGenerator.TryGenerateRow(default, default, searchValue.component3, out var token3Row))
{
bool hasRange = token2Row.SingleValue == null || token3Row.SingleValue == null;
row = new BulkTokenNumberNumberCompositeSearchParamTableTypeV1Row(
offset,
searchParamId,
token1Row.SystemId,
token1Row.Code,
hasRange ? null : token2Row.SingleValue,
token2Row.LowValue ?? token2Row.SingleValue,
token2Row.HighValue ?? token2Row.SingleValue,
hasRange ? null : token3Row.SingleValue,
token3Row.LowValue ?? token3Row.SingleValue,
token3Row.HighValue ?? token3Row.SingleValue,
HasRange: hasRange);
return true;
}
row = default;
return false;
}
}
}

Просмотреть файл

@ -0,0 +1,54 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkTokenQuantityCompositeSearchParameterV1RowGenerator : BulkCompositeSearchParameterRowGenerator<(TokenSearchValue component1, QuantitySearchValue component2), BulkTokenQuantityCompositeSearchParamTableTypeV1Row>
{
private readonly BulkTokenSearchParameterV1RowGenerator _tokenRowGenerator;
private readonly BulkQuantitySearchParameterV1RowGenerator _quantityV1RowGenerator;
public BulkTokenQuantityCompositeSearchParameterV1RowGenerator(
SqlServerFhirModel model,
BulkTokenSearchParameterV1RowGenerator tokenRowGenerator,
BulkQuantitySearchParameterV1RowGenerator quantityV1RowGenerator,
SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
_tokenRowGenerator = tokenRowGenerator;
_quantityV1RowGenerator = quantityV1RowGenerator;
}
internal override bool TryGenerateRow(
int offset,
short searchParamId,
(TokenSearchValue component1, QuantitySearchValue component2) searchValue,
out BulkTokenQuantityCompositeSearchParamTableTypeV1Row row)
{
if (_tokenRowGenerator.TryGenerateRow(default, default, searchValue.component1, out var token1Row) &&
_quantityV1RowGenerator.TryGenerateRow(default, default, searchValue.component2, out var token2Row))
{
row = new BulkTokenQuantityCompositeSearchParamTableTypeV1Row(
offset,
searchParamId,
token1Row.SystemId,
token1Row.Code,
token2Row.SystemId,
token2Row.QuantityCodeId,
token2Row.SingleValue,
token2Row.LowValue,
token2Row.HighValue);
return true;
}
row = default;
return false;
}
}
}

Просмотреть файл

@ -0,0 +1,42 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkTokenSearchParameterV1RowGenerator : BulkSearchParameterRowGenerator<TokenSearchValue, BulkTokenSearchParamTableTypeV1Row>
{
private short _resourceIdSearchParamId;
public BulkTokenSearchParameterV1RowGenerator(SqlServerFhirModel model, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
}
internal override bool TryGenerateRow(int offset, short searchParamId, TokenSearchValue searchValue, out BulkTokenSearchParamTableTypeV1Row row)
{
// don't store if the code is empty or if this is the Resource _id parameter. The id is already maintained on the Resource table.
if (string.IsNullOrWhiteSpace(searchValue.Code) ||
searchParamId == _resourceIdSearchParamId)
{
row = default;
return false;
}
row = new BulkTokenSearchParamTableTypeV1Row(
offset,
searchParamId,
searchValue.System == null ? (int?)null : Model.GetSystemId(searchValue.System),
searchValue.Code);
return true;
}
protected override void Initialize() => _resourceIdSearchParamId = Model.GetSearchParamId(SearchParameterNames.IdUri);
}
}

Просмотреть файл

@ -0,0 +1,51 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkTokenStringCompositeSearchParameterV1RowGenerator : BulkCompositeSearchParameterRowGenerator<(TokenSearchValue component1, StringSearchValue component2), BulkTokenStringCompositeSearchParamTableTypeV1Row>
{
private readonly BulkTokenSearchParameterV1RowGenerator _tokenRowGenerator;
private readonly BulkStringSearchParameterV1RowGenerator _stringV1RowGenerator;
public BulkTokenStringCompositeSearchParameterV1RowGenerator(
SqlServerFhirModel model,
BulkTokenSearchParameterV1RowGenerator tokenRowGenerator,
BulkStringSearchParameterV1RowGenerator stringV1RowGenerator,
SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
_tokenRowGenerator = tokenRowGenerator;
_stringV1RowGenerator = stringV1RowGenerator;
}
internal override bool TryGenerateRow(
int offset,
short searchParamId,
(TokenSearchValue component1, StringSearchValue component2) searchValue,
out BulkTokenStringCompositeSearchParamTableTypeV1Row row)
{
if (_tokenRowGenerator.TryGenerateRow(default, default, searchValue.component1, out var token1Row) &&
_stringV1RowGenerator.TryGenerateRow(default, default, searchValue.component2, out var string2Row))
{
row = new BulkTokenStringCompositeSearchParamTableTypeV1Row(
offset,
searchParamId,
token1Row.SystemId,
token1Row.Code,
string2Row.Text,
TextOverflow2: string2Row.TextOverflow);
return true;
}
row = default;
return false;
}
}
}

Просмотреть файл

@ -0,0 +1,30 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkTokenTextSearchParameterV1RowGenerator : BulkSearchParameterRowGenerator<TokenSearchValue, BulkTokenTextTableTypeV1Row>
{
public BulkTokenTextSearchParameterV1RowGenerator(SqlServerFhirModel model, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
}
internal override bool TryGenerateRow(int offset, short searchParamId, TokenSearchValue searchValue, out BulkTokenTextTableTypeV1Row row)
{
if (string.IsNullOrWhiteSpace(searchValue.Text))
{
row = default;
return false;
}
row = new BulkTokenTextTableTypeV1Row(offset, searchParamId, searchValue.Text);
return true;
}
}
}

Просмотреть файл

@ -0,0 +1,41 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkTokenTokenCompositeSearchParameterV1RowGenerator : BulkCompositeSearchParameterRowGenerator<(TokenSearchValue component1, TokenSearchValue component2), BulkTokenTokenCompositeSearchParamTableTypeV1Row>
{
private readonly BulkTokenSearchParameterV1RowGenerator _tokenRowGenerator;
public BulkTokenTokenCompositeSearchParameterV1RowGenerator(SqlServerFhirModel model, BulkTokenSearchParameterV1RowGenerator tokenRowGenerator, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
_tokenRowGenerator = tokenRowGenerator;
}
internal override bool TryGenerateRow(int offset, short searchParamId, (TokenSearchValue component1, TokenSearchValue component2) searchValue, out BulkTokenTokenCompositeSearchParamTableTypeV1Row row)
{
if (_tokenRowGenerator.TryGenerateRow(default, default, searchValue.component1, out var token1Row) &&
_tokenRowGenerator.TryGenerateRow(default, default, searchValue.component2, out var token2Row))
{
row = new BulkTokenTokenCompositeSearchParamTableTypeV1Row(
offset,
searchParamId,
token1Row.SystemId,
token1Row.Code,
token2Row.SystemId,
token2Row.Code);
return true;
}
row = default;
return false;
}
}
}

Просмотреть файл

@ -0,0 +1,24 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model;
namespace Microsoft.Health.Fhir.SqlServer.Features.Storage.TvpRowGeneration
{
internal class BulkUriSearchParameterV1RowGenerator : BulkSearchParameterRowGenerator<UriSearchValue, BulkUriSearchParamTableTypeV1Row>
{
public BulkUriSearchParameterV1RowGenerator(SqlServerFhirModel model, SearchParameterToSearchValueTypeMap searchParameterTypeMap)
: base(model, searchParameterTypeMap)
{
}
internal override bool TryGenerateRow(int offset, short searchParamId, UriSearchValue searchValue, out BulkUriSearchParamTableTypeV1Row row)
{
row = new BulkUriSearchParamTableTypeV1Row(offset, searchParamId, searchValue.Uri);
return true;
}
}
}

Просмотреть файл

@ -17,6 +17,12 @@
</EmbeddedResource>
<EmbeddedResource Include="Features\Schema\Migrations\7.diff.sql" />
<EmbeddedResource Include="Features\Schema\Migrations\7.sql">
<InputToImmutableSqlGenerator>true</InputToImmutableSqlGenerator>
<InputToMutableSqlGenerator>true</InputToMutableSqlGenerator>
<MutableClassVersion>7</MutableClassVersion>
</EmbeddedResource>
<EmbeddedResource Include="Features\Schema\Migrations\8.diff.sql" />
<EmbeddedResource Include="Features\Schema\Migrations\8.sql">
<InputToImmutableSqlGenerator>true</InputToImmutableSqlGenerator>
<InputToMutableSqlGenerator>true</InputToMutableSqlGenerator>
<MutableClassVersion>Latest</MutableClassVersion>

Просмотреть файл

@ -24,14 +24,14 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
{
[Collection(FhirOperationTestConstants.FhirOperationTests)]
[FhirStorageTestsFixtureArgumentSets(DataStore.All)]
public class FhirOperationDataStoreTests : IClassFixture<FhirStorageTestsFixture>, IAsyncLifetime
public class FhirOperationDataStoreExportTests : IClassFixture<FhirStorageTestsFixture>, IAsyncLifetime
{
private readonly IFhirOperationDataStore _operationDataStore;
private readonly IFhirStorageTestHelper _testHelper;
private readonly CreateExportRequest _exportRequest = new CreateExportRequest(new Uri("http://localhost/ExportJob"), ExportJobType.All);
public FhirOperationDataStoreTests(FhirStorageTestsFixture fixture)
public FhirOperationDataStoreExportTests(FhirStorageTestsFixture fixture)
{
_operationDataStore = fixture.OperationDataStore;
_testHelper = fixture.TestHelper;
@ -48,7 +48,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
[Fact]
public async Task GivenANewExportRequest_WhenCreatingExportJob_ThenGetsJobCreated()
public async Task GivenANewExportRequest_WhenCreatingAnExportJob_ThenAnExportJobGetsCreated()
{
var jobRecord = new ExportJobRecord(_exportRequest.RequestUri, _exportRequest.RequestType, ExportFormatTags.ResourceName, _exportRequest.ResourceType, null, "hash", rollingFileSizeInMB: 64);
@ -61,7 +61,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
[Fact]
public async Task GivenAMatchingJob_WhenGettingById_ThenTheMatchingJobShouldBeReturned()
public async Task GivenAMatchingExportJob_WhenGettingById_ThenTheMatchingExportJobShouldBeReturned()
{
var jobRecord = await InsertNewExportJobRecordAsync();
@ -71,7 +71,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
[Fact]
public async Task GivenNoMatchingJob_WhenGettingById_ThenJobNotFoundExceptionShouldBeThrown()
public async Task GivenNoMatchingExportJob_WhenGettingById_ThenJobNotFoundExceptionShouldBeThrown()
{
var jobRecord = await InsertNewExportJobRecordAsync();
@ -79,7 +79,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
[Fact]
public async Task GivenAMatchingJob_WhenGettingByHash_ThenTheMatchingJobShouldBeReturned()
public async Task GivenAMatchingExportJob_WhenGettingByHash_ThenTheMatchingExportJobShouldBeReturned()
{
var jobRecord = await InsertNewExportJobRecordAsync();
@ -89,7 +89,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
[Fact]
public async Task GivenNoMatchingJob_WhenGettingByHash_ThenNoMatchingJobShouldBeReturned()
public async Task GivenNoMatchingExportJob_WhenGettingByHash_ThenNoMatchingExportJobShouldBeReturned()
{
var jobRecord = await InsertNewExportJobRecordAsync();
@ -99,7 +99,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
[Fact]
public async Task GivenThereIsNoRunningJob_WhenAcquiringJobs_ThenAvailableJobsShouldBeReturned()
public async Task GivenThereIsNoRunningExportJob_WhenAcquiringExportJobs_ThenAvailableExportJobsShouldBeReturned()
{
ExportJobRecord jobRecord = await InsertNewExportJobRecordAsync();
@ -119,7 +119,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
[InlineData(OperationStatus.Completed)]
[InlineData(OperationStatus.Failed)]
[InlineData(OperationStatus.Running)]
public async Task GivenJobIsNotInQueuedState_WhenAcquiringJobs_ThenNoJobShouldBeReturned(OperationStatus operationStatus)
public async Task GivenExportJobIsNotInQueuedState_WhenAcquiringExportJobs_ThenNoExportJobShouldBeReturned(OperationStatus operationStatus)
{
ExportJobRecord jobRecord = await InsertNewExportJobRecordAsync(jr => jr.Status = operationStatus);
@ -133,9 +133,9 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
[InlineData(1, 0)]
[InlineData(2, 1)]
[InlineData(3, 2)]
public async Task GivenNumberOfRunningJobs_WhenAcquiringJobs_ThenAvailableJobsShouldBeReturned(ushort limit, int expectedNumberOfJobsReturned)
public async Task GivenNumberOfRunningExportJobs_WhenAcquiringExportJobs_ThenAvailableExportJobsShouldBeReturned(ushort limit, int expectedNumberOfJobsReturned)
{
await CreateRunningJob();
await CreateRunningExportJob();
ExportJobRecord jobRecord1 = await InsertNewExportJobRecordAsync(); // Queued
await InsertNewExportJobRecordAsync(jr => jr.Status = OperationStatus.Canceled);
await InsertNewExportJobRecordAsync(jr => jr.Status = OperationStatus.Completed);
@ -165,9 +165,9 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
[Fact]
public async Task GivenThereIsRunningJobThatExpired_WhenAcquiringJobs_ThenTheExpiredJobShouldBeReturned()
public async Task GivenThereIsARunningExportJobThatExpired_WhenAcquiringExportJobs_ThenTheExpiredExportJobShouldBeReturned()
{
ExportJobOutcome jobOutcome = await CreateRunningJob();
ExportJobOutcome jobOutcome = await CreateRunningExportJob();
await Task.Delay(1200);
@ -180,7 +180,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
[Fact]
public async Task GivenThereAreQueuedJobs_WhenSimultaneouslyAcquiringJobs_ThenCorrectJobsShouldBeReturned()
public async Task GivenThereAreQueuedExportJobs_WhenSimultaneouslyAcquiringExportJobs_ThenCorrectExportJobsShouldBeReturned()
{
ExportJobRecord[] jobRecords = new[]
{
@ -217,9 +217,9 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
[Fact]
public async Task GivenARunningJob_WhenUpdatingTheJob_ThenTheJobShouldBeUpdated()
public async Task GivenARunningExportJob_WhenUpdatingTheExportJob_ThenTheExportJobShouldBeUpdated()
{
ExportJobOutcome jobOutcome = await CreateRunningJob();
ExportJobOutcome jobOutcome = await CreateRunningExportJob();
ExportJobRecord job = jobOutcome.JobRecord;
job.Status = OperationStatus.Completed;
@ -231,9 +231,9 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
[Fact]
public async Task GivenAnOldVersionOfAJob_WhenUpdatingTheJob_ThenJobConflictExceptionShouldBeThrown()
public async Task GivenAnOldVersionOfAnExportJob_WhenUpdatingTheExportJob_ThenJobConflictExceptionShouldBeThrown()
{
ExportJobOutcome jobOutcome = await CreateRunningJob();
ExportJobOutcome jobOutcome = await CreateRunningExportJob();
ExportJobRecord job = jobOutcome.JobRecord;
// Update the job for a first time. This should not fail.
@ -246,9 +246,9 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
[Fact]
public async Task GivenANonexistentJob_WhenUpdatingTheJob_ThenJobNotFoundExceptionShouldBeThrown()
public async Task GivenANonexistentExportJob_WhenUpdatingTheExportJob_ThenJobNotFoundExceptionShouldBeThrown()
{
ExportJobOutcome jobOutcome = await CreateRunningJob();
ExportJobOutcome jobOutcome = await CreateRunningExportJob();
ExportJobRecord job = jobOutcome.JobRecord;
WeakETag jobVersion = jobOutcome.ETag;
@ -259,9 +259,9 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
[Fact]
public async Task GivenThereIsARunningJob_WhenSimultaneousUpdateCallsOccur_ThenJobConflictExceptionShouldBeThrown()
public async Task GivenThereIsARunningExportJob_WhenSimultaneousUpdateCallsOccur_ThenJobConflictExceptionShouldBeThrown()
{
ExportJobOutcome runningJobOutcome = await CreateRunningJob();
ExportJobOutcome runningJobOutcome = await CreateRunningExportJob();
var completionSource = new TaskCompletionSource<bool>();
@ -285,7 +285,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations
}
}
private async Task<ExportJobOutcome> CreateRunningJob()
private async Task<ExportJobOutcome> CreateRunningExportJob()
{
// Create a queued job.
await InsertNewExportJobRecordAsync();

Просмотреть файл

@ -10,6 +10,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Tests.Common.FixtureParameters;
using Microsoft.Health.Fhir.Tests.Integration.Features.Operations;
using Microsoft.Health.Fhir.Tests.Integration.Persistence;
@ -18,7 +19,7 @@ using Xunit;
namespace Microsoft.Health.Fhir.Shared.Tests.Integration.Features.Operations
{
[Collection(FhirOperationTestConstants.FhirOperationTests)]
[FhirStorageTestsFixtureArgumentSets(DataStore.CosmosDb)]
[FhirStorageTestsFixtureArgumentSets(DataStore.All)]
public class FhirOperationDataStoreReindexTests : IClassFixture<FhirStorageTestsFixture>, IAsyncLifetime
{
private readonly IFhirOperationDataStore _operationDataStore;
@ -40,6 +41,24 @@ namespace Microsoft.Health.Fhir.Shared.Tests.Integration.Features.Operations
return Task.CompletedTask;
}
[Fact]
public async Task GivenAMatchingReindexJob_WhenGettingById_ThenTheMatchingReindexJobShouldBeReturned()
{
ReindexJobRecord jobRecord = await InsertNewReindexJobRecordAsync();
ReindexJobWrapper jobWrapper = await _operationDataStore.GetReindexJobByIdAsync(jobRecord.Id, CancellationToken.None);
Assert.Equal(jobRecord.Id, jobWrapper?.JobRecord?.Id);
}
[Fact]
public async Task GivenNoMatchingReindexJob_WhenGettingById_ThenJobNotFoundExceptionShouldBeThrown()
{
ReindexJobRecord jobRecord = await InsertNewReindexJobRecordAsync();
await Assert.ThrowsAsync<JobNotFoundException>(() => _operationDataStore.GetReindexJobByIdAsync("test", CancellationToken.None));
}
[Fact]
public async Task GivenThereIsNoRunningReindexJob_WhenAcquiringReindexJobs_ThenAvailableReindexJobsShouldBeReturned()
{
@ -158,6 +177,113 @@ namespace Microsoft.Health.Fhir.Shared.Tests.Integration.Features.Operations
}
}
[Fact]
public async Task GivenARunningReindexJob_WhenUpdatingTheReindexJob_ThenTheJobShouldBeUpdated()
{
ReindexJobWrapper jobWrapper = await CreateRunningReindexJob();
ReindexJobRecord job = jobWrapper.JobRecord;
job.Status = OperationStatus.Completed;
await _operationDataStore.UpdateReindexJobAsync(job, jobWrapper.ETag, CancellationToken.None);
ReindexJobWrapper updatedJobWrapper = await _operationDataStore.GetReindexJobByIdAsync(job.Id, CancellationToken.None);
ValidateReindexJobRecord(job, updatedJobWrapper?.JobRecord);
}
[Fact]
public async Task GivenAnOldVersionOfAReindexJob_WhenUpdatingTheReindexJob_ThenJobConflictExceptionShouldBeThrown()
{
ReindexJobWrapper jobWrapper = await CreateRunningReindexJob();
ReindexJobRecord job = jobWrapper.JobRecord;
// Update the job for a first time. This should not fail.
job.Status = OperationStatus.Completed;
WeakETag jobVersion = jobWrapper.ETag;
await _operationDataStore.UpdateReindexJobAsync(job, jobVersion, CancellationToken.None);
// Attempt to update the job a second time with the old version.
await Assert.ThrowsAsync<JobConflictException>(() => _operationDataStore.UpdateReindexJobAsync(job, jobVersion, CancellationToken.None));
}
[Fact]
public async Task GivenANonexistentReindexJob_WhenUpdatingTheReindexJob_ThenJobNotFoundExceptionShouldBeThrown()
{
ReindexJobWrapper jobWrapper = await CreateRunningReindexJob();
ReindexJobRecord job = jobWrapper.JobRecord;
WeakETag jobVersion = jobWrapper.ETag;
await _testHelper.DeleteReindexJobRecordAsync(job.Id);
await Assert.ThrowsAsync<JobNotFoundException>(() => _operationDataStore.UpdateReindexJobAsync(job, jobVersion, CancellationToken.None));
}
[Fact]
public async Task GivenThereIsARunningReindexJob_WhenSimultaneousUpdateCallsOccur_ThenJobConflictExceptionShouldBeThrown()
{
ReindexJobWrapper runningJobWrapper = await CreateRunningReindexJob();
var completionSource = new TaskCompletionSource<bool>();
Task<ReindexJobWrapper>[] tasks = new[]
{
WaitAndUpdateReindexJobAsync(runningJobWrapper),
WaitAndUpdateReindexJobAsync(runningJobWrapper),
WaitAndUpdateReindexJobAsync(runningJobWrapper),
};
completionSource.SetResult(true);
await Assert.ThrowsAsync<JobConflictException>(() => Task.WhenAll(tasks));
async Task<ReindexJobWrapper> WaitAndUpdateReindexJobAsync(ReindexJobWrapper jobWrapper)
{
await completionSource.Task;
jobWrapper.JobRecord.Status = OperationStatus.Completed;
return await _operationDataStore.UpdateReindexJobAsync(jobWrapper.JobRecord, jobWrapper.ETag, CancellationToken.None);
}
}
[Theory]
[InlineData(OperationStatus.Running)]
[InlineData(OperationStatus.Queued)]
[InlineData(OperationStatus.Paused)]
public async Task GivenAnActiveReindexJob_WhenGettingActiveReindexJobs_ThenTheCorrectJobIdShouldBeReturned(OperationStatus operationStatus)
{
ReindexJobRecord jobRecord = await InsertNewReindexJobRecordAsync(job => job.Status = operationStatus);
(bool, string) activeReindexJobResult = await _operationDataStore.CheckActiveReindexJobsAsync(CancellationToken.None);
Assert.True(activeReindexJobResult.Item1);
Assert.Equal(jobRecord.Id, activeReindexJobResult.Item2);
}
[Theory]
[InlineData(OperationStatus.Canceled)]
[InlineData(OperationStatus.Completed)]
[InlineData(OperationStatus.Failed)]
[InlineData(OperationStatus.Unknown)]
public async Task GivenNoActiveReindexJobs_WhenGettingActiveReindexJobs_ThenNoJobIdShouldBeReturned(OperationStatus operationStatus)
{
await InsertNewReindexJobRecordAsync(job => job.Status = operationStatus);
(bool, string) activeReindexJobResult = await _operationDataStore.CheckActiveReindexJobsAsync(CancellationToken.None);
Assert.False(activeReindexJobResult.Item1);
Assert.Empty(activeReindexJobResult.Item2);
}
[Fact]
public async Task GivenNoReindexJobs_WhenGettingActiveReindexJobs_ThenNoJobIdShouldBeReturned()
{
(bool, string) activeReindexJobResult = await _operationDataStore.CheckActiveReindexJobsAsync(CancellationToken.None);
Assert.False(activeReindexJobResult.Item1);
Assert.Empty(activeReindexJobResult.Item2);
}
private async Task<ReindexJobWrapper> CreateRunningReindexJob()
{
// Create a queued job.

Просмотреть файл

@ -50,7 +50,6 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations.Reindex
private IFhirOperationDataStore _fhirOperationDataStore;
private IScoped<IFhirOperationDataStore> _scopedOperationDataStore;
private IScoped<IFhirDataStore> _scopedDataStore;
private ISearchParameterStatusDataStore _searchParameterStatusDataStore;
private IFhirStorageTestHelper _fhirStorageTestHelper;
private SearchParameterDefinitionManager _searchParameterDefinitionManager;
@ -61,7 +60,6 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations.Reindex
private ISupportedSearchParameterDefinitionManager _supportedSearchParameterDefinitionManager;
private SearchParameterStatusManager _searchParameterStatusManager;
private readonly ISearchParameterSupportResolver _searchParameterSupportResolver = Substitute.For<ISearchParameterSupportResolver>();
private readonly IMediator _mediator = Substitute.For<IMediator>();
private ReindexJobWorker _reindexJobWorker;
private IScoped<ISearchService> _searchService;
@ -73,7 +71,6 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations.Reindex
{
_fixture = fixture;
_testHelper = _fixture.TestHelper;
_searchParameterSupportResolver.IsSearchParameterSupported(Arg.Any<SearchParameterInfo>()).Returns((true, false));
}
public async Task InitializeAsync()
@ -82,7 +79,6 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations.Reindex
_fhirStorageTestHelper = _fixture.TestHelper;
_scopedOperationDataStore = _fhirOperationDataStore.CreateMockScope();
_scopedDataStore = _fixture.DataStore.CreateMockScope();
_searchParameterStatusDataStore = _fixture.SearchParameterStatusDataStore;
_jobConfiguration = new ReindexJobConfiguration();
IOptions<ReindexJobConfiguration> optionsReindexConfig = Substitute.For<IOptions<ReindexJobConfiguration>>();
@ -97,11 +93,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations.Reindex
_searchParameterDefinitionManager,
Deserializers.ResourceDeserializer);
_searchParameterStatusManager = new SearchParameterStatusManager(
_searchParameterStatusDataStore,
_searchParameterDefinitionManager,
_searchParameterSupportResolver,
_mediator);
_searchParameterStatusManager = _fixture.SearchParameterStatusManager;
_createReindexRequestHandler = new CreateReindexRequestHandler(
_fhirOperationDataStore,
@ -263,6 +255,9 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations.Reindex
// The foo search parameter can be used to filter for the first test patient
ResourceWrapper patient = searchResults.Results.FirstOrDefault().Resource;
Assert.Contains(sampleName1, patient.RawResource.Data);
// Confirm that the reindexing operation did not create a new version of the resource
Assert.Equal("1", searchResults.Results.FirstOrDefault().Resource.Version);
}
finally
{
@ -324,6 +319,9 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations.Reindex
// The foo search parameter can be used to filter for the first test patient
ResourceWrapper patient = searchResults.Results.FirstOrDefault().Resource;
Assert.Contains(sampleName1, patient.RawResource.Data);
// Confirm that the reindexing operation did not create a new version of the resource
Assert.Equal("1", searchResults.Results.FirstOrDefault().Resource.Version);
}
finally
{
@ -499,7 +497,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations.Reindex
patientResource.Name = new List<HumanName> { new() { Family = patientName }};
patientResource.Id = patientId;
patientResource.VersionId = "v1";
patientResource.VersionId = "1";
var resourceElement = patientResource.ToResourceElement();
var rawResource = new RawResource(patientResource.ToJson(), FhirResourceFormat.Json, isMetaSet: false);
@ -516,7 +514,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations.Reindex
Observation observationResource = Samples.GetDefaultObservation().ToPoco<Observation>();
observationResource.Id = observationId;
observationResource.VersionId = "v1";
observationResource.VersionId = "1";
var resourceElement = observationResource.ToResourceElement();
var rawResource = new RawResource(observationResource.ToJson(), FhirResourceFormat.Json, isMetaSet: false);

Просмотреть файл

@ -0,0 +1,130 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Features;
using Microsoft.Health.Fhir.Core.Features.Definition;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Models;
using Microsoft.Health.Fhir.Core.UnitTests.Extensions;
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.Fhir.Tests.Common.FixtureParameters;
using Microsoft.Health.Fhir.Tests.Integration.Persistence;
using NSubstitute;
using Xunit;
using Task = System.Threading.Tasks.Task;
namespace Microsoft.Health.Fhir.Tests.Integration.Features.Operations.Reindex
{
[FhirStorageTestsFixtureArgumentSets(DataStore.All)]
public class ReindexSearchTests : IClassFixture<FhirStorageTestsFixture>
{
private readonly IScoped<IFhirDataStore> _scopedDataStore;
private readonly IScoped<ISearchService> _searchService;
private readonly SearchParameterDefinitionManager _searchParameterDefinitionManager;
private readonly ISearchIndexer _searchIndexer = Substitute.For<ISearchIndexer>();
public ReindexSearchTests(FhirStorageTestsFixture fixture)
{
_scopedDataStore = fixture.DataStore.CreateMockScope();
_searchService = fixture.SearchService.CreateMockScope();
_searchParameterDefinitionManager = fixture.SearchParameterDefinitionManager;
}
[Fact]
public async Task GivenResourceWithMatchingHash_WhenPerformingReindexSearch_ThenResourceShouldNotBeReturned()
{
ResourceWrapper testPatient = null;
try
{
UpsertOutcome outcome = await UpsertPatientData();
testPatient = outcome.Wrapper;
var patientHash = testPatient.SearchParameterHash;
var queryParametersList = new List<Tuple<string, string>>()
{
Tuple.Create(KnownQueryParameterNames.Count, "100"),
Tuple.Create(KnownQueryParameterNames.Type, "Patient"),
};
// Pass in the same hash value
SearchResult searchResult = await _searchService.Value.SearchForReindexAsync(queryParametersList, patientHash, false, CancellationToken.None);
// A reindex search should return all the resources that have a different hash value than the one specified
Assert.Empty(searchResult.Results);
}
finally
{
if (testPatient != null)
{
await _scopedDataStore.Value.HardDeleteAsync(testPatient.ToResourceKey(), CancellationToken.None);
}
}
}
[Fact]
public async Task GivenResourceWithDifferentHash_WhenPerformingReindexSearch_ThenResourceShouldBeReturned()
{
ResourceWrapper testPatient = null;
try
{
UpsertOutcome outcome = await UpsertPatientData();
testPatient = outcome.Wrapper;
var queryParametersList = new List<Tuple<string, string>>()
{
Tuple.Create(KnownQueryParameterNames.Count, "100"),
Tuple.Create(KnownQueryParameterNames.Type, "Patient"),
};
// Pass in a different hash value
SearchResult searchResult = await _searchService.Value.SearchForReindexAsync(queryParametersList, "differentHash", false, CancellationToken.None);
// A reindex search should return all the resources that have a different hash value than the one specified.
Assert.Single(searchResult.Results);
}
finally
{
if (testPatient != null)
{
await _scopedDataStore.Value.HardDeleteAsync(testPatient.ToResourceKey(), CancellationToken.None);
}
}
}
private async Task<UpsertOutcome> UpsertPatientData()
{
var json = Samples.GetJson("Patient");
var rawResource = new RawResource(json, FhirResourceFormat.Json, isMetaSet: false);
var resourceRequest = new ResourceRequest(WebRequestMethods.Http.Put);
var compartmentIndices = Substitute.For<CompartmentIndices>();
var resourceElement = Deserializers.ResourceDeserializer.DeserializeRaw(rawResource, "v1", DateTimeOffset.UtcNow);
var searchIndices = _searchIndexer.Extract(resourceElement);
var wrapper = new ResourceWrapper(
resourceElement,
rawResource,
resourceRequest,
false,
searchIndices,
compartmentIndices,
new List<KeyValuePair<string, string>>(),
_searchParameterDefinitionManager.GetSearchParameterHashForResourceType("Patient"));
return await _scopedDataStore.Value.UpsertAsync(wrapper, null, true, true, CancellationToken.None);
}
}
}

Просмотреть файл

@ -10,10 +10,11 @@
</PropertyGroup>
<ItemGroup>
<Compile Include="$(MSBuildThisFileDirectory)Features\Operations\FhirOperationDataStoreReindexTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Features\Operations\FhirOperationDataStoreTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Features\Operations\FhirOperationDataStoreExportTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Features\Operations\Export\CreateExportRequestHandlerTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Features\Operations\FhirOperationTestConstants.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Features\Operations\Reindex\ReindexJobTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Features\Operations\Reindex\ReindexSearchTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Persistence\SearchParameterStatusDataStoreTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Persistence\SqlServerSchemaUpgradeTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Persistence\CosmosDbFhirStorageTestHelper.cs" />

Просмотреть файл

@ -20,6 +20,7 @@ using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Features.Search.Expressions.Parsers;
using Microsoft.Health.Fhir.Core.Features.Search.Parameters;
using Microsoft.Health.Fhir.Core.Features.Search.Registry;
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.Core.Models;
@ -53,6 +54,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
private ISearchService _searchService;
private SearchParameterDefinitionManager _searchParameterDefinitionManager;
private SupportedSearchParameterDefinitionManager _supportedSearchParameterDefinitionManager;
private SearchParameterStatusManager _searchParameterStatusManager;
private CosmosClient _cosmosClient;
public CosmosDbFhirStorageTestsFixture()
@ -96,6 +98,8 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
_filebasedSearchParameterStatusDataStore = new FilebasedSearchParameterStatusDataStore(_searchParameterDefinitionManager, ModelInfoProvider.Instance);
IMediator mediator = Substitute.For<IMediator>();
var updaters = new ICollectionUpdater[]
{
new FhirCollectionSettingsUpdater(_cosmosDataStoreConfiguration, optionsMonitor, NullLogger<FhirCollectionSettingsUpdater>.Instance),
@ -103,7 +107,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
new CosmosDbSearchParameterStatusInitializer(
() => _filebasedSearchParameterStatusDataStore,
new CosmosQueryFactory(
new CosmosResponseProcessor(fhirRequestContextAccessor, Substitute.For<IMediator>(), Substitute.For<ICosmosQueryLogger>(), NullLogger<CosmosResponseProcessor>.Instance),
new CosmosResponseProcessor(fhirRequestContextAccessor, mediator, Substitute.For<ICosmosQueryLogger>(), NullLogger<CosmosResponseProcessor>.Instance),
NullFhirCosmosQueryLogger.Instance),
_cosmosDataStoreConfiguration),
};
@ -115,12 +119,12 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
var cosmosResponseProcessor = Substitute.For<ICosmosResponseProcessor>();
var responseProcessor = new CosmosResponseProcessor(fhirRequestContextAccessor, Substitute.For<IMediator>(), Substitute.For<ICosmosQueryLogger>(), NullLogger<CosmosResponseProcessor>.Instance);
var responseProcessor = new CosmosResponseProcessor(fhirRequestContextAccessor, mediator, Substitute.For<ICosmosQueryLogger>(), NullLogger<CosmosResponseProcessor>.Instance);
var handler = new FhirCosmosResponseHandler(() => new NonDisposingScope(_container), _cosmosDataStoreConfiguration, fhirRequestContextAccessor, responseProcessor);
var retryExceptionPolicyFactory = new RetryExceptionPolicyFactory(_cosmosDataStoreConfiguration, Substitute.For<IFhirRequestContextAccessor>());
var retryExceptionPolicyFactory = new RetryExceptionPolicyFactory(_cosmosDataStoreConfiguration, fhirRequestContextAccessor);
var documentClientInitializer = new FhirCosmosClientInitializer(testProvider, () => new[] { handler }, retryExceptionPolicyFactory, NullLogger<FhirCosmosClientInitializer>.Instance);
_cosmosClient = documentClientInitializer.CreateCosmosClient(_cosmosDataStoreConfiguration);
var fhirCollectionInitializer = new CollectionInitializer(_cosmosCollectionConfiguration, _cosmosDataStoreConfiguration, upgradeManager, retryExceptionPolicyFactory, Substitute.For<ICosmosClientTestProvider>(), NullLogger<CollectionInitializer>.Instance);
var fhirCollectionInitializer = new CollectionInitializer(_cosmosCollectionConfiguration, _cosmosDataStoreConfiguration, upgradeManager, retryExceptionPolicyFactory, testProvider, NullLogger<CollectionInitializer>.Instance);
// Cosmos DB emulators throws errors when multiple collections are initialized concurrently.
// Use the semaphore to only allow one initialization at a time.
@ -155,7 +159,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
retryExceptionPolicyFactory,
NullLogger<CosmosFhirDataStore>.Instance,
options,
new Lazy<ISupportedSearchParameterDefinitionManager>(Substitute.For<ISupportedSearchParameterDefinitionManager>()));
new Lazy<ISupportedSearchParameterDefinitionManager>(_supportedSearchParameterDefinitionManager));
_fhirOperationDataStore = new CosmosFhirOperationDataStore(
documentClient,
@ -178,10 +182,19 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
new QueryBuilder(),
_searchParameterDefinitionManager,
fhirRequestContextAccessor,
new CosmosDataStoreConfiguration(),
_cosmosDataStoreConfiguration,
cosmosDbPhysicalPartitionInfo,
new QueryPartitionStatisticsCache());
ISearchParameterSupportResolver searchParameterSupportResolver = Substitute.For<ISearchParameterSupportResolver>();
searchParameterSupportResolver.IsSearchParameterSupported(Arg.Any<SearchParameterInfo>()).Returns((true, false));
_searchParameterStatusManager = new SearchParameterStatusManager(
_searchParameterStatusDataStore,
_searchParameterDefinitionManager,
searchParameterSupportResolver,
mediator);
_fhirStorageTestHelper = new CosmosDbFhirStorageTestHelper(_container);
}
@ -242,6 +255,11 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
return _supportedSearchParameterDefinitionManager;
}
if (serviceType == typeof(SearchParameterStatusManager))
{
return _searchParameterStatusManager;
}
return null;
}
}

Просмотреть файл

@ -6,6 +6,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Hl7.Fhir.ElementModel;
@ -18,14 +20,16 @@ using Microsoft.Health.Fhir.Core;
using Microsoft.Health.Fhir.Core.Exceptions;
using Microsoft.Health.Fhir.Core.Extensions;
using Microsoft.Health.Fhir.Core.Features.Conformance;
using Microsoft.Health.Fhir.Core.Features.Definition;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Features.Search.Registry;
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.Core.Models;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage;
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.Fhir.Tests.Common.FixtureParameters;
using Microsoft.Health.Test.Utilities;
using NSubstitute;
using Xunit;
using Task = System.Threading.Tasks.Task;
@ -42,7 +46,8 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
private readonly ResourceDeserializer _deserializer;
private readonly FhirJsonParser _fhirJsonParser;
private readonly IFhirDataStore _dataStore;
private ConformanceProviderBase _conformanceProvider;
private readonly SearchParameterDefinitionManager _searchParameterDefinitionManager;
private readonly ConformanceProviderBase _conformanceProvider;
public FhirStorageTests(FhirStorageTestsFixture fixture)
{
@ -52,6 +57,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
_dataStore = fixture.DataStore;
_fhirJsonParser = fixture.JsonParser;
_conformanceProvider = fixture.ConformanceProvider;
_searchParameterDefinitionManager = fixture.SearchParameterDefinitionManager;
Mediator = fixture.Mediator;
}
@ -603,93 +609,321 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
}
[Fact]
[FhirStorageTestsFixtureArgumentSets(DataStore.CosmosDb)]
public async Task GivenAnUpdatedResource_WhenUpdateSearchIndexForResourceAsync_ThenResourceGetsUpdated()
public async Task GivenAnUpdatedResource_WhenUpdatingSearchParameterIndexAsync_ThenResourceMetadataIsUnchanged()
{
ResourceElement patientResource = Samples.GetJsonSample("Patient");
ResourceElement patientResource = CreatePatientResourceElement("Patient", Guid.NewGuid().ToString());
SaveOutcome upsertResult = await Mediator.UpsertResourceAsync(patientResource);
(ResourceWrapper original, ResourceWrapper updated) = await CreateUpdatedWrapperFromExistingResource(upsertResult);
SearchParameter searchParam = null;
const string searchParamName = "newSearchParam";
ResourceWrapper replaceResult = await _dataStore.UpdateSearchIndexForResourceAsync(updated, WeakETag.FromVersionId(original.Version), CancellationToken.None);
try
{
searchParam = await CreatePatientSearchParam(searchParamName, SearchParamType.String, "Patient.name");
ISearchValue searchValue = new StringSearchValue(searchParamName);
(ResourceWrapper original, ResourceWrapper updated) = await CreateUpdatedWrapperFromExistingPatient(upsertResult, searchParam, searchValue);
await _dataStore.UpdateSearchIndexForResourceAsync(updated, WeakETag.FromVersionId(original.Version), CancellationToken.None);
// Get the reindexed resource from the database
var resourceKey1 = new ResourceKey(upsertResult.RawResourceElement.InstanceType, upsertResult.RawResourceElement.Id, upsertResult.RawResourceElement.VersionId);
ResourceWrapper reindexed = await _dataStore.GetAsync(resourceKey1, CancellationToken.None);
VerifyReindexedResource(original, reindexed);
}
finally
{
if (searchParam != null)
{
_searchParameterDefinitionManager.DeleteSearchParameter(searchParam.ToTypedElement());
await _fixture.TestHelper.DeleteSearchParameterStatusAsync(searchParam.Url, CancellationToken.None);
}
}
}
[Fact]
public async Task GivenAnUpdatedResourceWithWrongWeakETag_WhenUpdatingSearchParameterIndexAsync_ThenExceptionIsThrown()
{
ResourceElement patientResource = CreatePatientResourceElement("Patient", Guid.NewGuid().ToString());
SaveOutcome upsertResult = await Mediator.UpsertResourceAsync(patientResource);
SearchParameter searchParam1 = null;
const string searchParamName1 = "newSearchParam1";
SearchParameter searchParam2 = null;
const string searchParamName2 = "newSearchParam2";
try
{
searchParam1 = await CreatePatientSearchParam(searchParamName1, SearchParamType.String, "Patient.name");
ISearchValue searchValue1 = new StringSearchValue(searchParamName1);
(ResourceWrapper original, ResourceWrapper updatedWithSearchParam1) = await CreateUpdatedWrapperFromExistingPatient(upsertResult, searchParam1, searchValue1);
await _dataStore.UpsertAsync(updatedWithSearchParam1, WeakETag.FromVersionId(original.Version), allowCreate: false, keepHistory: false, CancellationToken.None);
// Let's update the resource again with new information
searchParam2 = await CreatePatientSearchParam(searchParamName2, SearchParamType.Token, "Patient.gender");
ISearchValue searchValue2 = new TokenSearchValue("system", "code", "text");
// Create the updated wrapper from the original resource that has the outdated version
(_, ResourceWrapper updatedWithSearchParam2) = await CreateUpdatedWrapperFromExistingPatient(upsertResult, searchParam2, searchValue2, original);
// Attempt to reindex the resource
await Assert.ThrowsAsync<PreconditionFailedException>(() => _dataStore.UpdateSearchIndexForResourceAsync(updatedWithSearchParam2, WeakETag.FromVersionId(original.Version), CancellationToken.None));
}
finally
{
if (searchParam1 != null)
{
_searchParameterDefinitionManager.DeleteSearchParameter(searchParam1.ToTypedElement());
await _fixture.TestHelper.DeleteSearchParameterStatusAsync(searchParam1.Url, CancellationToken.None);
}
if (searchParam2 != null)
{
_searchParameterDefinitionManager.DeleteSearchParameter(searchParam2.ToTypedElement());
await _fixture.TestHelper.DeleteSearchParameterStatusAsync(searchParam2.Url, CancellationToken.None);
}
}
}
[Fact]
public async Task GivenAnUpdatedResourceWithWrongResourceId_WhenUpdatingSearchParameterIndexAsync_ThenExceptionIsThrown()
{
ResourceElement patientResource = CreatePatientResourceElement("Patient", Guid.NewGuid().ToString());
SaveOutcome upsertResult = await Mediator.UpsertResourceAsync(patientResource);
SearchParameter searchParam = null;
const string searchParamName = "newSearchParam";
try
{
searchParam = await CreatePatientSearchParam(searchParamName, SearchParamType.String, "Patient.name");
ISearchValue searchValue = new StringSearchValue(searchParamName);
// Update the resource wrapper, adding the new search parameter and a different ID
(ResourceWrapper original, ResourceWrapper updated) = await CreateUpdatedWrapperFromExistingPatient(upsertResult, searchParam, searchValue, null, Guid.NewGuid().ToString());
await Assert.ThrowsAsync<ResourceNotFoundException>(() => _dataStore.UpdateSearchIndexForResourceAsync(updated, WeakETag.FromVersionId(original.Version), CancellationToken.None));
}
finally
{
if (searchParam != null)
{
_searchParameterDefinitionManager.DeleteSearchParameter(searchParam.ToTypedElement());
await _fixture.TestHelper.DeleteSearchParameterStatusAsync(searchParam.Url, CancellationToken.None);
}
}
}
[Fact]
public async Task GivenUpdatedResources_WhenBulkUpdatingSearchParameterIndicesAsync_ThenResourceMetadataIsUnchanged()
{
ResourceElement patientResource1 = CreatePatientResourceElement("Patient1", Guid.NewGuid().ToString());
SaveOutcome upsertResult1 = await Mediator.UpsertResourceAsync(patientResource1);
ResourceElement patientResource2 = CreatePatientResourceElement("Patient2", Guid.NewGuid().ToString());
SaveOutcome upsertResult2 = await Mediator.UpsertResourceAsync(patientResource2);
SearchParameter searchParam = null;
const string searchParamName = "newSearchParam";
try
{
searchParam = await CreatePatientSearchParam(searchParamName, SearchParamType.String, "Patient.name");
ISearchValue searchValue = new StringSearchValue(searchParamName);
(ResourceWrapper original1, ResourceWrapper updated1) = await CreateUpdatedWrapperFromExistingPatient(upsertResult1, searchParam, searchValue);
(ResourceWrapper original2, ResourceWrapper updated2) = await CreateUpdatedWrapperFromExistingPatient(upsertResult2, searchParam, searchValue);
var resources = new List<ResourceWrapper> { updated1, updated2 };
await _dataStore.UpdateSearchParameterIndicesBatchAsync(resources, CancellationToken.None);
// Get the reindexed resources from the database
var resourceKey1 = new ResourceKey(upsertResult1.RawResourceElement.InstanceType, upsertResult1.RawResourceElement.Id, upsertResult1.RawResourceElement.VersionId);
ResourceWrapper reindexed1 = await _dataStore.GetAsync(resourceKey1, CancellationToken.None);
var resourceKey2 = new ResourceKey(upsertResult2.RawResourceElement.InstanceType, upsertResult2.RawResourceElement.Id, upsertResult2.RawResourceElement.VersionId);
ResourceWrapper reindexed2 = await _dataStore.GetAsync(resourceKey2, CancellationToken.None);
VerifyReindexedResource(original1, reindexed1);
VerifyReindexedResource(original2, reindexed2);
}
finally
{
if (searchParam != null)
{
_searchParameterDefinitionManager.DeleteSearchParameter(searchParam.ToTypedElement());
await _fixture.TestHelper.DeleteSearchParameterStatusAsync(searchParam.Url, CancellationToken.None);
}
}
}
[Fact]
public async Task GivenUpdatedResourcesWithWrongWeakETag_WhenBulkUpdatingSearchParameterIndicesAsync_ThenExceptionIsThrown()
{
ResourceElement patientResource1 = CreatePatientResourceElement("Patient1", Guid.NewGuid().ToString());
SaveOutcome upsertResult1 = await Mediator.UpsertResourceAsync(patientResource1);
ResourceElement patientResource2 = CreatePatientResourceElement("Patient2", Guid.NewGuid().ToString());
SaveOutcome upsertResult2 = await Mediator.UpsertResourceAsync(patientResource2);
SearchParameter searchParam1 = null;
const string searchParamName1 = "newSearchParam1";
SearchParameter searchParam2 = null;
const string searchParamName2 = "newSearchParam2";
try
{
searchParam1 = await CreatePatientSearchParam(searchParamName1, SearchParamType.String, "Patient.name");
ISearchValue searchValue1 = new StringSearchValue(searchParamName1);
(ResourceWrapper original1, ResourceWrapper updated1) = await CreateUpdatedWrapperFromExistingPatient(upsertResult1, searchParam1, searchValue1);
(ResourceWrapper original2, ResourceWrapper updated2) = await CreateUpdatedWrapperFromExistingPatient(upsertResult2, searchParam1, searchValue1);
await _dataStore.UpsertAsync(updated1, WeakETag.FromVersionId(original1.Version), allowCreate: false, keepHistory: false, CancellationToken.None);
await _dataStore.UpsertAsync(updated2, WeakETag.FromVersionId(original2.Version), allowCreate: false, keepHistory: false, CancellationToken.None);
// Let's update the resources again with new information
searchParam2 = await CreatePatientSearchParam(searchParamName2, SearchParamType.Token, "Patient.gender");
ISearchValue searchValue2 = new TokenSearchValue("system", "code", "text");
// Create the updated wrappers using the original resource and its outdated version
(_, ResourceWrapper updated1WithSearchParam2) = await CreateUpdatedWrapperFromExistingPatient(upsertResult1, searchParam2, searchValue2, original1);
(_, ResourceWrapper updated2WithSearchParam2) = await CreateUpdatedWrapperFromExistingPatient(upsertResult2, searchParam2, searchValue2, original2);
var resources = new List<ResourceWrapper> { updated1WithSearchParam2, updated2WithSearchParam2 };
// Attempt to reindex resources with the old versions
await Assert.ThrowsAsync<PreconditionFailedException>(() => _dataStore.UpdateSearchParameterIndicesBatchAsync(resources, CancellationToken.None));
}
finally
{
if (searchParam1 != null)
{
_searchParameterDefinitionManager.DeleteSearchParameter(searchParam1.ToTypedElement());
await _fixture.TestHelper.DeleteSearchParameterStatusAsync(searchParam1.Url, CancellationToken.None);
}
if (searchParam2 != null)
{
_searchParameterDefinitionManager.DeleteSearchParameter(searchParam2.ToTypedElement());
await _fixture.TestHelper.DeleteSearchParameterStatusAsync(searchParam2.Url, CancellationToken.None);
}
}
}
[Fact]
public async Task GivenUpdatedResourcesWithWrongResourceId_WhenBulkUpdatingSearchParameterIndicesAsync_ThenExceptionIsThrown()
{
ResourceElement patientResource1 = CreatePatientResourceElement("Patient1", Guid.NewGuid().ToString());
SaveOutcome upsertResult1 = await Mediator.UpsertResourceAsync(patientResource1);
ResourceElement patientResource2 = CreatePatientResourceElement("Patient2", Guid.NewGuid().ToString());
SaveOutcome upsertResult2 = await Mediator.UpsertResourceAsync(patientResource2);
SearchParameter searchParam = null;
const string searchParamName = "newSearchParam";
try
{
searchParam = await CreatePatientSearchParam(searchParamName, SearchParamType.String, "Patient.name");
ISearchValue searchValue = new StringSearchValue(searchParamName);
// Update the resource wrappers, adding the new search parameter and a different ID
(_, ResourceWrapper updated1) = await CreateUpdatedWrapperFromExistingPatient(upsertResult1, searchParam, searchValue, null, Guid.NewGuid().ToString());
(_, ResourceWrapper updated2) = await CreateUpdatedWrapperFromExistingPatient(upsertResult2, searchParam, searchValue, null, Guid.NewGuid().ToString());
var resources = new List<ResourceWrapper> { updated1, updated2 };
await Assert.ThrowsAsync<ResourceNotFoundException>(() => _dataStore.UpdateSearchParameterIndicesBatchAsync(resources, CancellationToken.None));
}
finally
{
if (searchParam != null)
{
_searchParameterDefinitionManager.DeleteSearchParameter(searchParam.ToTypedElement());
await _fixture.TestHelper.DeleteSearchParameterStatusAsync(searchParam.Url, CancellationToken.None);
}
}
}
private static void VerifyReindexedResource(ResourceWrapper original, ResourceWrapper replaceResult)
{
Assert.Equal(original.ResourceId, replaceResult.ResourceId);
Assert.Equal(original.Version, replaceResult.Version);
Assert.Equal(original.ResourceTypeName, replaceResult.ResourceTypeName);
Assert.Equal(original.LastModified, replaceResult.LastModified);
Assert.NotEqual((original as FhirCosmosResourceWrapper).ETag, (replaceResult as FhirCosmosResourceWrapper).ETag);
}
[Fact]
[FhirStorageTestsFixtureArgumentSets(DataStore.CosmosDb)]
public async Task GivenAnUpdatedResourceWithWrongWeakETag_WhenUpdateSearchIndexForResourceAsync_ThenExceptionIsThrown()
{
ResourceElement patientResource = Samples.GetJsonSample("Patient");
SaveOutcome upsertResult = await Mediator.UpsertResourceAsync(patientResource);
(ResourceWrapper originalWrapper, ResourceWrapper updatedWrapper) = await CreateUpdatedWrapperFromExistingResource(upsertResult);
UpsertOutcome upsertOutcome = await _dataStore.UpsertAsync(updatedWrapper, WeakETag.FromVersionId(originalWrapper.Version), allowCreate: false, keepHistory: false, CancellationToken.None);
// Let's update the resource again with new information.
var searchParamInfo = new SearchParameterInfo("newSearchParam2", "newSearchParam2");
var searchIndex = new SearchIndexEntry(searchParamInfo, new TokenSearchValue("system", "code", "text"));
var searchIndices = new List<SearchIndexEntry>() { searchIndex };
updatedWrapper = new ResourceWrapper(
originalWrapper.ResourceId,
originalWrapper.Version,
originalWrapper.ResourceTypeName,
originalWrapper.RawResource,
originalWrapper.Request,
originalWrapper.LastModified,
deleted: false,
searchIndices,
originalWrapper.CompartmentIndices,
originalWrapper.LastModifiedClaims);
// Attempt to replace resource with the old weaketag
await Assert.ThrowsAsync<PreconditionFailedException>(() => _dataStore.UpdateSearchIndexForResourceAsync(updatedWrapper, WeakETag.FromVersionId(originalWrapper.Version), CancellationToken.None));
}
[Fact]
[FhirStorageTestsFixtureArgumentSets(DataStore.CosmosDb)]
public async Task GivenAnUpdatedResourceWithWrongResourceId_WhenUpdateSearchIndexForResourceAsync_ThenExceptionIsThrown()
{
ResourceElement patientResource = Samples.GetJsonSample("Patient");
SaveOutcome upsertResult = await Mediator.UpsertResourceAsync(patientResource);
(ResourceWrapper original, ResourceWrapper updated) = await CreateUpdatedWrapperFromExistingResource(upsertResult, Guid.NewGuid().ToString());
await Assert.ThrowsAsync<ResourceNotFoundException>(() => _dataStore.UpdateSearchIndexForResourceAsync(updated, WeakETag.FromVersionId(original.Version), CancellationToken.None));
}
private async Task<(ResourceWrapper original, ResourceWrapper updated)> CreateUpdatedWrapperFromExistingResource(
private async Task<(ResourceWrapper original, ResourceWrapper updated)> CreateUpdatedWrapperFromExistingPatient(
SaveOutcome upsertResult,
SearchParameter searchParam,
ISearchValue searchValue,
ResourceWrapper originalWrapper = null,
string updatedId = null)
{
// Get wrapper from data store directly
ResourceKey resourceKey = new ResourceKey(upsertResult.RawResourceElement.InstanceType, upsertResult.RawResourceElement.Id, upsertResult.RawResourceElement.VersionId);
FhirCosmosResourceWrapper originalWrapper = (FhirCosmosResourceWrapper)await _dataStore.GetAsync(resourceKey, CancellationToken.None);
var searchIndex = new SearchIndexEntry(searchParam.ToInfo(), searchValue);
var searchIndices = new List<SearchIndexEntry> { searchIndex };
// Add new search index entry to existing wrapper.
SearchParameterInfo searchParamInfo = new SearchParameterInfo("newSearchParam", "newSearchParam");
SearchIndexEntry searchIndex = new SearchIndexEntry(searchParamInfo, new NumberSearchValue(12));
List<SearchIndexEntry> searchIndices = new List<SearchIndexEntry>() { searchIndex };
if (originalWrapper == null)
{
// Get wrapper from data store directly
var resourceKey = new ResourceKey(upsertResult.RawResourceElement.InstanceType, upsertResult.RawResourceElement.Id, upsertResult.RawResourceElement.VersionId);
originalWrapper = await _dataStore.GetAsync(resourceKey, CancellationToken.None);
}
// Add new search index entry to existing wrapper
var updatedWrapper = new ResourceWrapper(
updatedId == null ? originalWrapper.Id : updatedId,
updatedId ?? originalWrapper.ResourceId,
originalWrapper.Version,
originalWrapper.ResourceTypeName,
originalWrapper.RawResource,
originalWrapper.Request,
new ResourceRequest(HttpMethod.Post, null),
originalWrapper.LastModified,
deleted: false,
searchIndices,
originalWrapper.CompartmentIndices,
originalWrapper.LastModifiedClaims);
originalWrapper.LastModifiedClaims,
_searchParameterDefinitionManager.GetSearchParameterHashForResourceType("Patient"));
return (originalWrapper, updatedWrapper);
}
private async Task<SearchParameter> CreatePatientSearchParam(string searchParamName, SearchParamType type, string expression)
{
var searchParam = new SearchParameter
{
Url = $"http://hl7.org/fhir/SearchParameter/Patient-{searchParamName}",
Type = type,
Base = new List<ResourceType?> { ResourceType.Patient },
Expression = expression,
Name = searchParamName,
Code = searchParamName,
};
_searchParameterDefinitionManager.AddNewSearchParameters(new List<ITypedElement> { searchParam.ToTypedElement() });
// Add the search parameter to the datastore
await _fixture.SearchParameterStatusManager.UpdateSearchParameterStatusAsync(new List<string> { searchParam.Url }, SearchParameterStatus.Supported);
return searchParam;
}
private ResourceElement CreatePatientResourceElement(string patientName, string id)
{
var json = Samples.GetJson("Patient");
json = json.Replace("Chalmers", patientName);
json = json.Replace("\"id\": \"example\"", "\"id\": \"" + id + "\"");
var rawResource = new RawResource(json, FhirResourceFormat.Json, isMetaSet: false);
return Deserializers.ResourceDeserializer.DeserializeRaw(rawResource, "v1", DateTimeOffset.UtcNow);
}
private async Task ExecuteAndVerifyException<TException>(Func<Task> action)
where TException : Exception
{

Просмотреть файл

@ -5,6 +5,7 @@
using System;
using System.Net.Http;
using System.Security.AccessControl;
using Hl7.Fhir.ElementModel;
using Hl7.Fhir.Model;
using Hl7.Fhir.Serialization;
@ -96,6 +97,8 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
public SchemaUpgradeRunner SchemaUpgradeRunner => _fixture.GetRequiredService<SchemaUpgradeRunner>();
public SearchParameterStatusManager SearchParameterStatusManager => _fixture.GetRequiredService<SearchParameterStatusManager>();
public void Dispose()
{
(_fixture as IDisposable)?.Dispose();

Просмотреть файл

@ -126,8 +126,25 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
(string type, string name)[] deprecatedObjectToIgnore =
{
("Procedure", "[dbo].[UpsertResource]"),
("Procedure", "[dbo].[UpsertResource_2]"),
("TableType", "[dbo].[ReferenceSearchParamTableType_1]"),
("TableType", "[dbo].[ReferenceTokenCompositeSearchParamTableType_1]"),
("TableType", "[dbo].[ResourceWriteClaimTableType_1]"),
("TableType", "[dbo].[CompartmentAssignmentTableType_1]"),
("TableType", "[dbo].[ReferenceSearchParamTableType_2]"),
("TableType", "[dbo].[TokenSearchParamTableType_1]"),
("TableType", "[dbo].[TokenTextTableType_1]"),
("TableType", "[dbo].[StringSearchParamTableType_1]"),
("TableType", "[dbo].[UriSearchParamTableType_1]"),
("TableType", "[dbo].[NumberSearchParamTableType_1]"),
("TableType", "[dbo].[QuantitySearchParamTableType_1]"),
("TableType", "[dbo].[DateTimeSearchParamTableType_1]"),
("TableType", "[dbo].[ReferenceTokenCompositeSearchParamTableType_2]"),
("TableType", "[dbo].[TokenTokenCompositeSearchParamTableType_1]"),
("TableType", "[dbo].[TokenDateTimeCompositeSearchParamTableType_1]"),
("TableType", "[dbo].[TokenQuantityCompositeSearchParamTableType_1]"),
("TableType", "[dbo].[TokenStringCompositeSearchParamTableType_1]"),
("TableType", "[dbo].[TokenNumberNumberCompositeSearchParamTableType_1]"),
};
var remainingDifferences = result.Differences.Where(
@ -175,6 +192,8 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
await command.Connection.OpenAsync(cancellationToken);
await command.ExecuteNonQueryAsync(cancellationToken);
}
_sqlServerFhirModel.RemoveSearchParamIdToUriMapping(uri);
}
public async Task DeleteAllReindexJobRecordsAsync(CancellationToken cancellationToken = default)

Просмотреть файл

@ -20,6 +20,7 @@ using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Features.Search.Expressions.Parsers;
using Microsoft.Health.Fhir.Core.Features.Search.Parameters;
using Microsoft.Health.Fhir.Core.Features.Search.Registry;
using Microsoft.Health.Fhir.Core.Features.Search.SearchValues;
using Microsoft.Health.Fhir.Core.Models;
@ -58,6 +59,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
private readonly ISearchService _searchService;
private readonly SearchParameterDefinitionManager _searchParameterDefinitionManager;
private readonly SupportedSearchParameterDefinitionManager _supportedSearchParameterDefinitionManager;
private readonly SearchParameterStatusManager _searchParameterStatusManager;
public SqlServerFhirStorageTestsFixture()
: this(SchemaVersionConstants.Max, $"FHIRINTEGRATIONTEST_{DateTimeOffset.UtcNow.ToUnixTimeSeconds()}_{BigInteger.Abs(new BigInteger(Guid.NewGuid().ToByteArray()))}")
@ -100,18 +102,23 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
sqlConnectionStringProvider,
NullLogger<SqlServerFhirModel>.Instance);
var searchParameterToSearchValueTypeMap = new SearchParameterToSearchValueTypeMap();
var serviceCollection = new ServiceCollection();
serviceCollection.AddSqlServerTableRowParameterGenerators();
serviceCollection.AddSingleton(sqlServerFhirModel);
serviceCollection.AddSingleton(searchParameterToSearchValueTypeMap);
ServiceProvider serviceProvider = serviceCollection.BuildServiceProvider();
var upsertResourceTvpGeneratorV6 = serviceProvider.GetRequiredService<V6.UpsertResourceTvpGenerator<ResourceMetadata>>();
var upsertResourceTvpGeneratorVLatest = serviceProvider.GetRequiredService<VLatest.UpsertResourceTvpGenerator<ResourceMetadata>>();
var upsertResourceTvpGeneratorV7 = serviceProvider.GetRequiredService<V7.UpsertResourceTvpGenerator<ResourceMetadata>>();
var upsertResourceTvpGeneratorVLatest = serviceProvider.GetRequiredService<VLatest.UpsertResourceTvpGenerator<IReadOnlyList<ResourceWrapper>>>();
var upsertSearchParamsTvpGenerator = serviceProvider.GetRequiredService<VLatest.UpsertSearchParamsTvpGenerator<List<ResourceSearchParameterStatus>>>();
var reindexResourceTvpGenerator = serviceProvider.GetRequiredService<VLatest.ReindexResourceTvpGenerator<IReadOnlyList<ResourceWrapper>>>();
var bulkReindexResourceTvpGenerator = serviceProvider.GetRequiredService<VLatest.BulkReindexResourcesTvpGenerator<IReadOnlyList<ResourceWrapper>>>();
_supportedSearchParameterDefinitionManager = new SupportedSearchParameterDefinitionManager(_searchParameterDefinitionManager);
var searchParameterToSearchValueTypeMap = new SearchParameterToSearchValueTypeMap();
SqlTransactionHandler = new SqlTransactionHandler();
SqlConnectionWrapperFactory = new SqlConnectionWrapperFactory(SqlTransactionHandler, new SqlCommandWrapperFactory(), sqlConnectionFactory);
@ -120,16 +127,19 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
() => SqlConnectionWrapperFactory.CreateMockScope(),
upsertSearchParamsTvpGenerator,
() => _filebasedSearchParameterStatusDataStore,
schemaInformation);
schemaInformation,
sqlServerFhirModel);
IOptions<CoreFeatureConfiguration> options = Options.Create(new CoreFeatureConfiguration());
_fhirDataStore = new SqlServerFhirDataStore(
config,
sqlServerFhirModel,
searchParameterToSearchValueTypeMap,
upsertResourceTvpGeneratorV6,
upsertResourceTvpGeneratorV7,
upsertResourceTvpGeneratorVLatest,
reindexResourceTvpGenerator,
bulkReindexResourceTvpGenerator,
options,
SqlConnectionWrapperFactory,
NullLogger<SqlServerFhirDataStore>.Instance,
@ -172,6 +182,15 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
fhirRequestContextAccessor,
NullLogger<SqlServerSearchService>.Instance);
ISearchParameterSupportResolver searchParameterSupportResolver = Substitute.For<ISearchParameterSupportResolver>();
searchParameterSupportResolver.IsSearchParameterSupported(Arg.Any<SearchParameterInfo>()).Returns((true, false));
_searchParameterStatusManager = new SearchParameterStatusManager(
SqlServerSearchParameterStatusDataStore,
_searchParameterDefinitionManager,
searchParameterSupportResolver,
mediator);
_testHelper = new SqlServerFhirStorageTestHelper(initialConnectionString, MasterDatabaseName, sqlServerFhirModel, sqlConnectionFactory);
}
@ -255,6 +274,11 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
return _schemaUpgradeRunner;
}
if (serviceType == typeof(SearchParameterStatusManager))
{
return _searchParameterStatusManager;
}
return null;
}
}

Просмотреть файл

@ -7,6 +7,7 @@ using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
using Microsoft.Health.Core.Extensions;
using Microsoft.Health.Fhir.Tests.Common.FixtureParameters;
using Microsoft.Health.SqlServer.Features.Client;
using Xunit;
@ -30,6 +31,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
public async Task GivenATransactionScope_WhenReading_TheUncommittedValuesShouldOnlyBeAvailableWithTheTransactionAndWithHints()
{
var newId = Guid.NewGuid().ToString();
var searchParamHash = new string("RandomSearchParam").ComputeHash();
using (var transactionScope = _fixture.SqlTransactionHandler.BeginTransaction())
{
@ -38,9 +40,10 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
{
sqlCommandWrapper.CommandText = @"
INSERT INTO Resource
VALUES(97, @newId, 1, 0, 5095719085917680000, 0, null, CAST('test' AS VARBINARY(MAX)), 0)";
VALUES(97, @newId, 1, 0, 5095719085917680000, 0, null, CAST('test' AS VARBINARY(MAX)), 0, @searchParamHash)";
sqlCommandWrapper.Parameters.Add(new SqlParameter { ParameterName = "newId", Value = newId });
sqlCommandWrapper.Parameters.Add(new SqlParameter { ParameterName = "searchParamHash", Value = searchParamHash });
await sqlCommandWrapper.ExecuteNonQueryAsync(CancellationToken.None);
}
@ -81,6 +84,7 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
public async Task GivenATransactionScope_WhenReadingAfterComplete_TheValuesShouldBeAvailable()
{
var newId = Guid.NewGuid().ToString();
var searchParamHash = new string("RandomSearchParam").ComputeHash();
using (var transactionScope = _fixture.SqlTransactionHandler.BeginTransaction())
{
@ -89,9 +93,10 @@ namespace Microsoft.Health.Fhir.Tests.Integration.Persistence
{
sqlCommandWrapper.CommandText = @"
INSERT INTO Resource
VALUES(97, @newId, 1, 0, 5095719085917680001, 0, null, CAST('test' AS VARBINARY(MAX)), 0)";
VALUES(97, @newId, 1, 0, 5095719085917680001, 0, null, CAST('test' AS VARBINARY(MAX)), 0, @searchParamHash)";
sqlCommandWrapper.Parameters.Add(new SqlParameter { ParameterName = "newId", Value = newId });
sqlCommandWrapper.Parameters.Add(new SqlParameter { ParameterName = "searchParamHash", Value = searchParamHash });
await sqlCommandWrapper.ExecuteNonQueryAsync(CancellationToken.None);
}