From 7e7082ded7a51295f45ee55801a39d01372829f9 Mon Sep 17 00:00:00 2001 From: AsafMah Date: Thu, 18 Nov 2021 09:25:48 +0200 Subject: [PATCH] Small fixes - expand streaming ingest to all Readable, fixed some imports, typos and updated gitignore (#153) * Small fixes - fixed type and updated gitignore * Expand IngestFromStream to any Readable, and fix some import errors --- .gitignore | 9 +++++ azure-kusto-data/source/security.ts | 20 +++++----- azure-kusto-ingest/package-lock.json | 38 +++++++++---------- .../source/abstractKustoClient.ts | 4 +- azure-kusto-ingest/source/descriptors.ts | 4 +- azure-kusto-ingest/source/ingestClient.ts | 4 +- .../source/managedStreamingIngestClient.ts | 8 ++-- .../source/streamingIngestClient.ts | 4 +- .../test/managedStreamingIngestClientTest.ts | 30 +++++++++++---- 9 files changed, 71 insertions(+), 50 deletions(-) diff --git a/.gitignore b/.gitignore index 9801a36..6e792d9 100644 --- a/.gitignore +++ b/.gitignore @@ -333,3 +333,12 @@ ASALocalRun/ package.json __main.js package-lock.json + +# Typescript product files +*.tsbuildinfo +*.js +*.js.map +*.d.ts + +# We still want example.js files +!example.js diff --git a/azure-kusto-data/source/security.ts b/azure-kusto-data/source/security.ts index fdb36b5..4457af2 100644 --- a/azure-kusto-data/source/security.ts +++ b/azure-kusto-data/source/security.ts @@ -5,7 +5,7 @@ import "./tokenProvider"; import * as TokenProvider from "./tokenProvider"; export class AadHelper { - tokeProvider: TokenProvider.TokenProviderBase; + tokenProvider: TokenProvider.TokenProviderBase; constructor(kcsb: KustoConnectionStringBuilder) { if (!kcsb.dataSource) { @@ -13,32 +13,32 @@ export class AadHelper { } if (!!kcsb.aadUserId && !!kcsb.password) { - this.tokeProvider = new TokenProvider.UserPassTokenProvider(kcsb.dataSource, kcsb.aadUserId, kcsb.password, kcsb.authorityId); + this.tokenProvider = new TokenProvider.UserPassTokenProvider(kcsb.dataSource, kcsb.aadUserId, kcsb.password, kcsb.authorityId); } else if (!!kcsb.applicationClientId && !!kcsb.applicationKey) { - this.tokeProvider = new TokenProvider.ApplicationKeyTokenProvider(kcsb.dataSource, kcsb.applicationClientId, kcsb.applicationKey, kcsb.authorityId); + this.tokenProvider = new TokenProvider.ApplicationKeyTokenProvider(kcsb.dataSource, kcsb.applicationClientId, kcsb.applicationKey, kcsb.authorityId); } else if (!!kcsb.applicationClientId && !!kcsb.applicationCertificateThumbprint && !!kcsb.applicationCertificatePrivateKey) { - this.tokeProvider = new TokenProvider.ApplicationCertificateTokenProvider(kcsb.dataSource, kcsb.applicationClientId, kcsb.applicationCertificateThumbprint, kcsb.applicationCertificatePrivateKey, kcsb.applicationCertificateX5c as string | undefined, kcsb.authorityId); + this.tokenProvider = new TokenProvider.ApplicationCertificateTokenProvider(kcsb.dataSource, kcsb.applicationClientId, kcsb.applicationCertificateThumbprint, kcsb.applicationCertificatePrivateKey, kcsb.applicationCertificateX5c as string | undefined, kcsb.authorityId); } else if (kcsb.managedIdentity) { - this.tokeProvider = new TokenProvider.MsiTokenProvider(kcsb.dataSource, kcsb.msiClientId as string | undefined); + this.tokenProvider = new TokenProvider.MsiTokenProvider(kcsb.dataSource, kcsb.msiClientId as string | undefined); } else if (kcsb.azLoginIdentity) { - this.tokeProvider = new TokenProvider.AzCliTokenProvider(kcsb.dataSource); + this.tokenProvider = new TokenProvider.AzCliTokenProvider(kcsb.dataSource); } else if (kcsb.accessToken) { - this.tokeProvider = new TokenProvider.BasicTokenProvider(kcsb.dataSource, kcsb.accessToken as string); + this.tokenProvider = new TokenProvider.BasicTokenProvider(kcsb.dataSource, kcsb.accessToken as string); } else { let callback = kcsb.deviceCodeCallback; if (!callback) { // tslint:disable-next-line:no-console callback = (response) => console.log(response.message); } - this.tokeProvider = new TokenProvider.DeviceLoginTokenProvider(kcsb.dataSource, callback); + this.tokenProvider = new TokenProvider.DeviceLoginTokenProvider(kcsb.dataSource, callback); } } async _getAuthHeader(): Promise { - const token = await this.tokeProvider.acquireToken(); + const token = await this.tokenProvider.acquireToken(); return `${token.tokenType} ${token.accessToken}`; } -}; +} export default AadHelper; \ No newline at end of file diff --git a/azure-kusto-ingest/package-lock.json b/azure-kusto-ingest/package-lock.json index 1f44b92..2737c9f 100644 --- a/azure-kusto-ingest/package-lock.json +++ b/azure-kusto-ingest/package-lock.json @@ -41,9 +41,9 @@ } }, "@azure/core-client": { - "version": "1.3.1", - "resolved": "https://registry.npmjs.org/@azure/core-client/-/core-client-1.3.1.tgz", - "integrity": "sha512-7IHm2DGg2u7dJYtCW84Ik7uENHfE8VsM/sWloZezPKYDoWZrg7JzwjvdGAfsaELKi2p0GE+JBaAbDYnNpr5V1w==", + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/@azure/core-client/-/core-client-1.3.2.tgz", + "integrity": "sha512-qfkRYKmeEmisluMdGTbBtXeyBLaImjFeVW0gcT5yRAwxJmlnTvSyD+a3PjukAtjIrl/tnb4WSJOBpONSJ91+5Q==", "requires": { "@azure/abort-controller": "^1.0.0", "@azure/core-asynciterator-polyfill": "^1.0.0", @@ -169,9 +169,9 @@ } }, "@azure/core-rest-pipeline": { - "version": "1.3.1", - "resolved": "https://registry.npmjs.org/@azure/core-rest-pipeline/-/core-rest-pipeline-1.3.1.tgz", - "integrity": "sha512-xTQiv47O5cWzJFkwiDrUTT4K4IYbUIts0gaou5TZxAAuhQi9kAKWHEmFTjHVMOeAmyDhlMM5cb21M2n4WDto1A==", + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/@azure/core-rest-pipeline/-/core-rest-pipeline-1.3.2.tgz", + "integrity": "sha512-kymICKESeHBpVLgQiAxllgWdSTopkqtmfPac8ITwMCxNEC6hzbSpqApYbjzxbBNkBMgoD4GESo6LLhR/sPh6kA==", "requires": { "@azure/abort-controller": "^1.0.0", "@azure/core-auth": "^1.3.0", @@ -305,20 +305,20 @@ } }, "@azure/msal-node": { - "version": "1.3.2", - "resolved": "https://registry.npmjs.org/@azure/msal-node/-/msal-node-1.3.2.tgz", - "integrity": "sha512-aKU2lVRKhZa1IJ/Za/Ir6qlythQ3FHz0g0px3SbM4iC1otyr3ANS4mIn/6fmkpZDIHc8eAgJh2KMep1Yn2zpig==", + "version": "1.3.3", + "resolved": "https://registry.npmjs.org/@azure/msal-node/-/msal-node-1.3.3.tgz", + "integrity": "sha512-ZtVCVzr7V4xEeqICa7E9g6BY3noZv96XG11ENuqEiz/PA1OzPD1/x0QF6BPHVldST8wwoevXxPw+t/h3AFII7w==", "requires": { - "@azure/msal-common": "^5.0.1", + "@azure/msal-common": "^5.1.0", "axios": "^0.21.4", "jsonwebtoken": "^8.5.1", "uuid": "^8.3.0" }, "dependencies": { "@azure/msal-common": { - "version": "5.0.1", - "resolved": "https://registry.npmjs.org/@azure/msal-common/-/msal-common-5.0.1.tgz", - "integrity": "sha512-CmPR3XM9+CGUu7V/+bAwDxyN6XqWJJhVLmv7utT3sbgay4l5roVXsD1t4wURTs8PwzxmmnJOrhvvGhoDxUW69g==", + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/@azure/msal-common/-/msal-common-5.1.0.tgz", + "integrity": "sha512-4zHZ5Ec7jAgTIWZO3ap1ozgIPGAirF1wL8UhsmPF9QDoZz0cMHdaNmtov5i2+6Xq37YMzhN5s50EFHBuXd7sDQ==", "requires": { "debug": "^4.1.1" } @@ -1154,9 +1154,9 @@ } }, "follow-redirects": { - "version": "1.14.4", - "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.14.4.tgz", - "integrity": "sha512-zwGkiSXC1MUJG/qmeIFH2HBJx9u0V46QGUe3YR1fXG8bXQxq7fLj0RjLZQ5nubr9qNJUZrH+xUcwXEoXNpfS+g==" + "version": "1.14.5", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.14.5.tgz", + "integrity": "sha512-wtphSXy7d4/OR+MvIFbCVBDzZ5520qV8XfPklSN5QtxuMUJZ+b0Wnst1e1lCDocfzuCkHqj8k0FpZqO+UIaKNA==" }, "forever-agent": { "version": "0.6.1", @@ -1899,9 +1899,9 @@ "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "msal": { - "version": "1.4.14", - "resolved": "https://registry.npmjs.org/msal/-/msal-1.4.14.tgz", - "integrity": "sha512-k8M5+/jbfSQoCf7CyQzBP5HE5mY8TkBujykLGTEp2x0MvOK/FQsfUTNis28zlvvPVzhgrhb5GQiGM8rRpXyHdA==", + "version": "1.4.15", + "resolved": "https://registry.npmjs.org/msal/-/msal-1.4.15.tgz", + "integrity": "sha512-H/CxkeZJ4laEK6GZ/cDKQoYjBTvDNFK3hDC8mfU8IkuZvKFfFdo9KM89r8spXY7xnBK9SQBAjIuQgwUogeUw7g==", "requires": { "tslib": "^1.9.3" } diff --git a/azure-kusto-ingest/source/abstractKustoClient.ts b/azure-kusto-ingest/source/abstractKustoClient.ts index 99bc1ce..99c413b 100644 --- a/azure-kusto-ingest/source/abstractKustoClient.ts +++ b/azure-kusto-ingest/source/abstractKustoClient.ts @@ -1,6 +1,6 @@ import IngestionProperties from "./ingestionProperties"; import {FileDescriptor, StreamDescriptor} from "./descriptors"; -import fs from "fs"; +import { Readable } from "stream"; export abstract class AbstractKustoClient { protected constructor(public defaultProps: IngestionProperties | null = null) { @@ -20,7 +20,7 @@ export abstract class AbstractKustoClient { return this.defaultProps.merge(newProperties) || new IngestionProperties({}); } - abstract ingestFromStream(stream: StreamDescriptor | fs.ReadStream, ingestionProperties: IngestionProperties): Promise; + abstract ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionProperties): Promise; abstract ingestFromFile(file: FileDescriptor | string, ingestionProperties: IngestionProperties): Promise; } \ No newline at end of file diff --git a/azure-kusto-ingest/source/descriptors.ts b/azure-kusto-ingest/source/descriptors.ts index e616ddc..3f83dd4 100644 --- a/azure-kusto-ingest/source/descriptors.ts +++ b/azure-kusto-ingest/source/descriptors.ts @@ -5,7 +5,7 @@ import { v4 as uuidv4 } from 'uuid'; import uuidValidate from "uuid-validate"; import zlib from "zlib"; import pathlib from "path"; -import fs, {ReadStream} from "fs"; +import fs from "fs"; import { Readable } from 'stream'; export enum CompressionType { @@ -79,7 +79,7 @@ export class StreamDescriptor { size: number | null; compressionType: CompressionType; sourceId: string; - constructor(readonly stream: ReadStream | Readable, sourceId: string | null = null, compressionType: CompressionType = CompressionType.None) { + constructor(readonly stream: Readable, sourceId: string | null = null, compressionType: CompressionType = CompressionType.None) { this.name = "stream"; this.size = null; this.compressionType = compressionType; diff --git a/azure-kusto-ingest/source/ingestClient.ts b/azure-kusto-ingest/source/ingestClient.ts index 66d45d9..8871fca 100644 --- a/azure-kusto-ingest/source/ingestClient.ts +++ b/azure-kusto-ingest/source/ingestClient.ts @@ -14,8 +14,8 @@ import {QueueClient, QueueSendMessageResponse} from "@azure/storage-queue"; import {ContainerClient} from "@azure/storage-blob"; import IngestionProperties from "./ingestionProperties"; -import {ReadStream} from "fs"; import {AbstractKustoClient} from "./abstractKustoClient"; +import { Readable } from "stream"; export class KustoIngestClient extends AbstractKustoClient{ @@ -41,7 +41,7 @@ export class KustoIngestClient extends AbstractKustoClient{ return containerClient.getBlockBlobClient(blobName); } - async ingestFromStream(stream: ReadStream | StreamDescriptor, ingestionProperties: IngestionProperties): Promise { + async ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionProperties): Promise { const props = this._mergeProps(ingestionProperties); props.validate(); const descriptor: StreamDescriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream); diff --git a/azure-kusto-ingest/source/managedStreamingIngestClient.ts b/azure-kusto-ingest/source/managedStreamingIngestClient.ts index be9e999..9d5f256 100644 --- a/azure-kusto-ingest/source/managedStreamingIngestClient.ts +++ b/azure-kusto-ingest/source/managedStreamingIngestClient.ts @@ -4,7 +4,6 @@ import IngestionProperties from "./ingestionProperties"; import {FileDescriptor, StreamDescriptor} from "./descriptors"; -import fs from "fs"; import {AbstractKustoClient} from "./abstractKustoClient"; import {KustoConnectionStringBuilder} from "azure-kusto-data"; import {KustoResponseDataSet} from "azure-kusto-data/source/response"; @@ -14,6 +13,7 @@ import IngestClient from "./ingestClient"; import { QueueSendMessageResponse } from "@azure/storage-queue"; import streamify from "stream-array"; import toArray from "stream-to-array"; +import { Readable } from "stream"; const maxSteamSize = 1024 * 1024 * 4; const maxRetries = 3 @@ -29,7 +29,7 @@ class KustoManagedStreamingIngestClient extends AbstractKustoClient { this.maxRetries = maxRetries; } - async ingestFromStream(stream: StreamDescriptor | fs.ReadStream, ingestionProperties: IngestionProperties): Promise { + async ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionProperties): Promise { const props = this._mergeProps(ingestionProperties); props.validate(); const descriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream); @@ -52,9 +52,7 @@ class KustoManagedStreamingIngestClient extends AbstractKustoClient { } } - return await this.queuedIngestClient.ingestFromStream( - new StreamDescriptor(streamify(buffer)).merge(descriptor) - , ingestionProperties); + return await this.queuedIngestClient.ingestFromStream(new StreamDescriptor(streamify(buffer)).merge(descriptor), ingestionProperties); } async ingestFromFile(file: FileDescriptor | string, ingestionProperties: IngestionProperties): Promise { diff --git a/azure-kusto-ingest/source/streamingIngestClient.ts b/azure-kusto-ingest/source/streamingIngestClient.ts index db1c018..e13310b 100644 --- a/azure-kusto-ingest/source/streamingIngestClient.ts +++ b/azure-kusto-ingest/source/streamingIngestClient.ts @@ -5,11 +5,11 @@ import IngestionProperties from "./ingestionProperties"; import {CompressionType, FileDescriptor, StreamDescriptor} from "./descriptors"; import zlib from "zlib"; -import fs from "fs"; import {AbstractKustoClient} from "./abstractKustoClient"; import {Client as KustoClient, KustoConnectionStringBuilder} from "azure-kusto-data"; import {KustoResponseDataSet} from "azure-kusto-data/source/response"; import { fileToStream } from "./utils"; +import { Readable } from "stream"; class KustoStreamingIngestClient extends AbstractKustoClient { private kustoClient: KustoClient; @@ -20,7 +20,7 @@ class KustoStreamingIngestClient extends AbstractKustoClient { this.kustoClient = new KustoClient(kcsb); } - async ingestFromStream(stream: StreamDescriptor | fs.ReadStream, ingestionProperties: IngestionProperties): Promise { + async ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionProperties): Promise { const props = this._mergeProps(ingestionProperties); props.validate(); const descriptor: StreamDescriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream); diff --git a/azure-kusto-ingest/test/managedStreamingIngestClientTest.ts b/azure-kusto-ingest/test/managedStreamingIngestClientTest.ts index 559bbf2..4455414 100644 --- a/azure-kusto-ingest/test/managedStreamingIngestClientTest.ts +++ b/azure-kusto-ingest/test/managedStreamingIngestClientTest.ts @@ -2,12 +2,12 @@ // Licensed under the MIT License. import sinon from "sinon"; -import { StreamingIngestClient } from ".."; +import { StreamingIngestClient } from "../index"; import { StreamDescriptor } from "../source/descriptors"; import {KustoIngestClient} from "../source/ingestClient"; import {DataFormat, IngestionProperties} from "../source/ingestionProperties"; import KustoManagedStreamingIngestClient from "../source/managedStreamingIngestClient"; -var Stream = require('stream'); +import { Readable } from "stream"; describe("ManagedStreamingIngestClient", function () { describe("fallback", function () { @@ -23,20 +23,34 @@ describe("ManagedStreamingIngestClient", function () { const mockedManagedStreamingIngestClient: KustoManagedStreamingIngestClient = Object.setPrototypeOf({ streamingIngestClient: mockedStreamingIngestClient, queuedIngestClient: mockedIngestClient, maxRetries: 1 }, KustoManagedStreamingIngestClient.prototype); - var stream = new Stream(); - - stream.on('data', function(data: any) { - console.log(data) + + const stream = new Readable(); + stream._read = () => { + stream.push("this is my string"); + stream.push(null); + }; + + stream.on('data', function(data: Buffer) { + console.log(data.toString("utf-8")) }); - stream.emit('data', 'this is my string'); try{ await mockedManagedStreamingIngestClient.ingestFromStream(new StreamDescriptor(stream), new IngestionProperties({ database: 'db', table: 't1', format: DataFormat.CSV, })); - } catch {} + } catch (e: unknown) { + if (e instanceof Error) { + let expectedError = "Failed to get cloud info for cluster engine"; + if (!e.message.startsWith(expectedError)) { + throw e; + } + return; + } + + throw e; + } sandbox.assert.calledOnce(spy); }).timeout(10000); });