From 2c583f3a7adb6e6c1700613952523fa05440233d Mon Sep 17 00:00:00 2001 From: Yihezkel Schoenbrun Date: Tue, 11 Jan 2022 16:07:28 +0200 Subject: [PATCH] Fix enum nomenclature (#209) * Fix enum nomenclature * Fix enum nomenclature * Fix IngestionBlobInfo to use the String versions of the enums * Add KustoValue to IngestionReportLevel and IngestionReportMethod enums * From code review --- .../azure/kusto/ingest/ColumnMapping.java | 24 +++-- .../azure/kusto/ingest/IngestionBlobInfo.java | 10 +- .../azure/kusto/ingest/IngestionMapping.java | 19 +++- .../kusto/ingest/IngestionProperties.java | 96 ++++++++++++------- .../kusto/ingest/QueuedIngestClient.java | 28 ++++-- .../kusto/ingest/StreamingIngestClient.java | 6 +- .../kusto/ingest/AzureStorageClientTest.java | 37 +++---- .../microsoft/azure/kusto/ingest/E2ETest.java | 37 ++++--- .../ManagedStreamingIngestClientTest.java | 51 +++++----- .../kusto/ingest/QueuedIngestClientTest.java | 57 +++++++---- .../ingest/StreamingIngestClientTest.java | 54 +++++++---- .../kusto/quickstart/KustoSampleApp.java | 35 ++++--- samples/README.md | 8 +- samples/src/main/java/FileIngestion.java | 12 ++- .../java/FileIngestionCompletableFuture.java | 2 +- samples/src/main/java/StreamingIngest.java | 18 ++-- samples/src/main/java/TableStatus.java | 8 +- 17 files changed, 311 insertions(+), 191 deletions(-) diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ColumnMapping.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ColumnMapping.java index e140ecc5..f873b09e 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ColumnMapping.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ColumnMapping.java @@ -91,27 +91,25 @@ public class ColumnMapping implements Serializable { return properties.get(MappingConsts.STORAGE_DATA_TYPE.getName()); } - public boolean isValid(IngestionMapping.IngestionMappingKind mappingKind) - { - switch (mappingKind) - { - case Csv: - case SStream: + public boolean isValid(IngestionMapping.IngestionMappingKind mappingKind) { + switch (mappingKind) { + case CSV: + case SSTREAM: return !StringUtils.isEmpty(this.columnName); - case Json: - case Parquet: - case Orc: - case W3CLogFile: + case JSON: + case PARQUET: + case ORC: + case W3CLOGFILE: TransformationMethod transformationMethod = getTransform(); return !StringUtils.isEmpty(this.columnName) && (!StringUtils.isEmpty(getPath()) || transformationMethod == TransformationMethod.SourceLineNumber || transformationMethod == TransformationMethod.SourceLocation); - case Avro: - case ApacheAvro: + case AVRO: + case APACHEAVRO: return !StringUtils.isEmpty(this.columnName) && !StringUtils.isEmpty(getColumns()); default: return false; } } -} +} \ No newline at end of file diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionBlobInfo.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionBlobInfo.java index 852042e7..a56af983 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionBlobInfo.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionBlobInfo.java @@ -15,8 +15,8 @@ final public class IngestionBlobInfo { public String tableName; public UUID id; public Boolean retainBlobOnSuccess; - public IngestionProperties.IngestionReportLevel reportLevel; - public IngestionProperties.IngestionReportMethod reportMethod; + public String reportLevel; + public String reportMethod; public Boolean flushImmediately; public IngestionStatusInTableDescription IngestionStatusInTable; @@ -29,7 +29,7 @@ final public class IngestionBlobInfo { id = UUID.randomUUID(); retainBlobOnSuccess = true; flushImmediately = false; - reportLevel = IngestionProperties.IngestionReportLevel.FailuresOnly; - reportMethod = IngestionProperties.IngestionReportMethod.Queue; + reportLevel = IngestionProperties.IngestionReportLevel.FAILURES_ONLY.getKustoValue(); + reportMethod = IngestionProperties.IngestionReportMethod.QUEUE.getKustoValue(); } -} +} \ No newline at end of file diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionMapping.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionMapping.java index 1c892458..29372720 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionMapping.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionMapping.java @@ -94,6 +94,23 @@ public class IngestionMapping { Represents an ingestion mapping kind - the format of the source data to map from. */ public enum IngestionMappingKind { - Csv, Json, Avro, Parquet, SStream, Orc, ApacheAvro, W3CLogFile + CSV("Csv"), + JSON("Json"), + AVRO("Avro"), + PARQUET("Parquet"), + SSTREAM("SStream"), + ORC("Orc"), + APACHEAVRO("ApacheAvro"), + W3CLOGFILE("W3CLogFile"); + + private final String kustoValue; + + IngestionMappingKind(String kustoValue) { + this.kustoValue = kustoValue; + } + + public String getKustoValue() { + return kustoValue; + } } } \ No newline at end of file diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionProperties.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionProperties.java index cf6580b2..c9691f6b 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionProperties.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionProperties.java @@ -53,8 +53,8 @@ public class IngestionProperties { public IngestionProperties(String databaseName, String tableName) { this.databaseName = databaseName; this.tableName = tableName; - this.reportLevel = IngestionReportLevel.FailuresOnly; - this.reportMethod = IngestionReportMethod.Queue; + this.reportLevel = IngestionReportLevel.FAILURES_ONLY; + this.reportMethod = IngestionReportMethod.QUEUE; this.flushImmediately = false; this.additionalProperties = new HashMap<>(); this.dropByTags = new ArrayList<>(); @@ -62,7 +62,7 @@ public class IngestionProperties { this.ingestIfNotExists = new ArrayList<>(); this.additionalTags = new ArrayList<>(); this.ingestionMapping = new IngestionMapping(); - this.dataFormat = DataFormat.csv; + this.dataFormat = DataFormat.CSV; } /** @@ -212,12 +212,12 @@ public class IngestionProperties { fullAdditionalProperties.put("ingestIfNotExists", ingestIfNotExistsJson); } fullAdditionalProperties.putAll(additionalProperties); - fullAdditionalProperties.put("format", dataFormat.name()); + fullAdditionalProperties.put("format", dataFormat.getKustoValue()); String mappingReference = ingestionMapping.getIngestionMappingReference(); if (StringUtils.isNotBlank(mappingReference)) { fullAdditionalProperties.put("ingestionMappingReference", mappingReference); - fullAdditionalProperties.put("ingestionMappingType", ingestionMapping.getIngestionMappingKind().toString()); + fullAdditionalProperties.put("ingestionMappingType", ingestionMapping.getIngestionMappingKind().getKustoValue()); } else if (ingestionMapping.getColumnMappings() != null) { ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, Visibility.NONE); @@ -225,7 +225,7 @@ public class IngestionProperties { String mapping = objectMapper.writeValueAsString(ingestionMapping.getColumnMappings()); fullAdditionalProperties.put("ingestionMapping", mapping); - fullAdditionalProperties.put("ingestionMappingType", ingestionMapping.getIngestionMappingKind().toString()); + fullAdditionalProperties.put("ingestionMappingType", ingestionMapping.getIngestionMappingKind().getKustoValue()); } return fullAdditionalProperties; @@ -235,7 +235,7 @@ public class IngestionProperties { * Sets the data format. * * @param dataFormat One of the values in: {@link DataFormat DataFormat} - * @throws IllegalArgumentException if null argument is passed + * @throws IllegalArgumentException if null argument is passed */ public void setDataFormat(@NotNull DataFormat dataFormat) { Ensure.argIsNotNull(dataFormat, "dataFormat"); @@ -250,7 +250,7 @@ public class IngestionProperties { */ public void setDataFormat(@NotNull String dataFormatName) { try { - this.dataFormat = DataFormat.valueOf(dataFormatName.toLowerCase()); + this.dataFormat = DataFormat.valueOf(dataFormatName.toUpperCase()); } catch (IllegalArgumentException ex) { log.warn("IngestionProperties.setDataFormat(): Invalid dataFormatName of {}. Per the API's specification, DataFormat property value wasn't set.", dataFormatName); } @@ -314,7 +314,7 @@ public class IngestionProperties { if ((ingestionMapping.getColumnMappings() == null) && StringUtils.isBlank(mappingReference)) { if (dataFormat.isMappingRequired()) { - message.appendln("Mapping must be specified for '%s' format.", dataFormat.name()); + message.appendln("Mapping must be specified for '%s' format.", dataFormat.getKustoValue()); } if (ingestionMappingKind != null) { @@ -323,7 +323,7 @@ public class IngestionProperties { } else { // a mapping was provided if (dataFormat.getIngestionMappingKind() != null && !dataFormat.getIngestionMappingKind().equals(ingestionMappingKind)) { message.appendln("Wrong ingestion mapping for format '%s'; mapping kind should be '%s', but was '%s'.", - dataFormat.name(), dataFormat.getIngestionMappingKind(), ingestionMappingKind != null ? ingestionMappingKind.name() : "null"); + dataFormat.getKustoValue(), dataFormat.getIngestionMappingKind().getKustoValue(), ingestionMappingKind != null ? ingestionMappingKind.getKustoValue() : "null"); } if (ingestionMapping.getColumnMappings() != null) { @@ -349,41 +349,47 @@ public class IngestionProperties { } public void validateResultSetProperties() throws IngestionClientException { - Ensure.isTrue(IngestionProperties.DataFormat.csv.equals(dataFormat), + Ensure.isTrue(IngestionProperties.DataFormat.CSV.equals(dataFormat), String.format("ResultSet translates into csv format but '%s' was given", dataFormat)); validate(); } public enum DataFormat { - csv(IngestionMapping.IngestionMappingKind.Csv, false, true), - tsv(IngestionMapping.IngestionMappingKind.Csv, false, true), - scsv(IngestionMapping.IngestionMappingKind.Csv, false, true), - sohsv(IngestionMapping.IngestionMappingKind.Csv, false, true), - psv(IngestionMapping.IngestionMappingKind.Csv, false, true), - txt(IngestionMapping.IngestionMappingKind.Csv, false, true), - tsve(IngestionMapping.IngestionMappingKind.Csv, false, true), - json(IngestionMapping.IngestionMappingKind.Json, true, true), - singlejson(IngestionMapping.IngestionMappingKind.Json, true, true), - multijson(IngestionMapping.IngestionMappingKind.Json, true, true), - avro(IngestionMapping.IngestionMappingKind.Avro, true, false), - apacheavro(IngestionMapping.IngestionMappingKind.ApacheAvro, false, true), - parquet(IngestionMapping.IngestionMappingKind.Parquet, false, false), - sstream(IngestionMapping.IngestionMappingKind.SStream, false, true), - orc(IngestionMapping.IngestionMappingKind.Orc, false, false), - raw(IngestionMapping.IngestionMappingKind.Csv, false, true), - w3clogfile(IngestionMapping.IngestionMappingKind.W3CLogFile, false, true); + CSV("csv", IngestionMapping.IngestionMappingKind.CSV, false, true), + TSV("tsv", IngestionMapping.IngestionMappingKind.CSV, false, true), + SCSV("scsv", IngestionMapping.IngestionMappingKind.CSV, false, true), + SOHSV("sohsv", IngestionMapping.IngestionMappingKind.CSV, false, true), + PSV("psv", IngestionMapping.IngestionMappingKind.CSV, false, true), + TXT("txt", IngestionMapping.IngestionMappingKind.CSV, false, true), + TSVE("tsve", IngestionMapping.IngestionMappingKind.CSV, false, true), + JSON("json", IngestionMapping.IngestionMappingKind.JSON, true, true), + SINGLEJSON("singlejson", IngestionMapping.IngestionMappingKind.JSON, true, true), + MULTIJSON("multijson", IngestionMapping.IngestionMappingKind.JSON, true, true), + AVRO("avro", IngestionMapping.IngestionMappingKind.AVRO, true, false), + APACHEAVRO("apacheavro", IngestionMapping.IngestionMappingKind.APACHEAVRO, false, true), + PARQUET("parquet", IngestionMapping.IngestionMappingKind.PARQUET, false, false), + SSTREAM("sstream", IngestionMapping.IngestionMappingKind.SSTREAM, false, true), + ORC("orc", IngestionMapping.IngestionMappingKind.ORC, false, false), + RAW("raw", IngestionMapping.IngestionMappingKind.CSV, false, true), + W3CLOGFILE("w3clogfile", IngestionMapping.IngestionMappingKind.W3CLOGFILE, false, true); + private final String kustoValue; private final IngestionMapping.IngestionMappingKind ingestionMappingKind; private final boolean mappingRequired; private final boolean compressible; - DataFormat(IngestionMapping.IngestionMappingKind ingestionMappingKind, boolean mappingRequired, boolean compressible) { + DataFormat(String kustoValue, IngestionMapping.IngestionMappingKind ingestionMappingKind, boolean mappingRequired, boolean compressible) { + this.kustoValue = kustoValue; this.ingestionMappingKind = ingestionMappingKind; this.mappingRequired = mappingRequired; this.compressible = compressible; } + public String getKustoValue() { + return kustoValue; + } + public IngestionMapping.IngestionMappingKind getIngestionMappingKind() { return ingestionMappingKind; } @@ -398,14 +404,34 @@ public class IngestionProperties { } public enum IngestionReportLevel { - FailuresOnly, - None, - FailuresAndSuccesses + FAILURES_ONLY("FailuresOnly"), + NONE("None"), + FAILURES_AND_SUCCESSES("FailuresAndSuccesses"); + + private final String kustoValue; + + IngestionReportLevel(String kustoValue) { + this.kustoValue = kustoValue; + } + + public String getKustoValue() { + return kustoValue; + } } public enum IngestionReportMethod { - Queue, - Table, - QueueAndTable + QUEUE("Queue"), + TABLE("Table"), + QUEUE_AND_TABLE("QueueAndTable"); + + private final String kustoValue; + + IngestionReportMethod(String kustoValue) { + this.kustoValue = kustoValue; + } + + public String getKustoValue() { + return kustoValue; + } } } \ No newline at end of file diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClient.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClient.java index b5e4bf17..ec2c7e7c 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClient.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClient.java @@ -10,8 +10,17 @@ import com.microsoft.azure.kusto.data.Ensure; import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; -import com.microsoft.azure.kusto.ingest.result.*; -import com.microsoft.azure.kusto.ingest.source.*; +import com.microsoft.azure.kusto.ingest.result.IngestionResult; +import com.microsoft.azure.kusto.ingest.result.IngestionStatus; +import com.microsoft.azure.kusto.ingest.result.IngestionStatusInTableDescription; +import com.microsoft.azure.kusto.ingest.result.IngestionStatusResult; +import com.microsoft.azure.kusto.ingest.result.OperationStatus; +import com.microsoft.azure.kusto.ingest.result.TableReportIngestionResult; +import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; +import com.microsoft.azure.kusto.ingest.source.CompressionType; +import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; +import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; +import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlockBlob; import com.univocity.parsers.csv.CsvRoutines; @@ -60,8 +69,7 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient } public static String generateDmUriSuggestion(URIBuilder existingEndpoint) { - if (existingEndpoint.getHost().toLowerCase().startsWith(INGEST_PREFIX)) - { + if (existingEndpoint.getHost().toLowerCase().startsWith(INGEST_PREFIX)) { throw new IllegalArgumentException("The URL is already formatted as the suggested DM endpoint, so no suggestion can be made"); } existingEndpoint.setHost(INGEST_PREFIX + existingEndpoint.getHost()); @@ -92,8 +100,8 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient log.warn("Blob '{}' was sent for ingestion without specifying its raw data size", urlWithoutSecrets); } - ingestionBlobInfo.reportLevel = ingestionProperties.getReportLevel(); - ingestionBlobInfo.reportMethod = ingestionProperties.getReportMethod(); + ingestionBlobInfo.reportLevel = ingestionProperties.getReportLevel().getKustoValue(); + ingestionBlobInfo.reportMethod = ingestionProperties.getReportMethod().getKustoValue(); ingestionBlobInfo.flushImmediately = ingestionProperties.getFlushImmediately(); ingestionBlobInfo.additionalProperties = ingestionProperties.getIngestionProperties(); if (blobSourceInfo.getSourceId() != null) { @@ -107,8 +115,8 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient status.updatedOn = Date.from(Instant.now()); status.ingestionSourceId = ingestionBlobInfo.id; status.setIngestionSourcePath(urlWithoutSecrets); - boolean reportToTable = ingestionBlobInfo.reportLevel != IngestionProperties.IngestionReportLevel.None - && ingestionProperties.getReportMethod() != IngestionProperties.IngestionReportMethod.Queue; + boolean reportToTable = !IngestionProperties.IngestionReportLevel.NONE.equals(ingestionProperties.getReportLevel()) && + !IngestionProperties.IngestionReportMethod.QUEUE.equals(ingestionProperties.getReportMethod()); if (reportToTable) { status.status = OperationStatus.Pending; String tableStatusUri = resourceManager @@ -163,7 +171,7 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient file.getName(), ingestionProperties.getDatabaseName(), ingestionProperties.getTableName(), - dataFormat.name(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV. + dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV. shouldCompress ? CompressionType.gz : sourceCompressionType); CloudBlockBlob blob = azureStorageClient.uploadLocalFileToBlob(fileSourceInfo.getFilePath(), blobName, @@ -209,7 +217,7 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient "StreamUpload", ingestionProperties.getDatabaseName(), ingestionProperties.getTableName(), - dataFormat.name(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV. + dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV. shouldCompress ? CompressionType.gz : streamSourceInfo.getCompressionType()); CloudBlockBlob blob = azureStorageClient.uploadStreamToBlob( diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java index 4043ce28..0326435c 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java @@ -24,7 +24,6 @@ import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlockBlob; - import org.apache.commons.lang3.StringUtils; import org.apache.http.client.utils.URIBuilder; import org.jetbrains.annotations.Nullable; @@ -141,7 +140,7 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli streamSourceInfo.validate(); ingestionProperties.validate(); if (dataFormat.isMappingRequired() && StringUtils.isBlank(ingestionProperties.getIngestionMapping().getIngestionMappingReference())) { - throw new IngestionClientException(String.format("Mapping reference must be specified for DataFormat '%s' in streaming ingestion.", dataFormat.name())); + throw new IngestionClientException(String.format("Mapping reference must be specified for DataFormat '%s' in streaming ingestion.", dataFormat.getKustoValue())); } ClientRequestProperties clientRequestProperties = null; @@ -157,10 +156,9 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli ingestionProperties.getTableName(), stream, clientRequestProperties, - dataFormat.name(), + dataFormat.getKustoValue(), ingestionProperties.getIngestionMapping().getIngestionMappingReference(), !(streamSourceInfo.getCompressionType() == null || !streamSourceInfo.isLeaveOpen())); - } catch (DataClientException | IOException e) { log.error(e.getMessage(), e); throw new IngestionClientException(e.getMessage(), e); diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/AzureStorageClientTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/AzureStorageClientTest.java index c8d4faed..3ebf0e5f 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/AzureStorageClientTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/AzureStorageClientTest.java @@ -19,7 +19,12 @@ import java.net.URISyntaxException; import java.nio.file.Paths; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; class AzureStorageClientTest { static private final AzureStorageClient azureStorageClient = new AzureStorageClient(); @@ -35,7 +40,7 @@ class AzureStorageClientTest { testFilePath = Paths.get("src", "test", "resources", "testdata.json").toString(); testFile = new File(testFilePath); testFilePathCompressed = Paths.get("src", "test", "resources", "testdata.json.gz").toString(); - blob = new CloudBlockBlob(new URI("https://ms.com/storageUri")); + blob = new CloudBlockBlob(new URI("https://testcontosourl.com/storageUrl")); } @BeforeEach @@ -61,11 +66,11 @@ class AzureStorageClientTest { void PostMessageToQueue_NullEntity_IllegalArgumentException() { assertThrows( IllegalArgumentException.class, - () -> azureStorageClient.azureTableInsertEntity("tableUri", null)); + () -> azureStorageClient.azureTableInsertEntity("tableUrl", null)); } @Test - void PostMessageToQueue_NullTableUri_IllegalArgumentException() { + void PostMessageToQueue_NullTableUrl_IllegalArgumentException() { TableServiceEntity serviceEntity = new TableServiceEntity(); assertThrows( IllegalArgumentException.class, @@ -77,7 +82,7 @@ class AzureStorageClientTest { throws IOException, StorageException, URISyntaxException { doNothing().when(azureStorageClientSpy).compressAndUploadFileToBlob(anyString(), any(CloudBlockBlob.class)); - azureStorageClientSpy.uploadLocalFileToBlob(testFilePath, "blobName", "https://ms.com/blob", IngestionProperties.DataFormat.csv); + azureStorageClientSpy.uploadLocalFileToBlob(testFilePath, "blobName", "https://testcontosourl.com/blob", IngestionProperties.DataFormat.CSV); verify(azureStorageClientSpy).compressAndUploadFileToBlob(anyString(), any(CloudBlockBlob.class)); } @@ -86,7 +91,7 @@ class AzureStorageClientTest { throws IOException, StorageException, URISyntaxException { doNothing().when(azureStorageClientSpy).uploadFileToBlob(any(File.class), any(CloudBlockBlob.class)); - azureStorageClientSpy.uploadLocalFileToBlob(testFilePathCompressed, "blobName", "https://ms.com/blob", IngestionProperties.DataFormat.csv); + azureStorageClientSpy.uploadLocalFileToBlob(testFilePathCompressed, "blobName", "https://testcontosourl.com/blob", IngestionProperties.DataFormat.CSV); verify(azureStorageClientSpy).uploadFileToBlob(any(File.class), any(CloudBlockBlob.class)); } @@ -94,21 +99,21 @@ class AzureStorageClientTest { void UploadLocalFileToBlob_NullFilePath_IllegalArgumentException() { assertThrows( IllegalArgumentException.class, - () -> azureStorageClient.uploadLocalFileToBlob(null, "blobName", "storageUri", IngestionProperties.DataFormat.csv)); + () -> azureStorageClient.uploadLocalFileToBlob(null, "blobName", "storageUrl", IngestionProperties.DataFormat.CSV)); } @Test void UploadLocalFileToBlob_NullBlobName_IllegalArgumentException() { assertThrows( IllegalArgumentException.class, - () -> azureStorageClient.uploadLocalFileToBlob(testFilePath, null, "storageUri", IngestionProperties.DataFormat.csv)); + () -> azureStorageClient.uploadLocalFileToBlob(testFilePath, null, "storageUrl", IngestionProperties.DataFormat.CSV)); } @Test - void UploadLocalFileToBlob_NullStorageUri_IllegalArgumentException() { + void UploadLocalFileToBlob_NullStorageUrl_IllegalArgumentException() { assertThrows( IllegalArgumentException.class, - () -> azureStorageClient.uploadLocalFileToBlob(testFilePath, "blobName", null, IngestionProperties.DataFormat.csv)); + () -> azureStorageClient.uploadLocalFileToBlob(testFilePath, "blobName", null, IngestionProperties.DataFormat.CSV)); } @Test @@ -116,7 +121,7 @@ class AzureStorageClientTest { String notExistingFilePath = "not.existing.file.path"; assertThrows( IOException.class, - () -> azureStorageClient.uploadLocalFileToBlob(notExistingFilePath, "blobName", "storageUri", IngestionProperties.DataFormat.csv)); + () -> azureStorageClient.uploadLocalFileToBlob(notExistingFilePath, "blobName", "storageUrl", IngestionProperties.DataFormat.CSV)); } @Test @@ -125,7 +130,7 @@ class AzureStorageClientTest { try (InputStream stream = new FileInputStream(testFilePath)) { doNothing().when(azureStorageClientSpy).uploadStream(any(InputStream.class), any(CloudBlockBlob.class)); - azureStorageClientSpy.uploadStreamToBlob(stream, "blobName", "https://ms.com/storageUri", false); + azureStorageClientSpy.uploadStreamToBlob(stream, "blobName", "https://ms.com/storageUrl", false); verify(azureStorageClientSpy).uploadStream(isA(InputStream.class), isA(CloudBlockBlob.class)); } } @@ -136,7 +141,7 @@ class AzureStorageClientTest { try (InputStream stream = new FileInputStream(testFilePath)) { doNothing().when(azureStorageClientSpy) .compressAndUploadStream(any(InputStream.class), any(CloudBlockBlob.class)); - azureStorageClientSpy.uploadStreamToBlob(stream, "blobName", "https://ms.com/storageUri", true); + azureStorageClientSpy.uploadStreamToBlob(stream, "blobName", "https://testcontosourl.com/storageUrl", true); verify(azureStorageClientSpy).compressAndUploadStream(isA(InputStream.class), isA(CloudBlockBlob.class)); } } @@ -145,7 +150,7 @@ class AzureStorageClientTest { void UploadStreamToBlob_NullInputStream_IllegalArgumentException() { assertThrows( IllegalArgumentException.class, - () -> azureStorageClient.uploadStreamToBlob(null, "blobName", "storageUri", false)); + () -> azureStorageClient.uploadStreamToBlob(null, "blobName", "storageUrl", false)); } @@ -154,12 +159,12 @@ class AzureStorageClientTest { try (InputStream stream = new FileInputStream(testFilePath)) { assertThrows( IllegalArgumentException.class, - () -> azureStorageClient.uploadStreamToBlob(stream, null, "storageUri", false)); + () -> azureStorageClient.uploadStreamToBlob(stream, null, "storageUrl", false)); } } @Test - void UploadStreamToBlob_NullStorageUri_IllegalArgumentException() throws IOException { + void UploadStreamToBlob_NullStorageUrl_IllegalArgumentException() throws IOException { try (InputStream stream = new FileInputStream(testFilePath)) { assertThrows( IllegalArgumentException.class, diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/E2ETest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/E2ETest.java index 2dd6a075..628c9143 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/E2ETest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/E2ETest.java @@ -22,13 +22,22 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.StopWatch; import org.json.JSONArray; import org.json.JSONObject; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Paths; @@ -41,7 +50,11 @@ import java.util.Calendar; import java.util.List; import java.util.concurrent.Callable; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; class E2ETest { private static IngestClient ingestClient; @@ -135,23 +148,23 @@ class E2ETest { private static void createTestData() { IngestionProperties ingestionPropertiesWithoutMapping = new IngestionProperties(databaseName, tableName); ingestionPropertiesWithoutMapping.setFlushImmediately(true); - ingestionPropertiesWithoutMapping.setDataFormat(DataFormat.csv); + ingestionPropertiesWithoutMapping.setDataFormat(DataFormat.CSV); IngestionProperties ingestionPropertiesWithMappingReference = new IngestionProperties(databaseName, tableName); ingestionPropertiesWithMappingReference.setFlushImmediately(true); - ingestionPropertiesWithMappingReference.setIngestionMapping(mappingReference, IngestionMappingKind.Json); - ingestionPropertiesWithMappingReference.setDataFormat(DataFormat.json); + ingestionPropertiesWithMappingReference.setIngestionMapping(mappingReference, IngestionMappingKind.JSON); + ingestionPropertiesWithMappingReference.setDataFormat(DataFormat.JSON); IngestionProperties ingestionPropertiesWithColumnMapping = new IngestionProperties(databaseName, tableName); ingestionPropertiesWithColumnMapping.setFlushImmediately(true); - ingestionPropertiesWithColumnMapping.setDataFormat(DataFormat.json); + ingestionPropertiesWithColumnMapping.setDataFormat(DataFormat.JSON); ColumnMapping first = new ColumnMapping("rownumber", "int"); first.setPath("$.rownumber"); ColumnMapping second = new ColumnMapping("rowguid", "string"); second.setPath("$.rowguid"); ColumnMapping[] columnMapping = new ColumnMapping[]{first, second}; - ingestionPropertiesWithColumnMapping.setIngestionMapping(columnMapping, IngestionMappingKind.Json); - ingestionPropertiesWithColumnMapping.setDataFormat(DataFormat.json); + ingestionPropertiesWithColumnMapping.setIngestionMapping(columnMapping, IngestionMappingKind.JSON); + ingestionPropertiesWithColumnMapping.setDataFormat(DataFormat.JSON); dataForTests = Arrays.asList(new TestDataItem() { { @@ -401,7 +414,7 @@ class E2ETest { void testParameterizedQuery() throws DataServiceException, DataClientException { IngestionProperties ingestionPropertiesWithoutMapping = new IngestionProperties(databaseName, tableName); ingestionPropertiesWithoutMapping.setFlushImmediately(true); - ingestionPropertiesWithoutMapping.setDataFormat(DataFormat.csv); + ingestionPropertiesWithoutMapping.setDataFormat(DataFormat.CSV); TestDataItem item = new TestDataItem() { { diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClientTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClientTest.java index 65e93fe4..916fd610 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClientTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClientTest.java @@ -16,7 +16,6 @@ import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; import com.microsoft.azure.storage.blob.CloudBlockBlob; import com.microsoft.azure.storage.table.TableServiceEntity; - import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -70,7 +69,7 @@ class ManagedStreamingIngestClientTest { private static final AzureStorageClient azureStorageClientMock = mock(AzureStorageClient.class); private static ManagedStreamingIngestClient managedStreamingIngestClient; private static IngestionProperties ingestionProperties; - private static final String STORAGE_URI = "https://ms.com/storageUri"; + private static final String STORAGE_URL = "https://testcontosourl.com/storageUrl"; @Mock private static StreamingClient streamingClientMock; @@ -94,14 +93,14 @@ class ManagedStreamingIngestClientTest { when(resourceManagerMock.getIdentityToken()).thenReturn("identityToken"); when(azureStorageClientMock.uploadStreamToBlob(any(InputStream.class), anyString(), anyString(), anyBoolean())) - .thenReturn(new CloudBlockBlob(new URI(STORAGE_URI))); + .thenReturn(new CloudBlockBlob(new URI(STORAGE_URL))); - when(azureStorageClientMock.getBlobPathWithSas(any(CloudBlockBlob.class))).thenReturn(STORAGE_URI); + when(azureStorageClientMock.getBlobPathWithSas(any(CloudBlockBlob.class))).thenReturn(STORAGE_URL); when(azureStorageClientMock.getBlobSize(anyString())).thenReturn(100L); when(azureStorageClientMock.uploadLocalFileToBlob(anyString(), anyString(), anyString(), anyBoolean())) - .thenReturn(new CloudBlockBlob(new URI(STORAGE_URI))); + .thenReturn(new CloudBlockBlob(new URI(STORAGE_URL))); doNothing().when(azureStorageClientMock).azureTableInsertEntity(anyString(), any(TableServiceEntity.class)); @@ -123,12 +122,12 @@ class ManagedStreamingIngestClientTest { managedStreamingIngestClient = new ManagedStreamingIngestClient(resourceManagerMock, azureStorageClientMock, streamingClientMock, retryTemplate); ingestionProperties = new IngestionProperties("dbName", "tableName"); - ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.Json); + ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.JSON); } @Test void IngestFromBlob_IngestionReportMethodIsQueue_IngestionStatusHardcoded() throws Exception { - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100); IngestionResult result = managedStreamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); assertEquals(1, result.getIngestionStatusesLength()); @@ -138,8 +137,8 @@ class ManagedStreamingIngestClientTest { @Test void IngestFromBlob_IngestionReportMethodIsTable_NotEmptyIngestionStatus() throws Exception { BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100); - ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); + ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); IngestionResult result = managedStreamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); assertNotEquals(0, result.getIngestionStatusesLength()); } @@ -162,8 +161,8 @@ class ManagedStreamingIngestClientTest { @Test void IngestFromBlob_IngestionReportMethodIsTable_RemovesSecrets() throws Exception { BlobSourceInfo blobSourceInfo = new BlobSourceInfo("https://storage.table.core.windows.net/ingestionsstatus20190505?sv=2018-03-28&tn=ingestionsstatus20190505&sig=anAusomeSecret%2FK024xNydFzT%2B2cCE%2BA2S8Y6U%3D&st=2019-05-05T09%3A00%3A31Z&se=2019-05-09T10%3A00%3A31Z&sp=raud", 100); - ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); + ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); ArgumentCaptor capture = ArgumentCaptor.forClass(TableServiceEntity.class); managedStreamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); @@ -230,7 +229,7 @@ class ManagedStreamingIngestClientTest { @ValueSource(booleans = {true, false}) void IngestFromFile_Csv(boolean useSourceId) throws Exception { UUID sourceId = useSourceId ? CustomUUID : null; - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/"; String path = resourcesDirectory + "testdata.csv"; FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length(), sourceId); @@ -258,8 +257,8 @@ class ManagedStreamingIngestClientTest { ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(InputStream.class); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.csv); - ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.Csv); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.CSV); + ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.CSV); ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(resultSet, sourceId); OperationStatus status = managedStreamingIngestClient.ingestFromResultSet(resultSetSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); @@ -278,8 +277,8 @@ class ManagedStreamingIngestClientTest { FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length()); String contents = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8).trim(); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); - ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); + ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON); OperationStatus status = managedStreamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), @@ -295,8 +294,8 @@ class ManagedStreamingIngestClientTest { FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length()); AtomicBoolean visited = new AtomicBoolean(false); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); - ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); + ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON); OperationStatus status; try { when(streamingClientMock.executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), @@ -318,7 +317,7 @@ class ManagedStreamingIngestClientTest { @ParameterizedTest @CsvSource({"true,true", "false,true", "true,false", "false,false"}) void IngestFromStream_Success(boolean leaveOpen, boolean useSourceId) throws Exception { - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); String data = "Name, Age, Weight, Height"; InputStream inputStream = new CloseableByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array()); UUID sourceId = useSourceId ? CustomUUID : null; @@ -352,8 +351,8 @@ class ManagedStreamingIngestClientTest { String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/"; String path = resourcesDirectory + "testdata.json.gz"; FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length()); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); - ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); + ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON); when(streamingClientMock.executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), any(ClientRequestProperties.class), any(String.class), eq("JsonMapping"), any(boolean.class))) @@ -387,7 +386,7 @@ class ManagedStreamingIngestClientTest { }); // Should fail 3 times and then succeed with the queued client - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); managedStreamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties); assertEquals(ManagedStreamingIngestClient.ATTEMPT_COUNT, times[0]); } finally { @@ -416,7 +415,7 @@ class ManagedStreamingIngestClientTest { throw new DataServiceException("some cluster", "Some error", false); }).thenReturn(null); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream, leaveOpen, sourceId); OperationStatus status = managedStreamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); @@ -465,7 +464,7 @@ class ManagedStreamingIngestClientTest { throw new DataServiceException("some cluster", "Some error", ex, false); }).thenReturn(null); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream, leaveOpen, sourceId); OperationStatus status = managedStreamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); @@ -506,7 +505,7 @@ class ManagedStreamingIngestClientTest { }).thenAnswer((a) -> { throw new DataServiceException("some cluster", "Some error", ex, true); }).thenReturn(null); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); assertThrows(IngestionServiceException.class, () -> managedStreamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties)); } finally { @@ -525,7 +524,7 @@ class ManagedStreamingIngestClientTest { UUID sourceId = useSourceId ? CustomUUID : null; InputStream inputStream = new CloseableByteArrayInputStream(bytes); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); ArgumentCaptor capture = ArgumentCaptor.forClass(InputStream.class); diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientTest.java index f3d14abc..3689e11b 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientTest.java @@ -9,7 +9,11 @@ import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; import com.microsoft.azure.kusto.ingest.result.IngestionResult; import com.microsoft.azure.kusto.ingest.result.IngestionStatus; import com.microsoft.azure.kusto.ingest.result.OperationStatus; -import com.microsoft.azure.kusto.ingest.source.*; +import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; +import com.microsoft.azure.kusto.ingest.source.CompressionType; +import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; +import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; +import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; import com.microsoft.azure.storage.blob.CloudBlockBlob; import com.microsoft.azure.storage.table.TableServiceEntity; import org.junit.jupiter.api.BeforeAll; @@ -21,13 +25,30 @@ import java.io.FileInputStream; import java.io.InputStream; import java.net.URI; import java.nio.file.Paths; -import java.sql.*; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.util.function.BiFunction; import static com.microsoft.azure.kusto.ingest.QueuedIngestClient.EXPECTED_SERVICE_TYPE; import static com.microsoft.azure.kusto.ingest.QueuedIngestClient.WRONG_ENDPOINT_MESSAGE; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; class QueuedIngestClientTest { private static final ResourceManager resourceManagerMock = mock(ResourceManager.class); @@ -35,7 +56,7 @@ class QueuedIngestClientTest { private static QueuedIngestClient queuedIngestClient; private static IngestionProperties ingestionProperties; private static String testFilePath; - private static final String STORAGE_URI = "https://ms.com/storageUri"; + private static final String STORAGE_URL = "https://testcontosourl.com/storageUrl"; private static final String ENDPOINT_SERVICE_TYPE_ENGINE = "Engine"; @BeforeAll @@ -52,14 +73,14 @@ class QueuedIngestClientTest { when(resourceManagerMock.getIdentityToken()).thenReturn("identityToken"); when(azureStorageClientMock.uploadStreamToBlob(any(InputStream.class), anyString(), anyString(), anyBoolean())) - .thenReturn(new CloudBlockBlob(new URI(STORAGE_URI))); + .thenReturn(new CloudBlockBlob(new URI(STORAGE_URL))); - when(azureStorageClientMock.getBlobPathWithSas(any(CloudBlockBlob.class))).thenReturn(STORAGE_URI); + when(azureStorageClientMock.getBlobPathWithSas(any(CloudBlockBlob.class))).thenReturn(STORAGE_URL); when(azureStorageClientMock.getBlobSize(anyString())).thenReturn(100L); when(azureStorageClientMock.uploadLocalFileToBlob(anyString(), anyString(), anyString(), anyBoolean())) - .thenReturn(new CloudBlockBlob(new URI(STORAGE_URI))); + .thenReturn(new CloudBlockBlob(new URI(STORAGE_URL))); doNothing().when(azureStorageClientMock).azureTableInsertEntity(anyString(), any(TableServiceEntity.class)); @@ -72,8 +93,8 @@ class QueuedIngestClientTest { queuedIngestClient = new QueuedIngestClient(resourceManagerMock, azureStorageClientMock); ingestionProperties = new IngestionProperties("dbName", "tableName"); - ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.Csv); - ingestionProperties.setDataFormat(DataFormat.csv); + ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.CSV); + ingestionProperties.setDataFormat(DataFormat.CSV); } @Test @@ -87,7 +108,7 @@ class QueuedIngestClientTest { @Test void IngestFromBlob_IngestionReportMethodIsTable_NotEmptyIngestionStatus() throws Exception { BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100); - ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table); + ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE); IngestionResult result = queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); assertNotEquals(0, result.getIngestionStatusesLength()); @@ -111,7 +132,7 @@ class QueuedIngestClientTest { @Test void IngestFromBlob_IngestionReportMethodIsTable_RemovesSecrets() throws Exception { BlobSourceInfo blobSourceInfo = new BlobSourceInfo("https://storage.table.core.windows.net/ingestionsstatus20190505?sv=2018-03-28&tn=ingestionsstatus20190505&sig=anAusomeSecret%2FK024xNydFzT%2B2cCE%2BA2S8Y6U%3D&st=2019-05-05T09%3A00%3A31Z&se=2019-05-09T10%3A00%3A31Z&sp=raud", 100); - ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table); + ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE); ArgumentCaptor captur = ArgumentCaptor.forClass(TableServiceEntity.class); queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); @@ -296,16 +317,16 @@ class QueuedIngestClientTest { holder.name, "db1", "t1", - format.name(), + format.getKustoValue(), shouldCompress ? CompressionType.gz : compression); }; - String csvNoCompression = genName.apply(DataFormat.csv, null); + String csvNoCompression = genName.apply(DataFormat.CSV, null); assert (csvNoCompression.endsWith(".csv.gz")); - String csvCompression = genName.apply(DataFormat.csv, CompressionType.zip); + String csvCompression = genName.apply(DataFormat.CSV, CompressionType.zip); assert (csvCompression.endsWith(".csv.zip")); - String parquet = genName.apply(DataFormat.parquet, null); + String parquet = genName.apply(DataFormat.PARQUET, null); assert (parquet.endsWith(".parquet")); String avroLocalFileName = "avi.avro"; @@ -313,11 +334,11 @@ class QueuedIngestClientTest { CompressionType compressionTypeRes = AzureStorageClient.getCompression(avroLocalFileName); CompressionType compressionTypeRes2 = AzureStorageClient.getCompression(avroLocalCompressFileName); holder.name = avroLocalFileName; - String avroName = genName.apply(DataFormat.avro, compressionTypeRes); + String avroName = genName.apply(DataFormat.AVRO, compressionTypeRes); assert (avroName.endsWith(".avro")); holder.name = avroLocalCompressFileName; - String avroNameCompression = genName.apply(DataFormat.avro, compressionTypeRes2); + String avroNameCompression = genName.apply(DataFormat.AVRO, compressionTypeRes2); assert (avroNameCompression.endsWith(".avro.gz")); } diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java index 93996ddb..4a2550c5 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java @@ -12,7 +12,11 @@ import com.microsoft.azure.kusto.data.exceptions.DataWebException; import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; import com.microsoft.azure.kusto.ingest.result.OperationStatus; -import com.microsoft.azure.kusto.ingest.source.*; +import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; +import com.microsoft.azure.kusto.ingest.source.CompressionType; +import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; +import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; +import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; import com.microsoft.azure.storage.blob.BlobInputStream; import com.microsoft.azure.storage.blob.BlobProperties; import com.microsoft.azure.storage.blob.CloudBlockBlob; @@ -40,8 +44,18 @@ import java.util.zip.GZIPOutputStream; import static com.microsoft.azure.kusto.ingest.IngestClientBase.WRONG_ENDPOINT_MESSAGE; import static com.microsoft.azure.kusto.ingest.StreamingIngestClient.EXPECTED_SERVICE_TYPE; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; class StreamingIngestClientTest { @@ -141,8 +155,8 @@ class StreamingIngestClientTest { String data = "{\"Name\": \"name\", \"Age\": \"age\", \"Weight\": \"weight\", \"Height\": \"height\"}"; InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array()); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); - ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); + ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON); OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), @@ -164,8 +178,8 @@ class StreamingIngestClientTest { InputStream inputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); streamSourceInfo.setCompressionType(CompressionType.gz); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); - ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); + ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON); OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), @@ -241,7 +255,7 @@ class StreamingIngestClientTest { String data = "{\"Name\": \"name\", \"Age\": \"age\", \"Weight\": \"weight\", \"Height\": \"height\"}"; InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array()); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, () -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties), "Expected IngestionClientException to be thrown, but it didn't"); @@ -253,8 +267,8 @@ class StreamingIngestClientTest { String data = "{\"Name\": \"name\", \"Age\": \"age\", \"Weight\": \"weight\", \"Height\": \"height\"}"; InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array()); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); - ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.Csv); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); + ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.CSV); IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, () -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties), "Expected IngestionClientException to be thrown, but it didn't"); @@ -265,7 +279,7 @@ class StreamingIngestClientTest { void IngestFromStream_AvroNoMappingReference_IngestionClientException() { InputStream inputStream = new ByteArrayInputStream(new byte[10]); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.avro); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.AVRO); IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, () -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties), "Expected IngestionClientException to be thrown, but it didn't"); @@ -276,8 +290,8 @@ class StreamingIngestClientTest { void IngestFromStream_AvroWrongMappingKind_IngestionClientException() { InputStream inputStream = new ByteArrayInputStream(new byte[10]); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.avro); - ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.Csv); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.AVRO); + ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.CSV); IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, () -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties), "Expected IngestionClientException to be thrown, but it didn't"); @@ -356,8 +370,8 @@ class StreamingIngestClientTest { FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length()); String contents = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8).trim(); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); - ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); + ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON); OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), @@ -371,8 +385,8 @@ class StreamingIngestClientTest { String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/"; String path = resourcesDirectory + "testdata.json.gz"; FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length()); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); - ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); + ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON); OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; assertEquals(OperationStatus.Succeeded, status); verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(), @@ -463,7 +477,7 @@ class StreamingIngestClientTest { String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/"; String path = resourcesDirectory + "testdata.json"; FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length()); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, () -> streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties), "Expected IngestionClientException to be thrown, but it didn't"); @@ -475,8 +489,8 @@ class StreamingIngestClientTest { String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/"; String path = resourcesDirectory + "testdata.json"; FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length()); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); - ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.Csv); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); + ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.CSV); IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class, () -> streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties), "Expected IngestionClientException to be thrown, but it didn't"); diff --git a/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/KustoSampleApp.java b/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/KustoSampleApp.java index 48d7bcc9..363d865d 100644 --- a/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/KustoSampleApp.java +++ b/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/KustoSampleApp.java @@ -4,11 +4,20 @@ package com.microsoft.azure.kusto.quickstart; import com.fasterxml.jackson.databind.ObjectMapper; -import com.microsoft.azure.kusto.data.*; +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientImpl; +import com.microsoft.azure.kusto.data.ClientRequestProperties; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.KustoResultColumn; +import com.microsoft.azure.kusto.data.KustoResultSetTable; import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; import com.microsoft.azure.kusto.data.exceptions.DataClientException; import com.microsoft.azure.kusto.data.exceptions.DataServiceException; -import com.microsoft.azure.kusto.ingest.*; +import com.microsoft.azure.kusto.ingest.IngestClient; +import com.microsoft.azure.kusto.ingest.IngestClientFactory; +import com.microsoft.azure.kusto.ingest.IngestionMapping; +import com.microsoft.azure.kusto.ingest.IngestionProperties; +import com.microsoft.azure.kusto.ingest.SecurityUtils; import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; @@ -22,7 +31,11 @@ import java.net.URISyntaxException; import java.security.GeneralSecurityException; import java.security.PrivateKey; import java.security.cert.X509Certificate; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Scanner; +import java.util.UUID; import java.util.concurrent.TimeUnit; public class KustoSampleApp { @@ -86,7 +99,7 @@ public class KustoSampleApp { if (shouldIngestData) { for (Map file : dataToIngest) { - IngestionProperties.DataFormat dataFormat = IngestionProperties.DataFormat.valueOf(file.get("format").toLowerCase()); + IngestionProperties.DataFormat dataFormat = IngestionProperties.DataFormat.valueOf(file.get("format").toUpperCase()); String mappingName = file.get("mappingName"); // Tip: This is generally a one-time configuration. @@ -349,25 +362,25 @@ public class KustoSampleApp { private static boolean createIngestionMappings(boolean useExistingMapping, Client kustoClient, String databaseName, String tableName, String mappingName, String mappingValue, IngestionProperties.DataFormat dataFormat) { if (!useExistingMapping) { if (dataFormat.isMappingRequired() && StringUtils.isBlank(mappingValue)) { - System.out.printf("The data format '%s' requires a mapping, but configuration indicates to not use an existing mapping and no mapping was provided. Skipping this ingestion.%n", dataFormat.name()); + System.out.printf("The data format '%s' requires a mapping, but configuration indicates to not use an existing mapping and no mapping was provided. Skipping this ingestion.%n", dataFormat.getKustoValue()); return false; } if (StringUtils.isNotBlank(mappingValue)) { IngestionMapping.IngestionMappingKind ingestionMappingKind = dataFormat.getIngestionMappingKind(); - waitForUserToProceed(String.format("Create a '%s' mapping reference named '%s'", ingestionMappingKind, mappingName)); + waitForUserToProceed(String.format("Create a '%s' mapping reference named '%s'", ingestionMappingKind.getKustoValue(), mappingName)); if (StringUtils.isBlank(mappingName)) { mappingName = "DefaultQuickstartMapping" + UUID.randomUUID().toString().substring(0, 5); } - String mappingCommand = String.format(".create-or-alter table %s ingestion %s mapping '%s' '%s'", tableName, ingestionMappingKind.name().toLowerCase(), mappingName, mappingValue); + String mappingCommand = String.format(".create-or-alter table %s ingestion %s mapping '%s' '%s'", tableName, ingestionMappingKind.getKustoValue().toLowerCase(), mappingName, mappingValue); if (!executeControlCommand(kustoClient, databaseName, mappingCommand)) { - System.out.printf("Failed to create a '%s' mapping reference named '%s'. Skipping this ingestion.%n", ingestionMappingKind, mappingName); + System.out.printf("Failed to create a '%s' mapping reference named '%s'. Skipping this ingestion.%n", ingestionMappingKind.getKustoValue(), mappingName); return false; } } } else if (dataFormat.isMappingRequired() && StringUtils.isBlank(mappingName)) { - System.out.printf("The data format '%s' requires a mapping and the configuration indicates an existing mapping should be used, but none was provided. Skipping this ingestion.%n", dataFormat.name()); + System.out.printf("The data format '%s' requires a mapping and the configuration indicates an existing mapping should be used, but none was provided. Skipping this ingestion.%n", dataFormat.getKustoValue()); return false; } return true; @@ -379,8 +392,8 @@ public class KustoSampleApp { waitForUserToProceed(String.format("Ingest '%s' from '%s'", uri, sourceType)); // Tip: When ingesting json files, if each line represents a single-line json, use MULTIJSON format even if the file only contains one line. // If the json contains whitespace formatting, use SINGLEJSON. In this case, only one data row json object is allowed per file. - if (dataFormat == IngestionProperties.DataFormat.json) { - dataFormat = IngestionProperties.DataFormat.multijson; + if (dataFormat == IngestionProperties.DataFormat.JSON) { + dataFormat = IngestionProperties.DataFormat.MULTIJSON; } // Tip: Kusto's Java SDK can ingest data from files, blobs, java.sql.ResultSet objects, and open streams. diff --git a/samples/README.md b/samples/README.md index ec4a5c55..ccb56a9c 100644 --- a/samples/README.md +++ b/samples/README.md @@ -75,7 +75,7 @@ ConnectionStringBuilder csb = System.getProperty("appKey"), System.getProperty("appTenant")); ``` -2. Initialize client and it's properties +2. Initialize client and its properties ```java IngestClient client = IngestClientFactory.createClient(csb); @@ -138,7 +138,7 @@ ConnectionStringBuilder csb = System.getProperty("appKey"), System.getProperty("appTenant")); ``` -2. Initialize client and it's properties +2. Initialize client and its properties ```java IngestClient client = IngestClientFactory.createClient(csb); @@ -228,7 +228,7 @@ ConnectionStringBuilder csb = System.getProperty("appTenant")); ``` -3. Initialize client and it's properties +3. Initialize client and its properties ```java IngestClient client = IngestClientFactory.createClient(csb); @@ -237,7 +237,7 @@ IngestionProperties ingestionProperties = new IngestionProperties( System.getPro System.getProperty("tableName")); ingestionProperties.getIngestionMapping().setIngestionMappingReference(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.Csv); ingestionProperties.setReportMethod(QueueAndTable); - ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FailuresAndSuccesses); + ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES); ``` 4. Load file and ingest it into table diff --git a/samples/src/main/java/FileIngestion.java b/samples/src/main/java/FileIngestion.java index 678fede4..bcf939ba 100644 --- a/samples/src/main/java/FileIngestion.java +++ b/samples/src/main/java/FileIngestion.java @@ -2,7 +2,11 @@ // Licensed under the MIT License. import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; -import com.microsoft.azure.kusto.ingest.*; +import com.microsoft.azure.kusto.ingest.ColumnMapping; +import com.microsoft.azure.kusto.ingest.IngestClient; +import com.microsoft.azure.kusto.ingest.IngestClientFactory; +import com.microsoft.azure.kusto.ingest.IngestionMapping; +import com.microsoft.azure.kusto.ingest.IngestionProperties; import com.microsoft.azure.kusto.ingest.result.IngestionResult; import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; @@ -21,7 +25,7 @@ public class FileIngestion { try (IngestClient client = IngestClientFactory.createClient(csb)) { IngestionProperties ingestionProperties = new IngestionProperties(System.getProperty("dbName"), System.getProperty("tableName")); - ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.Json); + ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.JSON); FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("filePath"), 0); IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProperties); @@ -37,8 +41,8 @@ public class FileIngestion { csvColumnMapping.setOrdinal(0); ColumnMapping csvColumnMapping2 = new ColumnMapping("ColB", "int"); csvColumnMapping2.setOrdinal(1); - ingestionProperties2.setDataFormat(IngestionProperties.DataFormat.csv); - ingestionProperties2.setIngestionMapping(new ColumnMapping[]{csvColumnMapping, csvColumnMapping2}, IngestionMapping.IngestionMappingKind.Csv); + ingestionProperties2.setDataFormat(IngestionProperties.DataFormat.CSV); + ingestionProperties2.setIngestionMapping(new ColumnMapping[]{csvColumnMapping, csvColumnMapping2}, IngestionMapping.IngestionMappingKind.CSV); IngestionResult ingestionResult2 = client.ingestFromStream(info, ingestionProperties2); } diff --git a/samples/src/main/java/FileIngestionCompletableFuture.java b/samples/src/main/java/FileIngestionCompletableFuture.java index 97c4f5dd..dd97094a 100644 --- a/samples/src/main/java/FileIngestionCompletableFuture.java +++ b/samples/src/main/java/FileIngestionCompletableFuture.java @@ -34,7 +34,7 @@ public class FileIngestionCompletableFuture { IngestionProperties ingestionProperties = new IngestionProperties( System.getProperty("dbName"), System.getProperty("tableName")); - ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.Json); + ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.JSON); FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("filePath"), 0); diff --git a/samples/src/main/java/StreamingIngest.java b/samples/src/main/java/StreamingIngest.java index c0c3606a..89cb66f8 100644 --- a/samples/src/main/java/StreamingIngest.java +++ b/samples/src/main/java/StreamingIngest.java @@ -14,7 +14,11 @@ import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; import com.microsoft.azure.storage.StorageException; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStream; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; @@ -54,7 +58,7 @@ public class StreamingIngest { String data = "0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,\"Zero\",0,00:00:00,,null"; InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array()); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.csv); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.CSV); OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; System.out.println(status.toString()); @@ -71,8 +75,8 @@ public class StreamingIngest { System.out.println(status.toString()); // Open JSON File Stream and Ingest - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); - ingestionProperties.setIngestionMapping(mapping, IngestionMapping.IngestionMappingKind.Json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); + ingestionProperties.setIngestionMapping(mapping, IngestionMapping.IngestionMappingKind.JSON); fileInputStream = new FileInputStream(resourcesDirectory + "dataset.json"); streamSourceInfo.setStream(fileInputStream); status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; @@ -85,15 +89,15 @@ public class StreamingIngest { //Ingest CSV file String path = resourcesDirectory + "dataset.csv"; FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length()); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.csv); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.CSV); OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; System.out.println(status.toString()); //Ingest compressed JSON file path = resourcesDirectory + "dataset.jsonz.gz"; fileSourceInfo = new FileSourceInfo(path, new File(path).length()); - ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json); - ingestionProperties.setIngestionMapping(mapping, IngestionMapping.IngestionMappingKind.Json); + ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON); + ingestionProperties.setIngestionMapping(mapping, IngestionMapping.IngestionMappingKind.JSON); status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status; System.out.println(status.toString()); } diff --git a/samples/src/main/java/TableStatus.java b/samples/src/main/java/TableStatus.java index e45a9cd2..52072ecf 100644 --- a/samples/src/main/java/TableStatus.java +++ b/samples/src/main/java/TableStatus.java @@ -14,7 +14,7 @@ import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; import java.util.List; -import static com.microsoft.azure.kusto.ingest.IngestionProperties.IngestionReportMethod.QueueAndTable; +import static com.microsoft.azure.kusto.ingest.IngestionProperties.IngestionReportMethod.QUEUE_AND_TABLE; public class TableStatus { public static void main(String[] args) { @@ -30,9 +30,9 @@ public class TableStatus { try (IngestClient client = IngestClientFactory.createClient(csb)) { IngestionProperties ingestionProperties = new IngestionProperties(System.getProperty("dbName"), System.getProperty("tableName")); - ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.Json); - ingestionProperties.setReportMethod(QueueAndTable); - ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FailuresAndSuccesses); + ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.JSON); + ingestionProperties.setReportMethod(QUEUE_AND_TABLE); + ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES); FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("filePath"), 0); ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProperties); }