Streaming client wrong endpoint detection (#149)
* Streaming client wrong endpoint detection * Fix invalid character for the file's encoding * Add parent class to both Queued and Streaming IngestClients for the local purpose of removing significant code duplication in the wrong-endpoint-detection feature. This change represents a better architecture that will yield further benefits. * Rename AbstractIngestClient to IngestClientBase
This commit is contained in:
Родитель
9dce220692
Коммит
a75db70f3a
|
@ -28,4 +28,13 @@ public interface StreamingClient {
|
|||
* @throws DataServiceException An exception returned from the service
|
||||
*/
|
||||
KustoOperationResult executeStreamingIngest(String database, String table, InputStream stream, ClientRequestProperties properties, String streamFormat, String mappingName, boolean leaveOpen) throws DataServiceException, DataClientException;
|
||||
|
||||
/**
|
||||
* <p>Execute the provided command against the default database.</p>
|
||||
*
|
||||
* @param command The command to execute
|
||||
* @throws DataClientException An exception originating from a client activity
|
||||
* @throws DataServiceException An exception returned from the service
|
||||
*/
|
||||
KustoOperationResult execute(String command) throws DataServiceException, DataClientException;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
package com.microsoft.azure.kusto.ingest;
|
||||
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
public abstract class IngestClientBase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
protected String connectionDataSource;
|
||||
private String endpointServiceType;
|
||||
private String suggestedEndpointUri;
|
||||
public static final String INGEST_PREFIX = "ingest-";
|
||||
protected static final String WRONG_ENDPOINT_MESSAGE =
|
||||
"You are using '%s' client type, but the provided endpoint is of ServiceType '%s'. Initialize the client with the appropriate endpoint URI";
|
||||
|
||||
protected void validateEndpointServiceType(String connectionDataSource, String expectedServiceType) throws IngestionServiceException, IngestionClientException {
|
||||
if (StringUtils.isBlank(endpointServiceType)) {
|
||||
endpointServiceType = retrieveServiceType();
|
||||
}
|
||||
if (!expectedServiceType.equals(endpointServiceType)) {
|
||||
String message = String.format(WRONG_ENDPOINT_MESSAGE, expectedServiceType, endpointServiceType);
|
||||
suggestedEndpointUri = generateEndpointSuggestion(suggestedEndpointUri, connectionDataSource);
|
||||
if (StringUtils.isNotBlank(suggestedEndpointUri)) {
|
||||
message = String.format("%s: '%s'", message, suggestedEndpointUri);
|
||||
} else {
|
||||
message += ".";
|
||||
}
|
||||
throw new IngestionClientException(message);
|
||||
}
|
||||
}
|
||||
|
||||
protected String generateEndpointSuggestion(String existingSuggestedEndpointUri, String dataSource) {
|
||||
if (existingSuggestedEndpointUri != null) {
|
||||
return existingSuggestedEndpointUri;
|
||||
}
|
||||
// The default is not passing a suggestion to the exception
|
||||
String endpointUriToSuggestStr = "";
|
||||
if (StringUtils.isNotBlank(dataSource)) {
|
||||
URIBuilder existingEndpoint;
|
||||
try {
|
||||
existingEndpoint = new URIBuilder(dataSource);
|
||||
endpointUriToSuggestStr = emendEndpointUri(existingEndpoint);
|
||||
} catch (URISyntaxException e) {
|
||||
log.error("Couldn't parse dataSource '{}', so no suggestion can be made.", dataSource, e);
|
||||
}
|
||||
}
|
||||
|
||||
return endpointUriToSuggestStr;
|
||||
}
|
||||
|
||||
protected abstract String retrieveServiceType() throws IngestionServiceException, IngestionClientException;
|
||||
|
||||
protected abstract String emendEndpointUri(URIBuilder existingEndpoint);
|
||||
}
|
|
@ -14,7 +14,6 @@ import com.microsoft.azure.kusto.ingest.source.*;
|
|||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
import com.univocity.parsers.csv.CsvRoutines;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -25,24 +24,18 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.URISyntaxException;
|
||||
import java.sql.Date;
|
||||
import java.time.Instant;
|
||||
import java.util.Date;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
public class QueuedIngestClient implements IngestClient {
|
||||
public class QueuedIngestClient extends IngestClientBase implements IngestClient {
|
||||
|
||||
private final static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private static final int COMPRESSED_FILE_MULTIPLIER = 11;
|
||||
private final ResourceManager resourceManager;
|
||||
private final AzureStorageClient azureStorageClient;
|
||||
private String connectionDataSource;
|
||||
private String endpointServiceType;
|
||||
private String suggestedEndpointUri;
|
||||
public static final String INGEST_PREFIX = "ingest-";
|
||||
protected static final String WRONG_ENDPOINT_MESSAGE =
|
||||
"You are using '%s' client type, but the provided endpoint is of ServiceType '%s'. Initialize the client with the appropriate endpoint URI";
|
||||
public static final String EXPECTED_SERVICE_TYPE = "DataManagement";
|
||||
|
||||
QueuedIngestClient(ConnectionStringBuilder csb) throws URISyntaxException {
|
||||
|
@ -130,7 +123,7 @@ public class QueuedIngestClient implements IngestClient {
|
|||
} catch (IOException | URISyntaxException e) {
|
||||
throw new IngestionClientException("Failed to ingest from blob", e);
|
||||
} catch (IngestionServiceException e) {
|
||||
validateEndpointServiceType();
|
||||
validateEndpointServiceType(connectionDataSource, EXPECTED_SERVICE_TYPE);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -173,7 +166,7 @@ public class QueuedIngestClient implements IngestClient {
|
|||
} catch (IOException | URISyntaxException e) {
|
||||
throw new IngestionClientException("Failed to ingest from file", e);
|
||||
} catch (IngestionServiceException e) {
|
||||
validateEndpointServiceType();
|
||||
validateEndpointServiceType(connectionDataSource, EXPECTED_SERVICE_TYPE);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -192,7 +185,7 @@ public class QueuedIngestClient implements IngestClient {
|
|||
IngestionResult ingestionResult;
|
||||
if (streamSourceInfo.getStream() == null) {
|
||||
throw new IngestionClientException("The provided stream is null.");
|
||||
} else if(streamSourceInfo.getStream().available() <= 0) {
|
||||
} else if (streamSourceInfo.getStream().available() <= 0) {
|
||||
throw new IngestionClientException("The provided stream is empty.");
|
||||
}
|
||||
boolean shouldCompress = AzureStorageClient.shouldCompress(streamSourceInfo.getCompressionType(), ingestionProperties.getDataFormat());
|
||||
|
@ -224,7 +217,7 @@ public class QueuedIngestClient implements IngestClient {
|
|||
} catch (StorageException e) {
|
||||
throw new IngestionServiceException("Failed to ingest from stream", e);
|
||||
} catch (IngestionServiceException e) {
|
||||
validateEndpointServiceType();
|
||||
validateEndpointServiceType(connectionDataSource, EXPECTED_SERVICE_TYPE);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -272,42 +265,14 @@ public class QueuedIngestClient implements IngestClient {
|
|||
}
|
||||
}
|
||||
|
||||
protected void validateEndpointServiceType() throws IngestionServiceException, IngestionClientException {
|
||||
if (StringUtils.isBlank(endpointServiceType)) {
|
||||
endpointServiceType = retrieveServiceType();
|
||||
}
|
||||
if (!EXPECTED_SERVICE_TYPE.equals(endpointServiceType)) {
|
||||
String message = String.format(WRONG_ENDPOINT_MESSAGE, EXPECTED_SERVICE_TYPE, endpointServiceType);
|
||||
suggestedEndpointUri = generateEndpointSuggestion(suggestedEndpointUri, connectionDataSource);
|
||||
if (StringUtils.isNotBlank(suggestedEndpointUri)) {
|
||||
message = String.format("%s: '%s'", message, suggestedEndpointUri);
|
||||
} else {
|
||||
message += ".";
|
||||
}
|
||||
throw new IngestionClientException(message);
|
||||
}
|
||||
@Override
|
||||
protected String emendEndpointUri(URIBuilder existingEndpoint) {
|
||||
existingEndpoint.setHost(INGEST_PREFIX + existingEndpoint.getHost());
|
||||
return existingEndpoint.toString();
|
||||
}
|
||||
|
||||
protected static String generateEndpointSuggestion(String existingSuggestedEndpointUri, String dataSource) {
|
||||
if (existingSuggestedEndpointUri != null) {
|
||||
return existingSuggestedEndpointUri;
|
||||
}
|
||||
// The default is not passing a suggestion to the exception
|
||||
String endpointUriToSuggestStr = "";
|
||||
if (StringUtils.isNotBlank(dataSource)) {
|
||||
URIBuilder endpointUriToSuggest;
|
||||
try {
|
||||
endpointUriToSuggest = new URIBuilder(dataSource);
|
||||
endpointUriToSuggest.setHost(INGEST_PREFIX + endpointUriToSuggest.getHost());
|
||||
endpointUriToSuggestStr = endpointUriToSuggest.toString();
|
||||
} catch (URISyntaxException e) {
|
||||
log.error("Couldn't parse dataSource '{}', so no suggestion can be made.", dataSource, e);
|
||||
}
|
||||
}
|
||||
return endpointUriToSuggestStr;
|
||||
}
|
||||
|
||||
private String retrieveServiceType() throws IngestionServiceException, IngestionClientException {
|
||||
@Override
|
||||
protected String retrieveServiceType() throws IngestionServiceException, IngestionClientException {
|
||||
if (resourceManager != null) {
|
||||
return resourceManager.retrieveServiceType();
|
||||
}
|
||||
|
|
|
@ -63,7 +63,7 @@ class ResourceManager implements Closeable {
|
|||
private static final long REFRESH_INGESTION_RESOURCES_PERIOD_ON_FAILURE = 1000 * 60 * 15; // 15 minutes
|
||||
private Long defaultRefreshTime;
|
||||
private Long refreshTimeOnFailure;
|
||||
private static final String SERVICE_TYPE_COLUMN_NAME = "ServiceType";
|
||||
public static final String SERVICE_TYPE_COLUMN_NAME = "ServiceType";
|
||||
|
||||
ResourceManager(Client client, long defaultRefreshTime, long refreshTimeOnFailure) {
|
||||
this.defaultRefreshTime = defaultRefreshTime;
|
||||
|
@ -207,7 +207,7 @@ class ResourceManager implements Closeable {
|
|||
KustoOperationResult identityTokenResult = client.execute(Commands.IDENTITY_GET_COMMAND);
|
||||
if (identityTokenResult != null
|
||||
&& identityTokenResult.hasNext()
|
||||
&& identityTokenResult.getResultTables().size() > 0) {
|
||||
&& !identityTokenResult.getResultTables().isEmpty()) {
|
||||
KustoResultSetTable resultTable = identityTokenResult.next();
|
||||
resultTable.next();
|
||||
|
||||
|
@ -227,7 +227,7 @@ class ResourceManager implements Closeable {
|
|||
log.info("Getting version to determine endpoint's ServiceType");
|
||||
try {
|
||||
KustoOperationResult versionResult = client.execute(Commands.VERSION_SHOW_COMMAND);
|
||||
if (versionResult != null && versionResult.hasNext() && versionResult.getResultTables().size() > 0) {
|
||||
if (versionResult != null && versionResult.hasNext() && !versionResult.getResultTables().isEmpty()) {
|
||||
KustoResultSetTable resultTable = versionResult.next();
|
||||
resultTable.next();
|
||||
return resultTable.getString(SERVICE_TYPE_COLUMN_NAME);
|
||||
|
|
|
@ -6,14 +6,22 @@ package com.microsoft.azure.kusto.ingest;
|
|||
import com.microsoft.azure.kusto.data.*;
|
||||
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
|
||||
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
|
||||
import com.microsoft.azure.kusto.data.exceptions.DataWebException;
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
|
||||
import com.microsoft.azure.kusto.ingest.result.*;
|
||||
import com.microsoft.azure.kusto.ingest.source.*;
|
||||
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
|
||||
import com.microsoft.azure.kusto.ingest.result.IngestionStatus;
|
||||
import com.microsoft.azure.kusto.ingest.result.IngestionStatusResult;
|
||||
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
|
||||
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
|
||||
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -23,15 +31,17 @@ import java.net.URI;
|
|||
import java.net.URISyntaxException;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
|
||||
public class StreamingIngestClient implements IngestClient {
|
||||
public class StreamingIngestClient extends IngestClientBase implements IngestClient {
|
||||
|
||||
private final static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final StreamingClient streamingClient;
|
||||
private static final int STREAM_COMPRESS_BUFFER_SIZE = 16 * 1024;
|
||||
public static final String EXPECTED_SERVICE_TYPE = "Engine";
|
||||
|
||||
StreamingIngestClient(ConnectionStringBuilder csb) throws URISyntaxException {
|
||||
log.info("Creating a new StreamingIngestClient");
|
||||
this.streamingClient = ClientFactory.createStreamingClient(csb);
|
||||
this.connectionDataSource = csb.getClusterUrl();
|
||||
}
|
||||
|
||||
StreamingIngestClient(StreamingClient streamingClient) {
|
||||
|
@ -123,6 +133,9 @@ public class StreamingIngestClient implements IngestClient {
|
|||
throw new IngestionClientException(e.getMessage(), e);
|
||||
} catch (DataServiceException e) {
|
||||
log.error(e.getMessage(), e);
|
||||
if (e.getCause() instanceof DataWebException && "Error in post request".equals(e.getMessage())) {
|
||||
validateEndpointServiceType(connectionDataSource, EXPECTED_SERVICE_TYPE);
|
||||
}
|
||||
throw new IngestionServiceException(e.getMessage(), e);
|
||||
}
|
||||
|
||||
|
@ -201,7 +214,40 @@ public class StreamingIngestClient implements IngestClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
protected String emendEndpointUri(URIBuilder existingEndpoint) {
|
||||
if (existingEndpoint.getHost().startsWith(IngestClientBase.INGEST_PREFIX)) {
|
||||
existingEndpoint.setHost(existingEndpoint.getHost().substring(IngestClientBase.INGEST_PREFIX.length()));
|
||||
return existingEndpoint.toString();
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String retrieveServiceType() throws IngestionServiceException, IngestionClientException {
|
||||
if (streamingClient != null) {
|
||||
log.info("Getting version to determine endpoint's ServiceType");
|
||||
try {
|
||||
KustoOperationResult versionResult = streamingClient.execute(Commands.VERSION_SHOW_COMMAND);
|
||||
if (versionResult != null && versionResult.hasNext() && !versionResult.getResultTables().isEmpty()) {
|
||||
KustoResultSetTable resultTable = versionResult.next();
|
||||
resultTable.next();
|
||||
return resultTable.getString(ResourceManager.SERVICE_TYPE_COLUMN_NAME);
|
||||
}
|
||||
} catch (DataServiceException e) {
|
||||
throw new IngestionServiceException(e.getIngestionSource(), "Couldn't retrieve ServiceType because of a service exception executing '.show version'", e);
|
||||
} catch (DataClientException e) {
|
||||
throw new IngestionClientException(e.getIngestionSource(), "Couldn't retrieve ServiceType because of a client exception executing '.show version'", e);
|
||||
}
|
||||
throw new IngestionServiceException("Couldn't retrieve ServiceType because '.show version' didn't return any records");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
protected void setConnectionDataSource(String connectionDataSource) {
|
||||
this.connectionDataSource = connectionDataSource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,9 +3,11 @@
|
|||
|
||||
package com.microsoft.azure.kusto.ingest;
|
||||
|
||||
import com.microsoft.azure.kusto.data.KustoOperationResult;
|
||||
import com.microsoft.azure.kusto.data.StreamingClient;
|
||||
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
|
||||
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
|
||||
import com.microsoft.azure.kusto.data.exceptions.DataWebException;
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
|
||||
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
|
||||
|
@ -14,7 +16,6 @@ import com.microsoft.azure.storage.blob.BlobInputStream;
|
|||
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -26,7 +27,6 @@ import java.io.ByteArrayInputStream;
|
|||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
|
@ -35,7 +35,8 @@ import java.sql.ResultSetMetaData;
|
|||
import java.util.zip.GZIPInputStream;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
|
||||
import javax.xml.bind.DatatypeConverter;
|
||||
import static com.microsoft.azure.kusto.ingest.IngestClientBase.WRONG_ENDPOINT_MESSAGE;
|
||||
import static com.microsoft.azure.kusto.ingest.StreamingIngestClient.EXPECTED_SERVICE_TYPE;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
@ -51,6 +52,9 @@ class StreamingIngestClientTest {
|
|||
@Captor
|
||||
private static ArgumentCaptor<InputStream> argumentCaptor;
|
||||
|
||||
private static final String ENDPOINT_SERVICE_TYPE_DM = "DataManagement";
|
||||
|
||||
|
||||
@BeforeAll
|
||||
static void setUp() {
|
||||
streamingClientMock = mock(StreamingClient.class);
|
||||
|
@ -75,7 +79,7 @@ class StreamingIngestClientTest {
|
|||
InputStream inputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(data).array());
|
||||
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
|
||||
OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(status, OperationStatus.Succeeded);
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
isNull(), any(String.class), isNull(), any(boolean.class));
|
||||
|
||||
|
@ -101,7 +105,7 @@ class StreamingIngestClientTest {
|
|||
// When ingesting compressed data, we should set this property true to avoid double compression.
|
||||
streamSourceInfo.setCompressionType(CompressionType.gz);
|
||||
OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(status, OperationStatus.Succeeded);
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
isNull(), any(String.class), isNull(), any(boolean.class));
|
||||
|
||||
|
@ -117,7 +121,7 @@ class StreamingIngestClientTest {
|
|||
ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.json);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json);
|
||||
OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(status, OperationStatus.Succeeded);
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
isNull(), any(String.class), any(String.class), any(boolean.class));
|
||||
|
||||
|
@ -139,7 +143,7 @@ class StreamingIngestClientTest {
|
|||
streamSourceInfo.setCompressionType(CompressionType.gz);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json);
|
||||
OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(status, OperationStatus.Succeeded);
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
isNull(), any(String.class), any(String.class), any(boolean.class));
|
||||
|
||||
|
@ -298,11 +302,27 @@ class StreamingIngestClientTest {
|
|||
String path = resourcesDirectory + "testdata.csv";
|
||||
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
|
||||
OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(status, OperationStatus.Succeeded);
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), any(InputStream.class),
|
||||
isNull(), any(String.class), isNull(), any(boolean.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void IngestFromFile_GivenStreamingIngestClientAndDmEndpoint_ThrowsIngestionClientException() throws Exception {
|
||||
DataServiceException dataServiceException = new DataServiceException("Error in post request", new DataWebException("Error in post request", null));
|
||||
doThrow(dataServiceException).when(streamingClientMock).executeStreamingIngest(eq(ingestionProperties.getDatabaseName()), eq(ingestionProperties.getTableName()), any(), isNull(), any(), isNull(), eq(false));
|
||||
when(streamingClientMock.execute(Commands.VERSION_SHOW_COMMAND)).thenReturn(new KustoOperationResult("{\"Tables\":[{\"TableName\":\"Table_0\",\"Columns\":[{\"ColumnName\":\"BuildVersion\",\"DataType\":\"String\"},{\"ColumnName\":\"BuildTime\",\"DataType\":\"DateTime\"},{\"ColumnName\":\"ServiceType\",\"DataType\":\"String\"},{\"ColumnName\":\"ProductVersion\",\"DataType\":\"String\"}],\"Rows\":[[\"1.0.0.0\",\"2000-01-01T00:00:00Z\",\"DataManagement\",\"PrivateBuild.yischoen.YISCHOEN-OP7070.2020-09-07 12-09-22\"]]}]}", "v1"));
|
||||
|
||||
streamingIngestClient.setConnectionDataSource("https://ingest-testendpoint.dev.kusto.windows.net");
|
||||
String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/";
|
||||
String path = resourcesDirectory + "testdata.csv";
|
||||
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
|
||||
String expectedMessage =
|
||||
String.format(WRONG_ENDPOINT_MESSAGE + ": '%s'", EXPECTED_SERVICE_TYPE, ENDPOINT_SERVICE_TYPE_DM, "https://testendpoint.dev.kusto.windows.net");
|
||||
Exception exception = assertThrows(IngestionClientException.class, () -> streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties));
|
||||
assertEquals(expectedMessage, exception.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
void IngestFromFile_Json() throws Exception {
|
||||
String resourcesDirectory = System.getProperty("user.dir") + "/src/test/resources/";
|
||||
|
@ -313,7 +333,7 @@ class StreamingIngestClientTest {
|
|||
ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.json);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json);
|
||||
OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(status, OperationStatus.Succeeded);
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
isNull(), any(String.class), any(String.class), any(boolean.class));
|
||||
|
||||
|
@ -328,7 +348,7 @@ class StreamingIngestClientTest {
|
|||
ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.json);
|
||||
ingestionProperties.setIngestionMapping("JsonMapping", IngestionMapping.IngestionMappingKind.Json);
|
||||
OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(status, OperationStatus.Succeeded);
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
isNull(), any(String.class), any(String.class), any(boolean.class));
|
||||
|
||||
|
@ -465,7 +485,7 @@ class StreamingIngestClientTest {
|
|||
when(cloudBlockBlob.openInputStream()).thenReturn(blobInputStream);
|
||||
|
||||
OperationStatus status = streamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties, cloudBlockBlob).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(status, OperationStatus.Succeeded);
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), any(InputStream.class),
|
||||
isNull(), any(String.class), isNull(), any(boolean.class));
|
||||
}
|
||||
|
@ -599,7 +619,7 @@ class StreamingIngestClientTest {
|
|||
|
||||
ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(resultSet);
|
||||
OperationStatus status = streamingIngestClient.ingestFromResultSet(resultSetSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
|
||||
assertEquals(status, OperationStatus.Succeeded);
|
||||
assertEquals(OperationStatus.Succeeded, status);
|
||||
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), argumentCaptor.capture(),
|
||||
isNull(), any(String.class), isNull(), any(boolean.class));
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче