3.1.0 - Fixes to props and convenience features (#171)
* -Bumped version to 3.10 -Ingestion Properties - fixed bug that caused the wrong properties to be sent when overriding the defaults. -Added option to pass an object with the correct interface as ingestionProperties (instead of always creating a class) -`withConstantValue` value's type had been relaxed from just string, to any type that implements `toString` * Fixed managed mock * Removed needless interface * Changed type to {}
This commit is contained in:
Родитель
725ffb3b0b
Коммит
a1a8b682c6
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "azure-kusto-data",
|
||||
"version": "3.0.0",
|
||||
"version": "3.1.0",
|
||||
"lockfileVersion": 1,
|
||||
"requires": true,
|
||||
"dependencies": {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "azure-kusto-data",
|
||||
"version": "3.0.0",
|
||||
"version": "3.1.0",
|
||||
"description": "Azure Data Explorer Query SDK",
|
||||
"main": "index.js",
|
||||
"types": "index.d.ts",
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "azure-kusto-ingest",
|
||||
"version": "3.0.0",
|
||||
"version": "3.1.0",
|
||||
"lockfileVersion": 1,
|
||||
"requires": true,
|
||||
"dependencies": {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "azure-kusto-ingest",
|
||||
"version": "3.0.0",
|
||||
"version": "3.1.0",
|
||||
"description": "Azure Data Explorer Ingestion SDK",
|
||||
"main": "index.js",
|
||||
"types": "index.d.ts",
|
||||
|
|
|
@ -1,28 +1,31 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
import IngestionProperties from "./ingestionProperties";
|
||||
import { IngestionProperties, IngestionPropertiesInput } from "./ingestionProperties";
|
||||
import { FileDescriptor, StreamDescriptor } from "./descriptors";
|
||||
import { Readable } from "stream";
|
||||
|
||||
export abstract class AbstractKustoClient {
|
||||
protected constructor(public defaultProps: IngestionProperties | null = null) {}
|
||||
public defaultProps: IngestionProperties;
|
||||
|
||||
_mergeProps(newProperties?: IngestionProperties | null): IngestionProperties {
|
||||
// no default props
|
||||
if (!newProperties || Object.keys(newProperties).length === 0) {
|
||||
return this.defaultProps || new IngestionProperties({});
|
||||
protected constructor(defaultProps: IngestionPropertiesInput) {
|
||||
if (!defaultProps) {
|
||||
this.defaultProps = new IngestionProperties({});
|
||||
} else if (!(defaultProps instanceof IngestionProperties)) {
|
||||
this.defaultProps = new IngestionProperties(defaultProps);
|
||||
} else {
|
||||
this.defaultProps = defaultProps;
|
||||
}
|
||||
|
||||
// no new props
|
||||
if (this.defaultProps == null || Object.keys(this.defaultProps).length === 0) {
|
||||
return newProperties || new IngestionProperties({});
|
||||
}
|
||||
// both exist - merge
|
||||
return this.defaultProps.merge(newProperties) || new IngestionProperties({});
|
||||
}
|
||||
|
||||
abstract ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionProperties): Promise<any>;
|
||||
_getMergedProps(newProperties?: IngestionPropertiesInput): IngestionProperties {
|
||||
const props = this.defaultProps.merge(newProperties);
|
||||
props.setDefaults();
|
||||
props.validate();
|
||||
return props;
|
||||
}
|
||||
|
||||
abstract ingestFromFile(file: FileDescriptor | string, ingestionProperties: IngestionProperties): Promise<any>;
|
||||
abstract ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionPropertiesInput): Promise<any>;
|
||||
|
||||
abstract ingestFromFile(file: FileDescriptor | string, ingestionProperties: IngestionPropertiesInput): Promise<any>;
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
// Licensed under the MIT License.
|
||||
|
||||
/* eslint-disable max-classes-per-file -- We want all the Column Mappings to be in this file */
|
||||
/* eslint-disable @typescript-eslint/ban-types -- We legitimately want to use {} as a "any non-nullable type" */
|
||||
|
||||
import { IngestionMappingKind } from "./ingestionProperties";
|
||||
|
||||
|
@ -24,7 +25,7 @@ interface MappingProperties {
|
|||
Field?: string;
|
||||
Path?: string;
|
||||
Ordinal?: number;
|
||||
ConstValue?: string;
|
||||
ConstValue?: {};
|
||||
Transform?: Transformation;
|
||||
}
|
||||
|
||||
|
@ -73,7 +74,7 @@ export class CsvColumnMapping extends ColumnMapping {
|
|||
/**
|
||||
* @deprecated Use the factory methods instead.
|
||||
*/
|
||||
protected constructor(readonly columnName: string, readonly cslDataType?: string, readonly ordinal?: string, constantValue?: string) {
|
||||
protected constructor(readonly columnName: string, readonly cslDataType?: string, readonly ordinal?: string, constantValue?: {}) {
|
||||
super(columnName, cslDataType, {
|
||||
Ordinal: ordinal === undefined ? undefined : parseInt(ordinal, 10),
|
||||
ConstValue: constantValue,
|
||||
|
@ -84,7 +85,7 @@ export class CsvColumnMapping extends ColumnMapping {
|
|||
return new CsvColumnMapping(columnName, cslDataType, ordinal.toString());
|
||||
}
|
||||
|
||||
public static withConstantValue(columnName: string, constantValue: string, cslDataType?: string): CsvColumnMapping {
|
||||
public static withConstantValue(columnName: string, constantValue: {}, cslDataType?: string): CsvColumnMapping {
|
||||
return new CsvColumnMapping(columnName, cslDataType, undefined, constantValue);
|
||||
}
|
||||
|
||||
|
@ -95,13 +96,7 @@ export class JsonColumnMapping extends ColumnMapping {
|
|||
/**
|
||||
* @deprecated Use the factory methods instead.
|
||||
*/
|
||||
constructor(
|
||||
readonly columnName: string,
|
||||
readonly jsonPath?: string,
|
||||
cslDataType: string | null = null,
|
||||
constantValue?: string,
|
||||
transform?: Transformation
|
||||
) {
|
||||
constructor(readonly columnName: string, readonly jsonPath?: string, cslDataType: string | null = null, constantValue?: {}, transform?: Transformation) {
|
||||
super(columnName, cslDataType ?? undefined, {
|
||||
Path: jsonPath,
|
||||
ConstValue: constantValue,
|
||||
|
@ -113,7 +108,7 @@ export class JsonColumnMapping extends ColumnMapping {
|
|||
return new JsonColumnMapping(columnName, path, cslDataType, undefined, transform);
|
||||
}
|
||||
|
||||
public static withConstantValue(columnName: string, constantValue: string, cslDataType?: string): JsonColumnMapping {
|
||||
public static withConstantValue(columnName: string, constantValue: {}, cslDataType?: string): JsonColumnMapping {
|
||||
return new JsonColumnMapping(columnName, undefined, cslDataType, constantValue);
|
||||
}
|
||||
|
||||
|
@ -125,7 +120,7 @@ export class JsonColumnMapping extends ColumnMapping {
|
|||
}
|
||||
|
||||
export class AvroColumnMapping extends ColumnMapping {
|
||||
private constructor(readonly columnName: string, cslDataType?: string, path?: string, field?: string, constantValue?: string, transform?: Transformation) {
|
||||
private constructor(readonly columnName: string, cslDataType?: string, path?: string, field?: string, constantValue?: {}, transform?: Transformation) {
|
||||
super(columnName, cslDataType ?? undefined, {
|
||||
Path: path,
|
||||
Field: field,
|
||||
|
@ -142,7 +137,7 @@ export class AvroColumnMapping extends ColumnMapping {
|
|||
return new AvroColumnMapping(columnName, cslDataType, undefined, field, undefined, transform);
|
||||
}
|
||||
|
||||
public static withConstantValue(columnName: string, constantValue: string, cslDataType?: string): AvroColumnMapping {
|
||||
public static withConstantValue(columnName: string, constantValue: {}, cslDataType?: string): AvroColumnMapping {
|
||||
return new AvroColumnMapping(columnName, cslDataType, undefined, undefined, constantValue);
|
||||
}
|
||||
|
||||
|
@ -154,7 +149,7 @@ export class AvroColumnMapping extends ColumnMapping {
|
|||
}
|
||||
|
||||
export class ApacheAvroColumnMapping extends ColumnMapping {
|
||||
private constructor(readonly columnName: string, cslDataType?: string, path?: string, field?: string, constantValue?: string, transform?: Transformation) {
|
||||
private constructor(readonly columnName: string, cslDataType?: string, path?: string, field?: string, constantValue?: {}, transform?: Transformation) {
|
||||
super(columnName, cslDataType ?? undefined, {
|
||||
Path: path,
|
||||
Field: field,
|
||||
|
@ -171,7 +166,7 @@ export class ApacheAvroColumnMapping extends ColumnMapping {
|
|||
return new ApacheAvroColumnMapping(columnName, cslDataType, undefined, field, undefined, transform);
|
||||
}
|
||||
|
||||
public static withConstantValue(columnName: string, constantValue: string, cslDataType?: string): ApacheAvroColumnMapping {
|
||||
public static withConstantValue(columnName: string, constantValue: {}, cslDataType?: string): ApacheAvroColumnMapping {
|
||||
return new ApacheAvroColumnMapping(columnName, cslDataType, undefined, undefined, constantValue);
|
||||
}
|
||||
|
||||
|
@ -183,7 +178,7 @@ export class ApacheAvroColumnMapping extends ColumnMapping {
|
|||
}
|
||||
|
||||
export class SStreamColumnMapping extends ColumnMapping {
|
||||
private constructor(readonly columnName: string, cslDataType?: string, path?: string, field?: string, constantValue?: string, transform?: Transformation) {
|
||||
private constructor(readonly columnName: string, cslDataType?: string, path?: string, field?: string, constantValue?: {}, transform?: Transformation) {
|
||||
super(columnName, cslDataType ?? undefined, {
|
||||
Path: path,
|
||||
Field: field,
|
||||
|
@ -200,7 +195,7 @@ export class SStreamColumnMapping extends ColumnMapping {
|
|||
return new SStreamColumnMapping(columnName, cslDataType, undefined, field, undefined, transform);
|
||||
}
|
||||
|
||||
public static withConstantValue(columnName: string, constantValue: string, cslDataType?: string): SStreamColumnMapping {
|
||||
public static withConstantValue(columnName: string, constantValue: {}, cslDataType?: string): SStreamColumnMapping {
|
||||
return new SStreamColumnMapping(columnName, cslDataType, undefined, undefined, constantValue);
|
||||
}
|
||||
|
||||
|
@ -212,7 +207,7 @@ export class SStreamColumnMapping extends ColumnMapping {
|
|||
}
|
||||
|
||||
export class ParquetColumnMapping extends ColumnMapping {
|
||||
private constructor(readonly columnName: string, cslDataType?: string, path?: string, field?: string, constantValue?: string, transform?: Transformation) {
|
||||
private constructor(readonly columnName: string, cslDataType?: string, path?: string, field?: string, constantValue?: {}, transform?: Transformation) {
|
||||
super(columnName, cslDataType ?? undefined, {
|
||||
Path: path,
|
||||
Field: field,
|
||||
|
@ -229,7 +224,7 @@ export class ParquetColumnMapping extends ColumnMapping {
|
|||
return new ParquetColumnMapping(columnName, cslDataType, undefined, field, undefined, transform);
|
||||
}
|
||||
|
||||
public static withConstantValue(columnName: string, constantValue: string, cslDataType?: string): ParquetColumnMapping {
|
||||
public static withConstantValue(columnName: string, constantValue: {}, cslDataType?: string): ParquetColumnMapping {
|
||||
return new ParquetColumnMapping(columnName, cslDataType, undefined, undefined, constantValue);
|
||||
}
|
||||
|
||||
|
@ -241,7 +236,7 @@ export class ParquetColumnMapping extends ColumnMapping {
|
|||
}
|
||||
|
||||
export class OrcColumnMapping extends ColumnMapping {
|
||||
private constructor(readonly columnName: string, cslDataType?: string, path?: string, field?: string, constantValue?: string, transform?: Transformation) {
|
||||
private constructor(readonly columnName: string, cslDataType?: string, path?: string, field?: string, constantValue?: {}, transform?: Transformation) {
|
||||
super(columnName, cslDataType ?? undefined, {
|
||||
Path: path,
|
||||
Field: field,
|
||||
|
@ -258,7 +253,7 @@ export class OrcColumnMapping extends ColumnMapping {
|
|||
return new OrcColumnMapping(columnName, cslDataType, undefined, field, undefined, transform);
|
||||
}
|
||||
|
||||
public static withConstantValue(columnName: string, constantValue: string, cslDataType?: string): OrcColumnMapping {
|
||||
public static withConstantValue(columnName: string, constantValue: {}, cslDataType?: string): OrcColumnMapping {
|
||||
return new OrcColumnMapping(columnName, cslDataType, undefined, undefined, constantValue);
|
||||
}
|
||||
|
||||
|
@ -270,7 +265,7 @@ export class OrcColumnMapping extends ColumnMapping {
|
|||
}
|
||||
|
||||
export class W3CLogFileMapping extends ColumnMapping {
|
||||
private constructor(readonly columnName: string, cslDataType?: string, field?: string, constantValue?: string, transform?: Transformation) {
|
||||
private constructor(readonly columnName: string, cslDataType?: string, field?: string, constantValue?: {}, transform?: Transformation) {
|
||||
super(columnName, cslDataType ?? undefined, { Field: field, ConstValue: constantValue, Transform: transform });
|
||||
}
|
||||
|
||||
|
@ -278,7 +273,7 @@ export class W3CLogFileMapping extends ColumnMapping {
|
|||
return new W3CLogFileMapping(columnName, cslDataType, field, undefined, transform);
|
||||
}
|
||||
|
||||
public static withConstantValue(columnName: string, constantValue: string, cslDataType?: string): W3CLogFileMapping {
|
||||
public static withConstantValue(columnName: string, constantValue: {}, cslDataType?: string): W3CLogFileMapping {
|
||||
return new W3CLogFileMapping(columnName, cslDataType, undefined, constantValue);
|
||||
}
|
||||
|
||||
|
|
|
@ -12,14 +12,14 @@ import IngestionBlobInfo from "./ingestionBlobInfo";
|
|||
import { QueueClient, QueueSendMessageResponse } from "@azure/storage-queue";
|
||||
|
||||
import { ContainerClient } from "@azure/storage-blob";
|
||||
import IngestionProperties from "./ingestionProperties";
|
||||
import { IngestionPropertiesInput } from "./ingestionProperties";
|
||||
import { AbstractKustoClient } from "./abstractKustoClient";
|
||||
import { Readable } from "stream";
|
||||
|
||||
export class KustoIngestClient extends AbstractKustoClient {
|
||||
resourceManager: ResourceManager;
|
||||
|
||||
constructor(kcsb: string | KustoConnectionStringBuilder, public defaultProps: IngestionProperties | null = null) {
|
||||
constructor(kcsb: string | KustoConnectionStringBuilder, defaultProps?: IngestionPropertiesInput) {
|
||||
super(defaultProps);
|
||||
this.resourceManager = new ResourceManager(new KustoClient(kcsb));
|
||||
}
|
||||
|
@ -39,9 +39,8 @@ export class KustoIngestClient extends AbstractKustoClient {
|
|||
return containerClient.getBlockBlobClient(blobName);
|
||||
}
|
||||
|
||||
async ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionProperties): Promise<QueueSendMessageResponse> {
|
||||
const props = this._mergeProps(ingestionProperties);
|
||||
props.validate();
|
||||
async ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties?: IngestionPropertiesInput): Promise<QueueSendMessageResponse> {
|
||||
const props = this._getMergedProps(ingestionProperties);
|
||||
const descriptor: StreamDescriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);
|
||||
|
||||
const blobName =
|
||||
|
@ -53,9 +52,8 @@ export class KustoIngestClient extends AbstractKustoClient {
|
|||
return this.ingestFromBlob(new BlobDescriptor(blockBlobClient.url), props); // descriptor.size?
|
||||
}
|
||||
|
||||
async ingestFromFile(file: string | FileDescriptor, ingestionProperties: IngestionProperties | null = null): Promise<QueueSendMessageResponse> {
|
||||
const props = this._mergeProps(ingestionProperties);
|
||||
props.validate();
|
||||
async ingestFromFile(file: string | FileDescriptor, ingestionProperties?: IngestionPropertiesInput): Promise<QueueSendMessageResponse> {
|
||||
const props = this._getMergedProps(ingestionProperties);
|
||||
|
||||
const descriptor = file instanceof FileDescriptor ? file : new FileDescriptor(file);
|
||||
|
||||
|
@ -68,9 +66,8 @@ export class KustoIngestClient extends AbstractKustoClient {
|
|||
return this.ingestFromBlob(new BlobDescriptor(blockBlobClient.url, descriptor.size, descriptor.sourceId), props);
|
||||
}
|
||||
|
||||
async ingestFromBlob(blob: string | BlobDescriptor, ingestionProperties: IngestionProperties | null = null): Promise<QueueSendMessageResponse> {
|
||||
const props = this._mergeProps(ingestionProperties);
|
||||
props.validate();
|
||||
async ingestFromBlob(blob: string | BlobDescriptor, ingestionProperties?: IngestionPropertiesInput): Promise<QueueSendMessageResponse> {
|
||||
const props = this._getMergedProps(ingestionProperties);
|
||||
|
||||
const descriptor = blob instanceof BlobDescriptor ? blob : new BlobDescriptor(blob);
|
||||
const queues = await this.resourceManager.getIngestionQueues();
|
||||
|
|
|
@ -26,7 +26,7 @@ export class IngestionBlobInfo {
|
|||
this.DatabaseName = ingestionProperties.database ?? null;
|
||||
this.TableName = ingestionProperties.table ?? null;
|
||||
this.RetainBlobOnSuccess = true;
|
||||
this.FlushImmediately = ingestionProperties.flushImmediately;
|
||||
this.FlushImmediately = ingestionProperties.flushImmediately ?? false;
|
||||
this.IgnoreSizeLimit = false;
|
||||
this.ReportLevel = ingestionProperties.reportLevel ?? null;
|
||||
this.ReportMethod = ingestionProperties.reportMethod ?? null;
|
||||
|
|
|
@ -165,12 +165,14 @@ export enum ReportLevel {
|
|||
|
||||
export enum ReportMethod {
|
||||
Queue = 0,
|
||||
Table,
|
||||
QueueAndTable,
|
||||
}
|
||||
|
||||
export class IngestionProperties {
|
||||
export interface IngestionPropertiesFields {
|
||||
database?: string;
|
||||
table?: string;
|
||||
format: DataFormat = DataFormat.CSV;
|
||||
format?: DataFormat;
|
||||
/**
|
||||
* @deprecated. Use ingestionMappingColumns instead.
|
||||
*/
|
||||
|
@ -186,13 +188,20 @@ export class IngestionProperties {
|
|||
ingestIfNotExists?: string;
|
||||
ingestByTags?: string[];
|
||||
dropByTags?: string[];
|
||||
flushImmediately: boolean = false;
|
||||
reportLevel: ReportLevel = ReportLevel.DoNotReport;
|
||||
reportMethod: ReportMethod = ReportMethod.Queue;
|
||||
flushImmediately?: boolean;
|
||||
reportLevel?: ReportLevel;
|
||||
reportMethod?: ReportMethod;
|
||||
validationPolicy?: ValidationPolicy;
|
||||
additionalProperties?: { [additional: string]: any } | null;
|
||||
}
|
||||
|
||||
constructor(data: Partial<IngestionProperties>) {
|
||||
// This trick lets us avoid duplicating all the properties from the interface. See https://github.com/microsoft/TypeScript/issues/3407
|
||||
// eslint-disable-next-line @typescript-eslint/no-empty-interface
|
||||
export interface IngestionProperties extends IngestionPropertiesFields {}
|
||||
|
||||
// eslint-disable-next-line no-redeclare
|
||||
export class IngestionProperties {
|
||||
constructor(data: Partial<IngestionPropertiesFields>) {
|
||||
Object.assign(this, data);
|
||||
}
|
||||
|
||||
|
@ -243,19 +252,43 @@ export class IngestionProperties {
|
|||
}
|
||||
}
|
||||
|
||||
merge(extraProps: IngestionProperties) {
|
||||
merge(extraProps: IngestionPropertiesInput) {
|
||||
const merged = new IngestionProperties(this);
|
||||
|
||||
for (const key of Object.keys(extraProps) as (keyof IngestionProperties)[]) {
|
||||
if (!extraProps) {
|
||||
return merged;
|
||||
}
|
||||
|
||||
const assign = <K extends keyof IngestionPropertiesFields, V extends IngestionPropertiesFields[K]>(
|
||||
obj: IngestionPropertiesFields,
|
||||
prop: K,
|
||||
value: V
|
||||
) => {
|
||||
obj[prop] = value;
|
||||
};
|
||||
|
||||
for (const key of Object.keys(extraProps) as (keyof IngestionPropertiesFields)[]) {
|
||||
if (extraProps[key]) {
|
||||
(<K extends keyof IngestionProperties>(k: K) => {
|
||||
merged[k] = extraProps[k];
|
||||
})(key);
|
||||
assign(merged, key, extraProps[key]);
|
||||
}
|
||||
}
|
||||
|
||||
return merged;
|
||||
}
|
||||
|
||||
setDefaults() {
|
||||
if (!this.format) {
|
||||
this.format = DataFormat.CSV;
|
||||
}
|
||||
if (!this.reportLevel) {
|
||||
this.reportLevel = ReportLevel.FailuresOnly;
|
||||
}
|
||||
if (!this.reportMethod) {
|
||||
this.reportMethod = ReportMethod.Queue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export type IngestionPropertiesInput = IngestionProperties | IngestionPropertiesFields | null | undefined;
|
||||
|
||||
export default IngestionProperties;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
import IngestionProperties from "./ingestionProperties";
|
||||
import { IngestionPropertiesInput } from "./ingestionProperties";
|
||||
|
||||
import { FileDescriptor, StreamDescriptor } from "./descriptors";
|
||||
import { AbstractKustoClient } from "./abstractKustoClient";
|
||||
|
@ -35,7 +35,7 @@ class KustoManagedStreamingIngestClient extends AbstractKustoClient {
|
|||
*/
|
||||
static fromDmConnectionString(
|
||||
dmConnectionString: KustoConnectionStringBuilder,
|
||||
defaultProps: IngestionProperties | null = null
|
||||
defaultProps?: IngestionPropertiesInput
|
||||
): KustoManagedStreamingIngestClient {
|
||||
if (dmConnectionString.dataSource == null || !dmConnectionString.dataSource.startsWith(ingestPrefix)) {
|
||||
throw new Error(`DM connection string must include the prefix '${ingestPrefix}'`);
|
||||
|
@ -57,7 +57,7 @@ class KustoManagedStreamingIngestClient extends AbstractKustoClient {
|
|||
*/
|
||||
static fromEngineConnectionString(
|
||||
engineConnectionString: KustoConnectionStringBuilder,
|
||||
defaultProps: IngestionProperties | null = null
|
||||
defaultProps?: IngestionPropertiesInput
|
||||
): KustoManagedStreamingIngestClient {
|
||||
if (engineConnectionString.dataSource == null || engineConnectionString.dataSource.startsWith(ingestPrefix)) {
|
||||
throw new Error(`Engine connection string must not include the prefix '${ingestPrefix}'`);
|
||||
|
@ -69,19 +69,14 @@ class KustoManagedStreamingIngestClient extends AbstractKustoClient {
|
|||
return new KustoManagedStreamingIngestClient(engineConnectionString, dmConnectionString, defaultProps);
|
||||
}
|
||||
|
||||
constructor(
|
||||
engineKcsb: string | KustoConnectionStringBuilder,
|
||||
dmKcsb: string | KustoConnectionStringBuilder,
|
||||
defaultProps: IngestionProperties | null = null
|
||||
) {
|
||||
constructor(engineKcsb: string | KustoConnectionStringBuilder, dmKcsb: string | KustoConnectionStringBuilder, defaultProps?: IngestionPropertiesInput) {
|
||||
super(defaultProps);
|
||||
this.streamingIngestClient = new StreamingIngestClient(engineKcsb, defaultProps);
|
||||
this.queuedIngestClient = new IngestClient(dmKcsb, defaultProps);
|
||||
}
|
||||
|
||||
async ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionProperties): Promise<any> {
|
||||
const props = this._mergeProps(ingestionProperties);
|
||||
props.validate();
|
||||
async ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties?: IngestionPropertiesInput): Promise<any> {
|
||||
const props = this._getMergedProps(ingestionProperties);
|
||||
const descriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);
|
||||
|
||||
let result = await tryStreamToArray(descriptor.stream, maxStreamSize);
|
||||
|
@ -92,11 +87,7 @@ class KustoManagedStreamingIngestClient extends AbstractKustoClient {
|
|||
while (retry.shouldTry()) {
|
||||
try {
|
||||
const sourceId = `KNC.executeManagedStreamingIngest;${descriptor.sourceId};${retry.currentAttempt}`;
|
||||
return await this.streamingIngestClient.ingestFromStream(
|
||||
new StreamDescriptor(streamify([result])).merge(descriptor),
|
||||
ingestionProperties,
|
||||
sourceId
|
||||
);
|
||||
return await this.streamingIngestClient.ingestFromStream(new StreamDescriptor(streamify([result])).merge(descriptor), props, sourceId);
|
||||
} catch (err: unknown) {
|
||||
const oneApiError = err as { "@permanent"?: boolean };
|
||||
if (oneApiError["@permanent"]) {
|
||||
|
@ -112,9 +103,10 @@ class KustoManagedStreamingIngestClient extends AbstractKustoClient {
|
|||
return await this.queuedIngestClient.ingestFromStream(new StreamDescriptor(result).merge(descriptor), ingestionProperties);
|
||||
}
|
||||
|
||||
async ingestFromFile(file: FileDescriptor | string, ingestionProperties: IngestionProperties): Promise<KustoResponseDataSet | QueueSendMessageResponse> {
|
||||
const props = this._mergeProps(ingestionProperties);
|
||||
props.validate();
|
||||
async ingestFromFile(
|
||||
file: FileDescriptor | string,
|
||||
ingestionProperties?: IngestionPropertiesInput
|
||||
): Promise<KustoResponseDataSet | QueueSendMessageResponse> {
|
||||
return await this.ingestFromStream(fileToStream(file), ingestionProperties);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
import IngestionProperties from "./ingestionProperties";
|
||||
import { IngestionPropertiesInput } from "./ingestionProperties";
|
||||
|
||||
import { CompressionType, FileDescriptor, StreamDescriptor } from "./descriptors";
|
||||
import zlib from "zlib";
|
||||
|
@ -14,14 +14,13 @@ import { Readable } from "stream";
|
|||
class KustoStreamingIngestClient extends AbstractKustoClient {
|
||||
private kustoClient: KustoClient;
|
||||
|
||||
constructor(kcsb: string | KustoConnectionStringBuilder, defaultProps: IngestionProperties | null = null) {
|
||||
constructor(kcsb: string | KustoConnectionStringBuilder, defaultProps?: IngestionPropertiesInput) {
|
||||
super(defaultProps);
|
||||
this.kustoClient = new KustoClient(kcsb);
|
||||
}
|
||||
|
||||
async ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionProperties, clientRequestId?: string): Promise<any> {
|
||||
const props = this._mergeProps(ingestionProperties);
|
||||
props.validate();
|
||||
async ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties?: IngestionPropertiesInput, clientRequestId?: string): Promise<any> {
|
||||
const props = this._getMergedProps(ingestionProperties);
|
||||
const descriptor: StreamDescriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);
|
||||
|
||||
const compressedStream = descriptor.compressionType === CompressionType.None ? descriptor.stream.pipe(zlib.createGzip()) : descriptor.stream;
|
||||
|
@ -35,9 +34,7 @@ class KustoStreamingIngestClient extends AbstractKustoClient {
|
|||
);
|
||||
}
|
||||
|
||||
async ingestFromFile(file: FileDescriptor | string, ingestionProperties: IngestionProperties): Promise<KustoResponseDataSet> {
|
||||
const props = this._mergeProps(ingestionProperties);
|
||||
props.validate();
|
||||
async ingestFromFile(file: FileDescriptor | string, ingestionProperties?: IngestionPropertiesInput): Promise<KustoResponseDataSet> {
|
||||
return this.ingestFromStream(fileToStream(file), ingestionProperties);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,8 @@
|
|||
|
||||
import assert from "assert";
|
||||
import { KustoIngestClient } from "../source/ingestClient";
|
||||
import { DataFormat, IngestionProperties } from "../source/ingestionProperties";
|
||||
import { DataFormat, IngestionProperties, ReportLevel, ReportMethod } from "../source/ingestionProperties";
|
||||
import { IngestionPropertiesValidationError } from "../source/errors";
|
||||
|
||||
describe("KustoIngestClient", () => {
|
||||
describe("#constructor()", () => {
|
||||
|
@ -11,14 +12,14 @@ describe("KustoIngestClient", () => {
|
|||
const ingestClient = new KustoIngestClient("https://cluster.kusto.windows.net", {
|
||||
database: "db",
|
||||
table: "table",
|
||||
format: "csv",
|
||||
format: "json",
|
||||
} as IngestionProperties);
|
||||
|
||||
assert.notStrictEqual(ingestClient.defaultProps, null);
|
||||
assert.strictEqual(ingestClient.resourceManager.kustoClient.cluster, "https://cluster.kusto.windows.net");
|
||||
assert.strictEqual(ingestClient.defaultProps!.database, "db");
|
||||
assert.strictEqual(ingestClient.defaultProps!.table, "table");
|
||||
assert.strictEqual(ingestClient.defaultProps!.format, "csv");
|
||||
assert.strictEqual(ingestClient.defaultProps!.format, "json");
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -27,57 +28,131 @@ describe("KustoIngestClient", () => {
|
|||
const newProps = new IngestionProperties({
|
||||
database: "db",
|
||||
table: "table",
|
||||
format: DataFormat.CSV,
|
||||
format: DataFormat.JSON,
|
||||
});
|
||||
// TODO: not sure a unit test will be useful here
|
||||
const client = new KustoIngestClient("https://cluster.region.kusto.windows.net");
|
||||
const actual = client._mergeProps(newProps);
|
||||
const actual = client._getMergedProps(newProps);
|
||||
|
||||
assert.strictEqual(actual.database, "db");
|
||||
assert.strictEqual(actual.table, "table");
|
||||
assert.strictEqual(actual.format, "csv");
|
||||
assert.strictEqual(actual.format, "json");
|
||||
});
|
||||
|
||||
it("new props object", () => {
|
||||
const newProps = {
|
||||
database: "db",
|
||||
table: "table",
|
||||
format: DataFormat.JSON,
|
||||
};
|
||||
const client = new KustoIngestClient("https://cluster.region.kusto.windows.net");
|
||||
const actual = client._getMergedProps(newProps);
|
||||
|
||||
assert.strictEqual(actual.database, "db");
|
||||
assert.strictEqual(actual.table, "table");
|
||||
assert.strictEqual(actual.format, "json");
|
||||
});
|
||||
|
||||
it("empty new props", () => {
|
||||
// TODO: not sure a unit test will be useful here
|
||||
const defaultProps = new IngestionProperties({
|
||||
database: "db",
|
||||
table: "table",
|
||||
format: DataFormat.CSV,
|
||||
format: DataFormat.JSON,
|
||||
});
|
||||
// TODO: not sure a unit test will be useful here
|
||||
const client = new KustoIngestClient("https://cluster.region.kusto.windows.net", defaultProps);
|
||||
const actual = client._mergeProps(null);
|
||||
const actual = client._getMergedProps(null);
|
||||
|
||||
assert.strictEqual(actual.database, "db");
|
||||
assert.strictEqual(actual.table, "table");
|
||||
assert.strictEqual(actual.format, "csv");
|
||||
assert.strictEqual(actual.format, "json");
|
||||
});
|
||||
|
||||
it("both exist props", () => {
|
||||
it("default props object", () => {
|
||||
const defaultProps = {
|
||||
database: "db",
|
||||
table: "table",
|
||||
format: DataFormat.JSON,
|
||||
};
|
||||
const client = new KustoIngestClient("https://cluster.region.kusto.windows.net", defaultProps);
|
||||
const actual = client._getMergedProps(null);
|
||||
|
||||
assert.strictEqual(actual.database, "db");
|
||||
assert.strictEqual(actual.table, "table");
|
||||
assert.strictEqual(actual.format, "json");
|
||||
});
|
||||
|
||||
it("both exists props", () => {
|
||||
const defaultProps = new IngestionProperties({
|
||||
database: "db",
|
||||
table: "table",
|
||||
format: DataFormat.CSV,
|
||||
format: DataFormat.JSON,
|
||||
reportLevel: ReportLevel.DoNotReport,
|
||||
});
|
||||
const newProps = new IngestionProperties({});
|
||||
newProps.database = "db2";
|
||||
newProps.ingestionMappingReference = "MappingRef";
|
||||
newProps.format = DataFormat.AVRO;
|
||||
|
||||
const client = new KustoIngestClient("https://cluster.region.kusto.windows.net", defaultProps);
|
||||
const actual = client._mergeProps(newProps);
|
||||
const actual = client._getMergedProps(newProps);
|
||||
|
||||
assert.strictEqual(actual.database, "db2");
|
||||
assert.strictEqual(actual.table, "table");
|
||||
assert.strictEqual(actual.format, "csv");
|
||||
assert.strictEqual(actual.format, "avro");
|
||||
assert.strictEqual(actual.ingestionMappingReference, "MappingRef");
|
||||
assert.strictEqual(actual.reportLevel, ReportLevel.DoNotReport);
|
||||
assert.strictEqual(actual.reportMethod, ReportMethod.Queue);
|
||||
});
|
||||
|
||||
it("both exists objects", () => {
|
||||
const defaultProps = {
|
||||
database: "db",
|
||||
table: "table",
|
||||
format: DataFormat.JSON,
|
||||
};
|
||||
const newProps = {
|
||||
database: "db2",
|
||||
ingestionMappingReference: "MappingRef",
|
||||
format: DataFormat.AVRO,
|
||||
reportMethod: ReportMethod.Table,
|
||||
};
|
||||
|
||||
const client = new KustoIngestClient("https://cluster.region.kusto.windows.net", defaultProps);
|
||||
const actual = client._getMergedProps(newProps);
|
||||
|
||||
assert.strictEqual(actual.database, "db2");
|
||||
assert.strictEqual(actual.table, "table");
|
||||
assert.strictEqual(actual.format, "avro");
|
||||
assert.strictEqual(actual.ingestionMappingReference, "MappingRef");
|
||||
assert.strictEqual(actual.reportLevel, ReportLevel.FailuresOnly);
|
||||
assert.strictEqual(actual.reportMethod, ReportMethod.Table);
|
||||
});
|
||||
|
||||
it("test defaults", () => {
|
||||
const defaultProps = {
|
||||
database: "db",
|
||||
table: "table",
|
||||
};
|
||||
const newProps = {
|
||||
database: "db2",
|
||||
ingestionMappingReference: "MappingRef",
|
||||
};
|
||||
|
||||
const client = new KustoIngestClient("https://cluster.region.kusto.windows.net", defaultProps);
|
||||
const actual = client._getMergedProps(newProps);
|
||||
|
||||
assert.strictEqual(actual.database, "db2");
|
||||
assert.strictEqual(actual.table, "table");
|
||||
assert.strictEqual(actual.ingestionMappingReference, "MappingRef");
|
||||
assert.strictEqual(actual.format, "csv");
|
||||
assert.strictEqual(actual.reportLevel, ReportLevel.FailuresOnly);
|
||||
assert.strictEqual(actual.reportMethod, ReportMethod.Queue);
|
||||
});
|
||||
|
||||
it("empty both", () => {
|
||||
const client = new KustoIngestClient("https://cluster.region.kusto.windows.net");
|
||||
|
||||
const actual = client._mergeProps();
|
||||
assert.deepStrictEqual(actual, new IngestionProperties({}));
|
||||
assert.throws(() => client._getMergedProps(), new IngestionPropertiesValidationError("Must define a target database"));
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -206,6 +206,40 @@ describe("IngestionProperties", () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe("Should return the correct mapping when passing ConstantValue with different types", () => {
|
||||
const types = [
|
||||
JsonColumnMapping,
|
||||
CsvColumnMapping,
|
||||
AvroColumnMapping,
|
||||
ApacheAvroColumnMapping,
|
||||
SStreamColumnMapping,
|
||||
ParquetColumnMapping,
|
||||
OrcColumnMapping,
|
||||
W3CLogFileMapping,
|
||||
];
|
||||
const obj = {
|
||||
toString: () => {
|
||||
return "custom toString";
|
||||
},
|
||||
};
|
||||
|
||||
types.forEach((type) => {
|
||||
it(`should handle correctly for type ${type}`, () => {
|
||||
const result = [
|
||||
{ Column: "a", Properties: { ConstValue: "custom toString" } },
|
||||
{
|
||||
Column: "b",
|
||||
Properties: { ConstValue: "5" },
|
||||
},
|
||||
];
|
||||
assert.deepStrictEqual(
|
||||
[type.withConstantValue("a", obj), type.withConstantValue("b", 5)].map((m) => m.toApiMapping()),
|
||||
result
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("Should return the correct mapping when passing Transform", () => {
|
||||
const types = [
|
||||
JsonColumnMapping,
|
|
@ -8,7 +8,7 @@ import Sinon from "sinon";
|
|||
import { StreamingIngestClient } from "../index";
|
||||
import { StreamDescriptor } from "../source/descriptors";
|
||||
import { KustoIngestClient } from "../source/ingestClient";
|
||||
import { DataFormat, IngestionProperties } from "../source/ingestionProperties";
|
||||
import { DataFormat, IngestionProperties, IngestionPropertiesInput } from "../source/ingestionProperties";
|
||||
import KustoManagedStreamingIngestClient from "../source/managedStreamingIngestClient";
|
||||
import { Readable } from "stream";
|
||||
import { QueueSendMessageResponse } from "@azure/storage-queue";
|
||||
|
@ -18,7 +18,7 @@ import assert from "assert";
|
|||
import uuidValidate from "uuid-validate";
|
||||
import { KustoConnectionStringBuilder } from "azure-kusto-data";
|
||||
|
||||
type IngestFromStreamStub = Sinon.SinonStub<[StreamDescriptor | Readable, IngestionProperties, string?], Promise<QueueSendMessageResponse>>;
|
||||
type IngestFromStreamStub = Sinon.SinonStub<[StreamDescriptor | Readable, IngestionPropertiesInput?, string?], Promise<QueueSendMessageResponse>>;
|
||||
|
||||
describe("ManagedStreamingIngestClient", () => {
|
||||
const getMockedClient = () => {
|
||||
|
@ -34,6 +34,10 @@ describe("ManagedStreamingIngestClient", () => {
|
|||
queuedIngestClient: mockedIngestClient,
|
||||
baseSleepTimeSecs: 0,
|
||||
baseJitterSecs: 0,
|
||||
defaultProps: new IngestionProperties({
|
||||
database: "db",
|
||||
table: "table",
|
||||
}),
|
||||
},
|
||||
KustoManagedStreamingIngestClient.prototype
|
||||
);
|
||||
|
@ -132,7 +136,7 @@ describe("ManagedStreamingIngestClient", () => {
|
|||
streamStub.throws(transientError);
|
||||
queuedStub.returns(Promise.resolve({} as QueueSendMessageResponse));
|
||||
|
||||
managedClient._mergeProps();
|
||||
managedClient._getMergedProps();
|
||||
|
||||
const items = [Buffer.from("string1"), Buffer.from("string2"), Buffer.from("string3")];
|
||||
const stream = createStream(items);
|
||||
|
@ -169,6 +173,7 @@ describe("ManagedStreamingIngestClient", () => {
|
|||
maxRetries: 1,
|
||||
baseSleepTimeSecs: 0,
|
||||
baseJitterSecs: 0,
|
||||
defaultProps: new IngestionProperties({}),
|
||||
},
|
||||
KustoManagedStreamingIngestClient.prototype
|
||||
);
|
||||
|
|
Загрузка…
Ссылка в новой задаче