This commit is contained in:
Alexander Ivanov 2015-07-10 13:21:45 -07:00
Родитель ddfd28b6ef
Коммит 6168db981d
400 изменённых файлов: 10017 добавлений и 941 удалений

Просмотреть файл

@ -13,7 +13,7 @@ using System.Threading.Tasks;
namespace Microsoft.DataTransfer.AzureTable.FunctionalTests
{
[TestClass]
public class AzureTableDataSourceAdapterTests : DataTransferTestBase
public class AzureTableDataSourceAdapterTests : DataTransferAdapterTestBase
{
private const int NumberOfItems = 2000;
@ -46,22 +46,8 @@ namespace Microsoft.DataTransfer.AzureTable.FunctionalTests
c.InternalFields == AzureTableInternalFields.None)
.First();
var readResults = new List<IDataItem>();
using (var adapter = await new AzureTableSourceAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
{
IDataItem dataItem;
var readOutput = new ReadOutputByRef();
while ((dataItem = await adapter.ReadNextAsync(readOutput, CancellationToken.None)) != null)
{
readResults.Add(dataItem);
Assert.IsNotNull(readOutput.DataItemId, CommonTestResources.MissingDataItemId);
readOutput.Wipe();
}
}
DataItemCollectionAssert.AreEquivalent(sampleData, readResults, TestResources.InvalidDocumentsRead);
DataItemCollectionAssert.AreEquivalent(sampleData,
await ReadData(configuration), TestResources.InvalidDocumentsRead);
}
[TestMethod, Timeout(120000)]
@ -77,22 +63,8 @@ namespace Microsoft.DataTransfer.AzureTable.FunctionalTests
c.InternalFields == AzureTableInternalFields.None)
.First();
var readResults = new List<IDataItem>();
using (var adapter = await new AzureTableSourceAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
{
IDataItem dataItem;
var readOutput = new ReadOutputByRef();
while ((dataItem = await adapter.ReadNextAsync(readOutput, CancellationToken.None)) != null)
{
readResults.Add(dataItem);
Assert.IsNotNull(readOutput.DataItemId, CommonTestResources.MissingDataItemId);
readOutput.Wipe();
}
}
DataItemCollectionAssert.AreEquivalent(sampleData.Where(e => (int)e[IntegerPropertyName] < 100), readResults, TestResources.InvalidDocumentsRead);
DataItemCollectionAssert.AreEquivalent(sampleData.Where(e => (int)e[IntegerPropertyName] < 100),
await ReadData(configuration), TestResources.InvalidDocumentsRead);
}
[TestMethod, Timeout(120000)]
@ -108,26 +80,11 @@ namespace Microsoft.DataTransfer.AzureTable.FunctionalTests
c.InternalFields == AzureTableInternalFields.None)
.First();
var readResults = new List<IDataItem>();
using (var adapter = await new AzureTableSourceAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
{
IDataItem dataItem;
var readOutput = new ReadOutputByRef();
while ((dataItem = await adapter.ReadNextAsync(readOutput, CancellationToken.None)) != null)
{
readResults.Add(dataItem);
Assert.IsNotNull(readOutput.DataItemId, CommonTestResources.MissingDataItemId);
readOutput.Wipe();
}
}
DataItemCollectionAssert.AreEquivalent(
sampleData
.Select(i => new Dictionary<string, object> { { StringPropertyName, i[StringPropertyName] } })
.ToArray(),
readResults,
await ReadData(configuration),
TestResources.InvalidDocumentsRead);
}
@ -157,10 +114,19 @@ namespace Microsoft.DataTransfer.AzureTable.FunctionalTests
await ReadAndVerifyFields(configuration, new[] { "RowKey", "PartitionKey", "Timestamp" });
}
private async Task<List<IDataItem>> ReadData(IAzureTableSourceAdapterConfiguration configuration)
{
using (var adapter = await new AzureTableSourceAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
return await ReadDataAsync(adapter);
}
}
private static async Task ReadAndVerifyFields(IAzureTableSourceAdapterConfiguration configuration, string[] expectedInternalProperties)
{
using (var adapter = await new AzureTableSourceAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
IDataItem dataItem;
var readOutput = new ReadOutputByRef();

Просмотреть файл

@ -96,6 +96,10 @@
</Compile>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Core\Microsoft.DataTransfer.Extensibility.Basics\Microsoft.DataTransfer.Extensibility.Basics.csproj">
<Project>{83329196-46be-4cd0-b498-74e9ac463ed9}</Project>
<Name>Microsoft.DataTransfer.Extensibility.Basics</Name>
</ProjectReference>
<ProjectReference Include="..\..\Core\Microsoft.DataTransfer.Extensibility\Microsoft.DataTransfer.Extensibility.csproj">
<Project>{acc3b08a-2706-4857-b374-8f6311db0e6f}</Project>
<Name>Microsoft.DataTransfer.Extensibility</Name>

Просмотреть файл

@ -1,10 +1,10 @@
using Microsoft.DataTransfer.AzureTable.Shared;
using Microsoft.DataTransfer.Basics.Extensions;
using Microsoft.DataTransfer.WpfHost.Extensibility.Basics;
using Microsoft.DataTransfer.WpfHost.Basics;
namespace Microsoft.DataTransfer.AzureTable.Wpf.Shared
{
abstract class AzureTableAdapterConfiguration : ValidatableConfiguration, IAzureTableAdapterConfiguration
abstract class AzureTableAdapterConfiguration : ValidatableBindableBase, IAzureTableAdapterConfiguration
{
public static readonly string ConnectionStringPropertyName =
ObjectExtensions.MemberName<IAzureTableAdapterConfiguration>(c => c.ConnectionString);

Просмотреть файл

@ -5,6 +5,8 @@ namespace Microsoft.DataTransfer.AzureTable
{
sealed class Errors : CommonErrors
{
private Errors() { }
public static Exception ConnectionStringMissing()
{
return new ArgumentException(Resources.ConnectionStringMissing);

Просмотреть файл

@ -107,6 +107,10 @@
</EmbeddedResource>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Core\Microsoft.DataTransfer.Extensibility.Basics\Microsoft.DataTransfer.Extensibility.Basics.csproj">
<Project>{83329196-46be-4cd0-b498-74e9ac463ed9}</Project>
<Name>Microsoft.DataTransfer.Extensibility.Basics</Name>
</ProjectReference>
<ProjectReference Include="..\..\Core\Microsoft.DataTransfer.Extensibility\Microsoft.DataTransfer.Extensibility.csproj">
<Project>{acc3b08a-2706-4857-b374-8f6311db0e6f}</Project>
<Name>Microsoft.DataTransfer.Extensibility</Name>

Просмотреть файл

@ -1,6 +1,8 @@
using Microsoft.DataTransfer.Basics;
using Microsoft.DataTransfer.Extensibility;
using Microsoft.DataTransfer.Extensibility.Basics;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.AzureTable.Source
@ -8,7 +10,7 @@ namespace Microsoft.DataTransfer.AzureTable.Source
/// <summary>
/// Provides data source adapters capable of reading data from Azure Table storage.
/// </summary>
public sealed class AzureTableSourceAdapterFactory : IDataSourceAdapterFactory<IAzureTableSourceAdapterConfiguration>
public sealed class AzureTableSourceAdapterFactory : DataAdapterFactoryBase, IDataSourceAdapterFactory<IAzureTableSourceAdapterConfiguration>
{
/// <summary>
/// Gets the description of the data adapter.
@ -23,10 +25,11 @@ namespace Microsoft.DataTransfer.AzureTable.Source
/// </summary>
/// <param name="configuration">Data source adapter configuration.</param>
/// <param name="context">Data transfer operation context.</param>
/// <param name="cancellation">Cancellation token.</param>
/// <returns>Task that represents asynchronous create operation.</returns>
public Task<IDataSourceAdapter> CreateAsync(IAzureTableSourceAdapterConfiguration configuration, IDataTransferContext context)
public Task<IDataSourceAdapter> CreateAsync(IAzureTableSourceAdapterConfiguration configuration, IDataTransferContext context, CancellationToken cancellation)
{
return Task.Factory.StartNew(() => Create(configuration));
return Task.Factory.StartNew(() => Create(configuration), cancellation);
}
private static IDataSourceAdapter Create(IAzureTableSourceAdapterConfiguration configuration)

Просмотреть файл

@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following

Просмотреть файл

@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following

Просмотреть файл

@ -102,6 +102,9 @@
<LastGenOutput>TestResources.Designer.cs</LastGenOutput>
</EmbeddedResource>
</ItemGroup>
<ItemGroup>
<None Include="app.config" />
</ItemGroup>
<Choose>
<When Condition="'$(VisualStudioVersion)' == '10.0' And '$(IsCodedUITest)' == 'True'">
<ItemGroup>

Просмотреть файл

@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following

Просмотреть файл

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-6.0.0.0" newVersion="6.0.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Microsoft.Azure.Documents.Client" publicKeyToken="31bf3856ad364e35" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-1.2.0.0" newVersion="1.2.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -8,6 +8,8 @@
<dataTransfer.sources>
<add name="DocumentDB" type="Microsoft.DataTransfer.DocumentDb.Source.DocumentDbSourceAdapterFactory, Microsoft.DataTransfer.DocumentDb" />
<add name="HBase" type="Microsoft.DataTransfer.HBase.Source.HBaseSourceAdapterFactory, Microsoft.DataTransfer.HBase" />
<add name="DynamoDB" type="Microsoft.DataTransfer.DynamoDb.Source.DynamoDbSourceAdapterFactory, Microsoft.DataTransfer.DynamoDb" />
<add name="RavenDB" type="Microsoft.DataTransfer.RavenDb.Source.RavenDbSourceAdapterFactory, Microsoft.DataTransfer.RavenDb" />
<add name="AzureTable" type="Microsoft.DataTransfer.AzureTable.Source.AzureTableSourceAdapterFactory, Microsoft.DataTransfer.AzureTable" />
<add name="CsvFile" type="Microsoft.DataTransfer.CsvFile.Source.CsvFileSourceAdapterFactory, Microsoft.DataTransfer.CsvFile" />
@ -40,6 +42,10 @@
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-6.0.0.0" newVersion="6.0.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Microsoft.Azure.Documents.Client" publicKeyToken="31bf3856ad364e35" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-1.2.0.0" newVersion="1.2.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -1,10 +1,12 @@
using Microsoft.DataTransfer.ServiceModel.Statistics;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.ConsoleHost.App.Handlers
{
interface ITransferStatisticsHandler
{
ITransferStatistics CreateNew(ITransferStatisticsConfiguration configuration);
Task<ITransferStatistics> CreateNew(ITransferStatisticsConfiguration configuration, CancellationToken cancellation);
void PrintProgress(ITransferStatisticsSnapshot statistics);
void PrintResult(ITransferStatisticsSnapshot statistics);
}

Просмотреть файл

@ -41,10 +41,13 @@ namespace Microsoft.DataTransfer.ConsoleHost.App.Handlers
if (!transferService.GetKnownSinks().TryGetValue(configuration.TargetName, out sinkDefinition))
throw Errors.UnknownDestination(configuration.TargetName);
var statistics = statisticsHandler.CreateNew(infrastructureConfiguration.Create(configuration.InfrastructureConfiguration));
ITransferStatistics statistics = null;
using (var cancellation = new ConsoleCancellationSource())
{
statistics = await statisticsHandler.CreateNew(
infrastructureConfiguration.Create(configuration.InfrastructureConfiguration), cancellation.Token);
using (new Timer(PrintStatistics, statistics, TimeSpan.Zero, TimeSpan.FromSeconds(1)))
{
await transferService
@ -62,7 +65,8 @@ namespace Microsoft.DataTransfer.ConsoleHost.App.Handlers
}
}
statisticsHandler.PrintResult(statistics.GetSnapshot());
if (statistics != null)
statisticsHandler.PrintResult(statistics.GetSnapshot());
}
private void ValidateConfiguration()

Просмотреть файл

@ -3,6 +3,8 @@ using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.ConsoleHost.App.Handlers
{
@ -15,9 +17,9 @@ namespace Microsoft.DataTransfer.ConsoleHost.App.Handlers
this.statisticsFactory = statisticsFactory;
}
public ITransferStatistics CreateNew(ITransferStatisticsConfiguration configuration)
public Task<ITransferStatistics> CreateNew(ITransferStatisticsConfiguration configuration, CancellationToken cancellation)
{
return statisticsFactory.Create(configuration);
return statisticsFactory.Create(configuration, cancellation);
}
public void PrintProgress(ITransferStatisticsSnapshot statistics)

Просмотреть файл

@ -105,6 +105,14 @@
<Project>{81a18d96-39fc-4165-8c52-4c887dce4a61}</Project>
<Name>Microsoft.DataTransfer.DocumentDb</Name>
</ProjectReference>
<ProjectReference Include="..\..\DynamoDb\Microsoft.DataTransfer.DynamoDb\Microsoft.DataTransfer.DynamoDb.csproj">
<Project>{af2a444f-cabd-45be-9b9e-9d53501b2041}</Project>
<Name>Microsoft.DataTransfer.DynamoDb</Name>
</ProjectReference>
<ProjectReference Include="..\..\HBase\Microsoft.DataTransfer.HBase\Microsoft.DataTransfer.HBase.csproj">
<Project>{ad24db36-8623-4e11-9995-a6eb5ef50a38}</Project>
<Name>Microsoft.DataTransfer.HBase</Name>
</ProjectReference>
<ProjectReference Include="..\..\MongoDb\Microsoft.DataTransfer.MongoDb\Microsoft.DataTransfer.MongoDb.csproj">
<Project>{782ce839-40a8-4c47-9998-c3b0b7c3d2a1}</Project>
<Name>Microsoft.DataTransfer.MongoDb</Name>

Просмотреть файл

@ -4,6 +4,7 @@ using Microsoft.DataTransfer.TestsCommon.Mocks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.Core.UnitTests.FactoryAdapters
@ -20,7 +21,7 @@ namespace Microsoft.DataTransfer.Core.UnitTests.FactoryAdapters
var adapterFactoryMock = new Mock<IDataSourceAdapterFactory<ITestAdapterConfiguration>>();
adapterFactoryMock
.Setup(f => f.CreateAsync(It.IsAny<ITestAdapterConfiguration>(), It.IsAny<IDataTransferContext>()))
.Setup(f => f.CreateAsync(It.IsAny<ITestAdapterConfiguration>(), It.IsAny<IDataTransferContext>(), It.IsAny<CancellationToken>()))
.Returns(() => Task.FromResult(adapterMock));
var configuration =
@ -31,7 +32,7 @@ namespace Microsoft.DataTransfer.Core.UnitTests.FactoryAdapters
var factoryAdapter = new DataSourceAdapterFactoryAdapter<ITestAdapterConfiguration>(adapterFactoryMock.Object, TestDisplayName);
var adapter = await factoryAdapter.CreateAsync(configuration, DataTransferContextMock.Instance);
var adapter = await factoryAdapter.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None);
Assert.AreEqual(TestDisplayName, factoryAdapter.DisplayName, TestResources.InvalidDataAdapter);
@ -49,7 +50,7 @@ namespace Microsoft.DataTransfer.Core.UnitTests.FactoryAdapters
var adapterFactoryMock = new Mock<IDataSinkAdapterFactory<ITestAdapterConfiguration>>();
adapterFactoryMock
.Setup(f => f.CreateAsync(It.IsAny<ITestAdapterConfiguration>(), It.IsAny<IDataTransferContext>()))
.Setup(f => f.CreateAsync(It.IsAny<ITestAdapterConfiguration>(), It.IsAny<IDataTransferContext>(), It.IsAny<CancellationToken>()))
.Returns(() => Task.FromResult(adapterMock));
var configuration =
@ -60,7 +61,7 @@ namespace Microsoft.DataTransfer.Core.UnitTests.FactoryAdapters
var factoryAdapter = new DataSinkAdapterFactoryAdapter<ITestAdapterConfiguration>(adapterFactoryMock.Object, TestDisplayName);
var adapter = await factoryAdapter.CreateAsync(configuration, DataTransferContextMock.Instance);
var adapter = await factoryAdapter.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None);
Assert.AreEqual(TestDisplayName, factoryAdapter.DisplayName, TestResources.InvalidDataAdapter);

Просмотреть файл

@ -56,9 +56,11 @@ namespace Microsoft.DataTransfer.Core.UnitTests.Service
});
var statistics = new InMemoryTransferStatistics();
statistics.Start();
using (var source = new DataSourceAdapterMock(sourceData))
using (var sink = sinkMock)
await action.ExecuteAsync(source, sink, statistics, CancellationToken.None);
statistics.Stop();
var receivedData = sinkMock.ReceivedData;
@ -94,9 +96,11 @@ namespace Microsoft.DataTransfer.Core.UnitTests.Service
var sinkMock = new DataSinkAdapterMock();
var statistics = new InMemoryTransferStatistics();
statistics.Start();
using (var source = sourceMock)
using (var sink = sinkMock)
await action.ExecuteAsync(source, sink, statistics, CancellationToken.None);
statistics.Stop();
var receivedData = sinkMock.ReceivedData;

Просмотреть файл

@ -43,12 +43,14 @@ namespace Microsoft.DataTransfer.Core.UnitTests.Service
new Dictionary<string, IDataSourceAdapterFactoryAdapter>
{
{ SourceAdapterName, Mocks.Of<IDataSourceAdapterFactoryAdapter>()
.Where(m => m.CreateAsync(It.IsAny<object>(), It.IsAny<IDataTransferContext>()) == Task.FromResult(sourceMock.Object)).First() }
.Where(m => m.CreateAsync(It.IsAny<object>(), It.IsAny<IDataTransferContext>(), It.IsAny<CancellationToken>()) ==
Task.FromResult(sourceMock.Object)).First() }
},
new Dictionary<string, IDataSinkAdapterFactoryAdapter>
{
{ SinkAdapterName, Mocks.Of<IDataSinkAdapterFactoryAdapter>()
.Where(m => m.CreateAsync(It.IsAny<object>(), It.IsAny<IDataTransferContext>()) == Task.FromResult(sinkMock.Object)).First() }
.Where(m => m.CreateAsync(It.IsAny<object>(), It.IsAny<IDataTransferContext>(), It.IsAny<CancellationToken>()) ==
Task.FromResult(sinkMock.Object)).First() }
},
Mocks.Of<IDataTransferAction>()
.Where(a => a.ExecuteAsync(
@ -84,15 +86,15 @@ namespace Microsoft.DataTransfer.Core.UnitTests.Service
var sourceFactoryMock = new Mock<IDataSourceAdapterFactoryAdapter>();
sourceFactoryMock
.Setup(m => m.CreateAsync(It.IsAny<object>(), It.IsAny<IDataTransferContext>()))
.Callback<object, IDataTransferContext>((_, c) =>
.Setup(m => m.CreateAsync(It.IsAny<object>(), It.IsAny<IDataTransferContext>(), It.IsAny<CancellationToken>()))
.Callback<object, IDataTransferContext, CancellationToken>((a, c, ct) =>
Assert.AreEqual(SourceAdapterName, c.SourceName, TestResources.InvalidDataSourceNameInTransferContext))
.Returns(Task.FromResult(sourceMock.Object));
var sinkFactoryMock = new Mock<IDataSinkAdapterFactoryAdapter>();
sinkFactoryMock
.Setup(m => m.CreateAsync(It.IsAny<object>(), It.IsAny<IDataTransferContext>()))
.Callback<object, IDataTransferContext>((_, c) =>
.Setup(m => m.CreateAsync(It.IsAny<object>(), It.IsAny<IDataTransferContext>(), It.IsAny<CancellationToken>()))
.Callback<object, IDataTransferContext, CancellationToken>((a, c, ct) =>
Assert.AreEqual(SinkAdapterName, c.SinkName, TestResources.InvalidDataSinkNameInTransferContext))
.Returns(Task.FromResult(sinkMock.Object));

Просмотреть файл

@ -6,6 +6,8 @@ namespace Microsoft.DataTransfer.Core
{
sealed class Errors : CommonErrors
{
private Errors() { }
public static Exception InvalidDataAdapterConfigrationType(Type expected, Type actual)
{
return new InvalidOperationException(FormatMessage(

Просмотреть файл

@ -4,6 +4,7 @@ using Microsoft.DataTransfer.Extensibility;
using System;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.Core.FactoryAdapters
@ -35,7 +36,7 @@ namespace Microsoft.DataTransfer.Core.FactoryAdapters
DisplayName = displayName;
ConfigurationType = GetConfigurationType(factory.GetType());
createMethod = factory.GetType().GetMethod("CreateAsync", new[] { ConfigurationType, typeof(IDataTransferContext) });
createMethod = factory.GetType().GetMethod("CreateAsync", new[] { ConfigurationType, typeof(IDataTransferContext), typeof(CancellationToken) });
}
public static Type GetConfigurationType(Type adapterFactoryType)
@ -53,14 +54,14 @@ namespace Microsoft.DataTransfer.Core.FactoryAdapters
return factoryInterface.GetGenericArguments()[0];
}
public Task<TDataAdapter> CreateAsync(object configuration, IDataTransferContext context)
public Task<TDataAdapter> CreateAsync(object configuration, IDataTransferContext context, CancellationToken cancellation)
{
if (configuration != null && !ConfigurationType.IsAssignableFrom(configuration.GetType()))
throw Errors.InvalidDataAdapterConfigrationType(ConfigurationType, configuration.GetType());
try
{
return (Task<TDataAdapter>)createMethod.Invoke(factory, new[] { configuration, context });
return (Task<TDataAdapter>)createMethod.Invoke(factory, new[] { configuration, context, cancellation });
}
catch (TargetInvocationException invocationException)
{

Просмотреть файл

@ -1,11 +1,12 @@
using Microsoft.DataTransfer.Extensibility;
using Microsoft.DataTransfer.ServiceModel.Entities;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.Core.FactoryAdapters
{
interface IDataAdapterFactoryAdapter<TDataAdapter> : IDataAdapterDefinition
{
Task<TDataAdapter> CreateAsync(object configuration, IDataTransferContext context);
Task<TDataAdapter> CreateAsync(object configuration, IDataTransferContext context, CancellationToken cancellation);
}
}

Просмотреть файл

@ -86,6 +86,10 @@
<Project>{e31f1956-78be-4960-82e9-e1d8a73fa6d5}</Project>
<Name>Microsoft.DataTransfer.Autofac</Name>
</ProjectReference>
<ProjectReference Include="..\..\Shared\Microsoft.DataTransfer.Basics.Files\Microsoft.DataTransfer.Basics.Files.csproj">
<Project>{da182d5c-79f4-4af6-bf15-6e4496353a6a}</Project>
<Name>Microsoft.DataTransfer.Basics.Files</Name>
</ProjectReference>
<ProjectReference Include="..\..\Shared\Microsoft.DataTransfer.Basics\Microsoft.DataTransfer.Basics.csproj">
<Project>{ccd5f3bd-e95e-46b6-8688-394f592c6a2a}</Project>
<Name>Microsoft.DataTransfer.Basics</Name>

Просмотреть файл

@ -16,21 +16,8 @@ namespace Microsoft.DataTransfer.Core.Service
{
Guard.NotNull("source", source);
Guard.NotNull("sink", sink);
Guard.NotNull("statistics", statistics);
try
{
statistics.Start();
await TransferData(source, sink, statistics, cancellation);
}
finally
{
statistics.Stop();
}
}
private static async Task TransferData(IDataSourceAdapter source, IDataSinkAdapter sink, ITransferStatistics statistics, CancellationToken cancellation)
{
var writeTasks = Enumerable
.Range(0, sink.MaxDegreeOfParallelism)
.Select(i => (Task)Task.FromResult<object>(null))

Просмотреть файл

@ -58,15 +58,22 @@ namespace Microsoft.DataTransfer.Core.Service
SinkName = sinkName
};
// Lets start timer now, since factories may take some time as well and we want to capture that
statistics.Start();
try
{
// Lets start timer now, since factories may take some time as well and we want to capture that
statistics.Start();
var source = sourceFactoryAdapter.CreateAsync(sourceConfiguration, context);
var sink = sinkFactoryAdapter.CreateAsync(sinkConfiguration, context);
var source = sourceFactoryAdapter.CreateAsync(sourceConfiguration, context, cancellation);
var sink = sinkFactoryAdapter.CreateAsync(sinkConfiguration, context, cancellation);
using (var sourceInstance = await source)
using (var sinkInstance = await sink)
await transferAction.ExecuteAsync(sourceInstance, sinkInstance, statistics, cancellation);
using (var sourceInstance = await source)
using (var sinkInstance = await sink)
await transferAction.ExecuteAsync(sourceInstance, sinkInstance, statistics, cancellation);
}
finally
{
statistics.Stop();
}
}
}
}

Просмотреть файл

@ -10,8 +10,8 @@ namespace Microsoft.DataTransfer.Core.Statistics
{
private static IReadOnlyCollection<KeyValuePair<string, Exception>> EmptyErrors = new KeyValuePair<string, Exception>[0];
private Stream errorLogStream;
private TextWriter errorLogWriter;
private StreamWriter errorLogStreamWriter;
private TextWriter errorLogSynchronizedWriter;
private int errorsCount;
public override int Failed
@ -19,12 +19,12 @@ namespace Microsoft.DataTransfer.Core.Statistics
get { return errorsCount; }
}
public CsvErrorLogTransferStatistics(Stream errorLogStream)
public CsvErrorLogTransferStatistics(StreamWriter errorLogStreamWriter)
{
Guard.NotNull("errorLogStream", errorLogStream);
Guard.NotNull("errorLogStream", errorLogStreamWriter);
this.errorLogStream = errorLogStream;
errorLogWriter = TextWriter.Synchronized(new StreamWriter(errorLogStream));
this.errorLogStreamWriter = errorLogStreamWriter;
errorLogSynchronizedWriter = TextWriter.Synchronized(errorLogStreamWriter);
}
public override void Start()
@ -35,15 +35,15 @@ namespace Microsoft.DataTransfer.Core.Statistics
public override void Stop()
{
base.Stop();
TrashCan.Throw(ref errorLogWriter);
TrashCan.Throw(ref errorLogStream);
TrashCan.Throw(ref errorLogSynchronizedWriter);
TrashCan.Throw(ref errorLogStreamWriter);
}
public override void AddError(string dataItemId, Exception error)
{
Interlocked.Increment(ref errorsCount);
var writer = errorLogWriter;
var writer = errorLogSynchronizedWriter;
if (writer != null)
{
writer.WriteLine(EscapeValue(dataItemId) + "," + EscapeValue(error.Message));

Просмотреть файл

@ -1,28 +1,23 @@
using Microsoft.DataTransfer.ServiceModel.Statistics;
using Microsoft.DataTransfer.Basics;
using Microsoft.DataTransfer.Basics.Files.Sink;
using Microsoft.DataTransfer.ServiceModel.Statistics;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.Core.Statistics
{
sealed class TransferStatisticsFactory : ITransferStatisticsFactory
{
public ITransferStatistics Create(ITransferStatisticsConfiguration configuration)
public async Task<ITransferStatistics> Create(ITransferStatisticsConfiguration configuration, CancellationToken cancellation)
{
Guard.NotNull("configuration", configuration);
return String.IsNullOrEmpty(configuration.ErrorLog)
? (ITransferStatistics)new InMemoryTransferStatistics()
: new CsvErrorLogTransferStatistics(CreateErrorLog(configuration));
}
private static FileStream CreateErrorLog(ITransferStatisticsConfiguration configuration)
{
// Ensure output folder exists
try
{
Directory.CreateDirectory(Path.GetDirectoryName(configuration.ErrorLog));
}
catch { }
return File.Open(configuration.ErrorLog, configuration.OverwriteErrorLog ? FileMode.Create : FileMode.CreateNew);
: new CsvErrorLogTransferStatistics(await SinkStreamProvidersFactory.Create(
configuration.ErrorLog, configuration.OverwriteErrorLog).CreateWriter(cancellation));
}
}
}

Просмотреть файл

@ -0,0 +1,38 @@
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.Extensibility.Basics.Collections
{
/// <summary>
/// Represents a no-op asynchronous enumerator.
/// </summary>
/// <typeparam name="T">The type of objects to enumerate.</typeparam>
public sealed class EmptyAsyncEnumerator<T> : IAsyncEnumerator<T>
{
/// <summary>
/// Singleton instance of the enumerator.
/// </summary>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes",
Justification = "Immutable singleton instance")]
public static readonly EmptyAsyncEnumerator<T> Instance = new EmptyAsyncEnumerator<T>();
/// <summary>
/// Gets the element in the collection at the current position of the enumerator.
/// </summary>
public T Current { get { return default(T); } }
/// <summary>
/// Always returns false.
/// </summary>
/// <returns>false</returns>
public Task<bool> MoveNextAsync(CancellationToken cancellation)
{
return Task.FromResult(false);
}
/// <summary>
/// Releases all associated resources.
/// </summary>
public void Dispose() { }
}
}

Просмотреть файл

@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.Extensibility.Basics.Collections
{
/// <summary>
/// Supports an asynchronous iteration over a generic collection.
/// </summary>
/// <typeparam name="T">The type of objects to enumerate.</typeparam>
public interface IAsyncEnumerator<out T> : IDisposable
{
/// <summary>
/// Gets the element in the collection at the current position of the enumerator.
/// </summary>
T Current { get; }
/// <summary>
/// Advances the enumerator to the next element of the collection.
/// </summary>
/// <param name="cancellation">Cancellation token.</param>
/// <returns>
/// true if the enumerator was successfully advanced to the next element; false
/// if the enumerator has passed the end of the collection.
/// </returns>
Task<bool> MoveNextAsync(CancellationToken cancellation);
}
}

Просмотреть файл

@ -0,0 +1,28 @@
using System;
using System.IO;
namespace Microsoft.DataTransfer.Extensibility.Basics
{
/// <summary>
/// Provides basic functionality for the data adapter factories.
/// </summary>
public abstract class DataAdapterFactoryBase
{
/// <summary>
/// Picks a value from raw input or the file.
/// </summary>
/// <param name="value">Raw string value.</param>
/// <param name="fileName">Name of the file to read value from.</param>
/// <param name="ambiguousErrorProvider">An <see cref="Exception" /> factory delegate to use when both arguments are set.</param>
/// <returns>Raw input value or file content.</returns>
protected static string StringValueOrFile(string value, string fileName, Func<Exception> ambiguousErrorProvider)
{
var isFileSet = !String.IsNullOrEmpty(fileName);
if (!String.IsNullOrEmpty(value) && isFileSet)
throw ambiguousErrorProvider();
return isFileSet ? File.ReadAllText(fileName) : value;
}
}
}

Просмотреть файл

@ -40,17 +40,15 @@
<Compile Include="..\..\Solution Items\CommonAssemblyInfo.cs">
<Link>Properties\CommonAssemblyInfo.cs</Link>
</Compile>
<Compile Include="Collections\EmptyAsyncEnumerator.cs" />
<Compile Include="Collections\IAsyncEnumerator.cs" />
<Compile Include="DataAdapterFactoryBase.cs" />
<Compile Include="Source\AggregateDataSourceAdapter.cs" />
<Compile Include="Sink\DataSinkAdapterFactoryWrapper.cs" />
<Compile Include="Source\DataSourceAdapterFactoryWrapper.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Source\DictionaryDataItem.cs" />
<Compile Include="Source\NestedDataItem.cs" />
<Compile Include="Source\StreamProviders\ISourceStreamProvider.cs" />
<Compile Include="Source\StreamProviders\LocalFileStreamProvider.cs" />
<Compile Include="Source\StreamProviders\ResponseStreamReader.cs" />
<Compile Include="Source\StreamProviders\SourceStreamProvidersFactory.cs" />
<Compile Include="Source\StreamProviders\WebFileStreamProvider.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Shared\Microsoft.DataTransfer.Basics\Microsoft.DataTransfer.Basics.csproj">

Просмотреть файл

@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following

Просмотреть файл

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.Extensibility.Basics.Sink
{
@ -35,10 +36,11 @@ namespace Microsoft.DataTransfer.Extensibility.Basics.Sink
/// </summary>
/// <param name="configuration">Data sink adapter configuration.</param>
/// <param name="context">Data transfer operation context.</param>
/// <param name="cancellation">Cancellation token.</param>
/// <returns>Task that represents asynchronous create operation.</returns>
public Task<IDataSinkAdapter> CreateAsync(TConfiguration configuration, IDataTransferContext context)
public Task<IDataSinkAdapter> CreateAsync(TConfiguration configuration, IDataTransferContext context, CancellationToken cancellation)
{
return Factory.CreateAsync(configuration, context);
return Factory.CreateAsync(configuration, context, cancellation);
}
}
}

Просмотреть файл

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.Extensibility.Basics.Source
{
@ -35,10 +36,11 @@ namespace Microsoft.DataTransfer.Extensibility.Basics.Source
/// </summary>
/// <param name="configuration">Data source adapter configuration.</param>
/// <param name="context">Data transfer operation context.</param>
/// <param name="cancellation">Cancellation token.</param>
/// <returns>Task that represents asynchronous create operation.</returns>
public Task<IDataSourceAdapter> CreateAsync(TConfiguration configuration, IDataTransferContext context)
public Task<IDataSourceAdapter> CreateAsync(TConfiguration configuration, IDataTransferContext context, CancellationToken cancellation)
{
return Factory.CreateAsync(configuration, context);
return Factory.CreateAsync(configuration, context, cancellation);
}
}
}

Просмотреть файл

@ -1,20 +0,0 @@
using System.IO;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.Extensibility.Basics.Source.StreamProviders
{
sealed class LocalFileStreamProvider : ISourceStreamProvider
{
public string Id { get; private set; }
public LocalFileStreamProvider(string fileName)
{
Id = fileName;
}
public Task<StreamReader> CreateReader()
{
return Task.FromResult(File.OpenText(Id));
}
}
}

Просмотреть файл

@ -1,38 +0,0 @@
using Microsoft.DataTransfer.Basics.IO;
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;
namespace Microsoft.DataTransfer.Extensibility.Basics.Source.StreamProviders
{
/// <summary>
/// Creates instances of <see cref="ISourceStreamProvider" /> based on the source stream identifier.
/// </summary>
public static class SourceStreamProvidersFactory
{
private readonly static Regex WebAddressRegex = new Regex("^https?://", RegexOptions.IgnoreCase | RegexOptions.Compiled);
/// <summary>
/// Creates new instances of <see cref="ISourceStreamProvider" /> for the specified <paramref name="sourceStreamId" />.
/// </summary>
/// <param name="sourceStreamId">Identifier of the source stream.</param>
/// <returns><see cref="IEnumerable{T}" /> of <see cref="ISourceStreamProvider" /> to read data from the specified source stream.</returns>
public static IEnumerable<ISourceStreamProvider> Create(string sourceStreamId)
{
if (WebAddressRegex.IsMatch(sourceStreamId))
{
yield return new WebFileStreamProvider(sourceStreamId);
}
else
{
foreach (var localFile in DirectoryHelper.EnumerateFiles(TrimUriFormat(sourceStreamId)))
yield return new LocalFileStreamProvider(localFile);
}
}
private static string TrimUriFormat(string localFile)
{
return localFile.StartsWith("file:///", StringComparison.OrdinalIgnoreCase) ? localFile.Substring(8) : localFile;
}
}
}

Просмотреть файл

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.Extensibility
{
@ -13,7 +14,8 @@ namespace Microsoft.DataTransfer.Extensibility
/// </summary>
/// <param name="configuration">Data sink adapter configuration.</param>
/// <param name="context">Data transfer operation context.</param>
/// <param name="cancellation">Cancellation token.</param>
/// <returns>Task that represents asynchronous create operation.</returns>
Task<IDataSinkAdapter> CreateAsync(TConfiguration configuration, IDataTransferContext context);
Task<IDataSinkAdapter> CreateAsync(TConfiguration configuration, IDataTransferContext context, CancellationToken cancellation);
}
}

Просмотреть файл

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.Extensibility
{
@ -13,7 +14,8 @@ namespace Microsoft.DataTransfer.Extensibility
/// </summary>
/// <param name="configuration">Data source adapter configuration.</param>
/// <param name="context">Data transfer operation context.</param>
/// <param name="cancellation">Cancellation token.</param>
/// <returns>Task that represents asynchronous create operation.</returns>
Task<IDataSourceAdapter> CreateAsync(TConfiguration configuration, IDataTransferContext context);
Task<IDataSourceAdapter> CreateAsync(TConfiguration configuration, IDataTransferContext context, CancellationToken cancellation);
}
}

Просмотреть файл

@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following

Просмотреть файл

@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following

Просмотреть файл

@ -1,4 +1,6 @@

using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.ServiceModel.Statistics
{
/// <summary>
@ -9,7 +11,9 @@ namespace Microsoft.DataTransfer.ServiceModel.Statistics
/// <summary>
/// Creates a new instance of data transfer statistics.
/// </summary>
/// <returns>New instance of data transfer statistics.</returns>
ITransferStatistics Create(ITransferStatisticsConfiguration configuration);
/// <param name="configuration">Configuration for the data transfer statistics.</param>
/// <param name="cancellation">Cancellation token.</param>
/// <returns>Task that represents asynchronous create operation.</returns>
Task<ITransferStatistics> Create(ITransferStatisticsConfiguration configuration, CancellationToken cancellation);
}
}

Просмотреть файл

@ -1,46 +0,0 @@
using Microsoft.DataTransfer.CsvFile.Source;
using Microsoft.DataTransfer.Extensibility;
using Microsoft.DataTransfer.TestsCommon;
using Microsoft.DataTransfer.TestsCommon.Mocks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.CsvFile.FunctionalTests
{
static class CsvFileHelper
{
public static async Task<List<IDataItem>> ReadCsv(ICsvFileSourceAdapterConfiguration configuration)
{
var records = new List<IDataItem>();
using (var source = await new CsvFileSourceAdapterFactory().CreateAsync(configuration, DataTransferContextMock.Instance))
{
IDataItem record = null;
var readOutput = new ReadOutputByRef();
while (true)
{
try
{
record = await source.ReadNextAsync(readOutput, CancellationToken.None);
}
catch (NonFatalReadException)
{
continue;
}
if (record == null)
break;
records.Add(record);
Assert.IsNotNull(readOutput.DataItemId, CommonTestResources.MissingDataItemId);
readOutput.Wipe();
}
}
return records;
}
}
}

Просмотреть файл

@ -1,14 +1,18 @@
using Microsoft.DataTransfer.CsvFile.Source;
using Microsoft.DataTransfer.Extensibility;
using Microsoft.DataTransfer.TestsCommon;
using Microsoft.DataTransfer.TestsCommon.Mocks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.CsvFile.FunctionalTests
{
[TestClass]
public class CsvFileSourceAdapterTests
public class CsvFileSourceAdapterTests : DataTransferAdapterTestBase
{
[TestMethod, Timeout(120000)]
[DeploymentItem(@"TestData\BasicTest.csv", @"InputData")]
@ -19,10 +23,10 @@ namespace Microsoft.DataTransfer.CsvFile.FunctionalTests
c.Files == new[] { @"InputData\BasicTest.csv" })
.First();
var records = await CsvFileHelper.ReadCsv(configuration);
var readResults = await ReadData(configuration);
Assert.AreEqual(3, records.Count, TestResources.UnexpectedRecordsProcessed);
Assert.AreEqual(6, records[0].GetFieldNames().Count(), TestResources.UnexpectedFieldsProcessed);
Assert.AreEqual(3, readResults.Count, TestResources.UnexpectedRecordsProcessed);
Assert.AreEqual(6, readResults[0].GetFieldNames().Count(), TestResources.UnexpectedFieldsProcessed);
}
[TestMethod, Timeout(120000)]
@ -34,10 +38,10 @@ namespace Microsoft.DataTransfer.CsvFile.FunctionalTests
c.Files == new[] { @"InputData\MalformedTest.csv" })
.First();
var records = await CsvFileHelper.ReadCsv(configuration);
var readResults = await ReadData(configuration);
Assert.AreEqual(3, records.Count, TestResources.UnexpectedRecordsProcessed);
Assert.AreEqual(6, records[0].GetFieldNames().Count(), TestResources.UnexpectedFieldsProcessed);
Assert.AreEqual(3, readResults.Count, TestResources.UnexpectedRecordsProcessed);
Assert.AreEqual(6, readResults[0].GetFieldNames().Count(), TestResources.UnexpectedFieldsProcessed);
}
[TestMethod, Timeout(120000)]
@ -52,19 +56,28 @@ namespace Microsoft.DataTransfer.CsvFile.FunctionalTests
c.NestingSeparator == ".")
.First();
var records = await CsvFileHelper.ReadCsv(configuration);
var readResults = await ReadData(configuration);
Assert.AreEqual(4, records.Count, TestResources.UnexpectedRecordsProcessed);
Assert.AreEqual(4, readResults.Count, TestResources.UnexpectedRecordsProcessed);
var fields = records[0].GetFieldNames().ToArray();
var fields = readResults[0].GetFieldNames().ToArray();
Assert.AreEqual(6, fields.Count(), TestResources.UnexpectedFieldsProcessed);
CollectionAssert.Contains(fields, NestedDocumentFieldName, TestResources.NestedDocumentFieldNotProcessed);
var nestedDocument = records[0].GetValue(NestedDocumentFieldName) as IDataItem;
var nestedDocument = readResults[0].GetValue(NestedDocumentFieldName) as IDataItem;
Assert.IsNotNull(nestedDocument, TestResources.NestedDocumentFieldNotProcessed);
Assert.AreEqual(2, nestedDocument.GetFieldNames().Count(), TestResources.UnexpectedFieldsProcessed);
}
private async Task<List<IDataItem>> ReadData(ICsvFileSourceAdapterConfiguration configuration)
{
using (var adapter = await new CsvFileSourceAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
return await ReadDataAsync(adapter);
}
}
}
}

Просмотреть файл

@ -57,7 +57,6 @@
<Compile Include="..\..\Solution Items\CommonAssemblyInfo.cs">
<Link>Properties\CommonAssemblyInfo.cs</Link>
</Compile>
<Compile Include="CsvFileHelper.cs" />
<Compile Include="CsvFileSourceAdapterTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TestResources.Designer.cs">
@ -67,6 +66,10 @@
</Compile>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Core\Microsoft.DataTransfer.Extensibility.Basics\Microsoft.DataTransfer.Extensibility.Basics.csproj">
<Project>{83329196-46be-4cd0-b498-74e9ac463ed9}</Project>
<Name>Microsoft.DataTransfer.Extensibility.Basics</Name>
</ProjectReference>
<ProjectReference Include="..\..\Core\Microsoft.DataTransfer.Extensibility\Microsoft.DataTransfer.Extensibility.csproj">
<Project>{acc3b08a-2706-4857-b374-8f6311db0e6f}</Project>
<Name>Microsoft.DataTransfer.Extensibility</Name>
@ -81,6 +84,7 @@
</ProjectReference>
</ItemGroup>
<ItemGroup>
<None Include="app.config" />
<None Include="packages.config" />
<None Include="TestData\BasicTest.csv">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>

Просмотреть файл

@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following

Просмотреть файл

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-6.0.0.0" newVersion="6.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -41,7 +41,7 @@ namespace Microsoft.DataTransfer.CsvFile.UnitTests
ReadAndVerify("IntegerValues.csv", new CsvReaderConfiguration(),
new[]
{
new object[] { "Tom", "Jones", (double)10, "buyer@salesforcesample.com" },
new object[] { "Tom", "Jones", (long)10, "buyer@salesforcesample.com" },
new object[] { "Ian", "Dury", "20", "cto@salesforcesample.com" }
});
}

Просмотреть файл

@ -68,6 +68,7 @@
</ProjectReference>
</ItemGroup>
<ItemGroup>
<None Include="app.config" />
<None Include="TestData\EmptySpaceUnquotedValues.csv">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>

Просмотреть файл

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-6.0.0.0" newVersion="6.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -101,6 +101,9 @@
<Generator>MSBuild:Compile</Generator>
</Page>
</ItemGroup>
<ItemGroup>
<None Include="app.config" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.

Просмотреть файл

@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information

Просмотреть файл

@ -1,13 +1,13 @@
using Microsoft.DataTransfer.Basics.Extensions;
using Microsoft.DataTransfer.CsvFile.Source;
using Microsoft.DataTransfer.WpfHost.Extensibility.Basics;
using Microsoft.DataTransfer.WpfHost.Basics;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
namespace Microsoft.DataTransfer.CsvFile.Wpf.Source
{
sealed class CsvFileSourceAdapterConfiguration : ValidatableConfiguration, ICsvFileSourceAdapterConfiguration
sealed class CsvFileSourceAdapterConfiguration : ValidatableBindableBase, ICsvFileSourceAdapterConfiguration
{
private static readonly string EditableFilesPropertyName =
ObjectExtensions.MemberName<CsvFileSourceAdapterConfiguration>(c => c.EditableFiles);

Просмотреть файл

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-6.0.0.0" newVersion="6.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -6,6 +6,8 @@ namespace Microsoft.DataTransfer.CsvFile
{
sealed class Errors : CommonErrors
{
private Errors() { }
public static Exception UnexpectedCharacter(long row, long position, char character)
{
return new FormatException(FormatMessage(Resources.UnexpectedCharacterFormat, row, position, character));

Просмотреть файл

@ -70,6 +70,10 @@
<Project>{acc3b08a-2706-4857-b374-8f6311db0e6f}</Project>
<Name>Microsoft.DataTransfer.Extensibility</Name>
</ProjectReference>
<ProjectReference Include="..\..\Shared\Microsoft.DataTransfer.Basics.Files\Microsoft.DataTransfer.Basics.Files.csproj">
<Project>{da182d5c-79f4-4af6-bf15-6e4496353a6a}</Project>
<Name>Microsoft.DataTransfer.Basics.Files</Name>
</ProjectReference>
<ProjectReference Include="..\..\Shared\Microsoft.DataTransfer.Basics\Microsoft.DataTransfer.Basics.csproj">
<Project>{ccd5f3bd-e95e-46b6-8688-394f592c6a2a}</Project>
<Name>Microsoft.DataTransfer.Basics</Name>
@ -85,6 +89,9 @@
<LastGenOutput>Resources.Designer.cs</LastGenOutput>
</EmbeddedResource>
</ItemGroup>
<ItemGroup>
<None Include="app.config" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.

Просмотреть файл

@ -125,10 +125,14 @@ namespace Microsoft.DataTransfer.CsvFile.Reader
if (String.IsNullOrEmpty(value))
return null;
double number;
if (double.TryParse(value, out number))
long number;
if (long.TryParse(value, out number))
return number;
double doubleNumber;
if (double.TryParse(value, out doubleNumber))
return doubleNumber;
DateTime dateTime;
if (DateTime.TryParse(value, CultureInfo.InvariantCulture,
DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal, out dateTime))

Просмотреть файл

@ -1,8 +1,8 @@
using Microsoft.DataTransfer.Basics;
using Microsoft.DataTransfer.Basics.Files.Source;
using Microsoft.DataTransfer.CsvFile.Reader;
using Microsoft.DataTransfer.Extensibility;
using Microsoft.DataTransfer.Extensibility.Basics.Source;
using Microsoft.DataTransfer.Extensibility.Basics.Source.StreamProviders;
using System;
using System.Collections.Generic;
using System.Globalization;
@ -36,7 +36,7 @@ namespace Microsoft.DataTransfer.CsvFile.Source
if (reader == null)
{
reader = new CsvReader(
await sourceStreamProvider.CreateReader(),
await sourceStreamProvider.CreateReader(cancellation),
new CsvReaderConfiguration
{
TrimQuoted = configuration.TrimQuoted,

Просмотреть файл

@ -1,8 +1,10 @@
using Microsoft.DataTransfer.Basics;
using Microsoft.DataTransfer.Basics.Files.Source;
using Microsoft.DataTransfer.Extensibility;
using Microsoft.DataTransfer.Extensibility.Basics;
using Microsoft.DataTransfer.Extensibility.Basics.Source;
using Microsoft.DataTransfer.Extensibility.Basics.Source.StreamProviders;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.CsvFile.Source
@ -10,7 +12,7 @@ namespace Microsoft.DataTransfer.CsvFile.Source
/// <summary>
/// Provides data source adapters capable of reading data from one or more CSV files.
/// </summary>
public sealed class CsvFileSourceAdapterFactory : IDataSourceAdapterFactory<ICsvFileSourceAdapterConfiguration>
public sealed class CsvFileSourceAdapterFactory : DataAdapterFactoryBase, IDataSourceAdapterFactory<ICsvFileSourceAdapterConfiguration>
{
/// <summary>
/// Gets the description of the data adapter.
@ -25,10 +27,11 @@ namespace Microsoft.DataTransfer.CsvFile.Source
/// </summary>
/// <param name="configuration">Data source adapter configuration.</param>
/// <param name="context">Data transfer operation context.</param>
/// <param name="cancellation">Cancellation token.</param>
/// <returns>Task that represents asynchronous create operation.</returns>
public Task<IDataSourceAdapter> CreateAsync(ICsvFileSourceAdapterConfiguration configuration, IDataTransferContext context)
public Task<IDataSourceAdapter> CreateAsync(ICsvFileSourceAdapterConfiguration configuration, IDataTransferContext context, CancellationToken cancellation)
{
return Task.Factory.StartNew(() => Create(configuration));
return Task.Factory.StartNew(() => Create(configuration), cancellation);
}
private IDataSourceAdapter Create(ICsvFileSourceAdapterConfiguration configuration)

Просмотреть файл

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-6.0.0.0" newVersion="6.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -130,6 +130,24 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.DataTransfer.Rave
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.DataTransfer.CsvFile.UnitTests", "CsvFile\Microsoft.DataTransfer.CsvFile.UnitTests\Microsoft.DataTransfer.CsvFile.UnitTests.csproj", "{F1D74F07-96B2-4150-830F-4DFB2E3A1707}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "DynamoDb", "DynamoDb", "{2885C0CB-3878-4FB8-BA4B-F7DEAA660366}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.DataTransfer.DynamoDb", "DynamoDb\Microsoft.DataTransfer.DynamoDb\Microsoft.DataTransfer.DynamoDb.csproj", "{AF2A444F-CABD-45BE-9B9E-9D53501B2041}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.DataTransfer.DynamoDb.FunctionalTests", "DynamoDb\Microsoft.DataTransfer.DynamoDb.FunctionalTests\Microsoft.DataTransfer.DynamoDb.FunctionalTests.csproj", "{B301926E-9833-483D-AF09-E4006FC6E663}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.DataTransfer.DynamoDb.Wpf", "DynamoDb\Microsoft.DataTransfer.DynamoDb.Wpf\Microsoft.DataTransfer.DynamoDb.Wpf.csproj", "{6FD86D77-5F97-45EE-ACA9-4E826E288D22}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "HBase", "HBase", "{1BD0D669-8E45-4E7C-A20F-707A1887E8ED}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.DataTransfer.HBase", "HBase\Microsoft.DataTransfer.HBase\Microsoft.DataTransfer.HBase.csproj", "{AD24DB36-8623-4E11-9995-A6EB5EF50A38}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.DataTransfer.HBase.Wpf", "HBase\Microsoft.DataTransfer.HBase.Wpf\Microsoft.DataTransfer.HBase.Wpf.csproj", "{98706C69-8F15-4D97-9DD9-D7FD6F5A038F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.DataTransfer.HBase.FunctionalTests", "HBase\Microsoft.DataTransfer.HBase.FunctionalTests\Microsoft.DataTransfer.HBase.FunctionalTests.csproj", "{6544F78F-4EE3-489E-87B7-5FCA9C4D50BD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.DataTransfer.Basics.Files", "Shared\Microsoft.DataTransfer.Basics.Files\Microsoft.DataTransfer.Basics.Files.csproj", "{DA182D5C-79F4-4AF6-BF15-6E4496353A6A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -294,6 +312,34 @@ Global
{F1D74F07-96B2-4150-830F-4DFB2E3A1707}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F1D74F07-96B2-4150-830F-4DFB2E3A1707}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F1D74F07-96B2-4150-830F-4DFB2E3A1707}.Release|Any CPU.Build.0 = Release|Any CPU
{AF2A444F-CABD-45BE-9B9E-9D53501B2041}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AF2A444F-CABD-45BE-9B9E-9D53501B2041}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AF2A444F-CABD-45BE-9B9E-9D53501B2041}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AF2A444F-CABD-45BE-9B9E-9D53501B2041}.Release|Any CPU.Build.0 = Release|Any CPU
{B301926E-9833-483D-AF09-E4006FC6E663}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B301926E-9833-483D-AF09-E4006FC6E663}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B301926E-9833-483D-AF09-E4006FC6E663}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B301926E-9833-483D-AF09-E4006FC6E663}.Release|Any CPU.Build.0 = Release|Any CPU
{6FD86D77-5F97-45EE-ACA9-4E826E288D22}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6FD86D77-5F97-45EE-ACA9-4E826E288D22}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6FD86D77-5F97-45EE-ACA9-4E826E288D22}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6FD86D77-5F97-45EE-ACA9-4E826E288D22}.Release|Any CPU.Build.0 = Release|Any CPU
{AD24DB36-8623-4E11-9995-A6EB5EF50A38}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AD24DB36-8623-4E11-9995-A6EB5EF50A38}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AD24DB36-8623-4E11-9995-A6EB5EF50A38}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AD24DB36-8623-4E11-9995-A6EB5EF50A38}.Release|Any CPU.Build.0 = Release|Any CPU
{98706C69-8F15-4D97-9DD9-D7FD6F5A038F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{98706C69-8F15-4D97-9DD9-D7FD6F5A038F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{98706C69-8F15-4D97-9DD9-D7FD6F5A038F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{98706C69-8F15-4D97-9DD9-D7FD6F5A038F}.Release|Any CPU.Build.0 = Release|Any CPU
{6544F78F-4EE3-489E-87B7-5FCA9C4D50BD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6544F78F-4EE3-489E-87B7-5FCA9C4D50BD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6544F78F-4EE3-489E-87B7-5FCA9C4D50BD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6544F78F-4EE3-489E-87B7-5FCA9C4D50BD}.Release|Any CPU.Build.0 = Release|Any CPU
{DA182D5C-79F4-4AF6-BF15-6E4496353A6A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DA182D5C-79F4-4AF6-BF15-6E4496353A6A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DA182D5C-79F4-4AF6-BF15-6E4496353A6A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DA182D5C-79F4-4AF6-BF15-6E4496353A6A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -341,6 +387,13 @@ Global
{6E25967D-5286-47DB-AB9F-82AA7416D344} = {0D63DA64-9090-42C1-A066-172C2700F1D4}
{29968BB5-BC00-4976-8B78-1B0244654B5D} = {0D63DA64-9090-42C1-A066-172C2700F1D4}
{F1D74F07-96B2-4150-830F-4DFB2E3A1707} = {8CBD6933-1EC6-48A3-B483-D1BAF83BEF42}
{AF2A444F-CABD-45BE-9B9E-9D53501B2041} = {2885C0CB-3878-4FB8-BA4B-F7DEAA660366}
{B301926E-9833-483D-AF09-E4006FC6E663} = {2885C0CB-3878-4FB8-BA4B-F7DEAA660366}
{6FD86D77-5F97-45EE-ACA9-4E826E288D22} = {2885C0CB-3878-4FB8-BA4B-F7DEAA660366}
{AD24DB36-8623-4E11-9995-A6EB5EF50A38} = {1BD0D669-8E45-4E7C-A20F-707A1887E8ED}
{98706C69-8F15-4D97-9DD9-D7FD6F5A038F} = {1BD0D669-8E45-4E7C-A20F-707A1887E8ED}
{6544F78F-4EE3-489E-87B7-5FCA9C4D50BD} = {1BD0D669-8E45-4E7C-A20F-707A1887E8ED}
{DA182D5C-79F4-4AF6-BF15-6E4496353A6A} = {F9CAC1F5-436E-4406-BACC-FC18C8FE36C5}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
EnterpriseLibraryConfigurationToolBinariesPathV6 = packages\EnterpriseLibrary.TransientFaultHandling.6.0.1304.0\lib\portable-net45+win+wp8;packages\EnterpriseLibrary.TransientFaultHandling.Data.6.0.1304.1\lib\NET45

Просмотреть файл

@ -5,7 +5,7 @@ using System;
namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
{
[TestClass]
public abstract class DocumentDbAdapterTestBase : DataTransferSinkTestBase
public abstract class DocumentDbAdapterTestBase : DataTransferAdapterTestBase
{
protected string ConnectionString { get; private set; }

Просмотреть файл

@ -7,6 +7,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
@ -33,7 +34,7 @@ namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
var sampleData = SampleData.GetSimpleDataItems(NumberOfItems);
using (var adapter = await new DocumentDbBulkSinkAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
await WriteDataAsync(adapter, sampleData);
}
@ -59,7 +60,7 @@ namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
var sampleData = SampleData.GetSimpleDataItems(NumberOfItems);
using (var adapter = await new DocumentDbBulkSinkAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
await WriteDataAsync(adapter, sampleData);
}
@ -92,7 +93,7 @@ namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
var sampleData = SampleData.GetSimpleDataItems(NumberOfItems);
using (var adapter = await new DocumentDbBulkSinkAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
await WriteDataAsync(adapter, sampleData);
}
@ -125,7 +126,7 @@ namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
var sampleData = SampleData.GetSimpleDataItems(NumberOfItems);
using (var adapter = await new DocumentDbBulkSinkAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
await WriteDataAsync(adapter, sampleData);
}

Просмотреть файл

@ -2,7 +2,6 @@
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Client.TransientFaultHandling;
using Microsoft.DataTransfer.DocumentDb.Client;
using Microsoft.DataTransfer.Extensibility;
using Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
@ -28,7 +27,8 @@ namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
}
}
public static IEnumerable<IReadOnlyDictionary<string, object>> ReadDocuments(string connectionString, string collectionName)
public static IEnumerable<IReadOnlyDictionary<string, object>> ReadDocuments(string connectionString, string collectionName,
string query = "SELECT * FROM c")
{
var connectionSettings = DocumentDbConnectionStringBuilder.Parse(connectionString);
@ -51,7 +51,7 @@ namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
Assert.IsNotNull(collection, "Document collection does not exist.");
return client
.CreateDocumentQuery<Dictionary<string, object>>(collection.DocumentsLink)
.CreateDocumentQuery<Dictionary<string, object>>(collection.DocumentsLink, query)
.ToArray();
}
}

Просмотреть файл

@ -3,6 +3,7 @@ using Microsoft.DataTransfer.TestsCommon.Mocks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
@ -28,7 +29,7 @@ namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
var sampleData = SampleData.GetSimpleDataItems(NumberOfItems);
using (var adapter = await new DocumentDbParallelSinkAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
await WriteDataAsync(adapter, sampleData);
}
@ -36,6 +37,36 @@ namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
VerifyData(sampleData, DocumentDbHelper.ReadDocuments(ConnectionString, "Data"));
}
[TestMethod, Timeout(300000)]
[DeploymentItem(@"IndexingPolicies\IntegerPropertyRangeIndex.json", "IndexingPolicies")]
public async Task WriteSampleData_RangeIndexOnIntegerProperty_IntegerRangeFilterCanBeUsed()
{
const string CollectionName = "Data";
const int NumberOfItems = 42;
var configuration =
Mocks
.Of<IDocumentDbParallelSinkAdapterConfiguration>(m =>
m.ConnectionString == ConnectionString &&
m.Collection == new[] { CollectionName } &&
m.IndexingPolicyFile == @"IndexingPolicies\IntegerPropertyRangeIndex.json" &&
m.ParallelRequests == 1 &&
m.Retries == 100)
.First();
var sampleData = SampleData.GetSimpleDataItems(NumberOfItems);
using (var adapter = await new DocumentDbParallelSinkAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
await WriteDataAsync(adapter, sampleData);
}
VerifyData(
sampleData.Where(i => (int)i.GetValue("IntegerProperty") < 20).ToArray(),
DocumentDbHelper.ReadDocuments(ConnectionString, "Data", "SELECT * FROM c WHERE c.IntegerProperty < 20"));
}
[TestMethod, Timeout(300000)]
public async Task WriteSampleData_RandomPartitioningAcrossTwoCollections_AllDataStored()
{
@ -53,7 +84,7 @@ namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
var sampleData = SampleData.GetSimpleDataItems(NumberOfItems);
using (var adapter = await new DocumentDbParallelSinkAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
await WriteDataAsync(adapter, sampleData);
}
@ -85,7 +116,7 @@ namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
var sampleData = SampleData.GetSimpleDataItems(NumberOfItems);
using (var adapter = await new DocumentDbParallelSinkAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
await WriteDataAsync(adapter, sampleData);
}

Просмотреть файл

@ -5,6 +5,7 @@ using Moq;
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
@ -37,7 +38,7 @@ namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
try
{
using (var adapter = await new DocumentDbSourceAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
Assert.Fail(TestResources.AmbiguousQueryDidNotFail);
}

Просмотреть файл

@ -4,9 +4,7 @@ using Microsoft.DataTransfer.TestsCommon;
using Microsoft.DataTransfer.TestsCommon.Mocks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -40,19 +38,11 @@ namespace Microsoft.DataTransfer.DocumentDb.FunctionalTests
c.InternalFields == false)
.First();
var readResults = new List<IDataItem>();
List<IDataItem> readResults;
using (var adapter = await new DocumentDbSourceAdapterFactory()
.CreateAsync(configuration, DataTransferContextMock.Instance))
.CreateAsync(configuration, DataTransferContextMock.Instance, CancellationToken.None))
{
IDataItem dataItem;
var readOutput = new ReadOutputByRef();
while ((dataItem = await adapter.ReadNextAsync(readOutput, CancellationToken.None)) != null)
{
readResults.Add(dataItem);
Assert.IsNotNull(readOutput.DataItemId, CommonTestResources.MissingDataItemId);
readOutput.Wipe();
}
readResults = await ReadDataAsync(adapter);
}
DataItemCollectionAssert.AreEquivalent(sampleData, readResults, TestResources.InvalidDocumentsRead);

Просмотреть файл

@ -0,0 +1,32 @@
{
"indexingMode": "consistent",
"automatic": true,
"includedPaths": [
{
"path": "/*",
"indexes": [
{
"kind": "Range",
"dataType": "Number",
"precision": -1
},
{
"kind": "Hash",
"dataType": "String",
"precision": 3
}
]
},
{
"path": "/IntegerProperty/?",
"indexes": [
{
"kind": "Range",
"dataType": "Number",
"precision": -1
}
]
}
],
"excludedPaths": []
}

Просмотреть файл

@ -36,9 +36,9 @@
<CodeAnalysisRuleSet>ManagedMinimumRules.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.Azure.Documents.Client, Version=1.1.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\Microsoft.Azure.DocumentDB.1.1.0\lib\net40\Microsoft.Azure.Documents.Client.dll</HintPath>
<Reference Include="Microsoft.Azure.Documents.Client, Version=1.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Azure.DocumentDB.1.2.0\lib\net40\Microsoft.Azure.Documents.Client.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Microsoft.Azure.Documents.Client.TransientFaultHandling, Version=1.1.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
@ -108,6 +108,9 @@
</ItemGroup>
<ItemGroup>
<None Include="app.config" />
<None Include="IndexingPolicies\IntegerPropertyRangeIndex.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>

Просмотреть файл

@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following

Просмотреть файл

@ -6,6 +6,10 @@
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-6.0.0.0" newVersion="6.0.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Microsoft.Azure.Documents.Client" publicKeyToken="31bf3856ad364e35" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-1.2.0.0" newVersion="1.2.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="EnterpriseLibrary.TransientFaultHandling" version="6.0.1304.0" targetFramework="net45" />
<package id="Microsoft.Azure.DocumentDB" version="1.1.0" targetFramework="net45" />
<package id="Microsoft.Azure.DocumentDB" version="1.2.0" targetFramework="net45" />
<package id="Microsoft.Azure.DocumentDB.TransientFaultHandling" version="1.1.0" targetFramework="net45" />
<package id="Moq" version="4.2.1409.1722" targetFramework="net45" />
<package id="Newtonsoft.Json" version="6.0.5" targetFramework="net45" />

Просмотреть файл

@ -36,6 +36,10 @@
<CodeAnalysisRuleSet>ManagedMinimumRules.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.Azure.Documents.Client, Version=1.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Azure.DocumentDB.1.2.0\lib\net40\Microsoft.Azure.Documents.Client.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Moq">
<HintPath>..\..\packages\Moq.4.2.1409.1722\lib\net40\Moq.dll</HintPath>
</Reference>

Просмотреть файл

@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following

Просмотреть файл

@ -9,7 +9,7 @@ using System.Threading.Tasks;
namespace Microsoft.DataTransfer.DocumentDb.UnitTests.Sink
{
[TestClass]
public class DocumentDbParallelSinkAdapterTests : DataTransferSinkTestBase
public class DocumentDbParallelSinkAdapterTests : DataTransferAdapterTestBase
{
[TestMethod]
public async Task WriteSampleData_AllDataStored()

Просмотреть файл

@ -1,4 +1,5 @@
using Microsoft.DataTransfer.Basics.Threading;
using Microsoft.Azure.Documents;
using Microsoft.DataTransfer.Basics.Threading;
using Microsoft.DataTransfer.DocumentDb.Client;
using Microsoft.DataTransfer.DocumentDb.Sink;
using Microsoft.VisualStudio.TestTools.UnitTesting;
@ -32,7 +33,7 @@ namespace Microsoft.DataTransfer.DocumentDb.UnitTests.Sink
deletedStoredProcedures = new HashSet<string>();
}
public Task<string> GetOrCreateCollectionAsync(string collectionName, CollectionPricingTier collectionTier)
public Task<string> GetOrCreateCollectionAsync(string collectionName, CollectionPricingTier collectionTier, IndexingPolicy indexingPolicy)
{
Assert.IsFalse(String.IsNullOrEmpty(collectionName), TestResources.MissingCollectionNameInGetOrCreateCollection);

Просмотреть файл

@ -1,6 +1,7 @@
using Microsoft.DataTransfer.Basics;
using Microsoft.DataTransfer.DocumentDb.Client.Enumeration;
using Microsoft.DataTransfer.Extensibility.Basics.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.DocumentDb.UnitTests.Source
@ -19,7 +20,7 @@ namespace Microsoft.DataTransfer.DocumentDb.UnitTests.Source
this.enumerator = enumerator;
}
public Task<bool> MoveNextAsync()
public Task<bool> MoveNextAsync(CancellationToken cancellation)
{
return Task.FromResult(enumerator.MoveNext());
}

Просмотреть файл

@ -3,6 +3,7 @@ using Microsoft.DataTransfer.DocumentDb.Client.Enumeration;
using Microsoft.DataTransfer.DocumentDb.Source;
using Microsoft.DataTransfer.DocumentDb.Transformation;
using Microsoft.DataTransfer.Extensibility;
using Microsoft.DataTransfer.Extensibility.Basics.Collections;
using Microsoft.DataTransfer.TestsCommon;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
@ -29,7 +30,7 @@ namespace Microsoft.DataTransfer.DocumentDb.UnitTests.Source
var client = Mocks
.Of<IDocumentDbReadClient>()
.Where(m =>
m.QueryDocumentsAsync(TestCollectionName, TestQuery) ==
m.QueryDocumentsAsync(TestCollectionName, TestQuery, CancellationToken.None) ==
Task.FromResult<IAsyncEnumerator<IReadOnlyDictionary<string, object>>>(
new AsyncEnumeratorMock<Dictionary<string, object>>(sampleData.OfType<Dictionary<string, object>>().GetEnumerator())))
.First();
@ -44,7 +45,7 @@ namespace Microsoft.DataTransfer.DocumentDb.UnitTests.Source
var readResults = new List<IDataItem>();
using (var adapter = new DocumentDbSourceAdapter(client, PassThroughTransformation.Instance, configuration))
{
await adapter.InitializeAsync();
await adapter.InitializeAsync(CancellationToken.None);
IDataItem dataItem;
while ((dataItem = await adapter.ReadNextAsync(ReadOutputByRef.None, CancellationToken.None)) != null)

Просмотреть файл

@ -6,6 +6,10 @@
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-6.0.0.0" newVersion="6.0.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Microsoft.Azure.Documents.Client" publicKeyToken="31bf3856ad364e35" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-1.2.0.0" newVersion="1.2.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Microsoft.Azure.DocumentDB" version="1.2.0" targetFramework="net45" />
<package id="Moq" version="4.2.1409.1722" targetFramework="net45" />
<package id="Newtonsoft.Json" version="6.0.5" targetFramework="net45" />
</packages>

Просмотреть файл

@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information

Просмотреть файл

@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a tool.
// Runtime Version:4.0.30319.18444
// Runtime Version:4.0.30319.34209
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
@ -105,15 +105,6 @@ namespace Microsoft.DataTransfer.DocumentDb.Wpf {
}
}
/// <summary>
/// Looks up a localized string similar to Value should be greater than zero.
/// </summary>
internal static string PositiveNumberRequired {
get {
return ResourceManager.GetString("PositiveNumberRequired", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to Verify Connection.
/// </summary>

Просмотреть файл

@ -132,9 +132,6 @@
<data name="InvalidRetryInterval" xml:space="preserve">
<value>Non-negative retry interval should be provided</value>
</data>
<data name="PositiveNumberRequired" xml:space="preserve">
<value>Value should be greater than zero</value>
</data>
<data name="TestConnectionResultTitle" xml:space="preserve">
<value>Verify Connection</value>
</data>

Просмотреть файл

@ -1,12 +1,12 @@
using Microsoft.DataTransfer.Basics.Extensions;
using Microsoft.DataTransfer.DocumentDb.Shared;
using Microsoft.DataTransfer.WpfHost.Extensibility.Basics;
using Microsoft.DataTransfer.WpfHost.Basics;
using System;
using System.Collections.Generic;
namespace Microsoft.DataTransfer.DocumentDb.Wpf.Shared
{
abstract class DocumentDbAdapterConfiguration : ValidatableConfiguration, IDocumentDbAdapterConfiguration
abstract class DocumentDbAdapterConfiguration : ValidatableBindableBase, IDocumentDbAdapterConfiguration
{
public static readonly string ConnectionStringPropertyName =
ObjectExtensions.MemberName<IDocumentDbAdapterConfiguration>(c => c.ConnectionString);

Просмотреть файл

@ -6,7 +6,7 @@
xmlns:local="clr-namespace:Microsoft.DataTransfer.DocumentDb.Wpf.Sink"
xmlns:converters="clr-namespace:Microsoft.DataTransfer.WpfHost.Basics.ValueConverters;assembly=Microsoft.DataTransfer.WpfHost.Basics"
mc:Ignorable="d" x:ClassModifier="internal"
d:DesignHeight="300" d:DesignWidth="300">
d:DesignHeight="360" d:DesignWidth="300">
<UserControl.Resources>
<ResourceDictionary>
@ -15,6 +15,9 @@
</ResourceDictionary.MergedDictionaries>
<local:CollectionPricingTierValueConverter x:Key="CollectionTierConverter" />
<converters:BooleanToVisibilityValueConverter x:Key="IsVisible" />
<converters:BooleanToVisibilityValueConverter x:Key="IsInvisible" True="Collapsed" False="Visible" />
<converters:BooleanToStringValueConverter x:Key="YesNoConverter" />
</ResourceDictionary>
</UserControl.Resources>
@ -38,6 +41,7 @@
<RowDefinition />
<RowDefinition />
<RowDefinition />
<RowDefinition />
</Grid.RowDefinitions>
<Label Grid.Row="0" Grid.Column="0" Content="{DynamicResource ConnectionStringSummaryKey}" />
@ -88,18 +92,26 @@
<TextBlock Text="{Binding Dates}" />
</Label>
<Label Grid.Row="10" Grid.Column="0" Content="{DynamicResource RetriesSummaryKey}" />
<Label Grid.Row="10" Grid.Column="0" Content="{DynamicResource IndexingPolicySummaryKey}" />
<Label Grid.Row="10" Grid.Column="1">
<Grid>
<TextBlock Text="{Binding IndexingPolicy}" Visibility="{Binding UseIndexingPolicyFile, Converter={StaticResource IsInvisible}}" />
<TextBlock Text="{Binding IndexingPolicyFile}" Visibility="{Binding UseIndexingPolicyFile, Converter={StaticResource IsVisible}}" />
</Grid>
</Label>
<Label Grid.Row="11" Grid.Column="0" Content="{DynamicResource RetriesSummaryKey}" />
<Label Grid.Row="11" Grid.Column="1">
<TextBlock Text="{Binding Retries}" />
</Label>
<Label Grid.Row="11" Grid.Column="0" Content="{DynamicResource RetryIntervalSummaryKey}" />
<Label Grid.Row="11" Grid.Column="1">
<Label Grid.Row="12" Grid.Column="0" Content="{DynamicResource RetryIntervalSummaryKey}" />
<Label Grid.Row="12" Grid.Column="1">
<TextBlock Text="{Binding RetryInterval}" />
</Label>
<Label Grid.Row="12" Grid.Column="0" Content="{DynamicResource ConnectionModeSummaryKey}" />
<Label Grid.Row="12" Grid.Column="1">
<Label Grid.Row="13" Grid.Column="0" Content="{DynamicResource ConnectionModeSummaryKey}" />
<Label Grid.Row="13" Grid.Column="1">
<TextBlock Text="{Binding ConnectionMode}" />
</Label>
</Grid>

Просмотреть файл

@ -5,10 +5,11 @@
xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
xmlns:system="clr-namespace:System;assembly=mscorlib"
xmlns:sink="clr-namespace:Microsoft.DataTransfer.DocumentDb.Sink;assembly=Microsoft.DataTransfer.DocumentDb"
xmlns:bcmd="clr-namespace:Microsoft.DataTransfer.WpfHost.Basics.Commands;assembly=Microsoft.DataTransfer.WpfHost.Basics"
xmlns:bc="clr-namespace:Microsoft.DataTransfer.WpfHost.Basics.Controls;assembly=Microsoft.DataTransfer.WpfHost.Basics"
xmlns:shared="clr-namespace:Microsoft.DataTransfer.DocumentDb.Wpf.Shared"
mc:Ignorable="d" x:ClassModifier="internal"
d:DesignHeight="300" d:DesignWidth="300">
d:DesignHeight="400" d:DesignWidth="300">
<UserControl.Resources>
<ResourceDictionary>
@ -16,6 +17,13 @@
<ResourceDictionary Source="..\XamlResources.xaml"/>
</ResourceDictionary.MergedDictionaries>
<Style x:Key="IndexingPolicyEditor" TargetType="bc:StringOrFileConfigurationControl" BasedOn="{StaticResource OptionElement}">
<Setter Property="Height" Value="150" />
</Style>
<ObjectDataProvider x:Key="PasteToFocusedTextBox" ObjectType="{x:Type bcmd:PasteToFocusedTextBoxCommand}" />
<ObjectDataProvider x:Key="ReplaceIndentedTextInFocusedTextBox" ObjectType="{x:Type bcmd:ReplaceIndentedTextInFocusedTextBoxCommand}" />
<ObjectDataProvider x:Key="DateTimeHandlingValues" ObjectType="{x:Type system:Enum}" MethodName="GetValues">
<ObjectDataProvider.MethodParameters>
<x:Type TypeName="sink:DateTimeHandling" />
@ -33,7 +41,28 @@
<Label Content="{DynamicResource DatesHandlingKey}" />
<ComboBox ItemsSource="{Binding Source={StaticResource DateTimeHandlingValues}}" SelectedValue="{Binding Dates}" />
</StackPanel>
<bc:StringOrFileConfigurationControl Style="{StaticResource IndexingPolicyEditor}"
UseFile="{Binding UseIndexingPolicyFile}"
StringValueLabel="{DynamicResource EnterIndexingPolicySelectionKey}" StringValue="{Binding IndexingPolicy}"
FileNameLabel="{DynamicResource IndexingPolicyFileSelectionKey}" FileName="{Binding IndexingPolicyFile}"
FileFilter="{DynamicResource IndexingPolicyFileFilterKey}" FileDefaultExtension="{DynamicResource IndexingPolicyFileDefaultExtensionKey}">
<bc:StringOrFileConfigurationControl.StringEditorContextMenu>
<ContextMenu>
<MenuItem Header="{DynamicResource IndexingPolicyDefaultTemplateMenuHeader}"
Command="{Binding Source={StaticResource PasteToFocusedTextBox}}" CommandParameter="{DynamicResource IndexingPolicyDefaultTemplate}" />
<MenuItem Header="{DynamicResource IndexingPolicyAllHashTemplateMenuHeader}"
Command="{Binding Source={StaticResource PasteToFocusedTextBox}}" CommandParameter="{DynamicResource IndexingPolicyAllHashTemplate}" />
<MenuItem Header="{DynamicResource IndexingPolicyAllRangeTemplateMenuHeader}"
Command="{Binding Source={StaticResource PasteToFocusedTextBox}}" CommandParameter="{DynamicResource IndexingPolicyAllRangeTemplate}" />
<Separator />
<MenuItem Command="ApplicationCommands.Copy" />
<MenuItem Command="ApplicationCommands.Cut" />
<MenuItem Command="ApplicationCommands.Paste" />
</ContextMenu>
</bc:StringOrFileConfigurationControl.StringEditorContextMenu>
</bc:StringOrFileConfigurationControl>
<shared:DocumentDbAdapterAdvancedConfigurationControl DataContext="{Binding}" />
</StackPanel>

Просмотреть файл

@ -21,6 +21,12 @@ namespace Microsoft.DataTransfer.DocumentDb.Wpf.Sink
public static readonly string CollectionTierPropertyName =
ObjectExtensions.MemberName<IDocumentDbSinkAdapterConfiguration>(c => c.CollectionTier);
public static readonly string IndexingPolicyPropertyName =
ObjectExtensions.MemberName<IDocumentDbSinkAdapterConfiguration>(c => c.IndexingPolicy);
public static readonly string IndexingPolicyFilePropertyName =
ObjectExtensions.MemberName<IDocumentDbSinkAdapterConfiguration>(c => c.IndexingPolicyFile);
public static readonly string IdFieldPropertyName =
ObjectExtensions.MemberName<IDocumentDbSinkAdapterConfiguration>(c => c.IdField);
@ -34,6 +40,10 @@ namespace Microsoft.DataTransfer.DocumentDb.Wpf.Sink
private string partitionKey;
private CollectionPricingTier? collectionTier;
private bool useIndexingPolicyFile;
private string indexingPolicy;
private string indexingPolicyFile;
private string idField;
private bool disableIdGeneration;
@ -71,6 +81,24 @@ namespace Microsoft.DataTransfer.DocumentDb.Wpf.Sink
set { SetProperty(ref collectionTier, value); }
}
public bool UseIndexingPolicyFile
{
get { return useIndexingPolicyFile; }
set { SetProperty(ref useIndexingPolicyFile, value); }
}
public string IndexingPolicy
{
get { return useIndexingPolicyFile ? null : indexingPolicy; }
set { SetProperty(ref indexingPolicy, value); }
}
public string IndexingPolicyFile
{
get { return useIndexingPolicyFile ? indexingPolicyFile : null; }
set { SetProperty(ref indexingPolicyFile, value); }
}
public string IdField
{
get { return idField; }
@ -100,10 +128,5 @@ namespace Microsoft.DataTransfer.DocumentDb.Wpf.Sink
{
SetErrors(EditableCollectionsPropertyName, ValidateNonEmptyCollection(sender as IEnumerable<string>));
}
protected static IReadOnlyCollection<string> ValidatePositiveInteger(int? value)
{
return value > 0 ? null : new[] { Resources.PositiveNumberRequired };
}
}
}

Просмотреть файл

@ -1,4 +1,5 @@
using Microsoft.DataTransfer.DocumentDb.Wpf.Shared;
using Microsoft.DataTransfer.Basics;
using Microsoft.DataTransfer.DocumentDb.Wpf.Shared;
using System;
using System.Collections.Generic;
@ -11,6 +12,9 @@ namespace Microsoft.DataTransfer.DocumentDb.Wpf.Sink
{
base.PopulateCommandLineArguments(configuration, arguments);
Guard.NotNull("configuration", configuration);
Guard.NotNull("arguments", arguments);
arguments.Add(DocumentDbSinkAdapterConfiguration.CollectionPropertyName, AsCollectionArgument(configuration.Collection));
if (!String.IsNullOrEmpty(configuration.PartitionKey))
@ -19,6 +23,17 @@ namespace Microsoft.DataTransfer.DocumentDb.Wpf.Sink
if (configuration.CollectionTier.HasValue && configuration.CollectionTier.Value != Defaults.Current.SinkCollectionTier)
arguments.Add(DocumentDbSinkAdapterConfiguration.CollectionTierPropertyName, configuration.CollectionTier.Value.ToString());
if (configuration.UseIndexingPolicyFile)
{
if (!String.IsNullOrEmpty(configuration.IndexingPolicyFile))
arguments.Add(DocumentDbSinkAdapterConfiguration.IndexingPolicyFilePropertyName, configuration.IndexingPolicyFile);
}
else
{
if (!String.IsNullOrEmpty(configuration.IndexingPolicy))
arguments.Add(DocumentDbSinkAdapterConfiguration.IndexingPolicyPropertyName, configuration.IndexingPolicy);
}
if (!String.IsNullOrEmpty(configuration.IdField))
arguments.Add(DocumentDbSinkAdapterConfiguration.IdFieldPropertyName, configuration.IdField);

Просмотреть файл

@ -15,6 +15,9 @@
</ResourceDictionary.MergedDictionaries>
<local:CollectionPricingTierValueConverter x:Key="CollectionTierConverter" />
<converters:BooleanToVisibilityValueConverter x:Key="IsVisible" />
<converters:BooleanToVisibilityValueConverter x:Key="IsInvisible" True="Collapsed" False="Visible" />
<converters:BooleanToStringValueConverter x:Key="YesNoConverter" />
</ResourceDictionary>
</UserControl.Resources>
@ -36,6 +39,7 @@
<RowDefinition />
<RowDefinition />
<RowDefinition />
<RowDefinition />
</Grid.RowDefinitions>
<Label Grid.Row="0" Grid.Column="0" Content="{DynamicResource ConnectionStringSummaryKey}" />
@ -76,18 +80,26 @@
<TextBlock Text="{Binding Dates}" />
</Label>
<Label Grid.Row="8" Grid.Column="0" Content="{DynamicResource RetriesSummaryKey}" />
<Label Grid.Row="8" Grid.Column="0" Content="{DynamicResource IndexingPolicySummaryKey}" />
<Label Grid.Row="8" Grid.Column="1">
<Grid>
<TextBlock Text="{Binding IndexingPolicy}" Visibility="{Binding UseIndexingPolicyFile, Converter={StaticResource IsInvisible}}" />
<TextBlock Text="{Binding IndexingPolicyFile}" Visibility="{Binding UseIndexingPolicyFile, Converter={StaticResource IsVisible}}" />
</Grid>
</Label>
<Label Grid.Row="9" Grid.Column="0" Content="{DynamicResource RetriesSummaryKey}" />
<Label Grid.Row="9" Grid.Column="1">
<TextBlock Text="{Binding Retries}" />
</Label>
<Label Grid.Row="9" Grid.Column="0" Content="{DynamicResource RetryIntervalSummaryKey}" />
<Label Grid.Row="9" Grid.Column="1">
<Label Grid.Row="10" Grid.Column="0" Content="{DynamicResource RetryIntervalSummaryKey}" />
<Label Grid.Row="10" Grid.Column="1">
<TextBlock Text="{Binding RetryInterval}" />
</Label>
<Label Grid.Row="10" Grid.Column="0" Content="{DynamicResource ConnectionModeSummaryKey}" />
<Label Grid.Row="10" Grid.Column="1">
<Label Grid.Row="11" Grid.Column="0" Content="{DynamicResource ConnectionModeSummaryKey}" />
<Label Grid.Row="11" Grid.Column="1">
<TextBlock Text="{Binding ConnectionMode}" />
</Label>
</Grid>

Просмотреть файл

@ -30,7 +30,121 @@
<system:String x:Key="CollectionTierKey">Collection Pricing Tier</system:String>
<system:String x:Key="CollectionTierDescriptionKey">Specify the pricing / performance tier of the collection. Note that this only applies to collection creation, existing collections will not have their pricing tier modified</system:String>
<system:String x:Key="CollectionTierSummaryKey">Collection Pricing Tier:</system:String>
<system:String x:Key="IndexingPolicySummaryKey">Collection Indexing Policy:</system:String>
<system:String x:Key="EnterIndexingPolicySelectionKey">Enter Indexing Policy</system:String>
<system:String x:Key="IndexingPolicyFileSelectionKey">Select Policy File</system:String>
<system:String x:Key="IndexingPolicyFileFilterKey">JSON Documents|*.json|All Files|*.*</system:String>
<system:String x:Key="IndexingPolicyFileDefaultExtensionKey">*.json</system:String>
<system:String x:Key="IndexingPolicyDefaultTemplateMenuHeader">Default</system:String>
<system:String x:Key="IndexingPolicyDefaultTemplate" xml:space="preserve">
<![CDATA[{
"indexingMode": "consistent",
"automatic": true,
"includedPaths": [
{
"path": "/*",
"indexes": [
{
"kind": "Range",
"dataType": "Number",
"precision": -1
},
{
"kind": "Hash",
"dataType": "String",
"precision": 3
}
]
},
{
"path": "/_ts/?",
"indexes": [
{
"kind": "Range",
"dataType": "Number",
"precision": -1
}
]
}
],
"excludedPaths": []
}]]>
</system:String>
<system:String x:Key="IndexingPolicyAllHashTemplateMenuHeader">Hash</system:String>
<system:String x:Key="IndexingPolicyAllHashTemplate" xml:space="preserve">
<![CDATA[{
"indexingMode": "consistent",
"automatic": true,
"includedPaths": [
{
"path": "/*",
"indexes": [
{
"kind": "Hash",
"dataType": "Number",
"precision": -1
},
{
"kind": "Hash",
"dataType": "String",
"precision": -1
}
]
},
{
"path": "/_ts/?",
"indexes": [
{
"kind": "Range",
"dataType": "Number",
"precision": -1
}
]
}
],
"excludedPaths": []
}]]>
</system:String>
<system:String x:Key="IndexingPolicyAllRangeTemplateMenuHeader">Range</system:String>
<system:String x:Key="IndexingPolicyAllRangeTemplate" xml:space="preserve">
<![CDATA[{
"indexingMode": "consistent",
"automatic": true,
"includedPaths": [
{
"path": "/*",
"indexes": [
{
"kind": "Range",
"dataType": "Number",
"precision": -1
},
{
"kind": "Range",
"dataType": "String",
"precision": -1
}
]
},
{
"path": "/_ts/?",
"indexes": [
{
"kind": "Range",
"dataType": "Number",
"precision": -1
}
]
}
],
"excludedPaths": []
}]]>
</system:String>
<system:String x:Key="ConnectionModeKey">Connection Mode</system:String>
<system:String x:Key="ConnectionModeDescriptionKey">Choose the DocumentDB connection mode. Direct connection works faster, but Gateway is more firewall friendly</system:String>
<system:String x:Key="ConnectionModeSummaryKey">Connection Mode:</system:String>

Просмотреть файл

@ -6,6 +6,10 @@
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-6.0.0.0" newVersion="6.0.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Microsoft.Azure.Documents.Client" publicKeyToken="31bf3856ad364e35" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-1.2.0.0" newVersion="1.2.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -0,0 +1,79 @@
/*
Bulk import sink expects stored proc to return an array of objects in the following form:
{
i: <index of the document in the batch>,
e: <error message, if any>
}
"i" should match what was provided in the input "items" array and will be used
to associate error messages with each specific document.
This version of the stored procedure allows to perform custom document transformation.
Look for transformDocument(...) function for additional information.
*/
function BulkImport(items, disableAutomaticIdGeneration) {
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();
var itemsState = [];
if (!items) {
throw new Error("The items array is undefined or null.");
}
if (items.length == 0) {
getContext().getResponse().setBody(itemsState);
return;
}
function transformDocument(doc) {
/*
Add custom tranformation code here. You can either modify existing document and
return it or create a new one.
Example #1, rename "ExternalId" field to "id":
doc["id"] = doc["ExternalId"];
delete doc.ExternalId;
Example #2, convert string field to an array:
if (doc["Cities"]) {
doc["Cities"] = doc["Cities"].split("|");
}
*/
return doc;
}
var options = { disableAutomaticIdGeneration: disableAutomaticIdGeneration != 0 };
tryCreate(items[itemsState.length], callback);
function tryCreate(item, callback) {
try {
if (!collection.createDocument(collectionLink, transformDocument(item.d), options,
function (error, doc, options) { callback(item.i, error); })) {
// If request was not accepted, return the results of bulk operation right away
getContext().getResponse().setBody(itemsState);
}
} catch (error) {
callback(item.i, error);
}
}
function callback(docIndex, error) {
var itemState = { i: docIndex };
if (error) {
itemState.e = error.message || "Failed to create document";
}
itemsState.push(itemState);
if (itemsState.length < items.length) {
tryCreate(items[itemsState.length], callback);
} else {
getContext().getResponse().setBody(itemsState);
}
}
}

Просмотреть файл

@ -7,11 +7,13 @@ using Microsoft.DataTransfer.DocumentDb.Client.Enumeration;
using Microsoft.DataTransfer.DocumentDb.Client.PartitionResolvers;
using Microsoft.DataTransfer.DocumentDb.Client.Serialization;
using Microsoft.DataTransfer.DocumentDb.Sink;
using Microsoft.DataTransfer.Extensibility.Basics.Collections;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.DocumentDb.Client
@ -30,7 +32,7 @@ namespace Microsoft.DataTransfer.DocumentDb.Client
this.databaseName = databaseName;
}
public async Task<string> GetOrCreateCollectionAsync(string collectionName, CollectionPricingTier collectionTier)
public async Task<string> GetOrCreateCollectionAsync(string collectionName, CollectionPricingTier collectionTier, IndexingPolicy indexingPolicy)
{
Guard.NotEmpty("collectionName", collectionName);
@ -43,8 +45,12 @@ namespace Microsoft.DataTransfer.DocumentDb.Client
try
{
collection = await client.CreateDocumentCollectionAsync(database.SelfLink,
new DocumentCollection { Id = collectionName }, new RequestOptions { OfferType = ToOfferType(collectionTier) });
var collectionDefinition = new DocumentCollection { Id = collectionName };
if (indexingPolicy != null)
collectionDefinition.IndexingPolicy = indexingPolicy;
collection = await client.CreateDocumentCollectionAsync(database.SelfLink, collectionDefinition,
new RequestOptions { OfferType = ToOfferType(collectionTier) });
}
catch (DocumentClientException clientException)
{
@ -105,7 +111,7 @@ namespace Microsoft.DataTransfer.DocumentDb.Client
return client.DeleteStoredProcedureAsync(storedProcedureLink);
}
public async Task<IAsyncEnumerator<IReadOnlyDictionary<string, object>>> QueryDocumentsAsync(string collectionNamePattern, string query)
public async Task<IAsyncEnumerator<IReadOnlyDictionary<string, object>>> QueryDocumentsAsync(string collectionNamePattern, string query, CancellationToken cancellation)
{
Guard.NotEmpty("collectionNamePattern", collectionNamePattern);
@ -113,7 +119,7 @@ namespace Microsoft.DataTransfer.DocumentDb.Client
if (database == null)
return EmptyAsyncEnumerator<IReadOnlyDictionary<string, object>>.Instance;
var matchingCollections = await GetMatchingCollections(database, collectionNamePattern);
var matchingCollections = await GetMatchingCollections(database, collectionNamePattern, cancellation);
if (matchingCollections == null || !matchingCollections.Any())
return EmptyAsyncEnumerator<IReadOnlyDictionary<string, object>>.Instance;
@ -128,16 +134,22 @@ namespace Microsoft.DataTransfer.DocumentDb.Client
return new DocumentSurrogateQueryAsyncEnumerator(documentQuery.AsDocumentQuery());
}
private async Task<IReadOnlyList<string>> GetMatchingCollections(Database database, string collectionNamePattern)
private async Task<IReadOnlyList<string>> GetMatchingCollections(Database database, string collectionNamePattern, CancellationToken cancellation)
{
var result = new List<string>();
using (var enumerator = new AsyncEnumerator<DocumentCollection>(
client.CreateDocumentCollectionQuery(database.CollectionsLink).AsDocumentQuery()))
{
while (await enumerator.MoveNextAsync())
if (Regex.IsMatch(enumerator.Current.Id, collectionNamePattern))
var collectionNameRegex = new Regex(collectionNamePattern, RegexOptions.Compiled);
while (await enumerator.MoveNextAsync(cancellation))
{
var match = collectionNameRegex.Match(enumerator.Current.Id);
// Make sure regex matches entire collection name and not just substring
if (match.Success && String.Equals(match.Value, enumerator.Current.Id, StringComparison.InvariantCulture))
result.Add(enumerator.Current.SelfLink);
}
}
return result;

Просмотреть файл

@ -1,7 +1,9 @@
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Microsoft.DataTransfer.Basics;
using Microsoft.DataTransfer.Extensibility.Basics.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.DocumentDb.Client.Enumeration
@ -25,7 +27,7 @@ namespace Microsoft.DataTransfer.DocumentDb.Client.Enumeration
completed = false;
}
public async Task<bool> MoveNextAsync()
public async Task<bool> MoveNextAsync(CancellationToken cancellation)
{
if (completed)
return false;

Просмотреть файл

@ -1,18 +0,0 @@
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.DocumentDb.Client.Enumeration
{
sealed class EmptyAsyncEnumerator<T> : IAsyncEnumerator<T>
{
public static readonly EmptyAsyncEnumerator<T> Instance = new EmptyAsyncEnumerator<T>();
public T Current { get { return default(T); } }
public Task<bool> MoveNextAsync()
{
return Task.FromResult(false);
}
public void Dispose() { }
}
}

Просмотреть файл

@ -1,11 +0,0 @@
using System;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.DocumentDb.Client.Enumeration
{
interface IAsyncEnumerator<out T> : IDisposable
{
T Current { get; }
Task<bool> MoveNextAsync();
}
}

Просмотреть файл

@ -1,12 +1,14 @@
using Microsoft.DataTransfer.DocumentDb.Client.Enumeration;
using Microsoft.DataTransfer.Extensibility.Basics.Collections;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.DataTransfer.DocumentDb.Client
{
interface IDocumentDbReadClient : IDisposable
{
Task<IAsyncEnumerator<IReadOnlyDictionary<string, object>>> QueryDocumentsAsync(string collectionNamePattern, string query);
Task<IAsyncEnumerator<IReadOnlyDictionary<string, object>>> QueryDocumentsAsync(
string collectionNamePattern, string query, CancellationToken cancellation);
}
}

Просмотреть файл

@ -1,4 +1,5 @@
using Microsoft.DataTransfer.DocumentDb.Sink;
using Microsoft.Azure.Documents;
using Microsoft.DataTransfer.DocumentDb.Sink;
using System;
using System.Threading.Tasks;
@ -6,7 +7,7 @@ namespace Microsoft.DataTransfer.DocumentDb.Client
{
interface IDocumentDbWriteClient : IDisposable
{
Task<string> GetOrCreateCollectionAsync(string collectionName, CollectionPricingTier collectionTier);
Task<string> GetOrCreateCollectionAsync(string collectionName, CollectionPricingTier collectionTier, IndexingPolicy indexingPolicy);
Task CreateDocumentAsync(string collectionLink, object document, bool disableAutomaticIdGeneration);
Task<string> CreateStoredProcedureAsync(string collectionLink, string name, string body);

Просмотреть файл

@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a tool.
// Runtime Version:4.0.30319.18444
// Runtime Version:4.0.30319.34209
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
@ -177,6 +177,24 @@ namespace Microsoft.DataTransfer.DocumentDb {
}
}
/// <summary>
/// Looks up a localized string similar to Optional. Indexing policy for the collection.
/// </summary>
public static string Sink_IndexingPolicy {
get {
return ResourceManager.GetString("Sink_IndexingPolicy", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to Optional. Path to the file containing indexing policy for the collection.
/// </summary>
public static string Sink_IndexingPolicyFile {
get {
return ResourceManager.GetString("Sink_IndexingPolicyFile", resourceCulture);
}
}
/// <summary>
/// Looks up a localized string similar to Optional. Name of the property to use as hash partitioning key. If not provided - records will be randomly distributed across all target collections.
/// </summary>

Просмотреть файл

@ -156,6 +156,12 @@
<data name="Sink_IdField" xml:space="preserve">
<value>Optional. Name of the source field that should be treated as document ID</value>
</data>
<data name="Sink_IndexingPolicy" xml:space="preserve">
<value>Optional. Indexing policy for the collection</value>
</data>
<data name="Sink_IndexingPolicyFile" xml:space="preserve">
<value>Optional. Path to the file containing indexing policy for the collection</value>
</data>
<data name="Sink_PartitionKey" xml:space="preserve">
<value>Optional. Name of the property to use as hash partitioning key. If not provided - records will be randomly distributed across all target collections</value>
</data>

Просмотреть файл

@ -7,6 +7,8 @@ namespace Microsoft.DataTransfer.DocumentDb
{
sealed class Errors : CommonErrors
{
private Errors() { }
public static Exception ConnectionStringMissing()
{
return new ArgumentException(Resources.ConnectionStringMissing);
@ -32,6 +34,11 @@ namespace Microsoft.DataTransfer.DocumentDb
return new ArgumentException(Resources.DatabaseNameMissing);
}
public static Exception AmbiguousIndexingPolicy()
{
return new ArgumentException(Resources.AmbiguousIndexingPolicy);
}
public static Exception InvalidNumberOfParallelRequests()
{
return new ArgumentException(Resources.InvalidNumberOfParallelRequests);

Некоторые файлы не были показаны из-за слишком большого количества измененных файлов Показать больше