Reactivate tests and fix non passed tests (#191)

* E2e

* E2e

* fix tests

* fix tests

* fix tests

* disable prompt

* disable prompt

* e2e

* Fixed test

* Removed not needed dependency

* Quotes

* Revert "Removed not needed dependency"

This reverts commit a7de79e8aa.

* Leave only api

* Fixed test

* Added doc

* Fixed NRE

* Fixed implicit dependency

* Clean up some unnecessary changes

* Ensure DataFormat always exists (defaults to CSV)

* CloudInfo won't have the same values as the default for all connection strings

Co-authored-by: Ohad Bitton <ohbitton@microsoft.com>
Co-authored-by: Yihezkel Schoenbrun <yihezkel@schoenbrun.net>
Co-authored-by: Asaf Mahlev <asafmahlev@microsoft.com>
Co-authored-by: Yihezkel Schoenbrun <yischoen@microsoft.com>
This commit is contained in:
ohad bitton 2021-12-23 15:31:52 +02:00 коммит произвёл GitHub
Родитель e4028c4761
Коммит a779473d90
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
19 изменённых файлов: 136 добавлений и 112 удалений

Просмотреть файл

@ -59,10 +59,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<excludes>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>

Просмотреть файл

@ -3,7 +3,10 @@
package com.microsoft.azure.kusto.data;
import com.microsoft.azure.kusto.data.exceptions.*;
import com.microsoft.azure.kusto.data.auth.CloudInfo;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.DataWebException;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.*;
import org.apache.http.client.HttpClient;
@ -34,7 +37,6 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.zip.DeflaterInputStream;
import java.util.zip.GZIPInputStream;
import com.microsoft.azure.kusto.data.auth.CloudInfo;
class Utils {
private static final int MAX_REDIRECT_COUNT = 1;
@ -67,11 +69,9 @@ class Utils {
throw createExceptionFromResponse(url, response, null, responseContent);
}
}
}
catch (SocketTimeoutException e) {
} catch (SocketTimeoutException e) {
throw new DataServiceException(url, "Timed out in post request:" + e.getMessage(), false);
}
catch (JSONException | IOException e) {
} catch (JSONException | IOException e) {
throw new DataClientException(url, "Error in post request:" + e.getMessage(), e);
}
return null;
@ -155,7 +155,7 @@ class Utils {
*/
String activityId = determineActivityId(httpResponse);
if (!StringUtils.isBlank(errorFromResponse)) {
String message = "";
String message;
DataWebException formattedException = new DataWebException(errorFromResponse, httpResponse);
try {
message = String.format("%s, ActivityId='%s'", formattedException.getApiError().getDescription(), activityId);
@ -165,6 +165,9 @@ class Utils {
}
}
errorFromResponse = String.format("Http StatusCode='%s', ActivityId='%s'", httpResponse.getStatusLine().toString(), activityId);
if (thrownException == null) {
thrownException = new DataWebException(errorFromResponse, httpResponse);
}
return new DataServiceException(url, errorFromResponse, thrownException, false);
}
@ -263,7 +266,7 @@ class Utils {
long seconds = duration.getSeconds();
int nanos = duration.getNano();
long hours = TimeUnit.SECONDS.toHours(seconds) % TimeUnit.DAYS.toHours(1);
long minutes = TimeUnit.SECONDS.toMinutes( seconds) % TimeUnit.MINUTES.toSeconds(1);
long minutes = TimeUnit.SECONDS.toMinutes(seconds) % TimeUnit.MINUTES.toSeconds(1);
long secs = seconds % TimeUnit.HOURS.toSeconds(1);
long days = TimeUnit.SECONDS.toDays(seconds);
String positive = String.format(

Просмотреть файл

@ -2,8 +2,6 @@ package com.microsoft.azure.kusto.data.auth;
import com.microsoft.azure.kusto.data.UriUtils;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@ -37,6 +35,7 @@ public class CloudInfo {
public static final String LOCALHOST = "http://localhost";
private static final Map<String, CloudInfo> cache = new HashMap<>();
static {
cache.put(LOCALHOST, DEFAULT_CLOUD);
}
@ -57,15 +56,20 @@ public class CloudInfo {
this.firstPartyAuthorityUrl = firstPartyAuthorityUrl;
}
public static void manuallyAddToCache(String clusterUrl, CloudInfo cloudInfo) {
public static void manuallyAddToCache(String clusterUrl, CloudInfo cloudInfo) throws URISyntaxException {
synchronized (cache) {
cache.put(StringUtils.stripEnd(clusterUrl, "/"), cloudInfo);
cache.put(UriUtils.setPathForUri(clusterUrl, ""), cloudInfo);
}
}
public static CloudInfo retrieveCloudInfoForCluster(String clusterUrl) throws DataServiceException {
synchronized (cache) {
CloudInfo cloudInfo = cache.get(StringUtils.stripEnd(clusterUrl, "/"));
CloudInfo cloudInfo;
try {
cloudInfo = cache.get(UriUtils.setPathForUri(clusterUrl, ""));
} catch (URISyntaxException ex) {
throw new DataServiceException(clusterUrl, "Error in metadata endpoint, cluster uri invalid", ex, true);
}
if (cloudInfo != null) {
return cloudInfo;
}

Просмотреть файл

@ -3,6 +3,8 @@
package com.microsoft.azure.kusto.data.exceptions;
import org.apache.http.HttpResponse;
public class DataServiceException extends KustoDataExceptionBase {
public DataServiceException(String ingestionSource, String message, boolean isPermanent) {
this(ingestionSource, message, null, isPermanent);
@ -11,4 +13,14 @@ public class DataServiceException extends KustoDataExceptionBase {
public DataServiceException(String ingestionSource, String message, Exception exception, boolean isPermanent) {
super(ingestionSource, message, exception, isPermanent);
}
public boolean is404Error() {
Throwable cause = getCause();
if (!(cause instanceof DataWebException)) {
return false;
}
HttpResponse httpResponse = ((DataWebException) cause).getHttpResponse();
return httpResponse != null && httpResponse.getStatusLine().getStatusCode() == 404;
}
}

Просмотреть файл

@ -2,19 +2,12 @@ package com.microsoft.azure.kusto.data;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.io.InputStream;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
class UncloseableStreamTest {
@ -43,7 +36,7 @@ class UncloseableStreamTest {
@Test
void skip() throws IOException {
stream.skip(0);
verify(stream.getInnerStream(), times(1)).skip(eq((long)0));
verify(stream.getInnerStream(), times(1)).skip(0);
}
@Test
@ -55,7 +48,7 @@ class UncloseableStreamTest {
@Test
void mark() {
stream.mark(0);
verify(stream.getInnerStream(), times(1)).mark(eq(0));
verify(stream.getInnerStream(), times(1)).mark(0);
}
@Test

Просмотреть файл

@ -45,7 +45,7 @@ import static org.mockito.Mockito.spy;
public class AadAuthenticationHelperTest {
@BeforeAll
public static void setUp() {
public static void setUp() throws URISyntaxException {
CloudInfo.manuallyAddToCache("https://resource.uri", CloudInfo.DEFAULT_CLOUD);
}
@ -66,10 +66,10 @@ public class AadAuthenticationHelperTest {
aadAuthenticationHelper.initializeCloudInfo();
aadAuthenticationHelper.setRequiredMembersBasedOnCloudInfo();
assertEquals("https://login.microsoftonline.com/organizations/",aadAuthenticationHelper.aadAuthorityUrl);
assertEquals("https://login.microsoftonline.com/organizations/", aadAuthenticationHelper.aadAuthorityUrl);
assertEquals(new HashSet<>(Collections.singletonList("https://kusto.kusto.windows.net/.default")), aadAuthenticationHelper.scopes);
Assertions.assertThrows(DataServiceException.class,
() -> aadAuthenticationHelper.acquireNewAccessToken());
aadAuthenticationHelper::acquireNewAccessToken);
}
public static KeyCert readPem(String path, String password)
@ -127,7 +127,7 @@ public class AadAuthenticationHelperTest {
doReturn(null).when(aadAuthenticationHelperSpy).acquireAccessTokenSilently();
doReturn(authenticationResult).when(aadAuthenticationHelperSpy).acquireNewAccessToken();
assertEquals("firstToken", aadAuthenticationHelperSpy.acquireAccessToken());
assertEquals("https://login.microsoftonline.com/organizations/",aadAuthenticationHelperSpy.aadAuthorityUrl);
assertEquals("https://login.microsoftonline.com/organizations/", aadAuthenticationHelperSpy.aadAuthorityUrl);
assertEquals(new HashSet<>(Collections.singletonList("https://kusto.kusto.windows.net/.default")), aadAuthenticationHelperSpy.scopes);

Просмотреть файл

@ -107,11 +107,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<!--<configuration>
<excludes>
<exclude>**/*E2ETest.java</exclude>
</excludes>
</configuration>-->
</plugin>
</plugins>
</build>
@ -179,12 +174,6 @@
<artifactId>univocity-parsers</artifactId>
<version>${univocity-parsers.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@ -203,6 +192,12 @@
<version>${sqlite-jdbc.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
@ -213,10 +208,22 @@
<artifactId>annotations</artifactId>
<version>${annotations.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
<exclusions>
<exclusion>
<artifactId>httpcore</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
</dependency>
</dependencies>
</project>

Просмотреть файл

@ -177,7 +177,20 @@ class AzureStorageClient {
if (fileName.endsWith(".zip")) {
return CompressionType.zip;
}
return null;
}
static String removeExtension(String filename) {
if (filename == null) {
return null;
}
int extensionPos = filename.lastIndexOf('.');
int lastDirSeparator = filename.lastIndexOf('\\');
if (extensionPos == -1 || lastDirSeparator > extensionPos) {
return filename;
} else {
return filename.substring(0, extensionPos);
}
}
}

Просмотреть файл

@ -51,6 +51,9 @@ public interface IngestClient extends Closeable {
* <p>Ingest data from a Result Set into Kusto database.</p>
* This method ingests the data from a given Result Set, described in {@code resultSetSourceInfo}, into Kusto database,
* according to the properties mentioned in {@code ingestionProperties}
* <p>
* Ingesting from ResultSet is equivalent to ingesting from a csv stream.
* The DataFormat should be empty or set to "csv", and the mapping, should it be provided, should be csv mapping.
*
* @param resultSetSourceInfo The specific SourceInfo to be ingested
* @param ingestionProperties Settings used to customize the ingestion operation

Просмотреть файл

@ -43,6 +43,7 @@ public class IngestionProperties {
* <p>{@code reportMethod} : {@code IngestionReportMethod.Queue;}</p>
* <p>{@code flushImmediately} : {@code false;}</p>
* <p>{@code additionalProperties} : {@code new HashMap();}</p>
* <p>{@code dataFormat} : {@code DataFormat.csv;}</p>
* </blockquote>
*
* @param databaseName the name of the database in the destination Kusto cluster.
@ -60,10 +61,12 @@ public class IngestionProperties {
this.ingestIfNotExists = new ArrayList<>();
this.additionalTags = new ArrayList<>();
this.ingestionMapping = new IngestionMapping();
this.dataFormat = DataFormat.csv;
}
/**
* Copy constructor for {@code IngestionProperties}.
*
* @param other the instance to copy from.
*/
public IngestionProperties(IngestionProperties other) {
@ -72,6 +75,7 @@ public class IngestionProperties {
this.reportLevel = other.reportLevel;
this.reportMethod = other.reportMethod;
this.flushImmediately = other.flushImmediately;
this.dataFormat = other.getDataFormat();
this.additionalProperties = new HashMap<>(other.additionalProperties);
this.dropByTags = new ArrayList<>(other.dropByTags);
this.ingestByTags = new ArrayList<>(other.ingestByTags);
@ -207,10 +211,7 @@ public class IngestionProperties {
fullAdditionalProperties.put("ingestIfNotExists", ingestIfNotExistsJson);
}
fullAdditionalProperties.putAll(additionalProperties);
if (dataFormat != null) {
fullAdditionalProperties.put("format", dataFormat.name());
}
fullAdditionalProperties.put("format", dataFormat.name());
String mappingReference = ingestionMapping.getIngestionMappingReference();
if (StringUtils.isNotBlank(mappingReference)) {
@ -247,13 +248,13 @@ public class IngestionProperties {
}
/**
* Returns the DataFormat if it exists, and otherwise defaults to CSV
* Returns the DataFormat
*
* @return The DataFormat if it exists, and otherwise defaults to CSV
* @return The DataFormat
*/
@NotNull
public DataFormat getDataFormat() {
return dataFormat != null ? dataFormat : DataFormat.csv;
return dataFormat;
}
/**
@ -312,7 +313,7 @@ public class IngestionProperties {
}
}
} else if (StringUtils.isBlank(mappingReference)) {
if (dataFormat != null && dataFormat.isMappingRequired()) {
if (dataFormat.isMappingRequired()) {
message += String.format("Mapping must be specified for '%s' format.", dataFormat.name());
}
@ -321,8 +322,8 @@ public class IngestionProperties {
}
}
if (dataFormat != null && dataFormat.isMappingRequired() && !dataFormat.getIngestionMappingKind().equals(ingestionMappingKind)) {
message += String.format("Wrong ingestion mapping for format '%s'; found '%s' mapping kind.", dataFormat.name(), ingestionMappingKind.name());
if (dataFormat.isMappingRequired() && !dataFormat.getIngestionMappingKind().equals(ingestionMappingKind)) {
message += String.format("Wrong ingestion mapping for format '%s'; found '%s' mapping kind.", dataFormat.name(), ingestionMappingKind != null ? ingestionMappingKind.name() : "null");
}
if (message != null) {
@ -331,6 +332,13 @@ public class IngestionProperties {
}
}
public void validateResultSetProperties() throws IngestionClientException {
Ensure.isTrue(IngestionProperties.DataFormat.csv.equals(getDataFormat()),
String.format("ResultSet translates into csv format but '%s' was given", getDataFormat()));
validate();
}
public enum DataFormat {
csv(IngestionMapping.IngestionMappingKind.Csv, false, true),
tsv(IngestionMapping.IngestionMappingKind.Csv, false, true),
@ -371,13 +379,6 @@ public class IngestionProperties {
public boolean isCompressible() {
return compressible;
}
public static IngestionProperties.DataFormat getDataFormatFromString(String ingestionPropertiesDataFormat) {
if (ingestionPropertiesDataFormat == null) {
return IngestionProperties.DataFormat.csv;
}
return IngestionProperties.DataFormat.valueOf(ingestionPropertiesDataFormat.toLowerCase());
}
}
public enum IngestionReportLevel {

Просмотреть файл

@ -1,8 +1,8 @@
package com.microsoft.azure.kusto.ingest;
import com.microsoft.azure.kusto.data.Ensure;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.StreamingClient;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.DataWebException;
import com.microsoft.azure.kusto.data.exceptions.OneApiError;
@ -93,7 +93,7 @@ public class ManagedStreamingIngestClient implements IngestClient {
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
resultSetSourceInfo.validate();
ingestionProperties.validate();
ingestionProperties.validateResultSetProperties();
try {
StreamSourceInfo streamSourceInfo = IngestionUtils.resultSetToStream(resultSetSourceInfo);
return ingestFromStream(streamSourceInfo, ingestionProperties);

Просмотреть файл

@ -238,8 +238,8 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient
return String.format("%s__%s__%s__%s%s%s",
databaseName,
tableName,
AzureStorageClient.removeExtension(fileName),
UUID.randomUUID(),
fileName,
dataFormat == null ? "" : "." + dataFormat,
compressionType == null ? "" : "." + compressionType);
}
@ -252,7 +252,7 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
resultSetSourceInfo.validate();
ingestionProperties.validate();
ingestionProperties.validateResultSetProperties();
try {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
new CsvRoutines().write(resultSetSourceInfo.getResultSet(), byteArrayOutputStream);

Просмотреть файл

@ -7,7 +7,6 @@ import com.microsoft.azure.kusto.data.*;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.DataWebException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
@ -96,7 +95,8 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
resultSetSourceInfo.validate();
ingestionProperties.validate();
ingestionProperties.validateResultSetProperties();
try {
StreamSourceInfo streamSourceInfo = IngestionUtils.resultSetToStream(resultSetSourceInfo);
return ingestFromStream(streamSourceInfo, ingestionProperties);
@ -117,7 +117,7 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli
streamSourceInfo.validate();
ingestionProperties.validate();
if (dataFormat.isMappingRequired() && StringUtils.isBlank(ingestionProperties.getIngestionMapping().getIngestionMappingReference())) {
throw new IngestionClientException(String.format("Mapping reference must be specified for streaming ingestion.", dataFormat.name()));
throw new IngestionClientException(String.format("Mapping reference must be specified for DataFormat '%s' in streaming ingestion.", dataFormat.name()));
}
try {
@ -133,12 +133,12 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli
} catch (DataClientException | IOException e) {
log.error(e.getMessage(), e);
if (e.getCause() instanceof DataWebException && "Error in post request".equals(e.getMessage())) {
validateEndpointServiceType(connectionDataSource, EXPECTED_SERVICE_TYPE);
}
throw new IngestionClientException(e.getMessage(), e);
} catch (DataServiceException e) {
log.error(e.getMessage(), e);
if (e.is404Error()) {
validateEndpointServiceType(connectionDataSource, EXPECTED_SERVICE_TYPE);
}
throw new IngestionServiceException(e.getMessage(), e);
}

Просмотреть файл

@ -16,26 +16,15 @@ import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.*;
import java.lang.invoke.MethodHandles;
import java.net.URISyntaxException;
import java.nio.file.Files;
@ -49,11 +38,7 @@ import java.util.Calendar;
import java.util.List;
import java.util.concurrent.Callable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;
class E2ETest {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -74,14 +59,14 @@ class E2ETest {
@BeforeAll
public static void setUp() throws IOException {
appKey = System.getenv("APP_KEY");
if (appKey == null) {
String secretPath = System.getProperty("SecretPath");
if (secretPath == null) {
throw new IllegalArgumentException("SecretPath is not set");
}
appKey= Files.readAllLines(Paths.get(secretPath)).get(0);
}
appKey = System.getenv("APP_KEY");
if (appKey == null) {
String secretPath = System.getProperty("SecretPath");
if (secretPath == null) {
throw new IllegalArgumentException("SecretPath is not set");
}
appKey = Files.readAllLines(Paths.get(secretPath)).get(0);
}
tableName = "JavaTest_" + new SimpleDateFormat("yyyy_MM_dd_hh_mm_ss_SSS").format(Calendar.getInstance().getTime());
@ -130,7 +115,6 @@ class E2ETest {
resourcesPath = Paths.get(System.getProperty("user.dir"), "src", "test", "resources").toString();
try {
Thread.sleep(2000);
String mappingAsString = new String(Files.readAllBytes(Paths.get(resourcesPath, "dataset_mapping.json")));
queryClient.executeToJsonResult(databaseName, String.format(".create table %s ingestion json mapping '%s' '%s'",
tableName, mappingReference, mappingAsString));
@ -259,7 +243,7 @@ class E2ETest {
result = localQueryClient.execute(databaseName, String.format(".show database %s principals", databaseName));
//result = localQueryClient.execute(databaseName, String.format(".show version"));
} catch (Exception ex) {
Assertions.fail("Failed to execute show database principal command", ex);
Assertions.fail("Failed to execute show database principals command", ex);
}
KustoResultSetTable mainTableResultSet = result.getPrimaryResults();
while (mainTableResultSet.next()) {
@ -398,6 +382,7 @@ class E2ETest {
String clusterUrl = System.getenv("ENGINE_CONNECTION_STRING");
CloudInfo cloudInfo = CloudInfo.retrieveCloudInfoForCluster(clusterUrl);
assertNotSame(CloudInfo.DEFAULT_CLOUD, cloudInfo);
assertNotNull(cloudInfo);
assertSame(cloudInfo, CloudInfo.retrieveCloudInfoForCluster(clusterUrl));
}

Просмотреть файл

@ -90,10 +90,11 @@ class ManagedStreamingIngestClientTest {
}
@Test
void IngestFromBlob_IngestionReportMethodIsQueue_IngestionStatusHardcoded1() throws Exception {
void IngestFromBlob_IngestionReportMethodIsQueue_IngestionStatusHardcoded() throws Exception {
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100);
IngestionResult result = managedStreamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
assertEquals(1, result.getIngestionStatusesLength());
assertEquals(OperationStatus.Queued, result.getIngestionStatusCollection().get(0).status);
}
@Test

Просмотреть файл

@ -8,6 +8,7 @@ import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.result.IngestionStatus;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.*;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.table.TableServiceEntity;
@ -39,7 +40,7 @@ class QueuedIngestClientTest {
@BeforeAll
static void setUp() throws Exception {
testFilePath = Paths.get("src", "test", "resources", "testdata.json").toString();
testFilePath = Paths.get("src", "test", "resources", "testdata.csv").toString();
when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceType.SECURED_READY_FOR_AGGREGATION_QUEUE))
.thenReturn("queue1")
@ -71,7 +72,8 @@ class QueuedIngestClientTest {
queuedIngestClient = new QueuedIngestClient(resourceManagerMock, azureStorageClientMock);
ingestionProperties = new IngestionProperties("dbName", "tableName");
ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.Json);
ingestionProperties.setIngestionMapping("mappingName", IngestionMapping.IngestionMappingKind.Csv);
ingestionProperties.setDataFormat(DataFormat.csv);
}
@Test
@ -79,6 +81,7 @@ class QueuedIngestClientTest {
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100);
IngestionResult result = queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
assertEquals(1, result.getIngestionStatusesLength());
assertEquals(OperationStatus.Queued, result.getIngestionStatusCollection().get(0).status);
}
@Test
@ -297,13 +300,13 @@ class QueuedIngestClientTest {
shouldCompress ? CompressionType.gz : compression);
};
String csvNoCompression = genName.apply(DataFormat.csv, null);
assert(csvNoCompression.endsWith("fileName.csv.gz"));
assert (csvNoCompression.endsWith(".csv.gz"));
String csvCompression = genName.apply(DataFormat.csv, CompressionType.zip);
assert(csvCompression.endsWith("fileName.csv.zip"));
assert (csvCompression.endsWith(".csv.zip"));
String parquet = genName.apply(DataFormat.parquet, null);
assert(parquet.endsWith("fileName.parquet"));
assert (parquet.endsWith(".parquet"));
String avroLocalFileName = "avi.avro";
String avroLocalCompressFileName = "avi.avro.gz";
@ -311,11 +314,11 @@ class QueuedIngestClientTest {
CompressionType compressionTypeRes2 = AzureStorageClient.getCompression(avroLocalCompressFileName);
holder.name = avroLocalFileName;
String avroName = genName.apply(DataFormat.avro, compressionTypeRes);
assert(avroName.endsWith("avi.avro.avro"));
assert (avroName.endsWith(".avro"));
holder.name = avroLocalCompressFileName;
String avroNameCompression = genName.apply(DataFormat.avro, compressionTypeRes2);
assert(avroNameCompression.endsWith("avi.avro.gz.avro.gz"));
assert (avroNameCompression.endsWith(".avro.gz"));
}
private ResultSet getSampleResultSet() throws SQLException {

Просмотреть файл

@ -15,6 +15,9 @@ import com.microsoft.azure.kusto.ingest.source.*;
import com.microsoft.azure.storage.blob.BlobInputStream;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.http.ProtocolVersion;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicStatusLine;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -63,7 +66,6 @@ class StreamingIngestClientTest {
@BeforeEach
void setUpEach() throws Exception {
ingestionProperties = new IngestionProperties("dbName", "tableName");
when(streamingClientMock.executeStreamingIngest(any(String.class), any(String.class), any(InputStream.class),
isNull(), any(String.class), any(String.class), any(boolean.class))).thenReturn(null);
@ -307,8 +309,9 @@ class StreamingIngestClientTest {
@Test
void IngestFromFile_GivenStreamingIngestClientAndDmEndpoint_ThrowsIngestionClientException() throws Exception {
DataClientException dataClientException = new DataClientException("some cluster", "Error in post request",
new DataWebException("Error in post request", null));
DataServiceException dataClientException = new DataServiceException("some cluster", "Error in post request. status 404",
new DataWebException("Error in post request", new BasicHttpResponse(new BasicStatusLine(new ProtocolVersion("http", 1, 1), 404, "Not found"))),
true);
doThrow(dataClientException).when(streamingClientMock).executeStreamingIngest(eq(ingestionProperties.getDatabaseName()), eq(ingestionProperties.getTableName()), any(), isNull(), any(), isNull(), eq(false));
when(streamingClientMock.execute(Commands.VERSION_SHOW_COMMAND)).thenReturn(new KustoOperationResult("{\"Tables\":[{\"TableName\":\"Table_0\",\"Columns\":[{\"ColumnName\":\"BuildVersion\",\"DataType\":\"String\"},{\"ColumnName\":\"BuildTime\",\"DataType\":\"DateTime\"},{\"ColumnName\":\"ServiceType\",\"DataType\":\"String\"},{\"ColumnName\":\"ProductVersion\",\"DataType\":\"String\"}],\"Rows\":[[\"1.0.0.0\",\"2000-01-01T00:00:00Z\",\"DataManagement\",\"PrivateBuild.yischoen.YISCHOEN-OP7070.2020-09-07 12-09-22\"]]}]}", "v1"));

Просмотреть файл

@ -66,7 +66,7 @@
<maven-dependency-plugin.version>3.1.2</maven-dependency-plugin.version>
<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
<flatten-maven-plugin.version>1.2.5</flatten-maven-plugin.version>
<junit.version>5.7.1</junit.version>
<junit.version>5.8.1</junit.version>
<mockito.version>3.7.7</mockito.version>
</properties>

Просмотреть файл

@ -134,7 +134,7 @@ public class KustoSampleApp {
private static void loadConfigs(String configFileName) {
File configFile = new File(".\\" + configFileName);
Map<String, Object> configs = null;
Map<String, Object> configs;
try {
configs = new ObjectMapper().readValue(configFile, HashMap.class);
@ -435,7 +435,7 @@ public class KustoSampleApp {
IngestionProperties ingestionProperties = new IngestionProperties(databaseName, tableName);
ingestionProperties.setDataFormat(dataFormat);
// Learn More: For more information about supported data formats, see: https://docs.microsoft.com/azure/data-explorer/ingestion-supported-formats
if (StringUtils.isNotBlank(mappingName) && dataFormat != null) {
if (StringUtils.isNotBlank(mappingName)) {
ingestionProperties.setIngestionMapping(mappingName, dataFormat.getIngestionMappingKind());
}
// TODO (config - optional): Setting the ingestion batching policy takes up to 5 minutes to take effect.