fhir-loader/FHIRBulkImport/ExportAllOrchestrator.cs

469 lines
23 KiB
C#

using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System.Linq;
using System;
using Newtonsoft.Json;
using System.Collections.Generic;
namespace FHIRBulkImport
{
/// <summary>
/// Durable Functions workflow that exports all resources of one FHIR resource type to NDJSON blobs,
/// following search continuation links and optionally splitting the <c>_lastUpdated</c> window into
/// parallel slices balanced by resource counts.
/// </summary>
public class ExportAllOrchestrator
{
    // Page size (_count) requested for every export search.
    private static readonly int _exportResourceCount = Utils.GetIntEnvironmentVariable("FS-EXPORTRESOURCECOUNT", "1000");

    // NOTE(review): not referenced within this class.
    private static readonly string _storageAccount = Utils.GetEnvironmentVariable("FBI-STORAGEACCT");

    // Maximum number of exports allowed to run at once; 0 (the default) disables the limit.
    private static readonly int _maxInstances = Utils.GetIntEnvironmentVariable("FBI-MAXEXPORTS", "0");

    // Upper bound accepted for the _parallelizationCount request parameter.
    private static readonly int _maxParallelizationCount = 100;

    // Number of date slices counted per FHIR batch request when computing parallel ranges.
    // Read on its own: chaining it onto _maxInstances would overwrite the FBI-MAXEXPORTS limit.
    private static readonly int _parallelSearchBundleSize = Utils.GetIntEnvironmentVariable("FBI-PARALLELSEARCHBUNDLESIZE", "50");

    // Retry policy shared by every activity the orchestrator schedules.
    private static readonly RetryOptions _exportAllRetryOptions = new RetryOptions(
        firstRetryInterval: TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI-EXPORTALLRETRYINTERVAL", "30")),
        maxNumberOfAttempts: 5)
    {
        BackoffCoefficient = Utils.GetIntEnvironmentVariable("FBI-EXPORTALLBACKOFFCOEFFICIENT", "3")
    };

    /// <summary>
    /// HTTP entry point (POST <c>$alt-export-all</c>) that starts a new <see cref="ExportAll_RunOrchestrator"/> instance.
    /// </summary>
    /// <param name="req">Request whose body is the JSON options object handed to the orchestrator unparsed.</param>
    /// <param name="starter">Durable client used to count running exports and start the orchestration.</param>
    /// <param name="log">Function logger.</param>
    /// <returns>
    /// 429 with a JSON <c>{"error": ...}</c> body when the FBI-MAXEXPORTS limit is reached; otherwise the
    /// Durable Functions check-status response for the new instance.
    /// </returns>
    [FunctionName(nameof(ExportAllOrchestrator_HttpStart))]
    public async Task<HttpResponseMessage> ExportAllOrchestrator_HttpStart(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "$alt-export-all")] HttpRequestMessage req,
        [DurableClient] IDurableOrchestrationClient starter,
        ILogger log)
    {
        string requestParameters = await req.Content.ReadAsStringAsync();

        // Enforce the optional concurrency limit before starting another instance.
        var state = await ExportOrchestrator.runningInstances(starter, log);
        int running = state.Count();
        if (_maxInstances > 0 && running >= _maxInstances)
        {
            string msg = $"Unable to start export there are {running} exports the max concurrent allowed is {_maxInstances}";
            // Serialize through JObject so the body is always complete, correctly escaped JSON.
            string errorJson = new JObject { ["error"] = msg }.ToString(Formatting.None);
            return new HttpResponseMessage(System.Net.HttpStatusCode.TooManyRequests)
            {
                Content = new StringContent(errorJson, System.Text.Encoding.UTF8, "application/json")
            };
        }

        // The raw request body becomes the orchestrator input; it is parsed and validated there.
        string instanceId = await starter.StartNewAsync(nameof(ExportAll_RunOrchestrator), null, requestParameters);
        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
        return starter.CreateCheckStatusResponse(req, instanceId);
    }

    /// <summary>
    /// Orchestrator: parses the request options, optionally computes count-balanced <c>_lastUpdated</c>
    /// slices, then pages through every slice, writing each page with
    /// <see cref="ExportAllOrchestrator_GetAndWriteDataPage"/> until no continuation links remain.
    /// </summary>
    /// <param name="context">Orchestration context; its input is the raw JSON request body string.</param>
    /// <param name="logger">Function logger; wrapped in a replay-safe logger before use.</param>
    /// <returns>
    /// Summary object with the instance id, timestamps, file/resource counts, a sample blob URL, and an
    /// <c>error</c> field when option parsing or a page activity failed.
    /// </returns>
    [FunctionName(nameof(ExportAll_RunOrchestrator))]
    public async Task<JObject> ExportAll_RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context,
        ILogger logger)
    {
        // Orchestrator code is replayed; the replay-safe logger suppresses duplicate entries on replay.
        ILogger log = context.CreateReplaySafeLogger(logger);

        // Result object, also published as the durable custom status while the export runs.
        JObject retVal = new JObject { ["instanceid"] = context.InstanceId };
        // Blob URL -> number of resources written to that blob.
        var fileTracker = new Dictionary<string, int>();

        ExportAllOrchestratorOptions options;
        try
        {
            string inputJson = context.GetInput<string>();
            options = ParseOptions(inputJson, context.CurrentUtcDateTime);
            log.LogInformation($"Got options: {JsonConvert.SerializeObject(options)}");
        }
        catch (Exception ex)
        {
            retVal["error"] = $"Error while parsing body. Please check the inputs and try again. Error: {ex.Message}. Trace: {ex.StackTrace}";
            context.SetCustomStatus(retVal);
            return retVal;
        }

        // Without parallelization the whole requested window is a single search range.
        List<(DateTime? Start, DateTime? End)> searchRanges = new() { (options.Since, options.Till) };

        if (options.ParallelSearchRanges is not null && options.ParallelizationCount is not null)
        {
            retVal["parallelizationLogicStarted"] = context.CurrentUtcDateTime;
            context.SetCustomStatus(retVal);

            // One count activity per bundle of slices; -1 is a placeholder the activity replaces with the real count.
            var countTasks = options.ParallelSearchRanges
                .Select(bundle => context.CallActivityWithRetryAsync<List<(DateTime Start, DateTime End, int Count)>>(
                    nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges),
                    _exportAllRetryOptions,
                    new GetCountsForListOfDateRangesRequest(options.ResourceType, bundle.Select(y => (y.Start, y.End, -1)).ToList())))
                .ToList();
            var results = await Task.WhenAll(countTasks);

            var flattenedResult = FlattenCountsByParallelizationCount(results, options.ParallelizationCount.Value);
            searchRanges = flattenedResult.Select(x => ((DateTime?)x.Start, (DateTime?)x.End)).ToList();
        }

        retVal["exportStarted"] = context.CurrentUtcDateTime;
        context.SetCustomStatus(retVal);

        // Schedule the first page of every search range; the range index keeps pages of a range together.
        List<Task<DataPageResult>> exportTasks = new();
        for (int i = 0; i < searchRanges.Count; i++)
        {
            var (rangeStart, rangeEnd) = searchRanges[i];
            string nextLink = $"{options.BaseAddress}{options.ResourceType}?_count={_exportResourceCount}";
            if (rangeStart is not null)
            {
                nextLink += $"&_lastUpdated=ge{rangeStart.Value.ToString("o")}";
            }
            if (rangeEnd is not null)
            {
                nextLink += $"&_lastUpdated=lt{rangeEnd.Value.ToString("o")}";
            }
            exportTasks.Add(CallGetAndWriteDataPage(context, nextLink, context.InstanceId, i, options));
        }

        // Process pages as they complete, scheduling the next page of a range whenever one is returned.
        while (exportTasks.Count > 0)
        {
            Task<DataPageResult> completedTask = await Task.WhenAny(exportTasks);
            exportTasks.Remove(completedTask);
            DataPageResult fhirResult = await completedTask;

            // A negative count is the activity's error signal: stop and surface the error.
            if (fhirResult.ResourceCount < 0)
            {
                retVal["error"] = fhirResult.Error;
                context.SetCustomStatus(retVal);
                return retVal;
            }

            if (fhirResult.ResourceCount > 0)
            {
                fileTracker.TryGetValue(fhirResult.BlobUrl, out int alreadyWritten);
                fileTracker[fhirResult.BlobUrl] = alreadyWritten + fhirResult.ResourceCount;
            }

            // Only one sample URL is published. Per the original author, listing every file exceeded the
            // durable status payload limit (~16KB) after about 50 files.
            if (fileTracker.Count > 0)
            {
                retVal["exportFilesSampleUrl"] = fileTracker.First().Key;
            }
            retVal["exportFilesCompleted"] = fileTracker.Count;
            retVal["exportResourceCount"] = fileTracker.Values.Sum();
            context.SetCustomStatus(retVal);

            if (fhirResult.NextLink != null)
            {
                exportTasks.Add(CallGetAndWriteDataPage(context, fhirResult.NextLink, fhirResult.InstanceId, fhirResult.ParallelTaskIndex, options));
            }
        }

        retVal["exportCompleted"] = context.CurrentUtcDateTime;
        // Keep the final custom status small; the detailed retVal is returned as the orchestration output.
        context.SetCustomStatus(new JObject { ["status"] = "Success" });
        log.LogInformation($"Completed orchestration with ID = '{context.InstanceId}'.");
        return retVal;
    }

    // Schedules one ExportAllOrchestrator_GetAndWriteDataPage activity for the given FHIR query.
    private static Task<DataPageResult> CallGetAndWriteDataPage(
        IDurableOrchestrationContext context,
        string fhirRequestPath,
        string instanceId,
        int? parallelTaskIndex,
        ExportAllOrchestratorOptions options)
        => context.CallActivityWithRetryAsync<DataPageResult>(
            nameof(ExportAllOrchestrator_GetAndWriteDataPage),
            _exportAllRetryOptions,
            new DataPageRequest(FhirRequestPath: fhirRequestPath, InstanceId: instanceId, ParallelTaskIndex: parallelTaskIndex, ResourceType: options.ResourceType, Audience: options.Audience));

    /// <summary>
    /// Activity: counts resources in each date range with a single FHIR batch bundle of
    /// <c>_summary=count</c> searches.
    /// </summary>
    /// <param name="input">Resource type plus the ranges to count (their incoming Count values are ignored).</param>
    /// <param name="logger">Function logger.</param>
    /// <returns>The input ranges, in input order, paired with the <c>total</c> reported for each.</returns>
    /// <exception cref="InvalidOperationException">
    /// The server call failed, the response had a different number of entries than requested, or an
    /// entry lacked <c>resource.total</c>.
    /// </exception>
    [FunctionName(nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges))]
    public async Task<List<(DateTime Start, DateTime End, int Count)>> ExportAllOrchestrator_GetCountsForListOfDateRanges(
        [ActivityTrigger] GetCountsForListOfDateRangesRequest input,
        ILogger logger)
    {
        JArray entries = new();
        foreach (var item in input.SearchRangeList)
        {
            entries.Add(new JObject
            {
                ["request"] = new JObject
                {
                    ["method"] = "GET",
                    ["url"] = $"{input.ResourceType}?_lastUpdated=ge{item.Start.ToString("o")}&_lastUpdated=lt{item.End.ToString("o")}&_summary=count"
                }
            });
        }
        JObject requestBody = new()
        {
            ["resourceType"] = "Bundle",
            ["type"] = "batch",
            ["entry"] = entries
        };

        logger.LogTrace($"Running parallelization logic bundle: {requestBody}");
        // NOTE(review): this batch goes to the default endpoint ("") with no audience, while export pages
        // use _baseAddress/_audience. Confirm both target the same FHIR server.
        var response = await FHIRUtils.CallFHIRServer("", requestBody.ToString(), HttpMethod.Post, logger, null);
        logger.LogTrace($"Parallelization logic response: {response?.Content}");

        if (response is null || !response.Success || string.IsNullOrEmpty(response.Content))
        {
            string message = $"{nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges)}: FHIR Server Call Failed: {response?.Status} Content:{response?.Content}";
            logger.LogError(message);
            throw new InvalidOperationException(message);
        }

        var result = JObject.Parse(response.Content);
        if (result["entry"] is not JArray entryArray || entryArray.Count != input.SearchRangeList.Count)
        {
            JToken entry = result["entry"];
            string exceptionMessage = $"Did not get matching result set back for {nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges)}. EntryExists: {entry is not null}";
            if (entry is not null)
            {
                exceptionMessage += $", Entry Type: {entry.GetType()}";
            }
            if (entry is JArray mismatchedArray)
            {
                exceptionMessage += $", EntryCount: {mismatchedArray.Count}";
            }
            throw new InvalidOperationException(exceptionMessage);
        }

        // Batch responses are matched to requests by position.
        var counts = new List<(DateTime Start, DateTime End, int Count)>(input.SearchRangeList.Count);
        for (int i = 0; i < input.SearchRangeList.Count; i++)
        {
            JToken total = entryArray[i]["resource"]?["total"];
            if (total is null || total.Type == JTokenType.Null)
            {
                // A failed batch entry carries no search bundle; report it instead of a NullReferenceException.
                throw new InvalidOperationException(
                    $"{nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges)}: batch entry {i} has no resource.total. Entry: {entryArray[i]}");
            }
            counts.Add((input.SearchRangeList[i].Start, input.SearchRangeList[i].End, (int)total));
        }
        return counts;
    }

    /// <summary>
    /// Activity: fetches one page of search results and appends its resources to NDJSON output.
    /// </summary>
    /// <param name="input">Query to run plus identifiers forwarded to the NDJSON writer.</param>
    /// <param name="ec">Durable entity client forwarded to <c>ExportOrchestrator.ConvertToNDJSON</c>.</param>
    /// <param name="logger">Function logger.</param>
    /// <returns>
    /// The bundle's <c>next</c> link (or null) and the number of resources written. The count is 0 for an
    /// empty page and -1 on failure, in which case <c>Error</c> describes the problem.
    /// </returns>
    [FunctionName(nameof(ExportAllOrchestrator_GetAndWriteDataPage))]
    public async Task<DataPageResult> ExportAllOrchestrator_GetAndWriteDataPage(
        [ActivityTrigger] DataPageRequest input,
        [DurableClient] IDurableEntityClient ec,
        ILogger logger)
    {
        logger.LogInformation($"Fetching page of resources using query {input.FhirRequestPath}");
        var response = await FHIRUtils.CallFHIRServer(input.FhirRequestPath, "", HttpMethod.Get, logger, input.Audience);
        if (response is null || !response.Success || string.IsNullOrEmpty(response.Content))
        {
            string message = $"ExportAll: FHIR Server Call Failed: {response?.Status} Content:{response?.Content} Query:{input.FhirRequestPath}";
            logger.LogError(message);
            return new DataPageResult(null, -1, null, input.InstanceId, input.ParallelTaskIndex, message);
        }

        // The continuation token is the bundle link whose relation is "next".
        var result = JObject.Parse(response.Content);
        string nextLinkUrl = result["link"]?.FirstOrDefault(link => (string)link["relation"] == "next")?.Value<string>("url");

        var entry = (JArray)result["entry"];
        if (entry == null || entry.Count < 1)
        {
            logger.LogWarning($"Zero result bundle returned for query {input.FhirRequestPath}. Ensure your inputs will return data.");
            return new DataPageResult(nextLinkUrl, 0, null, input.InstanceId, input.ParallelTaskIndex, null);
        }

        // Append through the shared NDJSON writer so pages are combined rather than producing many small files.
        ExportOrchestrator.ConvertToNDJSONResponse? ndjsonResult = null;
        try
        {
            logger.LogInformation($"Bundle response successfully received. Writing it to NDJSON. InstanceId: {input.InstanceId}, ResourceType: {input.ResourceType}, ParallelFileId: {input.ParallelTaskIndex}, Query: {input.FhirRequestPath}.");
            ndjsonResult = await ExportOrchestrator.ConvertToNDJSON(result, input.InstanceId, input.ResourceType, ec, logger, input.ParallelTaskIndex);
        }
        catch (Exception ex)
        {
            string exceptionMessage = $"Unhandled error occurred in ConvertToNDJSON. Exception: {ex.Message}, InnerException: {ex.InnerException?.Message}, Trace: {ex.StackTrace}";
            logger.LogError(ex, exceptionMessage);
            return new DataPageResult(null, -1, null, input.InstanceId, input.ParallelTaskIndex, exceptionMessage);
        }

        if (ndjsonResult is null)
        {
            return new DataPageResult(null, -1, null, input.InstanceId, input.ParallelTaskIndex, "ConvertToNDJSON returned null unexpectedly");
        }

        logger.LogInformation($"ExportAll: FHIR Server Call Succeeded: {response.Status} Next: {nextLinkUrl}, Count: {ndjsonResult.Value.ResourceCount}");
        return new DataPageResult(nextLinkUrl, ndjsonResult.Value.ResourceCount, ndjsonResult.Value.BlobUrl, input.InstanceId, input.ParallelTaskIndex, null);
    }

    /// <summary>
    /// Splits [<paramref name="start"/>, <paramref name="end"/>) into consecutive slices of at most
    /// <paramref name="rangeSizeInSeconds"/> seconds. The slices are grouped into bundles of up to
    /// <c>_parallelSearchBundleSize</c>, one count batch request per bundle.
    /// </summary>
    /// <returns>The bundles in chronological order; empty when <paramref name="start"/> is not before <paramref name="end"/>.</returns>
    private static List<List<(DateTime start, DateTime end)>> GetSearchRanges(DateTime start, DateTime end, int rangeSizeInSeconds)
    {
        List<(DateTime start, DateTime end)> searchRanges = new();
        for (DateTime currentStart = start; currentStart < end; currentStart = currentStart.AddSeconds(rangeSizeInSeconds))
        {
            // Clamp the final slice so parallel exports never reach past the requested end (_till).
            DateTime candidateEnd = currentStart.AddSeconds(rangeSizeInSeconds);
            DateTime currentEnd = candidateEnd < end ? candidateEnd : end;
            searchRanges.Add((currentStart, currentEnd));
        }

        // Guard against a non-positive configured bundle size, which would otherwise never advance.
        int bundleSize = Math.Max(1, _parallelSearchBundleSize);
        List<List<(DateTime start, DateTime end)>> parallelSearchRanges = new();
        for (int offset = 0; offset < searchRanges.Count; offset += bundleSize)
        {
            parallelSearchRanges.Add(searchRanges.GetRange(offset, Math.Min(bundleSize, searchRanges.Count - offset)));
        }
        return parallelSearchRanges;
    }

    /// <summary>
    /// Parses and validates the orchestrator's JSON input.
    /// </summary>
    /// <param name="inputs">Raw request body: a JSON object with <c>_type</c> (required) and optional
    /// <c>_since</c>, <c>_till</c>, <c>_baseAddress</c>, <c>_audience</c>, <c>_parallelizationCount</c>,
    /// <c>_parallelSearchInSecondsInterval</c>.</param>
    /// <param name="executionTime">Replay-safe "now", used as the end of the parallel window when <c>_till</c> is absent.</param>
    /// <exception cref="ArgumentException">The input is empty, not a JSON object, or fails validation.</exception>
    private static ExportAllOrchestratorOptions ParseOptions(string inputs, DateTime executionTime)
    {
        if (string.IsNullOrWhiteSpace(inputs))
        {
            throw new ArgumentException("Request body is empty. A JSON object with at least _type is required.");
        }

        JObject requestParameters;
        try
        {
            requestParameters = JObject.Parse(inputs);
        }
        catch (JsonReaderException jre)
        {
            throw new ArgumentException($"Not a valid JSON Object from starter input:{jre.Message}");
        }

        // NOTE(review): JObject.Parse may already turn ISO date strings into Date tokens. In that case
        // ToString() yields a culture-formatted value without sub-second precision. Confirm this is acceptable.
        string sinceStr = requestParameters["_since"]?.ToString(), tillStr = requestParameters["_till"]?.ToString();
        DateTime since = default, till = default;
        if ((sinceStr is not null && !DateTime.TryParse(sinceStr, out since)) ||
            (tillStr is not null && !DateTime.TryParse(tillStr, out till)))
        {
            throw new ArgumentException($"Invalid input for _since or _till parameter. _since: {sinceStr ?? string.Empty} _till: {tillStr ?? string.Empty}");
        }
        if (sinceStr is not null && tillStr is not null && till <= since)
        {
            // An empty or inverted window would silently export nothing.
            throw new ArgumentException($"_since must be earlier than _till. _since: {sinceStr} _till: {tillStr}");
        }

        // Normalize to a trailing '/' so the resource type can be appended directly.
        string baseAddress = requestParameters["_baseAddress"]?.ToString();
        if (baseAddress is null)
        {
            baseAddress = string.Empty;
        }
        else if (!baseAddress.EndsWith('/'))
        {
            baseAddress += "/";
        }

        ExportAllOrchestratorOptions options = new(
            ResourceType: requestParameters["_type"]?.ToString(),
            Since: since == default ? null : since,
            Till: till == default ? null : till,
            BaseAddress: baseAddress,
            Audience: requestParameters["_audience"]?.ToString(),
            ParallelizationCount: null,
            ParallelSearchInSecondsInterval: null,
            ParallelSearchRanges: null);

        // Execution is only allowed for a single, explicit resource type.
        if (string.IsNullOrEmpty(options.ResourceType))
        {
            throw new ArgumentException("_type is null. It must be provided to execute this function.");
        }

        if (requestParameters["_parallelizationCount"] is not null)
        {
            if (!int.TryParse(requestParameters["_parallelizationCount"].ToString(), out int parallelizationCountParsed) || parallelizationCountParsed < 1 || parallelizationCountParsed > _maxParallelizationCount)
            {
                throw new ArgumentException($"Invalid parallelization count interval received: {requestParameters["_parallelizationCount"]}");
            }
            options.ParallelizationCount = parallelizationCountParsed;
        }

        if (requestParameters["_parallelSearchInSecondsInterval"] is not null)
        {
            if (!int.TryParse(requestParameters["_parallelSearchInSecondsInterval"].ToString(), out int parallelSearchInSecondsInterval) || parallelSearchInSecondsInterval < 1)
            {
                throw new ArgumentException($"Invalid parallel search count interval received: {requestParameters["_parallelSearchInSecondsInterval"]}");
            }
            options.ParallelSearchInSecondsInterval = parallelSearchInSecondsInterval;
        }

        // Parallel export needs all three of _since, _parallelizationCount and _parallelSearchInSecondsInterval.
        if (options.Since is not null && options.ParallelizationCount is not null && options.ParallelSearchInSecondsInterval is not null)
        {
            options.ParallelSearchRanges = GetSearchRanges(options.Since.Value, options.Till ?? executionTime, options.ParallelSearchInSecondsInterval.Value);
        }
        else if (options.ParallelizationCount is not null || options.ParallelSearchInSecondsInterval is not null)
        {
            throw new ArgumentException("_parallelizationCount, _parallelSearchInSecondsInterval, and _since must all be specified for parallel export.");
        }

        return options;
    }

    /// <summary>
    /// Merges adjacent counted ranges (sorted by start) greedily. A merge happens only while the combined
    /// count stays at or below total / <paramref name="parallelizationCount"/> (integer division).
    /// </summary>
    /// <param name="input">Per-bundle count results.</param>
    /// <param name="parallelizationCount">Desired degree of parallelism (validated as ≥ 1 by ParseOptions).</param>
    /// <returns>Merged ranges; a single range larger than the target is kept as-is.</returns>
    private static List<(DateTime Start, DateTime End, int Count)> FlattenCountsByParallelizationCount(
        List<(DateTime Start, DateTime End, int Count)>[] input,
        int parallelizationCount)
    {
        var flatList = input.SelectMany(x => x).OrderBy(x => x.Start).ToList();
        int targetCountPerEachParallelExecution = flatList.Sum(x => x.Count) / parallelizationCount;

        var combinedList = new List<(DateTime Start, DateTime End, int Count)>();
        foreach (var range in flatList)
        {
            if (combinedList.Count > 0)
            {
                var last = combinedList[^1];
                // Only touching ranges are merged, so no gap is introduced into the exported window.
                if (last.End == range.Start && last.Count + range.Count <= targetCountPerEachParallelExecution)
                {
                    combinedList[^1] = (last.Start, range.End, last.Count + range.Count);
                    continue;
                }
            }
            combinedList.Add(range);
        }
        return combinedList;
    }
}
/// <summary>Input for one <c>ExportAllOrchestrator_GetAndWriteDataPage</c> activity call.</summary>
/// <param name="FhirRequestPath">FHIR search to GET: either the initial "{base}{type}?_count=..." query or a bundle's next link.</param>
/// <param name="InstanceId">Orchestration instance id, forwarded to <c>ExportOrchestrator.ConvertToNDJSON</c>.</param>
/// <param name="ParallelTaskIndex">Index of the search range the page belongs to, forwarded to <c>ConvertToNDJSON</c> (presumably to keep per-range output separate — verify).</param>
/// <param name="ResourceType">FHIR resource type being exported.</param>
/// <param name="Audience">Value of the request's <c>_audience</c>, passed to <c>FHIRUtils.CallFHIRServer</c>; may be null.</param>
public record struct DataPageRequest(string FhirRequestPath, string InstanceId, int? ParallelTaskIndex, string ResourceType, string Audience);
/// <summary>Result of one <c>ExportAllOrchestrator_GetAndWriteDataPage</c> activity call.</summary>
/// <param name="NextLink">Continuation URL from the bundle's "next" link; null when there are no more pages or on error.</param>
/// <param name="ResourceCount">Resources written; 0 for an empty page, -1 signals an error (see <paramref name="Error"/>).</param>
/// <param name="BlobUrl">Blob the page was written to; null when nothing was written.</param>
/// <param name="InstanceId">Echo of the request's instance id.</param>
/// <param name="ParallelTaskIndex">Echo of the request's range index, reused when scheduling the next page.</param>
/// <param name="Error">Failure description when <paramref name="ResourceCount"/> is -1; otherwise null.</param>
public record struct DataPageResult(string NextLink, int ResourceCount, string BlobUrl, string InstanceId, int? ParallelTaskIndex, string Error);
// NOTE(review): not referenced in ExportAllOrchestrator; presumably used elsewhere in the project — verify.
public record struct ResultUpdateRequest(string InstanceId, string ResourceType, int ResourceCount, string NextLink);
// NOTE(review): not referenced in ExportAllOrchestrator; presumably used elsewhere in the project — verify.
public record struct WriteCompletionStatusRequest(string InstanceId, JObject StatusObject);
/// <summary>
/// Validated options for an export-all run, produced by <c>ExportAllOrchestrator.ParseOptions</c>.
/// Positional record-struct properties are settable; ParseOptions fills the parallelization fields after construction.
/// </summary>
/// <param name="ResourceType">Required <c>_type</c>: the single FHIR resource type to export.</param>
/// <param name="Since">Optional <c>_since</c>: inclusive lower bound for <c>_lastUpdated</c>.</param>
/// <param name="Till">Optional <c>_till</c>: exclusive upper bound for <c>_lastUpdated</c>.</param>
/// <param name="BaseAddress">Optional <c>_baseAddress</c> prefix, normalized to end in '/'; empty when not given.</param>
/// <param name="Audience">Optional <c>_audience</c> passed to the FHIR client.</param>
/// <param name="ParallelizationCount">Optional <c>_parallelizationCount</c> (1..100) used to balance parallel ranges.</param>
/// <param name="ParallelSearchInSecondsInterval">Optional <c>_parallelSearchInSecondsInterval</c>: slice width in seconds for counting.</param>
/// <param name="ParallelSearchRanges">Date slices grouped into count-batch bundles; null when the export is not parallel.</param>
public record struct ExportAllOrchestratorOptions(string ResourceType, DateTime? Since, DateTime? Till, string BaseAddress, string Audience, int? ParallelizationCount, int? ParallelSearchInSecondsInterval, List<List<(DateTime Start, DateTime End)>> ParallelSearchRanges);
/// <summary>Input for <c>ExportAllOrchestrator_GetCountsForListOfDateRanges</c>.</summary>
/// <param name="ResourceType">FHIR resource type to count.</param>
/// <param name="SearchRangeList">Ranges to count; the orchestrator sends -1 as a placeholder Count.</param>
public record struct GetCountsForListOfDateRangesRequest(string ResourceType, List<(DateTime Start, DateTime End, int Count)> SearchRangeList);
}