Merge pull request #142 from arramac/master

move sample to new SDK build
This commit is contained in:
Aravind Ramachandran 2016-08-12 13:33:34 -07:00 коммит произвёл GitHub
Родитель 0e839ad273 b6ddd7645b
Коммит 2dfce14b8f
14 изменённых файлов: 745 добавлений и 149 удалений

Просмотреть файл

@ -0,0 +1,22 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 2013
VisualStudioVersion = 12.0.40629.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DocDBClientBulk", "DocDBClientBulk\DocDBClientBulk.csproj", "{14D4D128-02CF-45BE-B14C-D195B0AFD181}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{14D4D128-02CF-45BE-B14C-D195B0AFD181}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{14D4D128-02CF-45BE-B14C-D195B0AFD181}.Debug|Any CPU.Build.0 = Debug|Any CPU
{14D4D128-02CF-45BE-B14C-D195B0AFD181}.Release|Any CPU.ActiveCfg = Release|Any CPU
{14D4D128-02CF-45BE-B14C-D195B0AFD181}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal

Просмотреть файл

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
</startup>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-9.0.0.0" newVersion="9.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -0,0 +1,105 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{14D4D128-02CF-45BE-B14C-D195B0AFD181}</ProjectGuid>
<OutputType>Exe</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>DocDBClientBulk</RootNamespace>
<AssemblyName>DocDBClientBulk</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<NuGetPackageImportStamp>8116ee7e</NuGetPackageImportStamp>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.Azure.Documents.Client, Version=1.9.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Azure.DocumentDB.1.9.2\lib\net45\Microsoft.Azure.Documents.Client.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Microsoft.Azure.KeyVault.Core, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Azure.KeyVault.Core.1.0.0\lib\net40\Microsoft.Azure.KeyVault.Core.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Microsoft.Data.Edm, Version=5.6.4.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Data.Edm.5.6.4\lib\net40\Microsoft.Data.Edm.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Microsoft.Data.OData, Version=5.6.4.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Data.OData.5.6.4\lib\net40\Microsoft.Data.OData.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Microsoft.Data.Services.Client, Version=5.6.4.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Data.Services.Client.5.6.4\lib\net40\Microsoft.Data.Services.Client.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Microsoft.WindowsAzure.Storage, Version=7.1.2.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\WindowsAzure.Storage.7.1.2\lib\net40\Microsoft.WindowsAzure.Storage.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Newtonsoft.Json, Version=9.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Spatial, Version=5.6.4.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\packages\System.Spatial.5.6.4\lib\net40\System.Spatial.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="App.config" />
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
<Content Include="addActiveTransaction.js" />
<Content Include="bulkDelete.js" />
<Content Include="bulkImport.js" />
<Content Include="removeActiveTransaction.js" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Import Project="..\packages\Microsoft.Azure.DocumentDB.1.9.2\build\Microsoft.Azure.DocumentDB.targets" Condition="Exists('..\packages\Microsoft.Azure.DocumentDB.1.9.2\build\Microsoft.Azure.DocumentDB.targets')" />
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
<PropertyGroup>
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
</PropertyGroup>
<Error Condition="!Exists('..\packages\Microsoft.Azure.DocumentDB.1.9.2\build\Microsoft.Azure.DocumentDB.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\Microsoft.Azure.DocumentDB.1.9.2\build\Microsoft.Azure.DocumentDB.targets'))" />
</Target>
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

Просмотреть файл

@ -0,0 +1,269 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using System.IO;
namespace DocDBClientBulk
{
public partial class Program
{
const string Endpoint = "https://localhost:443/";
const string AuthKey = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
const string DbName = "db";
const string CollectionName = "c";
private static Guid activeTransactionsDocId;
private static Guid transactionId;
// This console application accomplishes the following:
// 1. Creates and inserts a document designated for keeping track of the list of currently active transaction ids.
// 2. Generates a GUID (let's call this tId0 for the new transaction and adds this to the document created in step 1.
// 3. Invokes the bulk import stored procedure. Upon insertion of each document, a new property TransactionId with value = tId0 is added to associate each processed document
// with the current active transaction.
// 4. If the bulk import stored procedure completes successfully, remove tId0 from the document created in step 1.
// 5. If the bulk import stored procedure fails to complete, clean up the processed documents with TransactionId = tId0. Do this by invoking the bulk delete stored procedure.
// Remove tId0 from teh document created in step 1.
static void Main(string[] args)
{
Console.WriteLine("Using {0}, {1}, {2}", Endpoint, DbName, CollectionName);
activeTransactionsDocId = Guid.NewGuid();
ActiveTransactions activeTransactionsDoc = new ActiveTransactions
{
Id = activeTransactionsDocId,
Transactions = new List<string>()
};
// Generate the document that keeps track of active transactions if it does not already exist.
CreateActiveTransactionsDocIfNotExists(activeTransactionsDoc).Wait();
// Add a new guid to the list of active transactions for the current transaction.
AddActiveTransaction().Wait();
// Invoke bulk import sproc, adding new property TransactionId with value = current active transaction to each imported document.
InvokeBulkImportSproc().Wait();
// If bulk import succeeded (or if bulk delete completed successfully), remove id of the current transaction from the active transactions document.
RemoveActiveTransaction().Wait();
}
private static async Task InvokeBulkImportSproc()
{
int numDocs = 1000;
ExampleDoc[] docs = new ExampleDoc[numDocs];
for (int i = 0; i < numDocs; i++)
{
ExampleDoc doc = new ExampleDoc
{
Id = Guid.NewGuid().ToString()
};
docs[i] = doc;
}
var client = new DocumentClient(new Uri(Endpoint), AuthKey);
Uri collectionLink = UriFactory.CreateDocumentCollectionUri(DbName, CollectionName);
string scriptFileName = @"bulkImport.js";
string scriptId = Path.GetFileNameWithoutExtension(scriptFileName);
string scriptName = "bulkImport";
await CreateSprocIfNotExists(scriptFileName, scriptId, scriptName);
Uri sprocUri = UriFactory.CreateStoredProcedureUri(DbName, CollectionName, scriptName);
try
{
await client.ExecuteStoredProcedureAsync<int>(sprocUri, transactionId.ToString(), docs);
}
catch (DocumentClientException ex)
{
throw;
}
catch (AggregateException ex)
{
// If bulk import failed, delete all documents in the collection with TransactionId = id of the failed transaction.
InvokeBulkDeleteSproc().Wait();
}
}
private static async Task InvokeBulkDeleteSproc()
{
var client = new DocumentClient(new Uri(Endpoint), AuthKey);
Uri collectionLink = UriFactory.CreateDocumentCollectionUri(DbName, CollectionName);
string scriptFileName = @"bulkDelete.js";
string scriptId = Path.GetFileNameWithoutExtension(scriptFileName);
string scriptName = "bulkDelete";
await CreateSprocIfNotExists(scriptFileName, scriptId, scriptName);
Uri sprocUri = UriFactory.CreateStoredProcedureUri(DbName, CollectionName, scriptName);
try
{
await client.ExecuteStoredProcedureAsync<Document>(sprocUri, transactionId.ToString());
}
catch (DocumentClientException ex)
{
throw;
}
}
private static async Task AddActiveTransaction()
{
var client = new DocumentClient(new Uri(Endpoint), AuthKey);
Uri collectionLink = UriFactory.CreateDocumentCollectionUri(DbName, CollectionName);
transactionId = Guid.NewGuid();
string scriptFileName = @"addActiveTransaction.js";
string scriptId = Path.GetFileNameWithoutExtension(scriptFileName);
string scriptName = "addActiveTransaction";
await CreateSprocIfNotExists(scriptFileName, scriptId, scriptName);
Uri sprocUri = UriFactory.CreateStoredProcedureUri(DbName, CollectionName, scriptName);
try
{
await client.ExecuteStoredProcedureAsync<Document>(sprocUri, activeTransactionsDocId.ToString(), transactionId.ToString());
}
catch (DocumentClientException ex)
{
throw;
}
}
private static async Task RemoveActiveTransaction()
{
var client = new DocumentClient(new Uri(Endpoint), AuthKey);
Uri collectionLink = UriFactory.CreateDocumentCollectionUri(DbName, CollectionName);
string scriptFileName = @"removeActiveTransaction.js";
string scriptId = Path.GetFileNameWithoutExtension(scriptFileName);
string scriptName = "removeActiveTransaction";
await CreateSprocIfNotExists(scriptFileName, scriptId, scriptName);
Uri sprocUri = UriFactory.CreateStoredProcedureUri(DbName, CollectionName, scriptName);
try
{
await client.ExecuteStoredProcedureAsync<Document>(sprocUri, activeTransactionsDocId.ToString(), transactionId.ToString());
}
catch (DocumentClientException ex)
{
throw;
}
}
private static async Task CreateActiveTransactionsDocIfNotExists(ActiveTransactions activeTransactionsDoc)
{
var client = new DocumentClient(new Uri(Endpoint), AuthKey);
Uri collectionLink = UriFactory.CreateDocumentCollectionUri(DbName, CollectionName);
bool needToCreate = false;
try
{
await client.ReadDocumentAsync(UriFactory.CreateDocumentUri(DbName, CollectionName, activeTransactionsDoc.Id.ToString()));
}
catch (DocumentClientException de)
{
if (de.StatusCode != HttpStatusCode.NotFound)
{
throw;
}
else
{
needToCreate = true;
}
}
if (needToCreate)
{
await client.CreateDocumentAsync(collectionLink, activeTransactionsDoc);
}
}
private static async Task ResetActiveTransactionsDocIfNotExists()
{
var client = new DocumentClient(new Uri(Endpoint), AuthKey);
Uri collectionLink = UriFactory.CreateDocumentUri(DbName, CollectionName, activeTransactionsDocId.ToString());
ActiveTransactions activeTransactionsDoc = new ActiveTransactions
{
Id = activeTransactionsDocId,
Transactions = new List<string>()
};
try {
await client.ReplaceDocumentAsync(collectionLink, activeTransactionsDoc);
}
catch (DocumentClientException de)
{
throw;
}
}
private static async Task CreateSprocIfNotExists(string scriptFileName, string scriptId, string scriptName)
{
var client = new DocumentClient(new Uri(Endpoint), AuthKey);
Uri collectionLink = UriFactory.CreateDocumentCollectionUri(DbName, CollectionName);
var sproc = new StoredProcedure
{
Id = scriptId,
Body = File.ReadAllText(scriptFileName)
};
bool needToCreate = false;
Uri sprocUri = UriFactory.CreateStoredProcedureUri(DbName, CollectionName, scriptName);
try
{
await client.ReadStoredProcedureAsync(sprocUri);
}
catch (DocumentClientException de)
{
if (de.StatusCode != HttpStatusCode.NotFound)
{
throw;
}
else
{
needToCreate = true;
}
}
if (needToCreate)
{
await client.CreateStoredProcedureAsync(collectionLink, sproc);
}
}
public class ActiveTransactions
{
[JsonProperty(PropertyName = "id")]
public Guid Id { get; set; }
public List<string> Transactions { get; set; }
public override string ToString()
{
return JsonConvert.SerializeObject(this);
}
}
public class ExampleDoc
{
[JsonProperty(PropertyName = "id")]
public string Id { get; set; }
public override string ToString()
{
return JsonConvert.SerializeObject(this);
}
}
}
}

Просмотреть файл

@ -0,0 +1,36 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("DocDBClientBulk")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("DocDBClientBulk")]
[assembly: AssemblyCopyright("Copyright © 2016")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("7970fdf2-313c-4816-8c43-59584f00451a")]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

Просмотреть файл

@ -0,0 +1,49 @@
// @function
// @param {string} id - The id for the active transactions document.
// @param {string} transactionId - The id for the new transaction.
function addActiveTransaction(id, transactionId) {
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();
var response = getContext().getResponse();
// Validate input
if (!id) throw new Error("The active transactions document id is undefined or null.");
if (!transactionId) throw new Error("The transactionId is undefined or null.");
tryQueryAndUpdate();
function tryQueryAndUpdate() {
var query = {query: "select * from root r where r.id = @id", parameters: [{name: "@id", value: id}]};
var requestOptions = {};
var isAccepted = collection.queryDocuments(collectionLink, query, requestOptions, function(err, documents, responseOptions) {
if (err) throw err;
if (documents.length > 0) {
tryUpdate(documents[0])
} else {
throw new Error("Document not found.");
}
});
// If we hit execution bounds - throw an exception.
// This is highly unlikely given that this is a query by id; but is included to serve as an example for larger queries.
if(!isAccepted) {
throw new Error("The stored procedure timed out.");
}
}
// Updates the active transactions document according to the id passed in to the sproc.
function tryUpdate(doc) {
doc.Transactions.push(transactionId);
requestOptions = {};
var isAccepted = collection.replaceDocument(doc._self, doc, requestOptions, function (err, updatedDocument, responseOptions) {
if (err) throw err;
// If we have successfully updated the document - return it in the response body.
response.setBody(updatedDocument);
});
if (!isAccepted) {
throw new Error("The stored procedure timed out.");
}
}
}

Просмотреть файл

@ -0,0 +1,78 @@
/**
* A DocumentDB stored procedure that bulk deletes documents for a given query.<br/>
* Note: You may need to execute this sproc multiple times (depending whether the sproc is able to delete every document within the execution timeout limit).
*
* @function
* @param {string} query - A query that provides the documents to be deleted (e.g. "SELECT * FROM c WHERE c.founded_year = 2008")
* @returns {Object.<number, boolean>} Returns an object with the two properties:<br/>
* deleted - contains a count of documents deleted<br/>
* continuation - a boolean whether you should execute the sproc again (true if there are more documents to delete; false otherwise).
*/
function bulkDelete(transactionId) {
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();
var response = getContext().getResponse();
var responseBody = {
deleted: 0,
continuation: true
};
// Validate input.
if (!transactionId) throw new Error("The transactionId is undefined or null.");
tryQueryAndDelete()
// Recursively runs the query w/ support for continuation tokens.
// Calls tryDelete(documents) as soon as the query returns documents.
function tryQueryAndDelete(continuation) {
var requestOptions = {continuation: continuation};
var query = {query: "select * from root r where r.TransactionId = @id", parameters: [{name: "@id", value: transactionId}]};
var isAccepted = collection.queryDocuments(collectionLink, query, requestOptions, function (err, retrievedDocs, responseOptions) {
if (err) throw err;
if (retrievedDocs.length > 0) {
// Begin deleting documents as soon as documents are returned form the query results.
// tryDelete() resumes querying after deleting; no need to page through continuation tokens.
// - this is to prioritize writes over reads given timeout constraints.
tryDelete(retrievedDocs);
} else if (responseOptions.continuation) {
// Else if the query came back empty, but with a continuation token; repeat the query w/ the token.
tryQueryAndDelete(responseOptions.continuation);
} else {
// Else if there are no more documents and no continuation token - we are finished deleting documents.
responseBody.continuation = false;
response.setBody(responseBody);
}
});
// If we hit execution bounds - return continuation: true.
if (!isAccepted) {
response.setBody(responseBody);
}
}
// Recursively deletes documents passed in as an array argument.
// Attempts to query for more on empty array.
function tryDelete(documents) {
if (documents.length > 0) {
// Delete the first document in the array.
var isAccepted = collection.deleteDocument(documents[0]._self, {}, function (err, responseOptions) {
if (err) throw err;
responseBody.deleted++;
documents.shift();
// Delete the next document in the array.
tryDelete(documents);
});
// If we hit execution bounds - return continuation: true.
if (!isAccepted) {
response.setBody(responseBody);
}
} else {
// If the document array is empty, query for more documents.
tryQueryAndDelete();
}
}
}

Просмотреть файл

@ -0,0 +1,60 @@
/**
* This script called as stored procedure to import lots of documents in one batch.
* The script sets response body to the number of docs imported and is called multiple times
* by the client until total number of docs desired by the client is imported.
* @param {Object[]} docs - Array of documents to import.
*/
function bulkImport(transactionId, docs) {
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();
// The count of imported docs, also used as current doc index.
var count = 0;
// Validate input.
if (!docs) throw new Error("The array is undefined or null.");
if (!transactionId) throw new Error("The transactionId is undefined or null.")
var docsLength = docs.length;
if (docsLength == 0) {
getContext().getResponse().setBody(0);
return;
}
// Call the CRUD API to create a document.
tryCreate(docs[count], callback);
// Note that there are 2 exit conditions:
// 1) The createDocument request was not accepted.
// In this case the callback will not be called, we just call setBody and we are done.
// 2) The callback was called docs.length times.
// In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done.
function tryCreate(doc, callback) {
doc.TransactionId = transactionId;
var isAccepted = collection.createDocument(collectionLink, doc, callback);
// If the request was accepted, callback will be called.
// Otherwise report current count back to the client,
// which will call the script again with remaining set of docs.
// This condition will happen when this stored procedure has been running too long
// and is about to get cancelled by the server. This will allow the calling client
// to resume this batch from the point we got to before isAccepted was set to false
if (!isAccepted) getContext().getResponse().setBody(count);
}
// This is called when collection.createDocument is done and the document has been persisted.
function callback(err, doc, options) {
if (err) throw err;
// One more document has been inserted, increment the count.
count++;
if (count >= docsLength) {
// If we have created all documents, we are done. Just set the response.
getContext().getResponse().setBody(count);
} else {
// Create next document.
tryCreate(docs[count], callback);
}
}
}

Просмотреть файл

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Microsoft.Azure.DocumentDB" version="1.9.2" targetFramework="net45" />
<package id="Microsoft.Azure.KeyVault.Core" version="1.0.0" targetFramework="net45" />
<package id="Microsoft.Data.Edm" version="5.6.4" targetFramework="net45" />
<package id="Microsoft.Data.OData" version="5.6.4" targetFramework="net45" />
<package id="Microsoft.Data.Services.Client" version="5.6.4" targetFramework="net45" />
<package id="Newtonsoft.Json" version="9.0.1" targetFramework="net45" />
<package id="System.Spatial" version="5.6.4" targetFramework="net45" />
<package id="WindowsAzure.Storage" version="7.1.2" targetFramework="net45" />
</packages>

Просмотреть файл

@ -0,0 +1,53 @@
// @function
// @param {string} id - The id for the active transactions document.
// @param {string} transactionId - The id for the transaction that completed.
function removeActiveTransaction(id, transactionId) {
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();
var response = getContext().getResponse();
// Validate input
if (!id) throw new Error("The active transactions document id is undefined or null.");
if (!transactionId) throw new Error("The transactionId is undefined or null.");
tryQueryAndUpdate();
function tryQueryAndUpdate() {
var query = {query: "select * from root r where r.id = @id", parameters: [{name: "@id", value: id}]};
var requestOptions = {};
var isAccepted = collection.queryDocuments(collectionLink, query, requestOptions, function(err, documents, responseOptions) {
if (err) throw err;
if (documents.length > 0) {
tryUpdate(documents[0])
} else {
throw new Error("Document not found.");
}
});
// If we hit execution bounds - throw an exception.
// This is highly unlikely given that this is a query by id; but is included to serve as an example for larger queries.
if(!isAccepted) {
throw new Error("The stored procedure timed out.");
}
}
// Removes transactionId from the active transactions document.
function tryUpdate(doc) {
var index = doc.Transactions.indexOf(transactionId);
if (index > -1) {
doc.Transactions.splice(index, 1);
requestOptions = {};
var isAccepted = collection.replaceDocument(doc._self, doc, requestOptions, function (err, updatedDocument, responseOptions) {
if (err) throw err;
// If we have successfully updated the document - return it in the response body.
response.setBody(updatedDocument);
});
if (!isAccepted) {
throw new Error("The stored procedure timed out.");
}
}
}
}

Просмотреть файл

@ -12,13 +12,14 @@
<add key="DatabaseName" value="db"/>
<add key="CollectionName" value="data"/>
<add key="MetricCollectionName" value="metrics"/>
<add key="CollectionThroughput" value="50000"/>
<add key="CollectionThroughput" value="10100"/>
<add key="ShouldCleanupOnStart" value="false"/>
<add key="ShouldCleanupOnFinish" value="true"/>
<add key="DegreeOfParallelism" value="500"/>
<add key="NumberOfDocumentsToInsert" value="100000"/>
<add key="ShouldCleanupOnFinish" value="false"/>
<!-- determined by the tool -->
<add key="DegreeOfParallelism" value="-1"/>
<add key="NumberOfDocumentsToInsert" value="1000000"/>
<add key="CollectionPartitionKey" value="/partitionKey"/>
<add key="DocumentTemplateFile" value="Player.json"/>

Просмотреть файл

@ -11,7 +11,7 @@
<AssemblyName>DocumentDBBenchmark</AssemblyName>
<TargetFrameworkVersion>v4.5.1</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<NuGetPackageImportStamp>b4fb0ea4</NuGetPackageImportStamp>
<NuGetPackageImportStamp>f78fc7b6</NuGetPackageImportStamp>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
@ -34,8 +34,8 @@
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.Azure.Documents.Client, Version=1.7.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>packages\Microsoft.Azure.DocumentDB.1.7.1\lib\net45\Microsoft.Azure.Documents.Client.dll</HintPath>
<Reference Include="Microsoft.Azure.Documents.Client, Version=1.9.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>packages\Microsoft.Azure.DocumentDB.1.9.2\lib\net45\Microsoft.Azure.Documents.Client.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
@ -72,12 +72,12 @@
<Folder Include="Properties\" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Import Project="packages\Microsoft.Azure.DocumentDB.1.7.1\build\Microsoft.Azure.DocumentDB.targets" Condition="Exists('packages\Microsoft.Azure.DocumentDB.1.7.1\build\Microsoft.Azure.DocumentDB.targets')" />
<Import Project="packages\Microsoft.Azure.DocumentDB.1.9.2\build\Microsoft.Azure.DocumentDB.targets" Condition="Exists('packages\Microsoft.Azure.DocumentDB.1.9.2\build\Microsoft.Azure.DocumentDB.targets')" />
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
<PropertyGroup>
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
</PropertyGroup>
<Error Condition="!Exists('packages\Microsoft.Azure.DocumentDB.1.7.1\build\Microsoft.Azure.DocumentDB.targets')" Text="$([System.String]::Format('$(ErrorText)', 'packages\Microsoft.Azure.DocumentDB.1.7.1\build\Microsoft.Azure.DocumentDB.targets'))" />
<Error Condition="!Exists('packages\Microsoft.Azure.DocumentDB.1.9.2\build\Microsoft.Azure.DocumentDB.targets')" Text="$([System.String]::Format('$(ErrorText)', 'packages\Microsoft.Azure.DocumentDB.1.9.2\build\Microsoft.Azure.DocumentDB.targets'))" />
</Target>
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.

Просмотреть файл

@ -28,13 +28,21 @@
{
private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
private static readonly string DataCollectionName = ConfigurationManager.AppSettings["CollectionName"];
private static readonly string MetricCollectionName = ConfigurationManager.AppSettings["MetricCollectionName"];
private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
private static readonly ConnectionPolicy ConnectionPolicy = new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp, RequestTimeout = new TimeSpan(1, 0, 0) };
private static readonly ConnectionPolicy ConnectionPolicy = new ConnectionPolicy
{
ConnectionMode = ConnectionMode.Direct,
ConnectionProtocol = Protocol.Tcp,
RequestTimeout = new TimeSpan(1, 0, 0),
MaxConnectionLimit = 1000,
RetryOptions = new RetryOptions
{
MaxRetryAttemptsOnThrottledRequests = 100,
MaxRetryWaitTimeInSeconds = 60
}
};
private static readonly int TaskCount = int.Parse(ConfigurationManager.AppSettings["DegreeOfParallelism"]);
private static readonly int DefaultConnectionLimit = int.Parse(ConfigurationManager.AppSettings["DegreeOfParallelism"]);
private static readonly string InstanceId = Dns.GetHostEntry("LocalHost").HostName + Process.GetCurrentProcess().Id;
private const int MinThreadPoolSize = 100;
@ -58,9 +66,6 @@
/// <param name="args">command line arguments.</param>
public static void Main(string[] args)
{
ServicePointManager.UseNagleAlgorithm = true;
ServicePointManager.Expect100Continue = true;
ServicePointManager.DefaultConnectionLimit = DefaultConnectionLimit;
ThreadPool.SetMinThreads(MinThreadPoolSize, MinThreadPoolSize);
string endpoint = ConfigurationManager.AppSettings["EndPointUrl"];
@ -71,7 +76,7 @@
Console.WriteLine("Endpoint: {0}", endpoint);
Console.WriteLine("Collection : {0}.{1} at {2} request units per second", DatabaseName, DataCollectionName, ConfigurationManager.AppSettings["CollectionThroughput"]);
Console.WriteLine("Document Template*: {0}", ConfigurationManager.AppSettings["DocumentTemplateFile"]);
Console.WriteLine("Degree of parallelism*: {0}", TaskCount);
Console.WriteLine("Degree of parallelism*: {0}", ConfigurationManager.AppSettings["DegreeOfParallelism"]);
Console.WriteLine("--------------------------------------------------------------------- ");
Console.WriteLine();
@ -112,7 +117,9 @@
/// <returns>a Task object.</returns>
private async Task RunAsync()
{
DocumentCollection dataCollection = GetCollectionIfExists(DatabaseName, DataCollectionName);
int currentCollectionThroughput = 0;
if (bool.Parse(ConfigurationManager.AppSettings["ShouldCleanupOnStart"]) || dataCollection == null)
{
@ -127,49 +134,39 @@
Console.WriteLine("Creating collection {0} with {1} RU/s", DataCollectionName, CollectionThroughput);
dataCollection = await this.CreatePartitionedCollectionAsync(DatabaseName, DataCollectionName);
currentCollectionThroughput = CollectionThroughput;
}
else
{
OfferV2 offer = (OfferV2)client.CreateOfferQuery().Where(o => o.ResourceLink == dataCollection.SelfLink).AsEnumerable().FirstOrDefault();
int throughput = offer.Content.OfferThroughput;
Console.WriteLine("Found collection {0} with {1} RU/s", DataCollectionName, CollectionThroughput);
currentCollectionThroughput = offer.Content.OfferThroughput;
Console.WriteLine("Found collection {0} with {1} RU/s", DataCollectionName, currentCollectionThroughput);
}
DocumentCollection metricCollection = GetCollectionIfExists(DatabaseName, MetricCollectionName);
int taskCount;
int degreeOfParallelism = int.Parse(ConfigurationManager.AppSettings["DegreeOfParallelism"]);
// Configure to expire metrics for old clients if not updated for longer than a minute
int defaultTimeToLive = 60;
if (metricCollection == null)
if (degreeOfParallelism == -1)
{
Console.WriteLine("Creating metric collection {0}", MetricCollectionName);
DocumentCollection metricCollectionDefinition = new DocumentCollection();
metricCollectionDefinition.Id = MetricCollectionName;
metricCollectionDefinition.DefaultTimeToLive = defaultTimeToLive;
metricCollection = await ExecuteWithRetries<ResourceResponse<DocumentCollection>>(
this.client,
() => client.CreateDocumentCollectionAsync(
UriFactory.CreateDatabaseUri(DatabaseName),
new DocumentCollection { Id = MetricCollectionName },
new RequestOptions { OfferThroughput = 5000 }),
true);
// set TaskCount = 10 for each 10k RUs
taskCount = currentCollectionThroughput / 1000;
}
else
{
metricCollection.DefaultTimeToLive = defaultTimeToLive;
await client.ReplaceDocumentCollectionAsync(metricCollection);
taskCount = degreeOfParallelism;
}
Console.WriteLine("Starting Inserts with {0} tasks", TaskCount);
Console.WriteLine("Starting Inserts with {0} tasks", taskCount);
string sampleDocument = File.ReadAllText(ConfigurationManager.AppSettings["DocumentTemplateFile"]);
pendingTaskCount = TaskCount;
pendingTaskCount = taskCount;
var tasks = new List<Task>();
tasks.Add(this.LogOutputStats());
long numberOfDocumentsToInsert = long.Parse(ConfigurationManager.AppSettings["NumberOfDocumentsToInsert"])/TaskCount;
for (var i = 0; i < TaskCount; i++)
long numberOfDocumentsToInsert = long.Parse(ConfigurationManager.AppSettings["NumberOfDocumentsToInsert"]) / taskCount;
for (var i = 0; i < taskCount; i++)
{
tasks.Add(this.InsertDocument(i, client, dataCollection, sampleDocument, numberOfDocumentsToInsert));
}
@ -179,10 +176,7 @@
if (bool.Parse(ConfigurationManager.AppSettings["ShouldCleanupOnFinish"]))
{
Console.WriteLine("Deleting Database {0}", DatabaseName);
await ExecuteWithRetries<ResourceResponse<Database>>(
this.client,
() => client.DeleteDatabaseAsync(UriFactory.CreateDatabaseUri(DatabaseName)),
true);
await client.DeleteDatabaseAsync(UriFactory.CreateDatabaseUri(DatabaseName));
}
}
@ -199,12 +193,10 @@
try
{
ResourceResponse<Document> response = await ExecuteWithRetries<ResourceResponse<Document>>(
client,
() => client.CreateDocumentAsync(
ResourceResponse<Document> response = await client.CreateDocumentAsync(
UriFactory.CreateDocumentCollectionUri(DatabaseName, DataCollectionName),
newDictionary,
new RequestOptions() { }));
new RequestOptions() { });
string partition = response.SessionToken.Split(':')[0];
requestUnitsConsumed[taskId] += response.RequestCharge;
@ -252,20 +244,6 @@
Math.Round(ruPerSecond),
Math.Round(ruPerMonth / (1000 * 1000 * 1000)));
Dictionary<string, object> latestStats = new Dictionary<string, object>();
latestStats["id"] = string.Format("latest{0}", InstanceId);
latestStats["type"] = "latest";
latestStats["totalDocumentsCreated"] = currentCount;
latestStats["documentsCreatedPerSecond"] = Math.Round(this.documentsInserted / seconds);
latestStats["requestUnitsPerSecond"] = Math.Round(ruPerSecond);
latestStats["requestUnitsPerMonth"] = Math.Round(ruPerSecond) * 86400 * 30;
latestStats["documentsCreatedInLastSecond"] = Math.Round((currentCount - lastCount) / (seconds - lastSeconds));
latestStats["requestUnitsInLastSecond"] = Math.Round((requestUnits - lastRequestUnits) / (seconds - lastSeconds));
latestStats["requestUnitsPerMonthBasedOnLastSecond"] =
Math.Round(((requestUnits - lastRequestUnits) / (seconds - lastSeconds)) * 86400 * 30);
await InsertMetricsToDocumentDB(latestStats);
lastCount = documentsInserted;
lastSeconds = seconds;
lastRequestUnits = requestUnits;
@ -286,22 +264,6 @@
Console.WriteLine("--------------------------------------------------------------------- ");
}
private async Task InsertMetricsToDocumentDB(Dictionary<string, object> latestStats)
{
try
{
await ExecuteWithRetries<ResourceResponse<Document>>(
client,
() => client.UpsertDocumentAsync(
UriFactory.CreateDocumentCollectionUri(DatabaseName, MetricCollectionName),
latestStats));
}
catch (Exception e)
{
Trace.TraceError("Insert metrics document failed with {0}", e);
}
}
/// <summary>
/// Create a partitioned collection.
/// </summary>
@ -317,16 +279,14 @@
// Show user cost of running this test
double estimatedCostPerMonth = 0.06 * CollectionThroughput;
double estimatedCostPerHour = estimatedCostPerMonth / (24 * 30);
Console.WriteLine("The collection will cost an estimated ${0} per hour (${1} per month)", estimatedCostPerHour, estimatedCostPerMonth);
Console.WriteLine("The collection will cost an estimated ${0} per hour (${1} per month)", Math.Round(estimatedCostPerHour, 2), Math.Round(estimatedCostPerMonth, 2));
Console.WriteLine("Press enter to continue ...");
Console.ReadLine();
return await ExecuteWithRetries<ResourceResponse<DocumentCollection>>(
this.client,
() => client.CreateDocumentCollectionAsync(
return await client.CreateDocumentCollectionAsync(
UriFactory.CreateDatabaseUri(databaseName),
collection,
new RequestOptions { OfferThroughput = CollectionThroughput }));
new RequestOptions { OfferThroughput = CollectionThroughput });
}
/// <summary>
@ -352,67 +312,5 @@
return client.CreateDocumentCollectionQuery(UriFactory.CreateDatabaseUri(databaseName))
.Where(c => c.Id == collectionName).AsEnumerable().FirstOrDefault();
}
/// <summary>
/// Execute the function with retries on throttle.
/// </summary>
/// <typeparam name="V">The type of return value from the execution.</typeparam>
/// <param name="client">The DocumentDB client instance.</param>
/// <param name="function">The function to execute.</param>
/// <returns>The response from the execution.</returns>
public static async Task<V> ExecuteWithRetries<V>(DocumentClient client, Func<Task<V>> function, bool shouldLogRetries = false)
{
TimeSpan sleepTime = TimeSpan.Zero;
int[] expectedStatusCodes = new int[] { 429, 400, 503 };
while (true)
{
try
{
return await function();
}
catch (System.Net.Http.HttpRequestException)
{
sleepTime = TimeSpan.FromSeconds(1);
}
catch (Exception e)
{
DocumentClientException de;
if (!TryExtractDocumentClientException(e, out de))
{
throw;
}
sleepTime = de.RetryAfter;
if (shouldLogRetries)
{
Console.WriteLine("Retrying after sleeping for {0}", sleepTime);
}
}
await Task.Delay(sleepTime);
}
}
private static bool TryExtractDocumentClientException(Exception e, out DocumentClientException de)
{
if (e is DocumentClientException)
{
de = (DocumentClientException)e;
return true;
}
if (e is AggregateException)
{
if (e.InnerException is DocumentClientException)
{
de = (DocumentClientException)e.InnerException;
return true;
}
}
de = null;
return false;
}
}
}

Просмотреть файл

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Microsoft.Azure.DocumentDB" version="1.7.1" targetFramework="net451" />
<package id="Microsoft.Azure.DocumentDB" version="1.9.2" targetFramework="net451" />
<package id="Newtonsoft.Json" version="6.0.8" targetFramework="net451" />
</packages>