Fix time out as timestamp (#185)
* fix timeout * revert * timeout as timespan * j * many changes * better * clean * clean * no auth * better * better * comments * comments * comments Co-authored-by: KustoIbizaExtension Build <kustodev@microsoft.com> Co-authored-by: Ohad Bitton <ohbitton@microsoft.com>
This commit is contained in:
Родитель
cde2361654
Коммит
674045bfb9
|
@ -3,6 +3,7 @@
|
|||
|
||||
package com.microsoft.azure.kusto.data;
|
||||
|
||||
import com.microsoft.azure.kusto.data.auth.CloudInfo;
|
||||
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.kusto.data.auth.TokenProviderBase;
|
||||
import com.microsoft.azure.kusto.data.auth.TokenProviderFactory;
|
||||
|
@ -18,10 +19,7 @@ import org.json.JSONObject;
|
|||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ClientImpl implements Client, StreamingClient {
|
||||
|
@ -46,6 +44,8 @@ public class ClientImpl implements Client, StreamingClient {
|
|||
String url = csb.getClusterUrl();
|
||||
URI clusterUri = new URI(url);
|
||||
String host = clusterUri.getHost();
|
||||
Objects.requireNonNull(clusterUri.getAuthority(), "clusterUri.authority");
|
||||
|
||||
String auth = clusterUri.getAuthority().toLowerCase();
|
||||
if (host == null && auth.endsWith(FEDERATED_SECURITY_POSTFIX)) {
|
||||
url = new URIBuilder().setScheme(clusterUri.getScheme()).setHost(auth.substring(0, clusterUri.getAuthority().indexOf(FEDERATED_SECURITY_POSTFIX))).toString();
|
||||
|
@ -53,7 +53,8 @@ public class ClientImpl implements Client, StreamingClient {
|
|||
}
|
||||
|
||||
clusterUrl = url;
|
||||
aadAuthenticationHelper = TokenProviderFactory.createTokenProvider(csb);
|
||||
aadAuthenticationHelper = clusterUrl.toLowerCase().startsWith(CloudInfo.LOCALHOST) ?
|
||||
null : TokenProviderFactory.createTokenProvider(csb);
|
||||
clientVersionForTracing = "Kusto.Java.Client";
|
||||
String version = Utils.getPackageVersion();
|
||||
if (StringUtils.isNotBlank(version)) {
|
||||
|
@ -86,7 +87,7 @@ public class ClientImpl implements Client, StreamingClient {
|
|||
return new KustoOperationResult(response, clusterEndpoint.endsWith("v2/rest/query") ? "v2" : "v1");
|
||||
} catch (KustoServiceQueryError e) {
|
||||
throw new DataServiceException(clusterEndpoint,
|
||||
"Error parsing json response as KustoOperationResult:" + e.getMessage(), e, e.isPermanent());
|
||||
"Error found while parsing json response as KustoOperationResult:" + e.getMessage(), e, e.isPermanent());
|
||||
} catch (Exception e){
|
||||
throw new DataClientException(clusterEndpoint, e.getMessage(), e);
|
||||
}
|
||||
|
@ -116,7 +117,8 @@ public class ClientImpl implements Client, StreamingClient {
|
|||
long timeoutMs = determineTimeout(properties, commandType);
|
||||
String clusterEndpoint = String.format(commandType.getEndpoint(), clusterUrl);
|
||||
|
||||
Map<String, String> headers = generateIngestAndCommandHeaders(properties, "KJC.execute", commandType.getActivityTypeSuffix());
|
||||
Map<String, String> headers = generateIngestAndCommandHeaders(properties, "KJC.execute",
|
||||
commandType.getActivityTypeSuffix());
|
||||
addCommandHeaders(headers);
|
||||
String jsonPayload = generateCommandPayload(database, command, properties, clusterEndpoint);
|
||||
|
||||
|
@ -142,7 +144,8 @@ public class ClientImpl implements Client, StreamingClient {
|
|||
if (!StringUtils.isEmpty(mappingName)) {
|
||||
clusterEndpoint = clusterEndpoint.concat(String.format("&mappingName=%s", mappingName));
|
||||
}
|
||||
Map<String, String> headers = generateIngestAndCommandHeaders(properties, "KJC.executeStreamingIngest", CommandType.STREAMING_INGEST.getActivityTypeSuffix());
|
||||
Map<String, String> headers = generateIngestAndCommandHeaders(properties, "KJC.executeStreamingIngest",
|
||||
CommandType.STREAMING_INGEST.getActivityTypeSuffix());
|
||||
|
||||
Long timeoutMs = null;
|
||||
if (properties != null) {
|
||||
|
@ -190,7 +193,8 @@ public class ClientImpl implements Client, StreamingClient {
|
|||
long timeoutMs = determineTimeout(properties, commandType);
|
||||
String clusterEndpoint = String.format(commandType.getEndpoint(), clusterUrl);
|
||||
|
||||
Map<String, String> headers = generateIngestAndCommandHeaders(properties, "KJC.executeStreaming", commandType.getActivityTypeSuffix());
|
||||
Map<String, String> headers = generateIngestAndCommandHeaders(properties, "KJC.executeStreaming",
|
||||
commandType.getActivityTypeSuffix());
|
||||
addCommandHeaders(headers);
|
||||
String jsonPayload = generateCommandPayload(database, command, properties, clusterEndpoint);
|
||||
|
||||
|
@ -216,7 +220,10 @@ public class ClientImpl implements Client, StreamingClient {
|
|||
return CommandType.QUERY;
|
||||
}
|
||||
|
||||
private Map<String, String> generateIngestAndCommandHeaders(ClientRequestProperties properties, String clientRequestIdPrefix, String activityTypeSuffix) throws DataServiceException, DataClientException {
|
||||
private Map<String, String> generateIngestAndCommandHeaders(ClientRequestProperties properties,
|
||||
String clientRequestIdPrefix,
|
||||
String activityTypeSuffix)
|
||||
throws DataServiceException, DataClientException {
|
||||
Map<String, String> headers = new HashMap<>();
|
||||
headers.put("x-ms-client-version", clientVersionForTracing);
|
||||
if (applicationNameForTracing != null) {
|
||||
|
@ -225,8 +232,9 @@ public class ClientImpl implements Client, StreamingClient {
|
|||
if (userNameForTracing != null) {
|
||||
headers.put("x-ms-user-id", userNameForTracing);
|
||||
}
|
||||
headers.put(HttpHeaders.AUTHORIZATION, String.format("Bearer %s", aadAuthenticationHelper.acquireAccessToken()));
|
||||
|
||||
if (aadAuthenticationHelper != null) {
|
||||
headers.put(HttpHeaders.AUTHORIZATION, String.format("Bearer %s", aadAuthenticationHelper.acquireAccessToken()));
|
||||
}
|
||||
String clientRequestId;
|
||||
if (properties != null && StringUtils.isNotBlank(properties.getClientRequestId())) {
|
||||
clientRequestId = properties.getClientRequestId();
|
||||
|
|
|
@ -4,12 +4,18 @@
|
|||
package com.microsoft.azure.kusto.data;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.http.ParseException;
|
||||
import org.json.JSONException;
|
||||
import org.json.JSONObject;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/*
|
||||
* Kusto supports attaching various properties to client requests (such as queries and control commands).
|
||||
|
@ -23,9 +29,12 @@ public class ClientRequestProperties {
|
|||
private static final String PARAMETERS_KEY = "Parameters";
|
||||
public static final String OPTION_SERVER_TIMEOUT = "servertimeout";
|
||||
public static final String OPTION_CLIENT_REQUEST_ID = "ClientRequestId";
|
||||
private static final long NANOS_TO_MILLIS = 1000000L;
|
||||
private HashMap<String, Object> parameters;
|
||||
private HashMap<String, Object> options;
|
||||
private final Map<String, Object> parameters;
|
||||
private final Map<String, Object> options;
|
||||
private static final Pattern PATTERN =
|
||||
Pattern.compile("(?:(\\d+)\\.)?((?:[0-2]?\\d:)?(?:[0-5]?\\d):(?:[0-5]?\\d)(?:\\.\\d+)?)",
|
||||
Pattern.CASE_INSENSITIVE);
|
||||
static final long MAX_TIMEOUT_MS = TimeUnit.HOURS.toSeconds(1) * 1000;
|
||||
|
||||
public ClientRequestProperties() {
|
||||
parameters = new HashMap<>();
|
||||
|
@ -70,7 +79,7 @@ public class ClientRequestProperties {
|
|||
if (timeoutObj instanceof Long) {
|
||||
timeout = (Long) timeoutObj;
|
||||
} else if (timeoutObj instanceof String) {
|
||||
timeout = LocalTime.parse((String) timeoutObj).toNanoOfDay() / NANOS_TO_MILLIS;
|
||||
timeout = parseTimeoutFromTimespanString((String) timeoutObj);
|
||||
} else if (timeoutObj instanceof Integer) {
|
||||
timeout = Long.valueOf((Integer) timeoutObj);
|
||||
}
|
||||
|
@ -78,6 +87,22 @@ public class ClientRequestProperties {
|
|||
return timeout;
|
||||
}
|
||||
|
||||
private long parseTimeoutFromTimespanString(String str) throws ParseException {
|
||||
Matcher matcher = PATTERN.matcher(str);
|
||||
if (!matcher.matches()) {
|
||||
throw new ParseException(String.format("Failed to parse timeout string as a timespan. Value: %s", str));
|
||||
}
|
||||
|
||||
long millis = 0;
|
||||
String days = matcher.group(1);
|
||||
if (days != null && !days.equals("0")) {
|
||||
return MAX_TIMEOUT_MS;
|
||||
}
|
||||
|
||||
millis += TimeUnit.NANOSECONDS.toMillis(LocalTime.parse(matcher.group(2)).toNanoOfDay());
|
||||
return millis;
|
||||
}
|
||||
|
||||
public void setTimeoutInMilliSec(Long timeoutInMs) {
|
||||
options.put(OPTION_SERVER_TIMEOUT, timeoutInMs);
|
||||
}
|
||||
|
@ -85,10 +110,20 @@ public class ClientRequestProperties {
|
|||
JSONObject toJson() {
|
||||
try {
|
||||
JSONObject optionsAsJSON = new JSONObject(this.options);
|
||||
Long timeoutInMilliSec = getTimeoutInMilliSec();
|
||||
if (timeoutInMilliSec != null) {
|
||||
LocalTime localTime = LocalTime.ofNanoOfDay(timeoutInMilliSec * NANOS_TO_MILLIS);
|
||||
optionsAsJSON.put(OPTION_SERVER_TIMEOUT, localTime.toString());
|
||||
Object timeoutObj = getOption(OPTION_SERVER_TIMEOUT);
|
||||
|
||||
if (timeoutObj != null) {
|
||||
String timeoutString = "";
|
||||
if (timeoutObj instanceof Long) {
|
||||
Duration duration = Duration.ofMillis((Long) timeoutObj);
|
||||
timeoutString = Utils.formatDurationAsTimespan(duration);
|
||||
} else if (timeoutObj instanceof String) {
|
||||
timeoutString = (String) timeoutObj;
|
||||
} else if (timeoutObj instanceof Integer) {
|
||||
Duration duration = Duration.ofMillis((Integer) timeoutObj);
|
||||
timeoutString = Utils.formatDurationAsTimespan(duration);
|
||||
}
|
||||
optionsAsJSON.put(OPTION_SERVER_TIMEOUT, timeoutString);
|
||||
}
|
||||
JSONObject json = new JSONObject();
|
||||
json.put(OPTIONS_KEY, optionsAsJSON);
|
||||
|
@ -111,11 +146,11 @@ public class ClientRequestProperties {
|
|||
while (it.hasNext()) {
|
||||
String propertyName = it.next();
|
||||
if (propertyName.equals(OPTIONS_KEY)) {
|
||||
JSONObject options = (JSONObject) jsonObj.get(propertyName);
|
||||
Iterator<String> optionsIt = options.keys();
|
||||
JSONObject optionsJson = (JSONObject) jsonObj.get(propertyName);
|
||||
Iterator<String> optionsIt = optionsJson.keys();
|
||||
while (optionsIt.hasNext()) {
|
||||
String optionName = optionsIt.next();
|
||||
crp.setOption(optionName, options.get(optionName));
|
||||
crp.setOption(optionName, optionsJson.get(optionName));
|
||||
}
|
||||
} else if (propertyName.equals(PARAMETERS_KEY)) {
|
||||
JSONObject parameters = (JSONObject) jsonObj.get(propertyName);
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
package com.microsoft.azure.kusto.ingest;
|
||||
package com.microsoft.azure.kusto.data;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
|
@ -25,16 +25,16 @@ import org.slf4j.LoggerFactory;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.net.*;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.zip.DeflaterInputStream;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
import com.microsoft.azure.kusto.data.auth.CloudInfo;
|
||||
|
||||
class Utils {
|
||||
private static final int MAX_REDIRECT_COUNT = 1;
|
||||
|
@ -47,7 +47,9 @@ class Utils {
|
|||
static String post(String url, String payload, InputStream stream, long timeoutMs, Map<String, String> headers, boolean leaveOpen) throws DataServiceException, DataClientException {
|
||||
URI uri = parseUriFromUrlString(url);
|
||||
|
||||
HttpClient httpClient = getHttpClient(Math.toIntExact(timeoutMs));
|
||||
HttpClient httpClient = getHttpClient(timeoutMs > Integer.MAX_VALUE ?
|
||||
Integer.MAX_VALUE :
|
||||
Math.toIntExact(timeoutMs));
|
||||
|
||||
try (InputStream ignored = (stream != null && !leaveOpen) ? stream : null) {
|
||||
HttpPost request = setupHttpPostRequest(uri, payload, stream, headers);
|
||||
|
@ -65,7 +67,11 @@ class Utils {
|
|||
throw createExceptionFromResponse(url, response, null, responseContent);
|
||||
}
|
||||
}
|
||||
} catch (JSONException | IOException e) {
|
||||
}
|
||||
catch (SocketTimeoutException e) {
|
||||
throw new DataServiceException(url, "Timed out in post request:" + e.getMessage(), false);
|
||||
}
|
||||
catch (JSONException | IOException e) {
|
||||
throw new DataClientException(url, "Error in post request:" + e.getMessage(), e);
|
||||
}
|
||||
return null;
|
||||
|
@ -148,19 +154,19 @@ class Utils {
|
|||
* result), or (2) in the KustoOperationResult's QueryCompletionInformation, both of which present with "200 OK". See .Net's DataReaderParser.
|
||||
*/
|
||||
String activityId = determineActivityId(httpResponse);
|
||||
if (StringUtils.isBlank(errorFromResponse)) {
|
||||
errorFromResponse = String.format("Http StatusCode='%s', ActivityId='%s'", httpResponse.getStatusLine().toString(), activityId);
|
||||
return new DataServiceException(url, errorFromResponse, thrownException, false);
|
||||
} else {
|
||||
if (!StringUtils.isBlank(errorFromResponse)) {
|
||||
String message = "";
|
||||
DataWebException formattedException = new DataWebException(errorFromResponse, httpResponse);
|
||||
try {
|
||||
message = String.format("%s, ActivityId='%s'", formattedException.getApiError().getDescription(), activityId);
|
||||
return new DataServiceException(url, message, formattedException,
|
||||
formattedException.getApiError().isPermanent());
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
return new DataServiceException(url, message, formattedException,
|
||||
formattedException.getApiError().isPermanent());
|
||||
}
|
||||
errorFromResponse = String.format("Http StatusCode='%s', ActivityId='%s'", httpResponse.getStatusLine().toString(), activityId);
|
||||
return new DataServiceException(url, errorFromResponse, thrownException, false);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -222,6 +228,7 @@ class Utils {
|
|||
for (Map.Entry<String, String> entry : headers.entrySet()) {
|
||||
request.addHeader(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
return request;
|
||||
}
|
||||
|
||||
|
@ -229,7 +236,7 @@ class Utils {
|
|||
private static URI parseUriFromUrlString(String url) throws DataClientException {
|
||||
try {
|
||||
URL cleanUrl = new URL(url);
|
||||
if ("https".equalsIgnoreCase(cleanUrl.getProtocol())) {
|
||||
if ("https".equalsIgnoreCase(cleanUrl.getProtocol()) || cleanUrl.getHost().equalsIgnoreCase(CloudInfo.LOCALHOST)) {
|
||||
return new URI(cleanUrl.getProtocol(), cleanUrl.getUserInfo(), cleanUrl.getHost(), cleanUrl.getPort(), cleanUrl.getPath(), cleanUrl.getQuery(), cleanUrl.getRef());
|
||||
} else {
|
||||
throw new DataClientException(url, "Cannot forward security token to a remote service over insecure " +
|
||||
|
@ -251,4 +258,22 @@ class Utils {
|
|||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
public static String formatDurationAsTimespan(Duration duration) {
|
||||
long seconds = duration.getSeconds();
|
||||
int nanos = duration.getNano();
|
||||
long hours = TimeUnit.SECONDS.toHours(seconds) % TimeUnit.DAYS.toHours(1);
|
||||
long minutes = TimeUnit.SECONDS.toMinutes( seconds) % TimeUnit.MINUTES.toSeconds(1);
|
||||
long secs = seconds % TimeUnit.HOURS.toSeconds(1);
|
||||
long days = TimeUnit.SECONDS.toDays(seconds);
|
||||
String positive = String.format(
|
||||
"%02d.%02d:%02d:%02d.%.3s",
|
||||
days,
|
||||
hours,
|
||||
minutes,
|
||||
secs,
|
||||
nanos);
|
||||
|
||||
return seconds < 0 ? "-" + positive : positive;
|
||||
}
|
||||
}
|
|
@ -33,8 +33,12 @@ public class CloudInfo {
|
|||
DEFAULT_KUSTO_SERVICE_RESOURCE_ID,
|
||||
DEFAULT_FIRST_PARTY_AUTHORITY_URL
|
||||
);
|
||||
public static final String LOCALHOST = "http://localhost";
|
||||
|
||||
private static final Map<String, CloudInfo> cache = new HashMap<>();
|
||||
static {
|
||||
cache.put(LOCALHOST, DEFAULT_CLOUD);
|
||||
}
|
||||
|
||||
private final boolean loginMfaRequired;
|
||||
private final String loginEndpoint;
|
||||
|
|
|
@ -37,7 +37,7 @@ public class UserPromptTokenProvider extends PublicAppTokenProviderBase {
|
|||
|
||||
private final String usernameHint;
|
||||
|
||||
UserPromptTokenProvider(@NotNull String clusterUrl, String authorityId) throws URISyntaxException {
|
||||
public UserPromptTokenProvider(@NotNull String clusterUrl, String authorityId) throws URISyntaxException {
|
||||
this(null, clusterUrl, authorityId);
|
||||
}
|
||||
|
||||
|
|
|
@ -35,14 +35,14 @@ public class ResultSetTest {
|
|||
String str = "str";
|
||||
Instant now = Instant.now();
|
||||
|
||||
Duration duration = Duration.ofHours(2);
|
||||
Duration duration = Duration.ofHours(2).plusSeconds(1);
|
||||
BigDecimal dec = BigDecimal.valueOf(1, 1);
|
||||
UUID uuid = UUID.randomUUID();
|
||||
int i = 1;
|
||||
long l = 100000000000L;
|
||||
double d = 1.1d;
|
||||
short s1 = 10;
|
||||
String durationAsKustoString = formatDuration(duration);
|
||||
String durationAsKustoString = LocalTime.MIDNIGHT.plus(duration).toString();
|
||||
row1.put(true);
|
||||
row1.put(str);
|
||||
row1.put(now);
|
||||
|
@ -96,16 +96,4 @@ public class ResultSetTest {
|
|||
Assertions.assertEquals(res.getLocalTime(9), LocalTime.parse(durationAsKustoString));
|
||||
Assertions.assertEquals(res.getShort(10), s1);
|
||||
}
|
||||
|
||||
public static String formatDuration(Duration duration) {
|
||||
long seconds = duration.getSeconds();
|
||||
long absSeconds = Math.abs(seconds);
|
||||
String positive = String.format(
|
||||
"%02d:%02d:%02d",
|
||||
absSeconds / 3600,
|
||||
(absSeconds % 3600) / 60,
|
||||
absSeconds % 60);
|
||||
return seconds < 0 ? "-" + positive : positive;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
package com.microsoft.azure.kusto.data;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
public class UtilitiesTest {
|
||||
@Test
|
||||
@DisplayName("Convert millis to .Net timespan")
|
||||
void convertMillisToTimespan() {
|
||||
Long timeout = TimeUnit.MINUTES.toMillis(40) + TimeUnit.SECONDS.toMillis(2); // 40 minutes 2 seconds
|
||||
ClientRequestProperties clientRequestProperties = new ClientRequestProperties();
|
||||
clientRequestProperties.setTimeoutInMilliSec(timeout);
|
||||
Assertions.assertEquals(timeout, clientRequestProperties.getTimeoutInMilliSec());
|
||||
Assertions.assertEquals(timeout, clientRequestProperties.getOption(ClientRequestProperties.OPTION_SERVER_TIMEOUT));
|
||||
|
||||
String serverTimeoutOptionStr = "01:40:02.1";
|
||||
long serverTimeoutOptionMillis = TimeUnit.HOURS.toMillis(1)
|
||||
+ TimeUnit.MINUTES.toMillis(40)
|
||||
+ TimeUnit.SECONDS.toMillis(2) + 100L;
|
||||
clientRequestProperties.setOption(ClientRequestProperties.OPTION_SERVER_TIMEOUT, serverTimeoutOptionStr);
|
||||
Assertions.assertEquals(serverTimeoutOptionMillis, clientRequestProperties.getTimeoutInMilliSec());
|
||||
|
||||
// If set to over MAX_TIMEOUT_MS - value should be MAX_TIMEOUT_MS
|
||||
clientRequestProperties.setOption(ClientRequestProperties.OPTION_SERVER_TIMEOUT, "1.01:40:02.1");
|
||||
Assertions.assertEquals(ClientRequestProperties.MAX_TIMEOUT_MS,
|
||||
clientRequestProperties.getTimeoutInMilliSec());
|
||||
|
||||
clientRequestProperties.setOption(ClientRequestProperties.OPTION_SERVER_TIMEOUT, "15:00");
|
||||
Assertions.assertEquals(TimeUnit.HOURS.toMillis(15), clientRequestProperties.getTimeoutInMilliSec());
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
package com.microsoft.azure.kusto.ingest;
|
||||
|
||||
import com.microsoft.azure.kusto.data.Ensure;
|
||||
import com.microsoft.azure.kusto.ingest.source.CompressionType;
|
||||
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
|
|
|
@ -8,7 +8,7 @@ import java.util.List;
|
|||
|
||||
/**
|
||||
* This class describes the ingestion mapping to use for an ingestion request.
|
||||
* When a CSV data source schema and the target schema doesn't match or when using JSON, AVRO, PARQUET or ORC formats,
|
||||
* When a CSV data source schema and the target schema doesn't match or when using JSON, AVRO formats,
|
||||
* there is a need to define an ingestion mapping to map the source schema to the table schema.
|
||||
* This class describes a pre-define ingestion mapping by its name- mapping reference and its kind.
|
||||
*/
|
||||
|
@ -16,7 +16,7 @@ public class IngestionMapping {
|
|||
private ColumnMapping[] columnMappings;
|
||||
private IngestionMappingKind ingestionMappingKind;
|
||||
private String ingestionMappingReference;
|
||||
public static final List<String> mappingRequiredFormats = Arrays.asList("json", "singlejson", "avro", "apacheavro", "parquet", "orc");
|
||||
public static final List<String> mappingRequiredFormats = Arrays.asList("json", "singlejson", "avro");
|
||||
|
||||
/**
|
||||
* Creates a default ingestion mapping with kind Unknown and empty mapping reference.
|
||||
|
@ -97,6 +97,6 @@ public class IngestionMapping {
|
|||
Represents an ingestion mapping kind - the format of the source data to map from.
|
||||
*/
|
||||
public enum IngestionMappingKind {
|
||||
unknown, Csv, Json, Parquet, Avro, ApacheAvro, Orc
|
||||
unknown, Csv, Json, Parquet, Avro, ApacheAvro, Orc, W3CLogFile
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ package com.microsoft.azure.kusto.ingest;
|
|||
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
|
||||
import com.fasterxml.jackson.annotation.PropertyAccessor;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.microsoft.azure.kusto.data.Ensure;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -303,7 +304,8 @@ public class IngestionProperties {
|
|||
apacheavro,
|
||||
parquet,
|
||||
orc,
|
||||
raw
|
||||
raw,
|
||||
w3clogfile
|
||||
}
|
||||
|
||||
public enum IngestionReportLevel {
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.microsoft.azure.kusto.ingest;
|
||||
|
||||
import com.microsoft.azure.kusto.data.Ensure;
|
||||
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.kusto.data.StreamingClient;
|
||||
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
|
||||
|
|
|
@ -6,6 +6,7 @@ package com.microsoft.azure.kusto.ingest;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.microsoft.azure.kusto.data.Client;
|
||||
import com.microsoft.azure.kusto.data.ClientFactory;
|
||||
import com.microsoft.azure.kusto.data.Ensure;
|
||||
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
|
||||
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
|
||||
|
@ -90,22 +91,23 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient
|
|||
ingestionBlobInfo.id = blobSourceInfo.getSourceId();
|
||||
}
|
||||
|
||||
if (ingestionProperties.getReportMethod() != IngestionProperties.IngestionReportMethod.Queue) {
|
||||
IngestionStatus status = new IngestionStatus(ingestionBlobInfo.id);
|
||||
status.database = ingestionProperties.getDatabaseName();
|
||||
status.table = ingestionProperties.getTableName();
|
||||
status.status = OperationStatus.Queued;
|
||||
status.updatedOn = Date.from(Instant.now());
|
||||
status.ingestionSourceId = ingestionBlobInfo.id;
|
||||
status.setIngestionSourcePath(urlWithoutSecrets);
|
||||
boolean reportToTable = ingestionBlobInfo.reportLevel != IngestionProperties.IngestionReportLevel.None
|
||||
&& ingestionProperties.getReportMethod() != IngestionProperties.IngestionReportMethod.Queue;
|
||||
if (reportToTable) {
|
||||
status.status = OperationStatus.Pending;
|
||||
String tableStatusUri = resourceManager
|
||||
.getIngestionResource(ResourceManager.ResourceType.INGESTIONS_STATUS_TABLE);
|
||||
ingestionBlobInfo.IngestionStatusInTable = new IngestionStatusInTableDescription();
|
||||
ingestionBlobInfo.IngestionStatusInTable.TableConnectionString = tableStatusUri;
|
||||
ingestionBlobInfo.IngestionStatusInTable.RowKey = ingestionBlobInfo.id.toString();
|
||||
ingestionBlobInfo.IngestionStatusInTable.PartitionKey = ingestionBlobInfo.id.toString();
|
||||
|
||||
IngestionStatus status = new IngestionStatus(ingestionBlobInfo.id);
|
||||
status.database = ingestionProperties.getDatabaseName();
|
||||
status.table = ingestionProperties.getTableName();
|
||||
status.status = OperationStatus.Pending;
|
||||
status.updatedOn = Date.from(Instant.now());
|
||||
status.ingestionSourceId = ingestionBlobInfo.id;
|
||||
status.setIngestionSourcePath(urlWithoutSecrets);
|
||||
|
||||
azureStorageClient.azureTableInsertEntity(tableStatusUri, status);
|
||||
tableStatuses.add(ingestionBlobInfo.IngestionStatusInTable);
|
||||
}
|
||||
|
@ -117,7 +119,9 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient
|
|||
resourceManager
|
||||
.getIngestionResource(ResourceManager.ResourceType.SECURED_READY_FOR_AGGREGATION_QUEUE)
|
||||
, serializedIngestionBlobInfo);
|
||||
return new TableReportIngestionResult(tableStatuses);
|
||||
return reportToTable
|
||||
? new TableReportIngestionResult(tableStatuses)
|
||||
: new IngestionStatusResult(status);
|
||||
} catch (StorageException e) {
|
||||
throw new IngestionServiceException("Failed to ingest from blob", e);
|
||||
} catch (IOException | URISyntaxException e) {
|
||||
|
|
|
@ -3,10 +3,7 @@
|
|||
|
||||
package com.microsoft.azure.kusto.ingest;
|
||||
|
||||
import com.microsoft.azure.kusto.data.ClientFactory;
|
||||
import com.microsoft.azure.kusto.data.KustoOperationResult;
|
||||
import com.microsoft.azure.kusto.data.KustoResultSetTable;
|
||||
import com.microsoft.azure.kusto.data.StreamingClient;
|
||||
import com.microsoft.azure.kusto.data.*;
|
||||
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
|
||||
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
|
||||
|
|
|
@ -5,7 +5,7 @@ package com.microsoft.azure.kusto.ingest.source;
|
|||
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.microsoft.azure.kusto.ingest.Ensure.stringIsNotBlank;
|
||||
import static com.microsoft.azure.kusto.data.Ensure.stringIsNotBlank;
|
||||
|
||||
|
||||
public class BlobSourceInfo extends AbstractSourceInfo {
|
||||
|
|
|
@ -5,7 +5,7 @@ package com.microsoft.azure.kusto.ingest.source;
|
|||
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.microsoft.azure.kusto.ingest.Ensure.stringIsNotBlank;
|
||||
import static com.microsoft.azure.kusto.data.Ensure.stringIsNotBlank;
|
||||
|
||||
|
||||
public class FileSourceInfo extends AbstractSourceInfo {
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
package com.microsoft.azure.kusto.ingest.source;
|
||||
|
||||
import com.microsoft.azure.kusto.ingest.Ensure;
|
||||
import com.microsoft.azure.kusto.data.Ensure;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.Objects;
|
||||
|
|
|
@ -23,7 +23,7 @@ public class Query {
|
|||
|
||||
KustoOperationResult results = client.execute(System.getProperty("dbName"), System.getProperty("query"));
|
||||
KustoResultSetTable mainTableResult = results.getPrimaryResults();
|
||||
System.out.println(String.format("Kusto sent back %s rows.", mainTableResult.count()));
|
||||
System.out.printf("Kusto sent back %s rows.%n", mainTableResult.count());
|
||||
|
||||
// iterate values
|
||||
while (mainTableResult.next()) {
|
||||
|
|
Загрузка…
Ссылка в новой задаче