* local vite

* table report

* table status reporting with tests

* node 18

* comments

* update lock swith node 18

* fix test

* log successes

* format

* fix test

* fix test

* format

* fix test

* p

* changelog

* p

* f

---------

Co-authored-by: Ohad Bitton <ohbitton@microsoft.com>
This commit is contained in:
ohad bitton 2024-01-08 12:53:10 +02:00 коммит произвёл GitHub
Родитель 1d449c27cb
Коммит 91cb96252b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
25 изменённых файлов: 345 добавлений и 136 удалений

2
.github/workflows/build.yml поставляемый
Просмотреть файл

@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-node@v3
with:
node-version: 16
node-version: 18
registry-url: https://registry.npmjs.org/
cache: "npm"
cache-dependency-path: |

2
.github/workflows/npmpublish.yml поставляемый
Просмотреть файл

@ -11,7 +11,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-node@v1
with:
node-version: 16
node-version: 18
registry-url: https://registry.npmjs.org/
- name: Publish new versions

2
.github/workflows/release.yml поставляемый
Просмотреть файл

@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
with:
node-version: 16
node-version: 18
registry-url: https://registry.npmjs.org/
cache: "npm"
cache-dependency-path: |

Просмотреть файл

@ -7,13 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unversioned
### Added
- When working with storage resources, use success/failure statistics and retry mechanism to improve ingestion stability
### Changed
- [BREAKING] - Minimal node version is now 18
- [BREAKING] - The default converters for DateTime and TimeSpan will now return null if the value is null or an empty string. This is to align with the behavior of the service.
- [BREAKING] - IngestClient returns Promise<IngestionResult> instead of Promise<QueueSendMessageResponse>
### Added
- Support table status reporting like explained here https://learn.microsoft.com/en-us/azure/data-explorer/kusto/api/netfx/kusto-ingest-client-status
- When working with storage resources, use success/failure statistics and retry mechanism to improve ingestion stability
## [5.2.3] - 2023-11-07

Просмотреть файл

@ -33,7 +33,7 @@ See the SDK [best practices guide](https://docs.microsoft.com/azure/data-explore
## Platforms compatibility
The Azure Kusto SDK for is built for Node **v16.x.x** and above.
The Azure Kusto SDK for is built for Node **v18.x.x** and above.
## Looking for SDKs for other languages/platforms?
- [Python](https://github.com/azure/azure-kusto-python)

89
package-lock.json сгенерированный
Просмотреть файл

@ -11,7 +11,7 @@
],
"devDependencies": {
"@types/jest": "^29.5.0",
"@types/node": "^16.0.0",
"@types/node": "^18.0.0",
"@types/webpack-dev-server": "^4.7.2",
"@typescript-eslint/eslint-plugin": "^5.32.0",
"@typescript-eslint/parser": "^5.32.0",
@ -213,6 +213,48 @@
"node": ">=16.0.0"
}
},
"node_modules/@azure/core-xml": {
"version": "1.3.4",
"resolved": "https://registry.npmjs.org/@azure/core-xml/-/core-xml-1.3.4.tgz",
"integrity": "sha512-B1xI79Ur/u+KR69fGTcsMNj8KDjBSqAy0Ys6Byy4Qm1CqoUy7gCT5A7Pej0EBWRskuH6bpCwrAnosfmQEalkcg==",
"dependencies": {
"fast-xml-parser": "^4.2.4",
"tslib": "^2.2.0"
},
"engines": {
"node": ">=14.0.0"
}
},
"node_modules/@azure/data-tables": {
"version": "13.2.2",
"resolved": "https://registry.npmjs.org/@azure/data-tables/-/data-tables-13.2.2.tgz",
"integrity": "sha512-Dq2Aq0mMMF0BPzYQKdBY/OtO7VemP/foh6z+mJpUO1hRL+65C1rGQUJf20LJHotSyU8wHb4HJzOs+Z50GXSy1w==",
"dependencies": {
"@azure/core-auth": "^1.3.0",
"@azure/core-client": "^1.0.0",
"@azure/core-paging": "^1.1.1",
"@azure/core-rest-pipeline": "^1.1.0",
"@azure/core-tracing": "^1.0.0",
"@azure/core-xml": "^1.0.0",
"@azure/logger": "^1.0.0",
"tslib": "^2.2.0",
"uuid": "^8.3.0"
},
"engines": {
"node": ">=14.0.0"
}
},
"node_modules/@azure/data-tables/node_modules/@azure/core-tracing": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/@azure/core-tracing/-/core-tracing-1.0.1.tgz",
"integrity": "sha512-I5CGMoLtX+pI17ZdiFJZgxMJApsK6jjfm85hpgp3oazCdq5Wxgh4wMr7ge/TTWW1B5WBuvIOI1fMU/FrOAMKrw==",
"dependencies": {
"tslib": "^2.2.0"
},
"engines": {
"node": ">=12.0.0"
}
},
"node_modules/@azure/identity": {
"version": "3.3.2",
"resolved": "https://registry.npmjs.org/@azure/identity/-/identity-3.3.2.tgz",
@ -3302,9 +3344,12 @@
"dev": true
},
"node_modules/@types/node": {
"version": "16.18.60",
"resolved": "https://registry.npmjs.org/@types/node/-/node-16.18.60.tgz",
"integrity": "sha512-ZUGPWx5vKfN+G2/yN7pcSNLkIkXEvlwNaJEd4e0ppX7W2S8XAkdc/37hM4OUNJB9sa0p12AOvGvxL4JCPiz9DA=="
"version": "18.19.3",
"resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.3.tgz",
"integrity": "sha512-k5fggr14DwAytoA/t8rPrIz++lXK7/DqckthCmoZOKNsEbJkId4Z//BqgApXBUGrGddrigYa1oqheo/7YmW4rg==",
"dependencies": {
"undici-types": "~5.26.4"
}
},
"node_modules/@types/node-fetch": {
"version": "2.6.8",
@ -6492,6 +6537,27 @@
"integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==",
"dev": true
},
"node_modules/fast-xml-parser": {
"version": "4.3.2",
"resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.3.2.tgz",
"integrity": "sha512-rmrXUXwbJedoXkStenj1kkljNF7ugn5ZjR9FJcwmCfcCbtOMDghPajbc+Tck6vE6F5XsDmx+Pr2le9fw8+pXBg==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/NaturalIntelligence"
},
{
"type": "paypal",
"url": "https://paypal.me/naturalintelligence"
}
],
"dependencies": {
"strnum": "^1.0.5"
},
"bin": {
"fxparser": "src/cli/cli.js"
}
},
"node_modules/fastest-levenshtein": {
"version": "1.0.16",
"resolved": "https://registry.npmjs.org/fastest-levenshtein/-/fastest-levenshtein-1.0.16.tgz",
@ -16948,6 +17014,11 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/strnum": {
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/strnum/-/strnum-1.0.5.tgz",
"integrity": "sha512-J8bbNyKKXl5qYcR36TIO8W3mVGVHrmmxsd5PAItGkmyzwJvybiw2IVq5nqd0i4LSNSkB/sx9VHllbfFdr9k1JA=="
},
"node_modules/strong-log-transformer": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/strong-log-transformer/-/strong-log-transformer-2.1.0.tgz",
@ -17806,6 +17877,11 @@
"node": ">=0.8.0"
}
},
"node_modules/undici-types": {
"version": "5.26.5",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz",
"integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA=="
},
"node_modules/unique-filename": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/unique-filename/-/unique-filename-2.0.1.tgz",
@ -18744,13 +18820,14 @@
"rimraf": "^3.0.2"
},
"engines": {
"node": ">= 16.0.0"
"node": ">= 18.0.0"
}
},
"packages/azure-kusto-ingest": {
"version": "5.2.3",
"license": "ISC",
"dependencies": {
"@azure/data-tables": "^13.2.2",
"@azure/storage-blob": "^12.11.0",
"@azure/storage-queue": "^12.10.0",
"@types/pako": "^2.0.0",
@ -18774,7 +18851,7 @@
"assert": "^2.0.0"
},
"engines": {
"node": ">= 16.0.0"
"node": ">= 18.0.0"
}
},
"packages/azure-kusto-ingest/node_modules/buffer": {

Просмотреть файл

@ -27,7 +27,7 @@
},
"devDependencies": {
"@types/jest": "^29.5.0",
"@types/node": "^16.0.0",
"@types/node": "^18.0.0",
"@types/webpack-dev-server": "^4.7.2",
"@typescript-eslint/eslint-plugin": "^5.32.0",
"@typescript-eslint/parser": "^5.32.0",

Просмотреть файл

@ -21,7 +21,7 @@
"./dist-esm/src/connectionBuilder.js": "./dist-esm/src/connectionBuilder.browser.js"
},
"engines": {
"node": ">= 16.0.0"
"node": ">= 18.0.0"
},
"publishConfig": {
"access": "public"

Просмотреть файл

@ -10,7 +10,7 @@
"clean": "rimraf dist/* dist-esm/* types/*"
},
"engines": {
"node": ">= 16.0.0"
"node": ">= 18.0.0"
},
"publishConfig": {
"access": "public"
@ -54,6 +54,7 @@
"./dist-esm/src/ingestClient.js": "./dist-esm/src/ingestClient.browser.js"
},
"dependencies": {
"@azure/data-tables": "^13.2.2",
"@azure/storage-blob": "^12.11.0",
"@azure/storage-queue": "^12.10.0",
"@types/pako": "^2.0.0",

Просмотреть файл

@ -9,6 +9,7 @@ import managedStreamingIngestClient from "./managedStreamingIngestClient";
import KustoIngestStatusQueues from "./status";
import { IngestionResult, OperationStatus, IngestionStatus, IngestionStatusInTableDescription } from "./ingestionResult";
import {
DataFormat,
IngestionMappingKind,
@ -78,6 +79,10 @@ export {
ValidationPolicy,
W3CLogFileMapping,
dataFormatMappingKind,
IngestionResult,
OperationStatus,
IngestionStatus,
IngestionStatusInTableDescription,
};
/**

Просмотреть файл

@ -5,10 +5,10 @@ import { KustoConnectionStringBuilder } from "azure-kusto-data";
import { BlobDescriptor, generateBlobName, StreamDescriptor } from "./descriptors";
import { FileDescriptor } from "./fileDescriptor.browser";
import { QueueSendMessageResponse } from "@azure/storage-queue";
import { IngestionPropertiesInput } from "./ingestionProperties";
import { KustoIngestClientBase } from "./ingestClientBase";
import { IngestionResult } from "./ingestionResult";
export class KustoIngestClient extends KustoIngestClientBase {
constructor(kcsb: string | KustoConnectionStringBuilder, defaultProps?: IngestionPropertiesInput) {
@ -18,7 +18,7 @@ export class KustoIngestClient extends KustoIngestClientBase {
/**
* Use string for Node.js and Blob in browser
*/
async ingestFromFile(file: Blob | FileDescriptor, ingestionProperties?: IngestionPropertiesInput): Promise<QueueSendMessageResponse> {
async ingestFromFile(file: Blob | FileDescriptor, ingestionProperties?: IngestionPropertiesInput): Promise<IngestionResult> {
this.ensureOpen();
const descriptor = file instanceof FileDescriptor ? file : new FileDescriptor(file);
@ -36,13 +36,13 @@ export class KustoIngestClient extends KustoIngestClientBase {
/**
* Use Readable for Node.js and ArrayBuffer in browser
*/
async ingestFromStream(stream: ArrayBuffer | StreamDescriptor, ingestionProperties?: IngestionPropertiesInput): Promise<QueueSendMessageResponse> {
async ingestFromStream(stream: ArrayBuffer | StreamDescriptor, ingestionProperties?: IngestionPropertiesInput): Promise<IngestionResult> {
this.ensureOpen();
const props = this._getMergedProps(ingestionProperties);
const descriptor: StreamDescriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);
const blobName = generateBlobName(descriptor, props);
const blobUri = await this.uploadToBlobWithRetry(descriptor.stream as ArrayBuffer, blobName);
return this.ingestFromBlob(new BlobDescriptor(blobUri), props); // descriptor.size?
return this.ingestFromBlob(new BlobDescriptor(blobUri, descriptor.size, descriptor.sourceId), props);
}
}

Просмотреть файл

@ -6,11 +6,10 @@ import { KustoConnectionStringBuilder } from "azure-kusto-data";
import { BlobDescriptor, generateBlobName, StreamDescriptor } from "./descriptors";
import { FileDescriptor } from "./fileDescriptor";
import { QueueSendMessageResponse } from "@azure/storage-queue";
import { IngestionPropertiesInput } from "./ingestionProperties";
import { KustoIngestClientBase } from "./ingestClientBase";
import { Readable } from "stream";
import { IngestionResult } from "./ingestionResult";
export class KustoIngestClient extends KustoIngestClientBase {
constructor(kcsb: string | KustoConnectionStringBuilder, defaultProps?: IngestionPropertiesInput) {
@ -20,7 +19,7 @@ export class KustoIngestClient extends KustoIngestClientBase {
/**
* Use string in Node.JS and Blob in browser
*/
async ingestFromFile(file: FileDescriptor | string | Blob, ingestionProperties?: IngestionPropertiesInput): Promise<QueueSendMessageResponse> {
async ingestFromFile(file: FileDescriptor | string | Blob, ingestionProperties?: IngestionPropertiesInput): Promise<IngestionResult> {
this.ensureOpen();
const props = this._getMergedProps(ingestionProperties);
@ -39,10 +38,7 @@ export class KustoIngestClient extends KustoIngestClientBase {
/**
* Use Readable in Node.JS and ArrayBuffer in browser
*/
async ingestFromStream(
stream: StreamDescriptor | Readable | ArrayBuffer,
ingestionProperties?: IngestionPropertiesInput
): Promise<QueueSendMessageResponse> {
async ingestFromStream(stream: StreamDescriptor | Readable | ArrayBuffer, ingestionProperties?: IngestionPropertiesInput): Promise<IngestionResult> {
this.ensureOpen();
const props = this._getMergedProps(ingestionProperties);
const descriptor: StreamDescriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);

Просмотреть файл

@ -3,17 +3,27 @@
import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data";
import ResourceManager, { createStatusTableClient } from "./resourceManager";
import IngestionBlobInfo from "./ingestionBlobInfo";
import { ContainerClient } from "@azure/storage-blob";
import { QueueClient, QueueSendMessageResponse } from "@azure/storage-queue";
import { QueueClient } from "@azure/storage-queue";
import IngestionProperties, { IngestionPropertiesInput, ReportLevel, ReportMethod } from "./ingestionProperties";
import { AbstractKustoClient } from "./abstractKustoClient";
import {
IngestionStatus,
TableReportIngestionResult,
IngestionResult,
IngestionStatusInTableDescription,
IngestionStatusResult,
OperationStatus,
putRecordInTable,
} from "./ingestionResult";
import { Readable } from "stream";
import { IngestionPropertiesInput } from "./ingestionProperties";
import { AbstractKustoClient } from "./abstractKustoClient";
import { BlobDescriptor, StreamDescriptor } from "./descriptors";
import ResourceManager from "./resourceManager";
import IngestionBlobInfo from "./ingestionBlobInfo";
export abstract class KustoIngestClientBase extends AbstractKustoClient {
resourceManager: ResourceManager;
@ -31,22 +41,40 @@ export abstract class KustoIngestClientBase extends AbstractKustoClient {
blob: string | BlobDescriptor,
ingestionProperties?: IngestionPropertiesInput,
maxRetries: number = KustoIngestClientBase.MaxNumberOfRetryAttempts
): Promise<QueueSendMessageResponse> {
): Promise<IngestionResult> {
this.ensureOpen();
const props = this._getMergedProps(ingestionProperties);
const descriptor = blob instanceof BlobDescriptor ? blob : new BlobDescriptor(blob);
const authorizationContext = await this.resourceManager.getAuthorizationContext();
const ingestionBlobInfo = new IngestionBlobInfo(descriptor, props, authorizationContext);
const reportToTable = props.reportLevel !== ReportLevel.DoNotReport && props.reportMethod !== ReportMethod.Queue;
if (reportToTable) {
const statusTableClient = await this.resourceManager.createStatusTable();
const status = this.createStatusObject(props, OperationStatus.Pending, ingestionBlobInfo);
await putRecordInTable(statusTableClient, { ...status, partitionKey: ingestionBlobInfo.Id, rowKey: ingestionBlobInfo.Id });
const desc = new IngestionStatusInTableDescription(statusTableClient.url, ingestionBlobInfo.Id, ingestionBlobInfo.Id);
ingestionBlobInfo.IngestionStatusInTable = desc;
await this.sendQueueMessage(maxRetries, ingestionBlobInfo);
return new TableReportIngestionResult(desc, statusTableClient);
}
await this.sendQueueMessage(maxRetries, ingestionBlobInfo);
return new IngestionStatusResult(this.createStatusObject(props, OperationStatus.Queued, ingestionBlobInfo));
}
private async sendQueueMessage(maxRetries: number, blobInfo: IngestionBlobInfo) {
const queues = await this.resourceManager.getIngestionQueues();
if (queues == null) {
throw new Error("Failed to get queues");
}
const authorizationContext = await this.resourceManager.getAuthorizationContext();
const ingestionBlobInfo = new IngestionBlobInfo(descriptor, props, authorizationContext);
const ingestionBlobInfoJson = JSON.stringify(ingestionBlobInfo);
const ingestionBlobInfoJson = JSON.stringify(blobInfo);
const encoded = Buffer.from(ingestionBlobInfoJson).toString("base64");
const retryCount = Math.min(maxRetries, queues.length);
for (let i = 0; i < retryCount; i++) {
@ -59,9 +87,24 @@ export abstract class KustoIngestClientBase extends AbstractKustoClient {
this.resourceManager.reportResourceUsageResult(queueClient.accountName, false);
}
}
throw new Error("Failed to send message to queue.");
}
private createStatusObject(props: IngestionProperties, status: OperationStatus, ingestionBlobInfo: IngestionBlobInfo): IngestionStatus {
const time = Date.now().toString();
return {
Status: status,
Timestamp: time,
IngestionSourceId: ingestionBlobInfo.Id,
IngestionSourcePath: ingestionBlobInfo.BlobPath.split(/[?;]/)[0],
Database: props.database,
Table: props.table,
UpdatedOn: time,
Details: "",
} as IngestionStatus;
}
async uploadToBlobWithRetry(
descriptor: string | Blob | ArrayBuffer | StreamDescriptor,
blobName: string,

Просмотреть файл

@ -4,6 +4,7 @@
import { v4 as uuidv4 } from "uuid";
import { BlobDescriptor } from "./descriptors";
import IngestionProperties, { ReportLevel, ReportMethod } from "./ingestionProperties";
import { IngestionStatusInTableDescription } from "./ingestionResult";
export class IngestionBlobInfo {
BlobPath: string;
@ -18,6 +19,7 @@ export class IngestionBlobInfo {
SourceMessageCreationTime: Date;
Id: string;
AdditionalProperties: { [additional: string]: any };
IngestionStatusInTable: IngestionStatusInTableDescription | null = null;
constructor(blobDescriptor: BlobDescriptor, ingestionProperties: IngestionProperties, authContext: string | null = null) {
this.BlobPath = blobDescriptor.path;

Просмотреть файл

@ -249,6 +249,10 @@ export class IngestionProperties {
}
}
}
if (this.reportMethod !== ReportMethod.Queue && this.reportLevel === ReportLevel.FailuresOnly) {
throw new IngestionPropertiesValidationError("ReportLevel.FailuresOnly is not supported with ReportMethod.Table");
}
}
merge(extraProps: IngestionPropertiesInput) {

Просмотреть файл

@ -0,0 +1,73 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import { TableClient, TableEntity } from "@azure/data-tables";
import { ExponentialRetry } from "./retry";
import { createStatusTableClient } from "./resourceManager";
export interface IngestionResult {
/// <summary>
/// Retrieves the detailed ingestion status of
/// all data ingestion operations into Kusto associated with this IKustoIngestionResult instance.
/// </summary>
getIngestionStatusCollection(): Promise<IngestionStatus>;
}
export const putRecordInTable = async (tableClient: TableClient, entity: TableEntity<IngestionStatus>): Promise<void> => {
const retry = new ExponentialRetry(3, 1, 1);
while (retry.shouldTry()) {
try {
await tableClient.createEntity(entity);
} catch (ex) {
await retry.backoff();
}
}
};
export enum OperationStatus {
Pending = "Pending",
Succeede = "Succeeded",
Failed = "Failed",
Queued = "Queued",
Skipped = "Skipped",
PartiallySucceeded = "PartiallySucceeded",
}
export class TableReportIngestionResult implements IngestionResult {
public constructor(private ingestionStatusInTableDescription: IngestionStatusInTableDescription, public tableClient: TableClient | null = null) {}
public async getIngestionStatusCollection(): Promise<IngestionStatus> {
if (!this.tableClient) {
this.tableClient = createStatusTableClient(this.ingestionStatusInTableDescription.tableConnectionString);
}
return await this.tableClient.getEntity<IngestionStatus>(
this.ingestionStatusInTableDescription.partitionKey,
this.ingestionStatusInTableDescription.rowKey
);
}
}
export class IngestionStatusResult implements IngestionResult {
constructor(private ingestionStatus: IngestionStatus) {
this.ingestionStatus = ingestionStatus;
}
public getIngestionStatusCollection(): Promise<IngestionStatus> {
return Promise.resolve(this.ingestionStatus);
}
}
export class IngestionStatusInTableDescription {
constructor(public tableConnectionString: string, public partitionKey: string, public rowKey: string) {}
}
export interface IngestionStatus {
Timestamp: string;
Status: OperationStatus;
IngestionSourceId: string;
IngestionSourcePath: string;
Database: string;
Table: string;
UpdatedOn: string;
Details: string;
}

Просмотреть файл

@ -4,7 +4,6 @@
import { IngestionPropertiesInput } from "./ingestionProperties";
import { isNode } from "@azure/core-util";
import { QueueSendMessageResponse } from "@azure/storage-queue";
import { KustoConnectionStringBuilder, KustoResponseDataSet } from "azure-kusto-data";
import { Readable } from "stream";
import { AbstractKustoClient } from "./abstractKustoClient";
@ -14,6 +13,7 @@ import IngestClient from "./ingestClient";
import { ExponentialRetry } from "./retry";
import { readableToStream, tryFileToBuffer, tryStreamToArray } from "./streamUtils";
import StreamingIngestClient from "./streamingIngestClient";
import { IngestionResult } from "./ingestionResult";
const maxStreamSize = 1024 * 1024 * 4;
const attemptCount = 3;
@ -90,7 +90,7 @@ class KustoManagedStreamingIngestClient extends AbstractKustoClient {
stream: StreamDescriptor | Readable | ArrayBuffer,
ingestionProperties?: IngestionPropertiesInput,
clientRequestId?: string
): Promise<any> {
): Promise<KustoResponseDataSet | IngestionResult> {
this.ensureOpen();
const props = this._getMergedProps(ingestionProperties);
let descriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);
@ -119,14 +119,18 @@ class KustoManagedStreamingIngestClient extends AbstractKustoClient {
async ingestFromFile(
file: FileDescriptor | string | Blob,
ingestionProperties?: IngestionPropertiesInput
): Promise<KustoResponseDataSet | QueueSendMessageResponse> {
): Promise<KustoResponseDataSet | IngestionResult> {
this.ensureOpen();
const stream = file instanceof FileDescriptor ? await tryFileToBuffer(file) : await tryFileToBuffer(new FileDescriptor(file));
return await this.ingestFromStream(stream, ingestionProperties);
}
async ingestFromBlob(blob: string | BlobDescriptor, ingestionProperties?: IngestionPropertiesInput, clientRequestId?: string): Promise<any> {
async ingestFromBlob(
blob: string | BlobDescriptor,
ingestionProperties?: IngestionPropertiesInput,
clientRequestId?: string
): Promise<KustoResponseDataSet | IngestionResult> {
const props = this._getMergedProps(ingestionProperties);
const descriptor = blob instanceof BlobDescriptor ? blob : new BlobDescriptor(blob);
// No need to check blob size if it was given to us that it's not empty

Просмотреть файл

@ -4,6 +4,7 @@
import { Client, KustoDataErrors, TimeUtils } from "azure-kusto-data";
import { ExponentialRetry } from "./retry";
import { ContainerClient } from "@azure/storage-blob";
import { TableClient } from "@azure/data-tables";
import { RankedStorageAccountSet } from "./rankedStorageAccountSet";
import { QueueClient } from "@azure/storage-queue";
@ -12,6 +13,7 @@ const ATTEMPT_COUNT = 4;
export enum ResourceType {
Queue,
Container,
Table,
}
export class ResourceURI {
@ -23,11 +25,12 @@ export class IngestClientResources {
readonly securedReadyForAggregationQueues: ResourceURI[] | null = null,
readonly failedIngestionsQueues: ResourceURI[] | null = null,
readonly successfulIngestionsQueues: ResourceURI[] | null = null,
readonly containers: ResourceURI[] | null = null
readonly containers: ResourceURI[] | null = null,
readonly statusTables: ResourceURI[] | null = null
) {}
valid() {
const resources = [this.securedReadyForAggregationQueues, this.failedIngestionsQueues, this.failedIngestionsQueues, this.containers];
const resources = [this.securedReadyForAggregationQueues, this.failedIngestionsQueues, this.failedIngestionsQueues, this.containers, this.statusTables];
return resources.reduce((prev, current) => !!(prev && current), true);
}
}
@ -77,7 +80,8 @@ export class ResourceManager {
this.getResourceByName(table, "SecuredReadyForAggregationQueue", ResourceType.Queue),
this.getResourceByName(table, "FailedIngestionsQueue", ResourceType.Queue),
this.getResourceByName(table, "SuccessfulIngestionsQueue", ResourceType.Queue),
this.getResourceByName(table, "TempStorage", ResourceType.Container)
this.getResourceByName(table, "TempStorage", ResourceType.Container),
this.getResourceByName(table, "IngestionsStatusTable", ResourceType.Table)
);
if (!resoures.valid()) {
@ -115,7 +119,7 @@ export class ResourceManager {
return result;
}
pupulateStorageAccounts() {
pupulateStorageAccounts(): void {
if (this.ingestClientResources == null) {
return;
}
@ -230,20 +234,20 @@ export class ResourceManager {
throw new Error(`Failed to get identity token from server - the request was throttled ${ATTEMPT_COUNT} times.`);
}
async getIngestionQueues() {
async getIngestionQueues(): Promise<ResourceURI[] | null> {
const queues = (await this.refreshIngestClientResources()).securedReadyForAggregationQueues;
return queues ? this.getRoundRobinRankedAndShuffledResources(queues) : null;
}
async getFailedIngestionsQueues() {
async getFailedIngestionsQueues(): Promise<ResourceURI[] | null> {
return (await this.refreshIngestClientResources()).failedIngestionsQueues;
}
async getSuccessfulIngestionsQueues() {
async getSuccessfulIngestionsQueues(): Promise<ResourceURI[] | null> {
return (await this.refreshIngestClientResources()).successfulIngestionsQueues;
}
async getContainers() {
async getContainers(): Promise<ResourceURI[] | null> {
const containers = (await this.refreshIngestClientResources()).containers;
return containers ? this.getRoundRobinRankedAndShuffledResources(containers) : null;
}
@ -252,13 +256,34 @@ export class ResourceManager {
return this.refreshAuthorizationContext();
}
close() {
async getStatusTables(): Promise<ResourceURI[] | null> {
return (await this.refreshIngestClientResources()).statusTables;
}
async createStatusTable() {
const statusTables = await this.getStatusTables();
if (!statusTables) {
throw new Error("Failed to get status table");
}
return createStatusTableClient(statusTables![0].uri);
}
close(): void {
this.kustoClient.close();
}
reportResourceUsageResult(accountName: string, success: boolean) {
reportResourceUsageResult(accountName: string, success: boolean): void {
this.rankedStorageAccountSet.logResultToAccount(accountName, success);
}
}
export const createStatusTableClient = (uri: string): TableClient => {
const tableUrl = new URL(uri);
const origin = tableUrl.origin;
const sasToken = tableUrl.search;
const tableName = tableUrl.pathname.replace("/", "");
return new TableClient(origin + sasToken, tableName);
};
export default ResourceManager;

Просмотреть файл

@ -7,7 +7,7 @@ import assert from "assert";
import IngestClient from "../src/ingestClient.browser";
import { KustoConnectionStringBuilder as ConnectionStringBuilder } from "azure-kusto-data/src/connectionBuilder.browser";
import sinon from "sinon";
import { QueueSendMessageResponse } from "@azure/storage-queue";
import { IngestionResult } from "../src/ingestionResult";
describe(`Browser Unit tests`, () => {
const cluster = "https://somecluster.kusto.windows.net";
@ -55,7 +55,7 @@ describe(`Browser Unit tests`, () => {
});
const queuedStub = sinon.stub(mockedIngestClient, "ingestFromBlob");
queuedStub.resolves({} as QueueSendMessageResponse);
queuedStub.resolves({} as IngestionResult);
const blobUploadStub = sinon.stub(mockedIngestClient, "uploadToBlobWithRetry");
blobUploadStub.resolves("https://storage.blob.windows.net/container/file.json.gz");

Просмотреть файл

@ -3,7 +3,6 @@
/* eslint-disable no-console */
import IngestClient from "../../src/ingestClient";
import KustoIngestStatusQueues from "../../src/status";
import {
Client,
@ -14,10 +13,20 @@ import {
kustoTrustedEndpoints,
MatchRule,
} from "azure-kusto-data";
import StreamingIngestClient from "../../src/streamingIngestClient";
import ManagedStreamingIngestClient from "../../src/managedStreamingIngestClient";
import { CompressionType, StreamDescriptor } from "../../src/descriptors";
import { DataFormat, IngestionProperties, JsonColumnMapping, ReportLevel } from "../../src";
import {
IngestClient,
CompressionType,
StreamDescriptor,
DataFormat,
IngestionProperties,
JsonColumnMapping,
ReportLevel,
ReportMethod,
ManagedStreamingIngestClient,
StreamingIngestClient,
IngestionStatus,
IngestionResult,
} from "../../src";
import { sleep } from "../../src/retry";
import assert from "assert";
@ -26,6 +35,7 @@ import util from "util";
import { v4 as uuidv4 } from "uuid";
import pathlib from "path";
import sinon from "sinon";
import { TableReportIngestionResult } from "../../src/ingestionResult";
interface ParsedJsonMapping {
Properties: { Path: string };
@ -53,7 +63,6 @@ const main = (): void => {
const ingestClient = new IngestClient(dmKcsb);
const dmKustoClient = new Client(dmKcsb);
const statusQueues = new KustoIngestStatusQueues(ingestClient);
const managedStreamingIngestClient = new ManagedStreamingIngestClient(engineKcsb, dmKcsb);
const mockedStreamingIngestClient = new StreamingIngestClient(engineKcsb);
const streamStub = sinon.stub(mockedStreamingIngestClient, "ingestFromStream");
@ -81,6 +90,7 @@ const main = (): void => {
"managed_stream",
"status_success",
"status_fail",
"status_table",
] as const;
class TestDataItem {
@ -222,6 +232,35 @@ const main = (): void => {
await assertRowsCount(item, table as Table);
});
it.concurrent.each(
testItems.map((i) => {
return { item: i };
})
)("ingestFromFile_TableReporting_$item.description", async ({ item }) => {
const table = tableNames[("status_table" + "_" + item.description) as Table];
const props = item.ingestionPropertiesCallback(table);
props.reportLevel = ReportLevel.FailuresAndSuccesses;
props.reportMethod = ReportMethod.QueueAndTable;
try {
const res: IngestionResult = await ingestClient.ingestFromFile(item.path, props);
assert.ok(res, "ingest result returned null or undefined");
assert.ok(res instanceof TableReportIngestionResult);
let status: IngestionStatus;
const endTime = Date.now() + 180000; // Timeout is 3 minutes
while (Date.now() < endTime) {
status = await res.getIngestionStatusCollection();
if (status.Status === "Pending") {
await sleep(1000);
}
}
assert.equal(status!.Status, "Succeeded");
} catch (err) {
assert.fail(`Failed to ingest ${item.description}, ${util.format(err)}`);
}
await assertRowsCount(item, table as Table);
});
it.concurrent.each(
testItems.map((i) => {
return { item: i };
@ -351,53 +390,6 @@ const main = (): void => {
});
});
it.concurrent("KustoIngestStatusQueues", async () => {
try {
await cleanStatusQueues();
} catch (err) {
assert.fail(`Failed to Clean status queues - ${util.format(err)}`);
}
const checkSuccess = async () => {
const item = testItems[0];
const table = tableNames[("status_success" + "_" + item.description) as Table];
const ingestionProperties = item.ingestionPropertiesCallback(table);
ingestionProperties.reportLevel = ReportLevel.FailuresAndSuccesses;
try {
await ingestClient.ingestFromFile(item.path, ingestionProperties);
const status = await waitForStatus();
assert.strictEqual(status.SuccessCount, 1);
assert.strictEqual(status.FailureCount, 0);
} catch (err) {
assert.fail(`Failed to ingest ${item.description} - ${util.format(err)}`);
}
};
await checkSuccess();
try {
await cleanStatusQueues();
} catch (err) {
assert.fail(`Failed to Clean status queues - ${util.format(err)}`);
}
const checkFail = async () => {
const item = testItems[0];
const table = tableNames[("status_fail" + "_" + item.description) as Table];
const ingestionProperties = item.ingestionPropertiesCallback(table);
ingestionProperties.reportLevel = ReportLevel.FailuresAndSuccesses;
ingestionProperties.database = "invalid";
try {
await ingestClient.ingestFromFile(item.path, ingestionProperties);
const status = await waitForStatus();
assert.strictEqual(status.SuccessCount, 0);
assert.strictEqual(status.FailureCount, 1);
} catch (err) {
assert.fail(`Failed to ingest ${item.description} - ${util.format(err)}`);
}
};
await checkFail();
});
describe("QueryClient", () => {
it.concurrent("General BadRequest", async () => {
try {
@ -494,27 +486,6 @@ const main = (): void => {
});
});
const cleanStatusQueues = async () => {
while (!(await statusQueues.failure.isEmpty())) {
await statusQueues.failure.pop();
}
while (!(await statusQueues.success.isEmpty())) {
await statusQueues.success.pop();
}
};
const waitForStatus = async () => {
while ((await statusQueues.failure.isEmpty()) && (await statusQueues.success.isEmpty())) {
await sleep(500);
}
const failures = await statusQueues.failure.pop();
const successes = await statusQueues.success.pop();
return { SuccessCount: successes.length, FailureCount: failures.length };
};
const assertRowsCount = async (testItem: TestDataItem, table: string) => {
let count = 0;
const expected = testItem.rows;

Просмотреть файл

@ -118,6 +118,7 @@ describe("KustoIngestClient", () => {
ingestionMappingReference: "MappingRef",
format: DataFormat.AVRO,
reportMethod: ReportMethod.Table,
reportLevel: ReportLevel.FailuresAndSuccesses,
};
const client = new KustoIngestClient("https://cluster.region.kusto.windows.net", defaultProps);
@ -127,7 +128,7 @@ describe("KustoIngestClient", () => {
assert.strictEqual(actual.table, "table");
assert.strictEqual(actual.format, "avro");
assert.strictEqual(actual.ingestionMappingReference, "MappingRef");
assert.strictEqual(actual.reportLevel, ReportLevel.FailuresOnly);
assert.strictEqual(actual.reportLevel, ReportLevel.FailuresAndSuccesses);
assert.strictEqual(actual.reportMethod, ReportMethod.Table);
});

Просмотреть файл

@ -10,12 +10,12 @@ import { KustoIngestClient } from "../src/ingestClient";
import { DataFormat, IngestionProperties, IngestionPropertiesInput } from "../src/ingestionProperties";
import KustoManagedStreamingIngestClient from "../src/managedStreamingIngestClient";
import { Readable } from "stream";
import { QueueSendMessageResponse } from "@azure/storage-queue";
import { CloudSettings, KustoConnectionStringBuilder } from "azure-kusto-data";
import assert from "assert";
import uuidValidate from "uuid-validate";
import { IngestionResult } from "../src/ingestionResult";
type IngestFromStreamStub = sinon.SinonStub<[StreamDescriptor | Readable | ArrayBuffer, IngestionPropertiesInput?, string?], Promise<QueueSendMessageResponse>>;
type IngestFromStreamStub = sinon.SinonStub<[StreamDescriptor | Readable | ArrayBuffer, IngestionPropertiesInput?, string?], Promise<IngestionResult>>;
beforeAll(() => {
CloudSettings.writeToCache("https://cluster.kusto.windows.net");
});
@ -131,7 +131,7 @@ describe("ManagedStreamingIngestClient", () => {
// Mock ManagedStreamingIngestClient with mocked streamingIngestClient
const transientError = { "@permanent": false };
streamStub.throws(transientError);
queuedStub.returns(Promise.resolve({} as QueueSendMessageResponse));
queuedStub.returns(Promise.resolve({} as IngestionResult));
managedClient._getMergedProps();
@ -162,7 +162,7 @@ describe("ManagedStreamingIngestClient", () => {
const streamStub = sinon.stub(mockedStreamingIngestClient, "ingestFromStream");
streamStub.throws(new Error("Should not be called"));
const queuedStub = sinon.stub(mockedIngestClient, "ingestFromStream");
queuedStub.returns(Promise.resolve({} as QueueSendMessageResponse));
queuedStub.returns(Promise.resolve({} as IngestionResult));
const mockedManagedStreamingIngestClient: KustoManagedStreamingIngestClient = Object.setPrototypeOf(
{
streamingIngestClient: mockedStreamingIngestClient,

Просмотреть файл

@ -27,8 +27,8 @@
"@typescript-eslint/eslint-plugin": "^5.61.0",
"@typescript-eslint/parser": "^5.61.0",
"@vitejs/plugin-react": "^4.0.1",
"rollup": "3.27.2",
"typescript": "^5.0.2",
"rollup": "3.27.2",
"vite": "^4.5.0"
},
"repository": {

Просмотреть файл

@ -1,3 +1,5 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import { Link } from "@fluentui/react";
import { ArrowUpload16Regular, DocumentMultiple20Regular } from "@fluentui/react-icons";
import React from "react";

Просмотреть файл

@ -1,3 +1,5 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import { Link } from "@fluentui/react-components";
import React from "react";
import { InputText } from "./InputText";
@ -116,7 +118,7 @@ export const UpperFields: React.FunctionComponent<UpperFieldsProps> = ({ config,
</p>
{/* Usage for this flow is:
const tokenProvider = () => Promise.resolve("some_token");
const kcsbs = [KustoConnectionStringBuilder.withTokenProvider("localhost", tokenProvider)];
const kcsbs = [KustoConnectionStringBuilder.withTokenProvider("clusteruri", tokenProvider)];
*/}
</>
);