Fix enum nomenclature (#209)
* Fix enum nomenclature * Fix enum nomenclature * Fix IngestionBlobInfo to use the String versions of the enums * Add KustoValue to IngestionReportLevel and IngestionReportMethod enums * From code review
This commit is contained in:
Родитель
ac04421977
Коммит
2c583f3a7a
|
@ -91,27 +91,25 @@ public class ColumnMapping implements Serializable {
|
|||
return properties.get(MappingConsts.STORAGE_DATA_TYPE.getName());
|
||||
}
|
||||
|
||||
public boolean isValid(IngestionMapping.IngestionMappingKind mappingKind)
|
||||
{
|
||||
switch (mappingKind)
|
||||
{
|
||||
case Csv:
|
||||
case SStream:
|
||||
public boolean isValid(IngestionMapping.IngestionMappingKind mappingKind) {
|
||||
switch (mappingKind) {
|
||||
case CSV:
|
||||
case SSTREAM:
|
||||
return !StringUtils.isEmpty(this.columnName);
|
||||
case Json:
|
||||
case Parquet:
|
||||
case Orc:
|
||||
case W3CLogFile:
|
||||
case JSON:
|
||||
case PARQUET:
|
||||
case ORC:
|
||||
case W3CLOGFILE:
|
||||
TransformationMethod transformationMethod = getTransform();
|
||||
return !StringUtils.isEmpty(this.columnName) && (!StringUtils.isEmpty(getPath())
|
||||
|| transformationMethod == TransformationMethod.SourceLineNumber
|
||||
|| transformationMethod == TransformationMethod.SourceLocation);
|
||||
case Avro:
|
||||
case ApacheAvro:
|
||||
case AVRO:
|
||||
case APACHEAVRO:
|
||||
return !StringUtils.isEmpty(this.columnName) &&
|
||||
!StringUtils.isEmpty(getColumns());
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -15,8 +15,8 @@ final public class IngestionBlobInfo {
|
|||
public String tableName;
|
||||
public UUID id;
|
||||
public Boolean retainBlobOnSuccess;
|
||||
public IngestionProperties.IngestionReportLevel reportLevel;
|
||||
public IngestionProperties.IngestionReportMethod reportMethod;
|
||||
public String reportLevel;
|
||||
public String reportMethod;
|
||||
public Boolean flushImmediately;
|
||||
public IngestionStatusInTableDescription IngestionStatusInTable;
|
||||
|
||||
|
@ -29,7 +29,7 @@ final public class IngestionBlobInfo {
|
|||
id = UUID.randomUUID();
|
||||
retainBlobOnSuccess = true;
|
||||
flushImmediately = false;
|
||||
reportLevel = IngestionProperties.IngestionReportLevel.FailuresOnly;
|
||||
reportMethod = IngestionProperties.IngestionReportMethod.Queue;
|
||||
reportLevel = IngestionProperties.IngestionReportLevel.FAILURES_ONLY.getKustoValue();
|
||||
reportMethod = IngestionProperties.IngestionReportMethod.QUEUE.getKustoValue();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -94,6 +94,23 @@ public class IngestionMapping {
|
|||
Represents an ingestion mapping kind - the format of the source data to map from.
|
||||
*/
|
||||
public enum IngestionMappingKind {
|
||||
Csv, Json, Avro, Parquet, SStream, Orc, ApacheAvro, W3CLogFile
|
||||
CSV("Csv"),
|
||||
JSON("Json"),
|
||||
AVRO("Avro"),
|
||||
PARQUET("Parquet"),
|
||||
SSTREAM("SStream"),
|
||||
ORC("Orc"),
|
||||
APACHEAVRO("ApacheAvro"),
|
||||
W3CLOGFILE("W3CLogFile");
|
||||
|
||||
private final String kustoValue;
|
||||
|
||||
IngestionMappingKind(String kustoValue) {
|
||||
this.kustoValue = kustoValue;
|
||||
}
|
||||
|
||||
public String getKustoValue() {
|
||||
return kustoValue;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -53,8 +53,8 @@ public class IngestionProperties {
|
|||
public IngestionProperties(String databaseName, String tableName) {
|
||||
this.databaseName = databaseName;
|
||||
this.tableName = tableName;
|
||||
this.reportLevel = IngestionReportLevel.FailuresOnly;
|
||||
this.reportMethod = IngestionReportMethod.Queue;
|
||||
this.reportLevel = IngestionReportLevel.FAILURES_ONLY;
|
||||
this.reportMethod = IngestionReportMethod.QUEUE;
|
||||
this.flushImmediately = false;
|
||||
this.additionalProperties = new HashMap<>();
|
||||
this.dropByTags = new ArrayList<>();
|
||||
|
@ -62,7 +62,7 @@ public class IngestionProperties {
|
|||
this.ingestIfNotExists = new ArrayList<>();
|
||||
this.additionalTags = new ArrayList<>();
|
||||
this.ingestionMapping = new IngestionMapping();
|
||||
this.dataFormat = DataFormat.csv;
|
||||
this.dataFormat = DataFormat.CSV;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -212,12 +212,12 @@ public class IngestionProperties {
|
|||
fullAdditionalProperties.put("ingestIfNotExists", ingestIfNotExistsJson);
|
||||
}
|
||||
fullAdditionalProperties.putAll(additionalProperties);
|
||||
fullAdditionalProperties.put("format", dataFormat.name());
|
||||
fullAdditionalProperties.put("format", dataFormat.getKustoValue());
|
||||
|
||||
String mappingReference = ingestionMapping.getIngestionMappingReference();
|
||||
if (StringUtils.isNotBlank(mappingReference)) {
|
||||
fullAdditionalProperties.put("ingestionMappingReference", mappingReference);
|
||||
fullAdditionalProperties.put("ingestionMappingType", ingestionMapping.getIngestionMappingKind().toString());
|
||||
fullAdditionalProperties.put("ingestionMappingType", ingestionMapping.getIngestionMappingKind().getKustoValue());
|
||||
} else if (ingestionMapping.getColumnMappings() != null) {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.setVisibility(PropertyAccessor.ALL, Visibility.NONE);
|
||||
|
@ -225,7 +225,7 @@ public class IngestionProperties {
|
|||
|
||||
String mapping = objectMapper.writeValueAsString(ingestionMapping.getColumnMappings());
|
||||
fullAdditionalProperties.put("ingestionMapping", mapping);
|
||||
fullAdditionalProperties.put("ingestionMappingType", ingestionMapping.getIngestionMappingKind().toString());
|
||||
fullAdditionalProperties.put("ingestionMappingType", ingestionMapping.getIngestionMappingKind().getKustoValue());
|
||||
}
|
||||
|
||||
return fullAdditionalProperties;
|
||||
|
@ -235,7 +235,7 @@ public class IngestionProperties {
|
|||
* Sets the data format.
|
||||
*
|
||||
* @param dataFormat One of the values in: {@link DataFormat DataFormat}
|
||||
* @throws IllegalArgumentException if null argument is passed
|
||||
* @throws IllegalArgumentException if null argument is passed
|
||||
*/
|
||||
public void setDataFormat(@NotNull DataFormat dataFormat) {
|
||||
Ensure.argIsNotNull(dataFormat, "dataFormat");
|
||||
|
@ -250,7 +250,7 @@ public class IngestionProperties {
|
|||
*/
|
||||
public void setDataFormat(@NotNull String dataFormatName) {
|
||||
try {
|
||||
this.dataFormat = DataFormat.valueOf(dataFormatName.toLowerCase());
|
||||
this.dataFormat = DataFormat.valueOf(dataFormatName.toUpperCase());
|
||||
} catch (IllegalArgumentException ex) {
|
||||
log.warn("IngestionProperties.setDataFormat(): Invalid dataFormatName of {}. Per the API's specification, DataFormat property value wasn't set.", dataFormatName);
|
||||
}
|
||||
|
@ -314,7 +314,7 @@ public class IngestionProperties {
|
|||
|
||||
if ((ingestionMapping.getColumnMappings() == null) && StringUtils.isBlank(mappingReference)) {
|
||||
if (dataFormat.isMappingRequired()) {
|
||||
message.appendln("Mapping must be specified for '%s' format.", dataFormat.name());
|
||||
message.appendln("Mapping must be specified for '%s' format.", dataFormat.getKustoValue());
|
||||
}
|
||||
|
||||
if (ingestionMappingKind != null) {
|
||||
|
@ -323,7 +323,7 @@ public class IngestionProperties {
|
|||
} else { // a mapping was provided
|
||||
if (dataFormat.getIngestionMappingKind() != null && !dataFormat.getIngestionMappingKind().equals(ingestionMappingKind)) {
|
||||
message.appendln("Wrong ingestion mapping for format '%s'; mapping kind should be '%s', but was '%s'.",
|
||||
dataFormat.name(), dataFormat.getIngestionMappingKind(), ingestionMappingKind != null ? ingestionMappingKind.name() : "null");
|
||||
dataFormat.getKustoValue(), dataFormat.getIngestionMappingKind().getKustoValue(), ingestionMappingKind != null ? ingestionMappingKind.getKustoValue() : "null");
|
||||
}
|
||||
|
||||
if (ingestionMapping.getColumnMappings() != null) {
|
||||
|
@ -349,41 +349,47 @@ public class IngestionProperties {
|
|||
}
|
||||
|
||||
public void validateResultSetProperties() throws IngestionClientException {
|
||||
Ensure.isTrue(IngestionProperties.DataFormat.csv.equals(dataFormat),
|
||||
Ensure.isTrue(IngestionProperties.DataFormat.CSV.equals(dataFormat),
|
||||
String.format("ResultSet translates into csv format but '%s' was given", dataFormat));
|
||||
|
||||
validate();
|
||||
}
|
||||
|
||||
public enum DataFormat {
|
||||
csv(IngestionMapping.IngestionMappingKind.Csv, false, true),
|
||||
tsv(IngestionMapping.IngestionMappingKind.Csv, false, true),
|
||||
scsv(IngestionMapping.IngestionMappingKind.Csv, false, true),
|
||||
sohsv(IngestionMapping.IngestionMappingKind.Csv, false, true),
|
||||
psv(IngestionMapping.IngestionMappingKind.Csv, false, true),
|
||||
txt(IngestionMapping.IngestionMappingKind.Csv, false, true),
|
||||
tsve(IngestionMapping.IngestionMappingKind.Csv, false, true),
|
||||
json(IngestionMapping.IngestionMappingKind.Json, true, true),
|
||||
singlejson(IngestionMapping.IngestionMappingKind.Json, true, true),
|
||||
multijson(IngestionMapping.IngestionMappingKind.Json, true, true),
|
||||
avro(IngestionMapping.IngestionMappingKind.Avro, true, false),
|
||||
apacheavro(IngestionMapping.IngestionMappingKind.ApacheAvro, false, true),
|
||||
parquet(IngestionMapping.IngestionMappingKind.Parquet, false, false),
|
||||
sstream(IngestionMapping.IngestionMappingKind.SStream, false, true),
|
||||
orc(IngestionMapping.IngestionMappingKind.Orc, false, false),
|
||||
raw(IngestionMapping.IngestionMappingKind.Csv, false, true),
|
||||
w3clogfile(IngestionMapping.IngestionMappingKind.W3CLogFile, false, true);
|
||||
CSV("csv", IngestionMapping.IngestionMappingKind.CSV, false, true),
|
||||
TSV("tsv", IngestionMapping.IngestionMappingKind.CSV, false, true),
|
||||
SCSV("scsv", IngestionMapping.IngestionMappingKind.CSV, false, true),
|
||||
SOHSV("sohsv", IngestionMapping.IngestionMappingKind.CSV, false, true),
|
||||
PSV("psv", IngestionMapping.IngestionMappingKind.CSV, false, true),
|
||||
TXT("txt", IngestionMapping.IngestionMappingKind.CSV, false, true),
|
||||
TSVE("tsve", IngestionMapping.IngestionMappingKind.CSV, false, true),
|
||||
JSON("json", IngestionMapping.IngestionMappingKind.JSON, true, true),
|
||||
SINGLEJSON("singlejson", IngestionMapping.IngestionMappingKind.JSON, true, true),
|
||||
MULTIJSON("multijson", IngestionMapping.IngestionMappingKind.JSON, true, true),
|
||||
AVRO("avro", IngestionMapping.IngestionMappingKind.AVRO, true, false),
|
||||
APACHEAVRO("apacheavro", IngestionMapping.IngestionMappingKind.APACHEAVRO, false, true),
|
||||
PARQUET("parquet", IngestionMapping.IngestionMappingKind.PARQUET, false, false),
|
||||
SSTREAM("sstream", IngestionMapping.IngestionMappingKind.SSTREAM, false, true),
|
||||
ORC("orc", IngestionMapping.IngestionMappingKind.ORC, false, false),
|
||||
RAW("raw", IngestionMapping.IngestionMappingKind.CSV, false, true),
|
||||
W3CLOGFILE("w3clogfile", IngestionMapping.IngestionMappingKind.W3CLOGFILE, false, true);
|
||||
|
||||
private final String kustoValue;
|
||||
private final IngestionMapping.IngestionMappingKind ingestionMappingKind;
|
||||
private final boolean mappingRequired;
|
||||
private final boolean compressible;
|
||||
|
||||
DataFormat(IngestionMapping.IngestionMappingKind ingestionMappingKind, boolean mappingRequired, boolean compressible) {
|
||||
DataFormat(String kustoValue, IngestionMapping.IngestionMappingKind ingestionMappingKind, boolean mappingRequired, boolean compressible) {
|
||||
this.kustoValue = kustoValue;
|
||||
this.ingestionMappingKind = ingestionMappingKind;
|
||||
this.mappingRequired = mappingRequired;
|
||||
this.compressible = compressible;
|
||||
}
|
||||
|
||||
public String getKustoValue() {
|
||||
return kustoValue;
|
||||
}
|
||||
|
||||
public IngestionMapping.IngestionMappingKind getIngestionMappingKind() {
|
||||
return ingestionMappingKind;
|
||||
}
|
||||
|
@ -398,14 +404,34 @@ public class IngestionProperties {
|
|||
}
|
||||
|
||||
public enum IngestionReportLevel {
|
||||
FailuresOnly,
|
||||
None,
|
||||
FailuresAndSuccesses
|
||||
FAILURES_ONLY("FailuresOnly"),
|
||||
NONE("None"),
|
||||
FAILURES_AND_SUCCESSES("FailuresAndSuccesses");
|
||||
|
||||
private final String kustoValue;
|
||||
|
||||
IngestionReportLevel(String kustoValue) {
|
||||
this.kustoValue = kustoValue;
|
||||
}
|
||||
|
||||
public String getKustoValue() {
|
||||
return kustoValue;
|
||||
}
|
||||
}
|
||||
|
||||
public enum IngestionReportMethod {
|
||||
Queue,
|
||||
Table,
|
||||
QueueAndTable
|
||||
QUEUE("Queue"),
|
||||
TABLE("Table"),
|
||||
QUEUE_AND_TABLE("QueueAndTable");
|
||||
|
||||
private final String kustoValue;
|
||||
|
||||
IngestionReportMethod(String kustoValue) {
|
||||
this.kustoValue = kustoValue;
|
||||
}
|
||||
|
||||
public String getKustoValue() {
|
||||
return kustoValue;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -10,8 +10,17 @@ import com.microsoft.azure.kusto.data.Ensure;
|
|||
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
|
||||
import com.microsoft.azure.kusto.ingest.result.*;
|
||||
import com.microsoft.azure.kusto.ingest.source.*;
|
||||
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
|
||||
import com.microsoft.azure.kusto.ingest.result.IngestionStatus;
|
||||
import com.microsoft.azure.kusto.ingest.result.IngestionStatusInTableDescription;
|
||||
import com.microsoft.azure.kusto.ingest.result.IngestionStatusResult;
|
||||
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
|
||||
import com.microsoft.azure.kusto.ingest.result.TableReportIngestionResult;
|
||||
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.CompressionType;
|
||||
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
import com.univocity.parsers.csv.CsvRoutines;
|
||||
|
@ -60,8 +69,7 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient
|
|||
}
|
||||
|
||||
public static String generateDmUriSuggestion(URIBuilder existingEndpoint) {
|
||||
if (existingEndpoint.getHost().toLowerCase().startsWith(INGEST_PREFIX))
|
||||
{
|
||||
if (existingEndpoint.getHost().toLowerCase().startsWith(INGEST_PREFIX)) {
|
||||
throw new IllegalArgumentException("The URL is already formatted as the suggested DM endpoint, so no suggestion can be made");
|
||||
}
|
||||
existingEndpoint.setHost(INGEST_PREFIX + existingEndpoint.getHost());
|
||||
|
@ -92,8 +100,8 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient
|
|||
log.warn("Blob '{}' was sent for ingestion without specifying its raw data size", urlWithoutSecrets);
|
||||
}
|
||||
|
||||
ingestionBlobInfo.reportLevel = ingestionProperties.getReportLevel();
|
||||
ingestionBlobInfo.reportMethod = ingestionProperties.getReportMethod();
|
||||
ingestionBlobInfo.reportLevel = ingestionProperties.getReportLevel().getKustoValue();
|
||||
ingestionBlobInfo.reportMethod = ingestionProperties.getReportMethod().getKustoValue();
|
||||
ingestionBlobInfo.flushImmediately = ingestionProperties.getFlushImmediately();
|
||||
ingestionBlobInfo.additionalProperties = ingestionProperties.getIngestionProperties();
|
||||
if (blobSourceInfo.getSourceId() != null) {
|
||||
|
@ -107,8 +115,8 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient
|
|||
status.updatedOn = Date.from(Instant.now());
|
||||
status.ingestionSourceId = ingestionBlobInfo.id;
|
||||
status.setIngestionSourcePath(urlWithoutSecrets);
|
||||
boolean reportToTable = ingestionBlobInfo.reportLevel != IngestionProperties.IngestionReportLevel.None
|
||||
&& ingestionProperties.getReportMethod() != IngestionProperties.IngestionReportMethod.Queue;
|
||||
boolean reportToTable = !IngestionProperties.IngestionReportLevel.NONE.equals(ingestionProperties.getReportLevel()) &&
|
||||
!IngestionProperties.IngestionReportMethod.QUEUE.equals(ingestionProperties.getReportMethod());
|
||||
if (reportToTable) {
|
||||
status.status = OperationStatus.Pending;
|
||||
String tableStatusUri = resourceManager
|
||||
|
@ -163,7 +171,7 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient
|
|||
file.getName(),
|
||||
ingestionProperties.getDatabaseName(),
|
||||
ingestionProperties.getTableName(),
|
||||
dataFormat.name(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV.
|
||||
dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV.
|
||||
shouldCompress ? CompressionType.gz : sourceCompressionType);
|
||||
|
||||
CloudBlockBlob blob = azureStorageClient.uploadLocalFileToBlob(fileSourceInfo.getFilePath(), blobName,
|
||||
|
@ -209,7 +217,7 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient
|
|||
"StreamUpload",
|
||||
ingestionProperties.getDatabaseName(),
|
||||
ingestionProperties.getTableName(),
|
||||
dataFormat.name(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV.
|
||||
dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV.
|
||||
shouldCompress ? CompressionType.gz : streamSourceInfo.getCompressionType());
|
||||
|
||||
CloudBlockBlob blob = azureStorageClient.uploadStreamToBlob(
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
|
|||
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
@ -141,7 +140,7 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli
|
|||
streamSourceInfo.validate();
|
||||
ingestionProperties.validate();
|
||||
if (dataFormat.isMappingRequired() && StringUtils.isBlank(ingestionProperties.getIngestionMapping().getIngestionMappingReference())) {
|
||||
throw new IngestionClientException(String.format("Mapping reference must be specified for DataFormat '%s' in streaming ingestion.", dataFormat.name()));
|
||||
throw new IngestionClientException(String.format("Mapping reference must be specified for DataFormat '%s' in streaming ingestion.", dataFormat.getKustoValue()));
|
||||
}
|
||||
|
||||
ClientRequestProperties clientRequestProperties = null;
|
||||
|
@ -157,10 +156,9 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli
|
|||
ingestionProperties.getTableName(),
|
||||
stream,
|
||||
clientRequestProperties,
|
||||
dataFormat.name(),
|
||||
dataFormat.getKustoValue(),
|
||||
ingestionProperties.getIngestionMapping().getIngestionMappingReference(),
|
||||
!(streamSourceInfo.getCompressionType() == null || !streamSourceInfo.isLeaveOpen()));
|
||||
|
||||
} catch (DataClientException | IOException e) {
|
||||
log.error(e.getMessage(), e);
|
||||
throw new IngestionClientException(e.getMessage(), e);
|
||||
|
|
|
@ -19,7 +19,12 @@ import java.net.URISyntaxException;
|
|||
import java.nio.file.Paths;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Mockito.anyString;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.isA;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
class AzureStorageClientTest {
|
||||
static private final AzureStorageClient azureStorageClient = new AzureStorageClient();
|
||||
|
@ -35,7 +40,7 @@ class AzureStorageClientTest {
|
|||
testFilePath = Paths.get("src", "test", "resources", "testdata.json").toString();
|
||||
testFile = new File(testFilePath);
|
||||
testFilePathCompressed = Paths.get("src", "test", "resources", "testdata.json.gz").toString();
|
||||
blob = new CloudBlockBlob(new URI("https://ms.com/storageUri"));
|
||||
blob = new CloudBlockBlob(new URI("https://testcontosourl.com/storageUrl"));
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
|
@ -61,11 +66,11 @@ class AzureStorageClientTest {
|
|||
void PostMessageToQueue_NullEntity_IllegalArgumentException() {
|
||||
assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> azureStorageClient.azureTableInsertEntity("tableUri", null));
|
||||
() -> azureStorageClient.azureTableInsertEntity("tableUrl", null));
|
||||
}
|
||||
|
||||
@Test
|
||||
void PostMessageToQueue_NullTableUri_IllegalArgumentException() {
|
||||
void PostMessageToQueue_NullTableUrl_IllegalArgumentException() {
|
||||
TableServiceEntity serviceEntity = new TableServiceEntity();
|
||||
assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
|
@ -77,7 +82,7 @@ class AzureStorageClientTest {
|
|||
throws IOException, StorageException, URISyntaxException {
|
||||
doNothing().when(azureStorageClientSpy).compressAndUploadFileToBlob(anyString(), any(CloudBlockBlob.class));
|
||||
|
||||
azureStorageClientSpy.uploadLocalFileToBlob(testFilePath, "blobName", "https://ms.com/blob", IngestionProperties.DataFormat.csv);
|
||||
azureStorageClientSpy.uploadLocalFileToBlob(testFilePath, "blobName", "https://testcontosourl.com/blob", IngestionProperties.DataFormat.CSV);
|
||||
verify(azureStorageClientSpy).compressAndUploadFileToBlob(anyString(), any(CloudBlockBlob.class));
|
||||
}
|
||||
|
||||
|
@ -86,7 +91,7 @@ class AzureStorageClientTest {
|
|||
throws IOException, StorageException, URISyntaxException {
|
||||
doNothing().when(azureStorageClientSpy).uploadFileToBlob(any(File.class), any(CloudBlockBlob.class));
|
||||
|
||||
azureStorageClientSpy.uploadLocalFileToBlob(testFilePathCompressed, "blobName", "https://ms.com/blob", IngestionProperties.DataFormat.csv);
|
||||
azureStorageClientSpy.uploadLocalFileToBlob(testFilePathCompressed, "blobName", "https://testcontosourl.com/blob", IngestionProperties.DataFormat.CSV);
|
||||
verify(azureStorageClientSpy).uploadFileToBlob(any(File.class), any(CloudBlockBlob.class));
|
||||
}
|
||||
|
||||
|
@ -94,21 +99,21 @@ class AzureStorageClientTest {
|
|||
void UploadLocalFileToBlob_NullFilePath_IllegalArgumentException() {
|
||||
assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> azureStorageClient.uploadLocalFileToBlob(null, "blobName", "storageUri", IngestionProperties.DataFormat.csv));
|
||||
() -> azureStorageClient.uploadLocalFileToBlob(null, "blobName", "storageUrl", IngestionProperties.DataFormat.CSV));
|
||||
}
|
||||
|
||||
@Test
|
||||
void UploadLocalFileToBlob_NullBlobName_IllegalArgumentException() {
|
||||
assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> azureStorageClient.uploadLocalFileToBlob(testFilePath, null, "storageUri", IngestionProperties.DataFormat.csv));
|
||||
() -> azureStorageClient.uploadLocalFileToBlob(testFilePath, null, "storageUrl", IngestionProperties.DataFormat.CSV));
|
||||
}
|
||||
|
||||
@Test
|
||||
void UploadLocalFileToBlob_NullStorageUri_IllegalArgumentException() {
|
||||
void UploadLocalFileToBlob_NullStorageUrl_IllegalArgumentException() {
|
||||
assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> azureStorageClient.uploadLocalFileToBlob(testFilePath, "blobName", null, IngestionProperties.DataFormat.csv));
|
||||
() -> azureStorageClient.uploadLocalFileToBlob(testFilePath, "blobName", null, IngestionProperties.DataFormat.CSV));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -116,7 +121,7 @@ class AzureStorageClientTest {
|
|||
String notExistingFilePath = "not.existing.file.path";
|
||||
assertThrows(
|
||||
IOException.class,
|
||||
() -> azureStorageClient.uploadLocalFileToBlob(notExistingFilePath, "blobName", "storageUri", IngestionProperties.DataFormat.csv));
|
||||
() -> azureStorageClient.uploadLocalFileToBlob(notExistingFilePath, "blobName", "storageUrl", IngestionProperties.DataFormat.CSV));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -125,7 +130,7 @@ class AzureStorageClientTest {
|
|||
try (InputStream stream = new FileInputStream(testFilePath)) {
|
||||
doNothing().when(azureStorageClientSpy).uploadStream(any(InputStream.class), any(CloudBlockBlob.class));
|
||||
|
||||
azureStorageClientSpy.uploadStreamToBlob(stream, "blobName", "https://ms.com/storageUri", false);
|
||||
azureStorageClientSpy.uploadStreamToBlob(stream, "blobName", "https://ms.com/storageUrl", false);
|
||||
verify(azureStorageClientSpy).uploadStream(isA(InputStream.class), isA(CloudBlockBlob.class));
|
||||
}
|
||||
}
|
||||
|
@ -136,7 +141,7 @@ class AzureStorageClientTest {
|
|||
try (InputStream stream = new FileInputStream(testFilePath)) {
|
||||
doNothing().when(azureStorageClientSpy)
|
||||
.compressAndUploadStream(any(InputStream.class), any(CloudBlockBlob.class));
|
||||
azureStorageClientSpy.uploadStreamToBlob(stream, "blobName", "https://ms.com/storageUri", true);
|
||||
azureStorageClientSpy.uploadStreamToBlob(stream, "blobName", "https://testcontosourl.com/storageUrl", true);
|
||||
verify(azureStorageClientSpy).compressAndUploadStream(isA(InputStream.class), isA(CloudBlockBlob.class));
|
||||
}
|
||||
}
|
||||
|
@ -145,7 +150,7 @@ class AzureStorageClientTest {
|
|||
void UploadStreamToBlob_NullInputStream_IllegalArgumentException() {
|
||||
assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> azureStorageClient.uploadStreamToBlob(null, "blobName", "storageUri", false));
|
||||
() -> azureStorageClient.uploadStreamToBlob(null, "blobName", "storageUrl", false));
|
||||
|
||||
}
|
||||
|
||||
|
@ -154,12 +159,12 @@ class AzureStorageClientTest {
|
|||
try (InputStream stream = new FileInputStream(testFilePath)) {
|
||||
assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> azureStorageClient.uploadStreamToBlob(stream, null, "storageUri", false));
|
||||
() -> azureStorageClient.uploadStreamToBlob(stream, null, "storageUrl", false));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void UploadStreamToBlob_NullStorageUri_IllegalArgumentException() throws IOException {
|
||||
void UploadStreamToBlob_NullStorageUrl_IllegalArgumentException() throws IOException {
|
||||
try (InputStream stream = new FileInputStream(testFilePath)) {
|
||||
assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
|
|
|
@ -22,13 +22,22 @@ import org.apache.commons.lang3.StringUtils;
|
|||
import org.apache.commons.lang3.time.StopWatch;
|
||||
import org.json.JSONArray;
|
||||
import org.json.JSONObject;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Assumptions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.*;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
|
@ -41,7 +50,11 @@ import java.util.Calendar;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotSame;
|
||||
import static org.junit.jupiter.api.Assertions.assertSame;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
class E2ETest {
|
||||
private static IngestClient ingestClient;
|
||||
|
@ -135,23 +148,23 @@ class E2ETest {
|
|||
private static void createTestData() {
|
||||
IngestionProperties ingestionPropertiesWithoutMapping = new IngestionProperties(databaseName, tableName);
|
||||
ingestionPropertiesWithoutMapping.setFlushImmediately(true);
|
||||
ingestionPropertiesWithoutMapping.setDataFormat(DataFormat.csv);
|
||||
ingestionPropertiesWithoutMapping.setDataFormat(DataFormat.CSV);
|
||||
|
||||
IngestionProperties ingestionPropertiesWithMappingReference = new IngestionProperties(databaseName, tableName);
|
||||
ingestionPropertiesWithMappingReference.setFlushImmediately(true);
|
||||
ingestionPropertiesWithMappingReference.setIngestionMapping(mappingReference, IngestionMappingKind.Json);
|
||||
ingestionPropertiesWithMappingReference.setDataFormat(DataFormat.json);
|
||||
ingestionPropertiesWithMappingReference.setIngestionMapping(mappingReference, IngestionMappingKind.JSON);
|
||||
ingestionPropertiesWithMappingReference.setDataFormat(DataFormat.JSON);
|
||||
|
||||
IngestionProperties ingestionPropertiesWithColumnMapping = new IngestionProperties(databaseName, tableName);
|
||||
ingestionPropertiesWithColumnMapping.setFlushImmediately(true);
|
||||
ingestionPropertiesWithColumnMapping.setDataFormat(DataFormat.json);
|
||||
ingestionPropertiesWithColumnMapping.setDataFormat(DataFormat.JSON);
|
||||
ColumnMapping first = new ColumnMapping("rownumber", "int");
|
||||
first.setPath("$.rownumber");
|
||||
ColumnMapping second = new ColumnMapping("rowguid", "string");
|
||||
second.setPath("$.rowguid");
|
||||
ColumnMapping[] columnMapping = new ColumnMapping[]{first, second};
|
||||
ingestionPropertiesWithColumnMapping.setIngestionMapping(columnMapping, IngestionMappingKind.Json);
|
||||
ingestionPropertiesWithColumnMapping.setDataFormat(DataFormat.json);
|
||||
ingestionPropertiesWithColumnMapping.setIngestionMapping(columnMapping, IngestionMappingKind.JSON);
|
||||
ingestionPropertiesWithColumnMapping.setDataFormat(DataFormat.JSON);
|
||||
|
||||
dataForTests = Arrays.asList(new TestDataItem() {
|
||||
{
|
||||
|
@ -401,7 +414,7 @@ class E2ETest {
|
|||
void testParameterizedQuery() throws DataServiceException, DataClientException {
|
||||
IngestionProperties ingestionPropertiesWithoutMapping = new IngestionProperties(databaseName, tableName);
|
||||
ingestionPropertiesWithoutMapping.setFlushImmediately(true);
|
||||
ingestionPropertiesWithoutMapping.setDataFormat(DataFormat.csv);
|
||||
ingestionPropertiesWithoutMapping.setDataFormat(DataFormat.CSV);
|
||||
|
||||
TestDataItem item = new TestDataItem() {
|
||||
{
|
||||
|
|
|
@ -16,7 +16,6 @@ import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
|
|||
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
import com.microsoft.azure.storage.table.TableServiceEntity;
|
||||
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
@ -70,7 +69,7 @@ class ManagedStreamingIngestClientTest {
|
|||
private static final AzureStorageClient azureStorageClientMock = mock(AzureStorageClient.class);
|
||||
private static ManagedStreamingIngestClient managedStreamingIngestClient;
|
||||
private static IngestionProperties ingestionProperties;
|
||||
private static final String STORAGE_URI = "https://ms.com/storageUri";
|
||||
private static final String STORAGE_URL = "https://testcontosourl.com/storageUrl";
|
||||
|
||||
@Mock
|
||||
private static StreamingClient streamingClientMock;
|
||||
|
@ -94,14 +93,14 @@ class ManagedStreamingIngestClientTest {
|
|||
when(resourceManagerMock.getIdentityToken()).thenReturn("identityToken");
|
||||
|
||||
when(azureStorageClientMock.uploadStreamToBlob(any(InputStream.class), anyString(), anyString(), anyBoolean()))
|
||||
.thenReturn(new CloudBlockBlob(new URI(STORAGE_URI)));
|
||||
.thenReturn(new CloudBlockBlob(new URI(STORAGE_URL)));
|
||||
|
||||
when(azureStorageClientMock.getBlobPathWithSas(any(CloudBlockBlob.class))).thenReturn(STORAGE_URI);
|
||||
when(azureStorageClientMock.getBlobPathWithSas(any(CloudBlockBlob.class))).thenReturn(STORAGE_URL);
|
||||
|
||||
when(azureStorageClientMock.getBlobSize(anyString())).thenReturn(100L);
|
||||
|
||||
when(azureStorageClientMock.uploadLocalFileToBlob(anyString(), anyString(), anyString(), anyBoolean()))
|
||||
.thenReturn(new CloudBlockBlob(new URI(STORAGE_URI)));
|
||||
.thenReturn(new CloudBlockBlob(new URI(STORAGE_URL)));
|
||||
|
||||
doNothing().when(azureStorageClientMock).azureTableInsertEntity(anyString(), any(TableServiceEntity.class));
|
||||
|
||||
|
@ -123,12 +122,12 @@ class ManagedStreamingIngestClientTest {
|
|||
managedStreamingIngestClient = new ManagedStreamingIngestClient(resourceManagerMock, azureStorageClientMock, streamingClientMock,
|
||||
retryTemplate);
|
||||
ingestionProperties = new IngestionProperties("dbName", "tableName");
|
||||
ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.JSON);
|
||||
}
|
||||
|
||||
@Test
|
||||
void IngestFromBlob_IngestionReportMethodIsQueue_IngestionStatusHardcoded() throws Exception {
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100);
|
||||
IngestionResult result = managedStreamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
|
||||
assertEquals(1, result.getIngestionStatusesLength());
|
||||
|
@ -138,8 +137,8 @@ class ManagedStreamingIngestClientTest {
|
|||
@Test
|
||||
void IngestFromBlob_IngestionReportMethodIsTable_NotEmptyIngestionStatus() throws Exception {
|
||||
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100);
|
||||
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
IngestionResult result = managedStreamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
|
||||
assertNotEquals(0, result.getIngestionStatusesLength());
|
||||
}
|
||||
|
@ -162,8 +161,8 @@ class ManagedStreamingIngestClientTest {
|
|||
@Test
|
||||
void IngestFromBlob_IngestionReportMethodIsTable_RemovesSecrets() throws Exception {
|
||||
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("https://storage.table.core.windows.net/ingestionsstatus20190505?sv=2018-03-28&tn=ingestionsstatus20190505&sig=anAusomeSecret%2FK024xNydFzT%2B2cCE%2BA2S8Y6U%3D&st=2019-05-05T09%3A00%3A31Z&se=2019-05-09T10%3A00%3A31Z&sp=raud", 100);
|
||||
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
ArgumentCaptor<TableServiceEntity> capture = ArgumentCaptor.forClass(TableServiceEntity.class);
|
||||
|
||||
managedStreamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
|
||||
|
@ -230,7 +229,7 @@ class ManagedStreamingIngestClientTest {
|
|||
@ValueSource(booleans = {true, false})
|
||||
void IngestFromFile_Csv(boolean useSourceId) throws Exception {
|
||||
UUID sourceId = useSourceId ? CustomUUID : null;
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/";
|
||||
String path = resourcesDirectory + "testdata.csv";
|
||||
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length(), sourceId);
|
||||
|
@ -258,8 +257,8 @@ class ManagedStreamingIngestClientTest {
|
|||
|
||||
ArgumentCaptor<InputStream> argumentCaptor = ArgumentCaptor.forClass(InputStream.class);
|
||||
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.csv);
|
||||
ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.Csv);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.CSV);
|
||||
ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.CSV);
|
||||
ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(resultSet, sourceId);
|
||||
OperationStatus status = managedStreamingIngestClient.ingestFromResultSet(resultSetSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
|
@ -278,8 +277,8 @@ class ManagedStreamingIngestClientTest {
|
|||
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
|
||||
String contents = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8).trim();
|
||||
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON);
|
||||
OperationStatus status = managedStreamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
|
@ -295,8 +294,8 @@ class ManagedStreamingIngestClientTest {
|
|||
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
|
||||
AtomicBoolean visited = new AtomicBoolean(false);
|
||||
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON);
|
||||
OperationStatus status;
|
||||
try {
|
||||
when(streamingClientMock.executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
|
@ -318,7 +317,7 @@ class ManagedStreamingIngestClientTest {
|
|||
@ParameterizedTest
|
||||
@CsvSource({"true,true", "false,true", "true,false", "false,false"})
|
||||
void IngestFromStream_Success(boolean leaveOpen, boolean useSourceId) throws Exception {
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
String data = "Name, Age, Weight, Height";
|
||||
InputStream inputStream = new CloseableByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array());
|
||||
UUID sourceId = useSourceId ? CustomUUID : null;
|
||||
|
@ -352,8 +351,8 @@ class ManagedStreamingIngestClientTest {
|
|||
String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/";
|
||||
String path = resourcesDirectory + "testdata.json.gz";
|
||||
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON);
|
||||
|
||||
when(streamingClientMock.executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
any(ClientRequestProperties.class), any(String.class), eq("JsonMapping"), any(boolean.class)))
|
||||
|
@ -387,7 +386,7 @@ class ManagedStreamingIngestClientTest {
|
|||
});
|
||||
|
||||
// Should fail 3 times and then succeed with the queued client
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
managedStreamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
|
||||
assertEquals(ManagedStreamingIngestClient.ATTEMPT_COUNT, times[0]);
|
||||
} finally {
|
||||
|
@ -416,7 +415,7 @@ class ManagedStreamingIngestClientTest {
|
|||
throw new DataServiceException("some cluster", "Some error", false);
|
||||
}).thenReturn(null);
|
||||
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream, leaveOpen, sourceId);
|
||||
OperationStatus status = managedStreamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
|
@ -465,7 +464,7 @@ class ManagedStreamingIngestClientTest {
|
|||
throw new DataServiceException("some cluster", "Some error", ex, false);
|
||||
}).thenReturn(null);
|
||||
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream, leaveOpen, sourceId);
|
||||
OperationStatus status = managedStreamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
|
@ -506,7 +505,7 @@ class ManagedStreamingIngestClientTest {
|
|||
}).thenAnswer((a) -> {
|
||||
throw new DataServiceException("some cluster", "Some error", ex, true);
|
||||
}).thenReturn(null);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
|
||||
assertThrows(IngestionServiceException.class, () -> managedStreamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties));
|
||||
} finally {
|
||||
|
@ -525,7 +524,7 @@ class ManagedStreamingIngestClientTest {
|
|||
|
||||
UUID sourceId = useSourceId ? CustomUUID : null;
|
||||
InputStream inputStream = new CloseableByteArrayInputStream(bytes);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
|
||||
ArgumentCaptor<InputStream> capture = ArgumentCaptor.forClass(InputStream.class);
|
||||
|
||||
|
|
|
@ -9,7 +9,11 @@ import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
|
|||
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
|
||||
import com.microsoft.azure.kusto.ingest.result.IngestionStatus;
|
||||
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
|
||||
import com.microsoft.azure.kusto.ingest.source.*;
|
||||
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.CompressionType;
|
||||
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
import com.microsoft.azure.storage.table.TableServiceEntity;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
|
@ -21,13 +25,30 @@ import java.io.FileInputStream;
|
|||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Paths;
|
||||
import java.sql.*;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import static com.microsoft.azure.kusto.ingest.QueuedIngestClient.EXPECTED_SERVICE_TYPE;
|
||||
import static com.microsoft.azure.kusto.ingest.QueuedIngestClient.WRONG_ENDPOINT_MESSAGE;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Mockito.anyBoolean;
|
||||
import static org.mockito.Mockito.anyString;
|
||||
import static org.mockito.Mockito.atLeast;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
class QueuedIngestClientTest {
|
||||
private static final ResourceManager resourceManagerMock = mock(ResourceManager.class);
|
||||
|
@ -35,7 +56,7 @@ class QueuedIngestClientTest {
|
|||
private static QueuedIngestClient queuedIngestClient;
|
||||
private static IngestionProperties ingestionProperties;
|
||||
private static String testFilePath;
|
||||
private static final String STORAGE_URI = "https://ms.com/storageUri";
|
||||
private static final String STORAGE_URL = "https://testcontosourl.com/storageUrl";
|
||||
private static final String ENDPOINT_SERVICE_TYPE_ENGINE = "Engine";
|
||||
|
||||
@BeforeAll
|
||||
|
@ -52,14 +73,14 @@ class QueuedIngestClientTest {
|
|||
when(resourceManagerMock.getIdentityToken()).thenReturn("identityToken");
|
||||
|
||||
when(azureStorageClientMock.uploadStreamToBlob(any(InputStream.class), anyString(), anyString(), anyBoolean()))
|
||||
.thenReturn(new CloudBlockBlob(new URI(STORAGE_URI)));
|
||||
.thenReturn(new CloudBlockBlob(new URI(STORAGE_URL)));
|
||||
|
||||
when(azureStorageClientMock.getBlobPathWithSas(any(CloudBlockBlob.class))).thenReturn(STORAGE_URI);
|
||||
when(azureStorageClientMock.getBlobPathWithSas(any(CloudBlockBlob.class))).thenReturn(STORAGE_URL);
|
||||
|
||||
when(azureStorageClientMock.getBlobSize(anyString())).thenReturn(100L);
|
||||
|
||||
when(azureStorageClientMock.uploadLocalFileToBlob(anyString(), anyString(), anyString(), anyBoolean()))
|
||||
.thenReturn(new CloudBlockBlob(new URI(STORAGE_URI)));
|
||||
.thenReturn(new CloudBlockBlob(new URI(STORAGE_URL)));
|
||||
|
||||
doNothing().when(azureStorageClientMock).azureTableInsertEntity(anyString(), any(TableServiceEntity.class));
|
||||
|
||||
|
@ -72,8 +93,8 @@ class QueuedIngestClientTest {
|
|||
|
||||
queuedIngestClient = new QueuedIngestClient(resourceManagerMock, azureStorageClientMock);
|
||||
ingestionProperties = new IngestionProperties("dbName", "tableName");
|
||||
ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.Csv);
|
||||
ingestionProperties.setDataFormat(DataFormat.csv);
|
||||
ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.CSV);
|
||||
ingestionProperties.setDataFormat(DataFormat.CSV);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -87,7 +108,7 @@ class QueuedIngestClientTest {
|
|||
@Test
|
||||
void IngestFromBlob_IngestionReportMethodIsTable_NotEmptyIngestionStatus() throws Exception {
|
||||
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100);
|
||||
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table);
|
||||
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
|
||||
|
||||
IngestionResult result = queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
|
||||
assertNotEquals(0, result.getIngestionStatusesLength());
|
||||
|
@ -111,7 +132,7 @@ class QueuedIngestClientTest {
|
|||
@Test
|
||||
void IngestFromBlob_IngestionReportMethodIsTable_RemovesSecrets() throws Exception {
|
||||
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("https://storage.table.core.windows.net/ingestionsstatus20190505?sv=2018-03-28&tn=ingestionsstatus20190505&sig=anAusomeSecret%2FK024xNydFzT%2B2cCE%2BA2S8Y6U%3D&st=2019-05-05T09%3A00%3A31Z&se=2019-05-09T10%3A00%3A31Z&sp=raud", 100);
|
||||
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table);
|
||||
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
|
||||
ArgumentCaptor<TableServiceEntity> captur = ArgumentCaptor.forClass(TableServiceEntity.class);
|
||||
|
||||
queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
|
||||
|
@ -296,16 +317,16 @@ class QueuedIngestClientTest {
|
|||
holder.name,
|
||||
"db1",
|
||||
"t1",
|
||||
format.name(),
|
||||
format.getKustoValue(),
|
||||
shouldCompress ? CompressionType.gz : compression);
|
||||
};
|
||||
String csvNoCompression = genName.apply(DataFormat.csv, null);
|
||||
String csvNoCompression = genName.apply(DataFormat.CSV, null);
|
||||
assert (csvNoCompression.endsWith(".csv.gz"));
|
||||
|
||||
String csvCompression = genName.apply(DataFormat.csv, CompressionType.zip);
|
||||
String csvCompression = genName.apply(DataFormat.CSV, CompressionType.zip);
|
||||
assert (csvCompression.endsWith(".csv.zip"));
|
||||
|
||||
String parquet = genName.apply(DataFormat.parquet, null);
|
||||
String parquet = genName.apply(DataFormat.PARQUET, null);
|
||||
assert (parquet.endsWith(".parquet"));
|
||||
|
||||
String avroLocalFileName = "avi.avro";
|
||||
|
@ -313,11 +334,11 @@ class QueuedIngestClientTest {
|
|||
CompressionType compressionTypeRes = AzureStorageClient.getCompression(avroLocalFileName);
|
||||
CompressionType compressionTypeRes2 = AzureStorageClient.getCompression(avroLocalCompressFileName);
|
||||
holder.name = avroLocalFileName;
|
||||
String avroName = genName.apply(DataFormat.avro, compressionTypeRes);
|
||||
String avroName = genName.apply(DataFormat.AVRO, compressionTypeRes);
|
||||
assert (avroName.endsWith(".avro"));
|
||||
|
||||
holder.name = avroLocalCompressFileName;
|
||||
String avroNameCompression = genName.apply(DataFormat.avro, compressionTypeRes2);
|
||||
String avroNameCompression = genName.apply(DataFormat.AVRO, compressionTypeRes2);
|
||||
assert (avroNameCompression.endsWith(".avro.gz"));
|
||||
}
|
||||
|
||||
|
|
|
@ -12,7 +12,11 @@ import com.microsoft.azure.kusto.data.exceptions.DataWebException;
|
|||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
|
||||
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
|
||||
import com.microsoft.azure.kusto.ingest.source.*;
|
||||
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.CompressionType;
|
||||
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
|
||||
import com.microsoft.azure.storage.blob.BlobInputStream;
|
||||
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
|
@ -40,8 +44,18 @@ import java.util.zip.GZIPOutputStream;
|
|||
|
||||
import static com.microsoft.azure.kusto.ingest.IngestClientBase.WRONG_ENDPOINT_MESSAGE;
|
||||
import static com.microsoft.azure.kusto.ingest.StreamingIngestClient.EXPECTED_SERVICE_TYPE;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.eq;
|
||||
import static org.mockito.Mockito.isNull;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
class StreamingIngestClientTest {
|
||||
|
||||
|
@ -141,8 +155,8 @@ class StreamingIngestClientTest {
|
|||
String data = "{\"Name\": \"name\", \"Age\": \"age\", \"Weight\": \"weight\", \"Height\": \"height\"}";
|
||||
InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array());
|
||||
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON);
|
||||
OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
|
@ -164,8 +178,8 @@ class StreamingIngestClientTest {
|
|||
InputStream inputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
|
||||
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
|
||||
streamSourceInfo.setCompressionType(CompressionType.gz);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON);
|
||||
OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
|
@ -241,7 +255,7 @@ class StreamingIngestClientTest {
|
|||
String data = "{\"Name\": \"name\", \"Age\": \"age\", \"Weight\": \"weight\", \"Height\": \"height\"}";
|
||||
InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array());
|
||||
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class,
|
||||
() -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties),
|
||||
"Expected IngestionClientException to be thrown, but it didn't");
|
||||
|
@ -253,8 +267,8 @@ class StreamingIngestClientTest {
|
|||
String data = "{\"Name\": \"name\", \"Age\": \"age\", \"Weight\": \"weight\", \"Height\": \"height\"}";
|
||||
InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array());
|
||||
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.Csv);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.CSV);
|
||||
IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class,
|
||||
() -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties),
|
||||
"Expected IngestionClientException to be thrown, but it didn't");
|
||||
|
@ -265,7 +279,7 @@ class StreamingIngestClientTest {
|
|||
void IngestFromStream_AvroNoMappingReference_IngestionClientException() {
|
||||
InputStream inputStream = new ByteArrayInputStream(new byte[10]);
|
||||
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.avro);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.AVRO);
|
||||
IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class,
|
||||
() -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties),
|
||||
"Expected IngestionClientException to be thrown, but it didn't");
|
||||
|
@ -276,8 +290,8 @@ class StreamingIngestClientTest {
|
|||
void IngestFromStream_AvroWrongMappingKind_IngestionClientException() {
|
||||
InputStream inputStream = new ByteArrayInputStream(new byte[10]);
|
||||
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.avro);
|
||||
ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.Csv);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.AVRO);
|
||||
ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.CSV);
|
||||
IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class,
|
||||
() -> streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties),
|
||||
"Expected IngestionClientException to be thrown, but it didn't");
|
||||
|
@ -356,8 +370,8 @@ class StreamingIngestClientTest {
|
|||
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
|
||||
String contents = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8).trim();
|
||||
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON);
|
||||
OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
|
@ -371,8 +385,8 @@ class StreamingIngestClientTest {
|
|||
String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/";
|
||||
String path = resourcesDirectory + "testdata.json.gz";
|
||||
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.JSON);
|
||||
OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
|
@ -463,7 +477,7 @@ class StreamingIngestClientTest {
|
|||
String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/";
|
||||
String path = resourcesDirectory + "testdata.json";
|
||||
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class,
|
||||
() -> streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties),
|
||||
"Expected IngestionClientException to be thrown, but it didn't");
|
||||
|
@ -475,8 +489,8 @@ class StreamingIngestClientTest {
|
|||
String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/";
|
||||
String path = resourcesDirectory + "testdata.json";
|
||||
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.Csv);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
ingestionProperties.setIngestionMapping("CsvMapping", IngestionMapping.IngestionMappingKind.CSV);
|
||||
IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class,
|
||||
() -> streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties),
|
||||
"Expected IngestionClientException to be thrown, but it didn't");
|
||||
|
|
|
@ -4,11 +4,20 @@
|
|||
package com.microsoft.azure.kusto.quickstart;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.microsoft.azure.kusto.data.*;
|
||||
import com.microsoft.azure.kusto.data.Client;
|
||||
import com.microsoft.azure.kusto.data.ClientImpl;
|
||||
import com.microsoft.azure.kusto.data.ClientRequestProperties;
|
||||
import com.microsoft.azure.kusto.data.KustoOperationResult;
|
||||
import com.microsoft.azure.kusto.data.KustoResultColumn;
|
||||
import com.microsoft.azure.kusto.data.KustoResultSetTable;
|
||||
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
|
||||
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
|
||||
import com.microsoft.azure.kusto.ingest.*;
|
||||
import com.microsoft.azure.kusto.ingest.IngestClient;
|
||||
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
|
||||
import com.microsoft.azure.kusto.ingest.IngestionMapping;
|
||||
import com.microsoft.azure.kusto.ingest.IngestionProperties;
|
||||
import com.microsoft.azure.kusto.ingest.SecurityUtils;
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
|
||||
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
|
||||
|
@ -22,7 +31,11 @@ import java.net.URISyntaxException;
|
|||
import java.security.GeneralSecurityException;
|
||||
import java.security.PrivateKey;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.*;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Scanner;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class KustoSampleApp {
|
||||
|
@ -86,7 +99,7 @@ public class KustoSampleApp {
|
|||
|
||||
if (shouldIngestData) {
|
||||
for (Map<String, String> file : dataToIngest) {
|
||||
IngestionProperties.DataFormat dataFormat = IngestionProperties.DataFormat.valueOf(file.get("format").toLowerCase());
|
||||
IngestionProperties.DataFormat dataFormat = IngestionProperties.DataFormat.valueOf(file.get("format").toUpperCase());
|
||||
String mappingName = file.get("mappingName");
|
||||
|
||||
// Tip: This is generally a one-time configuration.
|
||||
|
@ -349,25 +362,25 @@ public class KustoSampleApp {
|
|||
private static boolean createIngestionMappings(boolean useExistingMapping, Client kustoClient, String databaseName, String tableName, String mappingName, String mappingValue, IngestionProperties.DataFormat dataFormat) {
|
||||
if (!useExistingMapping) {
|
||||
if (dataFormat.isMappingRequired() && StringUtils.isBlank(mappingValue)) {
|
||||
System.out.printf("The data format '%s' requires a mapping, but configuration indicates to not use an existing mapping and no mapping was provided. Skipping this ingestion.%n", dataFormat.name());
|
||||
System.out.printf("The data format '%s' requires a mapping, but configuration indicates to not use an existing mapping and no mapping was provided. Skipping this ingestion.%n", dataFormat.getKustoValue());
|
||||
return false;
|
||||
}
|
||||
|
||||
if (StringUtils.isNotBlank(mappingValue)) {
|
||||
IngestionMapping.IngestionMappingKind ingestionMappingKind = dataFormat.getIngestionMappingKind();
|
||||
waitForUserToProceed(String.format("Create a '%s' mapping reference named '%s'", ingestionMappingKind, mappingName));
|
||||
waitForUserToProceed(String.format("Create a '%s' mapping reference named '%s'", ingestionMappingKind.getKustoValue(), mappingName));
|
||||
|
||||
if (StringUtils.isBlank(mappingName)) {
|
||||
mappingName = "DefaultQuickstartMapping" + UUID.randomUUID().toString().substring(0, 5);
|
||||
}
|
||||
String mappingCommand = String.format(".create-or-alter table %s ingestion %s mapping '%s' '%s'", tableName, ingestionMappingKind.name().toLowerCase(), mappingName, mappingValue);
|
||||
String mappingCommand = String.format(".create-or-alter table %s ingestion %s mapping '%s' '%s'", tableName, ingestionMappingKind.getKustoValue().toLowerCase(), mappingName, mappingValue);
|
||||
if (!executeControlCommand(kustoClient, databaseName, mappingCommand)) {
|
||||
System.out.printf("Failed to create a '%s' mapping reference named '%s'. Skipping this ingestion.%n", ingestionMappingKind, mappingName);
|
||||
System.out.printf("Failed to create a '%s' mapping reference named '%s'. Skipping this ingestion.%n", ingestionMappingKind.getKustoValue(), mappingName);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else if (dataFormat.isMappingRequired() && StringUtils.isBlank(mappingName)) {
|
||||
System.out.printf("The data format '%s' requires a mapping and the configuration indicates an existing mapping should be used, but none was provided. Skipping this ingestion.%n", dataFormat.name());
|
||||
System.out.printf("The data format '%s' requires a mapping and the configuration indicates an existing mapping should be used, but none was provided. Skipping this ingestion.%n", dataFormat.getKustoValue());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
@ -379,8 +392,8 @@ public class KustoSampleApp {
|
|||
waitForUserToProceed(String.format("Ingest '%s' from '%s'", uri, sourceType));
|
||||
// Tip: When ingesting json files, if each line represents a single-line json, use MULTIJSON format even if the file only contains one line.
|
||||
// If the json contains whitespace formatting, use SINGLEJSON. In this case, only one data row json object is allowed per file.
|
||||
if (dataFormat == IngestionProperties.DataFormat.json) {
|
||||
dataFormat = IngestionProperties.DataFormat.multijson;
|
||||
if (dataFormat == IngestionProperties.DataFormat.JSON) {
|
||||
dataFormat = IngestionProperties.DataFormat.MULTIJSON;
|
||||
}
|
||||
|
||||
// Tip: Kusto's Java SDK can ingest data from files, blobs, java.sql.ResultSet objects, and open streams.
|
||||
|
|
|
@ -75,7 +75,7 @@ ConnectionStringBuilder csb =
|
|||
System.getProperty("appKey"),
|
||||
System.getProperty("appTenant"));
|
||||
```
|
||||
2. Initialize client and it's properties
|
||||
2. Initialize client and its properties
|
||||
|
||||
```java
|
||||
IngestClient client = IngestClientFactory.createClient(csb);
|
||||
|
@ -138,7 +138,7 @@ ConnectionStringBuilder csb =
|
|||
System.getProperty("appKey"),
|
||||
System.getProperty("appTenant"));
|
||||
```
|
||||
2. Initialize client and it's properties
|
||||
2. Initialize client and its properties
|
||||
|
||||
```java
|
||||
IngestClient client = IngestClientFactory.createClient(csb);
|
||||
|
@ -228,7 +228,7 @@ ConnectionStringBuilder csb =
|
|||
System.getProperty("appTenant"));
|
||||
```
|
||||
|
||||
3. Initialize client and it's properties
|
||||
3. Initialize client and its properties
|
||||
|
||||
```java
|
||||
IngestClient client = IngestClientFactory.createClient(csb);
|
||||
|
@ -237,7 +237,7 @@ IngestionProperties ingestionProperties = new IngestionProperties( System.getPro
|
|||
System.getProperty("tableName"));
|
||||
ingestionProperties.getIngestionMapping().setIngestionMappingReference(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.Csv);
|
||||
ingestionProperties.setReportMethod(QueueAndTable);
|
||||
ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FailuresAndSuccesses);
|
||||
ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES);
|
||||
```
|
||||
|
||||
4. Load file and ingest it into table
|
||||
|
|
|
@ -2,7 +2,11 @@
|
|||
// Licensed under the MIT License.
|
||||
|
||||
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.kusto.ingest.*;
|
||||
import com.microsoft.azure.kusto.ingest.ColumnMapping;
|
||||
import com.microsoft.azure.kusto.ingest.IngestClient;
|
||||
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
|
||||
import com.microsoft.azure.kusto.ingest.IngestionMapping;
|
||||
import com.microsoft.azure.kusto.ingest.IngestionProperties;
|
||||
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
|
||||
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
|
||||
|
@ -21,7 +25,7 @@ public class FileIngestion {
|
|||
try (IngestClient client = IngestClientFactory.createClient(csb)) {
|
||||
IngestionProperties ingestionProperties = new IngestionProperties(System.getProperty("dbName"),
|
||||
System.getProperty("tableName"));
|
||||
ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.JSON);
|
||||
|
||||
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("filePath"), 0);
|
||||
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProperties);
|
||||
|
@ -37,8 +41,8 @@ public class FileIngestion {
|
|||
csvColumnMapping.setOrdinal(0);
|
||||
ColumnMapping csvColumnMapping2 = new ColumnMapping("ColB", "int");
|
||||
csvColumnMapping2.setOrdinal(1);
|
||||
ingestionProperties2.setDataFormat(IngestionProperties.DataFormat.csv);
|
||||
ingestionProperties2.setIngestionMapping(new ColumnMapping[]{csvColumnMapping, csvColumnMapping2}, IngestionMapping.IngestionMappingKind.Csv);
|
||||
ingestionProperties2.setDataFormat(IngestionProperties.DataFormat.CSV);
|
||||
ingestionProperties2.setIngestionMapping(new ColumnMapping[]{csvColumnMapping, csvColumnMapping2}, IngestionMapping.IngestionMappingKind.CSV);
|
||||
|
||||
IngestionResult ingestionResult2 = client.ingestFromStream(info, ingestionProperties2);
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ public class FileIngestionCompletableFuture {
|
|||
IngestionProperties ingestionProperties = new IngestionProperties(
|
||||
System.getProperty("dbName"),
|
||||
System.getProperty("tableName"));
|
||||
ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.JSON);
|
||||
|
||||
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("filePath"), 0);
|
||||
|
||||
|
|
|
@ -14,7 +14,11 @@ import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
|
|||
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
|
||||
import java.io.*;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
|
@ -54,7 +58,7 @@ public class StreamingIngest {
|
|||
String data = "0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,\"Zero\",0,00:00:00,,null";
|
||||
InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array());
|
||||
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.csv);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.CSV);
|
||||
OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
System.out.println(status.toString());
|
||||
|
||||
|
@ -71,8 +75,8 @@ public class StreamingIngest {
|
|||
System.out.println(status.toString());
|
||||
|
||||
// Open JSON File Stream and Ingest
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setIngestionMapping(mapping, IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
ingestionProperties.setIngestionMapping(mapping, IngestionMapping.IngestionMappingKind.JSON);
|
||||
fileInputStream = new FileInputStream(resourcesDirectory + "dataset.json");
|
||||
streamSourceInfo.setStream(fileInputStream);
|
||||
status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
|
@ -85,15 +89,15 @@ public class StreamingIngest {
|
|||
//Ingest CSV file
|
||||
String path = resourcesDirectory + "dataset.csv";
|
||||
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.csv);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.CSV);
|
||||
OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
System.out.println(status.toString());
|
||||
|
||||
//Ingest compressed JSON file
|
||||
path = resourcesDirectory + "dataset.jsonz.gz";
|
||||
fileSourceInfo = new FileSourceInfo(path, new File(path).length());
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.json);
|
||||
ingestionProperties.setIngestionMapping(mapping, IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
|
||||
ingestionProperties.setIngestionMapping(mapping, IngestionMapping.IngestionMappingKind.JSON);
|
||||
status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
System.out.println(status.toString());
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
|
|||
|
||||
import java.util.List;
|
||||
|
||||
import static com.microsoft.azure.kusto.ingest.IngestionProperties.IngestionReportMethod.QueueAndTable;
|
||||
import static com.microsoft.azure.kusto.ingest.IngestionProperties.IngestionReportMethod.QUEUE_AND_TABLE;
|
||||
|
||||
public class TableStatus {
|
||||
public static void main(String[] args) {
|
||||
|
@ -30,9 +30,9 @@ public class TableStatus {
|
|||
try (IngestClient client = IngestClientFactory.createClient(csb)) {
|
||||
IngestionProperties ingestionProperties = new IngestionProperties(System.getProperty("dbName"),
|
||||
System.getProperty("tableName"));
|
||||
ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.Json);
|
||||
ingestionProperties.setReportMethod(QueueAndTable);
|
||||
ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FailuresAndSuccesses);
|
||||
ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.JSON);
|
||||
ingestionProperties.setReportMethod(QUEUE_AND_TABLE);
|
||||
ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES);
|
||||
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("filePath"), 0);
|
||||
ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProperties);
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче