Addign both Version 1 and Version 2 code samples for ChangeFeedProcessor Library

This commit is contained in:
REDMOND\rafats 2018-04-25 16:14:17 -07:00
Родитель 40c5c331fd
Коммит efd185c945
21 изменённых файлов: 675 добавлений и 103 удалений

Просмотреть файл

@ -5,16 +5,23 @@
</startup>
<appSettings>
<add key="monitoredUri" value="https://rafat-metric-demo.documents.azure.com:443/" />
<add key="monitoredSecretKey" value="E0wCMaBIzyoK dlALwwMhg==" />
<add key="monitoredSecretKey" value="NfACLk04SHevvjF3HtMdyrH3RwlPVUZKK3KdlALwwMhg==" />
<add key="monitoredDbName" value="IoT" />
<add key="monitoredCollectionName" value="IoT" />
<add key="monitoredThroughput" value="400" />
<add key="leaseUri" value="https://rafat-metric-demo.documents.azure.com:443/" />
<add key="leaseSecretKey" value="E0wCMaBIzyoKD wwMhg==" />
<add key="leaseSecretKey" value="HevvjF3HtMdyrH3RwlPVUZKK3KdlALwwMhg==" />
<add key="leaseDbName" value="IoT" />
<add key="leaseCollectionName" value="leases" />
<add key="leaseThroughput" value="400" />
<add key="destUri" value="https://rafat-metric-demo.documents.azure.com:443/" />
<add key="destSecretKey" value="F3HtMdyrH3RwlPVUZKK3KdlALwwMhg==" />
<add key="destDbName" value="test" />
<add key="destCollectionName" value="destColl" />
<add key="destThroughput" value="400" />
</appSettings>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">

Просмотреть файл

@ -54,8 +54,8 @@
<StartupObject>ChangeFeedProcessor.Program</StartupObject>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.Azure.Documents.ChangeFeedProcessor, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>packages\Microsoft.Azure.DocumentDB.ChangeFeedProcessor.2.0.0-beta\lib\net451\Microsoft.Azure.Documents.ChangeFeedProcessor.dll</HintPath>
<Reference Include="Microsoft.Azure.Documents.ChangeFeedProcessor, Version=1.3.2.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>packages\Microsoft.Azure.DocumentDB.ChangeFeedProcessor.1.3.2\lib\net452\Microsoft.Azure.Documents.ChangeFeedProcessor.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Azure.Documents.Client, Version=1.21.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>packages\Microsoft.Azure.DocumentDB.1.21.1\lib\net45\Microsoft.Azure.Documents.Client.dll</HintPath>

Просмотреть файл

@ -18,7 +18,7 @@ namespace ChangeFeedProcessor
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Azure.Documents.ChangeFeedProcessor;
using Microsoft.Azure.Documents.Client;
/// <summary>
@ -29,6 +29,8 @@ namespace ChangeFeedProcessor
public class DocumentFeedObserver : IChangeFeedObserver
{
private static int totalDocs = 0;
private DocumentClient client;
private Uri destinationCollectionUri;
/// <summary>
/// Initializes a new instance of the <see cref="DocumentFeedObserver" /> class.
@ -36,9 +38,13 @@ namespace ChangeFeedProcessor
/// </summary>
/// <param name="client"> Client connected to destination collection </param>
/// <param name="destCollInfo"> Destination collection information </param>
public DocumentFeedObserver()
public DocumentFeedObserver(DocumentClient client, DocumentCollectionInfo destCollInfo)
{
this.client = client;
if (destCollInfo == null) return;
this.destinationCollectionUri = UriFactory.CreateDocumentCollectionUri(destCollInfo.DatabaseName, destCollInfo.CollectionName);
}
/// <summary>
@ -47,7 +53,7 @@ namespace ChangeFeedProcessor
/// </summary>
/// <param name="context">The context specifying partition for this observer, etc.</param>
/// <returns>A Task to allow asynchronous execution</returns>
public Task OpenAsync(IChangeFeedObserverContext context)
public Task OpenAsync(ChangeFeedObserverContext context)
{
Console.ForegroundColor = ConsoleColor.Magenta;
Console.WriteLine("Observer opened for partition Key Range: {0}", context.PartitionKeyRangeId);
@ -61,7 +67,7 @@ namespace ChangeFeedProcessor
/// <param name="context">The context specifying partition for this observer, etc.</param>
/// <param name="reason">Specifies the reason the observer is closed.</param>
/// <returns>A Task to allow asynchronous execution</returns>
public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
public Task CloseAsync(ChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine("Observer closed, {0}", context.PartitionKeyRangeId);
@ -69,8 +75,14 @@ namespace ChangeFeedProcessor
return Task.CompletedTask;
}
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> docs, CancellationToken cancellationToken)
/// <summary>
/// When document changes are available on change feed, changes are copied to destination connection;
/// this function prints out the changed document ID.
/// </summary>
/// <param name="context">The context specifying partition for this observer, etc.</param>
/// <param name="docs">The documents changed.</param>
/// <returns>A Task to allow asynchronous execution</returns>
public Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyList<Document> docs)
{
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("Change feed: PartitionId {0} total {1} doc(s)", context.PartitionKeyRangeId, Interlocked.Add(ref totalDocs, docs.Count));
@ -78,6 +90,12 @@ namespace ChangeFeedProcessor
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine(doc.Id.ToString());
if (this.destinationCollectionUri != null)
{
this.client.UpsertDocumentAsync(this.destinationCollectionUri, doc);
}
}
return Task.CompletedTask;

Просмотреть файл

@ -13,19 +13,27 @@
namespace ChangeFeedProcessor
{
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Azure.Documents.ChangeFeedProcessor;
using Microsoft.Azure.Documents.Client;
/// <summary>
/// Factory class to create instance of document feed observer.
/// </summary>
public class DocumentFeedObserverFactory : IChangeFeedObserverFactory
{
private DocumentClient client;
private DocumentCollectionInfo collectionInfo;
/// <summary>
/// Initializes a new instance of the <see cref="DocumentFeedObserverFactory" /> class.
/// Saves input DocumentClient and DocumentCollectionInfo parameters to class fields
/// </summary>
public DocumentFeedObserverFactory()
/// <param name="destClient">Client connected to destination collection</param>
/// <param name="destCollInfo">Destination collection information</param>
public DocumentFeedObserverFactory(DocumentClient destClient, DocumentCollectionInfo destCollInfo)
{
this.collectionInfo = destCollInfo;
this.client = destClient;
}
/// <summary>
@ -34,8 +42,8 @@ namespace ChangeFeedProcessor
/// <returns>DocumentFeedObserver with client and destination collection information</returns>
public IChangeFeedObserver CreateObserver()
{
DocumentFeedObserver newObserver = new DocumentFeedObserver();
return newObserver as IChangeFeedObserver;
DocumentFeedObserver newObserver = new DocumentFeedObserver(this.client, this.collectionInfo);
return newObserver;
}
}
}

Просмотреть файл

@ -1,17 +1,28 @@

//---------------------------------------------------------------------------------
// <copyright file="Program.cs" company="Microsoft">
// Microsoft (R) Azure SDK
// Software Development Kit
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
// EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES
// OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
// </copyright>
//---------------------------------------------------------------------------------
namespace ChangeFeedProcessor
{
using System;
using System.Configuration;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor;
using Microsoft.Azure.Documents.ChangeFeedProcessor.Logging;
using Microsoft.Azure.Documents.Client;
/// ------------------------------------------------------------------------------------------------
/// <summary> This sample demonstrates using change processor library to read changes from source collection
/// to destination collection </summary>
/// ------------------------------------------------------------------------------------------------
public class Program
{
@ -25,40 +36,27 @@ namespace ChangeFeedProcessor
// optional setting to store lease collection on different account
// set lease Uri, secretKey and DbName to same as monitored if both collections
// are on the same account
private string leaseUri = ConfigurationManager.AppSettings["leaseUri"];
private string leaseSecretKey = ConfigurationManager.AppSettings["leaseSecretKey"];
private string leaseDbName = ConfigurationManager.AppSettings["leaseDbName"];
private string leaseCollectionName = ConfigurationManager.AppSettings["leaseCollectionName"];
private int leaseThroughput = int.Parse(ConfigurationManager.AppSettings["leaseThroughput"]);
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder();
// destination collection for data movement in this sample
// could be same or different account
private string destUri = ConfigurationManager.AppSettings["destUri"];
private string destSecretKey = ConfigurationManager.AppSettings["destSecretKey"];
private string destDbName = ConfigurationManager.AppSettings["destDbName"];
private string destCollectionName = ConfigurationManager.AppSettings["destCollectionName"];
private int destThroughput = int.Parse(ConfigurationManager.AppSettings["destThroughput"]);
/// <summary>
/// Main program function; called when program runs
/// </summary>
/// <param name="args">Command line parameters (not used)</param>
///
public static void Main(string[] args)
{
string hostName = "HostName " + DateTime.Now.Ticks.ToString();
if (args.Length == 1)
{
hostName = args[0];
}
Console.WriteLine("Change Feed Processor client Started at:{0} for HostName: {1} ", DateTime.Now.ToShortTimeString(), hostName);
//Setting up Logging
var tracelogProvider = new TraceLogProvider();
using (tracelogProvider.OpenNestedContext(hostName))
{
LogProvider.SetCurrentLogProvider(tracelogProvider);
// After this, create IChangeFeedProcessor instance and start/stop it.
}
Console.WriteLine("Change Feed Processor client Started at: " + DateTime.Now.ToShortTimeString());
Program newApp = new Program();
newApp.MainAsync().Wait();
}
@ -84,69 +82,16 @@ namespace ChangeFeedProcessor
this.leaseCollectionName,
this.leaseThroughput);
await this.CreateCollectionIfNotExistsAsync(
this.destUri,
this.destSecretKey,
this.destDbName,
this.destCollectionName,
this.destThroughput);
await this.RunChangeFeedHostAsync();
}
/// <summary>
/// Registers change feed observer to update changes read on change feed to destination
/// collection. Deregisters change feed observer and closes process when enter key is pressed
/// </summary>
/// <returns>A Task to allow asynchronous execution</returns>
public async Task RunChangeFeedHostAsync()
{
string hostName = Guid.NewGuid().ToString();
// monitored collection info
DocumentCollectionInfo documentCollectionInfo = new DocumentCollectionInfo
{
Uri = new Uri(this.monitoredUri),
MasterKey = this.monitoredSecretKey,
DatabaseName = this.monitoredDbName,
CollectionName = this.monitoredCollectionName
};
DocumentCollectionInfo leaseCollectionInfo = new DocumentCollectionInfo
{
Uri = new Uri(this.leaseUri),
MasterKey = this.leaseSecretKey,
DatabaseName = this.leaseDbName,
CollectionName = this.leaseCollectionName
};
DocumentFeedObserverFactory docObserverFactory = new DocumentFeedObserverFactory();
ChangeFeedOptions feedOptions = new ChangeFeedOptions();
/* ie customize StartFromBeginning so change feed reads from beginning
can customize MaxItemCount, PartitonKeyRangeId, RequestContinuation, SessionToken and StartFromBeginning
*/
feedOptions.StartFromBeginning = true;
ChangeFeedProcessorOptions feedProcessorOptions = new ChangeFeedProcessorOptions();
// ie. customizing lease renewal interval to 15 seconds
// can customize LeaseRenewInterval, LeaseAcquireInterval, LeaseExpirationInterval, FeedPollDelay
feedProcessorOptions.LeaseRenewInterval = TimeSpan.FromSeconds(15);
this.builder
.WithHostName(hostName)
.WithFeedCollection(documentCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithProcessorOptions(feedProcessorOptions)
.WithObserverFactory(new DocumentFeedObserverFactory());
// .WithObserver<DocumentFeedObserver>(); or just pass a observer
var result = await this.builder.BuildAsync();
await result.StartAsync();
Console.Read();
await result.StopAsync();
}
/// <summary>
/// Checks whether collections exists. Creates new collection if collection does not exist
/// WARNING: CreateCollectionIfNotExistsAsync will create a new
@ -177,5 +122,69 @@ namespace ChangeFeedProcessor
}
}
/// <summary>
/// Registers change feed observer to update changes read on change feed to destination
/// collection. Deregisters change feed observer and closes process when enter key is pressed
/// </summary>
/// <returns>A Task to allow asynchronous execution</returns>
public async Task RunChangeFeedHostAsync()
{
string hostName = Guid.NewGuid().ToString();
// monitored collection info
DocumentCollectionInfo documentCollectionLocation = new DocumentCollectionInfo
{
Uri = new Uri(this.monitoredUri),
MasterKey = this.monitoredSecretKey,
DatabaseName = this.monitoredDbName,
CollectionName = this.monitoredCollectionName
};
// lease collection info
DocumentCollectionInfo leaseCollectionLocation = new DocumentCollectionInfo
{
Uri = new Uri(this.leaseUri),
MasterKey = this.leaseSecretKey,
DatabaseName = this.leaseDbName,
CollectionName = this.leaseCollectionName
};
// destination collection info
DocumentCollectionInfo destCollInfo = new DocumentCollectionInfo
{
Uri = new Uri(this.destUri),
MasterKey = this.destSecretKey,
DatabaseName = this.destDbName,
CollectionName = this.destCollectionName
};
// Customizable change feed option and host options
ChangeFeedOptions feedOptions = new ChangeFeedOptions();
// ie customize StartFromBeginning so change feed reads from beginning
// can customize MaxItemCount, PartitonKeyRangeId, RequestContinuation, SessionToken and StartFromBeginning
feedOptions.StartFromBeginning = true;
ChangeFeedHostOptions feedHostOptions = new ChangeFeedHostOptions();
// ie. customizing lease renewal interval to 15 seconds
// can customize LeaseRenewInterval, LeaseAcquireInterval, LeaseExpirationInterval, FeedPollDelay
feedHostOptions.LeaseRenewInterval = TimeSpan.FromSeconds(15);
using (DocumentClient destClient = new DocumentClient(destCollInfo.Uri, destCollInfo.MasterKey))
{
DocumentFeedObserverFactory docObserverFactory = new DocumentFeedObserverFactory(destClient, destCollInfo);
ChangeFeedEventHost host = new ChangeFeedEventHost(hostName, documentCollectionLocation, leaseCollectionLocation, feedOptions, feedHostOptions);
await host.RegisterObserverFactoryAsync(docObserverFactory);
Console.WriteLine("Running... Press enter to stop.");
Console.ReadLine();
await host.UnregisterObserversAsync();
}
}
}
}

Просмотреть файл

@ -1,2 +1 @@
If you are using Change FeedProcessor library earlier than version 2 then please take a look at the code commit on
https://github.com/Azure/azure-documentdb-dotnet/tree/125976b573d3d87f9e5f3f518cc0161dc4d15045/samples/code-samples/ChangeFeedProcessor
If you are using Change FeedProcessor library version 2 then please take a look at the code in ChangeFeedProcessorV2 directory.

Просмотреть файл

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Microsoft.Azure.DocumentDB" version="1.21.1" targetFramework="net461" />
<package id="Microsoft.Azure.DocumentDB.ChangeFeedProcessor" version="2.0.0-beta" targetFramework="net461" />
<package id="Microsoft.Azure.DocumentDB.ChangeFeedProcessor" version="1.3.2" targetFramework="net461" />
<package id="Microsoft.CSharp" version="4.4.1" targetFramework="net461" />
<package id="Newtonsoft.Json" version="9.0.1" targetFramework="net461" />
</packages>

4
samples/code-samples/ChangeFeedProcessorV2/.gitignore поставляемый Normal file
Просмотреть файл

@ -0,0 +1,4 @@
bin/
obj/
packages/
.vs/ChangeFeedProcessor/v15/sqlite3/storage.ide

Двоичный файл не отображается.

Просмотреть файл

Просмотреть файл

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6.1" />
</startup>
<appSettings>
<add key="monitoredUri" value="https://rafat-metric-demo.documents.azure.com:443/" />
<add key="monitoredSecretKey" value="E0wCMaBIzyoK dlALwwMhg==" />
<add key="monitoredDbName" value="IoT" />
<add key="monitoredCollectionName" value="IoT" />
<add key="monitoredThroughput" value="400" />
<add key="leaseUri" value="https://rafat-metric-demo.documents.azure.com:443/" />
<add key="leaseSecretKey" value="E0wCMaBIzyoKD wwMhg==" />
<add key="leaseDbName" value="IoT" />
<add key="leaseCollectionName" value="leases" />
<add key="leaseThroughput" value="400" />
</appSettings>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-9.0.0.0" newVersion="9.0.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Microsoft.Azure.Documents.Client" publicKeyToken="31bf3856ad364e35" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-1.17.0.0" newVersion="1.17.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -0,0 +1,99 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{3ACC802F-8C9D-43C2-B945-F1F48F990586}</ProjectGuid>
<OutputType>Exe</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>ChangeFeedProcessor</RootNamespace>
<AssemblyName>ChangeFeedProcessor</AssemblyName>
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<NuGetPackageImportStamp>
</NuGetPackageImportStamp>
<IsWebBootstrapper>false</IsWebBootstrapper>
<TargetFrameworkProfile />
<PublishUrl>publish\</PublishUrl>
<Install>true</Install>
<InstallFrom>Disk</InstallFrom>
<UpdateEnabled>false</UpdateEnabled>
<UpdateMode>Foreground</UpdateMode>
<UpdateInterval>7</UpdateInterval>
<UpdateIntervalUnits>Days</UpdateIntervalUnits>
<UpdatePeriodically>false</UpdatePeriodically>
<UpdateRequired>false</UpdateRequired>
<MapFileExtensions>true</MapFileExtensions>
<ApplicationRevision>0</ApplicationRevision>
<ApplicationVersion>1.0.0.%2a</ApplicationVersion>
<UseApplicationTrust>false</UseApplicationTrust>
<BootstrapperEnabled>true</BootstrapperEnabled>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup>
<StartupObject>ChangeFeedProcessor.Program</StartupObject>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.Azure.Documents.ChangeFeedProcessor, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>packages\Microsoft.Azure.DocumentDB.ChangeFeedProcessor.2.0.0-beta\lib\net451\Microsoft.Azure.Documents.ChangeFeedProcessor.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Azure.Documents.Client, Version=1.21.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>packages\Microsoft.Azure.DocumentDB.1.21.1\lib\net45\Microsoft.Azure.Documents.Client.dll</HintPath>
</Reference>
<Reference Include="Microsoft.CSharp" />
<Reference Include="Newtonsoft.Json, Version=9.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Configuration" />
</ItemGroup>
<ItemGroup>
<Compile Include="DocumentFeedObserver.cs" />
<Compile Include="DocumentFeedObserverFactory.cs" />
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="App.config">
<SubType>Designer</SubType>
</None>
<None Include="packages.config">
<SubType>Designer</SubType>
</None>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Import Project="packages\Microsoft.Azure.DocumentDB.1.21.1\build\Microsoft.Azure.DocumentDB.targets" Condition="Exists('packages\Microsoft.Azure.DocumentDB.1.21.1\build\Microsoft.Azure.DocumentDB.targets')" />
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
<PropertyGroup>
<ErrorText>This project references NuGet package(s) that are missing on this computer. Use NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
</PropertyGroup>
<Error Condition="!Exists('packages\Microsoft.Azure.DocumentDB.1.21.1\build\Microsoft.Azure.DocumentDB.targets')" Text="$([System.String]::Format('$(ErrorText)', 'packages\Microsoft.Azure.DocumentDB.1.21.1\build\Microsoft.Azure.DocumentDB.targets'))" />
</Target>
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

Просмотреть файл

@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26730.15
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ChangeFeedProcessor", "ChangeFeedProcessor.csproj", "{3ACC802F-8C9D-43C2-B945-F1F48F990586}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{3ACC802F-8C9D-43C2-B945-F1F48F990586}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3ACC802F-8C9D-43C2-B945-F1F48F990586}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3ACC802F-8C9D-43C2-B945-F1F48F990586}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3ACC802F-8C9D-43C2-B945-F1F48F990586}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A4B9BB10-7DBC-4B3B-93F3-931F9394762B}
EndGlobalSection
EndGlobal

Просмотреть файл

@ -0,0 +1,86 @@
//---------------------------------------------------------------------------------
// <copyright file="DocumentFeedObserver.cs" company="Microsoft">
// Microsoft (R) Azure SDK
// Software Development Kit
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
// EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES
// OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
// </copyright>
//---------------------------------------------------------------------------------
namespace ChangeFeedProcessor
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Azure.Documents.Client;
/// <summary>
/// This class implements the IChangeFeedObserver interface and is used to observe
/// changes on change feed. ChangeFeedEventHost will create as many instances of
/// this class as needed.
/// </summary>
public class DocumentFeedObserver : IChangeFeedObserver
{
private static int totalDocs = 0;
/// <summary>
/// Initializes a new instance of the <see cref="DocumentFeedObserver" /> class.
/// Saves input DocumentClient and DocumentCollectionInfo parameters to class fields
/// </summary>
/// <param name="client"> Client connected to destination collection </param>
/// <param name="destCollInfo"> Destination collection information </param>
public DocumentFeedObserver()
{
}
/// <summary>
/// Called when change feed observer is opened;
/// this function prints out observer partition key id.
/// </summary>
/// <param name="context">The context specifying partition for this observer, etc.</param>
/// <returns>A Task to allow asynchronous execution</returns>
public Task OpenAsync(IChangeFeedObserverContext context)
{
Console.ForegroundColor = ConsoleColor.Magenta;
Console.WriteLine("Observer opened for partition Key Range: {0}", context.PartitionKeyRangeId);
return Task.CompletedTask;
}
/// <summary>
/// Called when change feed observer is closed;
/// this function prints out observer partition key id and reason for shut down.
/// </summary>
/// <param name="context">The context specifying partition for this observer, etc.</param>
/// <param name="reason">Specifies the reason the observer is closed.</param>
/// <returns>A Task to allow asynchronous execution</returns>
public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine("Observer closed, {0}", context.PartitionKeyRangeId);
Console.WriteLine("Reason for shutdown, {0}", reason);
return Task.CompletedTask;
}
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> docs, CancellationToken cancellationToken)
{
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("Change feed: PartitionId {0} total {1} doc(s)", context.PartitionKeyRangeId, Interlocked.Add(ref totalDocs, docs.Count));
foreach (Document doc in docs)
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine(doc.Id.ToString());
}
return Task.CompletedTask;
}
}
}

Просмотреть файл

@ -0,0 +1,41 @@
//---------------------------------------------------------------------------------
// <copyright file="DocumentFeedObserverFactory.cs" company="Microsoft">
// Microsoft (R) Azure SDK
// Software Development Kit
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
// EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES
// OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
// </copyright>
//---------------------------------------------------------------------------------
namespace ChangeFeedProcessor
{
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
/// <summary>
/// Factory class to create instance of document feed observer.
/// </summary>
public class DocumentFeedObserverFactory : IChangeFeedObserverFactory
{
/// <summary>
/// Initializes a new instance of the <see cref="DocumentFeedObserverFactory" /> class.
/// Saves input DocumentClient and DocumentCollectionInfo parameters to class fields
/// </summary>
public DocumentFeedObserverFactory()
{
}
/// <summary>
/// Creates document observer instance with client and destination collection information
/// </summary>
/// <returns>DocumentFeedObserver with client and destination collection information</returns>
public IChangeFeedObserver CreateObserver()
{
DocumentFeedObserver newObserver = new DocumentFeedObserver();
return newObserver as IChangeFeedObserver;
}
}
}

Просмотреть файл

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2014 Microsoft Corporation
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

Просмотреть файл

@ -0,0 +1,181 @@

namespace ChangeFeedProcessor
{
using System;
using System.Configuration;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor;
using Microsoft.Azure.Documents.ChangeFeedProcessor.Logging;
using Microsoft.Azure.Documents.Client;
/// ------------------------------------------------------------------------------------------------
public class Program
{
// Modify EndPointUrl and PrimaryKey to connect to your own subscription
private string monitoredUri = ConfigurationManager.AppSettings["monitoredUri"];
private string monitoredSecretKey = ConfigurationManager.AppSettings["monitoredSecretKey"];
private string monitoredDbName = ConfigurationManager.AppSettings["monitoredDbName"];
private string monitoredCollectionName = ConfigurationManager.AppSettings["monitoredCollectionName"];
private int monitoredThroughput = int.Parse(ConfigurationManager.AppSettings["monitoredThroughput"]);
// optional setting to store lease collection on different account
// set lease Uri, secretKey and DbName to same as monitored if both collections
// are on the same account
private string leaseUri = ConfigurationManager.AppSettings["leaseUri"];
private string leaseSecretKey = ConfigurationManager.AppSettings["leaseSecretKey"];
private string leaseDbName = ConfigurationManager.AppSettings["leaseDbName"];
private string leaseCollectionName = ConfigurationManager.AppSettings["leaseCollectionName"];
private int leaseThroughput = int.Parse(ConfigurationManager.AppSettings["leaseThroughput"]);
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder();
/// <summary>
/// Main program function; called when program runs
/// </summary>
/// <param name="args">Command line parameters (not used)</param>
///
public static void Main(string[] args)
{
string hostName = "HostName " + DateTime.Now.Ticks.ToString();
if (args.Length == 1)
{
hostName = args[0];
}
Console.WriteLine("Change Feed Processor client Started at:{0} for HostName: {1} ", DateTime.Now.ToShortTimeString(), hostName);
//Setting up Logging
var tracelogProvider = new TraceLogProvider();
using (tracelogProvider.OpenNestedContext(hostName))
{
LogProvider.SetCurrentLogProvider(tracelogProvider);
// After this, create IChangeFeedProcessor instance and start/stop it.
}
Program newApp = new Program();
newApp.MainAsync().Wait();
}
/// <summary>
/// Main Async function; checks for or creates monitored/lease collections and runs
/// Change Feed Host (RunChangeFeedHostAsync)
/// </summary>
/// <returns>A Task to allow asynchronous execution</returns>
private async Task MainAsync()
{
await this.CreateCollectionIfNotExistsAsync(
this.monitoredUri,
this.monitoredSecretKey,
this.monitoredDbName,
this.monitoredCollectionName,
this.monitoredThroughput);
await this.CreateCollectionIfNotExistsAsync(
this.leaseUri,
this.leaseSecretKey,
this.leaseDbName,
this.leaseCollectionName,
this.leaseThroughput);
await this.RunChangeFeedHostAsync();
}
/// <summary>
/// Registers change feed observer to update changes read on change feed to destination
/// collection. Deregisters change feed observer and closes process when enter key is pressed
/// </summary>
/// <returns>A Task to allow asynchronous execution</returns>
public async Task RunChangeFeedHostAsync()
{
string hostName = Guid.NewGuid().ToString();
// monitored collection info
DocumentCollectionInfo documentCollectionInfo = new DocumentCollectionInfo
{
Uri = new Uri(this.monitoredUri),
MasterKey = this.monitoredSecretKey,
DatabaseName = this.monitoredDbName,
CollectionName = this.monitoredCollectionName
};
DocumentCollectionInfo leaseCollectionInfo = new DocumentCollectionInfo
{
Uri = new Uri(this.leaseUri),
MasterKey = this.leaseSecretKey,
DatabaseName = this.leaseDbName,
CollectionName = this.leaseCollectionName
};
DocumentFeedObserverFactory docObserverFactory = new DocumentFeedObserverFactory();
ChangeFeedOptions feedOptions = new ChangeFeedOptions();
/* ie customize StartFromBeginning so change feed reads from beginning
can customize MaxItemCount, PartitonKeyRangeId, RequestContinuation, SessionToken and StartFromBeginning
*/
feedOptions.StartFromBeginning = true;
ChangeFeedProcessorOptions feedProcessorOptions = new ChangeFeedProcessorOptions();
// ie. customizing lease renewal interval to 15 seconds
// can customize LeaseRenewInterval, LeaseAcquireInterval, LeaseExpirationInterval, FeedPollDelay
feedProcessorOptions.LeaseRenewInterval = TimeSpan.FromSeconds(15);
this.builder
.WithHostName(hostName)
.WithFeedCollection(documentCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithProcessorOptions(feedProcessorOptions)
.WithObserverFactory(new DocumentFeedObserverFactory());
// .WithObserver<DocumentFeedObserver>(); or just pass a observer
var result = await this.builder.BuildAsync();
await result.StartAsync();
Console.Read();
await result.StopAsync();
}
/// <summary>
/// Checks whether collections exists. Creates new collection if collection does not exist
/// WARNING: CreateCollectionIfNotExistsAsync will create a new
/// with reserved throughput which has pricing implications. For details
/// visit: https://azure.microsoft.com/en-us/pricing/details/cosmos-db/
/// </summary>
/// <param name="endPointUri">End point URI for account </param>
/// <param name="secretKey">Primary key to access the account </param>
/// <param name="databaseName">Name of database </param>
/// <param name="collectionName">Name of collection</param>
/// <param name="throughput">Amount of throughput to provision</param>
/// <returns>A Task to allow asynchronous execution</returns>
public async Task CreateCollectionIfNotExistsAsync(string endPointUri, string secretKey, string databaseName, string collectionName, int throughput)
{
// connecting client
using (DocumentClient client = new DocumentClient(new Uri(endPointUri), secretKey))
{
await client.CreateDatabaseIfNotExistsAsync(new Database { Id = databaseName });
// create collection if it does not exist
// WARNING: CreateDocumentCollectionIfNotExistsAsync will create a new
// with reserved throughput which has pricing implications. For details
// visit: https://azure.microsoft.com/en-us/pricing/details/cosmos-db/
await client.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(databaseName),
new DocumentCollection { Id = collectionName },
new RequestOptions { OfferThroughput = throughput });
}
}
}
}

Просмотреть файл

@ -0,0 +1,36 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("ChangefeedMigrationSample")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("ChangefeedMigrationSample")]
[assembly: AssemblyCopyright("Copyright © 2017")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("3acc802f-8c9d-43c2-b945-f1f48f990586")]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

Просмотреть файл

@ -0,0 +1 @@
If you are using Change FeedProcessor library earlier than version 2 then please take a look at the code in ChangeFeedProcessor directory.

Просмотреть файл

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Microsoft.Azure.DocumentDB" version="1.21.1" targetFramework="net461" />
<package id="Microsoft.Azure.DocumentDB.ChangeFeedProcessor" version="2.0.0-beta" targetFramework="net461" />
<package id="Microsoft.CSharp" version="4.4.1" targetFramework="net461" />
<package id="Newtonsoft.Json" version="9.0.1" targetFramework="net461" />
</packages>