This commit is contained in:
Amit Shalev 2020-09-08 13:38:34 +03:00 committed by GitHub
Parent 513bb6cf42
Commit a7c3875afc
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 91 deletions

View File

@@ -13,11 +13,9 @@ const KustoConnectionStringBuilder = require("azure-kusto-data").KustoConnection
const kcsb = KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(`https://${clusterName}.kusto.windows.net`,'appid','appkey','authorityId');
const client = new KustoClient(kcsb);
client.execute("db", "TableName | limit 1", (err, results) => {
if (err) throw new Error(err);
console.log(JSON.stringify(results));
console.log(results.primaryResults[0].toString());
});
const results = await client.execute("db", "TableName | limit 1");
console.log(JSON.stringify(results));
console.log(results.primaryResults[0].toString());
```
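Each table in `primaryResults` can also be walked row by row; a minimal sketch, assuming the result tables in `azure-kusto-data` expose a `rows()` iterator:
```javascript
// Iterate the first primary result table (assumes `results` from the example above,
// and that the result table exposes a rows() generator).
for (const row of results.primaryResults[0].rows()) {
    console.log(JSON.stringify(row));
}
```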
@@ -93,7 +91,7 @@ T | where amountColumn == amount
const clientRequestProps = new ClientRequestProperties();
clientRequestProps.setOption("servertimeout", 1000 * 60);
clientRequestProps.setParameter("amount", 100);
client.executeQuery("db", query, (err, results) => { console.log(results); }, clientRequestProps);
const results = await client.executeQuery("db", query, clientRequestProps);
```
A full list of those properties can be found at https://docs.microsoft.com/en-us/azure/kusto/api/netfx/request-properties
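For the parameterized query above to actually bind `amount`, the parameter also has to be declared in the query text itself; a minimal sketch (table and column names are illustrative, taken from the snippet above):
```javascript
// Declare the query parameter in KQL before using it; the value is supplied
// through clientRequestProps.setParameter("amount", 100) as shown above.
const query = `
declare query_parameters(amount:long);
T | where amountColumn == amount
`;
```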

View File

@@ -34,14 +34,18 @@ const ingestClient = new IngestClient(
console.log("Ingest from file");
ingestClient.ingestFromFile("file.json", null, (err) => {
if (err) {
Ingest();
async function Ingest() {
try{
await ingestClient.ingestFromFile("file.json", null);
}
catch(err){
console.log(err);
}
console.log("Ingestion done");
});
console.log("Wait for ingestion status...");
await waitForStatus();
}
```
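The `waitForStatus` helper called above is shown in full in the Ingestion Status section further down; if you just want the quickstart to run on its own, a purely illustrative placeholder is enough:
```javascript
// Hypothetical stand-in for the real status polling shown later in this README.
async function waitForStatus() {
    // no-op: replace with the status-queue polling from the Ingestion Status section
}
```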
## Authentication
@@ -125,11 +129,13 @@ There are several methods of ingesting data into Kusto (Azure Data Explorer) usi
This is useful for cases where you already have streams available (HTTP response, file stream, etc.).
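Such a readable stream can come, for example, straight from Node's `fs` module (a minimal sketch; the file name is illustrative):
```javascript
// Any Node.js Readable works here; this one just streams a local file.
const fs = require("fs");
const readable = fs.createReadStream("file.json");
```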
```javascript
ingestClient.ingestFromStream(readable, null, (err) => {
if (err) console.log(err);
else console.log("Ingestion from stream DONE");
});
```
try{
await ingestClient.ingestFromStream(readable, null);
}
catch(err){
console.log(err);
}
console.log("Ingestion from stream DONE");
#### From File
@@ -137,17 +143,13 @@ ingestClient.ingestFromStream(readable, null, (err) => {
Ingesting a file first makes sure it is zipped (if not, it zips it locally) and then sends it for ingestion.
```javascript
ingestClient.ingestFromFile(filePath, null, (err) => {
if (err) {
console.log(err);
}
console.log("Ingestion from file DONE");
setTimeout(waitForFailures, 0);
setTimeout(waitForSuccess, 0);
});
let blob = new BlobDescriptor(blobUri, size);
try{
await ingestClient.ingestFromFile("file.json", null);
}
catch(err){
console.log(err);
}
```
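If the file is already compressed (for example a `.gz` file), the expectation is that the client sends it as-is instead of zipping it again; a hedged sketch under that assumption, with an illustrative file name:
```javascript
// Assumption: an already gzip-compressed file is not re-compressed locally.
try {
    await ingestClient.ingestFromFile("file.json.gz", null);
}
catch (err) {
    console.log(err);
}
```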
#### From Azure Storage Blob
@@ -157,17 +159,12 @@ Probably the easiest way would be to provide a URI (with [SAS](https://docs.micr
```javascript
let blob = new BlobDescriptor(blobUri, size);
ingestClient.ingestFromBlob(blob, null, (err) => {
if (err) {
console.log(err);
}
console.log("Ingestion from file DONE");
setTimeout(waitForFailures, 0);
setTimeout(waitForSuccess, 0);
});
try{
await ingestClient.ingestFromBlob(blob, null);
}
catch(err){
console.log(err);
}
```
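The `blobUri` in the example above is assumed to already carry the SAS token, so no extra credentials are passed; a minimal sketch of what such a value might look like (account, container, and token are placeholders):
```javascript
// Placeholder values; a real SAS token is generated on the storage account.
const blobUri = "https://<account>.blob.core.windows.net/<container>/file.json.gz?<sas-token>";
const size = 1024; // raw data size in bytes, if known
const blob = new BlobDescriptor(blobUri, size);
```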
### Ingestion Status
@@ -211,59 +208,30 @@ const ingestClient = new IngestClient(
const statusQueues = new IngestStatusQueues(ingestClient);
function waitForFailures() {
statusQueues.failure.isEmpty((err, empty) => {
if (err) throw new Error(err);
if (empty) {
console.log("no errors...");
return setTimeout(waitForFailures, 1000);
}
else {
statusQueues.failure.pop((err, failures) => {
if (err) throw new Error(err);
for (let failure of failures) {
console.log(JSON.stringify(failure));
}
return setTimeout(waitForFailures, 1000);
});
}
});
}
function waitForSuccess() {
statusQueues.success.isEmpty((err, empty) => {
if (err) throw new Error(err);
if (empty) {
console.log("no successes...");
return setTimeout(waitForSuccess, 1000);
}
else {
statusQueues.success.pop((err, successes) => {
if (err) throw new Error(err);
for (let success of successes) {
console.log(JSON.stringify(success));
}
return setTimeout(waitForSuccess, 1000);
})
}
});
}
ingestClient.ingestFromFile("file.json", null, (err) => {
if (err) {
console.log(err);
async function waitForStatus() {
while (await statusQueues.failure.isEmpty() && await statusQueues.success.isEmpty()) {
await new Promise((resolve) => { setTimeout(resolve, 1000); });
}
console.log("Ingestion done?");
const successes = await statusQueues.success.pop();
for (let success of successes) {
console.log(JSON.stringify(success));
}
const failures = await statusQueues.failure.pop();
for (let failure of failures) {
console.log(JSON.stringify(failure));
}
}
async function ingestFromFile() {
try{
await ingestClient.ingestFromFile("file.json", null);
}
catch(err){
console.log(err);
}
console.log("Wait for ingestion status...");
await waitForStatus();
}
setTimeout(waitForFailures, 0);
setTimeout(waitForSuccess, 0);
});
```
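Keep in mind that the success queue only receives entries when the ingestion properties ask for success reporting; a hedged sketch (the export path of `ReportLevel` and the `ingestionProperties` variable are assumptions, not taken from this README):
```javascript
// Assumed export path; check the package's ingestionProperties module for your version.
const { ReportLevel } = require("azure-kusto-ingest").IngestionPropertiesEnums;

// `ingestionProperties` stands for whatever properties object was passed to IngestClient.
ingestionProperties.reportLevel = ReportLevel.FailuresAndSuccesses;
```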