From 2f1178635b4ef0fde4487848348a2549ad3d69d7 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Thu, 12 Sep 2019 09:48:55 -0700 Subject: [PATCH] Adding sample for Bulk support (#787) * Adding sample * Removing comparison * texts * Using continuewith * Counting --- .../Usage/BulkSupport/BulkSupport.csproj | 25 +++ .../Usage/BulkSupport/Program.cs | 196 ++++++++++++++++++ .../Usage/Cosmos.Samples.Usage.sln | 7 + 3 files changed, 228 insertions(+) create mode 100644 Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport/BulkSupport.csproj create mode 100644 Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport/Program.cs diff --git a/Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport/BulkSupport.csproj b/Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport/BulkSupport.csproj new file mode 100644 index 000000000..c29acac0a --- /dev/null +++ b/Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport/BulkSupport.csproj @@ -0,0 +1,25 @@ + + + + Exe + netcoreapp2.1 + Cosmos.Samples.BulkSupport + Cosmos.Samples.BulkSupport + latest + + + + + + + + + + + + + + PreserveNewest + + + diff --git a/Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport/Program.cs b/Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport/Program.cs new file mode 100644 index 000000000..2d398f02b --- /dev/null +++ b/Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport/Program.cs @@ -0,0 +1,196 @@ +namespace Cosmos.Samples.Shared +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Net; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos; + using Microsoft.Extensions.Configuration; + using Newtonsoft.Json; + + // ---------------------------------------------------------------------------------------------------------- + // Prerequisites - + // + // 1. An Azure Cosmos account - + // https://azure.microsoft.com/en-us/itemation/articles/itemdb-create-account/ + // + // 2. Microsoft.Azure.Cosmos NuGet package - + // http://www.nuget.org/packages/Microsoft.Azure.Cosmos/ + // ---------------------------------------------------------------------------------------------------------- + // Sample - demonstrates the basic usage of the CosmosClient bulk mode by performing a high volume of operations + // ---------------------------------------------------------------------------------------------------------- + + public class Program + { + private const int concurrentWorkers = 3; + private const int concurrentDocuments = 100; + private const string databaseId = "samples"; + private const string containerId = "bulk-support"; + private static readonly JsonSerializer Serializer = new JsonSerializer(); + + //Reusable instance of ItemClient which represents the connection to a Cosmos endpoint + private static Database database = null; + + // Async main requires c# 7.1 which is set in the csproj with the LangVersion attribute + //
+ public static async Task Main(string[] args) + { + try + { + // Read the Cosmos endpointUrl and authorisationKeys from configuration + // These values are available from the Azure Management Portal on the Cosmos Account Blade under "Keys" + // Keep these values in a safe & secure location. Together they provide Administrative access to your Cosmos account + IConfigurationRoot configuration = new ConfigurationBuilder() + .AddJsonFile("appSettings.json") + .Build(); + + string endpoint = configuration["EndPointUrl"]; + if (string.IsNullOrEmpty(endpoint)) + { + throw new ArgumentNullException("Please specify a valid endpoint in the appSettings.json"); + } + + string authKey = configuration["AuthorizationKey"]; + if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key")) + { + throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json"); + } + + CosmosClient bulkClient = Program.GetBulkClientInstance(endpoint, authKey); + // Create the require container, can be done with any client + await Program.InitializeAsync(bulkClient); + + Console.WriteLine("Running demo with a Bulk enabled CosmosClient..."); + // Execute inserts for 30 seconds on a Bulk enabled client + await Program.CreateItemsConcurrentlyAsync(bulkClient); + } + catch (CosmosException cre) + { + Console.WriteLine(cre.ToString()); + } + catch (Exception e) + { + Exception baseException = e.GetBaseException(); + Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); + } + finally + { + await Program.CleanupAsync(); + Console.WriteLine("End of demo, press any key to exit."); + Console.ReadKey(); + } + } + //
+ + private static CosmosClient GetBulkClientInstance( + string endpoint, + string authKey) => + // + new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true } ); + // + + private static async Task CreateItemsConcurrentlyAsync(CosmosClient client) + { + // Create concurrent workers that will insert items for 30 seconds + CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + cancellationTokenSource.CancelAfter(30000); + CancellationToken cancellationToken = cancellationTokenSource.Token; + + Container container = client.GetContainer(Program.databaseId, Program.containerId); + List> workerTasks = new List>(Program.concurrentWorkers); + Console.WriteLine($"Initiating process with {Program.concurrentWorkers} worker threads writing groups of {Program.concurrentDocuments} items for 30 seconds."); + for (var i = 0; i < Program.concurrentWorkers; i++) + { + workerTasks.Add(CreateItemsAsync(container, cancellationToken)); + } + + await Task.WhenAll(workerTasks); + Console.WriteLine($"Inserted {workerTasks.Sum(task => task.Result)} items."); + } + + private static async Task CreateItemsAsync( + Container container, + CancellationToken cancellationToken) + { + int itemsCreated = 0; + string partitionKeyValue = Guid.NewGuid().ToString(); + while (!cancellationToken.IsCancellationRequested) + { + List tasks = new List(Program.concurrentDocuments); + for (int i = 0; i < Program.concurrentDocuments; i++) + { + string id = Guid.NewGuid().ToString(); + MyDocument myDocument = new MyDocument() { id = id, pk = partitionKeyValue }; + tasks.Add( + container.CreateItemAsync(myDocument, new PartitionKey(partitionKeyValue)) + .ContinueWith((Task> task) => + { + if (!task.IsCompletedSuccessfully) + { + AggregateException innerExceptions = task.Exception.Flatten(); + CosmosException cosmosException = innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) as CosmosException; + Console.WriteLine($"Item {myDocument.id} failed with status code {cosmosException.StatusCode}"); + } + })); + } + + await Task.WhenAll(tasks); + + itemsCreated += tasks.Count(task => task.IsCompletedSuccessfully); + } + + return itemsCreated; + } + + // + private class MyDocument + { + public string id { get; set; } + + public string pk { get; set; } + + public bool Updated { get; set; } + } + // + + private static async Task CleanupAsync() + { + if (Program.database != null) + { + await Program.database.DeleteAsync(); + } + } + + private static async Task InitializeAsync(CosmosClient client) + { + Program.database = await client.CreateDatabaseIfNotExistsAsync(Program.databaseId); + + // Delete the existing container to prevent create item conflicts + using (await database.GetContainer(containerId).DeleteContainerStreamAsync()) + { } + + // We create a partitioned collection here which needs a partition key. Partitioned collections + // can be created with very high values of provisioned throughput (up to Throughput = 250,000) + // and used to store up to 250 GB of data. + ContainerProperties containerProperties = new ContainerProperties(containerId, partitionKeyPath: "/pk"); + + Console.WriteLine("The demo will create a 20000 RU/s container, press any key to continue."); + Console.ReadKey(); + // Create with a throughput of 20000 RU/s - this demo is about throughput so it needs a higher degree of RU/s to show volume + // Indexing Policy to exclude all attributes to maximize RU/s usage + await database.DefineContainer(containerId, "/pk") + .WithIndexingPolicy() + .WithIndexingMode(IndexingMode.Consistent) + .WithIncludedPaths() + .Attach() + .WithExcludedPaths() + .Path("/*") + .Attach() + .Attach() + .CreateAsync(20000); + } + } +} + diff --git a/Microsoft.Azure.Cosmos.Samples/Usage/Cosmos.Samples.Usage.sln b/Microsoft.Azure.Cosmos.Samples/Usage/Cosmos.Samples.Usage.sln index 7bdfa6810..d9211816d 100644 --- a/Microsoft.Azure.Cosmos.Samples/Usage/Cosmos.Samples.Usage.sln +++ b/Microsoft.Azure.Cosmos.Samples/Usage/Cosmos.Samples.Usage.sln @@ -24,8 +24,11 @@ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CustomSerialization", "CustomSerialization\CustomSerialization.csproj", "{61EA3F61-F2ED-4FD4-89EC-06342C5AE3C9}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PowerShellRestApi", "PowerShellRestApi\PowerShellRestApi.csproj", "{A67B259B-9E3F-4689-9691-B250A88593BB}" +EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ChangeFeed", "ChangeFeed\ChangeFeed.csproj", "{D4959F3F-14FD-4AEA-851F-EEF944393603}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BulkSupport", "BulkSupport\BulkSupport.csproj", "{71ADD826-17CB-4095-B4F7-C5CDEDC731EA}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -80,6 +83,10 @@ Global {D4959F3F-14FD-4AEA-851F-EEF944393603}.Debug|Any CPU.Build.0 = Debug|Any CPU {D4959F3F-14FD-4AEA-851F-EEF944393603}.Release|Any CPU.ActiveCfg = Release|Any CPU {D4959F3F-14FD-4AEA-851F-EEF944393603}.Release|Any CPU.Build.0 = Release|Any CPU + {71ADD826-17CB-4095-B4F7-C5CDEDC731EA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {71ADD826-17CB-4095-B4F7-C5CDEDC731EA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {71ADD826-17CB-4095-B4F7-C5CDEDC731EA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {71ADD826-17CB-4095-B4F7-C5CDEDC731EA}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE