diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/StreamingClient.java b/data/src/main/java/com/microsoft/azure/kusto/data/StreamingClient.java index 35cac638..5c355871 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/StreamingClient.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/StreamingClient.java @@ -28,4 +28,13 @@ public interface StreamingClient { * @throws DataServiceException An exception returned from the service */ KustoOperationResult executeStreamingIngest(String database, String table, InputStream stream, ClientRequestProperties properties, String streamFormat, String mappingName, boolean leaveOpen) throws DataServiceException, DataClientException; + + /** + *
Execute the provided command against the default database.
+ * + * @param command The command to execute + * @throws DataClientException An exception originating from a client activity + * @throws DataServiceException An exception returned from the service + */ + KustoOperationResult execute(String command) throws DataServiceException, DataClientException; } diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientBase.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientBase.java new file mode 100644 index 00000000..d4a7d4e8 --- /dev/null +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientBase.java @@ -0,0 +1,60 @@ +package com.microsoft.azure.kusto.ingest; + +import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.client.utils.URIBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.net.URISyntaxException; + +public abstract class IngestClientBase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected String connectionDataSource; + private String endpointServiceType; + private String suggestedEndpointUri; + public static final String INGEST_PREFIX = "ingest-"; + protected static final String WRONG_ENDPOINT_MESSAGE = + "You are using '%s' client type, but the provided endpoint is of ServiceType '%s'. Initialize the client with the appropriate endpoint URI"; + + protected void validateEndpointServiceType(String connectionDataSource, String expectedServiceType) throws IngestionServiceException, IngestionClientException { + if (StringUtils.isBlank(endpointServiceType)) { + endpointServiceType = retrieveServiceType(); + } + if (!expectedServiceType.equals(endpointServiceType)) { + String message = String.format(WRONG_ENDPOINT_MESSAGE, expectedServiceType, endpointServiceType); + suggestedEndpointUri = generateEndpointSuggestion(suggestedEndpointUri, connectionDataSource); + if (StringUtils.isNotBlank(suggestedEndpointUri)) { + message = String.format("%s: '%s'", message, suggestedEndpointUri); + } else { + message += "."; + } + throw new IngestionClientException(message); + } + } + + protected String generateEndpointSuggestion(String existingSuggestedEndpointUri, String dataSource) { + if (existingSuggestedEndpointUri != null) { + return existingSuggestedEndpointUri; + } + // The default is not passing a suggestion to the exception + String endpointUriToSuggestStr = ""; + if (StringUtils.isNotBlank(dataSource)) { + URIBuilder existingEndpoint; + try { + existingEndpoint = new URIBuilder(dataSource); + endpointUriToSuggestStr = emendEndpointUri(existingEndpoint); + } catch (URISyntaxException e) { + log.error("Couldn't parse dataSource '{}', so no suggestion can be made.", dataSource, e); + } + } + + return endpointUriToSuggestStr; + } + + protected abstract String retrieveServiceType() throws IngestionServiceException, IngestionClientException; + + protected abstract String emendEndpointUri(URIBuilder existingEndpoint); +} \ No newline at end of file diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClient.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClient.java index 5da88f35..db461735 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClient.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClient.java @@ -14,7 +14,6 @@ import com.microsoft.azure.kusto.ingest.source.*; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlockBlob; import com.univocity.parsers.csv.CsvRoutines; -import org.apache.commons.lang3.StringUtils; import org.apache.http.client.utils.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,24 +24,18 @@ import java.io.File; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.net.URISyntaxException; -import java.sql.Date; import java.time.Instant; +import java.util.Date; import java.util.LinkedList; import java.util.List; import java.util.UUID; -public class QueuedIngestClient implements IngestClient { +public class QueuedIngestClient extends IngestClientBase implements IngestClient { - private final static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final int COMPRESSED_FILE_MULTIPLIER = 11; private final ResourceManager resourceManager; private final AzureStorageClient azureStorageClient; - private String connectionDataSource; - private String endpointServiceType; - private String suggestedEndpointUri; - public static final String INGEST_PREFIX = "ingest-"; - protected static final String WRONG_ENDPOINT_MESSAGE = - "You are using '%s' client type, but the provided endpoint is of ServiceType '%s'. Initialize the client with the appropriate endpoint URI"; public static final String EXPECTED_SERVICE_TYPE = "DataManagement"; QueuedIngestClient(ConnectionStringBuilder csb) throws URISyntaxException { @@ -130,7 +123,7 @@ public class QueuedIngestClient implements IngestClient { } catch (IOException | URISyntaxException e) { throw new IngestionClientException("Failed to ingest from blob", e); } catch (IngestionServiceException e) { - validateEndpointServiceType(); + validateEndpointServiceType(connectionDataSource, EXPECTED_SERVICE_TYPE); throw e; } } @@ -173,7 +166,7 @@ public class QueuedIngestClient implements IngestClient { } catch (IOException | URISyntaxException e) { throw new IngestionClientException("Failed to ingest from file", e); } catch (IngestionServiceException e) { - validateEndpointServiceType(); + validateEndpointServiceType(connectionDataSource, EXPECTED_SERVICE_TYPE); throw e; } } @@ -192,7 +185,7 @@ public class QueuedIngestClient implements IngestClient { IngestionResult ingestionResult; if (streamSourceInfo.getStream() == null) { throw new IngestionClientException("The provided stream is null."); - } else if(streamSourceInfo.getStream().available() <= 0) { + } else if (streamSourceInfo.getStream().available() <= 0) { throw new IngestionClientException("The provided stream is empty."); } boolean shouldCompress = AzureStorageClient.shouldCompress(streamSourceInfo.getCompressionType(), ingestionProperties.getDataFormat()); @@ -224,7 +217,7 @@ public class QueuedIngestClient implements IngestClient { } catch (StorageException e) { throw new IngestionServiceException("Failed to ingest from stream", e); } catch (IngestionServiceException e) { - validateEndpointServiceType(); + validateEndpointServiceType(connectionDataSource, EXPECTED_SERVICE_TYPE); throw e; } } @@ -272,42 +265,14 @@ public class QueuedIngestClient implements IngestClient { } } - protected void validateEndpointServiceType() throws IngestionServiceException, IngestionClientException { - if (StringUtils.isBlank(endpointServiceType)) { - endpointServiceType = retrieveServiceType(); - } - if (!EXPECTED_SERVICE_TYPE.equals(endpointServiceType)) { - String message = String.format(WRONG_ENDPOINT_MESSAGE, EXPECTED_SERVICE_TYPE, endpointServiceType); - suggestedEndpointUri = generateEndpointSuggestion(suggestedEndpointUri, connectionDataSource); - if (StringUtils.isNotBlank(suggestedEndpointUri)) { - message = String.format("%s: '%s'", message, suggestedEndpointUri); - } else { - message += "."; - } - throw new IngestionClientException(message); - } + @Override + protected String emendEndpointUri(URIBuilder existingEndpoint) { + existingEndpoint.setHost(INGEST_PREFIX + existingEndpoint.getHost()); + return existingEndpoint.toString(); } - protected static String generateEndpointSuggestion(String existingSuggestedEndpointUri, String dataSource) { - if (existingSuggestedEndpointUri != null) { - return existingSuggestedEndpointUri; - } - // The default is not passing a suggestion to the exception - String endpointUriToSuggestStr = ""; - if (StringUtils.isNotBlank(dataSource)) { - URIBuilder endpointUriToSuggest; - try { - endpointUriToSuggest = new URIBuilder(dataSource); - endpointUriToSuggest.setHost(INGEST_PREFIX + endpointUriToSuggest.getHost()); - endpointUriToSuggestStr = endpointUriToSuggest.toString(); - } catch (URISyntaxException e) { - log.error("Couldn't parse dataSource '{}', so no suggestion can be made.", dataSource, e); - } - } - return endpointUriToSuggestStr; - } - - private String retrieveServiceType() throws IngestionServiceException, IngestionClientException { + @Override + protected String retrieveServiceType() throws IngestionServiceException, IngestionClientException { if (resourceManager != null) { return resourceManager.retrieveServiceType(); } diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceManager.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceManager.java index de6dbdae..0df91031 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceManager.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceManager.java @@ -63,7 +63,7 @@ class ResourceManager implements Closeable { private static final long REFRESH_INGESTION_RESOURCES_PERIOD_ON_FAILURE = 1000 * 60 * 15; // 15 minutes private Long defaultRefreshTime; private Long refreshTimeOnFailure; - private static final String SERVICE_TYPE_COLUMN_NAME = "ServiceType"; + public static final String SERVICE_TYPE_COLUMN_NAME = "ServiceType"; ResourceManager(Client client, long defaultRefreshTime, long refreshTimeOnFailure) { this.defaultRefreshTime = defaultRefreshTime; @@ -207,7 +207,7 @@ class ResourceManager implements Closeable { KustoOperationResult identityTokenResult = client.execute(Commands.IDENTITY_GET_COMMAND); if (identityTokenResult != null && identityTokenResult.hasNext() - && identityTokenResult.getResultTables().size() > 0) { + && !identityTokenResult.getResultTables().isEmpty()) { KustoResultSetTable resultTable = identityTokenResult.next(); resultTable.next(); @@ -227,7 +227,7 @@ class ResourceManager implements Closeable { log.info("Getting version to determine endpoint's ServiceType"); try { KustoOperationResult versionResult = client.execute(Commands.VERSION_SHOW_COMMAND); - if (versionResult != null && versionResult.hasNext() && versionResult.getResultTables().size() > 0) { + if (versionResult != null && versionResult.hasNext() && !versionResult.getResultTables().isEmpty()) { KustoResultSetTable resultTable = versionResult.next(); resultTable.next(); return resultTable.getString(SERVICE_TYPE_COLUMN_NAME); diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java index c2f364d2..9a8fa6b3 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java @@ -6,14 +6,22 @@ package com.microsoft.azure.kusto.ingest; import com.microsoft.azure.kusto.data.*; import com.microsoft.azure.kusto.data.exceptions.DataClientException; import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import com.microsoft.azure.kusto.data.exceptions.DataWebException; import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; -import com.microsoft.azure.kusto.ingest.result.*; -import com.microsoft.azure.kusto.ingest.source.*; +import com.microsoft.azure.kusto.ingest.result.IngestionResult; +import com.microsoft.azure.kusto.ingest.result.IngestionStatus; +import com.microsoft.azure.kusto.ingest.result.IngestionStatusResult; +import com.microsoft.azure.kusto.ingest.result.OperationStatus; +import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; +import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; +import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; +import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlockBlob; import org.apache.commons.lang3.StringUtils; +import org.apache.http.client.utils.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,15 +31,17 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.zip.GZIPOutputStream; -public class StreamingIngestClient implements IngestClient { +public class StreamingIngestClient extends IngestClientBase implements IngestClient { - private final static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final StreamingClient streamingClient; private static final int STREAM_COMPRESS_BUFFER_SIZE = 16 * 1024; + public static final String EXPECTED_SERVICE_TYPE = "Engine"; StreamingIngestClient(ConnectionStringBuilder csb) throws URISyntaxException { log.info("Creating a new StreamingIngestClient"); this.streamingClient = ClientFactory.createStreamingClient(csb); + this.connectionDataSource = csb.getClusterUrl(); } StreamingIngestClient(StreamingClient streamingClient) { @@ -123,6 +133,9 @@ public class StreamingIngestClient implements IngestClient { throw new IngestionClientException(e.getMessage(), e); } catch (DataServiceException e) { log.error(e.getMessage(), e); + if (e.getCause() instanceof DataWebException && "Error in post request".equals(e.getMessage())) { + validateEndpointServiceType(connectionDataSource, EXPECTED_SERVICE_TYPE); + } throw new IngestionServiceException(e.getMessage(), e); } @@ -201,7 +214,40 @@ public class StreamingIngestClient implements IngestClient { } @Override - public void close() { + protected String emendEndpointUri(URIBuilder existingEndpoint) { + if (existingEndpoint.getHost().startsWith(IngestClientBase.INGEST_PREFIX)) { + existingEndpoint.setHost(existingEndpoint.getHost().substring(IngestClientBase.INGEST_PREFIX.length())); + return existingEndpoint.toString(); + } + return ""; + } + @Override + protected String retrieveServiceType() throws IngestionServiceException, IngestionClientException { + if (streamingClient != null) { + log.info("Getting version to determine endpoint's ServiceType"); + try { + KustoOperationResult versionResult = streamingClient.execute(Commands.VERSION_SHOW_COMMAND); + if (versionResult != null && versionResult.hasNext() && !versionResult.getResultTables().isEmpty()) { + KustoResultSetTable resultTable = versionResult.next(); + resultTable.next(); + return resultTable.getString(ResourceManager.SERVICE_TYPE_COLUMN_NAME); + } + } catch (DataServiceException e) { + throw new IngestionServiceException(e.getIngestionSource(), "Couldn't retrieve ServiceType because of a service exception executing '.show version'", e); + } catch (DataClientException e) { + throw new IngestionClientException(e.getIngestionSource(), "Couldn't retrieve ServiceType because of a client exception executing '.show version'", e); + } + throw new IngestionServiceException("Couldn't retrieve ServiceType because '.show version' didn't return any records"); + } + return null; + } + + protected void setConnectionDataSource(String connectionDataSource) { + this.connectionDataSource = connectionDataSource; + } + + @Override + public void close() { } } diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java index e7712e13..340279fb 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/StreamingIngestClientTest.java @@ -3,9 +3,11 @@ package com.microsoft.azure.kusto.ingest; +import com.microsoft.azure.kusto.data.KustoOperationResult; import com.microsoft.azure.kusto.data.StreamingClient; import com.microsoft.azure.kusto.data.exceptions.DataClientException; import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import com.microsoft.azure.kusto.data.exceptions.DataWebException; import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; import com.microsoft.azure.kusto.ingest.result.OperationStatus; @@ -14,7 +16,6 @@ import com.microsoft.azure.storage.blob.BlobInputStream; import com.microsoft.azure.storage.blob.BlobProperties; import com.microsoft.azure.storage.blob.CloudBlockBlob; -import org.apache.commons.codec.binary.Hex; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -26,7 +27,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.InputStream; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; @@ -35,7 +35,8 @@ import java.sql.ResultSetMetaData; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import javax.xml.bind.DatatypeConverter; +import static com.microsoft.azure.kusto.ingest.IngestClientBase.WRONG_ENDPOINT_MESSAGE; +import static com.microsoft.azure.kusto.ingest.StreamingIngestClient.EXPECTED_SERVICE_TYPE; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -51,6 +52,9 @@ class StreamingIngestClientTest { @Captor private static ArgumentCaptor