From d5f9b5d42dc04c03fb63d4807f1d2fc95218eb4b Mon Sep 17 00:00:00 2001 From: arramac Date: Sun, 11 Dec 2016 11:28:33 -0800 Subject: [PATCH] Change Feed samples --- samples/code-samples/ChangeFeed/App.config | 7 + .../code-samples/ChangeFeed/ChangeFeed.csproj | 89 +++++++ samples/code-samples/ChangeFeed/Program.cs | 241 ++++++++++++++++++ .../ChangeFeed/Properties/AssemblyInfo.cs | 36 +++ .../code-samples/ChangeFeed/packages.config | 5 + samples/code-samples/DocumentDB.Samples.sln | 10 +- 6 files changed, 386 insertions(+), 2 deletions(-) create mode 100644 samples/code-samples/ChangeFeed/App.config create mode 100644 samples/code-samples/ChangeFeed/ChangeFeed.csproj create mode 100644 samples/code-samples/ChangeFeed/Program.cs create mode 100644 samples/code-samples/ChangeFeed/Properties/AssemblyInfo.cs create mode 100644 samples/code-samples/ChangeFeed/packages.config diff --git a/samples/code-samples/ChangeFeed/App.config b/samples/code-samples/ChangeFeed/App.config new file mode 100644 index 0000000..2da60f8 --- /dev/null +++ b/samples/code-samples/ChangeFeed/App.config @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/samples/code-samples/ChangeFeed/ChangeFeed.csproj b/samples/code-samples/ChangeFeed/ChangeFeed.csproj new file mode 100644 index 0000000..fe8ca54 --- /dev/null +++ b/samples/code-samples/ChangeFeed/ChangeFeed.csproj @@ -0,0 +1,89 @@ + + + + + Debug + AnyCPU + {7FA488B0-ECAC-4614-94BD-4570FDBACBF3} + Exe + Properties + ChangeFeed + ChangeFeed + v4.5.2 + 512 + true + + + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\packages\Microsoft.Azure.DocumentDB.1.11.0\lib\net45\Microsoft.Azure.Documents.Client.dll + True + + + ..\packages\Newtonsoft.Json.6.0.8\lib\net45\Newtonsoft.Json.dll + True + + + + + + + + + + + + + + + + + + appSettings.config + PreserveNewest + + + + + + + {16c11979-9b7c-4f35-bcd1-cac434d79467} + Shared + + + + + + + This project references NuGet package(s) that are missing on this computer. Use NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. + + + + + \ No newline at end of file diff --git a/samples/code-samples/ChangeFeed/Program.cs b/samples/code-samples/ChangeFeed/Program.cs new file mode 100644 index 0000000..8c2f0da --- /dev/null +++ b/samples/code-samples/ChangeFeed/Program.cs @@ -0,0 +1,241 @@ +namespace DocumentDB.Samples.Queries +{ + using Shared.Util; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Client; + using Microsoft.Azure.Documents.Linq; + using Newtonsoft.Json; + using Newtonsoft.Json.Serialization; + using System; + using System.Collections.Generic; + using System.Configuration; + using System.Linq; + using System.Threading.Tasks; + using Newtonsoft.Json.Converters; + + //------------------------------------------------------------------------------------------------ + // This sample demonstrates how + //------------------------------------------------------------------------------------------------ + + public class Program + { + private static DocumentClient client; + + // Assign an id for your database & collection + private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseId"]; + private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionId"]; + + // Read the DocumentDB endpointUrl and authorizationKeys from config + private static readonly string endpointUrl = ConfigurationManager.AppSettings["EndPointUrl"]; + private static readonly string authorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"]; + + public static void Main(string[] args) + { + try + { + //Get a Document client + using (client = new DocumentClient(new Uri(endpointUrl), authorizationKey, + new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp })) + { + RunDemoAsync(DatabaseName, CollectionName).Wait(); + } + } +#if !DEBUG + catch (Exception e) + { + LogException(e); + } +#endif + finally + { + Console.WriteLine("End of demo, press any key to exit."); + Console.ReadKey(); + } + } + + private static async Task RunDemoAsync(string databaseId, string collectionId) + { + Database database = await GetNewDatabaseAsync(databaseId); + DocumentCollection collection = await GetOrCreateCollectionAsync(databaseId, collectionId); + + Uri collectionUri = UriFactory.CreateDocumentCollectionUri(databaseId, collectionId); + + Console.WriteLine("Inserting 100 documents"); + List insertTasks = new List(); + for (int i = 0; i < 100; i++) + { + insertTasks.Add(client.CreateDocumentAsync( + collectionUri, + new DeviceReading { DeviceId = string.Format("xsensr-{0}", i), MetricType = "Temperature", Unit = "Celsius", MetricValue = 990 })); + } + + await Task.WhenAll(insertTasks); + + // Returns all documents in the collection. + Console.WriteLine("Reading all changes from the beginning"); + Dictionary checkpoints = await GetChanges(client, collectionUri, new Dictionary()); + + Console.WriteLine("Inserting 2 new documents"); + await client.CreateDocumentAsync( + collectionUri, + new DeviceReading { DeviceId = "xsensr-201", MetricType = "Temperature", Unit = "Celsius", MetricValue = 1000 }); + await client.CreateDocumentAsync( + collectionUri, + new DeviceReading { DeviceId = "xsensr-212", MetricType = "Pressure", Unit = "psi", MetricValue = 1000 }); + + // Returns only the two documents created above. + Console.WriteLine("Reading changes using Change Feed from the last checkpoint"); + checkpoints = await GetChanges(client, collectionUri, checkpoints); + } + + /// + /// Get changes within the collection since the last checkpoint. This sample shows how to process the change + /// feed from a single worker. When working with large collections, this is typically split across multiple + /// workers each processing a single or set of partition key ranges. + /// + /// DocumentDB client instance + /// Collection to retrieve changes from + /// + /// + private static async Task> GetChanges( + DocumentClient client, + Uri collectionUri, + Dictionary checkpoints) + { + int numChangesRead = 0; + List partitionKeyRanges = new List(); + FeedResponse pkRangesResponse; + + do + { + pkRangesResponse = await client.ReadPartitionKeyRangeFeedAsync(collectionUri); + partitionKeyRanges.AddRange(pkRangesResponse); + } + while (pkRangesResponse.ResponseContinuation != null); + + foreach (PartitionKeyRange pkRange in partitionKeyRanges) + { + string continuation = null; + checkpoints.TryGetValue(pkRange.Id, out continuation); + + IDocumentQuery query = client.CreateDocumentChangeFeedQuery( + collectionUri, + new ChangeFeedOptions + { + PartitionKeyRangeId = pkRange.Id, + StartFromBeginning = true, + RequestContinuation = continuation, + MaxItemCount = -1 + }); + + while (query.HasMoreResults) + { + FeedResponse readChangesResponse = query.ExecuteNextAsync().Result; + + foreach (DeviceReading changedDocument in readChangesResponse) + { + Console.WriteLine("\tRead document {0} from the change feed.", changedDocument.Id); + numChangesRead++; + } + + checkpoints[pkRange.Id] = readChangesResponse.ResponseContinuation; + } + } + + Console.WriteLine("Read {0} documents from the change feed", numChangesRead); + + return checkpoints; + } + + /// + /// Get a Database for this id. Delete if it already exists. + /// + /// The id of the Database to create. + /// The created Database object + private static async Task GetNewDatabaseAsync(string id) + { + Database database = client.CreateDatabaseQuery().Where(c => c.Id == id).ToArray().FirstOrDefault(); + if (database != null) + { + await client.DeleteDatabaseAsync(database.SelfLink); + } + + database = await client.CreateDatabaseAsync(new Database { Id = id }); + return database; + } + + /// + /// Get a DocuemntCollection by id, or create a new one if one with the id provided doesn't exist. + /// + /// The id of the DocumentCollection to search for, or create. + /// The matched, or created, DocumentCollection object + private static async Task GetOrCreateCollectionAsync(string databaseId, string collectionId) + { + DocumentCollection collection = client.CreateDocumentCollectionQuery(UriFactory.CreateDatabaseUri(databaseId)) + .Where(c => c.Id == collectionId) + .ToArray() + .SingleOrDefault(); + + if (collection == null) + { + DocumentCollection collectionDefinition = new DocumentCollection(); + collectionDefinition.Id = collectionId; + collectionDefinition.IndexingPolicy = new IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 }); + collectionDefinition.PartitionKey.Paths.Add("/deviceId"); + + collection = await DocumentClientHelper.CreateDocumentCollectionWithRetriesAsync( + client, + databaseId, + collectionDefinition, + 400); + } + + return collection; + } + + /// + /// Log exception error message to the console + /// + /// The caught exception. + private static void LogException(Exception e) + { + ConsoleColor color = Console.ForegroundColor; + Console.ForegroundColor = ConsoleColor.Red; + + Exception baseException = e.GetBaseException(); + if (e is DocumentClientException) + { + DocumentClientException de = (DocumentClientException)e; + Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, baseException.Message); + } + else + { + Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); + } + + Console.ForegroundColor = color; + } + + public class DeviceReading + { + [JsonProperty("id")] + public string Id { get; set; } + + [JsonProperty("deviceId")] + public string DeviceId { get; set; } + + [JsonConverter(typeof(IsoDateTimeConverter))] + [JsonProperty("readingTime")] + public DateTime ReadingTime { get; set; } + + [JsonProperty("metricType")] + public string MetricType { get; set; } + + [JsonProperty("unit")] + public string Unit { get; set; } + + [JsonProperty("metricValue")] + public double MetricValue { get; set; } + } + } +} diff --git a/samples/code-samples/ChangeFeed/Properties/AssemblyInfo.cs b/samples/code-samples/ChangeFeed/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..ca99168 --- /dev/null +++ b/samples/code-samples/ChangeFeed/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("ChangeFeed")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("ChangeFeed")] +[assembly: AssemblyCopyright("Copyright © 2016")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("7fa488b0-ecac-4614-94bd-4570fdbacbf3")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/samples/code-samples/ChangeFeed/packages.config b/samples/code-samples/ChangeFeed/packages.config new file mode 100644 index 0000000..2822063 --- /dev/null +++ b/samples/code-samples/ChangeFeed/packages.config @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/samples/code-samples/DocumentDB.Samples.sln b/samples/code-samples/DocumentDB.Samples.sln index cfb03a4..63a5fc2 100644 --- a/samples/code-samples/DocumentDB.Samples.sln +++ b/samples/code-samples/DocumentDB.Samples.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio 2013 -VisualStudioVersion = 12.0.40629.0 +# Visual Studio 14 +VisualStudioVersion = 14.0.25420.1 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{2DF914B1-91A8-4C78-989B-64D1ABEF6A17}" ProjectSection(SolutionItems) = preProject @@ -38,6 +38,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Queries", "Queries\Queries. EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Geospatial", "Geospatial\Geospatial.csproj", "{B0A55C76-4470-482E-BD16-5AA96068FE61}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ChangeFeed", "ChangeFeed\ChangeFeed.csproj", "{7FA488B0-ECAC-4614-94BD-4570FDBACBF3}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -80,6 +82,10 @@ Global {B0A55C76-4470-482E-BD16-5AA96068FE61}.Debug|Any CPU.Build.0 = Debug|Any CPU {B0A55C76-4470-482E-BD16-5AA96068FE61}.Release|Any CPU.ActiveCfg = Release|Any CPU {B0A55C76-4470-482E-BD16-5AA96068FE61}.Release|Any CPU.Build.0 = Release|Any CPU + {7FA488B0-ECAC-4614-94BD-4570FDBACBF3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7FA488B0-ECAC-4614-94BD-4570FDBACBF3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7FA488B0-ECAC-4614-94BD-4570FDBACBF3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7FA488B0-ECAC-4614-94BD-4570FDBACBF3}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE