* Initial commit.
The intent is to review this in its current feature-complete state, so I can also commit the equivalent changes in the Python SDK.
Then, in the next PR (or before merging this PR), I will make some additional improvements as discussed, including breaking main into individual methods and refactoring this "query-ingest-query" approach into an additional "query-only" mode.

* Remove comment

* Optimize if/else

* Minor verbiage changes

* Minor verbiage changes

* Minor verbiage changes

* Update README.md

Co-authored-by: Yochai Gilad <yogilad@microsoft.com>

* - Break main up into methods
- Consider new parameters
- Few code review changes, like consistent periods at end of sentences and filename consistency with Python

* Only alter batching policy if !useExistingTable

* - Pass filesize of 0 for autodetection
- Do not create a File object to get its path; just pass along the provided path
- Move alterBatchingPolicy to within createNewTable, and wrap it in if(false) so it won't run
- Add comment explaining that we generally wouldn't print the output of a control command

* - Improve README text
- Change .alter command to .alter-merge command

* - Change filename kusto-sample-config.json to kusto_sample_config.json to be consistent with Python

* - Fix reference to Python
- Add a comment for AppCertificate auth

Co-authored-by: Yochai Gilad <yogilad@microsoft.com>
This commit is contained in:
Yihezkel Schoenbrun 2021-12-05 10:39:22 +02:00 committed by GitHub
Parent d43492caff
Commit 438ec1397f
No known key found for this signature
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 695 additions and 21 deletions


@ -9,9 +9,10 @@ master: [![Build status](https://msazure.visualstudio.com/One/_apis/build/status
This is the Microsoft Azure Kusto client library which allows communication with Kusto to bring data in (ingest) and query information already stored in the database.
This library contains 3 different modules:
-- data: the main client that allows interaction with Kusto. It's able to create a connection, issue (control) commands and query data.
-- ingest: this provides an easy way to bring data into Kusto
-- samples
+- data: The main client that allows interaction with Kusto, including creating a connection, issuing (control) commands and querying data
+- ingest: Provides an easy way to bring data into Kusto
+- quickstart: Self-contained, configurable and runnable sample app for easily getting started with the SDK
+- samples: Sample code implementing various scenarios
# Install


@ -292,23 +292,39 @@ public class IngestionProperties {
}
public enum DataFormat {
-csv,
-tsv,
-scsv,
-sohsv,
-psv,
-txt,
-tsve,
-json,
-singlejson,
-multijson,
-avro,
-apacheavro,
-parquet,
-sstream,
-orc,
-raw,
-w3clogfile
+csv(IngestionMapping.IngestionMappingKind.Csv, false),
+tsv(IngestionMapping.IngestionMappingKind.Csv, false),
+scsv(IngestionMapping.IngestionMappingKind.Csv, false),
+sohsv(IngestionMapping.IngestionMappingKind.Csv, false),
+psv(IngestionMapping.IngestionMappingKind.Csv, false),
+txt(IngestionMapping.IngestionMappingKind.unknown, false),
+tsve(IngestionMapping.IngestionMappingKind.Csv, false),
+json(IngestionMapping.IngestionMappingKind.Json, true),
+singlejson(IngestionMapping.IngestionMappingKind.Json, true),
+multijson(IngestionMapping.IngestionMappingKind.Json, true),
+avro(IngestionMapping.IngestionMappingKind.Avro, true),
+apacheavro(IngestionMapping.IngestionMappingKind.ApacheAvro, false),
+parquet(IngestionMapping.IngestionMappingKind.Parquet, false),
+sstream(IngestionMapping.IngestionMappingKind.SStream, false),
+orc(IngestionMapping.IngestionMappingKind.Orc, false),
+raw(IngestionMapping.IngestionMappingKind.unknown, false),
+w3clogfile(IngestionMapping.IngestionMappingKind.W3CLogFile, false);
+private final IngestionMapping.IngestionMappingKind ingestionMappingKind;
+private final boolean mappingRequired;
+DataFormat(IngestionMapping.IngestionMappingKind ingestionMappingKind, boolean mappingRequired) {
+this.ingestionMappingKind = ingestionMappingKind;
+this.mappingRequired = mappingRequired;
+}
+public IngestionMapping.IngestionMappingKind getIngestionMappingKind() {
+return ingestionMappingKind;
+}
+public boolean isMappingRequired() {
+return mappingRequired;
+}
}
public enum IngestionReportLevel {
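
The `DataFormat` hunk above attaches a mapping kind and a mapping-required flag to each enum constant. The following is a minimal, self-contained sketch of that pattern and of one way a caller might use the flag. `SampleMappingKind`, `SampleFormat`, `FormatCheck` and `canIngest` are hypothetical names invented for this illustration; they are not the SDK's types, and the check is an assumption about how the flag could be consumed rather than what the SDK does.

```java
import java.util.Locale;

// Hypothetical stand-ins for illustration; these are not the SDK's IngestionMappingKind or DataFormat.
enum SampleMappingKind { CSV, JSON, UNKNOWN }

enum SampleFormat {
    csv(SampleMappingKind.CSV, false),
    json(SampleMappingKind.JSON, true),
    raw(SampleMappingKind.UNKNOWN, false);

    private final SampleMappingKind mappingKind;
    private final boolean mappingRequired;

    // Each constant carries its own metadata, so callers need no separate lookup table.
    SampleFormat(SampleMappingKind mappingKind, boolean mappingRequired) {
        this.mappingKind = mappingKind;
        this.mappingRequired = mappingRequired;
    }

    SampleMappingKind getMappingKind() {
        return mappingKind;
    }

    boolean isMappingRequired() {
        return mappingRequired;
    }
}

final class FormatCheck {
    private FormatCheck() {
    }

    // Resolves a config value such as "CSV" to a constant, then reports whether
    // ingestion could proceed: either a mapping name is given or none is required.
    static boolean canIngest(String formatName, String mappingName) {
        SampleFormat format = SampleFormat.valueOf(formatName.toLowerCase(Locale.ROOT));
        boolean hasMapping = mappingName != null && !mappingName.trim().isEmpty();
        return hasMapping || !format.isMappingRequired();
    }
}
```

With these stand-ins, `FormatCheck.canIngest("CSV", "")` passes because the CSV constant does not require a mapping, while `FormatCheck.canIngest("JSON", "")` does not.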


@ -14,7 +14,7 @@ import java.security.interfaces.RSAPublicKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
-class SecurityUtils {
+public class SecurityUtils {
private SecurityUtils() {
// Hide constructor as this is a Utils class
}


@ -64,6 +64,7 @@
<maven-javadoc-plugin.version>3.2.0</maven-javadoc-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
<maven-dependency-plugin.version>3.1.2</maven-dependency-plugin.version>
+<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
<flatten-maven-plugin.version>1.2.5</flatten-maven-plugin.version>
<junit.version>5.7.1</junit.version>
<mockito.version>3.7.7</mockito.version>
@ -73,6 +74,7 @@
<module>ingest</module>
<module>data</module>
<module>samples</module>
+<module>quickstart</module>
</modules>
<build>

39
quickstart/README.md Normal file

@ -0,0 +1,39 @@
# Quick Start App
The quick start application is a **self-contained and runnable** example app that demonstrates authenticating, connecting to, administering, ingesting data into and querying Azure Data Explorer using the azure-kusto-java SDK.
You can use it as a baseline to write your own first kusto client application, altering the code as you go, or copy code sections out of it into your app.
**Tip:** The app includes comments with tips on recommendations, coding best practices, links to reference materials and recommended TODO changes when adapting the code to your needs.
## Using the App for the first time
### Prerequisites
1. Set up Java on your machine. For instructions, consult a Java environment setup tutorial, like [this one](https://www.tutorialspoint.com/java/java_environment_setup.htm).
2. Set up [Apache Maven](https://maven.apache.org/install.html), which is the most popular Java dependency management tool.
### Retrieving the app from GitHub
1. Download the app files from this GitHub repo
2. Modify `kusto_sample_config.json`, changing `kustoUri`, `ingestUri` and `databaseName` appropriately for your ADX cluster
### Retrieving the app from OneClick
1. Open a browser and type your cluster's URL (e.g. https://myadxcluster.westeurope.kusto.windows.net/), and you will be redirected to the _Azure Data Explorer_ website
2. Open the left-side pane via the hamburger menu, if it isn't already open
3. On the left-side pane, choose _Data_
4. Click on _Generate Sample App Code_ button
5. Follow the wizard
6. Download the app as a zip file
7. Extract the app source code to your folder of choice
8. The configuration parameters defined in `kusto_sample_config.json` are already defined appropriately for your ADX cluster, which you can verify
### Running the app
1. Open a command line window to the folder extracted to above
2. Run `mvn clean install` to compile the source code into a binary
3. Run the binary using `java -jar target\kusto-quickstart-[version]-jar-with-dependencies.jar`
### Optional Changes
1. Within the app itself, you may alter the default User-Prompt authentication method by editing `AUTHENTICATION_MODE`
2. You can also make the app run without stopping between steps by setting `WAIT_FOR_USER = false`
### Troubleshooting
* If you are having trouble running the app from your IDE, first check if the app runs from command line, then consult the troubleshooting references of your IDE.

10
quickstart/dataset.csv Normal file

@ -0,0 +1,10 @@
0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,Zero,0,00:00:00,,null
1,00000001-0000-0000-0001-020304050607,1.0001,1.01,1,1,1,1,1,1,1,1,2015-01-01T01:01:01.0000000Z,One,One,1,00:01.0,,"{""rowId"": 1, ""arr"": [0,1]}"
2,00000002-0000-0000-0001-020304050607,2.0002,2.02,0,2,2,2,2,2,2,2,2016-01-01T01:01:01.0000000Z,Two,Two,2,-00:00:02.0020002,,"{""rowId"": 2, ""arr"": [0,2]}"
3,00000003-0000-0000-0001-020304050607,3.0003,3.03,1,3,3,3,3,3,3,3,2017-01-01T01:01:01.0000000Z,Three,Three,3,00:03.0,,"{""rowId"": 3, ""arr"": [0,3]}"
4,00000004-0000-0000-0001-020304050607,4.0004,4.04,0,4,4,4,4,4,4,4,2018-01-01T01:01:01.0000000Z,Four,Four,4,-00:00:04.0040004,,"{""rowId"": 4, ""arr"": [0,4]}"
5,00000005-0000-0000-0001-020304050607,5.0005,5.05,1,5,5,5,5,5,5,5,2019-01-01T01:01:01.0000000Z,Five,Five,5,00:05.0,,"{""rowId"": 5, ""arr"": [0,5]}"
6,00000006-0000-0000-0001-020304050607,6.0006,6.06,0,6,6,6,6,6,6,6,2020-01-01T01:01:01.0000000Z,Six,Six,6,-00:00:06.0060006,,"{""rowId"": 6, ""arr"": [0,6]}"
7,00000007-0000-0000-0001-020304050607,7.0007,7.07,1,7,7,7,7,7,7,7,2021-01-01T01:01:01.0000000Z,Seven,Seven,7,00:07.0,,"{""rowId"": 7, ""arr"": [0,7]}"
8,00000008-0000-0000-0001-020304050607,8.0008,8.08,0,8,8,8,8,8,8,8,2022-01-01T01:01:01.0000000Z,Eight,Eight,8,-00:00:08.0080008,,"{""rowId"": 8, ""arr"": [0,8]}"
9,00000009-0000-0000-0001-020304050607,9.0009,9.09,1,9,9,9,9,9,9,9,2023-01-01T01:01:01.0000000Z,Nine,Nine,9,00:09.0,,"{""rowId"": 9, ""arr"": [0,9]}"

2
quickstart/dataset.json Normal file

@ -0,0 +1,2 @@
{"rownumber": 0, "rowguid": "00000000-0000-0000-0001-020304050607", "xdouble": 0.0, "xfloat": 0.0, "xbool": 0, "xint16": 0, "xint32": 0, "xint64": 0, "xuint8": 0, "xuint16": 0, "xuint32": 0, "xuint64": 0, "xdate": "2014-01-01T01:01:01Z", "xsmalltext": "Zero", "xtext": "Zero", "xnumberAsText": "0", "xtime": "00:00:00", "xtextWithNulls": null, "xdynamicWithNulls": ""}
{"rownumber": 1, "rowguid": "00000001-0000-0000-0001-020304050607", "xdouble": 1.00001, "xfloat": 1.01, "xbool": 1, "xint16": 1, "xint32": 1, "xint64": 1, "xuint8": 1, "xuint16": 1, "xuint32": 1, "xuint64": 1, "xdate": "2015-01-01T01:01:01Z", "xsmalltext": "One", "xtext": "One", "xnumberAsText": "1", "xtime": "00:00:01.0010001", "xtextWithNulls": null, "xdynamicWithNulls": "{\"rowId\":1,\"arr\":[0,1]}"}


@ -0,0 +1,30 @@
{
"kustoUri" : "https://sdkse2etest.eastus.kusto.windows.net",
"ingestUri" : "https://ingest-sdkse2etest.eastus.kusto.windows.net",
"databaseName" : "e2e",
"tableName" : "SampleTable",
"useExistingTable": "False",
"alterTable": "True",
"queryData": "True",
"ingestData": "True",
"tableSchema" : "(rownumber:int, rowguid:string, xdouble:real, xfloat:real, xbool:bool, xint16:int, xint32:int, xint64:long, xuint8:long, xuint16:long, xuint32:long, xuint64:long, xdate:datetime, xsmalltext:string, xtext:string, xnumberAsText:string, xtime:timespan, xtextWithNulls:string, xdynamicWithNulls:dynamic)",
"data" :
[
{
"sourceType": "localFileSource",
"dataSourceUri": "dataset.csv",
"format": "CSV",
"useExistingMapping": "True",
"mappingName": "",
"mappingValue": ""
},
{
"sourceType": "localFileSource",
"dataSourceUri": "dataset.json",
"format": "MULTIJSON",
"useExistingMapping": "False",
"mappingName": "SampleTableMapping",
"mappingValue": "[{\"Properties\":{\"Path\":\"$.rownumber\"},\"column\":\"rownumber\",\"datatype\":\"int\"}, {\"Properties\":{\"Path\":\"$.rowguid\"},\"column\":\"rowguid\",\"datatype\":\"string\"}, {\"Properties\":{\"Path\":\"$.xdouble\"},\"column\":\"xdouble\",\"datatype\":\"real\"}, {\"Properties\":{\"Path\":\"$.xfloat\"},\"column\":\"xfloat\",\"datatype\":\"real\"}, {\"Properties\":{\"Path\":\"$.xbool\"},\"column\":\"xbool\",\"datatype\":\"bool\"}, {\"Properties\":{\"Path\":\"$.xint16\"},\"column\":\"xint16\",\"datatype\":\"int\"}, {\"Properties\":{\"Path\":\"$.xint32\"},\"column\":\"xint32\",\"datatype\":\"int\"}, {\"Properties\":{\"Path\":\"$.xint64\"},\"column\":\"xint64\",\"datatype\":\"long\"}, {\"Properties\":{\"Path\":\"$.xuint8\"},\"column\":\"xuint8\",\"datatype\":\"long\"}, {\"Properties\":{\"Path\":\"$.xuint16\"},\"column\":\"xuint16\",\"datatype\":\"long\"}, {\"Properties\":{\"Path\":\"$.xuint32\"},\"column\":\"xuint32\",\"datatype\":\"long\"}, {\"Properties\":{\"Path\":\"$.xuint64\"},\"column\":\"xuint64\",\"datatype\":\"long\"}, {\"Properties\":{\"Path\":\"$.xdate\"},\"column\":\"xdate\",\"datatype\":\"datetime\"}, {\"Properties\":{\"Path\":\"$.xsmalltext\"},\"column\":\"xsmalltext\",\"datatype\":\"string\"}, {\"Properties\":{\"Path\":\"$.xtext\"},\"column\":\"xtext\",\"datatype\":\"string\"}, {\"Properties\":{\"Path\":\"$.rowguid\"},\"column\":\"xnumberAsText\",\"datatype\":\"string\"}, {\"Properties\":{\"Path\":\"$.xtime\"},\"column\":\"xtime\",\"datatype\":\"timespan\"}, {\"Properties\":{\"Path\":\"$.xtextWithNulls\"},\"column\":\"xtextWithNulls\",\"datatype\":\"string\"}, {\"Properties\":{\"Path\":\"$.xdynamicWithNulls\"},\"column\":\"xdynamicWithNulls\",\"datatype\":\"dynamic\"}]"
}
]
}
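
Note that the configuration above stores its switches as the strings `"True"` and `"False"` rather than as JSON booleans. A minimal sketch of reading such a flag with only the JDK follows; `ConfigFlags` and `flag` are hypothetical names used for illustration, and treating a missing or non-string value as `false` is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the quickstart app.
final class ConfigFlags {
    private ConfigFlags() {
    }

    // Boolean.parseBoolean ignores case, so "True" and "true" both yield true.
    // Any other string yields false; this sketch also treats missing or non-string values as false.
    static boolean flag(Map<String, Object> config, String key) {
        Object value = config.get(key);
        return value instanceof String && Boolean.parseBoolean((String) value);
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("useExistingTable", "False");
        config.put("queryData", "True");
        System.out.println(flag(config, "useExistingTable"));
        System.out.println(flag(config, "queryData"));
    }
}
```

One consequence of this design is that a misspelled value such as `"Ture"` silently reads as `false`, so validating the raw strings before parsing may be worthwhile.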


@ -0,0 +1,14 @@
### Prerequisites
1. Set up Java on your machine. For instructions, consult a Java environment setup tutorial, like [this one](https://www.tutorialspoint.com/java/java_environment_setup.htm).
2. Set up [Apache Maven](https://maven.apache.org/install.html), which is the most popular Java dependency management tool.
### Instructions
1. Download the **DOWNLOAD_LINK** as a zip file
2. Extract the app source code to your folder of choice
3. Open a command line window to the folder extracted to above
4. Run `mvn clean install` to compile the source code into a binary
5. Run the binary using `java -jar target\kusto-quickstart-[version]-jar-with-dependencies.jar`
### Troubleshooting
* If you are having trouble running the app from your IDE, first check if the app runs from command line, then consult the troubleshooting references of your IDE.

94
quickstart/pom.xml Normal file

@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>kusto-quickstart</artifactId>
<parent>
<artifactId>kusto-client</artifactId>
<groupId>com.microsoft.azure.kusto</groupId>
<version>${revision}</version>
</parent>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>com.microsoft.azure.kusto.quickstart.KustoSampleApp</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
<configuration>
<archive>
<manifest>
<mainClass>com.microsoft.azure.kusto.quickstart.KustoSampleApp</mainClass>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-data</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-ingest</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>${json.version}</version>
</dependency>
</dependencies>
</project>


@ -0,0 +1,466 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package com.microsoft.azure.kusto.quickstart;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.kusto.data.*;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.ingest.*;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.GeneralSecurityException;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.*;
import java.util.concurrent.TimeUnit;
import static com.microsoft.azure.kusto.data.ClientRequestProperties.OPTION_SERVER_TIMEOUT;
public class KustoSampleApp {
// TODO - Config:
// If this quickstart app was downloaded from OneClick, kusto_sample_config.json should be pre-populated with your cluster's details
// If this quickstart app was downloaded from GitHub, edit kusto_sample_config.json and modify the cluster URL and database fields appropriately
private static final String CONFIG_FILE_NAME = "kusto_sample_config.json";
// TODO - Config (Optional): Change the authentication method from User Prompt to any of the other options
// Some of the auth modes require additional environment variables to be set in order to work (see usage in generateConnectionString below)
// Managed Identity Authentication only works when running as an Azure service (webapp, function, etc.)
private static final String AUTHENTICATION_MODE = "UserPrompt"; // Options: (UserPrompt|ManagedIdentity|AppKey|AppCertificate|DeviceCode)
// TODO - Config (Optional): Toggle to false to execute this script "unattended"
private static final boolean WAIT_FOR_USER = true;
private static String databaseName;
private static String tableName;
private static String kustoUrl;
private static String ingestUrl;
private static boolean useExistingTable;
private static boolean alterTable;
private static boolean queryData;
private static boolean ingestData;
private static List<Map<String, String>> dataToIngest;
private static String tableSchema;
private static int step = 1;
private static final String BATCHING_POLICY = "{ \"MaximumBatchingTimeSpan\": \"00:00:10\", \"MaximumNumberOfItems\": 500, \"MaximumRawDataSizeMB\": 1024 }";
private static final int WAIT_FOR_INGEST_SECONDS = 20;
public static void main(String[] args) throws InterruptedException, IOException {
System.out.println("Kusto sample app is starting...");
loadConfigs(KustoSampleApp.CONFIG_FILE_NAME);
if (AUTHENTICATION_MODE.equals("UserPrompt")) {
waitForUserToProceed("You may be prompted more than once for credentials during this script. Please return to the console after authenticating.");
}
try (IngestClient ingestClient = IngestClientFactory.createClient(generateConnectionString(ingestUrl, AUTHENTICATION_MODE))) {
// Tip: Avoid creating a new Kusto/ingest Client for each use. Instead, create the clients once and reuse them.
Client kustoClient = new ClientImpl(generateConnectionString(kustoUrl, AUTHENTICATION_MODE));
if (useExistingTable) {
if (alterTable) {
// Tip: Usually the table was originally created with a schema appropriate for the data being ingested, so this wouldn't be needed.
// Learn More: For more information about altering table schemas, see: https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/alter-table-command
alterMergeExistingTableToProvidedSchema(kustoClient, databaseName, tableName, tableSchema);
}
if (queryData) {
// Learn More: For more information about Kusto Query Language (KQL), see: https://docs.microsoft.com/azure/data-explorer/write-queries
queryExistingNumberOfRows(kustoClient, databaseName, tableName);
}
} else {
// Tip: This is generally a one-time configuration.
// Learn More: For more information about creating tables, see: https://docs.microsoft.com/azure/data-explorer/one-click-table
createNewTable(kustoClient, databaseName, tableName, tableSchema);
}
if (ingestData) {
for (Map<String, String> file : dataToIngest) {
IngestionProperties.DataFormat dataFormat = IngestionProperties.DataFormat.valueOf(file.get("format").toLowerCase());
String mappingName = file.get("mappingName");
// Tip: This is generally a one-time configuration.
// Learn More: For more information about providing inline mappings and mapping references, see: https://docs.microsoft.com/azure/data-explorer/kusto/management/mappings
if (!createIngestionMappings(Boolean.parseBoolean(file.get("useExistingMapping")), kustoClient, databaseName, mappingName, file.get("mappingValue"), dataFormat)) {
continue;
}
// Learn More: For more information about ingesting data to Kusto in Java, see: https://docs.microsoft.com/azure/data-explorer/java-ingest-data
ingestData(file, dataFormat, ingestClient, databaseName, tableName, mappingName);
}
waitForIngestionToComplete();
}
if (queryData) {
executeValidationQueries(kustoClient, databaseName, tableName, ingestData);
}
} catch (URISyntaxException e) {
die("Couldn't create Kusto client", e);
}
}
private static void waitForUserToProceed(String upcomingOperation) {
System.out.println();
System.out.printf("Step %s: %s%n", step++, upcomingOperation);
if (WAIT_FOR_USER) {
System.out.println("Press ENTER to proceed with this operation...");
Scanner scanner = new Scanner(System.in);
scanner.nextLine();
}
}
private static void die(String error) {
die(error, null);
}
private static void die(String error, Exception ex) {
System.out.println("Script failed with error: " + error);
if (ex != null) {
System.out.println("Exception:");
ex.printStackTrace();
}
System.exit(-1);
}
private static void loadConfigs(String configFileName) {
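// Resolve relative to the working directory without a Windows-only ".\\" prefix, so the path also works on Linux and macOS
File configFile = new File(configFileName);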
Map<String, Object> configs = null;
try {
configs = new ObjectMapper().readValue(configFile, HashMap.class);
// Required configs
useExistingTable = Boolean.parseBoolean((String) configs.get("useExistingTable"));
databaseName = (String) configs.get("databaseName");
tableName = (String) configs.get("tableName");
tableSchema = (String) configs.get("tableSchema");
kustoUrl = (String) configs.get("kustoUri");
ingestUrl = (String) configs.get("ingestUri");
dataToIngest = (List<Map<String, String>>) configs.get("data");
alterTable = Boolean.parseBoolean((String) configs.get("alterTable"));
queryData = Boolean.parseBoolean((String) configs.get("queryData"));
ingestData = Boolean.parseBoolean((String) configs.get("ingestData"));
if (StringUtils.isBlank(databaseName) || StringUtils.isBlank(tableName) || StringUtils.isBlank(tableSchema)
|| StringUtils.isBlank(kustoUrl) || StringUtils.isBlank(ingestUrl)
|| StringUtils.isBlank((String) configs.get("useExistingTable")) || dataToIngest.isEmpty()) {
die(String.format("%s is missing required fields", CONFIG_FILE_NAME));
}
} catch (IOException e) {
die("Couldn't read config file", e);
}
}
private static ConnectionStringBuilder generateConnectionString(String endpointUrl, String authenticationMode) {
ConnectionStringBuilder csb = null;
switch (authenticationMode) {
case "UserPrompt":
csb = ConnectionStringBuilder.createWithUserPrompt(endpointUrl);
break;
case "ManagedIdentity":
// Connect using the system or user provided managed identity (Azure service only)
// TODO - Config (Optional): Managed identity client id if you are using a User Assigned Managed Id
String clientId = System.getenv("MANAGED_IDENTITY_CLIENT_ID");
if (StringUtils.isBlank(clientId)) {
csb = ConnectionStringBuilder.createWithAadManagedIdentity(endpointUrl);
} else {
csb = ConnectionStringBuilder.createWithAadManagedIdentity(endpointUrl, clientId);
}
break;
case "AppKey":
// TODO - Config (Optional): App Id & tenant, and App Key to authenticate with
// For information about how to procure an AAD Application see: https://docs.microsoft.com/azure/data-explorer/provision-azure-ad-app
csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpointUrl, System.getenv("APP_ID"), System.getenv("APP_KEY"), System.getenv("APP_TENANT"));
break;
case "AppCertificate":
// TODO - Config (Optional): App Id & tenant, path to public certificate and path to private certificate pem file to authenticate with
csb = createAppCertificateStringBuilder(endpointUrl);
break;
case "DeviceCode":
csb = ConnectionStringBuilder.createWithDeviceCode(endpointUrl);
break;
default:
die(String.format("Authentication mode %s is not supported", authenticationMode));
}
csb.setApplicationNameForTracing("KustoJavaSdkQuickStart");
return csb;
}
private static ConnectionStringBuilder createAppCertificateStringBuilder(String endpointUrl) {
String appId = System.getenv("APP_ID");
String tenantId = System.getenv("APP_TENANT");
String privateKeyFilePath = System.getenv("PRIVATE_KEY_PEM_FILE_PATH");
String publicCertFilePath = System.getenv("PUBLIC_CERT_FILE_PATH");
try {
PrivateKey privateKey = SecurityUtils.getPrivateKey(privateKeyFilePath);
X509Certificate x509Certificate = SecurityUtils.getPublicCertificate(publicCertFilePath);
return ConnectionStringBuilder.createWithAadApplicationCertificate(endpointUrl, appId, x509Certificate, privateKey, tenantId);
} catch (IOException | GeneralSecurityException e) {
die("Couldn't create ConnectionStringBuilder for app certificate authentication", e);
return null;
}
}
private static void createNewTable(Client kustoClient, String databaseName, String tableName, String tableSchema) {
waitForUserToProceed(String.format("Create table '%s.%s'", databaseName, tableName));
String command = String.format(".create table %s %s", tableName, tableSchema);
if (!executeControlCommand(kustoClient, databaseName, command)) {
die("Failed to create table or validate it exists");
}
/*
Learn More:
Kusto batches data for ingestion efficiency. The default batching policy ingests data when one of the following conditions is met:
1) More than 1,000 files were queued for ingestion for the same table by the same user
2) More than 1GB of data was queued for ingestion for the same table by the same user
3) More than 5 minutes have passed since the first file was queued for ingestion for the same table by the same user
For more information about customizing the ingestion batching policy, see: https://docs.microsoft.com/azure/data-explorer/kusto/management/batchingpolicy
*/
// Disabled to prevent the surprising behavior of an existing batching policy being changed, without intent.
if (false) {
alterBatchingPolicy(kustoClient, databaseName, tableName);
}
}
private static void alterMergeExistingTableToProvidedSchema(Client kustoClient, String databaseName, String tableName, String tableSchema) {
waitForUserToProceed(String.format("Alter-merge existing table '%s.%s' to align with the provided schema", databaseName, tableName));
String command = String.format(".alter-merge table %s %s", tableName, tableSchema);
if (!executeControlCommand(kustoClient, databaseName, command)) {
die("Failed to alter table");
}
}
private static boolean executeControlCommand(Client kustoClient, String databaseName, String controlCommand) {
ClientRequestProperties clientRequestProperties = createClientRequestProperties("SampleApp_ControlCommand");
KustoOperationResult result;
try {
result = kustoClient.execute(databaseName, controlCommand, clientRequestProperties);
// Tip: You generally wouldn't print the response from a control command. We print it here to demonstrate what the response looks like.
System.out.printf("Response from executed control command '%s':%n", controlCommand);
KustoResultSetTable primaryResults = result.getPrimaryResults();
for (int rowNum = 1; primaryResults.next(); rowNum++) {
KustoResultColumn[] columns = primaryResults.getColumns();
List<Object> currentRow = primaryResults.getCurrentRow();
System.out.printf("Record %s%n", rowNum);
for (int j = 0; j < currentRow.size(); j++) {
Object cell = currentRow.get(j);
System.out.printf("Column: '%s' of type '%s', Value: '%s'%n", columns[j].getColumnName(), columns[j].getColumnType(), cell == null ? "[null]" : cell);
}
System.out.println();
}
return true;
} catch (DataServiceException e) {
System.out.printf("Server error while trying to execute control command '%s' on database '%s'%n%n", controlCommand, databaseName);
e.printStackTrace();
} catch (DataClientException e) {
System.out.printf("Client error while trying to execute control command '%s' on database '%s'%n%n", controlCommand, databaseName);
e.printStackTrace();
} catch (Exception e) {
System.out.printf("Unexpected error while trying to execute control command '%s' on database '%s'%n%n", controlCommand, databaseName);
e.printStackTrace();
}
return false;
}
private static boolean executeQuery(Client kustoClient, String databaseName, String query) {
ClientRequestProperties clientRequestProperties = createClientRequestProperties("SampleApp_Query");
KustoOperationResult result;
try {
result = kustoClient.execute(databaseName, query, clientRequestProperties);
System.out.printf("Response from executed query '%s':%n", query);
KustoResultSetTable primaryResults = result.getPrimaryResults();
for (int rowNum = 1; primaryResults.next(); rowNum++) {
KustoResultColumn[] columns = primaryResults.getColumns();
List<Object> currentRow = primaryResults.getCurrentRow();
System.out.printf("Record %s%n", rowNum);
for (int j = 0; j < currentRow.size(); j++) {
Object cell = currentRow.get(j);
System.out.printf("Column: '%s' of type '%s', Value: '%s'%n", columns[j].getColumnName(), columns[j].getColumnType(), cell == null ? "[null]" : cell);
}
System.out.println();
}
return true;
} catch (DataServiceException e) {
System.out.printf("Server error while trying to execute query '%s' on database '%s'%n%n", query, databaseName);
e.printStackTrace();
} catch (DataClientException e) {
System.out.printf("Client error while trying to execute query '%s' on database '%s'%n%n", query, databaseName);
e.printStackTrace();
} catch (Exception e) {
System.out.printf("Unexpected error while trying to execute query '%s' on database '%s'%n%n", query, databaseName);
e.printStackTrace();
}
return false;
}
private static ClientRequestProperties createClientRequestProperties(String scope) {
return createClientRequestProperties(scope, null);
}
private static ClientRequestProperties createClientRequestProperties(String scope, String timeout) {
ClientRequestProperties clientRequestProperties = new ClientRequestProperties();
clientRequestProperties.setClientRequestId(String.format("%s;%s", scope, UUID.randomUUID()));
// Tip: Though uncommon, you can override the default server timeout for this request using the option below, e.g. pass "10m" to set the timeout to 10 minutes.
if (StringUtils.isNotBlank(timeout)) {
clientRequestProperties.setOption(OPTION_SERVER_TIMEOUT, timeout);
}
return clientRequestProperties;
}
private static void queryExistingNumberOfRows(Client kustoClient, String databaseName, String tableName) {
waitForUserToProceed(String.format("Get existing row count in '%s.%s'", databaseName, tableName));
executeQuery(kustoClient, databaseName, String.format("%s | count", tableName));
}
private static void alterBatchingPolicy(Client kustoClient, String databaseName, String tableName) {
/*
Tip 1: Most users should be fine with the defaults. To speed up ingestion, such as during development and
in this sample app, we modify the default ingestion batching policy so that data is ingested after 10 seconds have passed.
Tip 2: This is generally a one-time configuration.
Tip 3: You can also skip the batching for some files using the Flush-Immediately property, though this option should be used with care as it is inefficient.
*/
waitForUserToProceed(String.format("Alter the batching policy for '%s.%s'", databaseName, tableName));
String command = String.format(".alter table %s policy ingestionbatching @'%s'", tableName, BATCHING_POLICY);
if (!executeControlCommand(kustoClient, databaseName, command)) {
System.out.println("Failed to alter the ingestion batching policy, which could be the result of insufficient permissions. The sample will still run, though ingestion will be delayed for up to 5 minutes.");
}
}
private static boolean createIngestionMappings(boolean useExistingMapping, Client kustoClient, String databaseName, String tableName, String mappingName, String mappingValue, IngestionProperties.DataFormat dataFormat) {
if (!useExistingMapping) {
if (dataFormat.isMappingRequired() && StringUtils.isBlank(mappingValue)) {
System.out.printf("The data format %s requires a mapping, but the configuration specifies not to use an existing mapping and no mapping value was provided. Skipping this ingestion.%n", dataFormat.name());
return false;
}
if (StringUtils.isNotBlank(mappingValue)) {
IngestionMapping.IngestionMappingKind ingestionMappingKind = dataFormat.getIngestionMappingKind();
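// Note: This default only applies within this method; Java passes the reference by value, so the caller's mappingName is unchanged.
if (StringUtils.isBlank(mappingName)) {
mappingName = "DefaultQuickstartMapping" + UUID.randomUUID().toString().substring(0, 5);
}
waitForUserToProceed(String.format("Create a %s mapping reference named '%s'", ingestionMappingKind, mappingName));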
String mappingCommand = String.format(".create-or-alter table %s ingestion %s mapping '%s' '%s'", tableName, ingestionMappingKind.name().toLowerCase(), mappingName, mappingValue);
if (!executeControlCommand(kustoClient, databaseName, mappingCommand)) {
System.out.printf("Failed to create a %s mapping reference named '%s'. Skipping this ingestion.%n", ingestionMappingKind, mappingName);
return false;
}
}
} else if (StringUtils.isBlank(mappingName)) {
System.out.println("The configuration indicates an existing mapping should be used, but no mapping name was provided. Skipping this ingestion.");
return false;
}
return true;
}
private static void ingestData(Map<String, String> file, IngestionProperties.DataFormat dataFormat, IngestClient ingestClient, String databaseName, String tableName, String mappingName) {
String sourceType = file.get("sourceType");
String uri = file.get("dataSourceUri");
waitForUserToProceed(String.format("Ingest '%s' from %s", uri, sourceType));
// Tip: When ingesting JSON files in which each line holds one complete JSON object, use the MULTIJSON format, even if the file contains only one line.
// If the JSON is formatted with whitespace, use SINGLEJSON. In that case, only one JSON object (one data row) is allowed per file.
if (dataFormat == IngestionProperties.DataFormat.json) {
dataFormat = IngestionProperties.DataFormat.multijson;
}
// Tip: Kusto's Java SDK can ingest data from files, blobs, java.sql.ResultSet objects, and open streams.
// See the SDK's kusto-samples module and the E2E tests in kusto-ingest for additional references.
if (sourceType.equalsIgnoreCase("localfilesource")) {
ingestDataFromFile(ingestClient, databaseName, tableName, uri, dataFormat, mappingName);
} else if (sourceType.equalsIgnoreCase("blobsource")) {
ingestDataFromBlob(ingestClient, databaseName, tableName, uri, dataFormat, mappingName);
} else {
System.out.printf("Unknown source '%s' for file '%s'%n", sourceType, uri);
}
}
private static void ingestDataFromFile(IngestClient ingestClient, String databaseName, String tableName, String uri, IngestionProperties.DataFormat dataFormat, String mappingName) {
IngestionProperties ingestionProperties = createIngestionProperties(databaseName, tableName, dataFormat, mappingName);
// Tip 1: For optimal ingestion batching and performance, specify the uncompressed data size in the file descriptor (e.g. fileToIngest.length()) instead of the default below of 0.
// Otherwise, the service will determine the file size, which requires an additional s2s call and may not be accurate for compressed files.
// Tip 2: To correlate between ingestion operations in your applications and Kusto, set the source id and log it somewhere.
FileSourceInfo fileSourceInfo = new FileSourceInfo(uri, 0, UUID.randomUUID());
try {
ingestClient.ingestFromFile(fileSourceInfo, ingestionProperties);
} catch (IngestionClientException e) {
System.out.printf("Client exception while trying to ingest '%s' into '%s.%s'%n%n", uri, databaseName, tableName);
e.printStackTrace();
} catch (IngestionServiceException e) {
System.out.printf("Service exception while trying to ingest '%s' into '%s.%s'%n%n", uri, databaseName, tableName);
e.printStackTrace();
}
}
private static void ingestDataFromBlob(IngestClient ingestClient, String databaseName, String tableName, String blobUrl, IngestionProperties.DataFormat dataFormat, String mappingName) {
IngestionProperties ingestionProperties = createIngestionProperties(databaseName, tableName, dataFormat, mappingName);
// Tip 1: For optimal ingestion batching and performance, specify the uncompressed data size in the blob source info instead of the default below of 0.
// Otherwise, the service will determine the blob size, which requires an additional s2s call and may not be accurate for compressed files.
// Tip 2: To correlate between ingestion operations in your applications and Kusto, set the source id and log it somewhere.
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobUrl, 0, UUID.randomUUID());
try {
ingestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (IngestionClientException e) {
System.out.printf("Client exception while trying to ingest '%s' into '%s.%s'%n%n", blobUrl, databaseName, tableName);
e.printStackTrace();
} catch (IngestionServiceException e) {
System.out.printf("Service exception while trying to ingest '%s' into '%s.%s'%n%n", blobUrl, databaseName, tableName);
e.printStackTrace();
}
}
@NotNull
private static IngestionProperties createIngestionProperties(String databaseName, String tableName, IngestionProperties.DataFormat dataFormat, String mappingName) {
IngestionProperties ingestionProperties = new IngestionProperties(databaseName, tableName);
ingestionProperties.setDataFormat(dataFormat);
// Learn More: For more information about supported data formats, see: https://docs.microsoft.com/azure/data-explorer/ingestion-supported-formats
if (StringUtils.isNotBlank(mappingName) && dataFormat != null) {
ingestionProperties.setIngestionMapping(mappingName, dataFormat.getIngestionMappingKind());
}
// TODO - Config: Setting the ingestion batching policy takes up to 5 minutes to take effect.
// We therefore set Flush-Immediately for the sake of the sample, but it generally shouldn't be used in practice.
// Comment out the line below after running the sample the first few times.
ingestionProperties.setFlushImmediately(true);
return ingestionProperties;
}
private static void waitForIngestionToComplete() throws InterruptedException {
System.out.printf("Sleeping %s seconds for queued ingestion to complete. Note: This may take longer depending on the file size and ingestion policy.%n", WAIT_FOR_INGEST_SECONDS);
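// Count down from WAIT_FOR_INGEST_SECONDS to 1 so that the total sleep time matches the message above.
for (int i = WAIT_FOR_INGEST_SECONDS; i > 0; i--) {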
System.out.printf("%s.", i);
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
} catch (InterruptedException e) {
System.out.println("Interrupted while waiting for ingestion to complete, with stack trace:");
e.printStackTrace();
throw e;
}
}
System.out.println();
System.out.println();
}
private static void executeValidationQueries(Client kustoClient, String databaseName, String tableName, boolean ingestData) {
String optionalPostIngestionMessage = ingestData ? "post-ingestion " : "";
System.out.printf("Step %s: Get %srow count for '%s.%s'%n", step++, optionalPostIngestionMessage, databaseName, tableName);
executeQuery(kustoClient, databaseName, String.format("%s | count", tableName));
System.out.println();
System.out.printf("Step %s: Get sample of %sdata:%n", step++, optionalPostIngestionMessage);
executeQuery(kustoClient, databaseName, String.format("%s | take 2", tableName));
System.out.println();
}
}
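
One detail of createIngestionMappings above is worth illustrating: the default name it assigns to mappingName lives only in that method's local variable, so code that later ingests with the caller's (still blank) mappingName would not reference the newly created mapping. Below is a minimal sketch of one way to handle this, assuming a hypothetical helper named resolveMappingName that the caller would invoke once and then pass the result to both the mapping-creation and ingestion steps. The helper and the class name are illustrative only; they are not part of the sample app or the Kusto SDK, and the blank check approximates StringUtils.isBlank using only the standard library.

```java
import java.util.UUID;

public class MappingNameSketch {
    // Hypothetical helper (not part of the Kusto SDK or the sample app).
    // Returns the configured name, or a generated default when the name is null or blank.
    static String resolveMappingName(String mappingName) {
        if (mappingName == null || mappingName.trim().isEmpty()) {
            // Same default scheme as the sample: a fixed prefix plus the first 5 characters of a random UUID.
            return "DefaultQuickstartMapping" + UUID.randomUUID().toString().substring(0, 5);
        }
        return mappingName;
    }

    public static void main(String[] args) {
        // Assumption: the configuration left the mapping name empty.
        String fromConfig = "";
        String resolved = resolveMappingName(fromConfig);
        // The same resolved value would then be used for creating the mapping and for ingesting with it.
        System.out.println("Mapping name used for both steps: " + resolved);
    }
}
```

Resolving the name once in the caller keeps the two steps in agreement without changing the boolean return value of createIngestionMappings.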