/* Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License. */
using log4net;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Polly;
using Quartz;
using RecurringIntegrationsScheduler.Common.Contracts;
using RecurringIntegrationsScheduler.Common.Helpers;
using RecurringIntegrationsScheduler.Common.JobSettings;
using RecurringIntegrationsScheduler.Job.Properties;
using System;
using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;
using System.Xml;
namespace RecurringIntegrationsScheduler.Job
{
/// <summary>
/// Job that uploads data packages using the new method introduced in platform update 5
/// </summary>
/// <seealso cref="IJob" />
[DisallowConcurrentExecution]
public class Import : IJob
{
/// <summary>
/// The log
/// </summary>
private static readonly ILog Log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
/// <summary>
/// The settings
/// </summary>
private readonly ImportJobSettings _settings = new ImportJobSettings();
/// <summary>
/// The HTTP client helper
/// </summary>
private HttpClientHelper _httpClientHelper;
/// <summary>
/// Job execution context
/// </summary>
private IJobExecutionContext _context;
/// <summary>
/// Gets or sets the input queue.
/// </summary>
/// <value>
/// The input queue.
/// </value>
private ConcurrentQueue<DataMessage> InputQueue { get; set; }
/// <summary>
/// Retry policy for IO operations
/// </summary>
private Policy _retryPolicyForIo;
/// <summary>
/// Called by the <see cref="IScheduler" /> when a <see cref="ITrigger" />
/// fires that is associated with the <see cref="IJob" />.
/// </summary>
/// <param name="context">The execution context.</param>
/// <exception cref="JobExecutionException">false</exception>
/// <remarks>
/// The implementation may wish to set a result object on the
/// JobExecutionContext before this method exits. The result itself
/// is meaningless to Quartz, but may be informative to
/// <see cref="IJobListener" />s or
/// <see cref="ITriggerListener" />s that are watching the job's
/// execution.
/// </remarks>
public async Task Execute(IJobExecutionContext context)
{
try
{
log4net.Config.XmlConfigurator.Configure();
_context = context;
_settings.Initialize(context);
if (_settings.IndefinitePause)
{
await context.Scheduler.PauseJob(context.JobDetail.Key);
Log.InfoFormat(CultureInfo.InvariantCulture,
string.Format(Resources.Job_0_was_paused_indefinitely, _context.JobDetail.Key));
return;
}
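// Transient IO failures are retried by this policy: RetryCount attempts with a constant RetryDelay (seconds) between them.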
_retryPolicyForIo = Policy.Handle<IOException>().WaitAndRetry(
retryCount: _settings.RetryCount,
sleepDurationProvider: attempt => TimeSpan.FromSeconds(_settings.RetryDelay),
onRetry: (exception, calculatedWaitDuration) =>
{
Log.WarnFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Retrying_IO_operation_Exception_1, _context.JobDetail.Key, exception.Message));
});
if (_settings.LogVerbose || Log.IsDebugEnabled)
{
Log.DebugFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_starting, _context.JobDetail.Key));
}
await Process();
if (_settings.LogVerbose || Log.IsDebugEnabled)
{
Log.DebugFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_ended, _context.JobDetail.Key));
}
}
catch (Exception ex)
{
if (_settings.PauseJobOnException)
{
await context.Scheduler.PauseJob(context.JobDetail.Key);
Log.WarnFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_was_paused_because_of_error, _context.JobDetail.Key));
}
if (_settings.LogVerbose || Log.IsDebugEnabled)
{
if (!string.IsNullOrEmpty(ex.Message))
{
Log.Error(ex.Message, ex);
}
}
if (context.Scheduler.SchedulerName != "Private")
{
throw new JobExecutionException(string.Format(Resources.Import_job_0_failed, _context.JobDetail.Key), ex, false);
}
Log.Error(string.Format(Resources.Job_0_thrown_an_error_1, _context.JobDetail.Key, ex.Message));
}
}
/// <summary>
/// Processes this instance.
/// </summary>
/// <returns></returns>
private async Task Process()
{
InputQueue = new ConcurrentQueue<DataMessage>();
foreach (
var dataMessage in FileOperationsHelper.GetFiles(MessageStatus.Input, _settings.InputDir, _settings.SearchPattern, SearchOption.AllDirectories, _settings.OrderBy, _settings.ReverseOrder))
{
if (_settings.LogVerbose || Log.IsDebugEnabled)
{
Log.DebugFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_File_1_found_in_input_location, _context.JobDetail.Key, dataMessage.FullPath.Replace(@"{", @"{{").Replace(@"}", @"}}")));
}
InputQueue.Enqueue(dataMessage);
}
if (!InputQueue.IsEmpty)
{
Log.InfoFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Found_1_file_s_in_input_folder, _context.JobDetail.Key, InputQueue.Count));
await ProcessInputQueue();
}
}
/// <summary>
/// Processes input queue
/// </summary>
/// <returns>
/// Task object for continuation
/// </returns>
private async Task ProcessInputQueue()
{
using (_httpClientHelper = new HttpClientHelper(_settings))
{
var fileCount = 0;
string fileNameInPackageTemplate = "";
FileStream zipToOpen = null;
ZipArchive archive = null;
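// When input files are plain data files (not full packages), resolve the entry name declared in the template's Manifest.xml once up front; each file is later wrapped into a copy of the template under that name.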
if (_settings.InputFilesArePackages == false)
{
fileNameInPackageTemplate = GetFileNameInPackageTemplate();
if (string.IsNullOrEmpty(fileNameInPackageTemplate))
{
throw new JobExecutionException(string.Format(Resources.Job_0_Please_check_your_package_template_Input_file_name_in_Manifest_cannot_be_identified, _context.JobDetail.Key));
}
}
while (InputQueue.TryDequeue(out DataMessage dataMessage))
{
try
{
if (fileCount > 0 && _settings.DelayBetweenFiles > 0) //Only delay after first file and never after last.
{
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(_settings.DelayBetweenFiles));
}
fileCount++;
var sourceStream = _retryPolicyForIo.Execute(() => FileOperationsHelper.Read(dataMessage.FullPath));
if (sourceStream == null) continue;//Nothing to do here
string tempFileName = "";
//If we need to "wrap" file in package envelope
if (_settings.InputFilesArePackages == false)
{
using (zipToOpen = new FileStream(_settings.PackageTemplate, FileMode.Open))
{
tempFileName = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
_retryPolicyForIo.Execute(() => FileOperationsHelper.Create(zipToOpen, tempFileName));
var tempZipStream = _retryPolicyForIo.Execute(() => FileOperationsHelper.Read(tempFileName));
using (archive = new ZipArchive(tempZipStream, ZipArchiveMode.Update))
{
//Check if package template contains input file and remove it first. It should not be there in the first place.
ZipArchiveEntry entry = archive.GetEntry(fileNameInPackageTemplate);
if (entry != null)
{
entry.Delete();
Log.WarnFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Package_template_contains_input_file_1_Please_remove_it_from_the_template, _context.JobDetail.Key, fileNameInPackageTemplate));
}
// Update Manifest file with the original file name for end-to-end traceability. Use the new file name in the rest of the method.
fileNameInPackageTemplate = UpdateManifestFile(archive, dataMessage, fileNameInPackageTemplate);
var importedFile = archive.CreateEntry(fileNameInPackageTemplate, CompressionLevel.Fastest);
using var entryStream = importedFile.Open();
sourceStream.CopyTo(entryStream);
sourceStream.Close();
sourceStream.Dispose();
}
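// Re-open the completed temporary package; this stream is what gets uploaded to blob storage.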
sourceStream = _retryPolicyForIo.Execute(() => FileOperationsHelper.Read(tempFileName));
}
}
if (Log.IsDebugEnabled)
Log.DebugFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Uploading_file_1_File_size_2_bytes, _context.JobDetail.Key, dataMessage.FullPath.Replace(@"{", @"{{").Replace(@"}", @"}}"), sourceStream.Length));
// Get blob url and id. Returns in json format
var response = await _httpClientHelper.GetAzureWriteUrl();
if (!response.IsSuccessStatusCode)
{
throw new JobExecutionException($"Job: {_settings.JobKey}. Request GetAzureWriteUrl failed.");
}
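// Parse the JSON response and extract the writable blob URL used for the package upload.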
var blobInfo = (JObject)JsonConvert.DeserializeObject(HttpClientHelper.ReadResponseString(response));
var blobUrl = blobInfo["BlobUrl"].ToString();
var blobUri = new Uri(blobUrl);
//Upload package to blob storage
var uploadResponse = await _httpClientHelper.UploadContentsToBlob(blobUri, sourceStream);
if (sourceStream != null)
{
sourceStream.Close();
sourceStream.Dispose();
if (!_settings.InputFilesArePackages) //If we wrapped the file in a package envelope we need to delete the temp file
{
_retryPolicyForIo.Execute(() => FileOperationsHelper.Delete(tempFileName));
}
}
if (uploadResponse.IsSuccessStatusCode)
{
//Now send import request
var targetLegalEntity = _settings.Company;
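// In multi-company mode the target legal entity can be taken from the parent folder name or from a token in the file name.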
if (_settings.MultiCompanyImport && _settings.GetLegalEntityFromSubfolder)
{
targetLegalEntity = new FileInfo(dataMessage.FullPath).Directory.Name;
}
if (_settings.MultiCompanyImport && _settings.GetLegalEntityFromFilename)
{
String[] separator = { _settings.FilenameSeparator };
var tokenList = dataMessage.Name.Split(separator, 10, StringSplitOptions.RemoveEmptyEntries);
targetLegalEntity = tokenList[_settings.LegalEntityTokenPosition - 1];
}
if (targetLegalEntity.Length > 4)
{
Log.ErrorFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Target_legal_entity_is_not_valid_1, _context.JobDetail.Key, targetLegalEntity));
}
if (string.IsNullOrEmpty(targetLegalEntity))
{
throw new Exception(string.Format(Resources.Job_0_Unable_to_get_target_legal_entity_name, _context.JobDetail.Key));
}
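// Generate a unique execution id for this import run from the data project name, a timestamp and a GUID.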
var executionIdGenerated = CreateExecutionId(_settings.DataProject);
var importResponse = await _httpClientHelper.ImportFromPackage(blobUri.AbsoluteUri, _settings.DataProject, executionIdGenerated, _settings.ExecuteImport, _settings.OverwriteDataProject, targetLegalEntity);
if (importResponse.IsSuccessStatusCode)
{
var result = importResponse.Content.ReadAsStringAsync().Result;
var jsonResponse = (JObject)JsonConvert.DeserializeObject(result);
string executionId = jsonResponse["value"].ToString();
var targetDataMessage = new DataMessage(dataMessage)
{
MessageId = executionId,
FullPath = dataMessage.FullPath.Replace(_settings.InputDir, _settings.UploadSuccessDir),
MessageStatus = MessageStatus.Enqueued
};
// Move to inprocess/success location
_retryPolicyForIo.Execute(() => FileOperationsHelper.Move(dataMessage.FullPath, targetDataMessage.FullPath));
if (_settings.ExecutionJobPresent)
_retryPolicyForIo.Execute(() => FileOperationsHelper.WriteStatusFile(targetDataMessage, _settings.StatusFileExtension));
if (Log.IsDebugEnabled)
Log.DebugFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_File_1_uploaded_successfully, _context.JobDetail.Key, dataMessage.FullPath.Replace(@"{", @"{{").Replace(@"}", @"}}")));
}
else
{
// import request failed. Move message to error location.
Log.ErrorFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Upload_failed_for_file_1_Failure_response_Status_2_Reason_3, _context.JobDetail.Key, dataMessage.FullPath.Replace(@"{", @"{{").Replace(@"}", @"}}"), importResponse.StatusCode, importResponse.ReasonPhrase, $"{Environment.NewLine}packageUrl: {blobUri.AbsoluteUri}{Environment.NewLine}definitionGroupId: {_settings.DataProject}{Environment.NewLine}executionId: {executionIdGenerated}{Environment.NewLine}execute: {_settings.ExecuteImport}{Environment.NewLine}overwrite: {_settings.OverwriteDataProject}{Environment.NewLine}legalEntityId: {targetLegalEntity}"));
var targetDataMessage = new DataMessage(dataMessage)
{
FullPath = dataMessage.FullPath.Replace(_settings.InputDir, _settings.UploadErrorsDir),
MessageStatus = MessageStatus.Failed
};
// Move data to error location
_retryPolicyForIo.Execute(() => FileOperationsHelper.Move(dataMessage.FullPath, targetDataMessage.FullPath));
// Save the log with import failure details
_retryPolicyForIo.Execute(() => FileOperationsHelper.WriteStatusLogFile(targetDataMessage, importResponse, _settings.StatusFileExtension));
}
}
else
{
// upload failed. Move message to error location.
Log.ErrorFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Upload_failed_for_file_1_Failure_response_Status_2_Reason_3, _context.JobDetail.Key, dataMessage.FullPath.Replace(@"{", @"{{").Replace(@"}", @"}}"), uploadResponse.StatusCode, uploadResponse.ReasonPhrase));
var targetDataMessage = new DataMessage(dataMessage)
{
FullPath = dataMessage.FullPath.Replace(_settings.InputDir, _settings.UploadErrorsDir),
MessageStatus = MessageStatus.Failed
};
// Move data to error location
_retryPolicyForIo.Execute(() => FileOperationsHelper.Move(dataMessage.FullPath, targetDataMessage.FullPath));
// Save the log with import failure details
_retryPolicyForIo.Execute(() => FileOperationsHelper.WriteStatusLogFile(targetDataMessage, uploadResponse, _settings.StatusFileExtension));
}
}
catch (Exception ex)
{
Log.ErrorFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Failure_processing_file_1_Exception_2, _context.JobDetail.Key, dataMessage.FullPath.Replace(@"{", @"{{").Replace(@"}", @"}}"), ex.Message), ex);
throw;
}
finally
{
if (zipToOpen != null)
{
zipToOpen.Close();
zipToOpen.Dispose();
}
archive?.Dispose();
}
}
}
}
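/// <summary>
/// Opens the package template, extracts Manifest.xml and returns the value of its first InputFilePath element,
/// i.e. the entry name the input file must use inside the package. Returns null if no manifest or element is found.
/// </summary>
/// <returns>File name of the input file declared in the package template manifest</returns>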
private string GetFileNameInPackageTemplate()
{
using var package = ZipFile.OpenRead(_settings.PackageTemplate);
foreach (var entry in package.Entries)
{
if (entry.FullName.Equals("Manifest.xml", StringComparison.OrdinalIgnoreCase))
{
var manifestPath = Path.Combine(Path.GetTempPath(), $"{_context.JobDetail.Key}-{entry.FullName}");
entry.ExtractToFile(manifestPath, true);
var doc = new XmlDocument();
using var manifestFile = new StreamReader(manifestPath);
doc.Load(new XmlTextReader(manifestFile) { Namespaces = false });
return doc.SelectSingleNode("//InputFilePath[1]")?.InnerText;
}
}
return null;
}
/// <summary>
/// Updates the Manifest.xml file of the data package to be uploaded with the original file name of the related data message before the upload
/// </summary>
/// <param name="archive">The data package which is being prepared for upload</param>
/// <param name="dataMessage">The data message object containing the information related to the file which is getting uploaded</param>
/// <param name="fileNameInPackageOrig">Original file name in the package</param>
/// <returns>Final file name in the package</returns>
private string UpdateManifestFile(ZipArchive archive, DataMessage dataMessage, string fileNameInPackageOrig)
{
if (archive is null || dataMessage is null)
{
return fileNameInPackageOrig;
}
// This modification ensures that the original file name is used in the package that is going to be uploaded to D365
// Get the Manifest.xml entry
ZipArchiveEntry manifestEntry = archive.GetEntry("Manifest.xml");
// Save the Manifest.xml as temporary file
string tempManifestFileName = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
string tempManifestFileNameNew = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
manifestEntry.ExtractToFile(tempManifestFileName, true);
// Modify the file name to the original filename
XmlDocument tempXmlDocManifest = new XmlDocument();
using (var tempManifestFile = new StreamReader(tempManifestFileName))
{
tempXmlDocManifest.Load(new XmlTextReader(tempManifestFile) { Namespaces = false });
tempXmlDocManifest.SelectSingleNode("//InputFilePath[1]").InnerText = Path.GetFileName(dataMessage.FullPath);
// Save the document to a file and auto-indent the output.
using XmlTextWriter writer = new XmlTextWriter(tempManifestFileNameNew, null)
{
Namespaces = false,
Formatting = System.Xml.Formatting.Indented
};
tempXmlDocManifest.Save(writer);
}
// Delete the Manifest.xml from the archive file
manifestEntry.Delete();
// Add a new Manifest.xml based on the adjusted file
archive.CreateEntryFromFile(tempManifestFileNameNew, "Manifest.xml");
// Delete the temporary files
File.Delete(tempManifestFileName);
File.Delete(tempManifestFileNameNew);
// Adapt the fileNameInPackage
string fileNameInPackage = Path.GetFileName(dataMessage.FullPath);
// Check if package template contains input file and remove it first. It should not be there in the first place.
ZipArchiveEntry entry = archive.GetEntry(fileNameInPackage);
if (entry != null)
{
entry.Delete();
Log.WarnFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Package_template_contains_input_file_1_Please_remove_it_from_the_template, _context.JobDetail.Key, fileNameInPackage));
}
return fileNameInPackage;
}
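/// <summary>
/// Builds an execution id from the data project name, the current timestamp and a GUID.
/// </summary>
/// <param name="dataProject">Data project name</param>
/// <returns>Generated execution id</returns>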
private static string CreateExecutionId(string dataProject)
{
return $"{dataProject}-{DateTime.Now:yyyy-MM-dd_HH-mm-ss}-{Guid.NewGuid()}";
}
}
}