Moved the project to typescript

Changed the project to use typescript instead of javascript.

Typed all of the classes and functions.
This commit is contained in:
AsafMah 2020-12-16 16:39:25 +02:00 коммит произвёл GitHub
Родитель ecd43f4ff5
Коммит 28ff1f98bb
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
53 изменённых файлов: 2616 добавлений и 3356 удалений

Просмотреть файл

@ -1,41 +0,0 @@
{
"env": {
"node": true,
"commonjs": true,
"es6": true,
"mocha": true
},
"extends": "eslint:recommended",
"parserOptions": {
"ecmaVersion": 2018
},
"rules": {
"eol-last":"error",
"no-console": "off",
"indent": [
"error",
4,
{
"SwitchCase": 1
}
],
"linebreak-style": [
"warn",
"windows"
],
"quotes": [
"error",
"double"
],
"semi": [
"error",
"always"
],
"max-len": [
"error",
{
"code": 140
}
]
}
}

Просмотреть файл

@ -1,11 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const Client = require("./source/client");
const ClientRequestProperties = require("./source/clientRequestProperties");
const KustoConnectionStringBuilder = require("./source/connectionBuilder");
module.exports = {
Client,
KustoConnectionStringBuilder,
ClientRequestProperties
};

13
azure-kusto-data/index.ts Normal file
Просмотреть файл

@ -0,0 +1,13 @@
/* tslint:disable:no-var-requires */
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import KustoClient from "./source/client";
import ClientRequestProperties from "./source/clientRequestProperties";
import KustoConnectionStringBuilder from "./source/connectionBuilder";
export {
KustoClient as Client,
ClientRequestProperties,
KustoConnectionStringBuilder
}

932
azure-kusto-data/package-lock.json сгенерированный

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -1,8 +1,9 @@
{
"name": "azure-kusto-data",
"version": "1.0.3",
"version": "2.0.0",
"description": "Azure Data Explorer Query SDK",
"main": "index.js",
"types": "index.d.ts",
"engines": {
"node": ">= 8.0.0"
},
@ -17,9 +18,11 @@
"kusto"
],
"scripts": {
"example": "node example.js",
"test": "mocha",
"lint": "eslint source --quiet"
"build": "tsc -b",
"prepublish": "npm run build ",
"example": "npm run build && node example.js",
"lint": "npm run build && tslint --project tsconfig.json --quiet",
"test": "npm run build && mocha --require ts-node/register"
},
"author": "",
"license": "ISC",
@ -31,8 +34,13 @@
"uuid": "^3.4.0"
},
"devDependencies": {
"eslint": "^6.8.0",
"mocha": "^7.1.2",
"sinon": "^7.2.3"
"sinon": "^7.2.3",
"tslint": "^6.1.3",
"typescript": "^4.1.3",
"ts-node": "^9.1.1",
"@types/mocha": "^8.2.0",
"@types/node": "^14.14.13",
"@types/uuid": "^8.3.0"
}
}

Просмотреть файл

@ -1,14 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const {AzureCliCredentials} = require("@azure/ms-rest-nodeauth");
module.exports = function acquireToken(connectionString, callback) {
AzureCliCredentials.create({ resource: connectionString }).then((res)=>{
const tokenData = res.tokenInfo;
return callback(null, { tokenType: tokenData.tokenType, accessToken: tokenData.accessToken });
}).catch(err=>callback(err));
};

Просмотреть файл

@ -0,0 +1,15 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import {AzureCliCredentials} from "@azure/ms-rest-nodeauth";
import {TokenResponse} from "adal-node";
export default function acquireToken<T>(connectionString: string, callback: (err: Error | null, data?: { tokenType: string; accessToken: string }) => T) {
AzureCliCredentials.create({resource: connectionString}).then((res) => {
const tokenData = res.tokenInfo;
return callback(null, {tokenType: tokenData.tokenType, accessToken: tokenData.accessToken});
}).catch(err => callback(err));
}

Просмотреть файл

@ -1,30 +1,36 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const moment = require("moment");
const uuidv4 = require("uuid/v4");
const AadHelper = require("./security");
const { KustoResponseDataSetV1, KustoResponseDataSetV2 } = require("./response");
const ConnectionStringBuilder = require("./connectionBuilder");
const ClientRequestProperties = require("./clientRequestProperties");
const pkg = require("../package.json");
const axios = require("axios");
import moment from "moment";
import uuid from "uuid";
import AadHelper from "./security";
import {KustoResponseDataSet, KustoResponseDataSetV1, KustoResponseDataSetV2} from "./response";
import ConnectionStringBuilder from "./connectionBuilder";
import ClientRequestProperties from "./clientRequestProperties";
import pkg from "../package.json";
import axios from "axios";
const COMMAND_TIMEOUT_IN_MILLISECS = moment.duration(10.5, "minutes").asMilliseconds();
const QUERY_TIMEOUT_IN_MILLISECS = moment.duration(4.5, "minutes").asMilliseconds();
const CLIENT_SERVER_DELTA_IN_MILLISECS = moment.duration(0.5, "minutes").asMilliseconds();
const MGMT_PREFIX = ".";
const ExecutionType = Object.freeze({
Mgmt: 0,
Query: 1,
Ingest: 2
});
enum ExecutionType {
Mgmt = 0,
Query = 1,
Ingest = 2
}
module.exports = class KustoClient {
constructor(kcsb) {
export class KustoClient {
connectionString: ConnectionStringBuilder;
cluster: string;
endpoints: { mgmt: string; query: string; ingest: string; };
aadHelper: AadHelper;
headers: { [name: string]: string };
constructor(kcsb: string | ConnectionStringBuilder) {
this.connectionString = typeof (kcsb) === "string" ? new ConnectionStringBuilder(kcsb) : kcsb;
this.cluster = this.connectionString.dataSource;
this.cluster = (this.connectionString.dataSource as string);
this.endpoints = {
mgmt: `${this.cluster}/v1/rest/mgmt`,
query: `${this.cluster}/v2/rest/query`,
@ -38,7 +44,7 @@ module.exports = class KustoClient {
};
}
async execute(db, query, properties) {
async execute(db: string, query: string, properties?: ClientRequestProperties) {
query = query.trim();
if (query.startsWith(MGMT_PREFIX)) {
return this.executeMgmt(db, query, properties);
@ -47,15 +53,15 @@ module.exports = class KustoClient {
return this.executeQuery(db, query, properties);
}
async executeQuery(db, query, properties) {
async executeQuery(db: string, query: string, properties?: ClientRequestProperties) {
return this._execute(this.endpoints.query, ExecutionType.Query, db, query, null, properties);
}
async executeMgmt(db, query, properties) {
async executeMgmt(db: string, query: string, properties?: ClientRequestProperties) {
return this._execute(this.endpoints.mgmt, ExecutionType.Mgmt, db, query, null, properties);
}
async executeStreamingIngest(db, table, stream, streamFormat, mappingName) {
async executeStreamingIngest(db: string, table: string, stream: any, streamFormat: any, mappingName: string | null): Promise<KustoResponseDataSet> {
let endpoint = `${this.endpoints.ingest}/${db}/${table}?streamFormat=${streamFormat}`;
if (mappingName != null) {
endpoint += `&mappingName=${mappingName}`;
@ -64,68 +70,77 @@ module.exports = class KustoClient {
return this._execute(endpoint, ExecutionType.Ingest, db, null, stream, null);
}
async _execute(endpoint, executionType, db, query, stream, properties) {
const headers = {};
async _execute(
endpoint: string,
executionType: ExecutionType,
db: string,
query: string | null,
stream: string | null,
properties?: ClientRequestProperties | null): Promise<KustoResponseDataSet> {
const headers: { [header: string]: string } = {};
Object.assign(headers, this.headers);
let payload;
let payload: { db: string, csl: string, properties?: any };
let clientRequestPrefix = "";
let clientRequestId;
let timeout = this._getClientTimeout(executionType, properties);
const timeout = this._getClientTimeout(executionType, properties);
let payloadStr = "";
if (query != null) {
payload = {
"db": db,
"csl": query
};
if (properties != null && properties instanceof ClientRequestProperties) {
if (properties != null) {
payload.properties = properties.toJson();
clientRequestId = properties.clientRequestId;
}
payload = JSON.stringify(payload);
payloadStr = JSON.stringify(payload);
headers["Content-Type"] = "application/json; charset=utf-8";
clientRequestPrefix = "KNC.execute;";
} else if (stream != null) {
payload = stream;
payloadStr = stream;
clientRequestPrefix = "KNC.executeStreamingIngest;";
headers["Content-Encoding"] = "gzip";
headers["Content-Type"] = "multipart/form-data";
}
headers["x-ms-client-request-id"] = clientRequestId || clientRequestPrefix + `${uuidv4()}`;
headers["x-ms-client-request-id"] = clientRequestId || clientRequestPrefix + `${uuid.v4()}`;
headers["Authorization"] = await this.aadHelper._getAuthHeader();
headers.Authorization = await this.aadHelper._getAuthHeader();
return this._doRequest(endpoint, executionType, headers, payload, timeout, properties);
return this._doRequest(endpoint, executionType, headers, payloadStr, timeout, properties);
}
async _doRequest(endpoint, executionType, headers, payload, timeout, properties) {
let axiosConfig = {
headers: headers,
async _doRequest(endpoint: string,
executionType: ExecutionType,
headers: { [header: string]: string; },
payload: string,
timeout: number,
properties?: ClientRequestProperties | null): Promise<KustoResponseDataSet> {
const axiosConfig = {
headers,
gzip: true,
timeout: timeout
timeout
};
let axiosResponse;
try {
axiosResponse = await axios.post(endpoint, payload, axiosConfig);
}
catch (error) {
} catch (error) {
if (error.response) {
throw error.response.data.error;
}
throw error;
}
return this._parseResponse(axiosResponse.data, executionType, properties);
return this._parseResponse(axiosResponse.data, executionType, properties, axiosResponse.status);
}
_parseResponse(response, executionType, properties) {
const { raw } = properties || {};
_parseResponse(response: any, executionType: ExecutionType, properties?: ClientRequestProperties | null, status?: number) : KustoResponseDataSet {
const {raw} = properties || {};
if (raw === true || executionType == ExecutionType.Ingest) {
return response;
}
@ -138,16 +153,16 @@ module.exports = class KustoClient {
kustoResponse = new KustoResponseDataSetV1(response);
}
} catch (ex) {
throw `Failed to parse response ({${response.status}}) with the following error [${ex}].`;
throw new Error(`Failed to parse response ({${status}}) with the following error [${ex}].`);
}
if (kustoResponse.getErrorsCount() > 0) {
throw `Kusto request had errors. ${kustoResponse.getExceptions()}`;
throw new Error(`Kusto request had errors. ${kustoResponse.getExceptions()}`);
}
return kustoResponse;
}
_getClientTimeout(executionType, properties) {
if (properties != null && properties instanceof ClientRequestProperties) {
_getClientTimeout(executionType: ExecutionType, properties?: ClientRequestProperties | null): number {
if (properties != null) {
const clientTimeout = properties.getClientTimeout();
if (clientTimeout) {
return clientTimeout;
@ -159,7 +174,8 @@ module.exports = class KustoClient {
}
}
const defaultTimeout = executionType == ExecutionType.Query ? QUERY_TIMEOUT_IN_MILLISECS : COMMAND_TIMEOUT_IN_MILLISECS;
return defaultTimeout;
return executionType == ExecutionType.Query ? QUERY_TIMEOUT_IN_MILLISECS : COMMAND_TIMEOUT_IN_MILLISECS;
}
};
}
export default KustoClient;

Просмотреть файл

@ -1,29 +1,35 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
module.exports = class ClientRequestProperties {
constructor(options, parameters, clientRequestId) {
export class ClientRequestProperties {
private _options: { [option: string]: any };
private _parameters: { [option: string]: any };
private _clientTimeOut?: number;
public clientRequestId: string | null;
public raw?: boolean;
constructor(options?: {}, parameters?: {}, clientRequestId?: null) {
this._options = options || {};
this._parameters = parameters || {};
this.clientRequestId = clientRequestId || null;
}
setOption(name, value) {
setOption(name: string, value: any) {
this._options[name] = value;
}
getOption(name, defaultValue) {
getOption(name: string, defaultValue?: any) {
if (!this._options || this._options[name] === undefined)
return defaultValue;
return this._options[name];
}
setParameter(name, value) {
setParameter(name: string, value: any) {
this._parameters[name] = value;
}
getParameter(name, defaultValue) {
getParameter(name: string, defaultValue?: any) {
if (!this._parameters || this._parameters[name] === undefined) {
return defaultValue;
}
@ -35,7 +41,7 @@ module.exports = class ClientRequestProperties {
this._parameters = {};
}
setTimeout(timeoutMillis) {
setTimeout(timeoutMillis: number) {
this.setOption("servertimeout", timeoutMillis);
}
@ -43,7 +49,7 @@ module.exports = class ClientRequestProperties {
return this.getOption("servertimeout");
}
setClientTimeout(timeoutMillis) {
setClientTimeout(timeoutMillis: number) {
this._clientTimeOut = timeoutMillis;
}
@ -56,7 +62,7 @@ module.exports = class ClientRequestProperties {
}
toJson() {
let json = {};
const json: { Options?: { [option: string]: any }, Parameters?: { [option: string]: any } } = {};
if (Object.keys(this._options).length !== 0) {
json.Options = this._options;
@ -76,16 +82,18 @@ module.exports = class ClientRequestProperties {
return JSON.stringify(this.toJson());
}
_msToTimespan(duration) {
var milliseconds = parseInt((duration % 1000) / 100)
, seconds = parseInt((duration / 1000) % 60)
, minutes = parseInt((duration / (1000 * 60)) % 60)
, hours = parseInt((duration / (1000 * 60 * 60)) % 24);
_msToTimespan(duration: number): string {
const milliseconds = Math.floor((duration % 1000) / 100);
const seconds = Math.floor((duration / 1000) % 60);
const minutes = Math.floor((duration / (1000 * 60)) % 60);
const hours = Math.floor((duration / (1000 * 60 * 60)) % 24);
hours = (hours < 10) ? "0" + hours : hours;
minutes = (minutes < 10) ? "0" + minutes : minutes;
seconds = (seconds < 10) ? "0" + seconds : seconds;
const hoursStr = (hours < 10) ? "0" + hours : String(hours);
const minutesStr = (minutes < 10) ? "0" + minutes : String(minutes);
const secondsStr = (seconds < 10) ? "0" + seconds : String(seconds);
return hours + ":" + minutes + ":" + seconds + "." + milliseconds;
return hoursStr + ":" + minutesStr + ":" + secondsStr + "." + milliseconds;
}
};
}
export default ClientRequestProperties;

Просмотреть файл

@ -1,7 +1,15 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const KeywordMapping = Object.freeze({
import {UserCodeInfo} from "adal-node";
interface MappingType {
propName: string,
mappedTo: string,
validNames: string[]
}
const KeywordMapping: { [name: string]: MappingType } = Object.freeze({
dataSource: {
propName: "dataSource",
mappedTo: "Data Source",
@ -44,11 +52,11 @@ const KeywordMapping = Object.freeze({
}
});
const getPropName = (key) => {
let _key = key.trim().toLowerCase();
const getPropName = (key: string): string => {
const _key = key.trim().toLowerCase();
for (let keyword of Object.keys(KeywordMapping)) {
let k = KeywordMapping[keyword];
for (const keyword of Object.keys(KeywordMapping)) {
const k = KeywordMapping[keyword];
if (k.validNames.indexOf(_key) >= 0) {
return k.propName;
}
@ -56,8 +64,19 @@ const getPropName = (key) => {
throw new Error(key);
};
module.exports = class KustoConnectionStringBuilder {
constructor(connectionString) {
export class KustoConnectionStringBuilder {
[prop: string]: string | boolean | ((info: UserCodeInfo) => void) | undefined;
dataSource?: string;
aadUserId?: string;
password?: string;
applicationClientId?: string;
applicationKey?: string;
applicationCertificate?: string;
applicationCertificateThumbprint?: string;
authorityId?: string;
AuthorizationCallback?: (info: UserCodeInfo) => void;
constructor(connectionString: string) {
if (!connectionString || connectionString.trim().length === 0) throw new Error("Missing connection string");
if (connectionString.endsWith("/") || connectionString.endsWith("\\")) {
@ -70,14 +89,14 @@ module.exports = class KustoConnectionStringBuilder {
this[KeywordMapping.authorityId.propName] = "common";
let params = connectionString.split(";");
for (let i = 0; i < params.length; i++) {
let kvp = params[i].split("=");
const params = connectionString.split(";");
for (const item of params) {
const kvp = item.split("=");
this[getPropName(kvp[0])] = kvp[1].trim();
}
}
static withAadUserPasswordAuthentication(connectionString, userId, password, authorityId) {
static withAadUserPasswordAuthentication(connectionString: string, userId: string, password: string, authorityId?: string) {
if (!userId || userId.trim().length == 0) throw new Error("Invalid user");
if (!password || password.trim().length == 0) throw new Error("Invalid password");
@ -89,7 +108,7 @@ module.exports = class KustoConnectionStringBuilder {
return kcsb;
}
static withAadApplicationKeyAuthentication(connectionString, aadAppId, appKey, authorityId) {
static withAadApplicationKeyAuthentication(connectionString: string, aadAppId: string, appKey: string, authorityId?: string) {
if (!aadAppId || aadAppId.trim().length == 0) throw new Error("Invalid app id");
if (!appKey || appKey.trim().length == 0) throw new Error("Invalid app key");
@ -101,7 +120,7 @@ module.exports = class KustoConnectionStringBuilder {
return kcsb;
}
static withAadApplicationCertificateAuthentication(connectionString, aadAppId, certificate, thumbprint, authorityId) {
static withAadApplicationCertificateAuthentication(connectionString: string, aadAppId: string, certificate: string, thumbprint: string, authorityId: string) {
if (!aadAppId || aadAppId.trim().length == 0) throw new Error("Invalid app id");
if (!certificate || certificate.trim().length == 0) throw new Error("Invalid certificate");
if (!thumbprint || thumbprint.trim().length == 0) throw new Error("Invalid thumbprint");
@ -116,7 +135,7 @@ module.exports = class KustoConnectionStringBuilder {
}
static withAadDeviceAuthentication(connectionString, authorityId, authCallback) {
static withAadDeviceAuthentication(connectionString: string, authorityId: string, authCallback?: (info: UserCodeInfo) => void) {
const kcsb = new KustoConnectionStringBuilder(connectionString);
kcsb[KeywordMapping.authorityId.propName] = authorityId;
kcsb.AuthorizationCallback = authCallback;
@ -124,8 +143,8 @@ module.exports = class KustoConnectionStringBuilder {
return kcsb;
}
// Notice: you can leave `msiEndpoint` and `clientId`
static withAadManagedIdentities(connectionString, msiEndpoint, clientId) {
// Notice: you can leave `msiEndpoint` and `clientId`
static withAadManagedIdentities(connectionString: string, msiEndpoint?: string, clientId?: string) {
const kcsb = new KustoConnectionStringBuilder(connectionString);
kcsb.msiEndpoint = msiEndpoint;
@ -144,7 +163,7 @@ module.exports = class KustoConnectionStringBuilder {
return kcsb;
}
static withAzLoginIdentity(connectionString) {
static withAzLoginIdentity(connectionString: string) {
const kcsb = new KustoConnectionStringBuilder(connectionString);
kcsb.azLoginIdentity = true;
@ -152,11 +171,13 @@ module.exports = class KustoConnectionStringBuilder {
return kcsb;
}
static withAccessToken(connectionString, accessToken) {
static withAccessToken(connectionString: string, accessToken?: string) {
const kcsb = new KustoConnectionStringBuilder(connectionString);
kcsb.accessToken = accessToken;
return kcsb;
}
};
}
export default KustoConnectionStringBuilder;

Просмотреть файл

@ -1,19 +1,21 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const request = require("request");
// @ts-ignore
import request from "request";
const MSI_API_VERSION = "2018-02-01";
const MSI_FUNCTION_API_VERSION = "2017-09-01";
module.exports = function acquireToken(resource, msiEndpoint, msiClientId, msiSecret, callback) {
export default function acquireToken<T>(resource: string, msiEndpoint: string, msiClientId: string, msiSecret: string, callback: (error: string | null, token?: { tokenType: string; accessToken: string }) => T) {
let msiUri = `${msiEndpoint}/?resource=${resource}&api-version=${msiSecret ? MSI_FUNCTION_API_VERSION : MSI_API_VERSION}`;
if (msiClientId) {
msiUri += `&client_id=${msiClientId}`;
}
const headers = {};
const headers: any = {};
if (msiSecret) {
headers.Secret = msiSecret;
@ -23,7 +25,7 @@ module.exports = function acquireToken(resource, msiEndpoint, msiClientId, msiSe
method: "GET",
url: msiUri,
headers
}, (error, response, body) => {
}, (error: string | null, response: {statusCode: number, json: string, body: string}, body: any) => {
if (error) return callback(error);
if (response.statusCode < 200 || response.statusCode >= 400) {
@ -31,6 +33,6 @@ module.exports = function acquireToken(resource, msiEndpoint, msiClientId, msiSe
}
const tokenData = JSON.parse(body);
return callback(null, { tokenType: tokenData.token_type, accessToken: tokenData.access_token });
return callback(null, {tokenType: tokenData.token_type, accessToken: tokenData.access_token});
});
};

Просмотреть файл

@ -1,115 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const moment = require("moment");
const WellKnownDataSet = {
PrimaryResult: "PrimaryResult",
QueryCompletionInformation: "QueryCompletionInformation",
TableOfContents: "TableOfContents",
QueryProperties: "QueryProperties"
};
module.exports.WellKnownDataSet = WellKnownDataSet;
const ValueParser = {
datetime: moment,
timespan: moment.duration,
DateTime: moment,
TimeSpan: moment.duration,
};
class KustoResultRow {
constructor(columns, row) {
this.columns = columns.sort((a, b) => a.ordinal - b.ordinal);
this.raw = row;
for (let col of this.columns) {
let parse = ValueParser[col.type];
this[col.name] = parse ? parse(row[col.ordinal]) : row[col.ordinal];
}
}
* values() {
for (let item in this.rows) {
yield item;
}
}
getValueAt(index) {
return this[this.columns[index].name];
}
toJson() {
let obj = {};
for (let col of this.columns) {
obj[col.name] = this[col.name];
}
return obj;
}
toString() {
return JSON.stringify(this.toJson());
}
}
module.exports.KustoResultRow = KustoResultRow;
class KustoResultColumn {
constructor(columnObj, ordinal) {
this.name = columnObj.ColumnName;
// TODO: should validate type? should coarse value to type?
this.type = columnObj.ColumnType || columnObj.DateType;
this.ordinal = ordinal;
}
}
module.exports.KustoResultColumn = KustoResultColumn;
module.exports.KustoResultTable = class KustoResultTable {
constructor(tableObj) {
this.name = tableObj.TableName;
if (tableObj.TableId !== undefined) {
this.id = tableObj.TableId;
}
if (tableObj.TableKind) {
this.kind = tableObj.TableKind;
}
this.columns = tableObj.Columns.map((item, index) => new KustoResultColumn(item, index));
this._rows = tableObj.Rows;
if (this._rows && this._rows.length > 0) {
for (let i = 0; i<tableObj.Rows.length; i++) {
Object.defineProperty(this, i, { get: () => new KustoResultRow(this.columns, this._rows[i])});
}
}
}
* rows() {
for (let row of this._rows) {
yield new KustoResultRow(this.columns, row);
}
}
toJson() {
let table = {};
table.name = this.name;
table.data = [];
for (let row of this.rows()) {
table.data.push(row.toJson());
}
return table;
}
toString() {
return JSON.stringify(this.toJson());
}
};

Просмотреть файл

@ -0,0 +1,142 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import moment from "moment";
export enum WellKnownDataSet {
PrimaryResult = "PrimaryResult",
QueryCompletionInformation = "QueryCompletionInformation",
TableOfContents = "TableOfContents",
QueryProperties = "QueryProperties"
};
const ValueParser: { [fromString: string]: (typeof moment | typeof moment.duration) } = {
datetime: moment,
timespan: moment.duration,
DateTime: moment,
TimeSpan: moment.duration,
}
export interface Table {
TableKind?: string;
TableName: string;
TableId?: number;
Columns: Column[];
Rows: any[][];
}
interface Column {
ColumnName: string,
ColumnType?: string,
DateType?: string
}
export class KustoResultRow {
columns: KustoResultColumn[];
raw: any;
[column: string]: any;
constructor(columns: KustoResultColumn[], row: { [ord: number]: any }) {
this.columns = columns.sort((a, b) => a.ordinal - b.ordinal);
this.raw = row;
for (const col of this.columns) {
const parse = ValueParser[col.type as string];
this[col.name as string] = parse ? parse(row[col.ordinal]) : row[col.ordinal];
}
}
* values() {
// tslint:disable-next-line:forin
for (const item in this.rows) {
yield item;
}
}
getValueAt(index: number) {
return this[this.columns[index].name as string];
}
toJson() {
const obj: any = {};
for (const col of this.columns) {
obj[col.name as string] = this[col.name as string];
}
return obj;
}
toString() {
return JSON.stringify(this.toJson());
}
}
export class KustoResultColumn {
name: string | null
type: string | null;
ordinal: number;
constructor(columnObj: { ColumnName?: string, ColumnType?: string, DateType?: string }, ordinal: number) {
this.name = columnObj.ColumnName ?? null;
// TODO: should validate type? should coarse value to type?
this.type = (columnObj.ColumnType || columnObj.DateType) ?? null;
this.ordinal = ordinal;
}
}
export class KustoResultTable {
name: string;
id?: number;
kind?: string;
columns: KustoResultColumn[];
readonly _rows: any[];
[row: number]: any;
constructor(tableObj: Table) {
this.name = tableObj.TableName;
if (tableObj.TableId !== undefined) {
this.id = tableObj.TableId;
}
if (tableObj.TableKind) {
this.kind = tableObj.TableKind;
}
this.columns = tableObj.Columns.map((item, index) => new KustoResultColumn(item, index));
this._rows = tableObj.Rows;
if (this._rows && this._rows.length > 0) {
for (let i = 0; i < tableObj.Rows.length; i++) {
Object.defineProperty(this, i, {get: () => new KustoResultRow(this.columns, this._rows[i])});
}
}
}
* rows() {
for (const row of this._rows) {
yield new KustoResultRow(this.columns, row);
}
}
toJson() {
const table: any = {};
table.name = this.name;
table.data = [];
for (const row of this.rows()) {
table.data.push(row.toJson());
}
return table;
}
toString() {
return JSON.stringify(this.toJson());
}
}

Просмотреть файл

@ -1,10 +1,53 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const { KustoResultTable, WellKnownDataSet } = require("./models");
import {KustoResultTable, Table, WellKnownDataSet} from "./models";
class KustoResponseDataSet {
constructor(tables) {
interface V2DataSetHeaderFrame {
FrameType: "DataSetHeader"
IsProgressive: boolean
Version: string
}
interface V2DataSetTableFrame extends Table {
FrameType: "DataTable"
TableId: number
TableName: string
TableKind: string
Columns: Column[]
Rows: any[][]
}
interface V2DataSetCompletionFrame {
FrameType: "DataSetCompletion"
HasErrors: boolean
Cancelled: boolean
}
type V2Frames = (V2DataSetHeaderFrame | V2DataSetTableFrame | V2DataSetCompletionFrame)[];
type V1 = { Tables: Table[] };
interface Column {
ColumnName: string
ColumnType: string
}
export abstract class KustoResponseDataSet {
tables: KustoResultTable[];
tableNames: string[];
primaryResults: KustoResultTable[];
statusTable?: KustoResultTable;
abstract dataSetCompletion: { HasErrors: boolean, OneApiErrors?: any[] } | null;
abstract getStatusColumn(): string;
abstract getErrorColumn(): string;
abstract getCridColumn(): string;
protected constructor(tables: Table[]) {
let _tables = tables;
if (!Array.isArray(tables)) {
@ -14,8 +57,8 @@ class KustoResponseDataSet {
this.tables = [];
this.tableNames = [];
this.primaryResults = [];
for (let table of _tables) {
let resultTable = new KustoResultTable(table);
for (const table of _tables) {
const resultTable = new KustoResultTable(table);
this.tables.push(resultTable);
this.tableNames.push(resultTable.name);
@ -33,8 +76,8 @@ class KustoResponseDataSet {
if (this.statusTable && this.statusTable._rows.length != 0) {
let minLevel = 4;
const errorColumn = this.constructor.getErrorColumn();
for (let row of this.statusTable.rows()) {
const errorColumn = this.getErrorColumn();
for (const row of this.statusTable.rows()) {
if (row[errorColumn] < 4) {
if (row[errorColumn] < minLevel) {
minLevel = row[errorColumn];
@ -45,7 +88,7 @@ class KustoResponseDataSet {
}
}
}
if (this.dataSetCompletion && this.dataSetCompletion["HasErrors"]) {
if (this.dataSetCompletion && this.dataSetCompletion.HasErrors) {
errors += 1;
}
@ -56,18 +99,18 @@ class KustoResponseDataSet {
const result = [];
if (this.statusTable && this.statusTable._rows.length != 0) {
const errorColumn = this.constructor.getErrorColumn();
const cridColumn = this.constructor.getCridColumn();
const statusColumn = this.constructor.getStatusColumn();
for (let row of this.statusTable.rows()) {
const errorColumn = this.getErrorColumn();
const cridColumn = this.getCridColumn();
const statusColumn = this.getStatusColumn();
for (const row of this.statusTable.rows()) {
if (row[errorColumn] < 4) {
result.push(`Please provide the following data to Kusto: CRID=${row[cridColumn]} Description: ${row[statusColumn]}`);
}
}
}
if (this.dataSetCompletion && this.dataSetCompletion["HasErrors"]) {
for (let row of this.dataSetCompletion["OneApiErrors"]) {
result.push( row["error"]["@message"]);
if (this.dataSetCompletion && this.dataSetCompletion.HasErrors && this.dataSetCompletion.OneApiErrors) {
for (const row of this.dataSetCompletion.OneApiErrors) {
result.push(row.error["@message"]);
}
}
return result;
@ -75,12 +118,23 @@ class KustoResponseDataSet {
}
// TODO: should only expose 1 response type, versioning should be handled internally
module.exports.KustoResponseDataSetV1 = class KustoResponseDataSetV1 extends KustoResponseDataSet {
static getStatusColumn() { return "StatusDescription"; }
static getCridColumn() { return "ClientActivityId"; }
static getErrorColumn() { return "Severity"; }
export class KustoResponseDataSetV1 extends KustoResponseDataSet {
version: string;
dataSetCompletion: null = null;
static getTablesKinds() {
getStatusColumn() {
return "StatusDescription";
}
getCridColumn() {
return "ClientActivityId";
}
getErrorColumn() {
return "Severity";
}
static getTablesKinds(): { [name: string]: WellKnownDataSet } {
return {
"QueryResult": WellKnownDataSet.PrimaryResult,
"QueryProperties": WellKnownDataSet.QueryProperties,
@ -88,7 +142,7 @@ module.exports.KustoResponseDataSetV1 = class KustoResponseDataSetV1 extends Kus
};
}
constructor(data) {
constructor(data: V1) {
super(data.Tables);
if (this.tables.length <= 2) {
@ -108,26 +162,38 @@ module.exports.KustoResponseDataSetV1 = class KustoResponseDataSetV1 extends Kus
toc.kind = WellKnownDataSet.TableOfContents;
toc.id = this.tables.length - 1;
for (let i = 0; i < this.tables.length - 1; i++) {
this.tables[i].name = toc[i]["Name"];
this.tables[i].id = toc[i]["Id"];
this.tables[i].kind = KustoResponseDataSetV1.getTablesKinds()[toc[i]["Kind"]];
this.tables[i].name = toc[i].Name;
this.tables[i].id = toc[i].Id;
this.tables[i].kind = KustoResponseDataSetV1.getTablesKinds()[toc[i].Kind];
}
}
this.version = "1.0";
}
};
}
// TODO: should only expose 1 response type, versioning should be handled internally
module.exports.KustoResponseDataSetV2 = class KustoResponseDataSetV2 extends KustoResponseDataSet {
static getStatusColumn() { return "Payload"; }
static getErrorColumn() { return "Level"; }
static getCridColumn() { return "ClientRequestId"; }
export class KustoResponseDataSetV2 extends KustoResponseDataSet {
dataSetHeader: V2DataSetHeaderFrame | null;
dataSetCompletion: V2DataSetCompletionFrame | null;
version: string;
constructor(data) {
let dataTables = [];
let dataSetHeader;
let dataSetCompletion;
getStatusColumn() {
return "Payload";
}
getErrorColumn() {
return "Level";
}
getCridColumn() {
return "ClientRequestId";
}
constructor(data: V2Frames) {
const dataTables: V2DataSetTableFrame[] = [];
let dataSetHeader: V2DataSetHeaderFrame | null = null;
let dataSetCompletion: V2DataSetCompletionFrame | null = null;
data.forEach(frame => {
switch (frame.FrameType) {
case "DataTable":

Просмотреть файл

@ -1,155 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const { AuthenticationContext } = require("adal-node");
const acquireManagedIdentityToken = require("./managedIdentitiesClient");
const azLoginIndentityToken = require("./azLoginIdentityClient");
const AuthenticationMethod = Object.freeze({
username: 0,
appKey: 1,
appCertificate: 2,
deviceLogin: 3,
managedIdentities: 4,
azLogin: 5,
accessToken: 6,
});
module.exports = class AadHelper {
constructor(kcsb) {
this.token = {};
const authority = kcsb.authorityId || "common";
let url;
// support node compatibility
try {
url = new URL(kcsb.dataSource);
} catch (e) {
const URL = require("url").URL;
url = new URL(kcsb.dataSource);
}
const aadAuthorityUri = process.env.AadAuthorityUri;
const fullAuthorityUri = aadAuthorityUri ?
aadAuthorityUri + (aadAuthorityUri.endsWith("/") ? "" : "/") + authority
: `https://login.microsoftonline.com/${authority}`;
this.kustoCluster = `${url.protocol}//${url.hostname}`;
this.adalContext = new AuthenticationContext(fullAuthorityUri);
if (!!kcsb.aadUserId && !!kcsb.password) {
this.authMethod = AuthenticationMethod.username;
this.clientId = "db662dc1-0cfe-4e1c-a843-19a68e65be58";
this.username = kcsb.aadUserId;
this.password = kcsb.password;
} else if (!!kcsb.applicationClientId && !!kcsb.applicationKey) {
this.authMethod = AuthenticationMethod.appKey;
this.clientId = kcsb.applicationClientId;
this.clientSecret = kcsb.applicationKey;
} else if (!!kcsb.applicationClientId &&
!!kcsb.applicationCertificate && !!kcsb.applicationCertificateThumbprint) {
this.authMethod = AuthenticationMethod.appCertificate;
this.clientId = kcsb.applicationClientId;
this.certificate = kcsb.applicationCertificate;
this.thumbprint = kcsb.applicationCertificateThumbprint;
} else if (kcsb.managedIdentity) {
this.authMethod = AuthenticationMethod.managedIdentities;
this.msiEndpoint = kcsb.msiEndpoint;
this.msiSecret = kcsb.msiSecret;
this.msiClientId = kcsb.msiClientId;
} else if (kcsb.azLoginIdentity) {
this.authMethod = AuthenticationMethod.azLogin;
} else if (kcsb.accessToken) {
this.authMethod = AuthenticationMethod.accessToken;
this.accessToken = kcsb.accessToken;
} else {
this.authMethod = AuthenticationMethod.deviceLogin;
this.clientId = "db662dc1-0cfe-4e1c-a843-19a68e65be58";
this.authCallback = kcsb.AuthorizationCallback;
}
}
_getAuthHeader() {
return new Promise((resolve, reject) => {
this._getAuthHeaderWithCallback((error, authHeader) => {
if (error) {
reject(error);
} else {
resolve(authHeader);
}
});
});
}
_getAuthHeaderWithCallback(cb) {
let resource = this.kustoCluster;
let formatHeader = ({ tokenType, accessToken }) => `${tokenType} ${accessToken}`;
switch (this.authMethod) {
case AuthenticationMethod.username:
return this.adalContext.acquireTokenWithUsernamePassword(
resource, this.username, this.password, this.clientId, (err, tokenResponse) => {
return cb(err, tokenResponse && formatHeader(tokenResponse));
}
);
case AuthenticationMethod.appKey:
return this.adalContext.acquireTokenWithClientCredentials(
resource, this.clientId, this.clientSecret, (err, tokenResponse) => {
return cb(err, tokenResponse && formatHeader(tokenResponse));
}
);
case AuthenticationMethod.appCertificate:
return this.adalContext.acquireTokenWithClientCertificate(
resource, this.clientId, this.certificate, this.thumbprint, (err, tokenResponse) => {
return cb(err, tokenResponse && formatHeader(tokenResponse));
}
);
case AuthenticationMethod.deviceLogin:
return this.adalContext.acquireUserCode(resource, this.clientId, null, (err, tokenResponse) => {
if (err) {
return cb(err);
} else {
if (this.authCallback) {
this.authCallback(tokenResponse);
} else {
console.log(tokenResponse.message);
}
return this.adalContext.acquireTokenWithDeviceCode(resource, this.clientId, tokenResponse, (err, tokenResponse) => {
if (err) {
return cb(err);
}
return cb(err, tokenResponse && formatHeader(tokenResponse));
});
}
});
case AuthenticationMethod.managedIdentities:
return acquireManagedIdentityToken(
resource, this.msiEndpoint, this.msiClientId, this.msiSecret, (err, tokenResponse) => {
if (err) {
return cb(err);
}
return cb(err, tokenResponse && formatHeader(tokenResponse));
}
);
case AuthenticationMethod.azLogin:
return azLoginIndentityToken(resource, (err, tokenResponse) => {
if(err) {
return cb(err);
}
return cb(err, tokenResponse && formatHeader(tokenResponse));
});
case AuthenticationMethod.accessToken:
return cb(undefined, `Bearer ${this.accessToken}`);
default:
return cb("Couldn't Authenticate, something went wrong trying to choose authentication method");
}
}
};

Просмотреть файл

@ -0,0 +1,234 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import {AuthenticationContext, TokenResponse, UserCodeInfo} from "adal-node";
import acquireManagedIdentityToken from "./managedIdentitiesClient";
import azLoginIdentityToken from "./azLoginIdentityClient";
import KustoConnectionStringBuilder from "./connectionBuilder";
enum AuthenticationMethod {
username = 0,
appKey = 1,
appCertificate = 2,
deviceLogin = 3,
managedIdentities = 4,
azLogin = 5,
accessToken = 6,
}
interface UsernameMethod {
authMethod: AuthenticationMethod.username;
clientId: string;
username: string;
password: string;
}
interface AppKeyMethod {
authMethod: AuthenticationMethod.appKey;
clientId: string;
clientSecret: string;
}
interface AppCertificateMethod {
authMethod: AuthenticationMethod.appCertificate;
clientId: string;
certificate: string;
thumbprint: string;
}
interface AppManagedIdentityMethod {
authMethod: AuthenticationMethod.managedIdentities;
msiEndpoint: string;
msiSecret: string;
msiClientId: string;
}
interface AzLoginMethod {
authMethod: AuthenticationMethod.azLogin;
}
interface AccessTokenMethod {
authMethod: AuthenticationMethod.accessToken;
accessToken: string;
}
interface DeviceLoginMethod {
authMethod: AuthenticationMethod.deviceLogin;
clientId: string;
authCallback: (info: UserCodeInfo) => void;
}
type Method =
UsernameMethod
| AppKeyMethod
| AppCertificateMethod
| AppManagedIdentityMethod
| AzLoginMethod
| AccessTokenMethod
| DeviceLoginMethod;
export class AadHelper {
token: {};
kustoCluster: string;
adalContext: AuthenticationContext;
method: Method;
constructor(kcsb: KustoConnectionStringBuilder) {
this.token = {};
const authority = kcsb.authorityId || "common";
let url;
if (!kcsb.dataSource) {
throw new Error("Invalid string builder - missing dataSource");
}
// support node compatibility
try {
url = new URL(kcsb.dataSource); // CHANGE
} catch (e) {
const URL = require("url").URL;
url = new URL(kcsb.dataSource);
}
const aadAuthorityUri = process.env.AadAuthorityUri;
const fullAuthorityUri = aadAuthorityUri ?
aadAuthorityUri + (aadAuthorityUri.endsWith("/") ? "" : "/") + authority
: `https://login.microsoftonline.com/${authority}`;
this.kustoCluster = `${url.protocol}//${url.hostname}`;
this.adalContext = new AuthenticationContext(fullAuthorityUri);
if (!!kcsb.aadUserId && !!kcsb.password) {
this.method = {
authMethod: AuthenticationMethod.username,
clientId: "db662dc1-0cfe-4e1c-a843-19a68e65be58",
username: kcsb.aadUserId,
password: kcsb.password,
}
} else if (!!kcsb.applicationClientId && !!kcsb.applicationKey) {
this.method = {
authMethod: AuthenticationMethod.appKey,
clientId: kcsb.applicationClientId,
clientSecret: kcsb.applicationKey,
}
} else if (!!kcsb.applicationClientId &&
!!kcsb.applicationCertificate && !!kcsb.applicationCertificateThumbprint) {
this.method = {
authMethod: AuthenticationMethod.appCertificate,
clientId: kcsb.applicationClientId,
certificate: kcsb.applicationCertificate,
thumbprint: kcsb.applicationCertificateThumbprint
}
} else if (kcsb.managedIdentity) {
this.method = {
authMethod: AuthenticationMethod.managedIdentities,
msiEndpoint: kcsb.msiEndpoint as string,
msiSecret: kcsb.msiSecret as string,
msiClientId: kcsb.msiClientId as string
}
} else if (kcsb.azLoginIdentity) {
this.method = {authMethod: AuthenticationMethod.azLogin}
} else if (kcsb.accessToken) {
this.method = {
authMethod: AuthenticationMethod.accessToken,
accessToken: kcsb.accessToken as string
}
} else {
this.method = {
authMethod: AuthenticationMethod.deviceLogin,
clientId: "db662dc1-0cfe-4e1c-a843-19a68e65be58",
authCallback: kcsb.AuthorizationCallback as (info: UserCodeInfo) => void
}
}
}
_getAuthHeader(): Promise<string> {
return new Promise((resolve, reject) => {
this._getAuthHeaderWithCallback((error, authHeader) => {
if (error) {
reject(error);
} else {
resolve(authHeader as string);
}
});
});
}
_getAuthHeaderWithCallback(cb: (e: string | Error | null | undefined, token?: string) => any) {
const resource = this.kustoCluster;
const formatHeader = ({
tokenType,
accessToken
}: TokenResponse) => `${tokenType} ${accessToken}`;
switch (this.method.authMethod) {
case AuthenticationMethod.username:
return this.adalContext.acquireTokenWithUsernamePassword(
resource, this.method.username, this.method.password, this.method.clientId, (err, tokenResponse) => {
return cb(err, tokenResponse && formatHeader(tokenResponse as TokenResponse));
}
);
case AuthenticationMethod.appKey:
return this.adalContext.acquireTokenWithClientCredentials(
resource, this.method.clientId, this.method.clientSecret, (err, tokenResponse) => {
return cb(err, tokenResponse && formatHeader(tokenResponse as TokenResponse));
}
);
case AuthenticationMethod.appCertificate:
return this.adalContext.acquireTokenWithClientCertificate(
resource, this.method.clientId, this.method.certificate, this.method.thumbprint, (err, tokenResponse) => {
return cb(err, tokenResponse && formatHeader(tokenResponse as TokenResponse));
}
);
case AuthenticationMethod.deviceLogin:
return this.adalContext.acquireUserCode(resource, this.method.clientId, "", (err, tokenResponse) => {
this.method = (this.method as DeviceLoginMethod);
if (err) {
return cb(err);
} else {
if (this.method.authCallback) {
this.method.authCallback(tokenResponse);
} else {
// tslint:disable-next-line:no-console
console.log(tokenResponse.message);
}
return this.adalContext.acquireTokenWithDeviceCode(resource, this.method.clientId, tokenResponse, (innerError, innerResponse) => {
if (innerError) {
return cb(innerError);
}
return cb(innerError, innerResponse && formatHeader(innerResponse as TokenResponse));
});
}
});
case AuthenticationMethod.managedIdentities:
return acquireManagedIdentityToken(
resource, this.method.msiEndpoint, this.method.msiClientId, this.method.msiSecret, (err, tokenResponse) => {
if (err) {
return cb(err);
}
return cb(err, tokenResponse && formatHeader(tokenResponse as TokenResponse));
}
);
case AuthenticationMethod.azLogin:
return azLoginIdentityToken(resource, (err, tokenResponse) => {
if (err) {
return cb(err);
}
return cb(err, tokenResponse && formatHeader(tokenResponse as TokenResponse));
});
case AuthenticationMethod.accessToken:
return cb(undefined, `Bearer ${this.method.accessToken}`);
default:
return cb("Couldn't Authenticate, something went wrong trying to choose authentication method");
}
}
};
export default AadHelper;

Просмотреть файл

@ -1,168 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const assert = require("assert");
const v2Response = require("./data/response/v2");
const v2ResponseError = require("./data/response/v2error");
const v1Response = require("./data/response/v1");
const v1_2Response = require("./data/response/v1_2");
const uuidv4 = require("uuid/v4");
const moment = require("moment");
const sinon = require("sinon");
const KustoClient = require("../source/client");
const KustoClientRequestProperties = require("../source/clientRequestProperties");
const ExecutionType = Object.freeze({
Mgmt: 0,
Query: 1,
Ingest: 2
});
describe("KustoClient", function () {
describe("#constructor", function () {
it("valid", function () {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
assert.equal(client.connectionString.authorityId, "common");
assert.equal(client.connectionString.dataSource, url);
assert.equal(client.aadHelper.authMethod, 3);
assert.equal(client.aadHelper.kustoCluster, url);
});
});
describe("#_parseResponse()", function () {
it("valid v1", function () {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
const response = client._parseResponse(v1Response, ExecutionType.Mgmt);
assert.equal(response.version, "1.0");
});
it("valid v1 more data", function () {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
const response = client._parseResponse(v1_2Response, ExecutionType.Mgmt);
assert.equal(response.version, "1.0");
});
it("valid v2", function () {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
const response = client._parseResponse(v2Response, ExecutionType.Query);
assert.equal(response.version, "2.0");
});
it("valid v2 raw", function () {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
const response = client._parseResponse(v2Response, ExecutionType.Query, { raw: true });
assert.equal(response, v2Response);
});
it("malformed body", function () {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
try{
const response = client._parseResponse({}, ExecutionType.Query);
}
catch(ex){
assert.equal(ex, "Failed to parse response ({undefined}) with the following error [TypeError: data.forEach is not a function].");
return;
}
assert.fail();
});
it("erred v2 not partial", function () {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
try{
const response = client._parseResponse(v2ResponseError, ExecutionType.Query);
}
catch(ex){
assert.equal(ex.startsWith("Kusto request had errors"), true);
return;
}
assert.fail();
});
it("setTimout for request", async function () {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
let clientRequestProps = new KustoClientRequestProperties();
let timeoutMs = moment.duration(2.51, "minutes").asMilliseconds();
clientRequestProps.setTimeout(timeoutMs);
client.aadHelper._getAuthHeader = () => { return "MockToken" };
client._doRequest = (endpoint, executionType, headers, payload, timeout, properties) => {
let payloadObj = JSON.parse(payload);
assert.equal(payloadObj.properties.Options.servertimeout, "00:02:30.6");
assert.equal(timeout, timeoutMs + moment.duration(0.5, "minutes").asMilliseconds());
};
await client.execute("Database", "Table | count", clientRequestProps);
});
it("setClientTimout for request", async function () {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
let clientRequestProps = new KustoClientRequestProperties();
let timeoutMs = moment.duration(2.51, "minutes").asMilliseconds();
clientRequestProps.setClientTimeout(timeoutMs);
client.aadHelper._getAuthHeader = () => { return "MockToken" };
client._doRequest = (endpoint, executionType, headers, payload, timeout, properties) => {
let payloadObj = JSON.parse(payload);
assert.equal(timeout, timeoutMs);
};
await client.execute("Database", "Table | count", clientRequestProps);
});
it("default timeout for query", async function () {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
client.aadHelper._getAuthHeader = () => { return "MockToken" };
client._doRequest = (endpoint, executionType, headers, payload, timeout, properties) => {
assert.equal(timeout, moment.duration(4.5, "minutes").asMilliseconds());
};
await client.execute("Database", "Table | count", () => { });
});
it("default timeout for admin", async function () {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
client.aadHelper._getAuthHeader = () => { return "MockToken" };
client._doRequest = (endpoint, executionType, headers, payload, timeout, properties) => {
assert.equal(timeout, moment.duration(10.5, "minutes").asMilliseconds());
};
await client.execute("Database", ".show database DataBase schema");
});
it("set clientRequestId for request", async function () {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
const clientRequestId = `MyApp.MyActivity;${uuidv4()}`;
let clientRequestProps = new KustoClientRequestProperties();
clientRequestProps.clientRequestId = clientRequestId;
client.aadHelper._getAuthHeader = () => { return "MockToken" };
client._doRequest = (endpoint, executionType, headers, payload, timeout, properties) => {
assert.equal(headers["x-ms-client-request-id"], clientRequestId);
};
await client.execute("Database", "Table | count", clientRequestProps);
});
});
});

Просмотреть файл

@ -0,0 +1,185 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import assert from "assert";
// tslint:disable-next-line:no-var-requires
import uuid from "uuid";
import moment from "moment";
import {KustoClient} from "../source/client";
import {ClientRequestProperties} from "../source/clientRequestProperties";
import {
KustoResponseDataSet,
KustoResponseDataSetV1,
KustoResponseDataSetV2
} from "../source/response";
// tslint:disable-next-line:no-var-requires
const v2Response = require("./data/response/v2");
// tslint:disable-next-line:no-var-requires
const v2ResponseError = require("./data/response/v2error");
// tslint:disable-next-line:no-var-requires
const v1Response = require("./data/response/v1");
// tslint:disable-next-line:no-var-requires variable-name
const v1_2Response = require("./data/response/v1_2");
const ExecutionType = Object.freeze({
Mgmt: 0,
Query: 1,
Ingest: 2
});
describe("KustoClient", function () {
describe("#constructor", function () {
it("valid", function () {
const url = "https://cluster.kusto.windows.net";
const client = new KustoClient(url);
assert.equal(client.connectionString.authorityId, "common");
assert.equal(client.connectionString.dataSource, url);
assert.equal(client.aadHelper.method.authMethod, 3);
assert.equal(client.aadHelper.kustoCluster, url);
});
});
describe("#_parseResponse()", function () {
it("valid v1", function () {
const url = "https://cluster.kusto.windows.net";
const client = new KustoClient(url);
const response = client._parseResponse(v1Response, ExecutionType.Mgmt);
assert.equal((response as KustoResponseDataSetV1).version, "1.0");
});
it("valid v1 more data", function () {
const url = "https://cluster.kusto.windows.net";
const client = new KustoClient(url);
const response = client._parseResponse(v1_2Response, ExecutionType.Mgmt);
assert.equal((response as KustoResponseDataSetV1).version, "1.0");
});
it("valid v2", function () {
const url = "https://cluster.kusto.windows.net";
const client = new KustoClient(url);
const response = client._parseResponse(v2Response, ExecutionType.Query);
assert.equal((response as KustoResponseDataSetV2).version, "2.0");
});
it("valid v2 raw", function () {
const url = "https://cluster.kusto.windows.net";
const client = new KustoClient(url);
const response = client._parseResponse(v2Response, ExecutionType.Query, { raw: true } as ClientRequestProperties);
assert.equal(response, v2Response);
});
it("malformed body", function () {
const url = "https://cluster.kusto.windows.net";
const client = new KustoClient(url);
try{
const response = client._parseResponse({}, ExecutionType.Query);
}
catch(ex){
ex.message.startsWith("Failed to parse response ({undefined}) with the following error [TypeError: data.forEach is not a function].");
return;
}
assert.fail();
});
it("erred v2 not partial", function () {
const url = "https://cluster.kusto.windows.net";
const client = new KustoClient(url);
try{
const response = client._parseResponse(v2ResponseError, ExecutionType.Query);
}
catch(ex){
assert.equal(ex.message.startsWith("Kusto request had errors"), true);
return;
}
assert.fail();
});
it("setTimout for request", async function () {
const url = "https://cluster.kusto.windows.net";
const client = new KustoClient(url);
const clientRequestProps = new ClientRequestProperties();
const timeoutMs = moment.duration(2.51, "minutes").asMilliseconds();
clientRequestProps.setTimeout(timeoutMs);
client.aadHelper._getAuthHeader = () => { return Promise.resolve("MockToken") };
client._doRequest = (endpoint, executionType, headers, payload, timeout, properties) => {
const payloadObj = JSON.parse(payload);
assert.equal(payloadObj.properties.Options.servertimeout, "00:02:30.6");
assert.equal(timeout, timeoutMs + moment.duration(0.5, "minutes").asMilliseconds());
return Promise.resolve(new KustoResponseDataSetV2([]));
};
await client.execute("Database", "Table | count", clientRequestProps);
});
it("setClientTimout for request", async function () {
const url = "https://cluster.kusto.windows.net";
const client = new KustoClient(url);
const clientRequestProps = new ClientRequestProperties();
const timeoutMs = moment.duration(2.51, "minutes").asMilliseconds();
clientRequestProps.setClientTimeout(timeoutMs);
client.aadHelper._getAuthHeader = () => { return Promise.resolve("MockToken") };
client._doRequest = (endpoint, executionType, headers, payload, timeout, properties) => {
const payloadObj = JSON.parse(payload);
assert.equal(timeout, timeoutMs);
return Promise.resolve(new KustoResponseDataSetV2([]));
};
await client.execute("Database", "Table | count", clientRequestProps);
});
it("default timeout for query", async function () {
const url = "https://cluster.kusto.windows.net";
const client = new KustoClient(url);
client.aadHelper._getAuthHeader = () => { return Promise.resolve("MockToken") };
client._doRequest = (endpoint, executionType, headers, payload, timeout, properties) => {
assert.equal(timeout, moment.duration(4.5, "minutes").asMilliseconds());
return Promise.resolve(new KustoResponseDataSetV2([]));
};
await client.execute("Database", "Table | count");
});
it("default timeout for admin", async function () {
const url = "https://cluster.kusto.windows.net";
const client = new KustoClient(url);
client.aadHelper._getAuthHeader = () => { return Promise.resolve("MockToken") };
client._doRequest = (endpoint, executionType, headers, payload, timeout, properties) => {
assert.equal(timeout, moment.duration(10.5, "minutes").asMilliseconds());
return Promise.resolve(new KustoResponseDataSetV2([]));
};
await client.execute("Database", ".show database DataBase schema");
});
it("set clientRequestId for request", async function () {
const url = "https://cluster.kusto.windows.net";
const client = new KustoClient(url);
const clientRequestId = `MyApp.MyActivity;${uuid.v4()}`;
const clientRequestProps = new ClientRequestProperties();
clientRequestProps.clientRequestId = clientRequestId;
client.aadHelper._getAuthHeader = () => { return Promise.resolve("MockToken") };
client._doRequest = (endpoint, executionType, headers, payload, timeout, properties) => {
assert.equal(headers["x-ms-client-request-id"], clientRequestId);
return Promise.resolve(new KustoResponseDataSetV2([]));
};
await client.execute("Database", "Table | count", clientRequestProps);
});
});
});

Просмотреть файл

@ -1,14 +1,16 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const assert = require("assert");
const uuidv4 = require("uuid/v4");
import assert from "assert";
import uuid from "uuid";
import {KustoConnectionStringBuilder} from "../source/connectionBuilder";
const KustoConnectionStringBuilder = require("../source/connectionBuilder");
describe("KustoConnectionStringBuilder", function () {
describe("#constructor(connectionString)", function () {
it("from string with no creds", function () {
let kcsbs = [
const kcsbs = [
new KustoConnectionStringBuilder("localhost"),
new KustoConnectionStringBuilder("data Source=localhost"),
new KustoConnectionStringBuilder("Addr=localhost"),
@ -16,11 +18,11 @@ describe("KustoConnectionStringBuilder", function () {
KustoConnectionStringBuilder.withAadDeviceAuthentication("localhost", "common"),
];
for (let kcsb of kcsbs) {
for (const kcsb of kcsbs) {
assert.equal(kcsb.dataSource, "localhost");
assert.equal(kcsb.authorityId, "common");
let emptyFields = ["aadUserId", "password", "applicationClientId", "applicationKey"];
for (let field of emptyFields) {
const emptyFields = ["aadUserId", "password", "applicationClientId", "applicationKey"];
for (const field of emptyFields) {
assert.equal(kcsb[field], null);
}
}
@ -41,13 +43,13 @@ describe("KustoConnectionStringBuilder", function () {
kcsb1.password = expectedPassword;
kcsbs.push(kcsb1);
for (let kcsb of kcsbs) {
for (const kcsb of kcsbs) {
assert.equal(kcsb.dataSource, "localhost");
assert.equal(kcsb.aadUserId, expectedUser);
assert.equal(kcsb.password, expectedPassword);
assert.equal(kcsb.authorityId, "common");
let emptyFields = ["applicationClientId", "applicationKey"];
for (let field of emptyFields) {
const emptyFields = ["applicationClientId", "applicationKey"];
for (const field of emptyFields) {
assert.equal(kcsb[field], null);
}
}
@ -55,35 +57,35 @@ describe("KustoConnectionStringBuilder", function () {
it("from string with app auth", function () {
const uuid = uuidv4();
const uuidv4 = uuid.v4();
const key = "key of application";
let kcsbs = [
new KustoConnectionStringBuilder(`localhost;Application client Id=${uuid};application Key=${key}`),
new KustoConnectionStringBuilder(`Data Source=localhost ; Application Client Id=${uuid}; Appkey =${key}`),
new KustoConnectionStringBuilder(` Addr = localhost ; AppClientId = ${uuid} ; AppKey =${key}`),
new KustoConnectionStringBuilder(`Network Address = localhost; AppClientId = ${uuid} ; AppKey =${key}`),
KustoConnectionStringBuilder.withAadApplicationKeyAuthentication("localhost", uuid, key)
const kcsbs = [
new KustoConnectionStringBuilder(`localhost;Application client Id=${uuidv4};application Key=${key}`),
new KustoConnectionStringBuilder(`Data Source=localhost ; Application Client Id=${uuidv4}; Appkey =${key}`),
new KustoConnectionStringBuilder(` Addr = localhost ; AppClientId = ${uuidv4} ; AppKey =${key}`),
new KustoConnectionStringBuilder(`Network Address = localhost; AppClientId = ${uuidv4} ; AppKey =${key}`),
KustoConnectionStringBuilder.withAadApplicationKeyAuthentication("localhost", uuidv4, key)
];
let kcsb1 = new KustoConnectionStringBuilder("server=localhost");
kcsb1.applicationClientId = uuid;
const kcsb1 = new KustoConnectionStringBuilder("server=localhost");
kcsb1.applicationClientId = uuidv4;
kcsb1.applicationKey = key;
kcsbs.push(kcsb1);
for (let kcsb of kcsbs) {
for (const kcsb of kcsbs) {
assert.equal(kcsb.dataSource, "localhost");
assert.equal(kcsb.applicationClientId, uuid);
assert.equal(kcsb.applicationClientId, uuidv4);
assert.equal(kcsb.applicationKey, key);
assert.equal(kcsb.authorityId, "common");
let emptyFields = ["aadUserId", "password"];
for (let field of emptyFields) {
const emptyFields = ["aadUserId", "password"];
for (const field of emptyFields) {
assert.equal(kcsb[field], null);
}
}
});
it("from string with managed identity", function () {
it("from string with managed identity", function () {
const kcsb1 = KustoConnectionStringBuilder.withAadManagedIdentities("https://dadubovs1.westus.kusto.windows.net");
assert.equal(kcsb1.msiEndpoint, "http://169.254.169.254/metadata/identity/oauth2/token");

Просмотреть файл

@ -1,11 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const assert = require("assert");
const moment = require("moment");
const { KustoResultTable, KustoResultColumn, KustoResultRow } = require("../source/models");
import assert from "assert";
import moment from "moment";
import {KustoResultColumn, KustoResultRow, KustoResultTable} from "../source/models";
// tslint:disable-next-line:no-var-requires
const v2Response = require("./data/response/v2.json");
describe("KustoResultRow", function () {
@ -64,31 +64,31 @@ describe("KustoResultRow", function () {
3493235670000
];
const reverseOrderColumns = rawColumns.slice().reverse();
const reverseOrderColumns = rawColumns.slice().reverse();
const actual = new KustoResultRow(
reverseOrderColumns.map((c, i) => new KustoResultColumn(c, rawColumns.length - i - 1)),
inputValues
);
let asJson = actual.toJson();
let expectedValues = [
moment(inputValues[0]),
const asJson = actual.toJson();
const expectedValues = [
moment(inputValues[0] as string),
inputValues[1],
inputValues[2],
inputValues[3],
inputValues[4],
moment(inputValues[5]),
moment(inputValues[5] as number),
];
for (let index = 0; index < inputColumns.length; index++) {
let actual = asJson[inputColumns[index].name];
for (let index = 0; index < inputColumns.length; index++) {
const currentActual = asJson[inputColumns[index].name as string];
if (inputColumns[index].type === "timespan") {
assert.equal(Number(actual), expectedValues[index]);
assert.equal(Number(currentActual), expectedValues[index]);
}
else if (typeof(actual) == "object") {
assert.equal(actual.toString(), expectedValues[index].toString());
else if (typeof(currentActual) == "object") {
assert.equal(currentActual.toString(), expectedValues[index].toString());
} else {
assert.equal(actual, expectedValues[index]);
assert.equal(currentActual, expectedValues[index]);
}
}
@ -156,7 +156,7 @@ describe("KustoResultRow", function () {
let i = 0;
for (let v of actual.values()) {
for (const v of actual.values()) {
assert.equal(v, inputValues[i]);
values.push(v);
i++;
@ -284,8 +284,8 @@ describe("KustoResultTable", function () {
it("iterate over rows", function () {
const actual = new KustoResultTable(v2Response[2]);
let rows = [];
for (let row of actual.rows()) {
const rows = [];
for (const row of actual.rows()) {
rows.push(row);
assert.equal(
JSON.stringify(row),

Просмотреть файл

@ -0,0 +1,15 @@
{
"compilerOptions": {
"target": "es6",
"module": "commonjs",
"esModuleInterop": true,
"strict": true,
"incremental": true,
"sourceMap": true,
"resolveJsonModule": true,
"allowJs": false,
"allowSyntheticDefaultImports": true,
"declaration": true
},
"include": ["source/**/*.ts", "test/**/*.ts", "index.ts"]
}

Просмотреть файл

@ -0,0 +1,18 @@
{
"defaultSeverity": "error",
"extends": [
"tslint:recommended"
],
"jsRules": {},
"rules": {
"max-classes-per-file": false,
"triple-equals": false,
"only-arrow-functions": false
},
"linterOptions": {
"exclude": [
"./test/**/*"
]
},
"rulesDirectory": []
}

Просмотреть файл

@ -0,0 +1,3 @@
{
"enable-source-maps": true
}

Просмотреть файл

@ -2,7 +2,7 @@
// Licensed under the MIT License.
const IngestClient = require("azure-kusto-ingest").IngestClient;
const IngestStatusQueues = require("azure-kusto-ingest").KustoIngestStatusQueues;
const IngestStatusQueues = require("azure-kusto-ingest").IngestStatusQueues;
const IngestionProps = require("azure-kusto-ingest").IngestionProperties;
const { ReportLevel, ReportMethod } = require("azure-kusto-ingest").IngestionPropertiesEnums;
const KustoConnectionStringBuilder = require("azure-kusto-data").KustoConnectionStringBuilder;
@ -115,7 +115,7 @@ async function startStreamingIngestion() {
}
// Ingest from stream with either ReadStream or StreamDescriptor
const stream = fs.createReadStream("file.json");
let stream = fs.createReadStream("file.json");
try {
await streamingIngestClient.ingestFromStream("file.json", props2);
console.log("Ingestion done");
@ -125,7 +125,7 @@ async function startStreamingIngestion() {
}
// For gzip data set StreamDescriptor.compressionType to CompressionType.GZIP
const stream = fs.createReadStream("file.json.gz");
stream = fs.createReadStream("file.json.gz");
const streamDescriptor = new StreamDescriptor(stream, "id", CompressionType.GZIP);
try {
await streamingIngestClient.ingestFromStream(streamDescriptor, props2);

Просмотреть файл

@ -1,30 +1,32 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const client = require("./source/ingestClient");
const streamingIngestClient = require("./source/streamingIngestClient");
const KustoIngestStatusQueues = require("./source/status");
const {
import client from "./source/ingestClient";
import streamingIngestClient from "./source/streamingIngestClient";
import KustoIngestStatusQueues from "./source/status";
import {
CsvColumnMapping, DataFormat, IngestionMappingType,
IngestionProperties,
JsonColumnMapping,
CsvColumnMapping,
ValidationPolicy,
ReportLevel,
ReportMethod,
ValidationImplications,
ValidationOptions,
DataFormat,
IngestionMappingType
} = require("./source/ingestionProperties");
ValidationPolicy
} from "./source/ingestionProperties";
const {
import {
BlobDescriptor,
CompressionType,
FileDescriptor,
StreamDescriptor,
CompressionType
} = require("./source/descriptors");
StreamDescriptor
} from "./source/descriptors";
module.exports = {
const out = {
IngestClient: client,
StreamingIngestClient: streamingIngestClient,
IngestStatusQueues: KustoIngestStatusQueues,
@ -47,3 +49,5 @@ module.exports = {
CompressionType
}
};
export default out;

943
azure-kusto-ingest/package-lock.json сгенерированный

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -1,6 +1,6 @@
{
"name": "azure-kusto-ingest",
"version": "1.0.1",
"version": "2.0.0",
"description": "Azure Data Explorer Ingestion SDK",
"main": "index.js",
"engines": {
@ -17,10 +17,13 @@
"kusto"
],
"scripts": {
"example": "node example.js",
"lint": "eslint source --quiet",
"test": "mocha",
"allTests": "mocha --timeout 240000 --recursive"
"build": "npm link ../azure-kusto-data && tsc -b",
"prepublish": "npm run build",
"example": "npm run build && node example.js",
"lint": "npm run build && tslint --project tsconfig.json --quiet",
"test": "npm run build && mocha --require ts-node/register",
"e2e": "npm run build && mocha --require ts-node/register test/e2eTests/e2eTest.ts",
"allTests": "npm run build && mocha --timeout 240000 --recursive --require ts-node/register"
},
"author": "",
"license": "ISC",
@ -35,8 +38,15 @@
"uuid-validate": "0.0.3"
},
"devDependencies": {
"eslint": "^6.8.0",
"@types/mocha": "^8.2.0",
"@types/node": "^14.14.13",
"@types/sinon": "^9.0.9",
"@types/uuid": "^8.3.0",
"@types/uuid-validate": "0.0.1",
"mocha": "^7.2.0",
"sinon": "^7.2.3"
"sinon": "^7.2.3",
"ts-node": "^9.1.1",
"tslint": "^6.1.3",
"typescript": "^4.1.3"
}
}

Просмотреть файл

@ -0,0 +1,26 @@
import IngestionProperties from "./ingestionProperties";
import {FileDescriptor, StreamDescriptor} from "./descriptors";
import fs from "fs";
export abstract class AbstractKustoClient {
protected constructor(public defaultProps: IngestionProperties | null = null) {
}
_mergeProps(newProperties?: IngestionProperties | null): IngestionProperties {
// no default props
if (newProperties == null || Object.keys(newProperties).length == 0) {
return this.defaultProps || new IngestionProperties({});
}
// no new props
if (this.defaultProps == null || Object.keys(this.defaultProps).length == 0) {
return newProperties || new IngestionProperties({});
}
// both exist - merge
return this.defaultProps.merge(newProperties) || new IngestionProperties({});
}
abstract ingestFromStream(stream: StreamDescriptor | fs.ReadStream, ingestionProperties: IngestionProperties): Promise<any>;
abstract ingestFromFile(file: FileDescriptor | string, ingestionProperties: IngestionProperties): Promise<any>;
}

Просмотреть файл

@ -1,92 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const fs = require("fs");
const path = require("path");
const zlib = require("zlib");
const uuidValidate = require("uuid-validate");
const uuidv4 = require("uuid/v4");
const CompressionType = Object.freeze({
ZIP : ".zip",
GZIP : ".gz",
None : ""
});
function getSourceId(sourceId){
if(sourceId){
if(!uuidValidate(sourceId, 4)){
throw Error("sourceId is not a valid uuid/v4");
}
return sourceId;
}
return uuidv4();
}
class FileDescriptor {
constructor(filePath, sourceId = null, size = null) {
this.filePath = filePath;
this.name = path.basename(this.filePath);
this.extension = path.extname(this.filePath).toLowerCase();
this.size = size;
this.zipped = this.extension === ".gz" || this.extension === ".zip";
this.sourceId = getSourceId(sourceId);
}
async _gzip() {
let zipper = zlib.createGzip();
let input = fs.createReadStream(this.filePath, { autoClose: true });
let output = fs.createWriteStream(this.filePath + ".gz");
await new Promise((resolve, reject) => {
input.pipe(zipper).pipe(output)
.on("error", (err) => {
reject(err);
});
output.once("close", function() {
resolve();
});
});
return this.filePath + ".gz";
}
async prepare() {
if(this.zipped){
if (this.size == null || this.size <= 0) {
this.size = fs.statSync(this.filePath).size * 11;
}
return this.filePath;
}
else{
await this._gzip();
if (this.size == null || this.size <= 0) {
this.size = fs.statSync(this.filePath).size;
}
return this.filePath + ".gz";
}
}
}
class StreamDescriptor {
constructor(stream, sourceId = null, compressionType = CompressionType.None) {
this.stream = stream;
this.name = "stream";
this.size = null;
this.compressionType = compressionType;
this.sourceId = getSourceId(sourceId);
}
}
class BlobDescriptor {
constructor(path, size = null, sourceId = null) {
this.path = path;
this.size = size;
this.sourceId = getSourceId(sourceId);
}
}
module.exports.FileDescriptor = FileDescriptor;
module.exports.BlobDescriptor = BlobDescriptor;
module.exports.StreamDescriptor = StreamDescriptor;
module.exports.CompressionType = CompressionType;

Просмотреть файл

@ -0,0 +1,96 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import uuid from "uuid";
import uuidValidate from "uuid-validate";
import zlib from "zlib";
import pathlib from "path";
import fs, {ReadStream} from "fs";
export enum CompressionType {
ZIP= ".zip",
GZIP= ".gz",
None= "",
}
function getSourceId(sourceId: string | null): string {
if (sourceId) {
if (!uuidValidate(sourceId, 4)) {
throw Error("sourceId is not a valid uuid/v4");
}
return sourceId;
}
return uuid.v4();
}
export class FileDescriptor {
readonly name: string;
readonly extension: string;
size: number | null;
sourceId: string;
zipped: boolean;
constructor(readonly filePath: string, sourceId: string | null = null, size: number | null = null) {
this.name = pathlib.basename(this.filePath);
this.extension = pathlib.extname(this.filePath).toLowerCase();
this.size = size;
this.zipped = this.extension === ".gz" || this.extension === ".zip";
this.sourceId = getSourceId(sourceId);
}
async _gzip(): Promise<string> {
const zipper = zlib.createGzip();
const input = fs.createReadStream(this.filePath, {autoClose: true});
const output = fs.createWriteStream(this.filePath + ".gz");
await new Promise((resolve, reject) => {
input.pipe(zipper).pipe(output)
.on("error", (err) => {
reject(err);
});
output.once("close", function () {
resolve(null);
});
});
return this.filePath + ".gz";
}
async prepare(): Promise<string> {
if (this.zipped) {
if (this.size == null || this.size <= 0) {
this.size = fs.statSync(this.filePath).size * 11;
}
return this.filePath;
}
await this._gzip();
if (this.size == null || this.size <= 0) {
this.size = fs.statSync(this.filePath).size;
}
return this.filePath + ".gz";
}
}
export class StreamDescriptor {
name: string;
size: number | null;
compressionType: CompressionType;
sourceId: string;
constructor(readonly stream: ReadStream, sourceId: string | null = null, compressionType: CompressionType = CompressionType.None) {
this.name = "stream";
this.size = null;
this.compressionType = compressionType;
this.sourceId = getSourceId(sourceId);
}
}
export class BlobDescriptor {
size: number | null;
sourceId: string;
constructor(readonly path: string, size: number | null = null, sourceId: string | null = null) {
this.size = size;
this.sourceId = getSourceId(sourceId);
}
}

Просмотреть файл

@ -1,54 +1,54 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const KustoClient = require("azure-kusto-data").Client;
const { FileDescriptor, BlobDescriptor, StreamDescriptor } = require("./descriptors");
const { ResourceManager } = require("./resourceManager");
const IngestionBlobInfo = require("./ingestionBlobInfo");
const { QueueClient } = require("@azure/storage-queue");
const { ContainerClient } = require("@azure/storage-blob");
// @ts-ignore
import {Client as KustoClient, KustoConnectionStringBuilder} from "azure-kusto-data";
module.exports = class KustoIngestClient {
constructor(kcsb, defaultProps) {
import {BlobDescriptor, CompressionType, FileDescriptor, StreamDescriptor} from "./descriptors";
import ResourceManager from "./resourceManager";
import IngestionBlobInfo from "./ingestionBlobInfo";
import {QueueClient, QueueSendMessageResponse} from "@azure/storage-queue";
import {ContainerClient} from "@azure/storage-blob";
import IngestionProperties from "./ingestionProperties";
import {ReadStream} from "fs";
import {AbstractKustoClient} from "./abstractKustoClient";
export class KustoIngestClient extends AbstractKustoClient{
resourceManager: ResourceManager;
constructor(kcsb: string | KustoConnectionStringBuilder, public defaultProps: IngestionProperties | null = null) {
super(defaultProps);
this.resourceManager = new ResourceManager(new KustoClient(kcsb));
this.defaultProps = defaultProps;
}
_mergeProps(newProperties) {
// no default props
if (newProperties == null || Object.keys(newProperties).length == 0) {
return this.defaultProps;
}
// no new props
if (this.defaultProps == null || Object.keys(this.defaultProps) == 0) {
return newProperties;
}
// both exist - merge
return this.defaultProps.merge(newProperties);
}
_getBlobNameSuffix(format, compressionType) {
_getBlobNameSuffix(format : string | null, compressionType: CompressionType) {
const formatSuffix = format ? `.${format}` : "";
return `${formatSuffix}${compressionType}`;
}
async _getBlockBlobClient(blobName){
async _getBlockBlobClient(blobName: string) {
const containers = await this.resourceManager.getContainers();
if (containers == null) {
throw new Error("Failed to get containers");
}
const container = containers[Math.floor(Math.random() * containers.length)];
const containerClient = new ContainerClient(container.getSASConnectionString(), container.objectName);
const blockBlobClient = containerClient.getBlockBlobClient(blobName);
return blockBlobClient;
return containerClient.getBlockBlobClient(blobName);
}
async ingestFromStream(stream, ingestionProperties) {
async ingestFromStream(stream: ReadStream | StreamDescriptor, ingestionProperties: IngestionProperties): Promise<QueueSendMessageResponse> {
const props = this._mergeProps(ingestionProperties);
props.validate();
const descriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);
const blobName = `${props.database}__${props.table}__${descriptor.sourceId}` +
`${this._getBlobNameSuffix(props.format, descriptor.compressionType)}`;
`${this._getBlobNameSuffix(props.format ?? "", descriptor.compressionType)}`;
const blockBlobClient = await this._getBlockBlobClient(blobName);
await blockBlobClient.uploadStream(descriptor.stream);
@ -56,7 +56,7 @@ module.exports = class KustoIngestClient {
return this.ingestFromBlob(new BlobDescriptor(blockBlobClient.url), props); // descriptor.size?
}
async ingestFromFile(file, ingestionProperties) {
async ingestFromFile(file: string | FileDescriptor, ingestionProperties: IngestionProperties | null = null): Promise<QueueSendMessageResponse> {
const props = this._mergeProps(ingestionProperties);
props.validate();
@ -71,13 +71,18 @@ module.exports = class KustoIngestClient {
return this.ingestFromBlob(new BlobDescriptor(blockBlobClient.url, descriptor.size, descriptor.sourceId), props);
}
async ingestFromBlob(blob, ingestionProperties) {
async ingestFromBlob(blob: string | BlobDescriptor, ingestionProperties: IngestionProperties | null = null) : Promise<QueueSendMessageResponse> {
const props = this._mergeProps(ingestionProperties);
props.validate();
const descriptor = blob instanceof BlobDescriptor ? blob : new BlobDescriptor(blob);
let queues = await this.resourceManager.getIngestionQueues();
let authorizationContext = await this.resourceManager.getAuthorizationContext();
const queues = await this.resourceManager.getIngestionQueues();
if (queues == null)
{
throw new Error("Failed to get queues");
}
const authorizationContext = await this.resourceManager.getAuthorizationContext();
const queueDetails = queues[Math.floor(Math.random() * queues.length)];
@ -89,4 +94,6 @@ module.exports = class KustoIngestClient {
return queueClient.sendMessage(encoded);
}
};
}
export default KustoIngestClient;

Просмотреть файл

@ -1,27 +1,42 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const uuidv4 = require("uuid/v4");
const moment = require("moment");
import uuid from "uuid";
import moment from "moment";
import {BlobDescriptor} from "./descriptors";
import IngestionProperties, {ReportLevel, ReportMethod} from "./ingestionProperties";
module.exports = class IngestionBlobInfo {
constructor(blobDescriptor, ingestionProperties, authContext) {
export class IngestionBlobInfo {
BlobPath: string;
RawDataSize: number | null;
DatabaseName: string | null;
TableName: string | null;
RetainBlobOnSuccess: boolean;
FlushImmediately: boolean;
IgnoreSizeLimit: boolean;
ReportLevel: ReportLevel | null;
ReportMethod: ReportMethod | null;
SourceMessageCreationTime: moment.Moment;
Id: string;
AdditionalProperties: { [additional: string]: any; };
constructor(blobDescriptor: BlobDescriptor, ingestionProperties: IngestionProperties, authContext: string | null = null) {
this.BlobPath = blobDescriptor.path;
this.RawDataSize = blobDescriptor.size;
this.DatabaseName = ingestionProperties.database;
this.TableName = ingestionProperties.table;
this.DatabaseName = ingestionProperties.database ?? null;
this.TableName = ingestionProperties.table ?? null;
this.RetainBlobOnSuccess = true;
this.FlushImmediately = !!ingestionProperties.flushImmediately;
this.IgnoreSizeLimit = false;
this.ReportLevel = ingestionProperties.reportLevel;
this.ReportMethod = ingestionProperties.reportMethod;
this.ReportLevel = ingestionProperties.reportLevel ?? null;
this.ReportMethod = ingestionProperties.reportMethod ?? null;
this.SourceMessageCreationTime = moment.utc();
this.Id = blobDescriptor.sourceId || uuidv4();
this.Id = blobDescriptor.sourceId || uuid.v4();
let additionalProperties = ingestionProperties.additionalProperties || {};
const additionalProperties = ingestionProperties.additionalProperties || {};
additionalProperties.authorizationContext = authContext;
let tags = [];
const tags: string[] = [];
if (ingestionProperties.additionalTags) {
tags.concat(ingestionProperties.additionalTags);
}
@ -42,11 +57,11 @@ module.exports = class IngestionBlobInfo {
if (ingestionProperties.ingestionMapping && ingestionProperties.ingestionMapping.length > 0) {
// server expects a string
additionalProperties["ingestionMapping"] = JSON.stringify(ingestionProperties.ingestionMapping);
additionalProperties.ingestionMapping = JSON.stringify(ingestionProperties.ingestionMapping);
}
if (ingestionProperties.ingestionMappingReference) {
additionalProperties["ingestionMappingReference"] = ingestionProperties.ingestionMappingReference;
additionalProperties.ingestionMappingReference = ingestionProperties.ingestionMappingReference;
}
if (ingestionProperties.validationPolicy) {
@ -59,4 +74,6 @@ module.exports = class IngestionBlobInfo {
this.AdditionalProperties = additionalProperties;
}
};
}
export default IngestionBlobInfo;

Просмотреть файл

@ -1,151 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const DataFormat = Object.freeze({
CSV: "csv",
TSV: "tsv",
SCSV: "scsv",
SOHSV: "sohsv",
PSV: "psv",
TXT: "txt",
JSON: "json",
SINGLEJSON: "singlejson",
AVRO: "avro",
PARQUET: "parquet",
TSVE: "tsve",
ORC: "orc"
});
module.exports.DataFormat = DataFormat;
const IngestionMappingType = Object.freeze({
CSV: "Csv",
PARQUET: "Parquet",
AVRO: "Avro",
JSON: "Json",
ORC: "orc"
});
module.exports.IngestionMappingType = IngestionMappingType;
const ValidationOptions = Object.freeze({
DoNotValidate: 0,
ValidateCsvInputConstantColumns: 1,
ValidateCsvInputColumnLevelOnly: 2
});
module.exports.ValidationOptions = ValidationOptions;
let ValidationImplications = Object.freeze({
Fail: 0,
BestEffort: 1
});
module.exports.ValidationImplications = ValidationImplications;
module.exports.ValidationPolicy = class ValidationPolicy {
constructor(validationOptions = ValidationOptions.DoNotValidate, validationImplications = ValidationImplications.BestEffort) {
this.ValidationOptions = validationOptions;
this.ValidationImplications = validationImplications;
}
};
const ReportLevel = Object.freeze({
FailuresOnly: 0,
DoNotReport: 1,
FailuresAndSuccesses: 2
});
module.exports.ReportLevel = ReportLevel;
const ReportMethod = Object.freeze({
Queue: 0
});
module.exports.ReportMethod = ReportMethod;
class ColumnMapping { }
module.exports.CsvColumnMapping = class CsvColumnMapping extends ColumnMapping {
constructor(columnName, cslDataType, ordinal) {
super();
this.Name = columnName;
this.DataType = cslDataType;
this.Ordinal = ordinal;
}
};
module.exports.JsonColumnMapping = class JsonColumnMapping extends ColumnMapping {
constructor(columnName, jsonPath, cslDataType = null) {
super();
this.column = columnName;
this.path = jsonPath;
this.datatype = cslDataType;
}
};
module.exports.IngestionProperties = class IngestionProperties {
constructor({
database = null,
table = null,
format = null,
ingestionMapping = null,
ingestionMappingReference = null,
ingestionMappingType = null,
additionalTags = null,
ingestIfNotExists = null,
ingestByTags = null,
dropByTags = null,
flushImmediately = null,
reportLevel = null,
reportMethod = null,
validationPolicy = null,
additionalProperties = null
}) {
if (ingestionMapping && ingestionMappingReference) throw new Error("Both mapping and a mapping reference detected");
this.database = database;
this.table = table;
this.format = format;
this.ingestionMapping = ingestionMapping;
this.ingestionMappingType = ingestionMappingType;
this.ingestionMappingReference = ingestionMappingReference;
this.additionalTags = additionalTags;
this.ingestIfNotExists = ingestIfNotExists;
this.ingestByTags = ingestByTags;
this.dropByTags = dropByTags;
this.flushImmediately = flushImmediately;
this.reportLevel = reportLevel;
this.reportMethod = reportMethod;
this.validationPolicy = validationPolicy;
this.additionalProperties = additionalProperties;
}
validate() {
if (!this.flushImmediately) this.flushImmediately = false;
if (!this.reportLevel) this.reportLevel = ReportLevel.DoNotReport;
if (!this.reportMethod) this.reportMethod = ReportMethod.Queue;
if (!this.database) throw new Error("Must define a target database");
if (!this.table) throw new Error("Must define a target table");
if (!this.format) throw new Error("Must define a data format");
if (this.ingestionMapping && this.ingestionMappingReference)
throw new Error("Both mapping and a mapping reference detected");
if (!this.ingestionMapping && !this.ingestionMappingReference && this.format === DataFormat.JSON)
throw new Error("Json must have a mapping defined");
}
merge(extraProps) {
const merged = new IngestionProperties(this);
for (let key of Object.keys(extraProps)) {
if (extraProps[key] != null) {
merged[key] = extraProps[key];
}
}
return merged;
}
};

Просмотреть файл

@ -0,0 +1,156 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
export enum DataFormat {
CSV = "csv",
TSV = "tsv",
SCSV = "scsv",
SOHSV = "sohsv",
PSV = "psv",
TXT = "txt",
JSON = "json",
SINGLEJSON = "singlejson",
AVRO = "avro",
PARQUET = "parquet",
TSVE = "tsve",
ORC = "orc"
}
export enum IngestionMappingType {
CSV = "Csv",
PARQUET = "Parquet",
AVRO = "Avro",
JSON = "Json",
ORC = "orc"
}
export enum ValidationOptions {
DoNotValidate = 0,
ValidateCsvInputConstantColumns = 1,
ValidateCsvInputColumnLevelOnly = 2
}
export enum ValidationImplications {
Fail = 0,
BestEffort = 1
}
export class ValidationPolicy {
constructor(readonly validationOptions: ValidationOptions = ValidationOptions.DoNotValidate, readonly validationImplications: ValidationImplications = ValidationImplications.BestEffort) {
}
}
export enum ReportLevel {
FailuresOnly = 0,
DoNotReport = 1,
FailuresAndSuccesses = 2
}
export enum ReportMethod {
Queue = 0
}
class ColumnMapping {
}
export class CsvColumnMapping extends ColumnMapping {
constructor(readonly columnName: string, readonly cslDataType: string, readonly ordinal: string) {
super();
}
}
export class JsonColumnMapping extends ColumnMapping {
constructor(readonly columnName: string, readonly jsonPath: string, readonly cslDataType: string | null = null) {
super();
}
}
class IngestionPropertiesFields {
database?: string | null = null;
table?: string | null = null;
format?: string | null = null;
ingestionMapping?: ColumnMapping[] | null = null;
ingestionMappingReference?: string | null = null;
ingestionMappingType?: string | null = null;
additionalTags?: string | null = null;
ingestIfNotExists?: string | null = null;
ingestByTags?: string[] | null = null;
dropByTags?: string[] | null = null;
flushImmediately?: boolean | null = null;
reportLevel?: ReportLevel | null = null;
reportMethod?: ReportMethod | null = null;
validationPolicy?: string | null = null;
additionalProperties?: {[additional:string] : any} | null = null;
}
export class IngestionProperties extends IngestionPropertiesFields {
constructor({
database = null,
table = null,
format = null,
ingestionMapping = null,
ingestionMappingReference = null,
ingestionMappingType = null,
additionalTags = null,
ingestIfNotExists = null,
ingestByTags = null,
dropByTags = null,
flushImmediately = null,
reportLevel = null,
reportMethod = null,
validationPolicy = null,
additionalProperties = null
}: IngestionPropertiesFields) {
super();
if (ingestionMapping && ingestionMappingReference) throw new Error("Both mapping and a mapping reference detected");
this.database = database;
this.table = table;
this.format = format;
this.ingestionMapping = ingestionMapping;
this.ingestionMappingType = ingestionMappingType;
this.ingestionMappingReference = ingestionMappingReference;
this.additionalTags = additionalTags;
this.ingestIfNotExists = ingestIfNotExists;
this.ingestByTags = ingestByTags;
this.dropByTags = dropByTags;
this.flushImmediately = flushImmediately;
this.reportLevel = reportLevel;
this.reportMethod = reportMethod;
this.validationPolicy = validationPolicy;
this.additionalProperties = additionalProperties;
}
validate() {
if (!this.flushImmediately) this.flushImmediately = false;
if (!this.reportLevel) this.reportLevel = ReportLevel.DoNotReport;
if (!this.reportMethod) this.reportMethod = ReportMethod.Queue;
if (!this.database) throw new Error("Must define a target database");
if (!this.table) throw new Error("Must define a target table");
if (!this.format) throw new Error("Must define a data format");
if (this.ingestionMapping && this.ingestionMappingReference)
throw new Error("Both mapping and a mapping reference detected");
if (!this.ingestionMapping && !this.ingestionMappingReference && this.format === DataFormat.JSON)
throw new Error("Json must have a mapping defined");
}
[extraProps: string] : any;
merge(extraProps: any) {
const merged = new IngestionProperties(this);
for (const key of Object.keys(extraProps)) {
if (extraProps[key] != null) {
merged[key] = extraProps[key];
}
}
return merged;
}
}
export default IngestionProperties;

Просмотреть файл

@ -1,144 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const moment = require("moment");
const URI_FORMAT = /https:\/\/(\w+).(queue|blob|table).core.windows.net\/([\w,-]+)\?(.*)/;
class ResourceURI {
constructor(storageAccountName, objectType, objectName, sas) {
this.storageAccountName = storageAccountName;
this.objectType = objectType;
this.objectName = objectName;
this.sas = sas;
}
static fromURI(uri) {
const match = URI_FORMAT.exec(uri);
return new ResourceURI(match[1], match[2], match[3], match[4]);
}
getSASConnectionString() {
if(this.objectType == "queue"){
return `QueueEndpoint=https://${this.storageAccountName}.queue.core.windows.net/;SharedAccessSignature=${this.sas}`;
}
if(this.objectType == "blob"){
return `BlobEndpoint=https://${this.storageAccountName}.blob.core.windows.net/;SharedAccessSignature=${this.sas}`;
}
}
}
module.exports.ResourceURI = ResourceURI;
class IngestClientResources {
constructor(
securedReadyForAggregationQueues = null,
failedIngestionsQueues = null,
successfulIngestionsQueues = null,
containers = null
) {
this.securedReadyForAggregationQueues = securedReadyForAggregationQueues;
this.failedIngestionsQueues = failedIngestionsQueues;
this.successfulIngestionsQueues = successfulIngestionsQueues;
this.containers = containers;
}
valid() {
let resources = [
this.securedReadyForAggregationQueues,
this.failedIngestionsQueues,
this.failedIngestionsQueues,
this.containers
];
return resources.reduce((prev, current) => prev && current, true);
}
}
module.exports.IngestClientResources = IngestClientResources;
module.exports.ResourceManager = class ResourceManager {
constructor(kustoClient) {
this.kustoClient = kustoClient;
this.refreshPeriod = moment.duration(1, "h");
this.ingestClientResources = null;
this.ingestClientResourcesLastUpdate = null;
this.authorizationContext = null;
this.authorizationContextLastUpdate = null;
}
async refreshIngestClientResources() {
let now = moment.now();
if (!this.ingestClientResources ||
(this.ingestClientResourcesLastUpdate + this.refreshPeriod) <= now || !this.ingestClientResources.valid()) {
this.ingestClientResources = await this.getIngestClientResourcesFromService();
this.ingestClientResourcesLastUpdate = now;
}
}
async getIngestClientResourcesFromService() {
let response = await this.kustoClient.execute("NetDefaultDB", ".get ingestion resources");
const table = response.primaryResults[0];
const resources = new IngestClientResources(
this.getResourceByName(table, "SecuredReadyForAggregationQueue"),
this.getResourceByName(table, "FailedIngestionsQueue"),
this.getResourceByName(table, "SuccessfulIngestionsQueue"),
this.getResourceByName(table, "TempStorage")
);
return resources;
}
getResourceByName(table, resourceName) {
let result = [];
for (let row of table.rows()) {
if (row.ResourceTypeName == resourceName) {
result.push(ResourceURI.fromURI(row.StorageRoot));
}
}
return result;
}
async refreshAuthorizationContext() {
let now = moment.utc();
if (!this.authorizationContext || this.authorizationContext.trim() ||
(this.authorizationContextLastUpdate + this.refreshPeriod) <= now) {
this.authorizationContext = await this.getAuthorizationContextFromService();
this.authorizationContextLastUpdate = now;
}
}
async getAuthorizationContextFromService() {
let response = await this.kustoClient.execute("NetDefaultDB", ".get kusto identity token");
const authContext = response.primaryResults[0].rows().next().value.AuthorizationContext;
return authContext;
}
async getIngestionQueues() {
await this.refreshIngestClientResources();
return this.ingestClientResources.securedReadyForAggregationQueues;
}
async getFailedIngestionsQueues() {
await this.refreshIngestClientResources();
return this.ingestClientResources.failedIngestionsQueues;
}
async getSuccessfulIngestionsQueues() {
await this.refreshIngestClientResources();
return this.ingestClientResources.successfulIngestionsQueues;
}
async getContainers() {
await this.refreshIngestClientResources();
return this.ingestClientResources.containers;
}
async getAuthorizationContext() {
await this.refreshAuthorizationContext();
return this.authorizationContext;
}
};

Просмотреть файл

@ -0,0 +1,147 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import {Client} from "azure-kusto-data";
import moment from "moment";
const URI_FORMAT = /https:\/\/(\w+).(queue|blob|table).core.windows.net\/([\w,-]+)\?(.*)/;
export class ResourceURI {
constructor(readonly storageAccountName: string, readonly objectType: string, readonly objectName: string, readonly sas: string) {
}
static fromURI(uri: string) {
const match = URI_FORMAT.exec(uri);
if (match == null || match.length < 5) {
throw Error(`Failed to create ResourceManager from URI - invalid uri (${uri})`);
}
return new ResourceURI(match[1], match[2], match[3], match[4]);
}
getSASConnectionString(): string {
if (this.objectType == "queue") {
return `QueueEndpoint=https://${this.storageAccountName}.queue.core.windows.net/;SharedAccessSignature=${this.sas}`;
}
if (this.objectType == "blob") {
return `BlobEndpoint=https://${this.storageAccountName}.blob.core.windows.net/;SharedAccessSignature=${this.sas}`;
}
throw new Error(`Can't make the current object type (${this.objectType}) to connection string`)
}
}
export class IngestClientResources {
constructor(
readonly securedReadyForAggregationQueues: ResourceURI[] | null = null,
readonly failedIngestionsQueues: ResourceURI[] | null = null,
readonly successfulIngestionsQueues: ResourceURI[] | null = null,
readonly containers: ResourceURI[] | null = null
) {
}
valid() {
const resources = [
this.securedReadyForAggregationQueues,
this.failedIngestionsQueues,
this.failedIngestionsQueues,
this.containers
];
return resources.reduce((prev, current) => !!(prev && current), true);
}
}
export class ResourceManager {
public readonly refreshPeriod: moment.Duration;
public ingestClientResources: IngestClientResources | null;
public ingestClientResourcesLastUpdate: moment.Moment | null;
public authorizationContext: string | null;
public authorizationContextLastUpdate: moment.Moment | null;
constructor(readonly kustoClient: Client) {
this.refreshPeriod = moment.duration(1, "h");
this.ingestClientResources = null;
this.ingestClientResourcesLastUpdate = null;
this.authorizationContext = null;
this.authorizationContextLastUpdate = null;
}
async refreshIngestClientResources(): Promise<IngestClientResources> {
const now = moment();
if (!this.ingestClientResources ||
!this.ingestClientResourcesLastUpdate ||
(this.ingestClientResourcesLastUpdate.add(this.refreshPeriod) <= now) ||
!this.ingestClientResources.valid()) {
this.ingestClientResources = await this.getIngestClientResourcesFromService();
this.ingestClientResourcesLastUpdate = now;
}
return this.ingestClientResources;
}
async getIngestClientResourcesFromService(): Promise<IngestClientResources> {
const response = await this.kustoClient.execute("NetDefaultDB", ".get ingestion resources");
const table = response.primaryResults[0];
return new IngestClientResources(
this.getResourceByName(table, "SecuredReadyForAggregationQueue"),
this.getResourceByName(table, "FailedIngestionsQueue"),
this.getResourceByName(table, "SuccessfulIngestionsQueue"),
this.getResourceByName(table, "TempStorage")
);
}
getResourceByName(table: { rows: () => any; }, resourceName: string): ResourceURI[] {
const result = [];
for (const row of table.rows()) {
if (row.ResourceTypeName == resourceName) {
result.push(ResourceURI.fromURI(row.StorageRoot));
}
}
return result;
}
async refreshAuthorizationContext(): Promise<string> {
const now = moment.utc();
if (!this.authorizationContext?.trim() ||
!this.authorizationContextLastUpdate ||
(this.authorizationContextLastUpdate.add(this.refreshPeriod)) <= now) {
this.authorizationContext = await this.getAuthorizationContextFromService();
this.authorizationContextLastUpdate = now;
if (this.authorizationContext == null) {
throw new Error("Authorization context can't be null");
}
}
return this.authorizationContext;
}
async getAuthorizationContextFromService() {
const response = await this.kustoClient.execute("NetDefaultDB", ".get kusto identity token");
return (response.primaryResults[0].rows().next().value as any).AuthorizationContext;
}
async getIngestionQueues() {
return (await this.refreshIngestClientResources()).securedReadyForAggregationQueues;
}
async getFailedIngestionsQueues() {
return (await this.refreshIngestClientResources()).failedIngestionsQueues;
}
async getSuccessfulIngestionsQueues() {
return (await this.refreshIngestClientResources()).successfulIngestionsQueues;
}
async getContainers() {
return (await this.refreshIngestClientResources()).containers;
}
async getAuthorizationContext(): Promise<string> {
return this.refreshAuthorizationContext();
}
}
export default ResourceManager;

Просмотреть файл

@ -1,10 +1,21 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const StatusQueue = require("./statusQ");
class StatusMessage {
constructor(raw, obj, extraProps) {
let props = [
import {StatusQueue} from "./statusQ";
import KustoIngestClient from "./ingestClient";
import {ResourceURI} from "./resourceManager";
export class StatusMessage {
OperationId?: string;
Database?: string;
Table?: string;
IngestionSourceId?: string;
IngestionSourcePath?: string;
RootActivityId?: string;
[other: string] : any;
constructor(raw: any, obj: any, extraProps: string[] | null) {
let props : string[] = [
"OperationId", "Database", "Table",
"IngestionSourceId", "IngestionSourcePath", "RootActivityId"
];
@ -15,7 +26,7 @@ class StatusMessage {
const _obj = obj || JSON.parse(raw || JSON.stringify(raw));
for (let prop of props) {
for (const prop of props) {
this[prop] = _obj[prop];
}
}
@ -23,7 +34,9 @@ class StatusMessage {
class SuccessMessage extends StatusMessage {
constructor(raw, obj) {
SucceededOn?: string;
constructor(raw: any, obj: any) {
super(raw, obj, [
"SucceededOn"
]);
@ -32,7 +45,13 @@ class SuccessMessage extends StatusMessage {
class FailureMessage extends StatusMessage {
constructor(raw, obj) {
FailedOn? : string;
Details? : string;
ErrorCode? : string;
FailureStatus? : string;
OriginatesFromUpdatePolicy? : string;
ShouldRetry? : string;
constructor(raw: any, obj: any) {
super(raw, obj, [
"FailedOn",
"Details",
@ -45,15 +64,19 @@ class FailureMessage extends StatusMessage {
}
module.exports = class KustoIngestStatusQueues {
constructor(kustoIngestClient) {
export class KustoIngestStatusQueues {
success: StatusQueue;
failure: StatusQueue;
constructor(kustoIngestClient: KustoIngestClient) {
this.success = new StatusQueue(
() => kustoIngestClient.resourceManager.getSuccessfulIngestionsQueues(),
() => kustoIngestClient.resourceManager.getSuccessfulIngestionsQueues().then(r => r as ResourceURI[]),
SuccessMessage
);
this.failure = new StatusQueue(
() => kustoIngestClient.resourceManager.getFailedIngestionsQueues(),
() => kustoIngestClient.resourceManager.getFailedIngestionsQueues().then(r => r as ResourceURI[]),
FailureMessage
);
}
};
}
export default KustoIngestStatusQueues;

Просмотреть файл

@ -1,134 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const { QueueClient } = require("@azure/storage-queue");
class QueueDetails {
constructor(name, service) {
this.name = name;
this.service = service;
}
}
function shuffle(a) {
for (let i = a.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
let temp = a[j];
a[j] = a[i];
a[i] = temp;
}
return a;
}
module.exports = class StatusQueue {
constructor(getQueuesFunc, messageCls) {
this.getQueuesFunc = getQueuesFunc;
this.messageCls = messageCls;
}
_getQServices(queuesDetails) {
return queuesDetails.map(q => new QueueDetails(q.objectName,
new QueueClient(q.getSASConnectionString(), q.objectName)));
}
async isEmpty() {
const result = await this.peek(1, { raw: true });
return !result || result.length === 0;
}
decodeContent(content) {
return Buffer.from(content, "base64").toString("ascii");
}
deserializeMessage(m) {
return new this.messageCls(this.decodeContent(m.messageText));
}
async _peek(qs, n, options) {
const result = [];
const nonEmptyQs = [];
for (let i = 0; i < qs.length; i++) {
let q = qs[i];
let response = await q.service.peekMessages();
let messages = response.peekedMessageItems;
if (messages && messages.length > 0) {
nonEmptyQs.push(q);
}
for (let m of messages) {
if (m && Object.keys(m).length > 0) {
result.push(options && options.raw ? m : this.deserializeMessage(m));
if (result.length == n) {
return { done: true, nonEmptyQs, result };
}
}
}
}
return { done: nonEmptyQs.length === 0, nonEmptyQs, result };
}
async peek(n = 1, options = null) {
const queues = await this.getQueuesFunc();
const qServices = shuffle(this._getQServices(queues));
const perQ = qServices.length > 1 ? Math.floor(n / qServices.length) : qServices.length;
// First, iterate evenly and randomly on status queues
const partial = await this._peek(qServices, perQ, options);
if (partial.done) {
return partial.result;
}
let messagesLeftToPeek = n - partial.result.length;
// In case queues are uneven, iterate again. This time, request for all n messages and trim
return await this._peek(partial.result.nonEmptyQs, messagesLeftToPeek, options);
}
async _pop(qs, n, options) {
const nonEmptyQs = [];
const result = [];
for (let i = 0; i < qs.length; i++) {
let q = qs[i];
const response = await q.service.receiveMessages({ numOfMessages: n });
let messages = response.receivedMessageItems;
for (let m of messages) {
if (m && Object.keys(m).length > 0) {
result.push(options && options.raw ? m : this.deserializeMessage(m));
if (!(options && options.remove === false)) {
q.service.deleteMessage(m.messageId, m.popReceipt);
}
if (result.length == n) {
return { done: true, nonEmptyQs, result };
}
}
}
}
return { done: nonEmptyQs.length === 0, nonEmptyQs, result };
}
async pop(n = 1, options = null) {
const queues = await this.getQueuesFunc();
const qServices = shuffle(this._getQServices(queues));
const perQ = qServices.length > 1 ? Math.floor(n / qServices.length) : qServices.length;
// First, iterate evenly and randomly on status queues
const partial = await this._pop(qServices, perQ, options);
if (partial.done) {
return partial.result;
}
let messagesLeftToPop = n - partial.result.length;
// In case queues are uneven, iterate again. This time, request for all n messages and trim
const final = await this._pop(partial.result.nonEmptyQs, messagesLeftToPop, options);
return partial.result.concat(final.result);
}
};

Просмотреть файл

@ -0,0 +1,149 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import {PeekedMessageItem, QueueClient} from "@azure/storage-queue";
import {ResourceURI} from "./resourceManager"
import {StatusMessage} from "./status";
class QueueDetails {
constructor(readonly name: string, readonly service: QueueClient) {
}
}
function shuffle<T>(a: T[]): T[] {
for (let i = a.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
const temp = a[j];
a[j] = a[i];
a[i] = temp;
}
return a;
}
interface PeekParams {
raw: boolean;
}
interface PopParams {
raw: boolean;
remove: boolean;
}
type Message = PeekedMessageItem | StatusMessage;
export class StatusQueue {
constructor(readonly getQueuesFunc: () => Promise<ResourceURI[]>, readonly messageCls: typeof StatusMessage) {
}
_getQServices(queuesDetails: ResourceURI[]) {
return queuesDetails.map(function (q) {
const sasConnectionString = q.getSASConnectionString();
if (!sasConnectionString) {
throw new Error("Empty or null connection string");
}
return new QueueDetails(q.objectName,
new QueueClient(sasConnectionString, q.objectName));
});
}
async isEmpty() {
const result = await this.peek(1, {raw: true});
return !result || result.length === 0;
}
decodeContent(content: string) {
return Buffer.from(content, "base64").toString("ascii");
}
deserializeMessage(m: PeekedMessageItem): StatusMessage {
return new this.messageCls(this.decodeContent(m.messageText), null, null);
}
async _peek(qs: QueueDetails[], n: number, options: PeekParams | null): Promise<{ result: Message[]; nonEmptyQs: QueueDetails[]; done: boolean }> {
const result: Message[] = [];
const nonEmptyQs: QueueDetails[] = [];
for (const q of qs) {
const response = await q.service.peekMessages();
const messages = response.peekedMessageItems;
if (messages && messages.length > 0) {
nonEmptyQs.push(q);
}
for (const m of messages) {
if (m && Object.keys(m).length > 0) {
result.push(options && options.raw ? m : this.deserializeMessage(m));
if (result.length == n) {
return {done: true, nonEmptyQs, result};
}
}
}
}
return {done: nonEmptyQs.length === 0, nonEmptyQs, result};
}
async peek(n = 1, options: PeekParams | null = null): Promise<Message[]> {
const queues = await this.getQueuesFunc();
const qServices: QueueDetails[] = shuffle(this._getQServices(queues));
const perQ = qServices.length > 1 ? Math.floor(n / qServices.length) : qServices.length;
// First, iterate evenly and randomly on status queues
const partial = await this._peek(qServices, perQ, options);
if (partial.done) {
return partial.result;
}
const messagesLeftToPeek = n - partial.result.length;
// In case queues are uneven, iterate again. This time, request for all n messages and trim
return (await this._peek(partial.nonEmptyQs, messagesLeftToPeek, options)).result;
}
async _pop(qs: QueueDetails[], n: number, options: PopParams | null):
Promise<{ result: Message[] & {nonEmptyQs?: QueueDetails[]}; nonEmptyQs: any[]; done: boolean }> {
const nonEmptyQs: any[] = [];
const result = [];
for (const q of qs) {
const response = await q.service.receiveMessages({numOfMessages: n});
const messages = response.receivedMessageItems;
for (const m of messages) {
if (m && Object.keys(m).length > 0) {
result.push(options && options.raw ? m : this.deserializeMessage(m));
if (!(options && !options.remove)) {
q.service.deleteMessage(m.messageId, m.popReceipt);
}
if (result.length == n) {
return {done: true, nonEmptyQs, result};
}
}
}
}
return {done: nonEmptyQs.length === 0, nonEmptyQs, result};
}
async pop(n = 1, options: PopParams | null = null): Promise<Message[]> {
const queues = await this.getQueuesFunc();
const qServices = shuffle(this._getQServices(queues));
const perQ = qServices.length > 1 ? Math.floor(n / qServices.length) : qServices.length;
// First, iterate evenly and randomly on status queues
const partial = await this._pop(qServices, perQ, options);
if (partial.done) {
return partial.result;
}
const messagesLeftToPop = n - partial.result.length;
// In case queues are uneven, iterate again. This time, request for all n messages and trim
const final = await this._pop(partial.result.nonEmptyQs ?? [], messagesLeftToPop, options);
return partial.result.concat(final.result);
}
}

Просмотреть файл

@ -1,64 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const KustoClient = require("azure-kusto-data").Client;
const { FileDescriptor, StreamDescriptor, CompressionType } = require("./descriptors");
const DataFormat = require("./resourceManager");
const zlib = require("zlib");
const fs = require("fs");
module.exports = class KustoStreamingIngestClient {
constructor(kcsb, defaultProps) {
this.kustoClient = new KustoClient(kcsb);
this.defaultProps = defaultProps;
this._mapping_required_formats = Object.freeze([ DataFormat.JSON, DataFormat.SINGLEJSON, DataFormat.AVRO, DataFormat.ORC ]);
}
_mergeProps(newProperties) {
// no default props
if (newProperties == null || Object.keys(newProperties).length == 0) {
return this.defaultProps;
}
// no new props
if (this.defaultProps == null || Object.keys(this.defaultProps) == 0) {
return newProperties;
}
// both exist - merge
return this.defaultProps.merge(newProperties);
}
async ingestFromStream(stream, ingestionProperties) {
const props = this._mergeProps(ingestionProperties);
props.validate();
const descriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);
const compressedStream =
descriptor.compressionType == CompressionType.None ? descriptor.stream.pipe(zlib.createGzip()) : descriptor.stream;
if (props.ingestionMappingReference == null && this._mapping_required_formats.includes(props.format)) {
throw new Error(`Mapping reference required for format ${props.foramt}.`);
}
return this.kustoClient.executeStreamingIngest(
props.database,
props.table,
compressedStream,
props.format,
props.ingestionMappingReference);
}
async ingestFromFile(file, ingestionProperties) {
const props = this._mergeProps(ingestionProperties);
props.validate();
const fileDescriptor = file instanceof FileDescriptor ? file : new FileDescriptor(file);
const stream = fs.createReadStream(fileDescriptor.filePath);
const compressionType = fileDescriptor.zipped ? CompressionType.GZIP : CompressionType.None;
const streamDescriptor = new StreamDescriptor(stream, fileDescriptor.sourceId, compressionType);
return this.ingestFromStream(streamDescriptor, ingestionProperties);
}
};

Просмотреть файл

@ -0,0 +1,50 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import IngestionProperties, {DataFormat} from "./ingestionProperties";
import {CompressionType, FileDescriptor, StreamDescriptor} from "./descriptors";
import zlib from "zlib";
import fs from "fs";
import {AbstractKustoClient} from "./abstractKustoClient";
import {Client as KustoClient, KustoConnectionStringBuilder} from "azure-kusto-data";
import {KustoResponseDataSet} from "azure-kusto-data/source/response";
class KustoStreamingIngestClient extends AbstractKustoClient {
private kustoClient: KustoClient;
// tslint:disable-next-line:variable-name
private _mapping_required_formats: readonly any[];
constructor(kcsb: string | KustoConnectionStringBuilder, defaultProps: IngestionProperties | null = null) {
super(defaultProps);
this.kustoClient = new KustoClient(kcsb);
this._mapping_required_formats = Object.freeze([DataFormat.JSON, DataFormat.SINGLEJSON, DataFormat.AVRO, DataFormat.ORC]);
}
async ingestFromStream(stream: StreamDescriptor | fs.ReadStream, ingestionProperties: IngestionProperties): Promise<KustoResponseDataSet> {
const props = this._mergeProps(ingestionProperties);
props.validate();
const descriptor = stream instanceof StreamDescriptor ? stream : new StreamDescriptor(stream);
const compressedStream =
descriptor.compressionType == CompressionType.None ? descriptor.stream.pipe(zlib.createGzip()) : descriptor.stream;
if (props.ingestionMappingReference == null && this._mapping_required_formats.includes(props.format)) {
throw new Error(`Mapping reference required for format ${props.foramt}.`);
}
return this.kustoClient.executeStreamingIngest(
props.database as string,
props.table as string,
compressedStream,
props.format,
props.ingestionMappingReference ?? null);
}
async ingestFromFile(file: FileDescriptor | string, ingestionProperties: IngestionProperties): Promise<KustoResponseDataSet> {
const props = this._mergeProps(ingestionProperties);
props.validate();
const fileDescriptor = file instanceof FileDescriptor ? file : new FileDescriptor(file);
const stream = fs.createReadStream(fileDescriptor.filePath);
const compressionType = fileDescriptor.zipped ? CompressionType.GZIP : CompressionType.None;
const streamDescriptor = new StreamDescriptor(stream, fileDescriptor.sourceId, compressionType);
return this.ingestFromStream(streamDescriptor, ingestionProperties);
}
}
export default KustoStreamingIngestClient;

Просмотреть файл

@ -1,29 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const assert = require("assert");
const { FileDescriptor } = require("../source/descriptors");
describe("FileDescriptor", function () {
describe("#constructor()", function () {
it("valid input zipped", function () {
let desc = new FileDescriptor("./data/events.json.gz");
assert.equal(desc.name, "events.json.gz");
assert.equal(desc.extension, ".gz");
assert.equal(desc.zipped, true);
});
it("valid input json", function () {
let desc = new FileDescriptor("./data/events.json");
assert.equal(desc.name, "events.json");
assert.equal(desc.extension, ".json");
assert.equal(desc.zipped, false);
});
});
});

Просмотреть файл

@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import assert from "assert";
import {FileDescriptor} from "../source/descriptors";
describe("FileDescriptor", function () {
describe("#constructor()", function () {
it("valid input zipped", function () {
const desc = new FileDescriptor("./data/events.json.gz");
assert.strictEqual(desc.name, "events.json.gz");
assert.strictEqual(desc.extension, ".gz");
assert.strictEqual(desc.zipped, true);
});
it("valid input json", function () {
const desc = new FileDescriptor("./data/events.json");
assert.strictEqual(desc.name, "events.json");
assert.strictEqual(desc.extension, ".json");
assert.strictEqual(desc.zipped, false);
});
});
});

Просмотреть файл

@ -1,274 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const assert = require("assert");
const fs = require('fs');
const path = require('path')
const IngestClient = require("../../source/ingestClient");
const KustoIngestStatusQueues = require("../../source/status");
const ConnectionStringBuilder = require("../../node_modules/azure-kusto-data").KustoConnectionStringBuilder;
const Client = require("../.././node_modules/azure-kusto-data").Client;
const StreamingIngestClient = require("../../source/streamingIngestClient");
const { FileDescriptor, StreamDescriptor, CompressionType } = require("../../source/descriptors");
const { IngestionProperties, DataFormat, ReportLevel } = require("../../source/ingestionProperties");
const databaseName = process.env.TEST_DATABASE;
const appId = process.env.APP_ID;
const appKey = process.env.APP_KEY;
const tenantId = process.env.TENANT_ID;
if (!databaseName || !appId || !appKey || !tenantId) {
process.stdout.write("Skip E2E test - Missing env variables");
return;
}
const engineKcsb = ConnectionStringBuilder.withAadApplicationKeyAuthentication(process.env.ENGINE_CONNECTION_STRING, appId, appKey, tenantId);
const queryClient = new Client(engineKcsb);
const streamingIngestClient = new StreamingIngestClient(engineKcsb);
const dmKcsb = ConnectionStringBuilder.withAadApplicationKeyAuthentication(process.env.DM_CONNECTION_STRING, appId, appKey, tenantId);
const ingestClient = new IngestClient(dmKcsb);
const statusQueues = new KustoIngestStatusQueues(ingestClient);
class testDataItem {
constructor(description, path, rows, ingestionProperties, testOnstreamingIngestion = true) {
this.description = description;
this.path = path;
this.rows = rows;
this.ingestionProperties = ingestionProperties;
this.testOnstreamingIngestion = testOnstreamingIngestion;
}
}
const tableName = "NodeTest" + Date.now();
const mappingName = "mappingRef";
const tableColumns = "(rownumber:int, rowguid:string, xdouble:real, xfloat:real, xbool:bool, xint16:int, xint32:int, xint64:long, xuint8:long, xuint16:long, xuint32:long, xuint64:long, xdate:datetime, xsmalltext:string, xtext:string, xnumberAsText:string, xtime:timespan, xtextWithNulls:string, xdynamicWithNulls:dynamic)";
const mapping = fs.readFileSync(getTestResourcePath("dataset_mapping.json"), { encoding: 'utf8' });
const columnmapping = JSON.parse(mapping);
const ingestionPropertiesWithoutMapping = new IngestionProperties({ database: databaseName, table: tableName, format: DataFormat.CSV, flushImmediately: true });
const ingestionPropertiesWithMappingReference = new IngestionProperties({ database: databaseName, table: tableName, format: DataFormat.JSON, ingestionMappingReference: mappingName, flushImmediately: true });
const ingestionPropertiesWithColumnMapping = new IngestionProperties({ database: databaseName, table: tableName, format: DataFormat.JSON, ingestionMapping: columnmapping, flushImmediately: true });
const testItems = [
new testDataItem("csv", getTestResourcePath("dataset.csv"), 10, ingestionPropertiesWithoutMapping),
new testDataItem("csv.gz", getTestResourcePath("dataset_gzip.csv.gz"), 10, ingestionPropertiesWithoutMapping),
new testDataItem("json with mapping ref", getTestResourcePath("dataset.json"), 2, ingestionPropertiesWithMappingReference),
new testDataItem("json.gz with mapping ref", getTestResourcePath("dataset_gzip.json.gz"), 2, ingestionPropertiesWithMappingReference),
new testDataItem("json with mapping", getTestResourcePath("dataset.json"), 2, ingestionPropertiesWithColumnMapping, false),
new testDataItem("json.gz with mapping", getTestResourcePath("dataset_gzip.json.gz"), 2, ingestionPropertiesWithColumnMapping, false)
];
var currentCount = 0;
describe(`E2E Tests - ${tableName}`, function () {
after(async function () {
try {
await queryClient.execute(databaseName, `.drop table ${tableName} ifexists`);
}
catch (err) {
assert.fail("Failed to drop table");
}
});
describe('SetUp', function () {
it('Create table', async function () {
try {
await queryClient.execute(databaseName, `.create table ${tableName} ${tableColumns}`);
}
catch (err) {
assert.fail("Failed to create table");
}
});
it('Create table ingestion mapping', async function () {
try {
await queryClient.execute(databaseName, `.create-or-alter table ${tableName} ingestion json mapping '${mappingName}' '${mapping}'`);
}
catch (err) {
assert.fail("Failed to create table ingestion mapping" + err);
}
});
});
describe('ingestClient', function () {
it('ingestFromFile', async function () {
for (let item of testItems) {
try {
await ingestClient.ingestFromFile(item.path, item.ingestionProperties);
}
catch (err) {
console.error(err);
assert.fail(`Failed to ingest ${item.description}`);
}
await assertRowsCount(item);
}
}).timeout(240000);
it('ingestFromStream', async function () {
for (let item of testItems) {
let stream = fs.createReadStream(item.path);
if (item.path.endsWith('gz')) {
stream = new StreamDescriptor(stream, null, CompressionType.GZIP);
}
try {
await ingestClient.ingestFromStream(stream, item.ingestionProperties);
}
catch (err) {
assert.fail(`Failed to ingest ${item.description}`);
}
await assertRowsCount(item);
}
}).timeout(240000);
});
describe('StreamingIngestClient', function () {
it('ingestFromFile', async function () {
for (let item of testItems.filter(item => item.testOnstreamingIngestion)) {
try {
await streamingIngestClient.ingestFromFile(item.path, item.ingestionProperties);
}
catch (err) {
console.error(err);
assert.fail(`Failed to ingest ${item.description}`);
}
await assertRowsCount(item);
}
}).timeout(240000);
it('ingestFromStream', async function () {
for (let item of testItems.filter(item => item.testOnstreamingIngestion)) {
let stream = fs.createReadStream(item.path);
if (item.path.endsWith('gz')) {
stream = new StreamDescriptor(stream, null, CompressionType.GZIP);
}
try {
await streamingIngestClient.ingestFromStream(stream, item.ingestionProperties);
}
catch (err) {
assert.fail(`Failed to ingest ${item.description}`);
}
await assertRowsCount(item);
}
}).timeout(240000);
});
describe('KustoIngestStatusQueues', function () {
it('CleanStatusQueues', async function () {
try {
await cleanStatusQueues();
}
catch (err) {
console.error(err);
assert.fail(`Failed to Clean status queues`);
}
}).timeout(240000);
it('CheckSucceededIngestion', async function () {
item = testItems[0];
item.ingestionProperties.reportLevel = ReportLevel.FailuresAndSuccesses;
try {
await ingestClient.ingestFromFile(item.path, item.ingestionProperties);
const status = await waitForStatus();
assert.equal(status.SuccessCount, 1);
assert.equal(status.FailureCount, 0);
}
catch (err) {
console.error(err);
assert.fail(`Failed to ingest ${item.description}`);
}
}).timeout(240000);
it('CheckFailedIngestion', async function () {
item = testItems[0];
item.ingestionProperties.reportLevel = ReportLevel.FailuresAndSuccesses;
item.ingestionProperties.database = "invalid";
try {
await ingestClient.ingestFromFile(item.path, item.ingestionProperties);
const status = await waitForStatus();
assert.equal(status.SuccessCount, 0);
assert.equal(status.FailureCount, 1);
}
catch (err) {
console.error(err);
assert.fail(`Failed to ingest ${item.description}`);
}
}).timeout(240000);
});
describe('QueryClient', function () {
it('General BadRequest', async function () {
try {
response = await queryClient.executeQuery(databaseName, "invalidSyntax ");
}
catch (ex) {
return;
}
assert.fail(`General BadRequest ${item.description}`);
});
it('PartialQueryFailure', async function () {
try {
response = await queryClient.executeQuery(databaseName, "invalidSyntax ");
}
catch (ex) {
return;
}
assert.fail(`Didn't throw PartialQueryFailure ${item.description}`);
});
});
});
async function cleanStatusQueues() {
while (!await statusQueues.failure.isEmpty()) {
await statusQueues.failure.pop();
}
while (!await statusQueues.success.isEmpty()) {
await statusQueues.success.pop();
}
}
async function waitForStatus() {
while (await statusQueues.failure.isEmpty() && await statusQueues.success.isEmpty()) {
await sleep(1000);
}
const failures = await statusQueues.failure.pop();
const successes = await statusQueues.success.pop();
return { "SuccessCount": successes.length, "FailureCount": failures.length }
}
function sleep(ms) {
return new Promise((resolve) => { setTimeout(resolve, ms); });
}
function getTestResourcePath(name) {
return __dirname + `/e2eData/${name}`;
}
async function assertRowsCount(testItem) {
var count = 0;
var expected = testItem.rows;
// Timeout = 3 min
for (var i = 0; i < 18; i++) {
await sleep(10000);
let results;
try {
results = await queryClient.execute(databaseName, `${tableName} | count `);
}
catch (ex) {
continue;
}
count = results.primaryResults[0][0].Count - currentCount;
if (count >= expected) {
break;
}
}
currentCount += count;
assert.equal(count, expected, `Failed to ingest ${testItem.description}`);
}

Просмотреть файл

@ -0,0 +1,281 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import assert from "assert";
import fs, {ReadStream} from 'fs';
import IngestClient from "../../source/ingestClient";
import KustoIngestStatusQueues from "../../source/status";
import {
Client,
KustoConnectionStringBuilder as ConnectionStringBuilder
// @ts-ignore
} from "../.././node_modules/azure-kusto-data";
import StreamingIngestClient from "../../source/streamingIngestClient";
import {CompressionType, StreamDescriptor} from "../../source/descriptors";
import {DataFormat, IngestionProperties, ReportLevel} from "../../source/ingestionProperties";
const databaseName = process.env.TEST_DATABASE;
const appId = process.env.APP_ID;
const appKey = process.env.APP_KEY;
const tenantId = process.env.TENANT_ID;
function main(): void {
if (!databaseName || !appId || !appKey || !tenantId) {
process.stdout.write("Skip E2E test - Missing env variables");
return;
}
const engineKcsb = ConnectionStringBuilder.withAadApplicationKeyAuthentication(process.env.ENGINE_CONNECTION_STRING ?? "", appId, appKey, tenantId);
const queryClient = new Client(engineKcsb);
const streamingIngestClient = new StreamingIngestClient(engineKcsb);
const dmKcsb = ConnectionStringBuilder.withAadApplicationKeyAuthentication(process.env.DM_CONNECTION_STRING ?? "", appId, appKey, tenantId);
const ingestClient = new IngestClient(dmKcsb);
const statusQueues = new KustoIngestStatusQueues(ingestClient);
class testDataItem {
constructor(public description: string, public path: string, public rows: number, public ingestionProperties: IngestionProperties, public testOnstreamingIngestion = true) {
}
}
const tableName = "NodeTest" + Date.now();
const mappingName = "mappingRef";
const tableColumns = "(rownumber:int, rowguid:string, xdouble:real, xfloat:real, xbool:bool, xint16:int, xint32:int, xint64:long, xuint8:long, xuint16:long, xuint32:long, xuint64:long, xdate:datetime, xsmalltext:string, xtext:string, xnumberAsText:string, xtime:timespan, xtextWithNulls:string, xdynamicWithNulls:dynamic)";
const mapping = fs.readFileSync(getTestResourcePath("dataset_mapping.json"), {encoding: 'utf8'});
const columnmapping = JSON.parse(mapping);
const ingestionPropertiesWithoutMapping = new IngestionProperties({
database: databaseName,
table: tableName,
format: DataFormat.CSV,
flushImmediately: true
});
const ingestionPropertiesWithMappingReference = new IngestionProperties({
database: databaseName,
table: tableName,
format: DataFormat.JSON,
ingestionMappingReference: mappingName,
flushImmediately: true
});
const ingestionPropertiesWithColumnMapping = new IngestionProperties({
database: databaseName,
table: tableName,
format: DataFormat.JSON,
ingestionMapping: columnmapping,
flushImmediately: true
});
const testItems = [
new testDataItem("csv", getTestResourcePath("dataset.csv"), 10, ingestionPropertiesWithoutMapping),
new testDataItem("csv.gz", getTestResourcePath("dataset_gzip.csv.gz"), 10, ingestionPropertiesWithoutMapping),
new testDataItem("json with mapping ref", getTestResourcePath("dataset.json"), 2, ingestionPropertiesWithMappingReference),
new testDataItem("json.gz with mapping ref", getTestResourcePath("dataset_gzip.json.gz"), 2, ingestionPropertiesWithMappingReference),
new testDataItem("json with mapping", getTestResourcePath("dataset.json"), 2, ingestionPropertiesWithColumnMapping, false),
new testDataItem("json.gz with mapping", getTestResourcePath("dataset_gzip.json.gz"), 2, ingestionPropertiesWithColumnMapping, false)
];
let currentCount = 0;
describe(`E2E Tests - ${tableName}`, function () {
after(async function () {
try {
await queryClient.execute(databaseName, `.drop table ${tableName} ifexists`);
} catch (err) {
assert.fail("Failed to drop table");
}
});
describe('SetUp', function () {
it('Create table', async function () {
try {
await queryClient.execute(databaseName, `.create table ${tableName} ${tableColumns}`);
} catch (err) {
assert.fail("Failed to create table");
}
});
it('Create table ingestion mapping', async function () {
try {
await queryClient.execute(databaseName, `.create-or-alter table ${tableName} ingestion json mapping '${mappingName}' '${mapping}'`);
} catch (err) {
assert.fail("Failed to create table ingestion mapping" + err);
}
});
});
describe('ingestClient', function () {
it('ingestFromFile', async function () {
for (const item of testItems) {
try {
await ingestClient.ingestFromFile(item.path, item.ingestionProperties);
} catch (err) {
console.error(err);
assert.fail(`Failed to ingest ${item.description}`);
}
await assertRowsCount(item);
}
}).timeout(240000);
it('ingestFromStream', async function () {
for (const item of testItems) {
let stream: ReadStream | StreamDescriptor = fs.createReadStream(item.path);
if (item.path.endsWith('gz')) {
stream = new StreamDescriptor(stream, null, CompressionType.GZIP);
}
try {
await ingestClient.ingestFromStream(stream, item.ingestionProperties);
} catch (err) {
assert.fail(`Failed to ingest ${item.description}`);
}
await assertRowsCount(item);
}
}).timeout(240000);
});
describe('StreamingIngestClient', function () {
it('ingestFromFile', async function () {
for (const item of testItems.filter(item => item.testOnstreamingIngestion)) {
try {
await streamingIngestClient.ingestFromFile(item.path, item.ingestionProperties);
} catch (err) {
console.error(err);
assert.fail(`Failed to ingest ${item.description}`);
}
await assertRowsCount(item);
}
}).timeout(240000);
it('ingestFromStream', async function () {
for (const item of testItems.filter(item => item.testOnstreamingIngestion)) {
let stream: ReadStream | StreamDescriptor = fs.createReadStream(item.path);
if (item.path.endsWith('gz')) {
stream = new StreamDescriptor(stream, null, CompressionType.GZIP);
}
try {
await streamingIngestClient.ingestFromStream(stream, item.ingestionProperties);
} catch (err) {
assert.fail(`Failed to ingest ${item.description}`);
}
await assertRowsCount(item);
}
}).timeout(240000);
});
describe('KustoIngestStatusQueues', function () {
it('CleanStatusQueues', async function () {
try {
await cleanStatusQueues();
} catch (err) {
console.error(err);
assert.fail(`Failed to Clean status queues`);
}
}).timeout(240000);
it('CheckSucceededIngestion', async function () {
const item = testItems[0];
item.ingestionProperties.reportLevel = ReportLevel.FailuresAndSuccesses;
try {
await ingestClient.ingestFromFile(item.path, item.ingestionProperties);
const status = await waitForStatus();
assert.equal(status.SuccessCount, 1);
assert.equal(status.FailureCount, 0);
} catch (err) {
console.error(err);
assert.fail(`Failed to ingest ${item.description}`);
}
}).timeout(240000);
it('CheckFailedIngestion', async function () {
const item = testItems[0];
item.ingestionProperties.reportLevel = ReportLevel.FailuresAndSuccesses;
item.ingestionProperties.database = "invalid";
try {
await ingestClient.ingestFromFile(item.path, item.ingestionProperties);
const status = await waitForStatus();
assert.equal(status.SuccessCount, 0);
assert.equal(status.FailureCount, 1);
} catch (err) {
console.error(err);
assert.fail(`Failed to ingest ${item.description}`);
}
}).timeout(240000);
});
describe('QueryClient', function () {
it('General BadRequest', async function () {
try {
const response = await queryClient.executeQuery(databaseName, "invalidSyntax ");
} catch (ex) {
return;
}
assert.fail(`General BadRequest`);
});
it('PartialQueryFailure', async function () {
try {
const response = await queryClient.executeQuery(databaseName, "invalidSyntax ");
} catch (ex) {
return;
}
assert.fail(`Didn't throw PartialQueryFailure`);
});
});
});
async function cleanStatusQueues() {
while (!await statusQueues.failure.isEmpty()) {
await statusQueues.failure.pop();
}
while (!await statusQueues.success.isEmpty()) {
await statusQueues.success.pop();
}
}
async function waitForStatus() {
while (await statusQueues.failure.isEmpty() && await statusQueues.success.isEmpty()) {
await sleep(1000);
}
const failures = await statusQueues.failure.pop();
const successes = await statusQueues.success.pop();
return {"SuccessCount": successes.length, "FailureCount": failures.length}
}
function sleep(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
function getTestResourcePath(name: string) {
return __dirname + `/e2eData/${name}`;
}
async function assertRowsCount(testItem: testDataItem) {
let count = 0;
const expected = testItem.rows;
// Timeout = 3 min
for (let i = 0; i < 18; i++) {
await sleep(10000);
let results;
try {
results = await queryClient.execute(databaseName ?? "", `${tableName} | count `);
} catch (ex) {
continue;
}
count = results.primaryResults[0][0].Count - currentCount;
if (count >= expected) {
break;
}
}
currentCount += count;
assert.equal(count, expected, `Failed to ingest ${testItem.description}`);
}
}
main();

Просмотреть файл

@ -1,89 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const assert = require("assert");
const KustoIngestClient = require("../source/ingestClient");
const { IngestionProperties , DataFormat } = require("../source/ingestionProperties");
describe("KustoIngestClient", function () {
describe("#constructor()", function () {
it("valid input", function () {
let ingestClient = new KustoIngestClient("https://cluster.kusto.windows.net", {
database: "db",
table: "table",
format: "csv"
});
assert.equal(ingestClient.resourceManager.kustoClient.cluster, "https://cluster.kusto.windows.net");
assert.equal(ingestClient.defaultProps.database, "db");
assert.equal(ingestClient.defaultProps.table, "table");
assert.equal(ingestClient.defaultProps.format, "csv");
});
});
describe("#_resolveProperties()", function () {
it("empty default props", function () {
let newProps = new IngestionProperties({database: "db", table: "table", format: DataFormat.CSV});
// TODO: not sure a unit test will be useful here
let client = new KustoIngestClient('https://cluster.region.kusto.windows.net');
let actual = client._mergeProps(newProps);
assert.equal(actual.database, "db");
assert.equal(actual.table, "table");
assert.equal(actual.format, "csv");
});
it("empty new props", function () {
// TODO: not sure a unit test will be useful here
let defaultProps = new IngestionProperties({database: "db", table: "table", format: DataFormat.CSV});
// TODO: not sure a unit test will be useful here
let client = new KustoIngestClient('https://cluster.region.kusto.windows.net', defaultProps);
let actual = client._mergeProps(null);
assert.equal(actual.database, "db");
assert.equal(actual.table, "table");
assert.equal(actual.format, "csv");
});
it("both exist props", function () {
let defaultProps = new IngestionProperties({database: "db", table: "table", format: DataFormat.CSV});
let newProps = new IngestionProperties({});
newProps.database = "db2";
newProps.ingestionMappingReference = "MappingRef";
let client = new KustoIngestClient('https://cluster.region.kusto.windows.net', defaultProps);
let actual = client._mergeProps(newProps);
assert.equal(actual.database, "db2");
assert.equal(actual.table, "table");
assert.equal(actual.format, "csv");
assert.equal(actual.ingestionMappingReference, "MappingRef");
});
it("empty both", function () {
let client = new KustoIngestClient('https://cluster.region.kusto.windows.net');
let actual = client._mergeProps();
assert.equal(actual, undefined);
});
});
describe("#ingestFromFile()", function () {
it("valid input", function () {
// TODO: not sure a unit test will be useful here
});
});
describe("#ingestFromStream()", function () {
it("valid input", function () {
// TODO: not sure a unit test will be useful here
});
});
describe("#ingestFromBlob()", function () {
it("valid input", function () {
// TODO: not sure a unit test will be useful here
});
});
});

Просмотреть файл

@ -0,0 +1,102 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import assert from "assert";
import {KustoIngestClient} from "../source/ingestClient";
import {DataFormat, IngestionProperties} from "../source/ingestionProperties";
describe("KustoIngestClient", function () {
describe("#constructor()", function () {
it("valid input", function () {
const ingestClient = new KustoIngestClient("https://cluster.kusto.windows.net", {
database: "db",
table: "table",
format: "csv"
} as IngestionProperties);
assert.notStrictEqual(ingestClient.defaultProps, null);
assert.strictEqual(ingestClient.resourceManager.kustoClient.cluster, "https://cluster.kusto.windows.net");
assert.strictEqual(ingestClient.defaultProps!.database, "db");
assert.strictEqual(ingestClient.defaultProps!.table, "table");
assert.strictEqual(ingestClient.defaultProps!.format, "csv");
});
});
describe("#_resolveProperties()", function () {
it("empty default props", function () {
const newProps = new IngestionProperties({
database: "db",
table: "table",
format: DataFormat.CSV
});
// TODO: not sure a unit test will be useful here
const client = new KustoIngestClient('https://cluster.region.kusto.windows.net');
const actual = client._mergeProps(newProps);
assert.strictEqual(actual.database, "db");
assert.strictEqual(actual.table, "table");
assert.strictEqual(actual.format, "csv");
});
it("empty new props", function () {
// TODO: not sure a unit test will be useful here
const defaultProps = new IngestionProperties({
database: "db",
table: "table",
format: DataFormat.CSV
});
// TODO: not sure a unit test will be useful here
const client = new KustoIngestClient('https://cluster.region.kusto.windows.net', defaultProps);
const actual = client._mergeProps(null);
assert.strictEqual(actual.database, "db");
assert.strictEqual(actual.table, "table");
assert.strictEqual(actual.format, "csv");
});
it("both exist props", function () {
const defaultProps = new IngestionProperties({
database: "db",
table: "table",
format: DataFormat.CSV
});
const newProps = new IngestionProperties({});
newProps.database = "db2";
newProps.ingestionMappingReference = "MappingRef";
const client = new KustoIngestClient('https://cluster.region.kusto.windows.net', defaultProps);
const actual = client._mergeProps(newProps);
assert.strictEqual(actual.database, "db2");
assert.strictEqual(actual.table, "table");
assert.strictEqual(actual.format, "csv");
assert.strictEqual(actual.ingestionMappingReference, "MappingRef");
});
it("empty both", function () {
const client = new KustoIngestClient('https://cluster.region.kusto.windows.net');
const actual = client._mergeProps();
assert.deepStrictEqual(actual, new IngestionProperties({}));
});
});
describe("#ingestFromFile()", function () {
it("valid input", function () {
// TODO: not sure a unit test will be useful here
});
});
describe("#ingestFromStream()", function () {
it("valid input", function () {
// TODO: not sure a unit test will be useful here
});
});
describe("#ingestFromBlob()", function () {
it("valid input", function () {
// TODO: not sure a unit test will be useful here
});
});
});

Просмотреть файл

@ -1,76 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const assert = require("assert");
const IngestionProperties = require("../source/ingestionProperties").IngestionProperties;
const JsonColumnMapping = require("../source/ingestionProperties").JsonColumnMapping;
const IngestionBlobInfo = require("../source/ingestionBlobInfo");
const { DataFormat } = require("../source/ingestionProperties");
describe("IngestionProperties", function () {
describe("#constructor()", function () {
it("valid input", function () {
let props = new IngestionProperties({database: "db", table: "table", format: DataFormat.CSV});
assert.equal(props.database, "db");
assert.equal(props.table, "table");
assert.equal(props.format, DataFormat.CSV);
});
});
describe("#merge()", function () {
it("valid input", function () {
let props = new IngestionProperties({database: "db", table: "table", format: DataFormat.CSV});
let otherProps = new IngestionProperties({ingestionMappingReference: "CsvMappingRef"});
let merged = props.merge(otherProps);
assert.equal(merged.database, "db");
assert.equal(merged.table, "table");
assert.equal(merged.format, DataFormat.CSV);
assert.equal(merged.ingestionMappingReference, "CsvMappingRef");
});
});
describe("#validate()", function () {
it("valid input", function () {
let props = new IngestionProperties({database: "db", table: "table", format: DataFormat.CSV, ingestionMappingReference: "CsvMappingRef"});
try {
props.validate();
} catch (ex) {
assert.fail(ex);
}
});
it("invalid input", function () {
let props = new IngestionProperties({});
try {
props.validate();
} catch (ex) {
assert.equal(ex.message, "Must define a target database");
}
});
it("invalid input json", function () {
let props = new IngestionProperties({database: "db", table: "table", format: DataFormat.JSON});
try {
props.validate();
} catch (ex) {
assert.equal(ex.message, "Json must have a mapping defined");
}
});
it("json mapping as additional props on ingestion blob info", function () {
let columns = [new JsonColumnMapping('Id', '$.Id', 'int'), new JsonColumnMapping('Value', '$.value', 'dynamic')];
let props = new IngestionProperties({database: "db", table: "table", format: DataFormat.CSV, ingestionMapping: columns});
let ingestionBlobInfo = new IngestionBlobInfo('https://account.blob.core.windows.net/blobcontainer/blobfile.json', props);
assert.deepEqual(JSON.parse(ingestionBlobInfo.AdditionalProperties.ingestionMapping), props.ingestionMapping);
});
});
});

Просмотреть файл

@ -0,0 +1,77 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import assert from "assert";
import {IngestionProperties, JsonColumnMapping} from "../source/ingestionProperties";
import {IngestionBlobInfo} from "../source/ingestionBlobInfo";
import {BlobDescriptor} from "../source/descriptors";
const { DataFormat } = require("../source/ingestionProperties");
describe("IngestionProperties", function () {
describe("#constructor()", function () {
it("valid input", function () {
const props = new IngestionProperties({database: "db", table: "table", format: DataFormat.CSV});
assert.strictEqual(props.database, "db");
assert.strictEqual(props.table, "table");
assert.strictEqual(props.format, DataFormat.CSV);
});
});
describe("#merge()", function () {
it("valid input", function () {
const props = new IngestionProperties({database: "db", table: "table", format: DataFormat.CSV});
const otherProps = new IngestionProperties({ingestionMappingReference: "CsvMappingRef"});
const merged = props.merge(otherProps);
assert.strictEqual(merged.database, "db");
assert.strictEqual(merged.table, "table");
assert.strictEqual(merged.format, DataFormat.CSV);
assert.strictEqual(merged.ingestionMappingReference, "CsvMappingRef");
});
});
describe("#validate()", function () {
it("valid input", function () {
const props = new IngestionProperties({database: "db", table: "table", format: DataFormat.CSV, ingestionMappingReference: "CsvMappingRef"});
try {
props.validate();
} catch (ex) {
assert.fail(ex);
}
});
it("invalid input", function () {
const props = new IngestionProperties({});
try {
props.validate();
} catch (ex) {
assert.strictEqual(ex.message, "Must define a target database");
}
});
it("invalid input json", function () {
const props = new IngestionProperties({database: "db", table: "table", format: DataFormat.JSON});
try {
props.validate();
} catch (ex) {
assert.strictEqual(ex.message, "Json must have a mapping defined");
}
});
it("json mapping as additional props on ingestion blob info", function () {
const columns = [new JsonColumnMapping('Id', '$.Id', 'int'), new JsonColumnMapping('Value', '$.value', 'dynamic')];
const props = new IngestionProperties({database: "db", table: "table", format: DataFormat.CSV, ingestionMapping: columns});
const ingestionBlobInfo = new IngestionBlobInfo(new BlobDescriptor('https://account.blob.core.windows.net/blobcontainer/blobfile.json'), props);
const reParsed = JSON.parse(JSON.stringify(props.ingestionMapping)); // Stringify and pass to make the object identical to a json one
assert.deepStrictEqual(JSON.parse(ingestionBlobInfo.AdditionalProperties.ingestionMapping), reParsed);
});
});
});

Просмотреть файл

@ -1,15 +1,16 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const assert = require("assert");
const moment = require("moment");
const sinon = require("sinon");
import assert from "assert";
const KustoClient = require("azure-kusto-data").Client;
import moment from "moment";
const ResourceManager = require("../source/resourceManager").ResourceManager;
const IngestClientResources = require("../source/resourceManager").IngestClientResources;
const ResourceURI = require("../source/resourceManager").ResourceURI;
import sinon from "sinon";
import {Client as KustoClient} from "azure-kusto-data";
import {IngestClientResources, ResourceManager, ResourceURI} from "../source/resourceManager";
import {KustoResponseDataSet} from "azure-kusto-data/source/response";
describe("ResourceURI", function () {
describe("#fromUri()", function () {
@ -19,13 +20,13 @@ describe("ResourceURI", function () {
const objectName = "container";
const sas = "sas";
let uri = `https://${accountName}.${objectType}.core.windows.net/${objectName}?${sas}`;
const uri = `https://${accountName}.${objectType}.core.windows.net/${objectName}?${sas}`;
const storageUrl = ResourceURI.fromURI(uri);
assert.equal(storageUrl.storageAccountName, accountName);
assert.equal(storageUrl.objectType, objectType);
assert.equal(storageUrl.objectName, objectName);
assert.equal(storageUrl.sas, sas);
assert.strictEqual(storageUrl.storageAccountName, accountName);
assert.strictEqual(storageUrl.objectType, objectType);
assert.strictEqual(storageUrl.objectName, objectName);
assert.strictEqual(storageUrl.sas, sas);
});
});
@ -39,7 +40,7 @@ describe("ResourceURI", function () {
const storageUrl = new ResourceURI(accountName, objectType, objectName, sas);
assert.equal(storageUrl.getSASConnectionString(), `BlobEndpoint=https://${accountName}.blob.core.windows.net/;SharedAccessSignature=${sas}`);
assert.strictEqual(storageUrl.getSASConnectionString(), `BlobEndpoint=https://${accountName}.blob.core.windows.net/;SharedAccessSignature=${sas}`);
});
});
});
@ -57,8 +58,8 @@ describe("ResourceManager", function () {
const mockedResourcesResponse = {
primaryResults: [{
rows: function* () {
for (let row of rows) {
*rows () {
for (const row of rows) {
yield row;
}
}
@ -67,38 +68,38 @@ describe("ResourceManager", function () {
describe("#constructor()", function () {
it("valid input", function () {
let resourceManager = new ResourceManager(new KustoClient("https://cluster.kusto.windows.net"));
const resourceManager = new ResourceManager(new KustoClient("https://cluster.kusto.windows.net"));
assert.equal(resourceManager.ingestClientResources, null);
assert.equal(resourceManager.authorizationContext, null);
assert.strictEqual(resourceManager.ingestClientResources, null);
assert.strictEqual(resourceManager.authorizationContext, null);
});
});
describe("#getIngestClientResourcesFromService()", function () {
it("valid input", async function () {
const client = new KustoClient("https://cluster.kusto.windows.net")
sinon.stub(client, "execute").returns(mockedResourcesResponse);
sinon.stub(client, "execute").returns(Promise.resolve(mockedResourcesResponse as KustoResponseDataSet));
let resourceManager = new ResourceManager(client);
const resourceManager = new ResourceManager(client);
const resources = await resourceManager.getIngestClientResourcesFromService();
assert.equal(resources.containers.length, 2);
assert.equal(resources.successfulIngestionsQueues.length, 1);
assert.equal(resources.failedIngestionsQueues.length, 1);
assert.equal(resources.securedReadyForAggregationQueues.length, 2);
assert.strictEqual(resources.containers!!.length, 2);
assert.strictEqual(resources.successfulIngestionsQueues!!.length, 1);
assert.strictEqual(resources.failedIngestionsQueues!!.length, 1);
assert.strictEqual(resources.securedReadyForAggregationQueues!!.length, 2);
});
it("error response", async function () {
const client = new KustoClient("https://cluster.kusto.windows.net");
sinon.stub(client, "execute").throwsException("Kusto request erred (403)");
sinon.stub(client, "execute").throwsException(new Error("Kusto request erred (403)"));
const resourceManager = new ResourceManager(client);
try{
await resourceManager.getIngestClientResourcesFromService();
}
catch(ex){
assert.equal(ex, "Kusto request erred (403)");
assert(ex.message.startsWith( "Kusto request erred (403)"));
return;
}
assert.fail();
@ -107,32 +108,32 @@ describe("ResourceManager", function () {
describe("#getResourceByName()", function () {
it("valid input", function () {
let resourceManager = new ResourceManager(new KustoClient("https://cluster.kusto.windows.net"));
const resourceManager = new ResourceManager(new KustoClient("https://cluster.kusto.windows.net"));
let resources = resourceManager.getResourceByName(mockedResourcesResponse.primaryResults[0], "TempStorage");
assert.equal(resources.length, 2);
const resources = resourceManager.getResourceByName(mockedResourcesResponse.primaryResults[0], "TempStorage");
assert.strictEqual(resources.length, 2);
});
});
describe("#refreshIngestClientResources()", function () {
it("should refresh", async function () {
let resourceManager = new ResourceManager(new KustoClient("https://cluster.kusto.windows.net"));
const resourceManager = new ResourceManager(new KustoClient("https://cluster.kusto.windows.net"));
let call = sinon.stub(resourceManager, "getIngestClientResourcesFromService");
const call = sinon.stub(resourceManager, "getIngestClientResourcesFromService");
await resourceManager.refreshIngestClientResources();
assert.equal(call.calledOnce, true);
assert.strictEqual(call.calledOnce, true);
});
it("shouldn't refresh", async function () {
let resourceManager = new ResourceManager(new KustoClient("https://cluster.kusto.windows.net"));
const resourceManager = new ResourceManager(new KustoClient("https://cluster.kusto.windows.net"));
let call = sinon.stub(resourceManager, "getIngestClientResourcesFromService");
resourceManager.ingestClientResourcesLastUpdate = moment.now();
resourceManager.ingestClientResources = new IngestClientResources({}, {}, {}, {});
const call = sinon.stub(resourceManager, "getIngestClientResourcesFromService");
resourceManager.ingestClientResourcesLastUpdate = moment();
resourceManager.ingestClientResources = new IngestClientResources([], [], [], []);
await resourceManager.refreshIngestClientResources();
assert.equal(call.calledOnce, false);
assert.strictEqual(call.calledOnce, false);
});
});
});

Просмотреть файл

@ -0,0 +1,14 @@
{
"compilerOptions": {
"target": "es6",
"module": "commonjs",
"esModuleInterop": true,
"strict": true,
"incremental": true,
"sourceMap": true,
"resolveJsonModule": true,
"allowJs": false,
"declaration": true
},
"include": ["source/**/*.ts", "test/**/*.ts", "index.ts"]
}

Просмотреть файл

@ -0,0 +1,18 @@
{
"defaultSeverity": "error",
"extends": [
"tslint:recommended"
],
"jsRules": {},
"rules": {
"max-classes-per-file": false,
"triple-equals": false,
"only-arrow-functions": false
},
"linterOptions": {
"exclude": [
"./test/**/*"
]
},
"rulesDirectory": []
}