From 3992ee6579b93dc7517c8a8498e0432a597ee6df Mon Sep 17 00:00:00 2001 From: Rabeea Abu-Saleh Date: Tue, 2 Oct 2018 11:05:56 +0300 Subject: [PATCH] Resource manager thread safe (#24) * Add ReadWriteLocks to make the Resource Manager thread safe * Remove String formatting from logs. * Use concurrent HashMap for ingestion resources map. --- .../kusto/ingest/AzureStorageHelper.java | 11 +- .../azure/kusto/ingest/IngestClientImpl.java | 10 +- .../azure/kusto/ingest/ResourceManager.java | 148 ++++++++++++------ .../kusto/ingest/IngestClientImplTest.java | 6 +- .../kusto/ingest/ResourceManagerTest.java | 16 +- samples/src/main/java/FileIngestion.java | 2 +- 6 files changed, 113 insertions(+), 80 deletions(-) diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/AzureStorageHelper.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/AzureStorageHelper.java index 8cf8458a..ef6edc73 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/AzureStorageHelper.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/AzureStorageHelper.java @@ -39,7 +39,7 @@ class AzureStorageHelper { } catch (Exception e) { - log.error(String.format("postMessageToQueue: %s.",e.getMessage()), e); + log.error("Error in postMessageToQueue", e); throw e; } } @@ -54,7 +54,7 @@ class AzureStorageHelper { public static CloudBlockBlob uploadLocalFileToBlob(String filePath, String blobName, String storageUri) throws Exception{ try { - log.debug(String.format("uploadLocalFileToBlob: filePath: %s, blobName: %s, storageUri: %s", filePath, blobName, storageUri)); + log.debug("uploadLocalFileToBlob: filePath: {}, blobName: {}, storageUri: {}", filePath, blobName, storageUri); // Check if the file is already compressed: boolean isCompressed = filePath.endsWith(".gz") || filePath.endsWith(".zip"); @@ -74,13 +74,14 @@ class AzureStorageHelper { } catch (StorageException se) { - log.error(String.format("uploadLocalFileToBlob: Error returned from the service. Http code: %d and error code: %s", se.getHttpStatusCode(), se.getErrorCode()), se); + log.error("uploadLocalFileToBlob: Error returned from the service. Http code: {}. error code: {}. file path: {}" + , se.getHttpStatusCode(), se.getErrorCode(), filePath, se); throw se; } catch (Exception ex) { - log.error(String.format("uploadLocalFileToBlob: Error while uploading file to blob."), ex); + log.error("uploadLocalFileToBlob: Error while uploading file to blob. file path: {}", filePath, ex); throw ex; } } @@ -97,7 +98,7 @@ class AzureStorageHelper { } public static CloudBlockBlob uploadStreamToBlob(InputStream inputStream, String blobName, String storageUri, boolean compress) throws IOException, URISyntaxException, StorageException { - log.debug(String.format("uploadLocalFileToBlob: blobName: %s, storageUri: %s", blobName, storageUri)); + log.debug("uploadLocalFileToBlob: blobName: {}, storageUri: {}", blobName, storageUri); CloudBlobContainer container = new CloudBlobContainer(new URI(storageUri)); CloudBlockBlob blob = container.getBlockBlobReference(blobName+ (compress?".gz":"")); BlobOutputStream bos = blob.openOutputStream(); diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientImpl.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientImpl.java index 8ce812c3..f7701dff 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientImpl.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientImpl.java @@ -63,7 +63,7 @@ class IngestClientImpl implements IngestClient { } if (ingestionProperties.getReportMethod() != IngestionProperties.IngestionReportMethod.Queue) { - String tableStatusUri = resourceManager.getIngestionResource(ResourceManager.ResourceTypes.INGESTIONS_STATUS_TABLE); + String tableStatusUri = resourceManager.getIngestionResource(ResourceManager.ResourceType.INGESTIONS_STATUS_TABLE); ingestionBlobInfo.IngestionStatusInTable = new IngestionStatusInTableDescription(); ingestionBlobInfo.IngestionStatusInTable.TableConnectionString = tableStatusUri; ingestionBlobInfo.IngestionStatusInTable.RowKey = ingestionBlobInfo.id.toString(); @@ -85,7 +85,7 @@ class IngestClientImpl implements IngestClient { String serializedIngestionBlobInfo = objectMapper.writeValueAsString(ingestionBlobInfo); postMessageToQueue( - resourceManager.getIngestionResource(ResourceManager.ResourceTypes.SECURED_READY_FOR_AGGREGATION_QUEUE) + resourceManager.getIngestionResource(ResourceManager.ResourceType.SECURED_READY_FOR_AGGREGATION_QUEUE) , serializedIngestionBlobInfo); } catch (Exception ex) { @@ -106,7 +106,7 @@ class IngestClientImpl implements IngestClient { try { String fileName = (new File(fileSourceInfo.getFilePath())).getName(); String blobName = genBlobName(fileName, ingestionProperties.getDatabaseName(), ingestionProperties.getTableName()); - CloudBlockBlob blob = uploadLocalFileToBlob(fileSourceInfo.getFilePath(), blobName, resourceManager.getIngestionResource(ResourceManager.ResourceTypes.TEMP_STORAGE)); + CloudBlockBlob blob = uploadLocalFileToBlob(fileSourceInfo.getFilePath(), blobName, resourceManager.getIngestionResource(ResourceManager.ResourceType.TEMP_STORAGE)); String blobPath = AzureStorageHelper.getBlobPathWithSas(blob); long rawDataSize = fileSourceInfo.getRawSizeInBytes() > 0L ? fileSourceInfo.getRawSizeInBytes() : estimateFileRawSize(fileSourceInfo.getFilePath()); @@ -132,7 +132,7 @@ class IngestClientImpl implements IngestClient { CloudBlockBlob blob = uploadStreamToBlob( streamSourceInfo.getStream(), blobName, - resourceManager.getIngestionResource(ResourceManager.ResourceTypes.TEMP_STORAGE), + resourceManager.getIngestionResource(ResourceManager.ResourceType.TEMP_STORAGE), true ); String blobPath = AzureStorageHelper.getBlobPathWithSas(blob); @@ -144,7 +144,7 @@ class IngestClientImpl implements IngestClient { } return ingestionResult; } catch (Exception ex) { - log.error(String.format("ingestFromStream: Error while ingesting from stream. Error: %s", ex.getMessage()), ex); + log.error("ingestFromStream: Error while ingesting from stream.", ex); throw ex; } } diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceManager.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceManager.java index a006d0e6..9c32a1dd 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceManager.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceManager.java @@ -9,10 +9,13 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; class ResourceManager { - public enum ResourceTypes{ + public enum ResourceType { SECURED_READY_FOR_AGGREGATION_QUEUE("SecuredReadyForAggregationQueue"), FAILED_INGESTIONS_QUEUE("FailedIngestionsQueue"), SUCCESSFUL_INGESTIONS_QUEUE("SuccessfulIngestionsQueue"), @@ -21,37 +24,39 @@ class ResourceManager { private String name; - ResourceTypes(String name) { + ResourceType(String name) { this.name = name; } - String getName(){ + String getName() { return name; } } - private ResourceTypes getResourceTypeByName(String name){ - for (ResourceTypes t : ResourceTypes.values()){ - if (t.name.equalsIgnoreCase(name)){ + private ResourceType getResourceTypeByName(String name) { + for (ResourceType t : ResourceType.values()) { + if (t.name.equalsIgnoreCase(name)) { return t; } } return null; } - private HashMap ingestionResources; + //Ingestion Resources + private ConcurrentHashMap ingestionResources; //Identity Token private String identityToken; private KustoClient kustoClient; - private final long REFRESH_INGESTION_RESOURCES_PERIOD = 1000 * 60 * 60 * 1; // 1 hour - private Timer timer = new Timer(true); private final Logger log = LoggerFactory.getLogger(ResourceManager.class); - public ResourceManager(KustoClient kustoClient) throws Exception { + private ReadWriteLock ingestionResourcesLock = new ReentrantReadWriteLock(); + private ReadWriteLock authTokenLock = new ReentrantReadWriteLock(); + + ResourceManager(KustoClient kustoClient) throws Exception { this.kustoClient = kustoClient; - ingestionResources = new HashMap<>(); + ingestionResources = new ConcurrentHashMap<>(); TimerTask refreshIngestionResourceValuesTask = new TimerTask() { @Override @@ -59,7 +64,7 @@ class ResourceManager { try { refreshIngestionResources(); } catch (Exception e) { - log.error(String.format("Error in refreshIngestionResources: %s.", e.getMessage()), e); + log.error("Error in refreshIngestionResources.", e); } } }; @@ -70,97 +75,136 @@ class ResourceManager { try { refreshIngestionAuthToken(); } catch (Exception e) { - log.error(String.format("Error in refreshIngestionAuthToken: %s.", e.getMessage()), e); + log.error("Error in refreshIngestionAuthToken.", e); } } }; try { + Timer timer = new Timer(true); + long REFRESH_INGESTION_RESOURCES_PERIOD = 1000 * 60 * 60; // 1 hour timer.schedule(refreshIngestionAuthTokenTask, 0, REFRESH_INGESTION_RESOURCES_PERIOD); timer.schedule(refreshIngestionResourceValuesTask, 0, REFRESH_INGESTION_RESOURCES_PERIOD); } catch (Exception e) { - log.error(String.format("Error in initializing ResourceManager: %s.", e.getMessage()), e); + log.error("Error in initializing ResourceManager.", e); throw e; } } - public void clean() { - ingestionResources.clear(); + String getIngestionResource(ResourceType resourceType) throws Exception { + if (!ingestionResources.containsKey(resourceType)) { + // When the value is not available, we need to get the tokens from Kusto (refresh): + refreshIngestionResources(); + try { + // If the write lock is locked, then the read will wait here. + // In other words if the refresh is running yet, then wait until it ends: + ingestionResourcesLock.readLock().lock(); + if (!ingestionResources.containsKey(resourceType)) { + throw new Exception("Unable to get ingestion resources for this type: " + resourceType.getName()); + } + } finally { + ingestionResourcesLock.readLock().unlock(); + } + } + return ingestionResources.get(resourceType).nextValue(); } - public String getKustoIdentityToken() throws Exception { + String getKustoIdentityToken() throws Exception { if (identityToken == null) { refreshIngestionAuthToken(); - if (identityToken == null) { - throw new Exception("Unable to get Identity token"); + try { + authTokenLock.readLock().lock(); + if (identityToken == null) { + throw new Exception("Unable to get Identity token"); + } + } finally { + authTokenLock.readLock().unlock(); } } return identityToken; } - public String getIngestionResource(ResourceTypes resourceType) throws Exception { + private void addIngestionResource(HashMap ingestionResources, String key, String value) { + ResourceType resourceType = getResourceTypeByName(key); if (!ingestionResources.containsKey(resourceType)) { - refreshIngestionResources(); - if (!ingestionResources.containsKey(resourceType)) { - throw new Exception("Unable to get ingestion resources for this type: " + resourceType.getName()); - } - } - - return ingestionResources.get(resourceType).nextValue(); - } - - int getSize(ResourceTypes resourceType){ - return ingestionResources.containsKey(resourceType) ? ingestionResources.get(resourceType).getSize() : 0; - } - - private void addValue(String key, String value) { - ResourceTypes resourceType = getResourceTypeByName(key); - if(!ingestionResources.containsKey(resourceType)){ ingestionResources.put(resourceType, new IngestionResource(resourceType)); } ingestionResources.get(resourceType).addValue(value); } private void refreshIngestionResources() throws Exception { - log.info("Refreshing Ingestion Resources"); - KustoResults ingestionResourcesResults = kustoClient.execute(Commands.INGESTION_RESOURCES_SHOW_COMMAND); - ArrayList> values = ingestionResourcesResults.getValues(); + // Here we use tryLock(): If there is another instance doing the refresh, then just skip it. + if (ingestionResourcesLock.writeLock().tryLock()) { + try { + log.info("Refreshing Ingestion Resources"); + KustoResults ingestionResourcesResults = kustoClient.execute(Commands.INGESTION_RESOURCES_SHOW_COMMAND); + if (ingestionResourcesResults != null && ingestionResourcesResults.getValues() != null) { + HashMap newIngestionResources = new HashMap<>(); + // Add the received values to a new IngestionResources map: + ingestionResourcesResults.getValues().forEach(pairValues -> { + String key = pairValues.get(0); + String value = pairValues.get(1); + addIngestionResource(newIngestionResources, key, value); + }); + // Replace the values in the ingestionResources map with the values in the new map: + putIngestionResourceValues(ingestionResources, newIngestionResources); + } + } finally { + ingestionResourcesLock.writeLock().unlock(); + } + } + } - clean(); + private void putIngestionResourceValues(ConcurrentHashMap ingestionResources, HashMap newIngestionResources) { + // Update the values in the original resources map: + newIngestionResources.keySet().forEach( + k -> ingestionResources.put(k, newIngestionResources.get(k)) + ); - values.forEach(pairValues -> { - String key = pairValues.get(0); - String value = pairValues.get(1); - addValue(key, value); + // Remove the key-value pairs that are not existing in the new resources map: + ingestionResources.keySet().forEach(k -> { + if (!newIngestionResources.containsKey(k)) { + ingestionResources.remove(k); + } }); } private void refreshIngestionAuthToken() throws Exception { - log.info("Refreshing Ingestion Auth Token"); - KustoResults identityTokenResult = kustoClient.execute(Commands.KUSTO_IDENTITY_GET_COMMAND); - identityToken = identityTokenResult.getValues().get(0).get(identityTokenResult.getIndexByColumnName("AuthorizationContext")); + if (authTokenLock.writeLock().tryLock()) { + try { + log.info("Refreshing Ingestion Auth Token"); + KustoResults identityTokenResult = kustoClient.execute(Commands.KUSTO_IDENTITY_GET_COMMAND); + if (identityTokenResult != null + && identityTokenResult.getValues() != null + && identityTokenResult.getValues().size() > 0) { + identityToken = identityTokenResult.getValues().get(0).get(identityTokenResult.getIndexByColumnName("AuthorizationContext")); + } + } finally { + authTokenLock.writeLock().unlock(); + } + } } private class IngestionResource { - ResourceTypes type; + ResourceType type; int roundRubinIdx = 0; ArrayList valuesList; - IngestionResource(ResourceTypes resourceType){ + IngestionResource(ResourceType resourceType) { this.type = resourceType; valuesList = new ArrayList<>(); } - void addValue(String val){ + void addValue(String val) { valuesList.add(val); } - int getSize(){ + int getSize() { return valuesList.size(); } - String nextValue(){ + String nextValue() { roundRubinIdx = (roundRubinIdx + 1) % valuesList.size(); return valuesList.get(roundRubinIdx); } diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/IngestClientImplTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/IngestClientImplTest.java index 3117e493..df0c0445 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/IngestClientImplTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/IngestClientImplTest.java @@ -29,15 +29,15 @@ class IngestClientImplTest { ingestClientMock = mock(IngestClient.class); ingestClientMockImpl = mock(IngestClientImpl.class); - when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceTypes.SECURED_READY_FOR_AGGREGATION_QUEUE)) + when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceType.SECURED_READY_FOR_AGGREGATION_QUEUE)) .thenReturn("queue1") .thenReturn("queue2"); - when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceTypes.TEMP_STORAGE)) + when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceType.TEMP_STORAGE)) .thenReturn("storage1") .thenReturn("storage2"); - when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceTypes.INGESTIONS_STATUS_TABLE)) + when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceType.INGESTIONS_STATUS_TABLE)) .thenReturn("statusTable"); when(resourceManagerMock.getKustoIdentityToken()) diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ResourceManagerTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ResourceManagerTest.java index 5dc55340..06279ea9 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ResourceManagerTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/ResourceManagerTest.java @@ -64,7 +64,7 @@ class ResourceManagerTest { HashMap m = new HashMap(); for(int i=0; i<10; i++){ - storage = resourceManager.getIngestionResource(ResourceManager.ResourceTypes.TEMP_STORAGE); + storage = resourceManager.getIngestionResource(ResourceManager.ResourceType.TEMP_STORAGE); m.put(storage,m.getOrDefault(storage,0)+1); } @@ -82,7 +82,7 @@ class ResourceManagerTest { HashMap m = new HashMap(); for(int i=0; i<10; i++){ - queueName = resourceManager.getIngestionResource(ResourceManager.ResourceTypes.SECURED_READY_FOR_AGGREGATION_QUEUE); + queueName = resourceManager.getIngestionResource(ResourceManager.ResourceType.SECURED_READY_FOR_AGGREGATION_QUEUE); m.put(queueName,m.getOrDefault(queueName,0)+1); } @@ -93,18 +93,6 @@ class ResourceManagerTest { } } - //@Test (Not ready yet) - void clean() { - try{ - resourceManager.clean(); - for(ResourceManager.ResourceTypes resourceType : ResourceManager.ResourceTypes.values()){ - assertEquals(0, resourceManager.getSize(resourceType)); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - private KustoResults generateIngestionResourcesResult() { HashMap colNameToIndexMap = new HashMap<>(); HashMap colNameToTypeMap = new HashMap<>(); diff --git a/samples/src/main/java/FileIngestion.java b/samples/src/main/java/FileIngestion.java index 1ab70374..69b04b08 100644 --- a/samples/src/main/java/FileIngestion.java +++ b/samples/src/main/java/FileIngestion.java @@ -32,4 +32,4 @@ public class FileIngestion { e.printStackTrace(); } } -} +} \ No newline at end of file