Use retry mechanism on potentially throttled operations. (#189)
This commit is contained in:
Родитель
7cdf70d351
Коммит
77837749ef
|
@ -4,5 +4,6 @@
|
|||
import KustoClient from "./source/client";
|
||||
import ClientRequestProperties from "./source/clientRequestProperties";
|
||||
import KustoConnectionStringBuilder from "./source/connectionBuilder";
|
||||
import * as KustoDataErrors from "./source/errors";
|
||||
|
||||
export { KustoClient as Client, ClientRequestProperties, KustoConnectionStringBuilder };
|
||||
export { KustoClient as Client, ClientRequestProperties, KustoConnectionStringBuilder, KustoDataErrors };
|
||||
|
|
|
@ -7,6 +7,7 @@ import AadHelper from "./security";
|
|||
import { KustoResponseDataSet, KustoResponseDataSetV1, KustoResponseDataSetV2, V1, V2Frames } from "./response";
|
||||
import ConnectionStringBuilder from "./connectionBuilder";
|
||||
import ClientRequestProperties from "./clientRequestProperties";
|
||||
import { ThrottlingError } from "./errors";
|
||||
import pkg from "../package.json";
|
||||
import axios, { AxiosInstance } from "axios";
|
||||
import http from "http";
|
||||
|
@ -190,6 +191,9 @@ export class KustoClient {
|
|||
axiosResponse = await this.axiosInstance.post(endpoint, payload, axiosConfig);
|
||||
} catch (error: unknown) {
|
||||
if (axios.isAxiosError(error) && error.response) {
|
||||
if (error.response.status === 429) {
|
||||
throw new ThrottlingError("POST request failed with status 429 (Too Many Requests)", error);
|
||||
}
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
||||
throw error.response.data?.error || error.response.data;
|
||||
}
|
||||
|
|
|
@ -1,9 +1,18 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
/* eslint-disable max-classes-per-file -- gather all exceptions in one file */
|
||||
|
||||
export class KustoAuthenticationError extends Error {
|
||||
constructor(message: string, public inner: Error | undefined, public tokenProviderName: string, public context: Record<string, any>) {
|
||||
super(message);
|
||||
this.name = "KustoAuthenticationError";
|
||||
}
|
||||
}
|
||||
|
||||
export class ThrottlingError extends Error {
|
||||
constructor(message: string, public inner: Error | undefined) {
|
||||
super(message);
|
||||
this.name = "ThrottlingError";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,11 +3,12 @@
|
|||
|
||||
/* eslint-disable max-classes-per-file -- We want all the Resources related classes in this file */
|
||||
|
||||
import { Client } from "azure-kusto-data";
|
||||
import { Client, KustoDataErrors } from "azure-kusto-data";
|
||||
import { ExponentialRetry } from "./retry";
|
||||
import moment from "moment";
|
||||
|
||||
const URI_FORMAT = /https:\/\/(\w+).(queue|blob|table).core.windows.net\/([\w,-]+)\?(.*)/;
|
||||
|
||||
const ATTEMPT_COUNT = 4;
|
||||
export class ResourceURI {
|
||||
constructor(readonly storageAccountName: string, readonly objectType: string, readonly objectName: string, readonly sas: string) {}
|
||||
|
||||
|
@ -52,6 +53,9 @@ export class ResourceManager {
|
|||
public authorizationContext: string | null;
|
||||
public authorizationContextLastUpdate: moment.Moment | null;
|
||||
|
||||
private baseSleepTimeSecs = 1;
|
||||
private baseJitterSecs = 1;
|
||||
|
||||
constructor(readonly kustoClient: Client) {
|
||||
this.refreshPeriod = moment.duration(1, "h");
|
||||
|
||||
|
@ -78,15 +82,25 @@ export class ResourceManager {
|
|||
}
|
||||
|
||||
async getIngestClientResourcesFromService(): Promise<IngestClientResources> {
|
||||
const response = await this.kustoClient.execute("NetDefaultDB", ".get ingestion resources");
|
||||
const table = response.primaryResults[0];
|
||||
|
||||
return new IngestClientResources(
|
||||
this.getResourceByName(table, "SecuredReadyForAggregationQueue"),
|
||||
this.getResourceByName(table, "FailedIngestionsQueue"),
|
||||
this.getResourceByName(table, "SuccessfulIngestionsQueue"),
|
||||
this.getResourceByName(table, "TempStorage")
|
||||
);
|
||||
const retry = new ExponentialRetry(ATTEMPT_COUNT, this.baseSleepTimeSecs, this.baseJitterSecs);
|
||||
while (retry.shouldTry()) {
|
||||
try {
|
||||
const response = await this.kustoClient.execute("NetDefaultDB", ".get ingestion resources");
|
||||
const table = response.primaryResults[0];
|
||||
return new IngestClientResources(
|
||||
this.getResourceByName(table, "SecuredReadyForAggregationQueue"),
|
||||
this.getResourceByName(table, "FailedIngestionsQueue"),
|
||||
this.getResourceByName(table, "SuccessfulIngestionsQueue"),
|
||||
this.getResourceByName(table, "TempStorage")
|
||||
);
|
||||
} catch (error: unknown) {
|
||||
if (!(error instanceof KustoDataErrors.ThrottlingError)) {
|
||||
throw error;
|
||||
}
|
||||
await retry.backoff();
|
||||
}
|
||||
}
|
||||
throw new Error(`Failed to get ingestion resources from server - the request was throttled ${ATTEMPT_COUNT} times.`);
|
||||
}
|
||||
|
||||
getResourceByName(table: { rows: () => any }, resourceName: string): ResourceURI[] {
|
||||
|
@ -118,12 +132,23 @@ export class ResourceManager {
|
|||
}
|
||||
|
||||
async getAuthorizationContextFromService() {
|
||||
const response = await this.kustoClient.execute("NetDefaultDB", ".get kusto identity token");
|
||||
const next = response.primaryResults[0].rows().next();
|
||||
if (next.done) {
|
||||
throw new Error("Failed to get authorization context - got empty results");
|
||||
const retry = new ExponentialRetry(ATTEMPT_COUNT, this.baseSleepTimeSecs, this.baseJitterSecs);
|
||||
while (retry.shouldTry()) {
|
||||
try {
|
||||
const response = await this.kustoClient.execute("NetDefaultDB", ".get kusto identity token");
|
||||
const next = response.primaryResults[0].rows().next();
|
||||
if (next.done) {
|
||||
throw new Error("Failed to get authorization context - got empty results");
|
||||
}
|
||||
return next.value.toJSON<{ AuthorizationContext: string }>().AuthorizationContext;
|
||||
} catch (error: unknown) {
|
||||
if (!(error instanceof KustoDataErrors.ThrottlingError)) {
|
||||
throw error;
|
||||
}
|
||||
await retry.backoff();
|
||||
}
|
||||
}
|
||||
return next.value.toJSON<{ AuthorizationContext: string }>().AuthorizationContext;
|
||||
throw new Error(`Failed to get identity token from server - the request was throttled ${ATTEMPT_COUNT} times.`);
|
||||
}
|
||||
|
||||
async getIngestionQueues() {
|
||||
|
|
|
@ -19,10 +19,16 @@ export class ExponentialRetry {
|
|||
throw new Error("Max retries exceeded");
|
||||
}
|
||||
|
||||
const base = this.sleepBaseSecs * Math.pow(2, this.currentAttempt);
|
||||
this.currentAttempt++;
|
||||
|
||||
if (!this.shouldTry()) {
|
||||
// This was the last retry - no need to sleep
|
||||
return;
|
||||
}
|
||||
|
||||
const base = this.sleepBaseSecs * Math.pow(2, this.currentAttempt - 1);
|
||||
const jitter = Math.floor(this.maxJitterSecs * Math.random());
|
||||
await sleep(1000 * (base + jitter));
|
||||
this.currentAttempt++;
|
||||
}
|
||||
|
||||
public shouldTry(): boolean {
|
||||
|
|
Загрузка…
Ссылка в новой задаче