Select resources by usage statistics. (#286)

Select resources by usage statistics.
This commit is contained in:
alonadam 2023-11-16 12:40:14 +02:00 коммит произвёл GitHub
Родитель 7cbffe6be1
Коммит 94efa7c688
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 515 добавлений и 89 удалений

Просмотреть файл

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unversioned
### Added
- When working with storage resources, use success/failure statistics and retry mechanism to improve ingestion stability
## [5.2.3] - 2023-11-07
### Fixed

Просмотреть файл

@ -25,13 +25,12 @@ export class KustoIngestClient extends KustoIngestClientBase {
const blob = descriptor.file as Blob;
const props = this._getMergedProps(ingestionProperties);
const [fileToUpload, blockBlobClient] = await Promise.all([
descriptor.prepare(),
this.resourceManager.getBlockBlobClient(generateBlobName(descriptor, props)),
]);
const fileToUpload = await descriptor.prepare();
const blobName = generateBlobName(descriptor, props);
await blockBlobClient.uploadData(fileToUpload);
return this.ingestFromBlob(new BlobDescriptor(blockBlobClient.url, blob.size, descriptor.sourceId), props);
const blobUri = await this.uploadToBlobWithRetry(fileToUpload, blobName);
return this.ingestFromBlob(new BlobDescriptor(blobUri, blob.size, descriptor.sourceId), props);
}
/**
@ -42,10 +41,8 @@ export class KustoIngestClient extends KustoIngestClientBase {
const props = this._getMergedProps(ingestionProperties);
const descriptor: StreamDescriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);
const blobName = generateBlobName(descriptor, props);
const blockBlobClient = await this.resourceManager.getBlockBlobClient(blobName);
await blockBlobClient.uploadData(descriptor.stream as ArrayBuffer);
return this.ingestFromBlob(new BlobDescriptor(blockBlobClient.url), props); // descriptor.size?
const blobUri = await this.uploadToBlobWithRetry(descriptor.stream as ArrayBuffer, blobName);
return this.ingestFromBlob(new BlobDescriptor(blobUri), props); // descriptor.size?
}
}

Просмотреть файл

@ -28,9 +28,9 @@ export class KustoIngestClient extends KustoIngestClientBase {
try {
const blobName = generateBlobName(descriptor, props);
const [fileToUpload, blockBlobClient] = await Promise.all([descriptor.prepare(), this.resourceManager.getBlockBlobClient(blobName)]);
await blockBlobClient.uploadFile(fileToUpload);
return this.ingestFromBlob(new BlobDescriptor(blockBlobClient.url, descriptor.size, descriptor.sourceId), props);
const fileToUpload = await descriptor.prepare();
const blobUri = await this.uploadToBlobWithRetry(fileToUpload, blobName);
return this.ingestFromBlob(new BlobDescriptor(blobUri, descriptor.size, descriptor.sourceId), props);
} finally {
await descriptor.cleanup();
}
@ -49,14 +49,9 @@ export class KustoIngestClient extends KustoIngestClientBase {
const blobName = generateBlobName(descriptor, props);
const blockBlobClient = await this.resourceManager.getBlockBlobClient(blobName);
if (descriptor.stream instanceof Buffer) {
await blockBlobClient.uploadData(descriptor.stream as Buffer);
} else {
await blockBlobClient.uploadStream(descriptor.stream as Readable);
}
const blobUri = await this.uploadToBlobWithRetry(descriptor, blobName);
return this.ingestFromBlob(new BlobDescriptor(blockBlobClient.url), props); // descriptor.size?
return this.ingestFromBlob(new BlobDescriptor(blobUri), props); // descriptor.size?
}
}

Просмотреть файл

@ -3,20 +3,23 @@
import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data";
import { BlobDescriptor } from "./descriptors";
import ResourceManager from "./resourceManager";
import IngestionBlobInfo from "./ingestionBlobInfo";
import { ContainerClient } from "@azure/storage-blob";
import { QueueClient, QueueSendMessageResponse } from "@azure/storage-queue";
import { Readable } from "stream";
import { IngestionPropertiesInput } from "./ingestionProperties";
import { AbstractKustoClient } from "./abstractKustoClient";
import { BlobDescriptor, StreamDescriptor } from "./descriptors";
import ResourceManager from "./resourceManager";
import IngestionBlobInfo from "./ingestionBlobInfo";
export abstract class KustoIngestClientBase extends AbstractKustoClient {
resourceManager: ResourceManager;
static readonly MaxNumberOfRetryAttempts = 3;
constructor(kcsb: string | KustoConnectionStringBuilder, defaultProps?: IngestionPropertiesInput, isBrowser?: boolean) {
super(defaultProps);
const kustoClient = new KustoClient(kcsb);
@ -24,7 +27,11 @@ export abstract class KustoIngestClientBase extends AbstractKustoClient {
this.defaultDatabase = kustoClient.defaultDatabase;
}
async ingestFromBlob(blob: string | BlobDescriptor, ingestionProperties?: IngestionPropertiesInput): Promise<QueueSendMessageResponse> {
async ingestFromBlob(
blob: string | BlobDescriptor,
ingestionProperties?: IngestionPropertiesInput,
maxRetries: number = KustoIngestClientBase.MaxNumberOfRetryAttempts
): Promise<QueueSendMessageResponse> {
this.ensureOpen();
const props = this._getMergedProps(ingestionProperties);
@ -36,16 +43,62 @@ export abstract class KustoIngestClientBase extends AbstractKustoClient {
}
const authorizationContext = await this.resourceManager.getAuthorizationContext();
const queueDetails = queues[Math.floor(Math.random() * queues.length)];
const queueClient = new QueueClient(queueDetails.uri);
const ingestionBlobInfo = new IngestionBlobInfo(descriptor, props, authorizationContext);
const ingestionBlobInfoJson = JSON.stringify(ingestionBlobInfo);
const encoded = Buffer.from(ingestionBlobInfoJson).toString("base64");
return queueClient.sendMessage(encoded);
const retryCount = Math.min(maxRetries, queues.length);
for (let i = 0; i < retryCount; i++) {
const queueClient = new QueueClient(queues[i].uri);
try {
const queueResponse = await queueClient.sendMessage(encoded);
this.resourceManager.reportResourceUsageResult(queueClient.accountName, true);
return queueResponse;
} catch (_) {
this.resourceManager.reportResourceUsageResult(queueClient.accountName, false);
}
}
throw new Error("Failed to send message to queue.");
}
async uploadToBlobWithRetry(
descriptor: string | Blob | ArrayBuffer | StreamDescriptor,
blobName: string,
maxRetries: number = KustoIngestClientBase.MaxNumberOfRetryAttempts
): Promise<string> {
const containers = await this.resourceManager.getContainers();
if (containers == null || containers.length === 0) {
throw new Error("Failed to get containers");
}
const retryCount = Math.min(maxRetries, containers.length);
// Go over all containers and try to upload the file to the first one that succeeds
for (let i = 0; i < retryCount; i++) {
const containerClient = new ContainerClient(containers[i].uri);
const blockBlobClient = containerClient.getBlockBlobClient(blobName);
try {
if (typeof descriptor == "string") {
await blockBlobClient.uploadFile(descriptor);
} else if (descriptor instanceof StreamDescriptor) {
if (descriptor.stream instanceof Buffer) {
await blockBlobClient.uploadData(descriptor.stream as Buffer);
} else {
await blockBlobClient.uploadStream(descriptor.stream as Readable);
}
} else {
await blockBlobClient.uploadData(descriptor);
}
this.resourceManager.reportResourceUsageResult(containerClient.accountName, true);
return blockBlobClient.url;
} catch (ex) {
this.resourceManager.reportResourceUsageResult(containerClient.accountName, false);
}
}
throw new Error("Failed to upload to blob.");
}
close() {

Просмотреть файл

@ -153,7 +153,7 @@ class KustoManagedStreamingIngestClient extends AbstractKustoClient {
clientRequestId ??
`KNC.executeManagedStreamingIngest${isBlob ? "FromBlob" : "FromStream"};${descriptor.sourceId};${retry.currentAttempt}`;
if (isBlob) {
return this.streamingIngestClient.ingestFromBlob(descriptor as BlobDescriptor, props, sourceId);
return await this.streamingIngestClient.ingestFromBlob(descriptor as BlobDescriptor, props, sourceId);
}
if (isNode) {

Просмотреть файл

@ -0,0 +1,82 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
class StorageAccountStats {
public successCount: number;
public totalCount: number;
constructor() {
this.successCount = 0;
this.totalCount = 0;
}
logResult(success: boolean): void {
this.totalCount += 1;
if (success) {
this.successCount += 1;
}
}
reset(): void {
this.successCount = 0;
this.totalCount = 0;
}
}
export class RankedStorageAccount {
private buckets: StorageAccountStats[];
private lastUpdateTime: number;
private currentBucketIndex: number;
constructor(private accountName: string, private numberOfBuckets: number, private bucketDuration: number, private timeProvider: () => number) {
this.buckets = new Array<StorageAccountStats>(numberOfBuckets).fill(new StorageAccountStats()).map(() => new StorageAccountStats());
this.lastUpdateTime = this.timeProvider();
this.currentBucketIndex = 0;
}
logResult(success: boolean): void {
this.currentBucketIndex = this.adjustForTimePassed();
this.buckets[this.currentBucketIndex].logResult(success);
}
getAccountName(): string {
return this.accountName;
}
adjustForTimePassed(): number {
const currentTime = this.timeProvider();
const timeDelta = currentTime - this.lastUpdateTime;
let window_size = 0;
if (timeDelta >= this.bucketDuration) {
this.lastUpdateTime = currentTime;
window_size = Math.min(Math.floor(timeDelta / this.bucketDuration), this.numberOfBuckets);
for (let i = 1; i < window_size + 1; i++) {
const indexToReset = (this.currentBucketIndex + i) % this.numberOfBuckets;
this.buckets[indexToReset].reset();
}
}
return (this.currentBucketIndex + window_size) % this.numberOfBuckets;
}
getRank(): number {
let rank: number = 0;
let totalWeight: number = 0;
for (let i = 1; i <= this.numberOfBuckets; i++) {
const bucketIndex: number = (this.currentBucketIndex + i) % this.numberOfBuckets;
const bucket: StorageAccountStats = this.buckets[bucketIndex];
if (bucket.totalCount === 0) {
continue;
}
const successRate: number = bucket.successCount / bucket.totalCount;
rank += successRate * i;
totalWeight += i;
}
if (totalWeight === 0) {
return 1;
}
return rank / totalWeight;
}
}

Просмотреть файл

@ -0,0 +1,68 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import { RankedStorageAccount } from "./rankedStorageAccount";
export class RankedStorageAccountSet {
public static readonly DefaultNumberOfBuckets: number = 6;
public static readonly DefaultBucketDurationInSeconds: number = 10;
public static readonly DefaultTiers: number[] = [90, 70, 30, 0];
public static readonly DefaultTimeProviderInSeconds: () => number = () => {
return new Date().getTime() / 1000;
};
private accounts: Map<string, RankedStorageAccount>;
constructor(
private numberOfBuckets: number = RankedStorageAccountSet.DefaultNumberOfBuckets,
private bucketDuration: number = RankedStorageAccountSet.DefaultBucketDurationInSeconds,
private tiers: number[] = RankedStorageAccountSet.DefaultTiers,
private timeProvider: () => number = RankedStorageAccountSet.DefaultTimeProviderInSeconds
) {
this.accounts = new Map<string, RankedStorageAccount>();
}
logResultToAccount(accountName: string, result: boolean) {
if (!this.accounts.has(accountName)) {
throw new Error("Storage account name is not part of the set.");
}
this.accounts.get(accountName)?.logResult(result);
}
registerStorageAccount(accountName: string) {
if (this.accounts.has(accountName)) {
return;
}
this.accounts.set(accountName, new RankedStorageAccount(accountName, this.numberOfBuckets, this.bucketDuration, this.timeProvider));
}
getStorageAccount(accountName: string): RankedStorageAccount {
const account = this.accounts.get(accountName);
if (account) {
return account;
}
throw new Error("Storage account name is not part of the set.");
}
getRankedShuffledAccounts(): RankedStorageAccount[] {
const accountsByTier: RankedStorageAccount[][] = new Array<RankedStorageAccount[]>(this.tiers.length);
// Group accounts by tier and rank
for (const account of this.accounts.values()) {
const rank = account.getRank() * 100;
const tierInedx = this.tiers.findIndex((tier) => rank >= tier);
accountsByTier[tierInedx] = accountsByTier[tierInedx] || [];
accountsByTier[tierInedx].push(account);
}
// Shuffle each tier
for (let i = 0; i < this.tiers.length; i++) {
if (accountsByTier[i]) {
accountsByTier[i].sort(() => Math.random() - 0.5);
}
}
// Flatten the array
return accountsByTier.flat();
}
}

Просмотреть файл

@ -4,10 +4,18 @@
import { Client, KustoDataErrors, TimeUtils } from "azure-kusto-data";
import { ExponentialRetry } from "./retry";
import { ContainerClient } from "@azure/storage-blob";
import { RankedStorageAccountSet } from "./rankedStorageAccountSet";
import { QueueClient } from "@azure/storage-queue";
const ATTEMPT_COUNT = 4;
export enum ResourceType {
Queue,
Container,
}
export class ResourceURI {
constructor(readonly uri: string) {}
constructor(readonly uri: string, readonly accountName: string, readonly resourceType: ResourceType) {}
}
export class IngestClientResources {
@ -34,6 +42,7 @@ export class ResourceManager {
private baseSleepTimeSecs = 1;
private baseJitterSecs = 1;
private rankedStorageAccountSet: RankedStorageAccountSet;
constructor(readonly kustoClient: Client, readonly isBrowser: boolean = false) {
this.refreshPeriod = TimeUtils.toMilliseconds(1, 0, 0);
@ -44,6 +53,8 @@ export class ResourceManager {
this.authorizationContext = null;
this.authorizationContextLastUpdate = null;
this.rankedStorageAccountSet = new RankedStorageAccountSet();
}
async refreshIngestClientResources(): Promise<IngestClientResources> {
@ -63,10 +74,10 @@ export class ResourceManager {
const response = await this.kustoClient.execute("NetDefaultDB", cmd);
const table = response.primaryResults[0];
const resoures = new IngestClientResources(
this.getResourceByName(table, "SecuredReadyForAggregationQueue"),
this.getResourceByName(table, "FailedIngestionsQueue"),
this.getResourceByName(table, "SuccessfulIngestionsQueue"),
this.getResourceByName(table, "TempStorage")
this.getResourceByName(table, "SecuredReadyForAggregationQueue", ResourceType.Queue),
this.getResourceByName(table, "FailedIngestionsQueue", ResourceType.Queue),
this.getResourceByName(table, "SuccessfulIngestionsQueue", ResourceType.Queue),
this.getResourceByName(table, "TempStorage", ResourceType.Container)
);
if (!resoures.valid()) {
@ -84,7 +95,7 @@ export class ResourceManager {
throw new Error(`Failed to get ingestion resources from server - the request was throttled ${ATTEMPT_COUNT} times.`);
}
getResourceByName(table: { rows: () => any }, resourceName: string): ResourceURI[] {
getResourceByName(table: { rows: () => any }, resourceName: string, resourceType: ResourceType): ResourceURI[] {
const result: ResourceURI[] = [];
for (const row of table.rows()) {
const typedRow = row as {
@ -92,12 +103,80 @@ export class ResourceManager {
StorageRoot: string;
};
if (typedRow.ResourceTypeName === resourceName) {
result.push(new ResourceURI(typedRow.StorageRoot));
let accountName = "";
if (resourceType === ResourceType.Queue) {
accountName = new QueueClient(typedRow.StorageRoot).accountName;
} else if (resourceType === ResourceType.Container) {
accountName = new ContainerClient(typedRow.StorageRoot).accountName;
}
result.push(new ResourceURI(typedRow.StorageRoot, accountName, resourceType));
}
}
return result;
}
pupulateStorageAccounts() {
if (this.ingestClientResources == null) {
return;
}
// containers
const accounts = new Set<string>();
if (this.ingestClientResources.containers != null) {
for (const container of this.ingestClientResources.containers) {
accounts.add(container.accountName);
}
}
// queues
if (this.ingestClientResources.securedReadyForAggregationQueues != null) {
for (const queue of this.ingestClientResources.securedReadyForAggregationQueues) {
accounts.add(queue.accountName);
}
}
for (const account of accounts) {
this.rankedStorageAccountSet.registerStorageAccount(account);
}
}
groupResourcesByStorageAccount(resources: ResourceURI[]): Map<string, ResourceURI[]> {
const result = new Map<string, ResourceURI[]>();
for (const resource of resources) {
if (!result.has(resource.accountName)) {
result.set(resource.accountName, []);
}
result.get(resource.accountName)?.push(resource);
}
return result;
}
getRankedAndShuffledStorageAccounts(resources: ResourceURI[]): ResourceURI[][] {
const resourcesByAccount = this.groupResourcesByStorageAccount(resources);
const rankedStorageAccounts = this.rankedStorageAccountSet.getRankedShuffledAccounts();
const result = new Array<ResourceURI[]>();
for (const account of rankedStorageAccounts) {
const accountName = account.getAccountName();
if (resourcesByAccount.has(accountName)) {
result.push(resourcesByAccount.get(accountName) as ResourceURI[]);
}
}
return result;
}
getRoundRobinRankedAndShuffledResources(resources: ResourceURI[]): ResourceURI[] {
const rankedAccounts = this.getRankedAndShuffledStorageAccounts(resources);
const result = new Array<ResourceURI>();
let index = 0;
while (result.length < resources.length) {
const account = rankedAccounts[index % rankedAccounts.length];
if (account.length > 0) {
result.push(account.shift() as ResourceURI);
}
index++;
}
return result;
}
async refreshAuthorizationContext(): Promise<string> {
const error = await this.tryRefresh(true);
@ -121,6 +200,7 @@ export class ResourceManager {
} else {
this.ingestClientResources = await this.getIngestClientResourcesFromService();
this.ingestClientResourcesLastUpdate = now;
this.pupulateStorageAccounts();
}
} catch (e) {
error = e as Error;
@ -151,7 +231,8 @@ export class ResourceManager {
}
async getIngestionQueues() {
return (await this.refreshIngestClientResources()).securedReadyForAggregationQueues;
const queues = (await this.refreshIngestClientResources()).securedReadyForAggregationQueues;
return queues ? this.getRoundRobinRankedAndShuffledResources(queues) : null;
}
async getFailedIngestionsQueues() {
@ -163,26 +244,21 @@ export class ResourceManager {
}
async getContainers() {
return (await this.refreshIngestClientResources()).containers;
const containers = (await this.refreshIngestClientResources()).containers;
return containers ? this.getRoundRobinRankedAndShuffledResources(containers) : null;
}
async getAuthorizationContext(): Promise<string> {
return this.refreshAuthorizationContext();
}
async getBlockBlobClient(blobName: string) {
const containers = await this.getContainers();
if (containers == null) {
throw new Error("Failed to get containers");
}
const container = containers[Math.floor(Math.random() * containers.length)];
const containerClient = new ContainerClient(container.uri);
return containerClient.getBlockBlobClient(blobName);
}
close() {
this.kustoClient.close();
}
reportResourceUsageResult(accountName: string, success: boolean) {
this.rankedStorageAccountSet.logResultToAccount(accountName, success);
}
}
export default ResourceManager;

Просмотреть файл

@ -6,16 +6,11 @@
import assert from "assert";
import IngestClient from "../src/ingestClient.browser";
import { KustoConnectionStringBuilder as ConnectionStringBuilder } from "azure-kusto-data/src/connectionBuilder.browser";
import { Client } from "azure-kusto-data";
import sinon from "sinon";
import ResourceManager from "../src/resourceManager";
import { BlockBlobClient } from "@azure/storage-blob";
import { QueueSendMessageResponse } from "@azure/storage-queue";
import { FileDescriptor as BrowserFileDescriptor } from "../src/fileDescriptor.browser";
describe(`Browser Unit tests`, () => {
const cluster = "https://somecluster.kusto.windows.net";
const storage = "https://storage.blob.windows.net/container";
describe("Kcsb", () => {
it.concurrent("Fail to create non-browser compatible authentication", () => {
@ -58,20 +53,15 @@ describe(`Browser Unit tests`, () => {
table: "t1",
database: "d1",
});
const queuedStub = sinon.stub(mockedIngestClient, "ingestFromBlob");
queuedStub.resolves({} as QueueSendMessageResponse);
const blobUploadStub = sinon.stub(mockedIngestClient, "uploadToBlobWithRetry");
blobUploadStub.resolves("https://storage.blob.windows.net/container/file.json.gz");
const resource = new BlockBlobClient(storage);
const resourceStub = sinon.stub(resource, "uploadData");
resourceStub.resolves();
const resourceManager = new ResourceManager(new Client(cluster));
const resourceManagerStub = sinon.stub(resourceManager, "getBlockBlobClient");
resourceManagerStub.returns(Promise.resolve<BlockBlobClient>(resource));
mockedIngestClient.resourceManager = resourceManager;
await mockedIngestClient.ingestFromFile(new BrowserFileDescriptor({} as Blob));
await mockedIngestClient.ingestFromFile({} as Blob);
sandbox.assert.calledOnce(queuedStub);
sandbox.assert.calledOnce(resourceStub);
sandbox.assert.calledOnce(blobUploadStub);
});
});
});

Просмотреть файл

@ -19,7 +19,6 @@ import ManagedStreamingIngestClient from "../../src/managedStreamingIngestClient
import { CompressionType, StreamDescriptor } from "../../src/descriptors";
import { DataFormat, IngestionProperties, JsonColumnMapping, ReportLevel } from "../../src";
import { sleep } from "../../src/retry";
import ResourceManager from "../../src/resourceManager";
import assert from "assert";
import fs, { ReadStream } from "fs";
@ -286,12 +285,12 @@ const main = (): void => {
return { item: i };
})
)("ingestFromBlob_$item.description", async ({ item }) => {
const resourceManager = new ResourceManager(dmKustoClient);
const blob = await resourceManager.getBlockBlobClient(uuidv4() + pathlib.basename(item.path));
await blob.uploadFile(item.path);
const blobName = uuidv4() + pathlib.basename(item.path);
const blobUri = await ingestClient.uploadToBlobWithRetry(item.path, blobName);
const table = tableNames[("streaming_blob" + "_" + item.description) as Table];
try {
await streamingIngestClient.ingestFromBlob(blob.url, item.ingestionPropertiesCallback(table));
await streamingIngestClient.ingestFromBlob(blobUri, item.ingestionPropertiesCallback(table));
} catch (err) {
assert.fail(`Failed to ingest ${item.description} - ${util.format(err)}`);
}

Просмотреть файл

@ -0,0 +1,131 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import assert from "assert";
import { RankedStorageAccountSet } from "../src/rankedStorageAccountSet";
describe("RankedStorageAccountSet", () => {
describe("Input validation.", () => {
it("Validate registerStorageAccount().", () => {
const accounts = new RankedStorageAccountSet();
// Register accounts
for (let i = 0; i < 10; i++) {
accounts.registerStorageAccount("account_" + i.toString());
}
// Validate accounts
for (let i = 0; i < 10; i++) {
assert.equal(accounts.getStorageAccount("account_" + i.toString()).getAccountName(), "account_" + i.toString());
}
});
it("Validate logResultToAccount().", () => {
const accounts = new RankedStorageAccountSet();
// Register accounts
accounts.registerStorageAccount("account_1");
// Should work
accounts.logResultToAccount("account_1", true);
// Should throw
assert.throws(() => accounts.logResultToAccount("account_2", true), Error);
});
it("Validate getStorageAccount().", () => {
const accounts = new RankedStorageAccountSet();
// Register accounts
accounts.registerStorageAccount("account_1");
// Should work
assert.equal(accounts.getStorageAccount("account_1").getAccountName(), "account_1");
// Should throw
assert.throws(() => accounts.getStorageAccount("account_2"), Error);
});
});
describe("Check rank using getRankedShuffledAccounts.", () => {
it("Validate rank when no data.", () => {
const accounts = new RankedStorageAccountSet();
// Register accounts
for (let i = 0; i < 10; i++) {
accounts.registerStorageAccount("account_" + i.toString());
}
// get shuffeled accounts
const rankedAccounts = accounts.getRankedShuffledAccounts();
// validate rank
for (const account of rankedAccounts) {
// All accounts should have rank 1 (highest rank)
assert.equal(account.getRank(), 1);
}
});
it("Verify that getRankedShuffledAccounts returns shuffled accounts in each call.", () => {
const accounts = new RankedStorageAccountSet();
// Register accounts
for (let i = 0; i < 100; i++) {
accounts.registerStorageAccount("account_" + i.toString());
}
// get shuffeled accounts
const shuffledAccounts1 = accounts.getRankedShuffledAccounts();
const shuffledAccounts2 = accounts.getRankedShuffledAccounts();
// make sure buth list has the same accounts
const set1 = new Set(shuffledAccounts1);
const set2 = new Set(shuffledAccounts2);
// check intersection
const intersection = new Set([...set1].filter((x) => !set2.has(x)));
assert.equal(intersection.size, 0);
// Check that the order is different
assert.notDeepEqual(shuffledAccounts1, shuffledAccounts2);
});
it("Validate rank when success rate is different.", () => {
let time = 0;
const accounts = new RankedStorageAccountSet(undefined, undefined, undefined, () => {
return time;
});
// Register accounts
for (let i = 1; i <= 5; i++) {
accounts.registerStorageAccount("account_" + i.toString());
}
// log results for 60 seconds
for (time = 0; time < 60; time++) {
accounts.logResultToAccount("account_1", true); // 100% success
accounts.logResultToAccount("account_2", time % 10 !== 0); // ~90% success
accounts.logResultToAccount("account_3", time % 2 === 0); // 50% success
accounts.logResultToAccount("account_4", time % 3 === 0); // ~33% success
accounts.logResultToAccount("account_5", false); // 0% success
}
// get shuffeled accounts and validate order
const rankedAccounts = accounts.getRankedShuffledAccounts();
assert.equal(rankedAccounts[0].getAccountName(), "account_1");
assert.equal(rankedAccounts[1].getAccountName(), "account_2");
expect(["account_3", "account_4"]).toContain(rankedAccounts[2].getAccountName());
expect(["account_3", "account_4"]).toContain(rankedAccounts[3].getAccountName());
assert.equal(rankedAccounts[4].getAccountName(), "account_5");
// validate rank
assert.equal(accounts.getStorageAccount("account_1").getRank(), 1);
expect(accounts.getStorageAccount("account_2").getRank()).toBeCloseTo(0.9);
assert.equal(accounts.getStorageAccount("account_3").getRank(), 0.5);
expect(accounts.getStorageAccount("account_4").getRank()).toBeCloseTo(0.32);
assert.equal(accounts.getStorageAccount("account_5").getRank(), 0);
});
it("Validate that newer results have more weight.", () => {
let time = 0;
const accounts = new RankedStorageAccountSet(undefined, 1, undefined, () => {
return time;
});
// Register accounts
accounts.registerStorageAccount("account_1");
// log results
accounts.logResultToAccount("account_1", true);
time++;
accounts.logResultToAccount("account_1", true);
time++;
accounts.logResultToAccount("account_1", true);
time++;
accounts.logResultToAccount("account_1", false);
time++;
accounts.logResultToAccount("account_1", false);
time++;
accounts.logResultToAccount("account_1", false);
expect(accounts.getStorageAccount("account_1").getRank()).toBeLessThan(0.5);
});
});
});

Просмотреть файл

@ -6,25 +6,24 @@ import assert from "assert";
import sinon from "sinon";
import { Client as KustoClient, KustoResponseDataSet, TimeUtils } from "azure-kusto-data";
import { IngestClientResources, ResourceManager } from "../src/resourceManager";
import { IngestClientResources, ResourceManager, ResourceType } from "../src/resourceManager";
describe("ResourceManager", () => {
const rows = [
{
ResourceTypeName: "SecuredReadyForAggregationQueue",
StorageRoot: "https://account.queue.core.windows.net/ready1?sas",
},
{ ResourceTypeName: "SecuredReadyForAggregationQueue", StorageRoot: "https://account1.queue.core.windows.net/ready1?sas" },
{ ResourceTypeName: "SecuredReadyForAggregationQueue", StorageRoot: "https://account1.queue.core.windows.net/ready2?sas" },
{ ResourceTypeName: "SecuredReadyForAggregationQueue", StorageRoot: "https://account2.queue.core.windows.net/ready1?sas" },
{ ResourceTypeName: "SecuredReadyForAggregationQueue", StorageRoot: "https://account2.queue.core.windows.net/ready2?sas" },
{ ResourceTypeName: "SecuredReadyForAggregationQueue", StorageRoot: "https://account3.queue.core.windows.net/ready1?sas" },
{ ResourceTypeName: "SecuredReadyForAggregationQueue", StorageRoot: "https://account3.queue.core.windows.net/ready2?sas" },
{ ResourceTypeName: "FailedIngestionsQueue", StorageRoot: "https://account.queue.core.windows.net/failed?sas" },
{
ResourceTypeName: "SuccessfulIngestionsQueue",
StorageRoot: "https://account.queue.core.windows.net/success?sas",
},
{
ResourceTypeName: "SecuredReadyForAggregationQueue",
StorageRoot: "https://account.queue.core.windows.net/ready2?sas",
},
{ ResourceTypeName: "SuccessfulIngestionsQueue", StorageRoot: "https://account.queue.core.windows.net/success?sas" },
{ ResourceTypeName: "TempStorage", StorageRoot: "https://account.blob.core.windows.net/t1?sas" },
{ ResourceTypeName: "TempStorage", StorageRoot: "https://account.blob.core.windows.net/t2?sas" },
{ ResourceTypeName: "TempStorage", StorageRoot: "https://account2.blob.core.windows.net/t1?sas" },
{ ResourceTypeName: "TempStorage", StorageRoot: "https://account2.blob.core.windows.net/t2?sas" },
{ ResourceTypeName: "TempStorage", StorageRoot: "https://account3.blob.core.windows.net/t1?sas" },
{ ResourceTypeName: "TempStorage", StorageRoot: "https://account3.blob.core.windows.net/t2?sas" },
];
const mockedResourcesResponse = {
@ -56,10 +55,10 @@ describe("ResourceManager", () => {
const resourceManager = new ResourceManager(client);
const resources = await resourceManager.getIngestClientResourcesFromService();
assert.strictEqual(resources.containers!.length, 2);
assert.strictEqual(resources.containers!.length, 6);
assert.strictEqual(resources.successfulIngestionsQueues!.length, 1);
assert.strictEqual(resources.failedIngestionsQueues!.length, 1);
assert.strictEqual(resources.securedReadyForAggregationQueues!.length, 2);
assert.strictEqual(resources.securedReadyForAggregationQueues!.length, 6);
});
it.concurrent("error response", async () => {
@ -82,8 +81,8 @@ describe("ResourceManager", () => {
it.concurrent("valid input", () => {
const resourceManager = new ResourceManager(new KustoClient("https://cluster.kusto.windows.net"));
const resources = resourceManager.getResourceByName(mockedResourcesResponse.primaryResults[0], "TempStorage");
assert.strictEqual(resources.length, 2);
const resources = resourceManager.getResourceByName(mockedResourcesResponse.primaryResults[0], "TempStorage", ResourceType.Container);
assert.strictEqual(resources.length, 6);
});
});
@ -120,4 +119,36 @@ describe("ResourceManager", () => {
assert.strictEqual(res, initialResources);
});
});
describe("#getIngestionQueues()", () => {
it.concurrent("Ingestion queues are ordered by round robin", async () => {
const client = new KustoClient("https://cluster.kusto.windows.net");
sinon.stub(client, "execute").returns(Promise.resolve(mockedResourcesResponse as KustoResponseDataSet));
const resourceManager = new ResourceManager(client);
const queues = await resourceManager.getIngestionQueues();
assert.ok(queues);
assert.strictEqual(queues.length, 6);
assert.notStrictEqual(queues[0].accountName, queues[1].accountName);
assert.notStrictEqual(queues[1].accountName, queues[2].accountName);
assert.notStrictEqual(queues[3].accountName, queues[4].accountName);
assert.notStrictEqual(queues[4].accountName, queues[5].accountName);
});
});
describe("#getContainers()", () => {
it.concurrent("Temp containers are ordered by round robin", async () => {
const client = new KustoClient("https://cluster.kusto.windows.net");
sinon.stub(client, "execute").returns(Promise.resolve(mockedResourcesResponse as KustoResponseDataSet));
const resourceManager = new ResourceManager(client);
const queues = await resourceManager.getContainers();
assert.ok(queues);
assert.strictEqual(queues.length, 6);
assert.notStrictEqual(queues[0].accountName, queues[1].accountName);
assert.notStrictEqual(queues[1].accountName, queues[2].accountName);
assert.notStrictEqual(queues[3].accountName, queues[4].accountName);
assert.notStrictEqual(queues[4].accountName, queues[5].accountName);
});
});
});