Streaming ingest from blob storage (#288)

* working and nice

* fix tests

* format

* revert samples

* test streaming from blob

* revert revert test

* test

* format

* fix test

* revert

---------

Co-authored-by: Ohad Bitton <ohbitton@microsoft.com>
This commit is contained in:
ohad bitton 2023-04-17 13:38:38 +03:00 коммит произвёл GitHub
Родитель e478801857
Коммит 01b608e6e7
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 295 добавлений и 102 удалений

Просмотреть файл

@ -18,6 +18,10 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.ParseException;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.AbstractHttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import java.io.IOException;
@ -142,18 +146,19 @@ class ClientImpl implements Client, StreamingClient {
String clusterEndpoint = String.format(commandType.getEndpoint(), clusterUrl);
Map<String, String> headers;
headers = generateIngestAndCommandHeaders(properties, "KJC.execute",
commandType.getActivityTypeSuffix());
addCommandHeaders(headers);
String jsonPayload = generateCommandPayload(database, command, properties, clusterEndpoint);
try {
headers = generateIngestAndCommandHeaders(properties, "KJC.execute",
commandType.getActivityTypeSuffix());
validateEndpoint();
} catch (KustoClientInvalidConnectionStringException e) {
throw new DataClientException(clusterUrl, e.getMessage(), e);
}
return Utils.post(httpClient, clusterEndpoint, jsonPayload, null, timeoutMs + CLIENT_SERVER_DELTA_IN_MILLISECS, headers, false);
addCommandHeaders(headers);
String jsonPayload = generateCommandPayload(database, command, properties);
StringEntity requestEntity = new StringEntity(jsonPayload, ContentType.APPLICATION_JSON);
return Utils.post(httpClient, clusterEndpoint, requestEntity, timeoutMs + CLIENT_SERVER_DELTA_IN_MILLISECS, headers);
}
private void validateEndpoint() throws DataServiceException, KustoClientInvalidConnectionStringException {
@ -171,24 +176,52 @@ class ClientImpl implements Client, StreamingClient {
if (stream == null) {
throw new IllegalArgumentException("The provided stream is null.");
}
if (StringUtils.isBlank(database)) {
throw new IllegalArgumentException("Parameter database is empty.");
}
if (StringUtils.isBlank(table)) {
throw new IllegalArgumentException("Parameter table is empty.");
}
if (StringUtils.isBlank(streamFormat)) {
throw new IllegalArgumentException("Parameter streamFormat is empty.");
}
String clusterEndpoint = String.format(CommandType.STREAMING_INGEST.getEndpoint(), clusterUrl, database, table, streamFormat);
if (!StringUtils.isEmpty(mappingName)) {
clusterEndpoint = clusterEndpoint.concat(String.format("&mappingName=%s", mappingName));
String clusterEndpoint = buildClusterEndpoint(database, table, streamFormat, mappingName);
return executeStreamingIngestImpl(clusterEndpoint, stream, null, properties, leaveOpen);
}
@Override
public KustoOperationResult executeStreamingIngestFromBlob(String database, String table, String blobUrl, ClientRequestProperties properties,
String dataFormat, String mappingName)
throws DataServiceException, DataClientException {
if (blobUrl == null) {
throw new IllegalArgumentException("The provided blobUrl is null.");
}
Map<String, String> headers;
headers = generateIngestAndCommandHeaders(properties, "KJC.executeStreamingIngest",
String clusterEndpoint = buildClusterEndpoint(database, table, dataFormat, mappingName)
.concat("&sourceKind=uri");
return executeStreamingIngestImpl(clusterEndpoint, null, blobUrl, properties, false);
}
/**
 * Shared implementation for streaming ingestion from either an in-memory stream or a blob URL.
 * <p>
 * Exactly one source is expected: when {@code stream} is non-null its content is posted as the request body
 * (with a gzip Content-Encoding header); otherwise a JSON body built from {@code blobUrl} is posted.
 *
 * @param clusterEndpoint full streaming-ingest URL, including query parameters
 * @param stream data to ingest, or null when ingesting from a blob
 * @param blobUrl blob location to ingest from; only used when {@code stream} is null
 * @param properties optional request properties; may be null
 * @param leaveOpen when false and a stream is supplied, the stream is closed before this method returns
 * @return the service response wrapped as a "v1" {@link KustoOperationResult}
 * @throws DataServiceException An exception returned from the service
 * @throws DataClientException if the request cannot be prepared, the endpoint is invalid, the stream fails to
 *         close, or the response cannot be converted to a KustoOperationResult
 */
private KustoOperationResult executeStreamingIngestImpl(String clusterEndpoint, InputStream stream, String blobUrl, ClientRequestProperties properties,
        boolean leaveOpen)
        throws DataServiceException, DataClientException {
    boolean isStreamSource = stream != null;
    // The resource is declared before any other work so that the caller's stream is closed (unless leaveOpen)
    // even when header generation or timeout parsing throws.
    try (InputStream ignored = (isStreamSource && !leaveOpen) ? stream : null) {
        Map<String, String> headers = generateIngestAndCommandHeaders(properties,
                "KJC.executeStreamingIngest" + (isStreamSource ? "" : "FromBlob"),
                CommandType.STREAMING_INGEST.getActivityTypeSuffix());
        if (isStreamSource) {
            headers.put(HttpHeaders.CONTENT_ENCODING, "gzip");
        }
        Long timeoutMs = populateHeadersAndGetTimeout(properties, headers);
        validateEndpoint();
        // We use UncloseableStream to prevent HttpClient from closing it; closing is owned by the try-with-resources above
        AbstractHttpEntity entity = isStreamSource ? new InputStreamEntity(new UncloseableStream(stream))
                : new StringEntity(new IngestionSourceStorage(blobUrl).toString(), ContentType.APPLICATION_JSON);
        String response = Utils.post(httpClient, clusterEndpoint, entity, timeoutMs + CLIENT_SERVER_DELTA_IN_MILLISECS, headers);
        return new KustoOperationResult(response, "v1");
    } catch (KustoServiceQueryError e) {
        throw new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), e);
    } catch (KustoClientInvalidConnectionStringException | IOException e) {
        throw new DataClientException(clusterUrl, e.getMessage(), e);
    }
}
private Long populateHeadersAndGetTimeout(ClientRequestProperties properties, Map<String, String> headers) throws DataClientException {
Long timeoutMs = null;
if (properties != null) {
timeoutMs = determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl);
@ -198,22 +231,29 @@ class ClientImpl implements Client, StreamingClient {
headers.put(pair.getKey(), pair.getValue().toString());
}
}
headers.put(HttpHeaders.CONTENT_ENCODING, "gzip");
if (timeoutMs == null) {
timeoutMs = STREAMING_INGEST_TIMEOUT_IN_MILLISECS;
}
try {
validateEndpoint();
String response = Utils.post(httpClient, clusterEndpoint, null, stream, timeoutMs + CLIENT_SERVER_DELTA_IN_MILLISECS, headers, leaveOpen);
return new KustoOperationResult(response, "v1");
} catch (KustoServiceQueryError e) {
throw new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), e);
} catch (KustoClientInvalidConnectionStringException e) {
throw new DataClientException(clusterUrl, e.getMessage(), e);
return timeoutMs;
}
/**
 * Validates the ingestion target and formats the streaming-ingest URL for it.
 *
 * @param database target database; must not be blank
 * @param table target table; must not be blank
 * @param format data format name; must not be blank
 * @param mappingName optional ingestion mapping name; appended as a query parameter when not empty
 * @return the streaming-ingest URL for this client's cluster
 * @throws IllegalArgumentException if database, table or format is blank
 */
private String buildClusterEndpoint(String database, String table, String format, String mappingName) {
    if (StringUtils.isBlank(database)) {
        throw new IllegalArgumentException("Parameter database is empty.");
    }
    if (StringUtils.isBlank(table)) {
        throw new IllegalArgumentException("Parameter table is empty.");
    }
    if (StringUtils.isBlank(format)) {
        throw new IllegalArgumentException("Parameter format is empty.");
    }

    final String endpointTemplate = CommandType.STREAMING_INGEST.getEndpoint();
    final String baseEndpoint = String.format(endpointTemplate, clusterUrl, database, table, format);
    return StringUtils.isEmpty(mappingName)
            ? baseEndpoint
            : baseEndpoint.concat(String.format("&mappingName=%s", mappingName));
}
@Override
@ -245,7 +285,7 @@ class ClientImpl implements Client, StreamingClient {
commandType.getActivityTypeSuffix());
addCommandHeaders(headers);
String jsonPayload = generateCommandPayload(database, command, properties, clusterEndpoint);
String jsonPayload = generateCommandPayload(database, command, properties);
try {
validateEndpoint();
@ -257,7 +297,7 @@ class ClientImpl implements Client, StreamingClient {
}
private long determineTimeout(ClientRequestProperties properties, CommandType commandType, String clusterUrl) throws DataClientException {
Long timeoutMs = null;
Long timeoutMs;
try {
timeoutMs = properties == null ? null : properties.getTimeoutInMilliSec();
} catch (ParseException e) {
@ -333,7 +373,7 @@ class ClientImpl implements Client, StreamingClient {
return headers;
}
private String generateCommandPayload(String database, String command, ClientRequestProperties properties, String clusterEndpoint) {
private String generateCommandPayload(String database, String command, ClientRequestProperties properties) {
ObjectNode json = objectMapper.createObjectNode()
.put("db", database)

Просмотреть файл

@ -0,0 +1,21 @@
package com.microsoft.azure.kusto.data;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Request body for streaming ingestion from blob storage: holds the blob URI and renders itself as JSON
 * via the shared object mapper.
 */
class IngestionSourceStorage {
    // Kept public so the object mapper can read it as a property without a getter (assumes default field visibility);
    // final because it is only ever assigned in the constructor.
    public final String sourceUri;

    /**
     * @param uri the blob URI to ingest from
     */
    public IngestionSourceStorage(String uri) {
        sourceUri = uri;
    }

    /**
     * @return this object serialized to a JSON string
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
     */
    @Override
    public String toString() {
        ObjectMapper objectMapper = Utils.getObjectMapper();
        try {
            return objectMapper.writeValueAsString(this);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize IngestionSourceStorage to JSON", e);
        }
    }
}

Просмотреть файл

@ -58,4 +58,7 @@ public interface StreamingClient extends Closeable {
* @throws DataServiceException An exception returned from the service
*/
KustoOperationResult execute(String command) throws DataServiceException, DataClientException;
/**
 * <p>Ingest data into a table by pointing the service at a blob, instead of uploading the data as a stream.</p>
 *
 * @param databaseName The target database to ingest into
 * @param tableName The target table to ingest into
 * @param blobUrl The URL of the blob that holds the data to ingest
 * @param clientRequestProperties Additional request properties; may be null
 * @param dataFormat The format of the data in the blob
 * @param ingestionMappingReference The name of a pre-created ingestion mapping to apply; may be null or empty
 * @return {@link KustoOperationResult} object including the ingestion result
 * @throws DataClientException An exception originating from a client activity
 * @throws DataServiceException An exception returned from the service
 */
KustoOperationResult executeStreamingIngestFromBlob(String databaseName, String tableName, String blobUrl, ClientRequestProperties clientRequestProperties,
String dataFormat, String ingestionMappingReference) throws DataServiceException, DataClientException;
}

Просмотреть файл

@ -25,6 +25,7 @@ import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.EofSensorInputStream;
import org.apache.http.entity.AbstractHttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
@ -67,13 +68,13 @@ public class Utils {
// Hide constructor, as this is a static utility class
}
static String post(CloseableHttpClient httpClient, String urlStr, String payload, InputStream stream, long timeoutMs, Map<String, String> headers,
boolean leaveOpen)
static String post(CloseableHttpClient httpClient, String urlStr, AbstractHttpEntity requestEntity, long timeoutMs,
Map<String, String> headers)
throws DataServiceException, DataClientException {
URI url = parseUriFromUrlString(urlStr);
try (InputStream ignored = (stream != null && !leaveOpen) ? stream : null) {
HttpPost request = setupHttpPostRequest(url, payload, stream, headers);
try {
HttpPost request = setupHttpPostRequest(url, requestEntity, headers);
int requestTimeout = timeoutMs > Integer.MAX_VALUE ? Integer.MAX_VALUE : Math.toIntExact(timeoutMs);
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(requestTimeout).build();
request.setConfig(requestConfig);
@ -124,7 +125,8 @@ public class Utils {
*/
CloseableHttpResponse httpResponse = null;
try {
HttpPost httpPost = setupHttpPostRequest(uri, payload, null, headers);
StringEntity requestEntity = new StringEntity(payload, ContentType.APPLICATION_JSON);
HttpPost httpPost = setupHttpPostRequest(uri, requestEntity, headers);
int requestTimeout = Math.toIntExact(timeoutTimeMs - System.currentTimeMillis());
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(requestTimeout).build();
httpPost.setConfig(requestConfig);
@ -243,12 +245,10 @@ public class Utils {
return activityId;
}
private static HttpPost setupHttpPostRequest(URI uri, String payload, InputStream stream, Map<String, String> headers) {
private static HttpPost setupHttpPostRequest(URI uri, AbstractHttpEntity requestEntity, Map<String, String> headers) {
HttpPost request = new HttpPost(uri);
// Request parameters and other properties. We use UncloseableStream to prevent HttpClient From closing it
HttpEntity requestEntity = (stream == null) ? new StringEntity(payload, ContentType.APPLICATION_JSON)
: new InputStreamEntity(new UncloseableStream(stream));
// Request parameters and other properties
request.setEntity(requestEntity);
request.addHeader(HttpHeaders.ACCEPT_ENCODING, "gzip,deflate");

Просмотреть файл

@ -1,5 +1,8 @@
package com.microsoft.azure.kusto.ingest;
import com.azure.core.http.HttpClient;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobClientBuilder;
import com.microsoft.azure.kusto.data.Ensure;
import com.microsoft.azure.kusto.data.HttpClientProperties;
import com.microsoft.azure.kusto.data.StreamingClient;
@ -39,7 +42,6 @@ import java.util.UUID;
* It tries {@value ATTEMPT_COUNT} times using the streaming client, after which it falls back to the queued streaming client in case of failure.
* If the size of the stream is bigger than {@value MAX_STREAMING_SIZE_BYTES}, it will fall back to the queued streaming client.
* <p>
* Note that {@code ingestFromBlob} behaves differently from the other methods - since a blob already exists it makes more sense to enqueue it rather than downloading and streaming it, thus ManagedStreamingIngestClient skips the streaming retries and sends it directly to the queued client.
*/
public class ManagedStreamingIngestClient implements IngestClient {
@ -49,29 +51,30 @@ public class ManagedStreamingIngestClient implements IngestClient {
final QueuedIngestClientImpl queuedIngestClient;
final StreamingIngestClient streamingIngestClient;
private final ExponentialRetry exponentialRetryTemplate;
private CloseableHttpClient httpClient = null;
/**
* @param dmConnectionString dm connection string
* @return a new ManagedStreamingIngestClient
* @throws URISyntaxException if the connection string is invalid
* @deprecated - Ingest clients now automatically deduce the endpoint, use {@link #ManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)} instead.
* Creates a new ManagedStreamingIngestClient from a DM connection string, with default http client properties.
* This method infers the engine connection string from the DM connection string.
* For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
* @param dmConnectionString dm connection string
* @return a new ManagedStreamingIngestClient
* @throws URISyntaxException if the connection string is invalid
*/
public static ManagedStreamingIngestClient fromDmConnectionString(ConnectionStringBuilder dmConnectionString) throws URISyntaxException {
return fromDmConnectionString(dmConnectionString, (HttpClientProperties) null);
}
/**
* @param dmConnectionString dm connection string
* @param properties additional properties to configure the http client
* @return a new ManagedStreamingIngestClient
* @throws URISyntaxException if the connection string is invalid
* @deprecated - Ingest clients now automatically deduce the endpoint, use {@link #ManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)} instead.
* Creates a new ManagedStreamingIngestClient from a DM connection string.
* This method infers the engine connection string from the DM connection string.
* For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
* @param dmConnectionString dm connection string
* @param properties additional properties to configure the http client
* @return a new ManagedStreamingIngestClient
* @throws URISyntaxException if the connection string is invalid
*/
public static ManagedStreamingIngestClient fromDmConnectionString(ConnectionStringBuilder dmConnectionString,
@Nullable HttpClientProperties properties)
@ -82,27 +85,27 @@ public class ManagedStreamingIngestClient implements IngestClient {
}
/**
* @param engineConnectionString engine connection string
* @return a new ManagedStreamingIngestClient
* @throws URISyntaxException if the connection string is invalid
* @deprecated - Ingest clients now automatically deduce the endpoint, use {@link #ManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)} instead.
* Creates a new ManagedStreamingIngestClient from an engine connection string, with default http client properties.
* This method infers the DM connection string from the engine connection string.
* For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
* @param engineConnectionString engine connection string
* @return a new ManagedStreamingIngestClient
* @throws URISyntaxException if the connection string is invalid
*/
public static ManagedStreamingIngestClient fromEngineConnectionString(ConnectionStringBuilder engineConnectionString) throws URISyntaxException {
return fromEngineConnectionString(engineConnectionString, null);
}
/**
* @param engineConnectionString engine connection string
* @param properties additional properties to configure the http client
* @return a new ManagedStreamingIngestClient
* @throws URISyntaxException if the connection string is invalid
* @deprecated - Ingest clients now automatically deduce the endpoint, use {@link #ManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)} instead.
* Creates a new ManagedStreamingIngestClient from an engine connection string.
* This method infers the DM connection string from the engine connection string.
* For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
* @param engineConnectionString engine connection string
* @param properties additional properties to configure the http client
* @return a new ManagedStreamingIngestClient
* @throws URISyntaxException if the connection string is invalid
*/
public static ManagedStreamingIngestClient fromEngineConnectionString(ConnectionStringBuilder engineConnectionString,
@Nullable HttpClientProperties properties)
@ -113,12 +116,12 @@ public class ManagedStreamingIngestClient implements IngestClient {
}
/**
* @param ingestionEndpointConnectionStringBuilder - Endpoint for ingesting data, usually starts with "https://ingest-"
* @param queryEndpointConnectionStringBuilder - Endpoint for querying data, does not include "ingest-"
* @throws URISyntaxException if the connection string is invalid
* @deprecated - This method is slated to be private. Use
* {@link IngestClientFactory#createManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
* instead.
* @param ingestionEndpointConnectionStringBuilder - Endpoint for ingesting data, usually starts with "https://ingest-"
* @param queryEndpointConnectionStringBuilder - Endpoint for querying data, does not include "ingest-"
* @throws URISyntaxException if the connection string is invalid
*/
public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder,
ConnectionStringBuilder queryEndpointConnectionStringBuilder) throws URISyntaxException {
@ -126,14 +129,14 @@ public class ManagedStreamingIngestClient implements IngestClient {
}
/**
* @param ingestionEndpointConnectionStringBuilder - Endpoint for ingesting data, usually starts with "https://ingest-"
* @param queryEndpointConnectionStringBuilder - Endpoint for querying data, does not include "ingest-"
* @param properties - Additional properties to configure the http client
* @throws URISyntaxException if the connection string is invalid
* @deprecated - This method is slated to be private. Use
* {@link IngestClientFactory#createManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder, HttpClientProperties)} instead.
* This constructor should only be used for advanced cases. If your endpoints are standard, or you do not know, use
* {@link #ManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)} instead.
* @param ingestionEndpointConnectionStringBuilder - Endpoint for ingesting data, usually starts with "https://ingest-"
* @param queryEndpointConnectionStringBuilder - Endpoint for querying data, does not include "ingest-"
* @param properties - Additional properties to configure the http client
* @throws URISyntaxException if the connection string is invalid
*/
public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder,
ConnectionStringBuilder queryEndpointConnectionStringBuilder,
@ -157,15 +160,16 @@ public class ManagedStreamingIngestClient implements IngestClient {
log.info("Creating a new ManagedStreamingIngestClient from connection strings");
queuedIngestClient = new QueuedIngestClientImpl(connectionStringBuilder, httpClient);
streamingIngestClient = new StreamingIngestClient(connectionStringBuilder, httpClient);
this.httpClient = httpClient;
exponentialRetryTemplate = new ExponentialRetry(ATTEMPT_COUNT);
}
/**
* @param resourceManager ingestion resources manager
* @param storageClient - storage utilities
* @param streamingClient - the streaming client
* @deprecated - This method is slated to be private. Use
* {@link IngestClientFactory#createManagedStreamingIngestClient(ConnectionStringBuilder)} instead.
* @param resourceManager ingestion resources manager
* @param storageClient - storage utilities
* @param streamingClient - the streaming client
*/
public ManagedStreamingIngestClient(ResourceManager resourceManager,
AzureStorageClient storageClient,
@ -206,7 +210,6 @@ public class ManagedStreamingIngestClient implements IngestClient {
/**
* {@inheritDoc}
* <p>
* This method behaves differently from the rest for {@link ManagedStreamingIngestClient} - since a blob already exists it makes more sense to enqueue it rather than downloading and streaming it, thus ManagedStreamingIngestClient skips the streaming retries and sends it directly to the queued client.</p>
*/
@Override
public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties)
@ -217,7 +220,55 @@ public class ManagedStreamingIngestClient implements IngestClient {
blobSourceInfo.validate();
ingestionProperties.validate();
// If it's a blob we ingest using the queued client
UUID sourceId = blobSourceInfo.getSourceId();
if (sourceId == null) {
sourceId = UUID.randomUUID();
}
BlobClientBuilder blobClientBuilder = new BlobClientBuilder().endpoint(blobSourceInfo.getBlobPath());
if (httpClient != null) {
blobClientBuilder.httpClient((HttpClient) httpClient);
}
BlobClient blobClient = blobClientBuilder.buildClient();
if (blobSourceInfo.getRawSizeInBytes() <= 0) {
blobSourceInfo.setRawSizeInBytes(blobClient.getProperties().getBlobSize());
}
if (blobSourceInfo.getRawSizeInBytes() > MAX_STREAMING_SIZE_BYTES) {
log.info("Blob size is greater than max streaming size ({} bytes). Falling back to queued.", blobSourceInfo.getRawSizeInBytes());
return queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
}
ExponentialRetry retry = new ExponentialRetry(exponentialRetryTemplate);
UUID finalSourceId = sourceId;
IngestionResult result = retry.execute(currentAttempt -> {
try {
String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromBlob;%s;%d", finalSourceId, currentAttempt);
return streamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties, blobClient, clientRequestId);
} catch (Exception e) {
if (e instanceof IngestionServiceException
&& e.getCause() != null
&& e.getCause() instanceof DataServiceException
&& e.getCause().getCause() != null
&& e.getCause().getCause() instanceof DataWebException) {
DataWebException webException = (DataWebException) e.getCause().getCause();
OneApiError oneApiError = webException.getApiError();
if (oneApiError.isPermanent()) {
throw e;
}
}
log.info(String.format("Streaming ingestion failed attempt %d", currentAttempt), e);
}
return null;
});
if (result != null) {
return result;
}
return queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
}
@ -287,7 +338,7 @@ public class ManagedStreamingIngestClient implements IngestClient {
try {
IngestionResult result = retry.execute(currentAttempt -> {
try {
String clientRequestId = String.format("KJC.executeManagedStreamingIngest;%s;%d", finalSourceId, currentAttempt);
String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromStream;%s;%d", finalSourceId, currentAttempt);
return streamingIngestClient.ingestFromStream(managedSourceInfo, ingestionProperties, clientRequestId);
} catch (Exception e) {
if (e instanceof IngestionServiceException

Просмотреть файл

@ -226,7 +226,7 @@ public class QueuedIngestClientImpl extends IngestClientBase implements QueuedIn
shouldCompress);
String blobPath = container.getContainer().getBlobContainerUrl() + "/" + blobName + container.getSas();
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(
blobPath, 0); // TODO: check if we can get the rawDataSize locally - maybe add a countingStream
blobPath, 0, streamSourceInfo.getSourceId()); // TODO: check if we can get the rawDataSize locally - maybe add a countingStream
ingestionResult = ingestFromBlob(blobSourceInfo, ingestionProperties);
if (!streamSourceInfo.isLeaveOpen()) {

Просмотреть файл

@ -88,7 +88,6 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli
@Override
public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
log.warn("Ingesting from blob using the StreamingIngestClient is not recommended, consider using the IngestClient instead.");
Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo");
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
@ -97,7 +96,7 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli
try {
BlobClient blobClient = new BlobClientBuilder().endpoint(blobSourceInfo.getBlobPath()).buildClient();
return ingestFromBlob(blobSourceInfo, ingestionProperties, blobClient);
return ingestFromBlob(blobSourceInfo, ingestionProperties, blobClient, null);
} catch (IllegalArgumentException e) {
String msg = "Unexpected error when ingesting a blob - Invalid blob path.";
log.error(msg, e);
@ -203,7 +202,10 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli
return inputStream;
}
IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties, BlobClient cloudBlockBlob)
IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo,
IngestionProperties ingestionProperties,
BlobClient cloudBlockBlob,
@Nullable String clientRequestId)
throws IngestionClientException, IngestionServiceException {
String blobPath = blobSourceInfo.getBlobPath();
try {
@ -217,9 +219,33 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli
throw new IngestionClientException(String.format("Exception trying to read blob metadata,%s",
ex.getStatusCode() == 403 ? "this might mean the blob doesn't exist" : ""), ex);
}
InputStream stream = cloudBlockBlob.openInputStream();
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(stream, false, blobSourceInfo.getSourceId(), IngestionUtils.getCompression(blobPath));
return ingestFromStream(streamSourceInfo, ingestionProperties);
ClientRequestProperties clientRequestProperties = null;
if (StringUtils.isNotBlank(clientRequestId)) {
clientRequestProperties = new ClientRequestProperties();
clientRequestProperties.setClientRequestId(clientRequestId);
}
IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat();
try {
this.streamingClient.executeStreamingIngestFromBlob(ingestionProperties.getDatabaseName(),
ingestionProperties.getTableName(),
blobPath,
clientRequestProperties,
dataFormat.getKustoValue(),
ingestionProperties.getIngestionMapping().getIngestionMappingReference());
} catch (DataClientException e) {
log.error(e.getMessage(), e);
throw new IngestionClientException(e.getMessage(), e);
} catch (DataServiceException e) {
log.error(e.getMessage(), e);
throw new IngestionServiceException(e.getMessage(), e);
}
log.debug("Blob was ingested successfully.");
IngestionStatus ingestionStatus = new IngestionStatus();
ingestionStatus.status = OperationStatus.Succeeded;
ingestionStatus.table = ingestionProperties.getTableName();
ingestionStatus.database = ingestionProperties.getDatabaseName();
return new IngestionStatusResult(ingestionStatus);
}
protected void setConnectionDataSource(String connectionDataSource) {

Просмотреть файл

@ -24,9 +24,14 @@ import com.microsoft.azure.kusto.data.format.CslDateTimeFormat;
import com.microsoft.azure.kusto.data.format.CslTimespanFormat;
import com.microsoft.azure.kusto.ingest.IngestionMapping.IngestionMappingKind;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import com.microsoft.azure.kusto.ingest.utils.ContainerWithSas;
import com.microsoft.azure.kusto.ingest.utils.SecurityUtils;
import org.apache.commons.lang3.StringUtils;
@ -49,7 +54,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
@ -60,6 +64,7 @@ import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
@ -76,6 +81,7 @@ class E2ETest {
private static StreamingIngestClient streamingIngestClient;
private static ManagedStreamingIngestClient managedStreamingIngestClient;
private static Client queryClient;
private static Client dmCslClient;
private static StreamingClient streamingClient;
private static final String databaseName = System.getenv("TEST_DATABASE");
private static final String appId = System.getenv("APP_ID");
@ -91,7 +97,7 @@ class E2ETest {
private ObjectMapper objectMapper = Utils.getObjectMapper();
@BeforeAll
public static void setUp() throws IOException {
public static void setUp() throws IOException, URISyntaxException {
appKey = System.getenv("APP_KEY");
if (appKey == null) {
String secretPath = System.getProperty("SecretPath");
@ -109,6 +115,7 @@ class E2ETest {
tenantId);
dmCsb.setUserNameForTracing("testUser");
try {
dmCslClient = ClientFactory.createClient(dmCsb);
ingestClient = IngestClientFactory.createClient(dmCsb, HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
@ -604,4 +611,30 @@ class E2ETest {
});
}
/**
 * For every test item flagged for streaming ingestion: uploads its file to a blob in temp storage,
 * ingests that blob through the streaming ingest client, and verifies the resulting row count.
 */
@Test
void testStreamingIngestFromBlob() throws IngestionClientException, IngestionServiceException, IOException {
    ResourceManager resourceManager = new ResourceManager(dmCslClient, null);
    ContainerWithSas tempStorage = resourceManager.getTempStorage();
    AzureStorageClient storageClient = new AzureStorageClient();

    for (TestDataItem item : dataForTests) {
        // Only items marked for streaming ingestion take part in this test
        if (!item.testOnstreamingIngestion) {
            continue;
        }

        String blobName = String.format("%s__%s.%s.gz",
                "testStreamingIngestFromBlob",
                UUID.randomUUID(),
                item.ingestionProperties.getDataFormat());
        String containerUrl = tempStorage.getContainer().getBlobContainerUrl();
        String blobPath = containerUrl + "/" + blobName + tempStorage.getSas();

        boolean notAlreadyGz = !item.file.getName().endsWith(".gz");
        storageClient.uploadLocalFileToBlob(item.file, blobName, tempStorage.getContainer(), notAlreadyGz);

        try {
            streamingIngestClient.ingestFromBlob(new BlobSourceInfo(blobPath), item.ingestionProperties);
        } catch (Exception ex) {
            Assertions.fail(ex);
        }

        assertRowCount(item.rows, false);
    }
}
}

Просмотреть файл

@ -41,6 +41,7 @@ import java.sql.ResultSetMetaData;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.microsoft.azure.kusto.ingest.ManagedStreamingIngestClient.MAX_STREAMING_SIZE_BYTES;
import static com.microsoft.azure.kusto.ingest.StreamingIngestClientTest.jsonDataUncompressed;
import static com.microsoft.azure.kusto.ingest.StreamingIngestClientTest.verifyCompressedStreamContent;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@ -116,23 +117,24 @@ class ManagedStreamingIngestClientTest {
// Ingests a blob through managedStreamingIngestClient and checks that the first entry of the
// returned ingestion status collection has status OperationStatus.Queued.
// NOTE(review): this span is diff output with the +/- markers stripped. The first
// BlobSourceInfo declaration and the first assertEquals appear to be the removed lines; the
// declaration using MAX_STREAMING_SIZE_BYTES + 1 and the assertEquals with the constant as
// first argument appear to be their replacements - confirm against the actual file.
@Test
void IngestFromBlob_IngestionReportMethodIsNotTable_EmptyIngestionStatus() throws Exception {
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("https://blobPath.blob.core.windows.net/container/blob",
MAX_STREAMING_SIZE_BYTES + 1);
IngestionResult result = managedStreamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
assertEquals(result.getIngestionStatusCollection().get(0).status, OperationStatus.Queued);
assertEquals(OperationStatus.Queued, result.getIngestionStatusCollection().get(0).status);
}
// Sets the report method to TABLE (and the data format to JSON), ingests a blob through
// managedStreamingIngestClient and checks that the result reports a non-zero number of
// ingestion statuses.
// NOTE(review): this span is diff output with the +/- markers stripped. The
// "http://blobPath.com" declaration and the first assertNotEquals appear to be the removed
// lines, and the lines following each of them their replacements; the setDataFormat call is
// presumably an added line as well - confirm against the actual file.
@Test
void IngestFromBlob_IngestionReportMethodIsTable_NotEmptyIngestionStatus() throws Exception {
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("https://blobPath.blob.core.windows.net/container/blob", 100);
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
IngestionResult result = managedStreamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
assertNotEquals(result.getIngestionStatusesLength(), 0);
assertNotEquals(0, result.getIngestionStatusesLength());
}
@Test
void IngestFromBlob_NullIngestionProperties_IllegalArgumentException() {
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("https://blobPath.blob.core.windows.net/container/blob", 100);
assertThrows(
IllegalArgumentException.class,
() -> managedStreamingIngestClient.ingestFromBlob(blobSourceInfo, null));
@ -148,8 +150,8 @@ class ManagedStreamingIngestClientTest {
@Test
void IngestFromBlob_IngestionReportMethodIsTable_RemovesSecrets() throws Exception {
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(
"https://storage.table.core.windows.net/ingestionsstatus20190505?sv=2018-03-28&tn=ingestionsstatus20190505&sig=anAusomeSecret%2FK024xNydFzT%2B2cCE%2BA2S8Y6U%3D&st=2019-05-05T09%3A00%3A31Z&se=2019-05-09T10%3A00%3A31Z&sp=raud",
100);
"http://blobPath.blob.core.windows.net/container/blob",
MAX_STREAMING_SIZE_BYTES + 1);
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);
ArgumentCaptor<TableEntity> captor = ArgumentCaptor.forClass(TableEntity.class);
@ -158,7 +160,7 @@ class ManagedStreamingIngestClientTest {
verify(azureStorageClientMock, atLeast(1)).azureTableInsertEntity(any(), captor.capture());
assert (IngestionStatus.fromEntity(captor.getValue()).getIngestionSourcePath())
.equals("https://storage.table.core.windows.net/ingestionsstatus20190505");
.equals("http://blobPath.blob.core.windows.net/container/blob");
}
@Test
@ -541,6 +543,18 @@ class ManagedStreamingIngestClientTest {
}
}
/**
 * Ingests a blob declared with {@code MAX_STREAMING_SIZE_BYTES + 1} as the second
 * {@link BlobSourceInfo} constructor argument (presumably its size in bytes) and verifies
 * that {@code executeStreamingIngest} is never invoked on the streaming client mock with the
 * matched arguments.
 */
@Test
void IngestFromBlob_IngestOverBlobLimit_QueuedFallback() throws Exception {
    String blobUrl = "https://blobPath.blob.core.windows.net/container/blob";
    BlobSourceInfo oversizedBlob = new BlobSourceInfo(blobUrl, MAX_STREAMING_SIZE_BYTES + 1);
    ingestionProperties.setDataFormat(IngestionProperties.DataFormat.JSON);

    managedStreamingIngestClient.ingestFromBlob(oversizedBlob, ingestionProperties);

    verify(streamingClientMock, never()).executeStreamingIngest(
            any(String.class),
            any(String.class),
            any(InputStream.class),
            clientRequestPropertiesCaptor.capture(),
            any(String.class),
            eq("mappingName"),
            any(boolean.class));
}
@Test
void CreateManagedStreamingIngestClient_WithDefaultCtor_WithQueryUri_Pass() throws URISyntaxException {
ManagedStreamingIngestClient client = IngestClientFactory.createManagedStreamingIngestClient(ConnectionStringBuilder.createWithUserPrompt("https" +
@ -600,11 +614,15 @@ class ManagedStreamingIngestClientTest {
}
/**
 * Convenience overload that delegates to
 * {@code verifyClientRequestId(int, UUID, String)} with the method name
 * {@code "ingestFromStream"}.
 *
 * @param count        forwarded unchanged to the three-argument overload
 * @param expectedUUID forwarded unchanged to the three-argument overload; may be null
 */
private static void verifyClientRequestId(int count, @Nullable UUID expectedUUID) {
    final String defaultMethod = "ingestFromStream";
    verifyClientRequestId(count, expectedUUID, defaultMethod);
}
private static void verifyClientRequestId(int count, @Nullable UUID expectedUUID, String method) {
String clientRequestId = clientRequestPropertiesCaptor.getValue().getClientRequestId();
assertNotNull(clientRequestId);
String[] values = clientRequestId.split(";");
assertEquals(3, values.length);
assertEquals("KJC.executeManagedStreamingIngest", values[0]);
assertEquals("KJC.executeManagedStreamingIngest.ingestFromStream", values[0]);
assertDoesNotThrow(() -> {
UUID actual = UUID.fromString(values[1]);
if (expectedUUID != null) {

Просмотреть файл

@ -86,14 +86,14 @@ class QueuedIngestClientTest {
// Ingests a blob through queuedIngestClient and checks that the first entry of the returned
// ingestion status collection has status OperationStatus.Queued.
// NOTE(review): this span is diff output with the +/- markers stripped. The
// "http://blobPath.com" declaration appears to be the removed line and the declaration after
// it its replacement - confirm against the actual file.
@Test
void IngestFromBlob_IngestionReportMethodIsNotTable_EmptyIngestionStatus() throws Exception {
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("https://blobPath.blob.core.windows.net/container/blob", 100);
IngestionResult result = queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
// NOTE(review): arguments are passed as (actual, expected); if this is JUnit's assertEquals,
// its convention is (expected, actual), which only affects the failure message - confirm.
assertEquals(result.getIngestionStatusCollection().get(0).status, OperationStatus.Queued);
}
@Test
void IngestFromBlob_IngestionReportMethodIsTable_NotEmptyIngestionStatus() throws Exception {
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("https://blobPath.blob.core.windows.net/container/blob", 100);
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
IngestionResult result = queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
@ -102,7 +102,7 @@ class QueuedIngestClientTest {
@Test
void IngestFromBlob_NullIngestionProperties_IllegalArgumentException() {
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("http://blobPath.com", 100);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo("https://blobPath.blob.core.windows.net/container/blob", 100);
assertThrows(
IllegalArgumentException.class,
() -> queuedIngestClient.ingestFromBlob(blobSourceInfo, null));

Просмотреть файл

@ -499,7 +499,7 @@ class StreamingIngestClientTest {
when(cloudBlockBlob.getProperties()).thenReturn(blobProperties);
when(cloudBlockBlob.openInputStream()).thenReturn(blobInputStream);
OperationStatus status = streamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties, cloudBlockBlob).getIngestionStatusCollection()
OperationStatus status = streamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties, cloudBlockBlob, null).getIngestionStatusCollection()
.get(0).status;
assertEquals(OperationStatus.Succeeded, status);
verify(streamingClientMock, atLeastOnce()).executeStreamingIngest(any(String.class), any(String.class), any(InputStream.class),
@ -612,7 +612,7 @@ class StreamingIngestClientTest {
when(cloudBlockBlob.getProperties()).thenReturn(blobProperties);
IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class,
() -> streamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties, cloudBlockBlob),
() -> streamingIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties, cloudBlockBlob, null),
"Expected IngestionClientException to be thrown, but it didn't");
assertTrue(ingestionClientException.getMessage().contains("Empty blob."));
}

Просмотреть файл

@ -17,16 +17,17 @@ import java.io.ByteArrayOutputStream;
public class FileIngestion {
public static void main(String[] args) {
try {
ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(System.getProperty("clusterPath"),
System.getProperty("appId"),
System.getProperty("appKey"),
System.getProperty("appTenant"));
try (IngestClient client = IngestClientFactory.createClient(csb)) {
IngestionProperties ingestionProperties = new IngestionProperties(System.getProperty("dbName"),
System.getProperty("tableName"));
ingestionProperties.setIngestionMapping(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.JSON);
// TableClient tableClient =
// TableWithSas.TableClientFromUrl("https://5s8kstrldruthruth01.blob.core.windows.net/20230313-ingestdata-e5c334ee145d4b4-0?sv=2018-03-28&sr=c&sig=QshIuU9ZZ1jvcgcPMnHcr0EvCwO9sxZbvAUaAtI%3D&st=2023-03-13T13%3A16%3A57Z&se=2023-03-17T14%3A16%3A57Z&sp=rw",
// null);
// tableClient.createEntity(new TableEntity("123", "123"));
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("filePath"), 0);
ConnectionStringBuilder csb = ConnectionStringBuilder.createWithUserPrompt("https://ruthruth.eastus.kusto.windows.net");
try (IngestClient client = IngestClientFactory.createClient(csb)) {
IngestionProperties ingestionProperties = new IngestionProperties("db2", "TestTable");
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES);
FileSourceInfo fileSourceInfo = new FileSourceInfo("C:\\Users\\ohbitton\\OneDrive - Microsoft\\Desktop\\data\\a.csv", 0);
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProperties);
ByteArrayOutputStream st = new ByteArrayOutputStream();
st.write("asd,2".getBytes());