update benchmarks for corpus connection, periodic test (#206)
This commit is contained in:
Родитель
6ea05fe748
Коммит
bd0a5d8228
|
@ -17,7 +17,6 @@ namespace PerformanceTests.FileHash
|
|||
/// </summary>
|
||||
public static class FileOrchestration
|
||||
{
|
||||
|
||||
[FunctionName(nameof(FileOrchestration))]
|
||||
public static async Task<double> Run([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
|
||||
{
|
||||
|
|
|
@ -5,13 +5,12 @@ namespace PerformanceTests.FileHash
|
|||
{
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
|
||||
using Microsoft.Azure.Storage;
|
||||
using Microsoft.Azure.Storage.Blob;
|
||||
using System.Linq;
|
||||
using System.Collections.Generic;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Azure.Storage.Blobs;
|
||||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
|
||||
|
||||
/// <summary>
|
||||
/// An activity that
|
||||
|
@ -19,18 +18,20 @@ namespace PerformanceTests.FileHash
|
|||
public static class GetFilesActivity
|
||||
{
|
||||
[FunctionName(nameof(GetFilesActivity))]
|
||||
public static Task<List<string>> Run([ActivityTrigger] IDurableActivityContext context, ILogger log)
|
||||
public static async Task<List<string>> Run([ActivityTrigger] IDurableActivityContext context, ILogger log)
|
||||
{
|
||||
// setup connection to the corpus with the text files
|
||||
CloudBlobClient serviceClient = new CloudBlobClient(new Uri(@"https://gutenbergcorpus.blob.core.windows.net"));
|
||||
CloudBlobContainer blobContainer = serviceClient.GetContainerReference("gutenberg");
|
||||
CloudBlobDirectory blobDirectory = blobContainer.GetDirectoryReference($"Gutenberg/txt");
|
||||
// list all the books in the corpus
|
||||
var storageConnectionString = Environment.GetEnvironmentVariable("CorpusConnection");
|
||||
var blobContainerClient = new BlobContainerClient(storageConnectionString, blobContainerName: "gutenberg");
|
||||
|
||||
// get the list of files(books) from blob storage
|
||||
List<IListBlobItem> books = blobDirectory.ListBlobs().ToList();
|
||||
List<string> bookNames = books.Select(x => ((CloudBlockBlob)x).Name).ToList();
|
||||
List<string> bookNames = new List<string>();
|
||||
await foreach (var blob in blobContainerClient.GetBlobsAsync(prefix: "Gutenberg/txt"))
|
||||
{
|
||||
bookNames.Add(blob.Name);
|
||||
}
|
||||
log.LogWarning($"{bookNames.Count} books in total");
|
||||
return Task.FromResult(bookNames);
|
||||
|
||||
return bookNames;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,6 +10,8 @@ namespace PerformanceTests.FileHash
|
|||
using Microsoft.Azure.Storage;
|
||||
using Microsoft.Azure.Storage.Blob;
|
||||
using System.Linq;
|
||||
using Azure.Storage.Blobs;
|
||||
using Azure.Storage.Blobs.Models;
|
||||
|
||||
/// <summary>
|
||||
/// An activity that
|
||||
|
@ -19,16 +21,13 @@ namespace PerformanceTests.FileHash
|
|||
[FunctionName(nameof(HashActivity))]
|
||||
public static async Task<long> Run([ActivityTrigger] IDurableActivityContext context)
|
||||
{
|
||||
char[] separators = { ' ', '\n', '<', '>', '=', '\"', '\'', '/', '\\', '(', ')', '\t', '{', '}', '[', ']', ',', '.', ':', ';' };
|
||||
|
||||
// setup connection to the corpus with the text files
|
||||
CloudBlobClient serviceClient = new CloudBlobClient(new Uri(@"https://gutenbergcorpus.blob.core.windows.net"));
|
||||
|
||||
// download the book from blob storage
|
||||
var input = context.GetInput<(string book, int multiplier)>();
|
||||
CloudBlobContainer blobContainer = serviceClient.GetContainerReference("gutenberg");
|
||||
CloudBlockBlob blob = blobContainer.GetBlockBlobReference(input.book);
|
||||
string doc = await blob.DownloadTextAsync();
|
||||
|
||||
// download the book content from the corpus
|
||||
var storageConnectionString = Environment.GetEnvironmentVariable("CorpusConnection");
|
||||
var blobClient = new BlobClient(storageConnectionString, blobContainerName: "gutenberg", blobName: input.book);
|
||||
BlobDownloadResult result = await blobClient.DownloadContentAsync();
|
||||
string doc = result.Content.ToString();
|
||||
|
||||
long wordCount = 0;
|
||||
string[] words = doc.Split(separators, StringSplitOptions.RemoveEmptyEntries);
|
||||
|
@ -46,5 +45,7 @@ namespace PerformanceTests.FileHash
|
|||
// return the number of words hashed
|
||||
return wordCount;
|
||||
}
|
||||
|
||||
static readonly char[] separators = { ' ', '\n', '<', '>', '=', '\"', '\'', '/', '\\', '(', ')', '\t', '{', '}', '[', ']', ',', '.', ':', ';' };
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,15 +20,16 @@ namespace PerformanceTests.Periodic
|
|||
{
|
||||
[FunctionName(nameof(Periodic))]
|
||||
public static async Task<IActionResult> Run(
|
||||
[HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequest req,
|
||||
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = "periodic/{iterations}/{minutes}/")] HttpRequest req,
|
||||
[DurableClient] IDurableClient client,
|
||||
int iterations,
|
||||
double minutes,
|
||||
ILogger log)
|
||||
{
|
||||
// start the orchestration
|
||||
string orchestrationInstanceId = await client.StartNewAsync(nameof(PeriodicOrchestration));
|
||||
string orchestrationInstanceId = await client.StartNewAsync(nameof(PeriodicOrchestration), null, (iterations, minutes));
|
||||
|
||||
// wait for it to complete and return the result
|
||||
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(req, orchestrationInstanceId, TimeSpan.FromSeconds(200));
|
||||
}
|
||||
return client.CreateCheckStatusResponse(req, orchestrationInstanceId, false);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,20 +19,22 @@ namespace PerformanceTests.Periodic
|
|||
[FunctionName(nameof(PeriodicOrchestration))]
|
||||
public static async Task Run([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
|
||||
{
|
||||
for (int i = 0; i < 5; i++)
|
||||
(int iterations, double minutes) = context.GetInput<(int,double)>();
|
||||
|
||||
for (int i = 0; i < iterations; i++)
|
||||
{
|
||||
DateTime fireAt = context.CurrentUtcDateTime + TimeSpan.FromSeconds(60);
|
||||
DateTime fireAt = context.CurrentUtcDateTime + TimeSpan.FromMinutes(minutes);
|
||||
|
||||
if (!context.IsReplaying)
|
||||
{
|
||||
log.LogWarning($"{context.InstanceId}: starting timer for iteration {i}, to fire at {fireAt}");
|
||||
log.LogWarning("{instanceId}: periodic timer: starting iteration {iteration}, to fire at {fireAt}", context.InstanceId, i, fireAt);
|
||||
}
|
||||
|
||||
await context.CreateTimer(fireAt, CancellationToken.None);
|
||||
|
||||
if (!context.IsReplaying)
|
||||
{
|
||||
log.LogWarning($"{context.InstanceId}: timer for iteration {i} fired at {(UtcNowWithNoWarning() - fireAt).TotalMilliseconds:F2}ms relative to deadline");
|
||||
log.LogWarning("{instanceId}: periodic timer: iteration {iteration} fired at {accuracyMs:F2}ms relative to deadline", context.InstanceId, i, (UtcNowWithNoWarning() - fireAt).TotalMilliseconds);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -15,10 +15,10 @@ namespace PerformanceTests.WordCount
|
|||
using Microsoft.Azure.Storage;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
using Microsoft.Azure.Storage.Blob;
|
||||
using DurableTask.Netherite.Faster;
|
||||
using System.Net.Http;
|
||||
using System.Net;
|
||||
using Azure.Storage.Blobs;
|
||||
|
||||
/// <summary>
|
||||
/// Defines the REST operations for the word count test.
|
||||
|
@ -36,14 +36,6 @@ namespace PerformanceTests.WordCount
|
|||
string[] counts = shape.Split('x');
|
||||
int mapperCount, reducerCount;
|
||||
|
||||
// setup connection to the corpus with the text files
|
||||
CloudBlobClient serviceClient = new CloudBlobClient(new Uri(@"https://gutenbergcorpus.blob.core.windows.net"));
|
||||
CloudBlobContainer blobContainer = serviceClient.GetContainerReference("gutenberg");
|
||||
CloudBlobDirectory blobDirectory = blobContainer.GetDirectoryReference($"Gutenberg/txt");
|
||||
|
||||
// get the list of files(books) from blob storage
|
||||
IEnumerable<IListBlobItem> books = blobDirectory.ListBlobs();
|
||||
|
||||
if (!(counts.Length == 2 && int.TryParse(counts[0], out mapperCount) && int.TryParse(counts[1], out reducerCount)))
|
||||
{
|
||||
return new JsonResult(new { message = "Please specify the mapper count and reducer count in the query parameters, e.g. &shape=10x10" })
|
||||
|
@ -52,6 +44,21 @@ namespace PerformanceTests.WordCount
|
|||
};
|
||||
}
|
||||
|
||||
// get a book name for each mapper from the corpus
|
||||
var storageConnectionString = Environment.GetEnvironmentVariable("CorpusConnection");
|
||||
var blobContainerClient = new BlobContainerClient(storageConnectionString, blobContainerName: "gutenberg");
|
||||
List<string> bookNames = new List<string>();
|
||||
await foreach (var blob in blobContainerClient.GetBlobsAsync(prefix: "Gutenberg/txt"))
|
||||
{
|
||||
if (bookNames.Count == mapperCount)
|
||||
{
|
||||
break; // we send only one book to each mapper
|
||||
}
|
||||
|
||||
bookNames.Add(blob.Name);
|
||||
}
|
||||
int bookCount = bookNames.Count;
|
||||
|
||||
// ----- PHASE 1 ----------
|
||||
// initialize all three types of entities prior to running the mapreduce
|
||||
|
||||
|
@ -70,19 +77,11 @@ namespace PerformanceTests.WordCount
|
|||
// ----- PHASE 2 ----------
|
||||
// send work to the mappers
|
||||
|
||||
int bookCount = 0;
|
||||
|
||||
foreach (var book in books)
|
||||
for (int mapper = 0; mapper < bookCount; mapper++)
|
||||
{
|
||||
log.LogWarning($"The book name is {((CloudBlockBlob)book).Name}");
|
||||
int mapper = bookCount++;
|
||||
var _ = client.SignalEntityAsync(Mapper.GetEntityId(mapper), nameof(Mapper.Ops.Item), ((CloudBlockBlob)book).Name);
|
||||
|
||||
if (bookCount == mapperCount)
|
||||
{
|
||||
log.LogWarning($"Processed {bookCount} books, exiting");
|
||||
break;
|
||||
}
|
||||
string book = bookNames[mapper];
|
||||
log.LogWarning($"The book name is {book}");
|
||||
var _ = client.SignalEntityAsync(Mapper.GetEntityId(mapper), nameof(Mapper.Ops.Item), book);
|
||||
}
|
||||
|
||||
// ----- PHASE 3 ----------
|
||||
|
|
|
@ -9,9 +9,9 @@ namespace PerformanceTests.WordCount
|
|||
using System.Net;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Azure.Storage.Blobs.Models;
|
||||
using Azure.Storage.Blobs;
|
||||
using DurableTask.Netherite.Faster;
|
||||
using Microsoft.Azure.Storage;
|
||||
using Microsoft.Azure.Storage.Blob;
|
||||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
@ -60,14 +60,13 @@ namespace PerformanceTests.WordCount
|
|||
Stopwatch s = new Stopwatch();
|
||||
s.Start();
|
||||
|
||||
// setup connection to the corpus with the text files
|
||||
CloudBlobClient serviceClient = new CloudBlobClient(new Uri(@"https://gutenbergcorpus.blob.core.windows.net"));
|
||||
|
||||
// download the book from blob storage
|
||||
string book = context.GetInput<string>();
|
||||
CloudBlobContainer blobContainer = serviceClient.GetContainerReference("gutenberg");
|
||||
CloudBlockBlob blob = blobContainer.GetBlockBlobReference(book);
|
||||
string doc = await blob.DownloadTextAsync();
|
||||
|
||||
// download the book content from the corpus
|
||||
var storageConnectionString = Environment.GetEnvironmentVariable("CorpusConnection");
|
||||
var blobClient = new BlobClient(storageConnectionString, blobContainerName: "gutenberg", blobName: book);
|
||||
BlobDownloadResult result = await blobClient.DownloadContentAsync();
|
||||
string doc = result.Content.ToString();
|
||||
|
||||
string[] words = doc.Split(separators, StringSplitOptions.RemoveEmptyEntries);
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче