azure-kusto-java/samples
ohad bitton 84218c18c1
Managed streaming - better estimation for decision for streaming vs queuing (#368)
Co-authored-by: Ohad Bitton <ohbitton@microsoft.com>
2024-11-10 14:23:52 +02:00
src/main Managed streaming - better estimation for decision for streaming vs queuing (#368) 2024-11-10 14:23:52 +02:00
README.md New client missing feats (#393) 2024-11-07 18:08:22 +02:00
pom.xml Implements azure core http client (#388) 2024-10-27 15:11:44 +02:00

README.md

Getting Started with Kusto Java SDK

Prerequisites

  • A Java Development Kit (JDK), version 11 or later
  • Maven
  • Clone the project and enter the samples directory:
      git clone https://github.com/Azure/azure-kusto-java.git
      cd azure-kusto-java/samples

Execute Query Sample

This sample will demonstrate how to execute a query.
Sample Code

Prerequisites

Steps to follow

  1. Build the connection string and initialize the client
ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(
        System.getProperty("clusterPath"),
        System.getProperty("appId"),
        System.getProperty("appKey"),
        System.getProperty("appTenant"));
Client client = ClientFactory.createClient(csb);

If you'd like to tweak the underlying HTTP client used to make the requests, build an HTTP client properties object and use it along with the connection string to initialize the client:

ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(
        System.getProperty("clusterPath"),
        System.getProperty("appId"),
        System.getProperty("appKey"),
        System.getProperty("appTenant"));

HttpClientProperties properties = HttpClientProperties.builder()
        .keepAlive(true)
        .maxKeepAliveTime(120)
        .maxConnectionsTotal(40)
        .build();

Client client = ClientFactory.createClient(csb, properties);
  2. Execute the query
KustoOperationResult results = client.execute(System.getProperty("dbName"), System.getProperty("query"));
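The snippets above read every setting with System.getProperty, which returns null when the matching -D flag is missing from the mvn command line, so a typo surfaces later as a confusing error inside the SDK. A minimal sketch of a fail-fast check follows; SampleSettings, requireProperty and missingProperties are hypothetical helpers, not part of the SDK.

```java
import java.util.ArrayList;
import java.util.List;

public class SampleSettings {
    // Returns the value of a -D system property, or throws if it was not supplied.
    static String requireProperty(String name) {
        String value = System.getProperty(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing required system property: -D" + name);
        }
        return value;
    }

    // Lists the given property names that are unset, so all problems are reported at once.
    static List<String> missingProperties(String... names) {
        List<String> missing = new ArrayList<>();
        for (String name : names) {
            String value = System.getProperty(name);
            if (value == null || value.isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = missingProperties("clusterPath", "appId", "appKey", "appTenant", "dbName", "query");
        if (!missing.isEmpty()) {
            System.err.println("Missing -D properties: " + missing);
            return;
        }
        // Never print appKey; it is a secret.
        System.out.println("Cluster: " + requireProperty("clusterPath"));
    }
}
```

Because mvn exec:java runs the sample in the Maven JVM, each -DclusterPath=... flag in the commands below becomes a system property that this kind of check can see.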

How to run this sample

cd samples
mvn clean compile exec:java -Dexec.mainClass="Query" \
                            -DclusterPath="cluster/path" \
                            -DappId="app-id" \
                            -DappKey="appKey" \
                            -DappTenant="tenant-id" \
                            -DdbName="dbName" \
                            -Dquery="your | query"

Advanced Query Sample

This sample shows some more advanced options available when querying data, like using query parameters to guard against injection attacks and extracting individual values from the query results.
Sample Code for Advanced Query

Prerequisites

Notable Features

  1. Creating a table with initial data using the .set-or-replace command
// Line separator used to join the command lines
String newLine = System.lineSeparator();
String tableCommand = String.join(newLine,
        ".set-or-replace Events <|",
        "range x from 1 to 100 step 1",
        "| extend ts = totimespan(strcat(x,'.00:00:00'))",
        "| project timestamp = now(ts), eventName = strcat('event ', x)");
client.execute(database, tableCommand);
  2. Using query parameters to guard against injection attacks
ClientRequestProperties clientRequestProperties = new ClientRequestProperties();
clientRequestProperties.setParameter("eventNameFilter", "event 1");
String query = String.join(newLine,
        "declare query_parameters(eventNameFilter:string);",
        "Events",
        "| where eventName == eventNameFilter");
KustoOperationResult results = client.execute(database, query, clientRequestProperties);
  3. Extracting individual values from the query results
KustoResultSetTable mainTableResult = results.getPrimaryResults();
System.out.printf("Kusto sent back %s rows.%n", mainTableResult.count());

// iterate values
List<Event> events = new ArrayList<>();
while (mainTableResult.next()) {
    events.add(new Event(
        mainTableResult.getKustoDateTime("timestamp"), 
        mainTableResult.getString("eventName")));
}
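To see why step 2 declares query_parameters instead of concatenating user input into the query text, compare the two approaches in plain Java. This is a sketch with no SDK calls: the class and method names are hypothetical, and the map only stands in for the values that ClientRequestProperties.setParameter carries alongside the query.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class QueryParameterSketch {
    // UNSAFE: the user input becomes part of the query text itself.
    static String concatenated(String userInput) {
        return "Events | where eventName == '" + userInput + "'";
    }

    // Safer: the query text is fixed and only refers to the parameter by name.
    static String parameterizedQuery() {
        return String.join(System.lineSeparator(),
                "declare query_parameters(eventNameFilter:string);",
                "Events",
                "| where eventName == eventNameFilter");
    }

    // The value travels separately from the query text, keyed by parameter name.
    static Map<String, String> parameters(String userInput) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("eventNameFilter", userInput);
        return params;
    }

    public static void main(String[] args) {
        String malicious = "x' or 1==1 or eventName == '";
        // The concatenated query turns into a filter that matches every row.
        System.out.println(concatenated(malicious));
        // The parameterized query is unchanged; the input stays a plain string value.
        System.out.println(parameterizedQuery());
        System.out.println(parameters(malicious));
    }
}
```

With concatenation, the crafted input closes the string literal and adds `or 1==1`; with a declared parameter, the same input can only ever be compared as a string.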

How to run this sample

Note: Running this sample will create a table named Events in the given database.

cd samples
mvn clean compile exec:java -Dexec.mainClass="AdvancedQuery" \
                            -DclusterPath="cluster/path" \
                            -DappId="app-id" \
                            -DappKey="appKey" \
                            -DappTenant="tenant-id" \
                            -DdbName="dbName" 

File Ingestion Sample

This sample will demonstrate how to ingest data from a file into a table.
Sample Code

Prerequisites

Steps to follow

  1. Build the connection string and initialize
ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(
        System.getProperty("clusterPath"),
        System.getProperty("appId"),
        System.getProperty("appKey"),
        System.getProperty("appTenant"));
  2. Initialize the client and its properties
IngestClient client = IngestClientFactory.createClient(csb);

IngestionProperties ingestionProperties = new IngestionProperties(System.getProperty("dbName"),
        System.getProperty("tableName"));
ingestionProperties.getIngestionMapping().setIngestionMappingReference(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.CSV);
  3. Load the file and ingest it into the table
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("filePath"), 0);
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProperties);
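The second FileSourceInfo argument appears to be the raw data size in bytes; this sample passes 0, while the StreamingIngest section below passes new File(path).length(). A minimal JDK-only sketch for computing that value follows; RawSizeSketch and rawSizeOf are hypothetical names. For a compressed file the on-disk size is not the uncompressed size, so treat it only as a hint.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class RawSizeSketch {
    // Returns the on-disk size of the file in bytes.
    static long rawSizeOf(String filePath) throws IOException {
        return Files.size(Paths.get(filePath));
    }

    public static void main(String[] args) throws IOException {
        // Create a throwaway CSV file so the example runs anywhere.
        Path temp = Files.createTempFile("sample", ".csv");
        Files.write(temp, "1,event 1\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(rawSizeOf(temp.toString())); // prints 10
        Files.delete(temp);
    }
}
```

In the sample itself, the result would replace the 0 in new FileSourceInfo(System.getProperty("filePath"), 0).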

How to run this sample

cd samples
mvn clean compile exec:java -Dexec.cleanupDaemonThreads=false \
                              -Dexec.mainClass="FileIngestion" \
                              -DclusterPath="cluster/path" \
                              -DappId="app-id" \
                              -DappKey="appKey" \
                              -DappTenant="tenant-id" \
                              -DdbName="dbName" \
                              -DtableName="tableName" \
                              -DdataMappingName="dataMappingName" \
                              -DfilePath="file/path"

StreamingIngest Sample

This sample will demonstrate how to ingest data using the streaming ingest client.
Sample Code (src/main/java/StreamingIngest)

Prerequisites

.create table StreamingIngest (rownumber:int, rowguid:string, xdouble:real, xfloat:real, xbool:bool, xint16:int, xint32:int, xint64:long, xuint8:long, xuint16:long, xuint32:long, xuint64:long, xdate:datetime, xsmalltext:string, xtext:string, xnumberAsText:string, xtime:timespan, xtextWithNulls:string, xdynamicWithNulls:dynamic)
.create table StreamingIngest ingestion json mapping "JsonMapping" '[{"column":"rownumber","path": "$.rownumber", "datatype":"int" },{"column":"rowguid", "path":"$.rowguid","datatype":"string" },{"column":"xdouble", "path":"$.xdouble", "datatype":"real" },{"column":"xfloat", "path":"$.xfloat", "datatype":"real" },{"column":"xbool", "path":"$.xbool", "datatype":"bool" },{"column":"xint16", "path":"$.xint16", "datatype":"int" },{"column":"xint32", "path":"$.xint32", "datatype":"int" },{"column":"xint64", "path":"$.xint64", "datatype":"long" },{"column":"xuint8", "path":"$.xuint8", "datatype":"long"},{"column":"xuint16", "path":"$.xuint16", "datatype":"long"},{"column":"xuint32", "path":"$.xuint32", "datatype":"long"},{"column":"xuint64", "path":"$.xuint64", "datatype":"long"},{"column":"xdate", "path":"$.xdate", "datatype":"datetime"},{"column":"xsmalltext", "path":"$.xsmalltext","datatype":"string"},{"column":"xtext", "path":"$.xtext","datatype":"string"},{"column":"xnumberAsText", "path":"$.xnumberAsText","datatype":"string"},{"column":"xtime", "path":"$.xtime","datatype":"timespan"}, {"column":"xtextWithNulls", "path":"$.xtextWithNulls","datatype":"string"}, {"column":"xdynamicWithNulls", "path":"$.xdynamicWithNulls","datatype":"dynamic"}]'

Steps to follow

  1. Build the connection string and initialize
ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(
        System.getProperty("clusterPath"),
        System.getProperty("appId"),
        System.getProperty("appKey"),
        System.getProperty("appTenant"));
  2. Initialize the streaming client and its properties
IngestClient streamingIngestClient = IngestClientFactory.createStreamingIngestClient(csb);

IngestionProperties ingestionProperties = new IngestionProperties(System.getProperty("dbName"),
        System.getProperty("tableName"));
  3. Create the source info

StreamSourceInfo:

// Encode the string to exactly its UTF-8 bytes (uses java.nio.charset.StandardCharsets)
InputStream inputStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);

If the data is compressed:

streamSourceInfo.setCompressionType(CompressionType.gz);

FileSourceInfo:

FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
  4. Ingest into the table and verify the ingestion status

From stream:

OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;

From File:

OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
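When building the bytes behind a StreamSourceInfo, note that Charset.encode(...).array() returns the whole backing array of the ByteBuffer, which can be longer than the encoded text, while String.getBytes(StandardCharsets.UTF_8) returns exactly the encoded bytes. The JDK-only sketch below builds those bytes and, optionally, gzip-compresses them for use with setCompressionType(CompressionType.gz). The class and method names are hypothetical, and the JSON line is a made-up fragment using two of the table's columns.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class StreamPayloadSketch {
    // Exact UTF-8 bytes of the payload, with no trailing padding.
    static byte[] utf8(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }

    // Gzip-compresses the bytes; closing the GZIPOutputStream writes the gzip trailer.
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        }
        return out.toByteArray();
    }

    // Decompresses gzip bytes; used here only to check the round trip.
    static byte[] gunzip(byte[] compressed) throws IOException {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return in.readAllBytes();
        }
    }

    public static void main(String[] args) throws IOException {
        String data = "{\"rownumber\":1,\"rowguid\":\"a\"}\n";
        byte[] raw = utf8(data);
        byte[] compressed = gzip(raw);
        // This stream is what would be wrapped in a StreamSourceInfo.
        InputStream payload = new ByteArrayInputStream(compressed);
        System.out.println("raw=" + raw.length + " compressed=" + compressed.length
                + " available=" + payload.available());
    }
}
```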

How to run this sample

cd samples
mvn clean compile exec:java -Dexec.cleanupDaemonThreads=false \
                              -Dexec.mainClass="StreamingIngest" \
                              -DclusterPath="cluster/path" \
                              -DappId="app-id" \
                              -DappKey="appKey" \
                              -DappTenant="tenant-id" \
                              -DdbName="dbName" \
                              -DtableName="tableName" \
                              -DdataMappingName="dataMappingName"

Using CompletableFutures

Take a look at this Sample Code to learn how to run File Ingestion using CompletableFutures in order to make the calls asynchronously.

Note: the File Ingestion API itself, like all other APIs in this version, is not implemented asynchronously; the CompletableFutures only run the blocking calls on other threads.
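Because the ingestion call itself blocks, wrapping it in a CompletableFuture only moves the work to another thread. A minimal sketch of that pattern follows; blockingIngest is a hypothetical stand-in for a call such as client.ingestFromFile(...), and this is not the linked sample's code. A dedicated executor is used so that blocking I/O does not tie up the shared ForkJoinPool that supplyAsync uses by default.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncIngestSketch {
    // Hypothetical stand-in for a blocking SDK call.
    static String blockingIngest(String filePath) {
        try {
            Thread.sleep(50); // simulate network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "queued:" + filePath;
    }

    // Runs the blocking call on the given executor so the caller's thread is free.
    static CompletableFuture<String> ingestAsync(String filePath, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> blockingIngest(filePath), executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> a = ingestAsync("a.csv", executor);
            CompletableFuture<String> b = ingestAsync("b.csv", executor);
            // Wait for both; join() rethrows failures as CompletionException.
            CompletableFuture.allOf(a, b).join();
            System.out.println(a.join() + ", " + b.join()); // prints "queued:a.csv, queued:b.csv"
        } finally {
            executor.shutdown();
        }
    }
}
```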

Query Table Status Sample

This sample will demonstrate how to retrieve ingestion status.
Sample Code

Prerequisites

Steps to follow

  1. Set the timeout for status retrieval:
int timeoutInSec = Integer.getInteger("timeoutInSec", 300); // falls back to 300 seconds if -DtimeoutInSec is not set
  2. Build the connection string and initialize
ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(
        System.getProperty("clusterPath"),
        System.getProperty("appId"),
        System.getProperty("appKey"),
        System.getProperty("appTenant"));
  3. Initialize the client and its properties
IngestClient client = IngestClientFactory.createClient(csb);

IngestionProperties ingestionProperties = new IngestionProperties(System.getProperty("dbName"),
        System.getProperty("tableName"));
ingestionProperties.getIngestionMapping().setIngestionMappingReference(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.CSV);
// Report ingestion status through both the status queue and the status table
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.QUEUE_AND_TABLE);
ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES);
  4. Load the file and ingest it into the table
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("filePath"), 0);
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProperties);
  5. Retrieve the ingestion status and wait for the result
List<IngestionStatus> statuses = ingestionResult.getIngestionStatusCollection();

while (statuses.get(0).status == OperationStatus.Pending && timeoutInSec > 0) {
    Thread.sleep(1000);
    timeoutInSec -= 1;
    statuses = ingestionResult.getIngestionStatusCollection();
}
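The loop above counts iterations, so the real wait can exceed timeoutInSec by the time each status request takes. A sketch of the same polling idea measured against a deadline follows; the Status enum and waitForCompletion are hypothetical stand-ins (the Supplier plays the role of re-reading getIngestionStatusCollection()), not SDK APIs.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class StatusPollSketch {
    // Hypothetical stand-in for the OperationStatus values used in the loop above.
    enum Status { Pending, Succeeded, Failed }

    // Polls until the status is no longer Pending or the deadline passes; returns the last status seen.
    static Status waitForCompletion(Supplier<Status> poll, long timeoutMillis, long intervalMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        Status status = poll.get();
        while (status == Status.Pending && System.nanoTime() < deadline) {
            Thread.sleep(intervalMillis);
            status = poll.get();
        }
        return status;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        // Simulated status source: Pending for the first three polls, then Succeeded.
        Supplier<Status> fake = () -> calls.incrementAndGet() <= 3 ? Status.Pending : Status.Succeeded;
        System.out.println(waitForCompletion(fake, 5_000, 10)); // prints "Succeeded"
    }
}
```

The caller should still check the returned status: Pending after the deadline means the timeout was reached, not that ingestion failed.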

Running this sample

To run this sample:

cd samples
mvn clean compile exec:java -Dexec.mainClass="TableStatus" \
                            -DclusterPath="cluster/path" \
                            -DappId="app-id" \
                            -DappKey="appKey" \
                            -DappTenant="tenant-id" \
                            -DdbName="dbName" \
                            -DtableName="tableName" \
                            -DdataMappingName="dataMappingName" \
                            -DfilePath="file/path" \
                            -DtimeoutInSec=300

Simple JMeter sample for stress-testing the clients and the cluster

Running this sample

Fill in the parameters in the jmeter_test_load.properties file. The application you provide must be granted User and Ingestor permissions on the database. This test measures the performance of the client, so the logs are used to compare the total run time of each request. A WARN log line is written for each request; once the log is loaded into a table as a single text column named 'data', it can be parsed with the following KQL:

jmeterLog | parse-where data with Datetime:string " WARN " Text:string "after: " ms:int * | summarize percentiles(ms, 5, 80, 90, 95), count(), avg(ms) by substring(Text, 39)
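For a quick local check before loading the log into a table, the same extraction can be approximated in Java. The exact log format is not given here, so the sample lines below are hypothetical and shaped only to satisfy the KQL pattern. KQL's percentiles() is an approximate aggregate, so this sketch computes only the average; JmeterLogSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JmeterLogSketch {
    // Mirrors the KQL pattern: <Datetime> " WARN " <Text> "after: " <ms:int> <anything>.
    private static final Pattern LINE = Pattern.compile("^(.*?) WARN (.*?)after: (\\d+).*$");

    // Extracts the millisecond value from one line, or empty if the line does not match.
    static OptionalInt parseMillis(String line) {
        Matcher m = LINE.matcher(line);
        return m.matches() ? OptionalInt.of(Integer.parseInt(m.group(3))) : OptionalInt.empty();
    }

    // Averages the parsed values, skipping non-matching lines as parse-where does.
    static double averageMillis(List<String> lines) {
        List<Integer> values = new ArrayList<>();
        for (String line : lines) {
            parseMillis(line).ifPresent(v -> values.add(v));
        }
        return values.stream().mapToInt(Integer::intValue).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "2024-01-01 10:00:00,123 WARN Request ingestFromFile finished after: 250 ms",
                "2024-01-01 10:00:01,456 WARN Request ingestFromFile finished after: 150 ms",
                "2024-01-01 10:00:02,789 INFO unrelated line");
        System.out.println(averageMillis(lines)); // prints 200.0
    }
}
```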

More information

http://azure.com/java

If you don't have a Microsoft Azure subscription, you can get a FREE trial account here


This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.