Many minor improvements to Quickstart (#201)

This commit is contained in:
Yihezkel Schoenbrun 2021-12-20 10:38:21 +02:00 коммит произвёл GitHub
Родитель b3d73b8bb1
Коммит 63519e7a79
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
1 изменённых файлов: 99 добавлений и 91 удалений

Просмотреть файл

@ -25,36 +25,35 @@ import java.security.cert.X509Certificate;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.microsoft.azure.kusto.data.ClientRequestProperties.OPTION_SERVER_TIMEOUT;
public class KustoSampleApp { public class KustoSampleApp {
// TODO - Config: // TODO (config - optional): Change the authentication method from "User Prompt" to any of the other options
// If this quickstart app was downloaded from OneClick, kusto_sample_config.json should be pre-populated with your cluster's details
// If this quickstart app was downloaded from GitHub, edit kusto_sample_config.json and modify the cluster URL and database fields appropriately
private static final String CONFIG_FILE_NAME = "kusto_sample_config.json";
// TODO - Config (Optional): Change the authentication method from User Prompt to any of the other options
// Some of the auth modes require additional environment variables to be set in order to work (see usage in generateConnectionString below) // Some of the auth modes require additional environment variables to be set in order to work (see usage in generateConnectionString below)
// Managed Identity Authentication only works when running as an Azure service (webapp, function, etc.) // Managed Identity Authentication only works when running as an Azure service (webapp, function, etc.)
private static final String AUTHENTICATION_MODE = "UserPrompt"; // Options: (UserPrompt|ManagedIdentity|AppKey|AppCertificate|DeviceCode) private static final String AUTHENTICATION_MODE = "UserPrompt"; // Options: (UserPrompt|ManagedIdentity|AppKey|AppCertificate|DeviceCode)
// TODO - Config (Optional): Toggle to false to execute this script "unattended" // TODO (config - optional): Toggle to false to execute this script "unattended"
private static final boolean WAIT_FOR_USER = true; private static final boolean WAIT_FOR_USER = true;
private static String databaseName; // TODO (config):
private static String tableName; // If this quickstart app was downloaded from OneClick, kusto_sample_config.json should be pre-populated with your cluster's details
private static String kustoUrl; // If this quickstart app was downloaded from GitHub, edit kusto_sample_config.json and modify the cluster URL and database fields appropriately
private static String ingestUrl; private static final String CONFIG_FILE_NAME = "kusto_sample_config.json";
private static boolean useExistingTable;
private static boolean alterTable;
private static boolean queryData;
private static boolean ingestData;
private static List<Map<String, String>> dataToIngest;
private static String tableSchema;
private static int step = 1;
private static final String BATCHING_POLICY = "{ \"MaximumBatchingTimeSpan\": \"00:00:10\", \"MaximumNumberOfItems\": 500, \"MaximumRawDataSizeMB\": 1024 }"; private static final String BATCHING_POLICY = "{ \"MaximumBatchingTimeSpan\": \"00:00:10\", \"MaximumNumberOfItems\": 500, \"MaximumRawDataSizeMB\": 1024 }";
private static final int WAIT_FOR_INGEST_SECONDS = 20; private static final int WAIT_FOR_INGEST_SECONDS = 20;
private static int step = 1;
private static boolean useExistingTable;
private static String databaseName;
private static String tableName;
private static String tableSchema;
private static String kustoUrl;
private static String ingestUrl;
private static List<Map<String, String>> dataToIngest;
private static boolean shouldAlterTable;
private static boolean shouldQueryData;
private static boolean shouldIngestData;
public static void main(String[] args) throws InterruptedException, IOException { public static void main(String[] args) throws InterruptedException, IOException {
System.out.println("Kusto sample app is starting..."); System.out.println("Kusto sample app is starting...");
@ -65,17 +64,17 @@ public class KustoSampleApp {
} }
try (IngestClient ingestClient = IngestClientFactory.createClient(generateConnectionString(ingestUrl, AUTHENTICATION_MODE))) { try (IngestClient ingestClient = IngestClientFactory.createClient(generateConnectionString(ingestUrl, AUTHENTICATION_MODE))) {
// Tip: Avoid creating a new Kusto/ingest Client for each use. Instead, create the clients once and reuse them. // Tip: Avoid creating a new Kusto/ingest client for each use. Instead, create the clients once and reuse them.
Client kustoClient = new ClientImpl(generateConnectionString(kustoUrl, AUTHENTICATION_MODE)); Client kustoClient = new ClientImpl(generateConnectionString(kustoUrl, AUTHENTICATION_MODE));
if (useExistingTable) { if (useExistingTable) {
if (alterTable) { if (shouldAlterTable) {
// Tip: Usually table was originally created with a schema appropriate for the data being ingested, so this wouldn't be needed. // Tip: Usually table was originally created with a schema appropriate for the data being ingested, so this wouldn't be needed.
// Learn More: For more information about altering table schemas, see: https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/alter-table-command // Learn More: For more information about altering table schemas, see: https://docs.microsoft.com/azure/data-explorer/kusto/management/alter-table-command
alterMergeExistingTableToProvidedSchema(kustoClient, databaseName, tableName, tableSchema); alterMergeExistingTableToProvidedSchema(kustoClient, databaseName, tableName, tableSchema);
} }
if (queryData) { if (shouldQueryData) {
// Learn More: For more information about Kusto Query Language (KQL), see: https://docs.microsoft.com/azure/data-explorer/write-queries // Learn More: For more information about Kusto Query Language (KQL), see: https://docs.microsoft.com/azure/data-explorer/write-queries
queryExistingNumberOfRows(kustoClient, databaseName, tableName); queryExistingNumberOfRows(kustoClient, databaseName, tableName);
} }
@ -85,25 +84,25 @@ public class KustoSampleApp {
createNewTable(kustoClient, databaseName, tableName, tableSchema); createNewTable(kustoClient, databaseName, tableName, tableSchema);
} }
if (ingestData) { if (shouldIngestData) {
for (Map<String, String> file : dataToIngest) { for (Map<String, String> file : dataToIngest) {
IngestionProperties.DataFormat dataFormat = IngestionProperties.DataFormat.valueOf(file.get("format").toLowerCase()); IngestionProperties.DataFormat dataFormat = IngestionProperties.DataFormat.valueOf(file.get("format").toLowerCase());
String mappingName = file.get("mappingName"); String mappingName = file.get("mappingName");
// Tip: This is generally a one-time configuration. // Tip: This is generally a one-time configuration.
// Learn More: For more information about providing inline mappings and mapping references, see: https://docs.microsoft.com/azure/data-explorer/kusto/management/mappings // Learn More: For more information about providing inline mappings and mapping references, see: https://docs.microsoft.com/azure/data-explorer/kusto/management/mappings
if (!createIngestionMappings(Boolean.parseBoolean(file.get("useExistingMapping")), kustoClient, databaseName, mappingName, file.get("mappingValue"), dataFormat)) { if (!createIngestionMappings(Boolean.parseBoolean(file.get("useExistingMapping")), kustoClient, databaseName, tableName, mappingName, file.get("mappingValue"), dataFormat)) {
continue; continue;
} }
// Learn More: For more information about ingesting data to Kusto in Java, see: https://docs.microsoft.com/azure/data-explorer/java-ingest-data // Learn More: For more information about ingesting data to Kusto in Java, see: https://docs.microsoft.com/azure/data-explorer/java-ingest-data
ingestData(file, dataFormat, ingestClient, databaseName, tableName, mappingName); ingest(file, dataFormat, ingestClient, databaseName, tableName, mappingName);
} }
waitForIngestionToComplete(); waitForIngestionToComplete();
} }
if (queryData) { if (shouldQueryData) {
executeValidationQueries(kustoClient, databaseName, tableName, ingestData); executeValidationQueries(kustoClient, databaseName, tableName, shouldIngestData);
} }
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
die("Couldn't create Kusto client", e); die("Couldn't create Kusto client", e);
@ -139,7 +138,6 @@ public class KustoSampleApp {
try { try {
configs = new ObjectMapper().readValue(configFile, HashMap.class); configs = new ObjectMapper().readValue(configFile, HashMap.class);
// Required configs
useExistingTable = Boolean.parseBoolean((String) configs.get("useExistingTable")); useExistingTable = Boolean.parseBoolean((String) configs.get("useExistingTable"));
databaseName = (String) configs.get("databaseName"); databaseName = (String) configs.get("databaseName");
tableName = (String) configs.get("tableName"); tableName = (String) configs.get("tableName");
@ -147,67 +145,77 @@ public class KustoSampleApp {
kustoUrl = (String) configs.get("kustoUri"); kustoUrl = (String) configs.get("kustoUri");
ingestUrl = (String) configs.get("ingestUri"); ingestUrl = (String) configs.get("ingestUri");
dataToIngest = (List<Map<String, String>>) configs.get("data"); dataToIngest = (List<Map<String, String>>) configs.get("data");
alterTable = Boolean.parseBoolean((String) configs.get("alterTable")); shouldAlterTable = Boolean.parseBoolean((String) configs.get("alterTable"));
queryData = Boolean.parseBoolean((String) configs.get("queryData")); shouldQueryData = Boolean.parseBoolean((String) configs.get("queryData"));
ingestData = Boolean.parseBoolean((String) configs.get("ingestData")); shouldIngestData = Boolean.parseBoolean((String) configs.get("ingestData"));
if (StringUtils.isBlank(databaseName) || StringUtils.isBlank(tableName) || StringUtils.isBlank(tableSchema) if (StringUtils.isBlank(databaseName) || StringUtils.isBlank(tableName) || StringUtils.isBlank(tableSchema)
|| StringUtils.isBlank(kustoUrl) || StringUtils.isBlank(ingestUrl) || StringUtils.isBlank(kustoUrl) || StringUtils.isBlank(ingestUrl)
|| StringUtils.isBlank((String) configs.get("useExistingTable")) || dataToIngest.isEmpty()) { || StringUtils.isBlank((String) configs.get("useExistingTable")) || dataToIngest.isEmpty()) {
die(String.format("%s is missing required fields", CONFIG_FILE_NAME)); die(String.format("File '%s' is missing required fields", configFileName));
} }
} catch (IOException e) { } catch (IOException e) {
die("Couldn't read config file", e); die(String.format("Couldn't read config file from file '%s'", configFileName), e);
} }
} }
private static ConnectionStringBuilder generateConnectionString(String endpointUrl, String authenticationMode) { private static ConnectionStringBuilder generateConnectionString(String clusterUrl, String authenticationMode) {
// Learn More: For additional information on how to authorize users and apps in Kusto, see: https://docs.microsoft.com/azure/data-explorer/manage-database-permissions
ConnectionStringBuilder csb = null; ConnectionStringBuilder csb = null;
switch (authenticationMode) { switch (authenticationMode) {
case "UserPrompt": case "UserPrompt":
csb = ConnectionStringBuilder.createWithUserPrompt(endpointUrl); // Prompt user for credentials
csb = ConnectionStringBuilder.createWithUserPrompt(clusterUrl);
break; break;
case "ManagedIdentity": case "ManagedIdentity":
// Connect using the system or user provided managed identity (Azure service only) csb = createManagedIdentityConnectionString(clusterUrl);
// TODO - Config (Optional): Managed identity client id if you are using a User Assigned Managed Id
String clientId = System.getenv("MANAGED_IDENTITY_CLIENT_ID");
if (StringUtils.isBlank(clientId)) {
csb = ConnectionStringBuilder.createWithAadManagedIdentity(endpointUrl);
} else {
csb = ConnectionStringBuilder.createWithAadManagedIdentity(endpointUrl, clientId);
}
break; break;
case "AppKey": case "AppKey":
// TODO - Config (Optional): App Id & tenant, and App Key to authenticate with // Learn More: For information about how to procure an AAD Application, see: https://docs.microsoft.com/azure/data-explorer/provision-azure-ad-app
// For information about how to procure an AAD Application see: https://docs.microsoft.com/azure/data-explorer/provision-azure-ad-app // TODO (config - optional): App ID & tenant, and App Key to authenticate with
csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpointUrl, System.getenv("APP_ID"), System.getenv("APP_KEY"), System.getenv("APP_TENANT")); csb = ConnectionStringBuilder.createWithAadApplicationCredentials(clusterUrl,
System.getenv("APP_ID"),
System.getenv("APP_KEY"),
System.getenv("APP_TENANT"));
break; break;
case "AppCertificate": case "AppCertificate":
// TODO - Config (Optional): App Id & tenant, path to public certificate and path to private certificate pem file to authenticate with csb = createApplicationCertificateConnectionString(clusterUrl);
csb = createAppCertificateStringBuilder(endpointUrl);
break; break;
case "DeviceCode": case "DeviceCode":
csb = ConnectionStringBuilder.createWithDeviceCode(endpointUrl); csb = ConnectionStringBuilder.createWithDeviceCode(clusterUrl);
break; break;
default: default:
die(String.format("Authentication mode %s is not supported", authenticationMode)); die(String.format("Authentication mode '%s' is not supported", authenticationMode));
} }
csb.setApplicationNameForTracing("KustoJavaSdkQuickStart"); csb.setApplicationNameForTracing("KustoJavaSdkQuickStart");
return csb; return csb;
} }
private static ConnectionStringBuilder createAppCertificateStringBuilder(String endpointUrl) { @NotNull
private static ConnectionStringBuilder createManagedIdentityConnectionString(String clusterUrl) {
// Connect using the system- or user-assigned managed identity (Azure service only)
// TODO (config - optional): Managed identity client ID if you are using a user-assigned managed identity
String clientId = System.getenv("MANAGED_IDENTITY_CLIENT_ID");
if (StringUtils.isBlank(clientId)) {
return ConnectionStringBuilder.createWithAadManagedIdentity(clusterUrl);
} else {
return ConnectionStringBuilder.createWithAadManagedIdentity(clusterUrl, clientId);
}
}
private static ConnectionStringBuilder createApplicationCertificateConnectionString(String clusterUrl) {
// TODO (config - optional): App ID & tenant, path to public certificate and path to private certificate pem file to authenticate with
String appId = System.getenv("APP_ID"); String appId = System.getenv("APP_ID");
String tenantId = System.getenv("APP_TENANT"); String appTenant = System.getenv("APP_TENANT");
String privateKeyFilePath = System.getenv("PRIVATE_KEY_PEM_FILE_PATH"); String privateKeyPemFilePath = System.getenv("PRIVATE_KEY_PEM_FILE_PATH");
String publicCertFilePath = System.getenv("PUBLIC_CERT_FILE_PATH"); String publicCertFilePath = System.getenv("PUBLIC_CERT_FILE_PATH");
try { try {
PrivateKey privateKey = SecurityUtils.getPrivateKey(publicCertFilePath); PrivateKey privateKey = SecurityUtils.getPrivateKey(publicCertFilePath);
X509Certificate x509Certificate = SecurityUtils.getPublicCertificate(privateKeyFilePath); X509Certificate x509Certificate = SecurityUtils.getPublicCertificate(privateKeyPemFilePath);
return ConnectionStringBuilder.createWithAadApplicationCertificate(endpointUrl, appId, x509Certificate, privateKey, tenantId); return ConnectionStringBuilder.createWithAadApplicationCertificate(clusterUrl, appId, x509Certificate, privateKey, appTenant);
} catch (IOException | GeneralSecurityException e) { } catch (IOException | GeneralSecurityException e) {
die("Couldn't create ConnectionStringBuilder for app certificate authentication", e); die("Couldn't create ConnectionStringBuilder for application certificate authentication", e);
return null; return null;
} }
} }
@ -216,7 +224,7 @@ public class KustoSampleApp {
waitForUserToProceed(String.format("Create table '%s.%s'", databaseName, tableName)); waitForUserToProceed(String.format("Create table '%s.%s'", databaseName, tableName));
String command = String.format(".create table %s %s", tableName, tableSchema); String command = String.format(".create table %s %s", tableName, tableSchema);
if (!executeControlCommand(kustoClient, databaseName, command)) { if (!executeControlCommand(kustoClient, databaseName, command)) {
die("Failed to create table or validate it exists"); die(String.format("Failed to create table or validate it exists using command '%s'", command));
} }
/* /*
@ -227,7 +235,7 @@ public class KustoSampleApp {
3) More than 5 minutes have passed since the first file was queued for ingestion for the same table by the same user 3) More than 5 minutes have passed since the first file was queued for ingestion for the same table by the same user
For more information about customizing the ingestion batching policy, see: https://docs.microsoft.com/azure/data-explorer/kusto/management/batchingpolicy For more information about customizing the ingestion batching policy, see: https://docs.microsoft.com/azure/data-explorer/kusto/management/batchingpolicy
*/ */
// Disabled to prevent the surprising behavior of an existing batching policy being changed, without intent. // Disabled to prevent an existing batching policy from being unintentionally changed
if (false) { if (false) {
alterBatchingPolicy(kustoClient, databaseName, tableName); alterBatchingPolicy(kustoClient, databaseName, tableName);
} }
@ -237,16 +245,16 @@ public class KustoSampleApp {
waitForUserToProceed(String.format("Alter-merge existing table '%s.%s' to align with the provided schema", databaseName, tableName)); waitForUserToProceed(String.format("Alter-merge existing table '%s.%s' to align with the provided schema", databaseName, tableName));
String command = String.format(".alter-merge table %s %s", tableName, tableSchema); String command = String.format(".alter-merge table %s %s", tableName, tableSchema);
if (!executeControlCommand(kustoClient, databaseName, command)) { if (!executeControlCommand(kustoClient, databaseName, command)) {
die("Failed to alter table"); die(String.format("Failed to alter table using command '%s'", command));
} }
} }
private static boolean executeControlCommand(Client kustoClient, String databaseName, String controlCommand) { private static boolean executeControlCommand(Client kustoClient, String databaseName, String controlCommand) {
ClientRequestProperties clientRequestProperties = createClientRequestProperties("SampleApp_ControlCommand"); ClientRequestProperties clientRequestProperties = createClientRequestProperties("Java_SampleApp_ControlCommand");
KustoOperationResult result; KustoOperationResult result;
try { try {
result = kustoClient.execute(databaseName, controlCommand, clientRequestProperties); result = kustoClient.execute(databaseName, controlCommand, clientRequestProperties);
// Tip: Generally wouldn't print the response from a control command. We print here to demonstrate what the response looks like. // Tip: Actual implementations wouldn't generally print the response from a control command. We print here to demonstrate what the response looks like.
System.out.printf("Response from executed control command '%s':%n", controlCommand); System.out.printf("Response from executed control command '%s':%n", controlCommand);
KustoResultSetTable primaryResults = result.getPrimaryResults(); KustoResultSetTable primaryResults = result.getPrimaryResults();
for (int rowNum = 1; primaryResults.next(); rowNum++) { for (int rowNum = 1; primaryResults.next(); rowNum++) {
@ -274,7 +282,7 @@ public class KustoSampleApp {
} }
private static boolean executeQuery(Client kustoClient, String databaseName, String query) { private static boolean executeQuery(Client kustoClient, String databaseName, String query) {
ClientRequestProperties clientRequestProperties = createClientRequestProperties("SampleApp_Query"); ClientRequestProperties clientRequestProperties = createClientRequestProperties("Java_SampleApp_Query");
KustoOperationResult result; KustoOperationResult result;
try { try {
result = kustoClient.execute(databaseName, query, clientRequestProperties); result = kustoClient.execute(databaseName, query, clientRequestProperties);
@ -312,9 +320,9 @@ public class KustoSampleApp {
ClientRequestProperties clientRequestProperties = new ClientRequestProperties(); ClientRequestProperties clientRequestProperties = new ClientRequestProperties();
clientRequestProperties.setClientRequestId(String.format("%s;%s", scope, UUID.randomUUID())); clientRequestProperties.setClientRequestId(String.format("%s;%s", scope, UUID.randomUUID()));
// Tip: While not common, you can alter the request default command timeout using the below command, e.g. to set the timeout to 10 minutes, use "10m". // Tip: Though uncommon, you can alter the request default command timeout using the below command, e.g. to set the timeout to 10 minutes, use "10m".
if (StringUtils.isNotBlank(timeout)) { if (StringUtils.isNotBlank(timeout)) {
clientRequestProperties.setOption(OPTION_SERVER_TIMEOUT, timeout); clientRequestProperties.setOption(ClientRequestProperties.OPTION_SERVER_TIMEOUT, timeout);
} }
return clientRequestProperties; return clientRequestProperties;
} }
@ -327,48 +335,48 @@ public class KustoSampleApp {
private static void alterBatchingPolicy(Client kustoClient, String databaseName, String tableName) { private static void alterBatchingPolicy(Client kustoClient, String databaseName, String tableName) {
/* /*
Tip 1: Though most users should be fine with the defaults, to speed up ingestion, such as during development and Tip 1: Though most users should be fine with the defaults, to speed up ingestion, such as during development and
in this sample app, we opt to modify the default ingestion policy to ingest data after 10 seconds have passed. in this sample app, we opt to modify the default ingestion policy to ingest data after at most 10 seconds.
Tip 2: This is generally a one-time configuration. Tip 2: This is generally a one-time configuration.
Tip 3: You can also skip the batching for some files using the Flush-Immediately property, though this option should be used with care as it is inefficient. Tip 3: You can also skip the batching for some files using the Flush-Immediately property, though this option should be used with care as it is inefficient.
*/ */
waitForUserToProceed(String.format("Alter the batching policy for '%s.%s'", databaseName, tableName)); waitForUserToProceed(String.format("Alter the batching policy for table '%s.%s'", databaseName, tableName));
String command = String.format(".alter table %s policy ingestionbatching @'%s'", tableName, BATCHING_POLICY); String command = String.format(".alter table %s policy ingestionbatching @'%s'", tableName, BATCHING_POLICY);
if (!executeControlCommand(kustoClient, databaseName, command)) { if (!executeControlCommand(kustoClient, databaseName, command)) {
System.out.println("Failed to alter the ingestion policy, which could be the result of insufficient permissions. The sample will still run, though ingestion will be delayed for up to 5 minutes."); System.out.println("Failed to alter the ingestion policy, which could be the result of insufficient permissions. The sample will still run, though ingestion will be delayed for up to 5 minutes.");
} }
} }
private static boolean createIngestionMappings(boolean useExistingMapping, Client kustoClient, String databaseName, String mappingName, String mappingValue, IngestionProperties.DataFormat dataFormat) { private static boolean createIngestionMappings(boolean useExistingMapping, Client kustoClient, String databaseName, String tableName, String mappingName, String mappingValue, IngestionProperties.DataFormat dataFormat) {
if (!useExistingMapping) { if (!useExistingMapping) {
if (dataFormat.isMappingRequired() && StringUtils.isBlank(mappingValue)) { if (dataFormat.isMappingRequired() && StringUtils.isBlank(mappingValue)) {
System.out.printf("The data format %s requires a mapping, but configuration indicates to not use an existing mapping and no mapping was provided. Skipping this ingestion.%n", dataFormat.name()); System.out.printf("The data format '%s' requires a mapping, but configuration indicates to not use an existing mapping and no mapping was provided. Skipping this ingestion.%n", dataFormat.name());
return false; return false;
} }
if (StringUtils.isNotBlank(mappingValue)) { if (StringUtils.isNotBlank(mappingValue)) {
IngestionMapping.IngestionMappingKind ingestionMappingKind = dataFormat.getIngestionMappingKind(); IngestionMapping.IngestionMappingKind ingestionMappingKind = dataFormat.getIngestionMappingKind();
waitForUserToProceed(String.format("Create a %s mapping reference named '%s'", ingestionMappingKind, mappingName)); waitForUserToProceed(String.format("Create a '%s' mapping reference named '%s'", ingestionMappingKind, mappingName));
if (StringUtils.isBlank(mappingName)) { if (StringUtils.isBlank(mappingName)) {
mappingName = "DefaultQuickstartMapping" + UUID.randomUUID().toString().substring(0, 5); mappingName = "DefaultQuickstartMapping" + UUID.randomUUID().toString().substring(0, 5);
} }
String mappingCommand = String.format(".create-or-alter table %s ingestion %s mapping '%s' '%s'", tableName, ingestionMappingKind.name().toLowerCase(), mappingName, mappingValue); String mappingCommand = String.format(".create-or-alter table %s ingestion %s mapping '%s' '%s'", tableName, ingestionMappingKind.name().toLowerCase(), mappingName, mappingValue);
if (!executeControlCommand(kustoClient, databaseName, mappingCommand)) { if (!executeControlCommand(kustoClient, databaseName, mappingCommand)) {
System.out.printf("Failed to create a %s mapping reference named '%s'. Skipping this ingestion.%n", ingestionMappingKind, mappingName); System.out.printf("Failed to create a '%s' mapping reference named '%s'. Skipping this ingestion.%n", ingestionMappingKind, mappingName);
return false; return false;
} }
} }
} else if (StringUtils.isBlank(mappingName)) { } else if (dataFormat.isMappingRequired() && StringUtils.isBlank(mappingName)) {
System.out.println("The configuration indicates an existing mapping should be used, but none was provided. Skipping this ingestion."); System.out.printf("The data format '%s' requires a mapping and the configuration indicates an existing mapping should be used, but none was provided. Skipping this ingestion.%n", dataFormat.name());
return false; return false;
} }
return true; return true;
} }
private static void ingestData(Map<String, String> file, IngestionProperties.DataFormat dataFormat, IngestClient ingestClient, String databaseName, String tableName, String mappingName) { private static void ingest(Map<String, String> file, IngestionProperties.DataFormat dataFormat, IngestClient ingestClient, String databaseName, String tableName, String mappingName) {
String sourceType = file.get("sourceType"); String sourceType = file.get("sourceType");
String uri = file.get("dataSourceUri"); String uri = file.get("dataSourceUri");
waitForUserToProceed(String.format("Ingest '%s' from %s", uri, sourceType)); waitForUserToProceed(String.format("Ingest '%s' from '%s'", uri, sourceType));
// Tip: When ingesting json files, if each line represents a single-line json, use MULTIJSON format even if the file only contains one line. // Tip: When ingesting json files, if each line represents a single-line json, use MULTIJSON format even if the file only contains one line.
// If the json contains whitespace formatting, use SINGLEJSON. In this case, only one data row json object is allowed per file. // If the json contains whitespace formatting, use SINGLEJSON. In this case, only one data row json object is allowed per file.
if (dataFormat == IngestionProperties.DataFormat.json) { if (dataFormat == IngestionProperties.DataFormat.json) {
@ -378,38 +386,38 @@ public class KustoSampleApp {
// Tip: Kusto's Java SDK can ingest data from files, blobs, java.sql.ResultSet objects, and open streams. // Tip: Kusto's Java SDK can ingest data from files, blobs, java.sql.ResultSet objects, and open streams.
// See the SDK's kusto-samples module and the E2E tests in kusto-ingest for additional references. // See the SDK's kusto-samples module and the E2E tests in kusto-ingest for additional references.
if (sourceType.equalsIgnoreCase("localfilesource")) { if (sourceType.equalsIgnoreCase("localfilesource")) {
ingestDataFromFile(ingestClient, databaseName, tableName, uri, dataFormat, mappingName); ingestFromFile(ingestClient, databaseName, tableName, uri, dataFormat, mappingName);
} else if (sourceType.equalsIgnoreCase("blobsource")) { } else if (sourceType.equalsIgnoreCase("blobsource")) {
ingestDataFromBlob(ingestClient, databaseName, tableName, uri, dataFormat, mappingName); ingestFromBlob(ingestClient, databaseName, tableName, uri, dataFormat, mappingName);
} else { } else {
System.out.printf("Unknown source '%s' for file '%s'%n", sourceType, uri); System.out.printf("Unknown source '%s' for file '%s'%n", sourceType, uri);
} }
} }
private static void ingestDataFromFile(IngestClient ingestClient, String databaseName, String tableName, String uri, IngestionProperties.DataFormat dataFormat, String mappingName) { private static void ingestFromFile(IngestClient ingestClient, String databaseName, String tableName, String filePath, IngestionProperties.DataFormat dataFormat, String mappingName) {
IngestionProperties ingestionProperties = createIngestionProperties(databaseName, tableName, dataFormat, mappingName); IngestionProperties ingestionProperties = createIngestionProperties(databaseName, tableName, dataFormat, mappingName);
// Tip 1: For optimal ingestion batching and performance, specify the uncompressed data size in the file descriptor (e.g. fileToIngest.length()). // Tip 1: For optimal ingestion batching and performance, specify the uncompressed data size in the file descriptor (e.g. fileToIngest.length()) instead of the default below of 0.
// Otherwise, the service will determine the file size, requiring an additional s2s call and may not be accurate for compressed files. // Otherwise, the service will determine the file size, requiring an additional s2s call and may not be accurate for compressed files.
// Tip 2: To correlate between ingestion operations in your applications and Kusto, set the source id and log it somewhere. // Tip 2: To correlate between ingestion operations in your applications and Kusto, set the source ID and log it somewhere.
FileSourceInfo fileSourceInfo = new FileSourceInfo(uri, 0, UUID.randomUUID()); FileSourceInfo fileSourceInfo = new FileSourceInfo(filePath, 0, UUID.randomUUID());
try { try {
ingestClient.ingestFromFile(fileSourceInfo, ingestionProperties); ingestClient.ingestFromFile(fileSourceInfo, ingestionProperties);
} catch (IngestionClientException e) { } catch (IngestionClientException e) {
System.out.printf("Client exception while trying to ingest '%s' into '%s.%s'%n%n", uri, databaseName, tableName); System.out.printf("Client exception while trying to ingest '%s' into '%s.%s'%n%n", filePath, databaseName, tableName);
e.printStackTrace(); e.printStackTrace();
} catch (IngestionServiceException e) { } catch (IngestionServiceException e) {
System.out.printf("Service exception while trying to ingest '%s' into '%s.%s'%n%n", uri, databaseName, tableName); System.out.printf("Service exception while trying to ingest '%s' into '%s.%s'%n%n", filePath, databaseName, tableName);
e.printStackTrace(); e.printStackTrace();
} }
} }
private static void ingestDataFromBlob(IngestClient ingestClient, String databaseName, String tableName, String blobUrl, IngestionProperties.DataFormat dataFormat, String mappingName) { private static void ingestFromBlob(IngestClient ingestClient, String databaseName, String tableName, String blobUrl, IngestionProperties.DataFormat dataFormat, String mappingName) {
IngestionProperties ingestionProperties = createIngestionProperties(databaseName, tableName, dataFormat, mappingName); IngestionProperties ingestionProperties = createIngestionProperties(databaseName, tableName, dataFormat, mappingName);
// Tip 1: For optimal ingestion batching and performance, specify the uncompressed data size in the file descriptor instead of the default below of 0. // Tip 1: For optimal ingestion batching and performance, specify the uncompressed data size in the file descriptor instead of the default below of 0.
// Otherwise, the service will determine the file size, requiring an additional s2s call and may not be accurate for compressed files. // Otherwise, the service will determine the file size, requiring an additional s2s call and may not be accurate for compressed files.
// Tip 2: To correlate between ingestion operations in your applications and Kusto, set the source id and log it somewhere. // Tip 2: To correlate between ingestion operations in your applications and Kusto, set the source ID and log it somewhere.
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobUrl, 0, UUID.randomUUID()); BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobUrl, 0, UUID.randomUUID());
try { try {
ingestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); ingestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
@ -430,7 +438,7 @@ public class KustoSampleApp {
if (StringUtils.isNotBlank(mappingName) && dataFormat != null) { if (StringUtils.isNotBlank(mappingName) && dataFormat != null) {
ingestionProperties.setIngestionMapping(mappingName, dataFormat.getIngestionMappingKind()); ingestionProperties.setIngestionMapping(mappingName, dataFormat.getIngestionMappingKind());
} }
// TODO - Config: Setting the ingestion batching policy takes up to 5 minutes to take effect. // TODO (config - optional): Setting the ingestion batching policy takes up to 5 minutes to take effect.
// We therefore set Flush-Immediately for the sake of the sample, but it generally shouldn't be used in practice. // We therefore set Flush-Immediately for the sake of the sample, but it generally shouldn't be used in practice.
// Comment out the line below after running the sample the first few times. // Comment out the line below after running the sample the first few times.
ingestionProperties.setFlushImmediately(true); ingestionProperties.setFlushImmediately(true);
@ -438,7 +446,7 @@ public class KustoSampleApp {
} }
private static void waitForIngestionToComplete() throws InterruptedException { private static void waitForIngestionToComplete() throws InterruptedException {
System.out.printf("Sleeping %s seconds for queued ingestion to complete. Note: This may take longer depending on the file size and ingestion policy.%n", WAIT_FOR_INGEST_SECONDS); System.out.printf("Sleeping %s seconds for queued ingestion to complete. Note: This may take longer depending on the file size and ingestion batching policy.%n", WAIT_FOR_INGEST_SECONDS);
for (int i = WAIT_FOR_INGEST_SECONDS; i >= 0; i--) { for (int i = WAIT_FOR_INGEST_SECONDS; i >= 0; i--) {
System.out.printf("%s.", i); System.out.printf("%s.", i);
try { try {
@ -453,13 +461,13 @@ public class KustoSampleApp {
System.out.println(); System.out.println();
} }
/**
 * Prints two numbered "Step N" headers and, after each one, runs a query against the table through executeQuery:
 * first "{tableName} | count", then "{tableName} | take 2". A blank line is printed after each query.
 * The shared step counter is post-incremented once per header, so this method consumes two step numbers.
 * NOTE(review): presumably executeQuery is what prints the query results - confirm against its definition.
 *
 * Each line below carries two copies of the statement; they differ only in the name of the boolean
 * parameter (ingestData / shouldIngestData) and in the wording of the second header.
 *
 * @param kustoClient  client passed through to executeQuery for both queries
 * @param databaseName database passed to executeQuery; also shown in the first header
 * @param tableName    table that is counted and sampled; also shown in the first header
 * @param shouldIngestData (ingestData in the first copy) when true, "post-ingestion " is inserted into both
 *                     headers; it has no other effect in this method
 */
private static void executeValidationQueries(Client kustoClient, String databaseName, String tableName, boolean ingestData) { private static void executeValidationQueries(Client kustoClient, String databaseName, String tableName, boolean shouldIngestData) {
String optionalPostIngestionMessage = ingestData ? "post-ingestion " : ""; String optionalPostIngestionMessage = shouldIngestData ? "post-ingestion " : "";
System.out.printf("Step %s: Get %srow count for '%s.%s'%n", step++, optionalPostIngestionMessage, databaseName, tableName); System.out.printf("Step %s: Get %srow count for '%s.%s'%n", step++, optionalPostIngestionMessage, databaseName, tableName);
executeQuery(kustoClient, databaseName, String.format("%s | count", tableName)); executeQuery(kustoClient, databaseName, String.format("%s | count", tableName));
System.out.println(); System.out.println();
System.out.printf("Step %s: Get sample of %sdata:%n", step++, optionalPostIngestionMessage); System.out.printf("Step %s: Get sample (2 records) of %sdata:%n", step++, optionalPostIngestionMessage);
executeQuery(kustoClient, databaseName, String.format("%s | take 2", tableName)); executeQuery(kustoClient, databaseName, String.format("%s | take 2", tableName));
System.out.println(); System.out.println();
} }