Small fixes - expand streaming ingest to all Readable, fixed some imports, typos and updated gitignore (#153)

* Small fixes - fixed type and updated gitignore

* Expand IngestFromStream to any Readable, and fix some import errors
This commit is contained in:
AsafMah 2021-11-18 09:25:48 +02:00 коммит произвёл GitHub
Родитель f46e5e96f0
Коммит 7e7082ded7
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
9 изменённых файлов: 71 добавлений и 50 удалений

9
.gitignore поставляемый
Просмотреть файл

@ -333,3 +333,12 @@ ASALocalRun/
package.json
__main.js
package-lock.json
# Typescript product files
*.tsbuildinfo
*.js
*.js.map
*.d.ts
# We still want example.js files
!example.js

Просмотреть файл

@ -5,7 +5,7 @@ import "./tokenProvider";
import * as TokenProvider from "./tokenProvider";
export class AadHelper {
tokeProvider: TokenProvider.TokenProviderBase;
tokenProvider: TokenProvider.TokenProviderBase;
constructor(kcsb: KustoConnectionStringBuilder) {
if (!kcsb.dataSource) {
@ -13,32 +13,32 @@ export class AadHelper {
}
if (!!kcsb.aadUserId && !!kcsb.password) {
this.tokeProvider = new TokenProvider.UserPassTokenProvider(kcsb.dataSource, kcsb.aadUserId, kcsb.password, kcsb.authorityId);
this.tokenProvider = new TokenProvider.UserPassTokenProvider(kcsb.dataSource, kcsb.aadUserId, kcsb.password, kcsb.authorityId);
} else if (!!kcsb.applicationClientId && !!kcsb.applicationKey) {
this.tokeProvider = new TokenProvider.ApplicationKeyTokenProvider(kcsb.dataSource, kcsb.applicationClientId, kcsb.applicationKey, kcsb.authorityId);
this.tokenProvider = new TokenProvider.ApplicationKeyTokenProvider(kcsb.dataSource, kcsb.applicationClientId, kcsb.applicationKey, kcsb.authorityId);
} else if (!!kcsb.applicationClientId &&
!!kcsb.applicationCertificateThumbprint && !!kcsb.applicationCertificatePrivateKey) {
this.tokeProvider = new TokenProvider.ApplicationCertificateTokenProvider(kcsb.dataSource, kcsb.applicationClientId, kcsb.applicationCertificateThumbprint, kcsb.applicationCertificatePrivateKey, kcsb.applicationCertificateX5c as string | undefined, kcsb.authorityId);
this.tokenProvider = new TokenProvider.ApplicationCertificateTokenProvider(kcsb.dataSource, kcsb.applicationClientId, kcsb.applicationCertificateThumbprint, kcsb.applicationCertificatePrivateKey, kcsb.applicationCertificateX5c as string | undefined, kcsb.authorityId);
} else if (kcsb.managedIdentity) {
this.tokeProvider = new TokenProvider.MsiTokenProvider(kcsb.dataSource, kcsb.msiClientId as string | undefined);
this.tokenProvider = new TokenProvider.MsiTokenProvider(kcsb.dataSource, kcsb.msiClientId as string | undefined);
} else if (kcsb.azLoginIdentity) {
this.tokeProvider = new TokenProvider.AzCliTokenProvider(kcsb.dataSource);
this.tokenProvider = new TokenProvider.AzCliTokenProvider(kcsb.dataSource);
} else if (kcsb.accessToken) {
this.tokeProvider = new TokenProvider.BasicTokenProvider(kcsb.dataSource, kcsb.accessToken as string);
this.tokenProvider = new TokenProvider.BasicTokenProvider(kcsb.dataSource, kcsb.accessToken as string);
} else {
let callback = kcsb.deviceCodeCallback;
if (!callback) {
// tslint:disable-next-line:no-console
callback = (response) => console.log(response.message);
}
this.tokeProvider = new TokenProvider.DeviceLoginTokenProvider(kcsb.dataSource, callback);
this.tokenProvider = new TokenProvider.DeviceLoginTokenProvider(kcsb.dataSource, callback);
}
}
async _getAuthHeader(): Promise<string> {
const token = await this.tokeProvider.acquireToken();
const token = await this.tokenProvider.acquireToken();
return `${token.tokenType} ${token.accessToken}`;
}
};
}
export default AadHelper;

38
azure-kusto-ingest/package-lock.json сгенерированный
Просмотреть файл

@ -41,9 +41,9 @@
}
},
"@azure/core-client": {
"version": "1.3.1",
"resolved": "https://registry.npmjs.org/@azure/core-client/-/core-client-1.3.1.tgz",
"integrity": "sha512-7IHm2DGg2u7dJYtCW84Ik7uENHfE8VsM/sWloZezPKYDoWZrg7JzwjvdGAfsaELKi2p0GE+JBaAbDYnNpr5V1w==",
"version": "1.3.2",
"resolved": "https://registry.npmjs.org/@azure/core-client/-/core-client-1.3.2.tgz",
"integrity": "sha512-qfkRYKmeEmisluMdGTbBtXeyBLaImjFeVW0gcT5yRAwxJmlnTvSyD+a3PjukAtjIrl/tnb4WSJOBpONSJ91+5Q==",
"requires": {
"@azure/abort-controller": "^1.0.0",
"@azure/core-asynciterator-polyfill": "^1.0.0",
@ -169,9 +169,9 @@
}
},
"@azure/core-rest-pipeline": {
"version": "1.3.1",
"resolved": "https://registry.npmjs.org/@azure/core-rest-pipeline/-/core-rest-pipeline-1.3.1.tgz",
"integrity": "sha512-xTQiv47O5cWzJFkwiDrUTT4K4IYbUIts0gaou5TZxAAuhQi9kAKWHEmFTjHVMOeAmyDhlMM5cb21M2n4WDto1A==",
"version": "1.3.2",
"resolved": "https://registry.npmjs.org/@azure/core-rest-pipeline/-/core-rest-pipeline-1.3.2.tgz",
"integrity": "sha512-kymICKESeHBpVLgQiAxllgWdSTopkqtmfPac8ITwMCxNEC6hzbSpqApYbjzxbBNkBMgoD4GESo6LLhR/sPh6kA==",
"requires": {
"@azure/abort-controller": "^1.0.0",
"@azure/core-auth": "^1.3.0",
@ -305,20 +305,20 @@
}
},
"@azure/msal-node": {
"version": "1.3.2",
"resolved": "https://registry.npmjs.org/@azure/msal-node/-/msal-node-1.3.2.tgz",
"integrity": "sha512-aKU2lVRKhZa1IJ/Za/Ir6qlythQ3FHz0g0px3SbM4iC1otyr3ANS4mIn/6fmkpZDIHc8eAgJh2KMep1Yn2zpig==",
"version": "1.3.3",
"resolved": "https://registry.npmjs.org/@azure/msal-node/-/msal-node-1.3.3.tgz",
"integrity": "sha512-ZtVCVzr7V4xEeqICa7E9g6BY3noZv96XG11ENuqEiz/PA1OzPD1/x0QF6BPHVldST8wwoevXxPw+t/h3AFII7w==",
"requires": {
"@azure/msal-common": "^5.0.1",
"@azure/msal-common": "^5.1.0",
"axios": "^0.21.4",
"jsonwebtoken": "^8.5.1",
"uuid": "^8.3.0"
},
"dependencies": {
"@azure/msal-common": {
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/@azure/msal-common/-/msal-common-5.0.1.tgz",
"integrity": "sha512-CmPR3XM9+CGUu7V/+bAwDxyN6XqWJJhVLmv7utT3sbgay4l5roVXsD1t4wURTs8PwzxmmnJOrhvvGhoDxUW69g==",
"version": "5.1.0",
"resolved": "https://registry.npmjs.org/@azure/msal-common/-/msal-common-5.1.0.tgz",
"integrity": "sha512-4zHZ5Ec7jAgTIWZO3ap1ozgIPGAirF1wL8UhsmPF9QDoZz0cMHdaNmtov5i2+6Xq37YMzhN5s50EFHBuXd7sDQ==",
"requires": {
"debug": "^4.1.1"
}
@ -1154,9 +1154,9 @@
}
},
"follow-redirects": {
"version": "1.14.4",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.14.4.tgz",
"integrity": "sha512-zwGkiSXC1MUJG/qmeIFH2HBJx9u0V46QGUe3YR1fXG8bXQxq7fLj0RjLZQ5nubr9qNJUZrH+xUcwXEoXNpfS+g=="
"version": "1.14.5",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.14.5.tgz",
"integrity": "sha512-wtphSXy7d4/OR+MvIFbCVBDzZ5520qV8XfPklSN5QtxuMUJZ+b0Wnst1e1lCDocfzuCkHqj8k0FpZqO+UIaKNA=="
},
"forever-agent": {
"version": "0.6.1",
@ -1899,9 +1899,9 @@
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
},
"msal": {
"version": "1.4.14",
"resolved": "https://registry.npmjs.org/msal/-/msal-1.4.14.tgz",
"integrity": "sha512-k8M5+/jbfSQoCf7CyQzBP5HE5mY8TkBujykLGTEp2x0MvOK/FQsfUTNis28zlvvPVzhgrhb5GQiGM8rRpXyHdA==",
"version": "1.4.15",
"resolved": "https://registry.npmjs.org/msal/-/msal-1.4.15.tgz",
"integrity": "sha512-H/CxkeZJ4laEK6GZ/cDKQoYjBTvDNFK3hDC8mfU8IkuZvKFfFdo9KM89r8spXY7xnBK9SQBAjIuQgwUogeUw7g==",
"requires": {
"tslib": "^1.9.3"
}

Просмотреть файл

@ -1,6 +1,6 @@
import IngestionProperties from "./ingestionProperties";
import {FileDescriptor, StreamDescriptor} from "./descriptors";
import fs from "fs";
import { Readable } from "stream";
export abstract class AbstractKustoClient {
protected constructor(public defaultProps: IngestionProperties | null = null) {
@ -20,7 +20,7 @@ export abstract class AbstractKustoClient {
return this.defaultProps.merge(newProperties) || new IngestionProperties({});
}
abstract ingestFromStream(stream: StreamDescriptor | fs.ReadStream, ingestionProperties: IngestionProperties): Promise<any>;
abstract ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionProperties): Promise<any>;
abstract ingestFromFile(file: FileDescriptor | string, ingestionProperties: IngestionProperties): Promise<any>;
}

Просмотреть файл

@ -5,7 +5,7 @@ import { v4 as uuidv4 } from 'uuid';
import uuidValidate from "uuid-validate";
import zlib from "zlib";
import pathlib from "path";
import fs, {ReadStream} from "fs";
import fs from "fs";
import { Readable } from 'stream';
export enum CompressionType {
@ -79,7 +79,7 @@ export class StreamDescriptor {
size: number | null;
compressionType: CompressionType;
sourceId: string;
constructor(readonly stream: ReadStream | Readable, sourceId: string | null = null, compressionType: CompressionType = CompressionType.None) {
constructor(readonly stream: Readable, sourceId: string | null = null, compressionType: CompressionType = CompressionType.None) {
this.name = "stream";
this.size = null;
this.compressionType = compressionType;

Просмотреть файл

@ -14,8 +14,8 @@ import {QueueClient, QueueSendMessageResponse} from "@azure/storage-queue";
import {ContainerClient} from "@azure/storage-blob";
import IngestionProperties from "./ingestionProperties";
import {ReadStream} from "fs";
import {AbstractKustoClient} from "./abstractKustoClient";
import { Readable } from "stream";
export class KustoIngestClient extends AbstractKustoClient{
@ -41,7 +41,7 @@ export class KustoIngestClient extends AbstractKustoClient{
return containerClient.getBlockBlobClient(blobName);
}
async ingestFromStream(stream: ReadStream | StreamDescriptor, ingestionProperties: IngestionProperties): Promise<QueueSendMessageResponse> {
async ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionProperties): Promise<any> {
const props = this._mergeProps(ingestionProperties);
props.validate();
const descriptor: StreamDescriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);

Просмотреть файл

@ -4,7 +4,6 @@
import IngestionProperties from "./ingestionProperties";
import {FileDescriptor, StreamDescriptor} from "./descriptors";
import fs from "fs";
import {AbstractKustoClient} from "./abstractKustoClient";
import {KustoConnectionStringBuilder} from "azure-kusto-data";
import {KustoResponseDataSet} from "azure-kusto-data/source/response";
@ -14,6 +13,7 @@ import IngestClient from "./ingestClient";
import { QueueSendMessageResponse } from "@azure/storage-queue";
import streamify from "stream-array";
import toArray from "stream-to-array";
import { Readable } from "stream";
const maxSteamSize = 1024 * 1024 * 4;
const maxRetries = 3
@ -29,7 +29,7 @@ class KustoManagedStreamingIngestClient extends AbstractKustoClient {
this.maxRetries = maxRetries;
}
async ingestFromStream(stream: StreamDescriptor | fs.ReadStream, ingestionProperties: IngestionProperties): Promise<KustoResponseDataSet | QueueSendMessageResponse> {
async ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionProperties): Promise<any> {
const props = this._mergeProps(ingestionProperties);
props.validate();
const descriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);
@ -52,9 +52,7 @@ class KustoManagedStreamingIngestClient extends AbstractKustoClient {
}
}
return await this.queuedIngestClient.ingestFromStream(
new StreamDescriptor(streamify(buffer)).merge(descriptor)
, ingestionProperties);
return await this.queuedIngestClient.ingestFromStream(new StreamDescriptor(streamify(buffer)).merge(descriptor), ingestionProperties);
}
async ingestFromFile(file: FileDescriptor | string, ingestionProperties: IngestionProperties): Promise<KustoResponseDataSet | QueueSendMessageResponse> {

Просмотреть файл

@ -5,11 +5,11 @@ import IngestionProperties from "./ingestionProperties";
import {CompressionType, FileDescriptor, StreamDescriptor} from "./descriptors";
import zlib from "zlib";
import fs from "fs";
import {AbstractKustoClient} from "./abstractKustoClient";
import {Client as KustoClient, KustoConnectionStringBuilder} from "azure-kusto-data";
import {KustoResponseDataSet} from "azure-kusto-data/source/response";
import { fileToStream } from "./utils";
import { Readable } from "stream";
class KustoStreamingIngestClient extends AbstractKustoClient {
private kustoClient: KustoClient;
@ -20,7 +20,7 @@ class KustoStreamingIngestClient extends AbstractKustoClient {
this.kustoClient = new KustoClient(kcsb);
}
async ingestFromStream(stream: StreamDescriptor | fs.ReadStream, ingestionProperties: IngestionProperties): Promise<KustoResponseDataSet> {
async ingestFromStream(stream: StreamDescriptor | Readable, ingestionProperties: IngestionProperties): Promise<any> {
const props = this._mergeProps(ingestionProperties);
props.validate();
const descriptor: StreamDescriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);

Просмотреть файл

@ -2,12 +2,12 @@
// Licensed under the MIT License.
import sinon from "sinon";
import { StreamingIngestClient } from "..";
import { StreamingIngestClient } from "../index";
import { StreamDescriptor } from "../source/descriptors";
import {KustoIngestClient} from "../source/ingestClient";
import {DataFormat, IngestionProperties} from "../source/ingestionProperties";
import KustoManagedStreamingIngestClient from "../source/managedStreamingIngestClient";
var Stream = require('stream');
import { Readable } from "stream";
describe("ManagedStreamingIngestClient", function () {
describe("fallback", function () {
@ -23,20 +23,34 @@ describe("ManagedStreamingIngestClient", function () {
const mockedManagedStreamingIngestClient: KustoManagedStreamingIngestClient =
Object.setPrototypeOf({ streamingIngestClient: mockedStreamingIngestClient,
queuedIngestClient: mockedIngestClient, maxRetries: 1 }, KustoManagedStreamingIngestClient.prototype);
var stream = new Stream();
stream.on('data', function(data: any) {
console.log(data)
const stream = new Readable();
stream._read = () => {
stream.push("this is my string");
stream.push(null);
};
stream.on('data', function(data: Buffer) {
console.log(data.toString("utf-8"))
});
stream.emit('data', 'this is my string');
try{
await mockedManagedStreamingIngestClient.ingestFromStream(new StreamDescriptor(stream), new IngestionProperties({
database: 'db',
table: 't1',
format: DataFormat.CSV,
}));
} catch {}
} catch (e: unknown) {
if (e instanceof Error) {
let expectedError = "Failed to get cloud info for cluster engine";
if (!e.message.startsWith(expectedError)) {
throw e;
}
return;
}
throw e;
}
sandbox.assert.calledOnce(spy);
}).timeout(10000);
});