Adding logging support in place of console output
This commit is contained in:
Родитель
9fa5ca9123
Коммит
aba2d6d2a1
|
@ -13,6 +13,7 @@
|
||||||
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.0.0" />
|
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.0.0" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="6.0.1" />
|
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="6.0.1" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
|
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" />
|
||||||
<PackageReference Include="System.ComponentModel.Composition" Version="6.0.0" />
|
<PackageReference Include="System.ComponentModel.Composition" Version="6.0.0" />
|
||||||
<PackageReference Include="System.Configuration.ConfigurationManager" Version="6.0.0" />
|
<PackageReference Include="System.Configuration.ConfigurationManager" Version="6.0.0" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
|
@ -3,6 +3,7 @@ using Microsoft.DataTransfer.Interfaces;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Hosting;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.Core;
|
namespace Microsoft.DataTransfer.Core;
|
||||||
|
|
||||||
|
@ -18,6 +19,8 @@ class Program
|
||||||
.Build();
|
.Build();
|
||||||
|
|
||||||
IConfiguration configuration = host.Services.GetRequiredService<IConfiguration>();
|
IConfiguration configuration = host.Services.GetRequiredService<IConfiguration>();
|
||||||
|
var loggerFactory = host.Services.GetRequiredService<ILoggerFactory>();
|
||||||
|
var log = loggerFactory.CreateLogger<Program>();
|
||||||
|
|
||||||
var options = configuration.Get<DataTransferOptions>();
|
var options = configuration.Get<DataTransferOptions>();
|
||||||
|
|
||||||
|
@ -34,7 +37,7 @@ class Program
|
||||||
var sources = LoadExtensions<IDataSourceExtension>(container);
|
var sources = LoadExtensions<IDataSourceExtension>(container);
|
||||||
var sinks = LoadExtensions<IDataSinkExtension>(container);
|
var sinks = LoadExtensions<IDataSinkExtension>(container);
|
||||||
|
|
||||||
Console.WriteLine($"{sources.Count + sinks.Count} Extensions Loaded");
|
log.LogInformation("{TotalExtensionCount} Extensions Loaded", sources.Count + sinks.Count);
|
||||||
|
|
||||||
var source = GetExtensionSelection(options.Source, sources, "Source");
|
var source = GetExtensionSelection(options.Source, sources, "Source");
|
||||||
var sourceConfig = BuildSettingsConfiguration(configuration, options.SourceSettingsPath, $"{source.DisplayName}SourceSettings", options.Source == null);
|
var sourceConfig = BuildSettingsConfiguration(configuration, options.SourceSettingsPath, $"{source.DisplayName}SourceSettings", options.Source == null);
|
||||||
|
@ -42,10 +45,10 @@ class Program
|
||||||
var sink = GetExtensionSelection(options.Sink, sinks, "Sink");
|
var sink = GetExtensionSelection(options.Sink, sinks, "Sink");
|
||||||
var sinkConfig = BuildSettingsConfiguration(configuration, options.SinkSettingsPath, $"{sink.DisplayName}SinkSettings", options.Sink == null);
|
var sinkConfig = BuildSettingsConfiguration(configuration, options.SinkSettingsPath, $"{sink.DisplayName}SinkSettings", options.Sink == null);
|
||||||
|
|
||||||
var data = source.ReadAsync(sourceConfig);
|
var data = source.ReadAsync(sourceConfig, loggerFactory.CreateLogger(source.GetType().Name));
|
||||||
await sink.WriteAsync(data, sinkConfig);
|
await sink.WriteAsync(data, sinkConfig, source, loggerFactory.CreateLogger(sink.GetType().Name));
|
||||||
|
|
||||||
Console.WriteLine("Done");
|
log.LogInformation("Done");
|
||||||
|
|
||||||
Console.WriteLine("Enter to Quit...");
|
Console.WriteLine("Enter to Quit...");
|
||||||
Console.ReadLine();
|
Console.ReadLine();
|
||||||
|
|
|
@ -1,4 +1,9 @@
|
||||||
{
|
{
|
||||||
|
"Logging": {
|
||||||
|
"LogLevel": {
|
||||||
|
"Default": "Information"
|
||||||
|
}
|
||||||
|
},
|
||||||
"JsonSourceSettings": {
|
"JsonSourceSettings": {
|
||||||
},
|
},
|
||||||
"JsonSinkSettings": {
|
"JsonSinkSettings": {
|
||||||
|
|
|
@ -3,6 +3,7 @@ using Microsoft.DataTransfer.AzureTableAPIExtension.Data;
|
||||||
using Microsoft.DataTransfer.AzureTableAPIExtension.Settings;
|
using Microsoft.DataTransfer.AzureTableAPIExtension.Settings;
|
||||||
using Microsoft.DataTransfer.Interfaces;
|
using Microsoft.DataTransfer.Interfaces;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
using System.ComponentModel.Composition;
|
using System.ComponentModel.Composition;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.AzureTableAPIExtension
|
namespace Microsoft.DataTransfer.AzureTableAPIExtension
|
||||||
|
@ -12,7 +13,7 @@ namespace Microsoft.DataTransfer.AzureTableAPIExtension
|
||||||
{
|
{
|
||||||
public string DisplayName => "AzureTableAPI";
|
public string DisplayName => "AzureTableAPI";
|
||||||
|
|
||||||
public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, CancellationToken cancellationToken = default)
|
public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var settings = config.Get<AzureTableAPIDataSinkSettings>();
|
var settings = config.Get<AzureTableAPIDataSinkSettings>();
|
||||||
settings.Validate();
|
settings.Validate();
|
||||||
|
|
|
@ -4,6 +4,7 @@ using Microsoft.DataTransfer.AzureTableAPIExtension.Data;
|
||||||
using Microsoft.DataTransfer.AzureTableAPIExtension.Settings;
|
using Microsoft.DataTransfer.AzureTableAPIExtension.Settings;
|
||||||
using Microsoft.DataTransfer.Interfaces;
|
using Microsoft.DataTransfer.Interfaces;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
using System.ComponentModel.Composition;
|
using System.ComponentModel.Composition;
|
||||||
using System.Runtime.CompilerServices;
|
using System.Runtime.CompilerServices;
|
||||||
|
|
||||||
|
@ -14,7 +15,7 @@ namespace Microsoft.DataTransfer.AzureTableAPIExtension
|
||||||
{
|
{
|
||||||
public string DisplayName => "AzureTableAPI";
|
public string DisplayName => "AzureTableAPI";
|
||||||
|
|
||||||
public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, [EnumeratorCancellation] CancellationToken cancellationToken = default)
|
public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var settings = config.Get<AzureTableAPIDataSourceSettings>();
|
var settings = config.Get<AzureTableAPIDataSourceSettings>();
|
||||||
settings.Validate();
|
settings.Validate();
|
||||||
|
|
|
@ -8,6 +8,7 @@ using System.Text;
|
||||||
using Microsoft.Azure.Cosmos;
|
using Microsoft.Azure.Cosmos;
|
||||||
using Microsoft.DataTransfer.Interfaces;
|
using Microsoft.DataTransfer.Interfaces;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
using Newtonsoft.Json;
|
using Newtonsoft.Json;
|
||||||
using Polly;
|
using Polly;
|
||||||
using Polly.Retry;
|
using Polly.Retry;
|
||||||
|
@ -19,7 +20,7 @@ namespace Microsoft.DataTransfer.CosmosExtension
|
||||||
{
|
{
|
||||||
public string DisplayName => "Cosmos-nosql";
|
public string DisplayName => "Cosmos-nosql";
|
||||||
|
|
||||||
public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, CancellationToken cancellationToken = default)
|
public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var settings = config.Get<CosmosSinkSettings>();
|
var settings = config.Get<CosmosSinkSettings>();
|
||||||
settings.Validate();
|
settings.Validate();
|
||||||
|
@ -33,7 +34,7 @@ namespace Microsoft.DataTransfer.CosmosExtension
|
||||||
|
|
||||||
var entryAssembly = Assembly.GetEntryAssembly();
|
var entryAssembly = Assembly.GetEntryAssembly();
|
||||||
bool isShardedImport = false;
|
bool isShardedImport = false;
|
||||||
string sourceName = "Unknown"; // TODO: add source as parameter
|
string sourceName = dataSource.DisplayName;
|
||||||
string sinkName = DisplayName;
|
string sinkName = DisplayName;
|
||||||
string userAgentString = string.Format(CultureInfo.InvariantCulture, "{0}-{1}-{2}-{3}{4}",
|
string userAgentString = string.Format(CultureInfo.InvariantCulture, "{0}-{1}-{2}-{3}{4}",
|
||||||
entryAssembly == null ? "dtr" : entryAssembly.GetName().Name,
|
entryAssembly == null ? "dtr" : entryAssembly.GetName().Name,
|
||||||
|
@ -82,7 +83,7 @@ namespace Microsoft.DataTransfer.CosmosExtension
|
||||||
insertCount += i;
|
insertCount += i;
|
||||||
if (insertCount % 500 == 0)
|
if (insertCount % 500 == 0)
|
||||||
{
|
{
|
||||||
Console.WriteLine($"{insertCount} records added after {timer.ElapsedMilliseconds / 1000.0:F2}s");
|
logger.LogInformation("{InsertCount} records added after {TotalSeconds}s", insertCount, $"{timer.ElapsedMilliseconds / 1000.0:F2}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,14 +93,14 @@ namespace Microsoft.DataTransfer.CosmosExtension
|
||||||
await foreach (var batch in batches.WithCancellation(cancellationToken))
|
await foreach (var batch in batches.WithCancellation(cancellationToken))
|
||||||
{
|
{
|
||||||
var insertTasks = settings.InsertStreams
|
var insertTasks = settings.InsertStreams
|
||||||
? batch.Select(item => InsertItemStreamAsync(container, item, settings.PartitionKeyPath, retry, cancellationToken)).ToList()
|
? batch.Select(item => InsertItemStreamAsync(container, item, settings.PartitionKeyPath, retry, logger, cancellationToken)).ToList()
|
||||||
: batch.Select(item => InsertItemAsync(container, item, retry, cancellationToken)).ToList();
|
: batch.Select(item => InsertItemAsync(container, item, retry, logger, cancellationToken)).ToList();
|
||||||
|
|
||||||
var results = await Task.WhenAll(insertTasks);
|
var results = await Task.WhenAll(insertTasks);
|
||||||
ReportCount(results.Sum());
|
ReportCount(results.Sum());
|
||||||
}
|
}
|
||||||
|
|
||||||
Console.WriteLine($"Added {insertCount} total records in {timer.ElapsedMilliseconds / 1000.0:F2}s");
|
logger.LogInformation("Added {InsertCount} total records in {TotalSeconds}s", insertCount, $"{timer.ElapsedMilliseconds / 1000.0:F2}");
|
||||||
}
|
}
|
||||||
|
|
||||||
private static AsyncRetryPolicy GetRetryPolicy(int maxRetryCount, int initialRetryDuration)
|
private static AsyncRetryPolicy GetRetryPolicy(int maxRetryCount, int initialRetryDuration)
|
||||||
|
@ -115,8 +116,9 @@ namespace Microsoft.DataTransfer.CosmosExtension
|
||||||
return retryPolicy;
|
return retryPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Task<int> InsertItemAsync(Container container, ExpandoObject item, AsyncRetryPolicy retryPolicy, CancellationToken cancellationToken)
|
private static Task<int> InsertItemAsync(Container container, ExpandoObject item, AsyncRetryPolicy retryPolicy, ILogger logger, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
logger.LogTrace("Inserting item {Id}", GetPropertyValue(item, "id"));
|
||||||
var task = retryPolicy.ExecuteAsync(() => container.CreateItemAsync(item, cancellationToken: cancellationToken))
|
var task = retryPolicy.ExecuteAsync(() => container.CreateItemAsync(item, cancellationToken: cancellationToken))
|
||||||
.ContinueWith(t =>
|
.ContinueWith(t =>
|
||||||
{
|
{
|
||||||
|
@ -127,7 +129,7 @@ namespace Microsoft.DataTransfer.CosmosExtension
|
||||||
|
|
||||||
if (t.IsFaulted)
|
if (t.IsFaulted)
|
||||||
{
|
{
|
||||||
Console.WriteLine($"Error adding record: {t.Exception?.Message}");
|
logger.LogWarning(t.Exception, "Error adding record: {ErrorMessage}", t.Exception?.Message);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -135,12 +137,16 @@ namespace Microsoft.DataTransfer.CosmosExtension
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Task<int> InsertItemStreamAsync(Container container, ExpandoObject item, string partitionKeyPath, AsyncRetryPolicy retryPolicy, CancellationToken cancellationToken)
|
private static Task<int> InsertItemStreamAsync(Container container, ExpandoObject item, string? partitionKeyPath, AsyncRetryPolicy retryPolicy, ILogger logger, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
if (partitionKeyPath == null)
|
||||||
|
throw new ArgumentNullException(nameof(partitionKeyPath));
|
||||||
|
|
||||||
|
logger.LogTrace("Inserting item {Id}", GetPropertyValue(item, "id"));
|
||||||
var json = JsonConvert.SerializeObject(item);
|
var json = JsonConvert.SerializeObject(item);
|
||||||
|
|
||||||
var ms = new MemoryStream(Encoding.UTF8.GetBytes(json));
|
var ms = new MemoryStream(Encoding.UTF8.GetBytes(json));
|
||||||
var task = retryPolicy.ExecuteAsync(() => container.CreateItemStreamAsync(ms, new PartitionKey(((IDictionary<string, object?>)item)[partitionKeyPath.TrimStart('/')]?.ToString()), cancellationToken: cancellationToken))
|
var task = retryPolicy.ExecuteAsync(() => container.CreateItemStreamAsync(ms, new PartitionKey(GetPropertyValue(item, partitionKeyPath.TrimStart('/'))), cancellationToken: cancellationToken))
|
||||||
.ContinueWith(t =>
|
.ContinueWith(t =>
|
||||||
{
|
{
|
||||||
if (t.IsCompletedSuccessfully)
|
if (t.IsCompletedSuccessfully)
|
||||||
|
@ -150,7 +156,7 @@ namespace Microsoft.DataTransfer.CosmosExtension
|
||||||
|
|
||||||
if (t.IsFaulted)
|
if (t.IsFaulted)
|
||||||
{
|
{
|
||||||
Console.WriteLine($"Error adding record: {t.Exception?.Message}");
|
logger.LogWarning(t.Exception, "Error adding record: {ErrorMessage}", t.Exception?.Message);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -158,6 +164,11 @@ namespace Microsoft.DataTransfer.CosmosExtension
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static string? GetPropertyValue(ExpandoObject item, string propertyName)
|
||||||
|
{
|
||||||
|
return ((IDictionary<string, object?>)item)[propertyName]?.ToString();
|
||||||
|
}
|
||||||
|
|
||||||
private static ExpandoObject? BuildObject(IDataItem? source, bool requireStringId = false)
|
private static ExpandoObject? BuildObject(IDataItem? source, bool requireStringId = false)
|
||||||
{
|
{
|
||||||
if (source == null)
|
if (source == null)
|
||||||
|
|
|
@ -4,6 +4,7 @@ using Microsoft.Azure.Cosmos;
|
||||||
using Microsoft.DataTransfer.Interfaces;
|
using Microsoft.DataTransfer.Interfaces;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using System;
|
using System;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.CosmosExtension
|
namespace Microsoft.DataTransfer.CosmosExtension
|
||||||
{
|
{
|
||||||
|
@ -12,7 +13,7 @@ namespace Microsoft.DataTransfer.CosmosExtension
|
||||||
{
|
{
|
||||||
public string DisplayName => "Cosmos-nosql";
|
public string DisplayName => "Cosmos-nosql";
|
||||||
|
|
||||||
public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, [EnumeratorCancellation] CancellationToken cancellationToken = default)
|
public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var settings = config.Get<CosmosSourceSettings>();
|
var settings = config.Get<CosmosSourceSettings>();
|
||||||
settings.Validate();
|
settings.Validate();
|
||||||
|
@ -31,7 +32,7 @@ namespace Microsoft.DataTransfer.CosmosExtension
|
||||||
requestOptions.PartitionKey = new PartitionKey(settings.PartitionKey);
|
requestOptions.PartitionKey = new PartitionKey(settings.PartitionKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
Console.WriteLine($"Reading from {settings.Database}.{settings.Container}");
|
logger.LogInformation("Reading from {Database}.{Container}", settings.Database, settings.Container);
|
||||||
using FeedIterator<Dictionary<string, object?>> feedIterator = GetFeedIterator<Dictionary<string, object?>>(settings, container, requestOptions);
|
using FeedIterator<Dictionary<string, object?>> feedIterator = GetFeedIterator<Dictionary<string, object?>>(settings, container, requestOptions);
|
||||||
while (feedIterator.HasMoreResults)
|
while (feedIterator.HasMoreResults)
|
||||||
{
|
{
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
using Newtonsoft.Json.Linq;
|
using Newtonsoft.Json.Linq;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
||||||
|
@ -24,7 +25,7 @@ namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
||||||
{ "Indented", "true" },
|
{ "Indented", "true" },
|
||||||
});
|
});
|
||||||
|
|
||||||
await output.WriteAsync(input.ReadAsync(sourceConfig), sinkConfig);
|
await output.WriteAsync(input.ReadAsync(sourceConfig, NullLogger.Instance), sinkConfig, input, NullLogger.Instance);
|
||||||
|
|
||||||
bool areEqual = JToken.DeepEquals(JToken.Parse(await File.ReadAllTextAsync(fileIn)), JToken.Parse(await File.ReadAllTextAsync(fileOut)));
|
bool areEqual = JToken.DeepEquals(JToken.Parse(await File.ReadAllTextAsync(fileIn)), JToken.Parse(await File.ReadAllTextAsync(fileOut)));
|
||||||
Assert.IsTrue(areEqual);
|
Assert.IsTrue(areEqual);
|
||||||
|
@ -50,7 +51,7 @@ namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
||||||
{ "Indented", "true" },
|
{ "Indented", "true" },
|
||||||
});
|
});
|
||||||
|
|
||||||
await output.WriteAsync(input.ReadAsync(sourceConfig), sinkConfig);
|
await output.WriteAsync(input.ReadAsync(sourceConfig, NullLogger.Instance), sinkConfig, input, NullLogger.Instance);
|
||||||
|
|
||||||
bool areEqual = JToken.DeepEquals(JToken.Parse(await File.ReadAllTextAsync(fileCompare)), JToken.Parse(await File.ReadAllTextAsync(fileOut)));
|
bool areEqual = JToken.DeepEquals(JToken.Parse(await File.ReadAllTextAsync(fileCompare)), JToken.Parse(await File.ReadAllTextAsync(fileOut)));
|
||||||
Assert.IsTrue(areEqual);
|
Assert.IsTrue(areEqual);
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
using Microsoft.DataTransfer.Interfaces;
|
using Microsoft.DataTransfer.Interfaces;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
using Newtonsoft.Json;
|
using Newtonsoft.Json;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
||||||
|
@ -35,7 +36,7 @@ namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
||||||
{ "FilePath", outputFile }
|
{ "FilePath", outputFile }
|
||||||
});
|
});
|
||||||
|
|
||||||
await sink.WriteAsync(data.ToAsyncEnumerable(), config);
|
await sink.WriteAsync(data.ToAsyncEnumerable(), config, new JsonDataSourceExtension(), NullLogger.Instance);
|
||||||
|
|
||||||
var outputData = JsonConvert.DeserializeObject<List<TestDataObject>>(await File.ReadAllTextAsync(outputFile));
|
var outputData = JsonConvert.DeserializeObject<List<TestDataObject>>(await File.ReadAllTextAsync(outputFile));
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
using Microsoft.DataTransfer.Interfaces;
|
using Microsoft.DataTransfer.Interfaces;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
||||||
{
|
{
|
||||||
|
@ -14,7 +15,7 @@ namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
||||||
{ "FilePath", "Data/SimpleIdName.json" }
|
{ "FilePath", "Data/SimpleIdName.json" }
|
||||||
});
|
});
|
||||||
|
|
||||||
await foreach (var dataItem in extension.ReadAsync(config))
|
await foreach (var dataItem in extension.ReadAsync(config, NullLogger.Instance))
|
||||||
{
|
{
|
||||||
CollectionAssert.AreEquivalent(new[] { "id", "name" }, dataItem.GetFieldNames().ToArray());
|
CollectionAssert.AreEquivalent(new[] { "id", "name" }, dataItem.GetFieldNames().ToArray());
|
||||||
Assert.IsNotNull(dataItem.GetValue("id"));
|
Assert.IsNotNull(dataItem.GetValue("id"));
|
||||||
|
@ -31,7 +32,7 @@ namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
||||||
{ "FilePath", "Data/NestedObjects.json" }
|
{ "FilePath", "Data/NestedObjects.json" }
|
||||||
});
|
});
|
||||||
|
|
||||||
await foreach (var dataItem in extension.ReadAsync(config))
|
await foreach (var dataItem in extension.ReadAsync(config, NullLogger.Instance))
|
||||||
{
|
{
|
||||||
if (dataItem.GetValue("child") is IDataItem child)
|
if (dataItem.GetValue("child") is IDataItem child)
|
||||||
{
|
{
|
||||||
|
@ -56,7 +57,7 @@ namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
||||||
});
|
});
|
||||||
|
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
await foreach (var dataItem in extension.ReadAsync(config))
|
await foreach (var dataItem in extension.ReadAsync(config, NullLogger.Instance))
|
||||||
{
|
{
|
||||||
counter++;
|
counter++;
|
||||||
CollectionAssert.AreEquivalent(new[] { "id", "name" }, dataItem.GetFieldNames().ToArray());
|
CollectionAssert.AreEquivalent(new[] { "id", "name" }, dataItem.GetFieldNames().ToArray());
|
||||||
|
@ -79,7 +80,7 @@ namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
||||||
|
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
double lastId = -1;
|
double lastId = -1;
|
||||||
await foreach (var dataItem in extension.ReadAsync(config))
|
await foreach (var dataItem in extension.ReadAsync(config, NullLogger.Instance))
|
||||||
{
|
{
|
||||||
counter++;
|
counter++;
|
||||||
CollectionAssert.AreEquivalent(new[] { "id", "name" }, dataItem.GetFieldNames().ToArray());
|
CollectionAssert.AreEquivalent(new[] { "id", "name" }, dataItem.GetFieldNames().ToArray());
|
||||||
|
@ -104,7 +105,7 @@ namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
||||||
});
|
});
|
||||||
|
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
await foreach (var dataItem in extension.ReadAsync(config))
|
await foreach (var dataItem in extension.ReadAsync(config, NullLogger.Instance))
|
||||||
{
|
{
|
||||||
counter++;
|
counter++;
|
||||||
CollectionAssert.AreEquivalent(new[] { "id", "name" }, dataItem.GetFieldNames().ToArray());
|
CollectionAssert.AreEquivalent(new[] { "id", "name" }, dataItem.GetFieldNames().ToArray());
|
||||||
|
@ -125,7 +126,7 @@ namespace Microsoft.DataTransfer.JsonExtension.UnitTests
|
||||||
});
|
});
|
||||||
|
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
await foreach (var dataItem in extension.ReadAsync(config))
|
await foreach (var dataItem in extension.ReadAsync(config, NullLogger.Instance))
|
||||||
{
|
{
|
||||||
counter++;
|
counter++;
|
||||||
CollectionAssert.AreEquivalent(new[] { "id", "name" }, dataItem.GetFieldNames().ToArray());
|
CollectionAssert.AreEquivalent(new[] { "id", "name" }, dataItem.GetFieldNames().ToArray());
|
||||||
|
|
|
@ -4,6 +4,7 @@ using System.Text.Json;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using System;
|
using System;
|
||||||
using Microsoft.DataTransfer.JsonExtension.Settings;
|
using Microsoft.DataTransfer.JsonExtension.Settings;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.JsonExtension
|
namespace Microsoft.DataTransfer.JsonExtension
|
||||||
{
|
{
|
||||||
|
@ -12,14 +13,14 @@ namespace Microsoft.DataTransfer.JsonExtension
|
||||||
{
|
{
|
||||||
public string DisplayName => "JSON";
|
public string DisplayName => "JSON";
|
||||||
|
|
||||||
public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, CancellationToken cancellationToken = default)
|
public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var settings = config.Get<JsonSinkSettings>();
|
var settings = config.Get<JsonSinkSettings>();
|
||||||
settings.Validate();
|
settings.Validate();
|
||||||
|
|
||||||
if (settings.FilePath != null)
|
if (settings.FilePath != null)
|
||||||
{
|
{
|
||||||
Console.WriteLine($"Writing to file '{settings.FilePath}'");
|
logger.LogInformation("Writing to file '{FilePath}'", settings.FilePath);
|
||||||
await using var stream = File.Create(settings.FilePath);
|
await using var stream = File.Create(settings.FilePath);
|
||||||
await using var writer = new Utf8JsonWriter(stream, new JsonWriterOptions
|
await using var writer = new Utf8JsonWriter(stream, new JsonWriterOptions
|
||||||
{
|
{
|
||||||
|
@ -33,7 +34,7 @@ namespace Microsoft.DataTransfer.JsonExtension
|
||||||
}
|
}
|
||||||
|
|
||||||
writer.WriteEndArray();
|
writer.WriteEndArray();
|
||||||
Console.WriteLine($"Completed writing data to file '{settings.FilePath}'");
|
logger.LogInformation("Completed writing data to file '{FilePath}'", settings.FilePath);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ using Microsoft.DataTransfer.Interfaces;
|
||||||
using Microsoft.DataTransfer.JsonExtension.Settings;
|
using Microsoft.DataTransfer.JsonExtension.Settings;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using System;
|
using System;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.JsonExtension
|
namespace Microsoft.DataTransfer.JsonExtension
|
||||||
{
|
{
|
||||||
|
@ -13,7 +14,7 @@ namespace Microsoft.DataTransfer.JsonExtension
|
||||||
public class JsonDataSourceExtension : IDataSourceExtension
|
public class JsonDataSourceExtension : IDataSourceExtension
|
||||||
{
|
{
|
||||||
public string DisplayName => "JSON";
|
public string DisplayName => "JSON";
|
||||||
public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, [EnumeratorCancellation] CancellationToken cancellationToken = default)
|
public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var settings = config.Get<JsonSourceSettings>();
|
var settings = config.Get<JsonSourceSettings>();
|
||||||
settings.Validate();
|
settings.Validate();
|
||||||
|
@ -22,8 +23,8 @@ namespace Microsoft.DataTransfer.JsonExtension
|
||||||
{
|
{
|
||||||
if (File.Exists(settings.FilePath))
|
if (File.Exists(settings.FilePath))
|
||||||
{
|
{
|
||||||
Console.WriteLine($"Reading file '{settings.FilePath}'");
|
logger.LogInformation("Reading file '{FilePath}'", settings.FilePath);
|
||||||
var list = await ReadFileAsync(cancellationToken, settings.FilePath);
|
var list = await ReadFileAsync(cancellationToken, settings.FilePath, logger);
|
||||||
|
|
||||||
if (list != null)
|
if (list != null)
|
||||||
{
|
{
|
||||||
|
@ -36,11 +37,11 @@ namespace Microsoft.DataTransfer.JsonExtension
|
||||||
else if (Directory.Exists(settings.FilePath))
|
else if (Directory.Exists(settings.FilePath))
|
||||||
{
|
{
|
||||||
string[] files = Directory.GetFiles(settings.FilePath, "*.json", SearchOption.AllDirectories);
|
string[] files = Directory.GetFiles(settings.FilePath, "*.json", SearchOption.AllDirectories);
|
||||||
Console.WriteLine($"Reading {files.Length} files from '{settings.FilePath}'");
|
logger.LogInformation("Reading {FileCount} files from '{Folder}'", files.Length, settings.FilePath);
|
||||||
foreach (string filePath in files.OrderBy(f => f))
|
foreach (string filePath in files.OrderBy(f => f))
|
||||||
{
|
{
|
||||||
Console.WriteLine($"Reading file '{filePath}'");
|
logger.LogInformation("Reading file '{FilePath}'", filePath);
|
||||||
var list = await ReadFileAsync(cancellationToken, filePath);
|
var list = await ReadFileAsync(cancellationToken, filePath, logger);
|
||||||
|
|
||||||
if (list != null)
|
if (list != null)
|
||||||
{
|
{
|
||||||
|
@ -51,11 +52,11 @@ namespace Microsoft.DataTransfer.JsonExtension
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Console.WriteLine($"Completed reading '{settings.FilePath}'");
|
logger.LogInformation("Completed reading '{FilePath}'", settings.FilePath);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static async Task<List<Dictionary<string, object?>>?> ReadFileAsync(CancellationToken cancellationToken, string filePath)
|
private static async Task<List<Dictionary<string, object?>>?> ReadFileAsync(CancellationToken cancellationToken, string filePath, ILogger logger)
|
||||||
{
|
{
|
||||||
var file = await File.ReadAllTextAsync(filePath, cancellationToken);
|
var file = await File.ReadAllTextAsync(filePath, cancellationToken);
|
||||||
try
|
try
|
||||||
|
@ -85,7 +86,7 @@ namespace Microsoft.DataTransfer.JsonExtension
|
||||||
|
|
||||||
if (!list.Any())
|
if (!list.Any())
|
||||||
{
|
{
|
||||||
Console.WriteLine($"No records read from '{filePath}'");
|
logger.LogWarning("No records read from '{FilePath}'", filePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
return list;
|
return list;
|
||||||
|
|
|
@ -3,6 +3,7 @@ using Microsoft.DataTransfer.MongoExtension.Settings;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using MongoDB.Bson;
|
using MongoDB.Bson;
|
||||||
using System.ComponentModel.Composition;
|
using System.ComponentModel.Composition;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.MongoExtension;
|
namespace Microsoft.DataTransfer.MongoExtension;
|
||||||
[Export(typeof(IDataSinkExtension))]
|
[Export(typeof(IDataSinkExtension))]
|
||||||
|
@ -10,7 +11,7 @@ public class MongoDataSinkExtension : IDataSinkExtension
|
||||||
{
|
{
|
||||||
public string DisplayName => "Mongo";
|
public string DisplayName => "Mongo";
|
||||||
|
|
||||||
public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, CancellationToken cancellationToken = default)
|
public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var settings = config.Get<MongoSinkSettings>();
|
var settings = config.Get<MongoSinkSettings>();
|
||||||
settings.Validate();
|
settings.Validate();
|
||||||
|
|
|
@ -4,6 +4,7 @@ using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.DataTransfer.Interfaces;
|
using Microsoft.DataTransfer.Interfaces;
|
||||||
using MongoDB.Bson;
|
using MongoDB.Bson;
|
||||||
using Microsoft.DataTransfer.MongoExtension.Settings;
|
using Microsoft.DataTransfer.MongoExtension.Settings;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.MongoExtension;
|
namespace Microsoft.DataTransfer.MongoExtension;
|
||||||
[Export(typeof(IDataSourceExtension))]
|
[Export(typeof(IDataSourceExtension))]
|
||||||
|
@ -11,7 +12,7 @@ internal class MongoDataSourceExtension : IDataSourceExtension
|
||||||
{
|
{
|
||||||
public string DisplayName => "Mongo";
|
public string DisplayName => "Mongo";
|
||||||
|
|
||||||
public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, [EnumeratorCancellation] CancellationToken cancellationToken = default)
|
public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var settings = config.Get<MongoSourceSettings>();
|
var settings = config.Get<MongoSourceSettings>();
|
||||||
settings.Validate();
|
settings.Validate();
|
||||||
|
|
|
@ -4,6 +4,7 @@ using System.Data;
|
||||||
using Microsoft.Data.SqlClient;
|
using Microsoft.Data.SqlClient;
|
||||||
using Microsoft.DataTransfer.Interfaces;
|
using Microsoft.DataTransfer.Interfaces;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.SqlServerExtension
|
namespace Microsoft.DataTransfer.SqlServerExtension
|
||||||
{
|
{
|
||||||
|
@ -12,7 +13,7 @@ namespace Microsoft.DataTransfer.SqlServerExtension
|
||||||
{
|
{
|
||||||
public string DisplayName => "SqlServer";
|
public string DisplayName => "SqlServer";
|
||||||
|
|
||||||
public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, CancellationToken cancellationToken = default)
|
public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var settings = config.Get<SqlServerSinkSettings>();
|
var settings = config.Get<SqlServerSinkSettings>();
|
||||||
settings.Validate();
|
settings.Validate();
|
||||||
|
|
|
@ -3,6 +3,7 @@ using System.Runtime.CompilerServices;
|
||||||
using Microsoft.Data.SqlClient;
|
using Microsoft.Data.SqlClient;
|
||||||
using Microsoft.DataTransfer.Interfaces;
|
using Microsoft.DataTransfer.Interfaces;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.SqlServerExtension
|
namespace Microsoft.DataTransfer.SqlServerExtension
|
||||||
{
|
{
|
||||||
|
@ -11,7 +12,7 @@ namespace Microsoft.DataTransfer.SqlServerExtension
|
||||||
{
|
{
|
||||||
public string DisplayName => "SqlServer";
|
public string DisplayName => "SqlServer";
|
||||||
|
|
||||||
public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, [EnumeratorCancellation] CancellationToken cancellationToken = default)
|
public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var settings = config.Get<SqlServerSourceSettings>();
|
var settings = config.Get<SqlServerSourceSettings>();
|
||||||
settings.Validate();
|
settings.Validate();
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.Interfaces
|
namespace Microsoft.DataTransfer.Interfaces
|
||||||
{
|
{
|
||||||
public interface IDataSinkExtension : IDataTransferExtension
|
public interface IDataSinkExtension : IDataTransferExtension
|
||||||
{
|
{
|
||||||
Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, CancellationToken cancellationToken = default);
|
Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace Microsoft.DataTransfer.Interfaces
|
namespace Microsoft.DataTransfer.Interfaces
|
||||||
{
|
{
|
||||||
public interface IDataSourceExtension : IDataTransferExtension
|
public interface IDataSourceExtension : IDataTransferExtension
|
||||||
{
|
{
|
||||||
IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, CancellationToken cancellationToken = default);
|
IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogger logger, CancellationToken cancellationToken = default);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
|
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|
Загрузка…
Ссылка в новой задаче