Remove isMappingRequired from DataFormat (#211)

* Remove isMappingRequired from DataFormat

* Fix tests
This commit is contained in:
Yihezkel Schoenbrun 2022-01-11 19:14:24 +02:00 коммит произвёл GitHub
Родитель e85db5514b
Коммит 6df93e8835
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 44 добавлений и 43 удалений

Просмотреть файл

@ -313,7 +313,8 @@ public class IngestionProperties {
TextStringBuilder message = new TextStringBuilder();
if ((ingestionMapping.getColumnMappings() == null) && StringUtils.isBlank(mappingReference)) {
if (dataFormat.isMappingRequired()) {
// TODO Deprecated: this restriction is likely to be removed soon
if (dataFormat == DataFormat.JSON || dataFormat == DataFormat.MULTIJSON || dataFormat == DataFormat.SINGLEJSON || dataFormat == DataFormat.AVRO) {
message.appendln("Mapping must be specified for '%s' format.", dataFormat.getKustoValue());
}
@ -356,33 +357,31 @@ public class IngestionProperties {
}
public enum DataFormat {
CSV("csv", IngestionMapping.IngestionMappingKind.CSV, false, true),
TSV("tsv", IngestionMapping.IngestionMappingKind.CSV, false, true),
SCSV("scsv", IngestionMapping.IngestionMappingKind.CSV, false, true),
SOHSV("sohsv", IngestionMapping.IngestionMappingKind.CSV, false, true),
PSV("psv", IngestionMapping.IngestionMappingKind.CSV, false, true),
TXT("txt", IngestionMapping.IngestionMappingKind.CSV, false, true),
TSVE("tsve", IngestionMapping.IngestionMappingKind.CSV, false, true),
JSON("json", IngestionMapping.IngestionMappingKind.JSON, true, true),
SINGLEJSON("singlejson", IngestionMapping.IngestionMappingKind.JSON, true, true),
MULTIJSON("multijson", IngestionMapping.IngestionMappingKind.JSON, true, true),
AVRO("avro", IngestionMapping.IngestionMappingKind.AVRO, true, false),
APACHEAVRO("apacheavro", IngestionMapping.IngestionMappingKind.APACHEAVRO, false, true),
PARQUET("parquet", IngestionMapping.IngestionMappingKind.PARQUET, false, false),
SSTREAM("sstream", IngestionMapping.IngestionMappingKind.SSTREAM, false, true),
ORC("orc", IngestionMapping.IngestionMappingKind.ORC, false, false),
RAW("raw", IngestionMapping.IngestionMappingKind.CSV, false, true),
W3CLOGFILE("w3clogfile", IngestionMapping.IngestionMappingKind.W3CLOGFILE, false, true);
CSV("csv", IngestionMapping.IngestionMappingKind.CSV, true),
TSV("tsv", IngestionMapping.IngestionMappingKind.CSV, true),
SCSV("scsv", IngestionMapping.IngestionMappingKind.CSV, true),
SOHSV("sohsv", IngestionMapping.IngestionMappingKind.CSV, true),
PSV("psv", IngestionMapping.IngestionMappingKind.CSV, true),
TXT("txt", IngestionMapping.IngestionMappingKind.CSV, true),
TSVE("tsve", IngestionMapping.IngestionMappingKind.CSV, true),
JSON("json", IngestionMapping.IngestionMappingKind.JSON, true),
SINGLEJSON("singlejson", IngestionMapping.IngestionMappingKind.JSON, true),
MULTIJSON("multijson", IngestionMapping.IngestionMappingKind.JSON, true),
AVRO("avro", IngestionMapping.IngestionMappingKind.AVRO, false),
APACHEAVRO("apacheavro", IngestionMapping.IngestionMappingKind.APACHEAVRO, true),
PARQUET("parquet", IngestionMapping.IngestionMappingKind.PARQUET, false),
SSTREAM("sstream", IngestionMapping.IngestionMappingKind.SSTREAM, true),
ORC("orc", IngestionMapping.IngestionMappingKind.ORC, false),
RAW("raw", IngestionMapping.IngestionMappingKind.CSV, true),
W3CLOGFILE("w3clogfile", IngestionMapping.IngestionMappingKind.W3CLOGFILE, true);
private final String kustoValue;
private final IngestionMapping.IngestionMappingKind ingestionMappingKind;
private final boolean mappingRequired;
private final boolean compressible;
DataFormat(String kustoValue, IngestionMapping.IngestionMappingKind ingestionMappingKind, boolean mappingRequired, boolean compressible) {
DataFormat(String kustoValue, IngestionMapping.IngestionMappingKind ingestionMappingKind, boolean compressible) {
this.kustoValue = kustoValue;
this.ingestionMappingKind = ingestionMappingKind;
this.mappingRequired = mappingRequired;
this.compressible = compressible;
}
@ -394,10 +393,6 @@ public class IngestionProperties {
return ingestionMappingKind;
}
public boolean isMappingRequired() {
return mappingRequired;
}
public boolean isCompressible() {
return compressible;
}

Просмотреть файл

@ -139,7 +139,8 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli
streamSourceInfo.validate();
ingestionProperties.validate();
if (dataFormat.isMappingRequired() && StringUtils.isBlank(ingestionProperties.getIngestionMapping().getIngestionMappingReference())) {
// TODO Deprecated: this restriction is likely to be removed soon
if ((dataFormat == IngestionProperties.DataFormat.JSON || dataFormat == IngestionProperties.DataFormat.MULTIJSON || dataFormat == IngestionProperties.DataFormat.SINGLEJSON || dataFormat == IngestionProperties.DataFormat.AVRO) && StringUtils.isBlank(ingestionProperties.getIngestionMapping().getIngestionMappingReference())) {
throw new IngestionClientException(String.format("Mapping reference must be specified for DataFormat '%s' in streaming ingestion.", dataFormat.getKustoValue()));
}

Просмотреть файл

@ -11,12 +11,14 @@ import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.DataWebException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobInputStream;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
@ -34,6 +36,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
@ -251,15 +254,15 @@ class StreamingIngestClientTest {
}
@Test
void IngestFromStream_JsonNoMappingReference_IngestionClientException() {
void IngestFromStream_JsonNoMappingReference_IngestionSucceeds() throws IngestionClientException, IngestionServiceException, URISyntaxException, StorageException {
String data = "{\"Name\": \"name\", \"Age\": \"age\", \"Weight\": \"weight\", \"Height\": \"height\"}";
InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array());
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class,
() -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties),
"Expected IngestionClientException to be thrown, but it didn't");
assertTrue(ingestionClientException.getMessage().contains("Mapping must be specified for 'json' format."));
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON);
IngestionResult ingestionResult = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
assertEquals("Succeeded", ingestionResult.getIngestionStatusCollection().get(0).status.name());
assertEquals(1, ingestionResult.getIngestionStatusesLength());
}
@Test
@ -276,14 +279,14 @@ class StreamingIngestClientTest {
}
@Test
void IngestFromStream_AvroNoMappingReference_IngestionClientException() {
void IngestFromStream_AvroNoMappingReference_IngestionSucceeds() throws IngestionClientException, IngestionServiceException, URISyntaxException, StorageException {
InputStream inputStream = new ByteArrayInputStream(new byte[10]);
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.AVRO);
IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class,
() -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties),
"Expected IngestionClientException to be thrown, but it didn't");
assertTrue(ingestionClientException.getMessage().contains("Mapping must be specified for 'avro' format."));
ingestionProperties.setIngestionMapping("AvroMapping", IngestionMapping.IngestionMappingKind.AVRO);
IngestionResult ingestionResult = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
assertEquals("Succeeded", ingestionResult.getIngestionStatusCollection().get(0).status.name());
assertEquals(1, ingestionResult.getIngestionStatusesLength());
}
@Test
@ -473,15 +476,15 @@ class StreamingIngestClientTest {
}
@Test
void IngestFromFile_JsonNoMappingReference_IngestionClientException() {
void IngestFromFile_JsonNoMappingReference_IngestionSuccess() throws IngestionClientException, IngestionServiceException, URISyntaxException, StorageException {
String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/";
String path = resourcesDirectory + "testdata.json";
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class,
() -> streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties),
"Expected IngestionClientException to be thrown, but it didn't");
assertTrue(ingestionClientException.getMessage().contains("Mapping must be specified for 'json' format."));
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON);
IngestionResult ingestionResult = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties);
assertEquals("Succeeded", ingestionResult.getIngestionStatusCollection().get(0).status.name());
assertEquals(1, ingestionResult.getIngestionStatusesLength());
}
@Test

Просмотреть файл

@ -361,7 +361,8 @@ public class KustoSampleApp {
private static boolean createIngestionMappings(boolean useExistingMapping, Client kustoClient, String databaseName, String tableName, String mappingName, String mappingValue, IngestionProperties.DataFormat dataFormat) {
if (!useExistingMapping) {
if (dataFormat.isMappingRequired() && StringUtils.isBlank(mappingValue)) {
// TODO Deprecated: this restriction is likely to be removed soon
if ((dataFormat == IngestionProperties.DataFormat.JSON || dataFormat == IngestionProperties.DataFormat.MULTIJSON || dataFormat == IngestionProperties.DataFormat.SINGLEJSON || dataFormat == IngestionProperties.DataFormat.AVRO) && StringUtils.isBlank(mappingValue)) {
System.out.printf("The data format '%s' requires a mapping, but configuration indicates to not use an existing mapping and no mapping was provided. Skipping this ingestion.%n", dataFormat.getKustoValue());
return false;
}
@ -379,7 +380,8 @@ public class KustoSampleApp {
return false;
}
}
} else if (dataFormat.isMappingRequired() && StringUtils.isBlank(mappingName)) {
} else if ((dataFormat == IngestionProperties.DataFormat.JSON || dataFormat == IngestionProperties.DataFormat.MULTIJSON || dataFormat == IngestionProperties.DataFormat.SINGLEJSON || dataFormat == IngestionProperties.DataFormat.AVRO) && StringUtils.isBlank(mappingName)) {
// TODO Deprecated: this restriction is likely to be removed soon
System.out.printf("The data format '%s' requires a mapping and the configuration indicates an existing mapping should be used, but none was provided. Skipping this ingestion.%n", dataFormat.getKustoValue());
return false;
}