This commit is contained in:
Yihezkel Schoenbrun 2024-07-31 16:08:19 +03:00
Родитель 626edb2707
Коммит 924635e377
2 изменённых файлов: 17 добавлений и 0 удалений

Просмотреть файл

@ -87,6 +87,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
@Override
public void close() {
log.warn("Yihezkel: Close ResourceManager start");
timer.cancel();
timer.purge();
timer = null;
@ -95,6 +96,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
} catch (IOException e) {
log.error("Couldn't close client: " + e.getMessage(), e);
}
log.warn("Yihezkel: Close ResourceManager done");
}
private void init() {
@ -102,6 +104,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
@Override
public void run() {
try {
log.warn("Yihezkel: RefreshIngestionResourcesTask.run start");
refreshIngestionResources();
timer.schedule(new RefreshIngestionResourcesTask(), defaultRefreshTime);
} catch (Exception e) {
@ -110,12 +113,14 @@ class ResourceManager implements Closeable, IngestionResourceManager {
timer.schedule(new RefreshIngestionResourcesTask(), refreshTimeOnFailure);
}
}
log.warn("Yihezkel: RefreshIngestionResourcesTask.run done");
}
}
class RefreshIngestionAuthTokenTask extends TimerTask {
@Override
public void run() {
log.warn("Yihezkel: RefreshIngestionAuthTokenTask.run start");
try {
refreshIngestionAuthToken();
timer.schedule(new RefreshIngestionAuthTokenTask(), defaultRefreshTime);
@ -125,6 +130,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
timer.schedule(new RefreshIngestionAuthTokenTask(), refreshTimeOnFailure);
}
}
log.warn("Yihezkel: RefreshIngestionAuthTokenTask.run done");
}
}
@ -156,6 +162,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
}
public String getIdentityToken() throws IngestionServiceException, IngestionClientException {
log.warn("Yihezkel: getIdentityToken start");
if (identityToken == null) {
refreshIngestionAuthToken();
try {
@ -167,6 +174,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
authTokenLock.readLock().unlock();
}
}
log.warn("Yihezkel: getIdentityToken done");
return identityToken;
}
@ -179,6 +187,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
}
private <T> IngestionResource<T> getResourceSet(Callable<IngestionResource<T>> resourceGetter) throws IngestionClientException, IngestionServiceException {
log.warn("Yihezkel: getResourceSet start");
IngestionResource<T> resource = null;
try {
resource = resourceGetter.call();
@ -204,6 +213,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
}
}
log.warn("Yihezkel: getResourceSet done");
return resource;
}
@ -239,6 +249,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
}
private void refreshIngestionResourcesImpl() throws IngestionClientException, IngestionServiceException {
log.warn("Yihezkel: RefreshIngestionResourcesTask.refreshIngestionResources start");
// Here we use tryLock(): If there is another instance doing the refresh, then just skip it.
if (ingestionResourcesLock.writeLock().tryLock()) {
try {
@ -268,6 +279,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
throw new IngestionClientException(e.getMessage(), e);
} finally {
ingestionResourcesLock.writeLock().unlock();
log.warn("Yihezkel: RefreshIngestionResourcesTask.refreshIngestionResources done");
}
}
}
@ -305,6 +317,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
}
private void refreshIngestionAuthTokenImpl() throws IngestionClientException, IngestionServiceException {
log.warn("Yihezkel: RefreshIngestionAuthTokenTask.refreshIngestionAuthToken start");
if (authTokenLock.writeLock().tryLock()) {
try {
log.info("Refreshing Ingestion Auth Token");
@ -318,6 +331,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
resultTable.next();
identityToken = resultTable.getString(0);
}
log.info("Refreshing Ingestion Auth Token Finished");
} catch (DataServiceException e) {
throw new IngestionServiceException(e.getIngestionSource(), "Error refreshing IngestionAuthToken. " + e.getMessage(), e);
} catch (DataClientException e) {
@ -326,6 +340,7 @@ class ResourceManager implements Closeable, IngestionResourceManager {
throw new IngestionClientException(e.getMessage(), e);
} finally {
authTokenLock.writeLock().unlock();
log.warn("Yihezkel: RefreshIngestionAuthTokenTask.refreshIngestionAuthToken done");
}
}
}

Просмотреть файл

@ -124,6 +124,8 @@ class E2ETest {
@AfterAll
public static void tearDown() {
System.out.printf(
"Yihezkel: Tearing down, to see whether this method is the cause vs is only called because of the timeout with a different underlying issue");
try {
queryClient.executeToJsonResult(databaseName, String.format(".drop table %s ifexists", tableName));
ingestClient.close();