Adding Change Feed Processor code migration sample (#782)
* Adding migration sample * Adding more options * Code comments * comments
This commit is contained in:
Родитель
bb5f531337
Коммит
6459cec853
|
@ -9,6 +9,7 @@
|
|||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.1.1" />
|
||||
<PackageReference Include="Microsoft.Azure.DocumentDB.ChangeFeedProcessor" Version="2.2.7" />
|
||||
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.2.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" />
|
||||
|
|
|
@ -6,7 +6,9 @@
|
|||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Cosmos;
|
||||
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using ChangeFeedProcessorLibrary = Microsoft.Azure.Documents.ChangeFeedProcessor;
|
||||
|
||||
// ----------------------------------------------------------------------------------------------------------
|
||||
// Prerequisites -
|
||||
|
@ -26,6 +28,8 @@
|
|||
// 3. Listening for changes that happen since the container was created.
|
||||
//
|
||||
// 4. Generate Estimator metrics to expose current Change Feed Processor progress
|
||||
//
|
||||
// 5. Code migration template from existing Change Feed Processor library V2
|
||||
//-----------------------------------------------------------------------------------------------------------
|
||||
|
||||
|
||||
|
@ -66,6 +70,8 @@
|
|||
await Program.RunStartFromBeginningChangeFeed("changefeed-beginning", client);
|
||||
Console.WriteLine($"\n4. Generate Estimator metrics to expose current Change Feed Processor progress.");
|
||||
await Program.RunEstimatorChangeFeed("changefeed-estimator", client);
|
||||
Console.WriteLine($"\n5. Code migration template from existing Change Feed Processor library V2.");
|
||||
await Program.RunMigrationSample("changefeed-migration", client, configuration);
|
||||
}
|
||||
}
|
||||
finally
|
||||
|
@ -243,6 +249,89 @@
|
|||
await changeFeedEstimator.StopAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Example of a code migration template from Change Feed Processor V2 to SDK V3.
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public static async Task RunMigrationSample(
|
||||
string databaseId,
|
||||
CosmosClient client,
|
||||
IConfigurationRoot configuration)
|
||||
{
|
||||
await Program.InitializeAsync(databaseId, client);
|
||||
|
||||
Console.WriteLine("Generating 10 items that will be picked up by the old Change Feed Processor library...");
|
||||
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
|
||||
|
||||
// This is how you would initialize the processor in V2
|
||||
// <ChangeFeedProcessorLibrary>
|
||||
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
|
||||
{
|
||||
DatabaseName = databaseId,
|
||||
CollectionName = Program.monitoredContainer,
|
||||
Uri = new Uri(configuration["EndPointUrl"]),
|
||||
MasterKey = configuration["AuthorizationKey"]
|
||||
};
|
||||
|
||||
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
|
||||
{
|
||||
DatabaseName = databaseId,
|
||||
CollectionName = Program.leasesContainer,
|
||||
Uri = new Uri(configuration["EndPointUrl"]),
|
||||
MasterKey = configuration["AuthorizationKey"]
|
||||
};
|
||||
|
||||
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
|
||||
var oldChangeFeedProcessor = await builder
|
||||
.WithHostName("consoleHost")
|
||||
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions {
|
||||
StartFromBeginning = true,
|
||||
LeasePrefix = "MyLeasePrefix" })
|
||||
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions()
|
||||
{
|
||||
MaxItemCount = 10,
|
||||
FeedPollDelay = TimeSpan.FromSeconds(1)
|
||||
})
|
||||
.WithFeedCollection(monitoredCollectionInfo)
|
||||
.WithLeaseCollection(leaseCollectionInfo)
|
||||
.WithObserver<ChangeFeedObserver>()
|
||||
.BuildAsync();
|
||||
// </ChangeFeedProcessorLibrary>
|
||||
|
||||
await oldChangeFeedProcessor.StartAsync();
|
||||
|
||||
// Wait random time for the delegate to output all messages after initialization is done
|
||||
await Task.Delay(5000);
|
||||
Console.WriteLine("Now we will stop the V2 Processor and start a V3 with the same parameters to pick up from the same state, press any key to continue...");
|
||||
Console.ReadKey();
|
||||
await oldChangeFeedProcessor.StopAsync();
|
||||
|
||||
Console.WriteLine("Generating 5 items that will be picked up by the new Change Feed Processor...");
|
||||
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
|
||||
|
||||
// This is how you would do the same initialization in V3
|
||||
// <ChangeFeedProcessorMigrated>
|
||||
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
|
||||
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
|
||||
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
|
||||
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
|
||||
.WithInstanceName("consoleHost")
|
||||
.WithLeaseContainer(leaseContainer)
|
||||
.WithMaxItems(10)
|
||||
.WithPollInterval(TimeSpan.FromSeconds(1))
|
||||
.WithStartTime(DateTime.MinValue.ToUniversalTime())
|
||||
.Build();
|
||||
// </ChangeFeedProcessorMigrated>
|
||||
|
||||
await changeFeedProcessor.StartAsync();
|
||||
|
||||
// Wait random time for the delegate to output all messages after initialization is done
|
||||
await Task.Delay(5000);
|
||||
Console.WriteLine("Press any key to continue with the next demo...");
|
||||
Console.ReadKey();
|
||||
await changeFeedProcessor.StopAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
|
||||
/// </summary>
|
||||
|
@ -322,4 +411,27 @@
|
|||
public DateTime creationTime { get; set; }
|
||||
}
|
||||
// </Model>
|
||||
|
||||
internal class ChangeFeedObserver : IChangeFeedObserver
|
||||
{
|
||||
public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task OpenAsync(IChangeFeedObserverContext context)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Microsoft.Azure.Documents.Document> docs, CancellationToken cancellationToken)
|
||||
{
|
||||
foreach (var doc in docs)
|
||||
{
|
||||
Console.WriteLine($"\t[OLD Processor] Detected operation for item with id {doc.Id}, created at {doc.GetPropertyValue<DateTime>("creationTime")}.");
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче