Add retry mechanism for potentially throttled operations. (#237)

This commit is contained in:
alonadam 2022-04-06 17:03:18 +03:00 коммит произвёл GitHub
Родитель 0a7f304ea5
Коммит 2847786036
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 72 добавлений и 20 удалений

Просмотреть файл

@ -4,11 +4,7 @@
package com.microsoft.azure.kusto.data;
import com.microsoft.azure.kusto.data.auth.CloudInfo;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.DataWebException;
import com.microsoft.azure.kusto.data.exceptions.OneApiError;
import com.microsoft.azure.kusto.data.exceptions.WebException;
import com.microsoft.azure.kusto.data.exceptions.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
@ -74,10 +70,13 @@ class Utils {
if (entity != null) {
StatusLine statusLine = response.getStatusLine();
String responseContent = EntityUtils.toString(entity);
if (statusLine.getStatusCode() == 200) {
return responseContent;
} else {
throw createExceptionFromResponse(urlStr, response, null, responseContent);
switch (statusLine.getStatusCode()) {
case HttpStatus.SC_OK:
return responseContent;
case HttpStatus.SC_TOO_MANY_REQUESTS:
throw new ThrottleException(urlStr);
default:
throw createExceptionFromResponse(urlStr, response, null, responseContent);
}
}
} catch (SocketTimeoutException e) {

Просмотреть файл

@ -14,11 +14,6 @@ public class DataServiceException extends KustoDataExceptionBase {
super(ingestionSource, message, exception, isPermanent);
}
public boolean is404Error() {
Integer code = getStatusCode();
return code != null && code == 404;
}
@Nullable
public Integer getStatusCode() {
Throwable cause = getCause();

Просмотреть файл

@ -0,0 +1,9 @@
package com.microsoft.azure.kusto.data.exceptions;
/**
 * Signals that a request was rejected because the service answered with
 * HTTP 429 (Too Many Requests).
 *
 * <p>Being a {@link DataServiceException}, it is caught by existing handlers of
 * that type; callers that want to back off and retry can catch this subtype
 * specifically.
 */
public class ThrottleException extends DataServiceException {
/** Fixed message carried by every instance of this exception. */
public static final String ERROR_MESSAGE = "Request was throttled, too many requests.";
/**
 * Creates a throttling exception for the given source.
 *
 * @param ingestionSource identifier of the request that was throttled,
 *                        forwarded unchanged to the superclass
 */
public ThrottleException(String ingestionSource) {
// NOTE(review): the trailing `false` is presumably the "isPermanent" flag,
// i.e. throttling is treated as transient — confirm against the
// DataServiceException constructor.
super(ingestionSource, ERROR_MESSAGE, false);
}
}

Просмотреть файл

@ -6,6 +6,7 @@ import com.microsoft.azure.kusto.data.exceptions.DataWebException;
import org.apache.http.ProtocolVersion;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.HttpStatus;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
@ -53,7 +54,7 @@ class UtilitiesTest {
void createExceptionFromResponse404Error() {
BasicHttpResponse basicHttpResponse = getBasicHttpResponse(404);
DataServiceException error = Utils.createExceptionFromResponse("https://sample.kusto.windows.net", basicHttpResponse, new Exception(), "error");
Assertions.assertTrue(error.is404Error());
Assertions.assertTrue(error.getStatusCode() != null && error.getStatusCode() == HttpStatus.SC_NOT_FOUND);
}
@Test

Просмотреть файл

@ -256,5 +256,20 @@
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>${resilience4j.version}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-core</artifactId>
<version>${resilience4j.version}</version>
</dependency>
<dependency>
<groupId>io.vavr</groupId>
<artifactId>vavr</artifactId>
<version>${io.vavr.version}</version>
</dependency>
</dependencies>
</project>

Просмотреть файл

@ -8,8 +8,13 @@ import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.ThrottleException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.Retry;
import io.vavr.CheckedFunction0;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,15 +62,20 @@ class ResourceManager implements Closeable {
private ReadWriteLock authTokenLock = new ReentrantReadWriteLock();
private static final long REFRESH_INGESTION_RESOURCES_PERIOD = 1000L * 60 * 60; // 1 hour
private static final long REFRESH_INGESTION_RESOURCES_PERIOD_ON_FAILURE = 1000L * 60 * 15; // 15 minutes
private static final int MAX_RETRY_ATTEMPTS = 4;
private static final long MAX_RETRY_INTERVAL = 1000L * 30;
private static final long BASE_INTERVAL = 1000L * 2;
private final Long defaultRefreshTime;
private final Long refreshTimeOnFailure;
public static final String SERVICE_TYPE_COLUMN_NAME = "ServiceType";
private final RetryConfig retryConfig;
ResourceManager(Client client, long defaultRefreshTime, long refreshTimeOnFailure) {
this.defaultRefreshTime = defaultRefreshTime;
this.refreshTimeOnFailure = refreshTimeOnFailure;
this.client = client;
timer = new Timer(true);
retryConfig = buildRetryConfig();
init();
}
@ -79,6 +89,18 @@ class ResourceManager implements Closeable {
timer.purge();
}
/**
 * Assembles the retry policy used when querying the service for resources.
 *
 * <p>The policy allows up to {@code MAX_RETRY_ATTEMPTS} attempts, waits between
 * them using a randomized exponential backoff that starts at
 * {@code BASE_INTERVAL} milliseconds and is capped at
 * {@code MAX_RETRY_INTERVAL} milliseconds, and lists only
 * {@link ThrottleException} as a retryable failure.
 *
 * @return the retry configuration built from the class-level constants
 */
private RetryConfig buildRetryConfig() {
    return RetryConfig.custom()
            .maxAttempts(MAX_RETRY_ATTEMPTS)
            // Randomized exponential backoff with the library's default
            // multiplier and randomization factor, bounded by the max interval.
            .intervalFunction(IntervalFunction.ofExponentialRandomBackoff(
                    BASE_INTERVAL,
                    IntervalFunction.DEFAULT_MULTIPLIER,
                    IntervalFunction.DEFAULT_RANDOMIZATION_FACTOR,
                    MAX_RETRY_INTERVAL))
            .retryExceptions(ThrottleException.class)
            .build();
}
private void init() {
ingestionResources = Collections.synchronizedMap(new EnumMap<>(ResourceType.class));
@ -162,7 +184,10 @@ class ResourceManager implements Closeable {
if (ingestionResourcesLock.writeLock().tryLock()) {
try {
log.info("Refreshing Ingestion Resources");
KustoOperationResult ingestionResourcesResults = client.execute(Commands.INGESTION_RESOURCES_SHOW_COMMAND);
Retry retry = Retry.of("get ingestion resources", this.retryConfig);
CheckedFunction0<KustoOperationResult> retryExecute = Retry.decorateCheckedSupplier(retry,
() -> client.execute(Commands.INGESTION_RESOURCES_SHOW_COMMAND));
KustoOperationResult ingestionResourcesResults = retryExecute.apply();
ingestionResources = Collections.synchronizedMap(new EnumMap<>(ResourceType.class));
if (ingestionResourcesResults != null && ingestionResourcesResults.hasNext()) {
KustoResultSetTable table = ingestionResourcesResults.next();
@ -177,6 +202,8 @@ class ResourceManager implements Closeable {
throw new IngestionServiceException(e.getIngestionSource(), "Error refreshing IngestionResources", e);
} catch (DataClientException e) {
throw new IngestionClientException(e.getIngestionSource(), "Error refreshing IngestionResources", e);
} catch (Throwable e) {
throw new IngestionClientException(e.getMessage());
} finally {
ingestionResourcesLock.writeLock().unlock();
}
@ -187,7 +214,9 @@ class ResourceManager implements Closeable {
if (authTokenLock.writeLock().tryLock()) {
try {
log.info("Refreshing Ingestion Auth Token");
KustoOperationResult identityTokenResult = client.execute(Commands.IDENTITY_GET_COMMAND);
Retry retry = Retry.of("get Ingestion Auth Token resources", this.retryConfig);
CheckedFunction0<KustoOperationResult> retryExecute = Retry.decorateCheckedSupplier(retry, () -> client.execute(Commands.IDENTITY_GET_COMMAND));
KustoOperationResult identityTokenResult = retryExecute.apply();
if (identityTokenResult != null
&& identityTokenResult.hasNext()
&& !identityTokenResult.getResultTables().isEmpty()) {
@ -199,6 +228,8 @@ class ResourceManager implements Closeable {
throw new IngestionServiceException(e.getIngestionSource(), "Error refreshing IngestionAuthToken", e);
} catch (DataClientException e) {
throw new IngestionClientException(e.getIngestionSource(), "Error refreshing IngestionAuthToken", e);
} catch (Throwable e) {
throw new IngestionClientException(e.getMessage());
} finally {
authTokenLock.writeLock().unlock();
}

Просмотреть файл

@ -26,6 +26,7 @@ import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.HttpStatus;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -168,7 +169,7 @@ public class StreamingIngestClient extends IngestClientBase implements IngestCli
throw new IngestionClientException(e.getMessage(), e);
} catch (DataServiceException e) {
log.error(e.getMessage(), e);
if (e.is404Error()) {
if (e.getStatusCode() != null && e.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
validateEndpointServiceType(connectionDataSource, EXPECTED_SERVICE_TYPE);
}
throw new IngestionServiceException(e.getMessage(), e);

Просмотреть файл

@ -43,7 +43,7 @@
<commons-text.version>1.9</commons-text.version>
<msal4j.version>1.11.0</msal4j.version>
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.13</httpcore.version>
<httpcore.version>4.4.15</httpcore.version>
<json.version>20201115</json.version>
<fasterxml.jackson.core.version>[2.11.0,2.13.0]</fasterxml.jackson.core.version>
<azure-storage.version>8.6.6</azure-storage.version>
@ -51,7 +51,8 @@
<azure-core.version>1.21.0</azure-core.version>
<reactor-core.version>3.4.10</reactor-core.version>
<univocity-parsers.version>2.9.1</univocity-parsers.version>
<resilience4j.version>1.7.1</resilience4j.version>
<io.vavr.version>0.10.2</io.vavr.version>
<!-- Test dependencies -->
<bouncycastle.version>1.68</bouncycastle.version>
<jsonassert.version>1.5.0</jsonassert.version>