* Add ReadWriteLocks to make the Resource Manager thread safe
* Remove String formatting from logs.
* Use concurrent HashMap for ingestion resources map.
This commit is contained in:
Rabeea Abu-Saleh 2018-10-02 11:05:56 +03:00 коммит произвёл GitHub
Родитель 698b0aa8fb
Коммит 3992ee6579
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 113 добавлений и 80 удалений

Просмотреть файл

@ -39,7 +39,7 @@ class AzureStorageHelper {
}
catch (Exception e)
{
log.error(String.format("postMessageToQueue: %s.",e.getMessage()), e);
log.error("Error in postMessageToQueue", e);
throw e;
}
}
@ -54,7 +54,7 @@ class AzureStorageHelper {
public static CloudBlockBlob uploadLocalFileToBlob(String filePath, String blobName, String storageUri) throws Exception{
try {
log.debug(String.format("uploadLocalFileToBlob: filePath: %s, blobName: %s, storageUri: %s", filePath, blobName, storageUri));
log.debug("uploadLocalFileToBlob: filePath: {}, blobName: {}, storageUri: {}", filePath, blobName, storageUri);
// Check if the file is already compressed:
boolean isCompressed = filePath.endsWith(".gz") || filePath.endsWith(".zip");
@ -74,13 +74,14 @@ class AzureStorageHelper {
}
catch (StorageException se)
{
log.error(String.format("uploadLocalFileToBlob: Error returned from the service. Http code: %d and error code: %s", se.getHttpStatusCode(), se.getErrorCode()), se);
log.error("uploadLocalFileToBlob: Error returned from the service. Http code: {}. error code: {}. file path: {}"
, se.getHttpStatusCode(), se.getErrorCode(), filePath, se);
throw se;
}
catch (Exception ex)
{
log.error(String.format("uploadLocalFileToBlob: Error while uploading file to blob."), ex);
log.error("uploadLocalFileToBlob: Error while uploading file to blob. file path: {}", filePath, ex);
throw ex;
}
}
@ -97,7 +98,7 @@ class AzureStorageHelper {
}
public static CloudBlockBlob uploadStreamToBlob(InputStream inputStream, String blobName, String storageUri, boolean compress) throws IOException, URISyntaxException, StorageException {
log.debug(String.format("uploadLocalFileToBlob: blobName: %s, storageUri: %s", blobName, storageUri));
log.debug("uploadLocalFileToBlob: blobName: {}, storageUri: {}", blobName, storageUri);
CloudBlobContainer container = new CloudBlobContainer(new URI(storageUri));
CloudBlockBlob blob = container.getBlockBlobReference(blobName+ (compress?".gz":""));
BlobOutputStream bos = blob.openOutputStream();

Просмотреть файл

@ -63,7 +63,7 @@ class IngestClientImpl implements IngestClient {
}
if (ingestionProperties.getReportMethod() != IngestionProperties.IngestionReportMethod.Queue) {
String tableStatusUri = resourceManager.getIngestionResource(ResourceManager.ResourceTypes.INGESTIONS_STATUS_TABLE);
String tableStatusUri = resourceManager.getIngestionResource(ResourceManager.ResourceType.INGESTIONS_STATUS_TABLE);
ingestionBlobInfo.IngestionStatusInTable = new IngestionStatusInTableDescription();
ingestionBlobInfo.IngestionStatusInTable.TableConnectionString = tableStatusUri;
ingestionBlobInfo.IngestionStatusInTable.RowKey = ingestionBlobInfo.id.toString();
@ -85,7 +85,7 @@ class IngestClientImpl implements IngestClient {
String serializedIngestionBlobInfo = objectMapper.writeValueAsString(ingestionBlobInfo);
postMessageToQueue(
resourceManager.getIngestionResource(ResourceManager.ResourceTypes.SECURED_READY_FOR_AGGREGATION_QUEUE)
resourceManager.getIngestionResource(ResourceManager.ResourceType.SECURED_READY_FOR_AGGREGATION_QUEUE)
, serializedIngestionBlobInfo);
} catch (Exception ex) {
@ -106,7 +106,7 @@ class IngestClientImpl implements IngestClient {
try {
String fileName = (new File(fileSourceInfo.getFilePath())).getName();
String blobName = genBlobName(fileName, ingestionProperties.getDatabaseName(), ingestionProperties.getTableName());
CloudBlockBlob blob = uploadLocalFileToBlob(fileSourceInfo.getFilePath(), blobName, resourceManager.getIngestionResource(ResourceManager.ResourceTypes.TEMP_STORAGE));
CloudBlockBlob blob = uploadLocalFileToBlob(fileSourceInfo.getFilePath(), blobName, resourceManager.getIngestionResource(ResourceManager.ResourceType.TEMP_STORAGE));
String blobPath = AzureStorageHelper.getBlobPathWithSas(blob);
long rawDataSize = fileSourceInfo.getRawSizeInBytes() > 0L ? fileSourceInfo.getRawSizeInBytes() :
estimateFileRawSize(fileSourceInfo.getFilePath());
@ -132,7 +132,7 @@ class IngestClientImpl implements IngestClient {
CloudBlockBlob blob = uploadStreamToBlob(
streamSourceInfo.getStream(),
blobName,
resourceManager.getIngestionResource(ResourceManager.ResourceTypes.TEMP_STORAGE),
resourceManager.getIngestionResource(ResourceManager.ResourceType.TEMP_STORAGE),
true
);
String blobPath = AzureStorageHelper.getBlobPathWithSas(blob);
@ -144,7 +144,7 @@ class IngestClientImpl implements IngestClient {
}
return ingestionResult;
} catch (Exception ex) {
log.error(String.format("ingestFromStream: Error while ingesting from stream. Error: %s", ex.getMessage()), ex);
log.error("ingestFromStream: Error while ingesting from stream.", ex);
throw ex;
}
}

Просмотреть файл

@ -9,10 +9,13 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class ResourceManager {
public enum ResourceTypes{
public enum ResourceType {
SECURED_READY_FOR_AGGREGATION_QUEUE("SecuredReadyForAggregationQueue"),
FAILED_INGESTIONS_QUEUE("FailedIngestionsQueue"),
SUCCESSFUL_INGESTIONS_QUEUE("SuccessfulIngestionsQueue"),
@ -21,37 +24,39 @@ class ResourceManager {
private String name;
ResourceTypes(String name) {
ResourceType(String name) {
this.name = name;
}
String getName(){
String getName() {
return name;
}
}
private ResourceTypes getResourceTypeByName(String name){
for (ResourceTypes t : ResourceTypes.values()){
if (t.name.equalsIgnoreCase(name)){
private ResourceType getResourceTypeByName(String name) {
for (ResourceType t : ResourceType.values()) {
if (t.name.equalsIgnoreCase(name)) {
return t;
}
}
return null;
}
private HashMap<ResourceTypes, IngestionResource> ingestionResources;
//Ingestion Resources
private ConcurrentHashMap<ResourceType, IngestionResource> ingestionResources;
//Identity Token
private String identityToken;
private KustoClient kustoClient;
private final long REFRESH_INGESTION_RESOURCES_PERIOD = 1000 * 60 * 60 * 1; // 1 hour
private Timer timer = new Timer(true);
private final Logger log = LoggerFactory.getLogger(ResourceManager.class);
public ResourceManager(KustoClient kustoClient) throws Exception {
private ReadWriteLock ingestionResourcesLock = new ReentrantReadWriteLock();
private ReadWriteLock authTokenLock = new ReentrantReadWriteLock();
ResourceManager(KustoClient kustoClient) throws Exception {
this.kustoClient = kustoClient;
ingestionResources = new HashMap<>();
ingestionResources = new ConcurrentHashMap<>();
TimerTask refreshIngestionResourceValuesTask = new TimerTask() {
@Override
@ -59,7 +64,7 @@ class ResourceManager {
try {
refreshIngestionResources();
} catch (Exception e) {
log.error(String.format("Error in refreshIngestionResources: %s.", e.getMessage()), e);
log.error("Error in refreshIngestionResources.", e);
}
}
};
@ -70,97 +75,136 @@ class ResourceManager {
try {
refreshIngestionAuthToken();
} catch (Exception e) {
log.error(String.format("Error in refreshIngestionAuthToken: %s.", e.getMessage()), e);
log.error("Error in refreshIngestionAuthToken.", e);
}
}
};
try {
Timer timer = new Timer(true);
long REFRESH_INGESTION_RESOURCES_PERIOD = 1000 * 60 * 60; // 1 hour
timer.schedule(refreshIngestionAuthTokenTask, 0, REFRESH_INGESTION_RESOURCES_PERIOD);
timer.schedule(refreshIngestionResourceValuesTask, 0, REFRESH_INGESTION_RESOURCES_PERIOD);
} catch (Exception e) {
log.error(String.format("Error in initializing ResourceManager: %s.", e.getMessage()), e);
log.error("Error in initializing ResourceManager.", e);
throw e;
}
}
public void clean() {
ingestionResources.clear();
String getIngestionResource(ResourceType resourceType) throws Exception {
if (!ingestionResources.containsKey(resourceType)) {
// When the value is not available, we need to get the tokens from Kusto (refresh):
refreshIngestionResources();
try {
// If the write lock is locked, then the read will wait here.
// In other words if the refresh is running yet, then wait until it ends:
ingestionResourcesLock.readLock().lock();
if (!ingestionResources.containsKey(resourceType)) {
throw new Exception("Unable to get ingestion resources for this type: " + resourceType.getName());
}
} finally {
ingestionResourcesLock.readLock().unlock();
}
}
return ingestionResources.get(resourceType).nextValue();
}
public String getKustoIdentityToken() throws Exception {
String getKustoIdentityToken() throws Exception {
if (identityToken == null) {
refreshIngestionAuthToken();
if (identityToken == null) {
throw new Exception("Unable to get Identity token");
try {
authTokenLock.readLock().lock();
if (identityToken == null) {
throw new Exception("Unable to get Identity token");
}
} finally {
authTokenLock.readLock().unlock();
}
}
return identityToken;
}
public String getIngestionResource(ResourceTypes resourceType) throws Exception {
private void addIngestionResource(HashMap<ResourceType, IngestionResource> ingestionResources, String key, String value) {
ResourceType resourceType = getResourceTypeByName(key);
if (!ingestionResources.containsKey(resourceType)) {
refreshIngestionResources();
if (!ingestionResources.containsKey(resourceType)) {
throw new Exception("Unable to get ingestion resources for this type: " + resourceType.getName());
}
}
return ingestionResources.get(resourceType).nextValue();
}
int getSize(ResourceTypes resourceType){
return ingestionResources.containsKey(resourceType) ? ingestionResources.get(resourceType).getSize() : 0;
}
private void addValue(String key, String value) {
ResourceTypes resourceType = getResourceTypeByName(key);
if(!ingestionResources.containsKey(resourceType)){
ingestionResources.put(resourceType, new IngestionResource(resourceType));
}
ingestionResources.get(resourceType).addValue(value);
}
private void refreshIngestionResources() throws Exception {
log.info("Refreshing Ingestion Resources");
KustoResults ingestionResourcesResults = kustoClient.execute(Commands.INGESTION_RESOURCES_SHOW_COMMAND);
ArrayList<ArrayList<String>> values = ingestionResourcesResults.getValues();
// Here we use tryLock(): If there is another instance doing the refresh, then just skip it.
if (ingestionResourcesLock.writeLock().tryLock()) {
try {
log.info("Refreshing Ingestion Resources");
KustoResults ingestionResourcesResults = kustoClient.execute(Commands.INGESTION_RESOURCES_SHOW_COMMAND);
if (ingestionResourcesResults != null && ingestionResourcesResults.getValues() != null) {
HashMap<ResourceType, IngestionResource> newIngestionResources = new HashMap<>();
// Add the received values to a new IngestionResources map:
ingestionResourcesResults.getValues().forEach(pairValues -> {
String key = pairValues.get(0);
String value = pairValues.get(1);
addIngestionResource(newIngestionResources, key, value);
});
// Replace the values in the ingestionResources map with the values in the new map:
putIngestionResourceValues(ingestionResources, newIngestionResources);
}
} finally {
ingestionResourcesLock.writeLock().unlock();
}
}
}
clean();
private void putIngestionResourceValues(ConcurrentHashMap<ResourceType, IngestionResource> ingestionResources, HashMap<ResourceType, IngestionResource> newIngestionResources) {
// Update the values in the original resources map:
newIngestionResources.keySet().forEach(
k -> ingestionResources.put(k, newIngestionResources.get(k))
);
values.forEach(pairValues -> {
String key = pairValues.get(0);
String value = pairValues.get(1);
addValue(key, value);
// Remove the key-value pairs that are not existing in the new resources map:
ingestionResources.keySet().forEach(k -> {
if (!newIngestionResources.containsKey(k)) {
ingestionResources.remove(k);
}
});
}
private void refreshIngestionAuthToken() throws Exception {
log.info("Refreshing Ingestion Auth Token");
KustoResults identityTokenResult = kustoClient.execute(Commands.KUSTO_IDENTITY_GET_COMMAND);
identityToken = identityTokenResult.getValues().get(0).get(identityTokenResult.getIndexByColumnName("AuthorizationContext"));
if (authTokenLock.writeLock().tryLock()) {
try {
log.info("Refreshing Ingestion Auth Token");
KustoResults identityTokenResult = kustoClient.execute(Commands.KUSTO_IDENTITY_GET_COMMAND);
if (identityTokenResult != null
&& identityTokenResult.getValues() != null
&& identityTokenResult.getValues().size() > 0) {
identityToken = identityTokenResult.getValues().get(0).get(identityTokenResult.getIndexByColumnName("AuthorizationContext"));
}
} finally {
authTokenLock.writeLock().unlock();
}
}
}
private class IngestionResource {
ResourceTypes type;
ResourceType type;
int roundRubinIdx = 0;
ArrayList<String> valuesList;
IngestionResource(ResourceTypes resourceType){
IngestionResource(ResourceType resourceType) {
this.type = resourceType;
valuesList = new ArrayList<>();
}
void addValue(String val){
void addValue(String val) {
valuesList.add(val);
}
int getSize(){
int getSize() {
return valuesList.size();
}
String nextValue(){
String nextValue() {
roundRubinIdx = (roundRubinIdx + 1) % valuesList.size();
return valuesList.get(roundRubinIdx);
}

Просмотреть файл

@ -29,15 +29,15 @@ class IngestClientImplTest {
ingestClientMock = mock(IngestClient.class);
ingestClientMockImpl = mock(IngestClientImpl.class);
when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceTypes.SECURED_READY_FOR_AGGREGATION_QUEUE))
when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceType.SECURED_READY_FOR_AGGREGATION_QUEUE))
.thenReturn("queue1")
.thenReturn("queue2");
when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceTypes.TEMP_STORAGE))
when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceType.TEMP_STORAGE))
.thenReturn("storage1")
.thenReturn("storage2");
when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceTypes.INGESTIONS_STATUS_TABLE))
when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceType.INGESTIONS_STATUS_TABLE))
.thenReturn("statusTable");
when(resourceManagerMock.getKustoIdentityToken())

Просмотреть файл

@ -64,7 +64,7 @@ class ResourceManagerTest {
HashMap<String,Integer> m = new HashMap();
for(int i=0; i<10; i++){
storage = resourceManager.getIngestionResource(ResourceManager.ResourceTypes.TEMP_STORAGE);
storage = resourceManager.getIngestionResource(ResourceManager.ResourceType.TEMP_STORAGE);
m.put(storage,m.getOrDefault(storage,0)+1);
}
@ -82,7 +82,7 @@ class ResourceManagerTest {
HashMap<String,Integer> m = new HashMap();
for(int i=0; i<10; i++){
queueName = resourceManager.getIngestionResource(ResourceManager.ResourceTypes.SECURED_READY_FOR_AGGREGATION_QUEUE);
queueName = resourceManager.getIngestionResource(ResourceManager.ResourceType.SECURED_READY_FOR_AGGREGATION_QUEUE);
m.put(queueName,m.getOrDefault(queueName,0)+1);
}
@ -93,18 +93,6 @@ class ResourceManagerTest {
}
}
//@Test (Not ready yet)
void clean() {
try{
resourceManager.clean();
for(ResourceManager.ResourceTypes resourceType : ResourceManager.ResourceTypes.values()){
assertEquals(0, resourceManager.getSize(resourceType));
}
} catch (Exception e) {
e.printStackTrace();
}
}
private KustoResults generateIngestionResourcesResult() {
HashMap<String, Integer> colNameToIndexMap = new HashMap<>();
HashMap<String, String> colNameToTypeMap = new HashMap<>();

Просмотреть файл

@ -32,4 +32,4 @@ public class FileIngestion {
e.printStackTrace();
}
}
}
}