Load DLLs from Storage container and sync cache (#542)

* Load DLLs from storage and update cache
This commit is contained in:
rekhaswaminathan 2020-07-09 13:50:46 -07:00 коммит произвёл GitHub
Родитель 5de30bde76
Коммит f2c90acc83
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 244 добавлений и 50 удалений

Просмотреть файл

@ -366,7 +366,11 @@ namespace Diagnostics.RuntimeHost.Controllers
}
await _sourceWatcherService.Watcher.CreateOrUpdatePackage(pkg);
await storageWatcher.CreateOrUpdatePackage(pkg);
// If Azure Storage is not enabled, we still want to keep data updated.
if(!tableCacheService.IsStorageAsSourceEnabled())
{
await storageWatcher.CreateOrUpdatePackage(pkg);
}
return Ok();
}
@ -772,7 +776,6 @@ namespace Diagnostics.RuntimeHost.Controllers
private async Task<Tuple<Response, List<DataProviderMetadata>>> GetDetectorInternal(string detectorId, RuntimeContext<TResource> context)
{
await this._sourceWatcherService.Watcher.WaitForFirstCompletion();
var queryParams = Request.Query;
var dataProviderContext = (DataProviderContext)HttpContext.Items[HostConstants.DataProviderContextKey];
@ -792,7 +795,32 @@ namespace Diagnostics.RuntimeHost.Controllers
{
dataProvidersMetadata = GetDataProvidersMetadata(dataProviders);
}
var invoker = this._invokerCache.GetEntityInvoker<TResource>(detectorId, context);
EntityInvoker invoker = null;
if(tableCacheService.IsStorageAsSourceEnabled())
{
invoker = this._invokerCache.GetEntityInvoker<TResource>(detectorId, context);
var allDetectors = await this.tableCacheService.GetEntityListByType(context, "Detector");
var detectorMetadata = allDetectors.Where(entity => entity.RowKey.ToLower().Equals(detectorId, StringComparison.CurrentCultureIgnoreCase)).FirstOrDefault();
// This means detector is definitely not present
if (invoker == null && detectorMetadata == null)
{
return null;
}
// If detector is still downloading, then await first completion
if (invoker == null && detectorMetadata != null)
{
await this._sourceWatcherService.Watcher.WaitForFirstCompletion();
// Refetch from invoker cache
invoker = this._invokerCache.GetEntityInvoker<TResource>(detectorId, context);
}
} else
{
await this._sourceWatcherService.Watcher.WaitForFirstCompletion();
invoker = this._invokerCache.GetEntityInvoker<TResource>(detectorId, context);
}
if (invoker == null)
{

Просмотреть файл

@ -1,9 +1,10 @@
using System;
using Diagnostics.RuntimeHost.Services.CacheService;
using Diagnostics.RuntimeHost.Services.SourceWatcher.Watchers;
using Diagnostics.RuntimeHost.Services.StorageService;
using Diagnostics.RuntimeHost.Utilities;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Win32;
namespace Diagnostics.RuntimeHost.Services.SourceWatcher
{
@ -13,22 +14,25 @@ namespace Diagnostics.RuntimeHost.Services.SourceWatcher
public ISourceWatcher Watcher => _watcher;
public SourceWatcherService(IHostingEnvironment env, IConfiguration configuration, IInvokerCacheService invokerCacheService, IGistCacheService gistCacheService, IKustoMappingsCacheService kustoMappingsCacheService)
public ISourceWatcher KustoMappingWatcher;
public SourceWatcherService(IHostingEnvironment env, IConfiguration configuration, IInvokerCacheService invokerCacheService, IGistCacheService gistCacheService, IKustoMappingsCacheService kustoMappingsCacheService, IStorageService storageService)
{
SourceWatcherType watcherType;
watcherType = Enum.Parse<SourceWatcherType>(configuration[$"SourceWatcher:{RegistryConstants.WatcherTypeKey}"]);
switch (watcherType)
var sourceWatcherType = Enum.Parse<SourceWatcherType>(configuration[$"SourceWatcher:{RegistryConstants.WatcherTypeKey}"]);
IGithubClient githubClient = new GithubClient(env, configuration);
switch (sourceWatcherType)
{
case SourceWatcherType.LocalFileSystem:
_watcher = new LocalFileSystemWatcher(env, configuration, invokerCacheService, gistCacheService);
break;
case SourceWatcherType.Github:
IGithubClient githubClient = new GithubClient(env, configuration);
_watcher = new GitHubWatcher(env, configuration, invokerCacheService, gistCacheService, kustoMappingsCacheService, githubClient);
break;
case SourceWatcherType.AzureStorage:
_watcher = new StorageWatcher(env, configuration, storageService, invokerCacheService, gistCacheService);
KustoMappingWatcher = new GitHubWatcher(env, configuration, invokerCacheService, gistCacheService, kustoMappingsCacheService, githubClient);
break;
default:
throw new NotSupportedException("Source Watcher Type not supported");
}

Просмотреть файл

@ -3,6 +3,7 @@
public enum SourceWatcherType
{
LocalFileSystem = 0,
Github = 1
Github = 1,
AzureStorage = 2
}
}

Просмотреть файл

@ -76,17 +76,31 @@ namespace Diagnostics.RuntimeHost.Services.SourceWatcher
#region Initialize Github Worker
// If AzureStorageWatcher is enabled, use GithubWatcher only for updating KustoMapping Cache
SourceWatcherType watcherType = Enum.Parse<SourceWatcherType>(configuration[$"SourceWatcher:{RegistryConstants.WatcherTypeKey}"]);
// TODO: Register the github worker with destination path.
var gistWorker = new GithubGistWorker(gistCache, _loadOnlyPublicDetectors);
var detectorWorker = new GithubDetectorWorker(invokerCache, _loadOnlyPublicDetectors);
var kustoMappingsWorker = new GithubKustoConfigurationWorker(kustoMappingsCache);
GithubWorkers = new Dictionary<string, IGithubWorker>
if(watcherType.Equals(SourceWatcherType.AzureStorage))
{
{ gistWorker.Name, gistWorker },
{ detectorWorker.Name, detectorWorker },
{ kustoMappingsWorker.Name, kustoMappingsWorker }
};
GithubWorkers = new Dictionary<string, IGithubWorker>
{
{ kustoMappingsWorker.Name, kustoMappingsWorker }
};
} else
{
GithubWorkers = new Dictionary<string, IGithubWorker>
{
{ gistWorker.Name, gistWorker },
{ detectorWorker.Name, detectorWorker },
{ kustoMappingsWorker.Name, kustoMappingsWorker }
};
}
#endregion Initialize Github Worker

Просмотреть файл

@ -1,4 +1,5 @@
using Diagnostics.RuntimeHost.Models;
using Diagnostics.RuntimeHost.Services.CacheService;
using Diagnostics.RuntimeHost.Services.StorageService;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
@ -8,7 +9,7 @@ namespace Diagnostics.RuntimeHost.Services.SourceWatcher.Watchers
{
public sealed class NationalCloudStorageWatcher : StorageWatcher
{
public NationalCloudStorageWatcher(IHostingEnvironment env, IConfiguration configuration, IStorageService service): base(env, configuration, service)
public NationalCloudStorageWatcher(IHostingEnvironment env, IConfiguration configuration, IStorageService service, IInvokerCacheService invokerCacheService, IGistCacheService gistCacheService): base(env, configuration, service, invokerCacheService, gistCacheService)
{
}

Просмотреть файл

@ -11,26 +11,76 @@ using Newtonsoft.Json;
using Diagnostics.RuntimeHost.Utilities;
using System.IO;
using Diagnostics.Logger;
using Diagnostics.RuntimeHost.Services.CacheService;
using System.Linq;
using System.Collections.Generic;
using Diagnostics.Scripts.Models;
using Diagnostics.Scripts;
using System.Reflection;
namespace Diagnostics.RuntimeHost.Services.SourceWatcher.Watchers
{
public class StorageWatcher : ISourceWatcher
{
public Task WaitForFirstCompletion() => blobDowloadTask;
private IStorageService storageService;
private IHostingEnvironment hostingEnvironment;
private IConfiguration configuration;
private IGithubClient gitHubClient;
private Task blobDowloadTask;
public StorageWatcher(IHostingEnvironment env, IConfiguration config, IStorageService service)
private Dictionary<EntityType, ICache<string, EntityInvoker>> _invokerDictionary;
private int _pollingIntervalInSeconds = 30;
private DateTime cacheLastModifiedTime;
private bool LoadOnlyPublicDetectors
{
get
{
if (bool.TryParse(configuration["SourceWatcher:LoadOnlyPublicDetectors"], out bool retVal))
{
return retVal;
}
return false;
}
}
public StorageWatcher(IHostingEnvironment env, IConfiguration config, IStorageService service, IInvokerCacheService invokerCache, IGistCacheService gistCache)
{
storageService = service;
hostingEnvironment = env;
configuration = config;
gitHubClient = new GithubClient(env, config);
_invokerDictionary = new Dictionary<EntityType, ICache<string, EntityInvoker>>
{
{ EntityType.Detector, invokerCache},
{ EntityType.Signal, invokerCache},
{ EntityType.Gist, gistCache}
};
Start();
}
public async Task<HealthCheckResult> CheckHealthAsync(CancellationToken cancellationToken)
public virtual async Task<HealthCheckResult> CheckHealthAsync(CancellationToken cancellationToken = default(CancellationToken))
{
throw new NotImplementedException();
HealthCheckResult result = null;
Exception storageException = null;
try
{
var response = await storageService.ListBlobsInContainer();
}
catch (Exception ex)
{
storageException = ex;
}
finally
{
result = new HealthCheckResult(storageException == null ? HealthStatus.Healthy : HealthStatus.Unhealthy, "Azure Storage", "Run a test against azure storage by listing blobs in container", storageException);
}
return result;
}
public virtual async Task CreateOrUpdatePackage(Package pkg)
@ -41,6 +91,7 @@ namespace Diagnostics.RuntimeHost.Services.SourceWatcher.Watchers
}
try
{
await gitHubClient.CreateOrUpdateFiles(pkg.GetCommitContents(), pkg.GetCommitMessage());
var blobName = $"{pkg.Id.ToLower()}/{pkg.Id.ToLower()}.dll";
var etag = await storageService.LoadBlobToContainer(blobName, pkg.DllBytes);
if (string.IsNullOrWhiteSpace(etag))
@ -53,7 +104,7 @@ namespace Diagnostics.RuntimeHost.Services.SourceWatcher.Watchers
if (gitCommit != null)
{
diagEntity.GitHubSha = gitCommit.Commit.Tree.Sha;
diagEntity.GithubLastModified = gitCommit.Commit.Author.Date.DateTime.ToUniversalTime();
diagEntity.GithubLastModified = gitCommit.Commit.Author.Date.LocalDateTime; // Setting it as local date time because storage sdk converts to UTC while saving
}
using (var ms = new MemoryStream(Convert.FromBase64String(pkg.DllBytes)))
{
@ -69,14 +120,83 @@ namespace Diagnostics.RuntimeHost.Services.SourceWatcher.Watchers
public void Start()
{
throw new NotImplementedException();
blobDowloadTask = StartBlobDownload(true);
StartPollingForChanges();
}
public Task WaitForFirstCompletion()
private async Task StartPollingForChanges()
{
throw new NotImplementedException();
await blobDowloadTask;
DiagnosticsETWProvider.Instance.LogAzureStorageMessage(nameof(StorageWatcher), $"Start up blob download task completed at {DateTime.UtcNow}");
cacheLastModifiedTime = DateTime.UtcNow;
do
{
await Task.Delay(_pollingIntervalInSeconds * 1000);
await StartBlobDownload(false);
DiagnosticsETWProvider.Instance.LogAzureStorageMessage(nameof(StorageWatcher), $"Polling for blob download task completed at {DateTime.UtcNow}");
cacheLastModifiedTime = DateTime.UtcNow;
} while (true);
}
private async Task StartBlobDownload(bool startup = false)
{
var detectorsList = await storageService.GetEntitiesByPartitionkey("Detector");
var gists = await storageService.GetEntitiesByPartitionkey("Gist");
var entitiesToLoad = new List<DiagEntity>();
var filteredDetectors = LoadOnlyPublicDetectors ? detectorsList.Where(row => !row.IsInternal).ToList() : detectorsList;
if(!startup)
{
entitiesToLoad.AddRange(filteredDetectors.Where(s => s.Timestamp >= cacheLastModifiedTime).ToList());
entitiesToLoad.AddRange(gists.Where(s => s.Timestamp >= cacheLastModifiedTime).ToList());
} else
{
entitiesToLoad.AddRange(filteredDetectors.ToList());
entitiesToLoad.AddRange(gists);
}
try
{
foreach (var entity in entitiesToLoad)
{
var assemblyData = await storageService.GetBlobByName($"{entity.RowKey.ToLower()}/{entity.RowKey.ToLower()}.dll");
if (assemblyData == null || assemblyData.Length == 0)
{
DiagnosticsETWProvider.Instance.LogAzureStorageWarning(nameof(StorageWatcher), $" blob {entity.RowKey.ToLower()}.dll is neither null or 0 bytes in length");
continue;
}
// initializing Entry Point of Invoker using assembly
Assembly temp = Assembly.Load(assemblyData);
EntityType entityType = EntityType.Signal;
if(entity.PartitionKey.Equals("Gist"))
{
entityType = EntityType.Gist;
}
else if (!Enum.TryParse<EntityType>(entity.DetectorType, out entityType))
{
entityType = EntityType.Signal;
}
EntityMetadata metaData = new EntityMetadata(string.Empty, entityType);
var newInvoker = new EntityInvoker(metaData);
newInvoker.InitializeEntryPoint(temp);
if (_invokerDictionary.TryGetValue(entityType, out ICache<string, EntityInvoker> cache) && newInvoker.EntryPointDefinitionAttribute != null)
{
DiagnosticsETWProvider.Instance.LogAzureStorageMessage(nameof(StorageWatcher), $"Updating cache with new invoker with id : {newInvoker.EntryPointDefinitionAttribute.Id}");
cache.AddOrUpdate(newInvoker.EntryPointDefinitionAttribute.Id, newInvoker);
}
else
{
DiagnosticsETWProvider.Instance.LogAzureStorageWarning(nameof(StorageWatcher), $"No invoker cache exist for {entityType}");
}
}
} catch (Exception ex)
{
DiagnosticsETWProvider.Instance.LogAzureStorageException(nameof(StorageWatcher), $"Exception occurred while trying to update cache {ex.Message} ", ex.GetType().ToString(), ex.ToString());
}
}
}
}

Просмотреть файл

@ -32,5 +32,10 @@ namespace Diagnostics.RuntimeHost.Services.StorageService
{
return Task.FromResult(new DiagEntity());
}
public async Task<int> ListBlobsInContainer()
{
return 0;
}
}
}

Просмотреть файл

@ -13,6 +13,7 @@ using Microsoft.WindowsAzure.Storage.Blob;
using System.Diagnostics;
using System.IO;
using System.Linq;
using Diagnostics.RuntimeHost.Services.SourceWatcher;
namespace Diagnostics.RuntimeHost.Services.StorageService
{
@ -23,6 +24,8 @@ namespace Diagnostics.RuntimeHost.Services.StorageService
Task<DiagEntity> LoadDataToTable(DiagEntity detectorEntity);
Task<string> LoadBlobToContainer(string blobname, string contents);
Task<byte[]> GetBlobByName(string name);
Task<int> ListBlobsInContainer();
}
public class StorageService : IStorageService
{
@ -58,10 +61,10 @@ namespace Diagnostics.RuntimeHost.Services.StorageService
{
loadOnlyPublicDetectors = false;
}
if(!bool.TryParse((configuration["SourceWatcher:UseStorageAsSource"]), out isStorageEnabled))
var sourceWatcherType = Enum.Parse<SourceWatcherType>(configuration[$"SourceWatcher:{RegistryConstants.WatcherTypeKey}"]);
if (sourceWatcherType.Equals(SourceWatcherType.AzureStorage))
{
isStorageEnabled = false;
isStorageEnabled = true;
}
cloudTable = tableClient.GetTableReference(tableName);
}
@ -175,6 +178,7 @@ namespace Diagnostics.RuntimeHost.Services.StorageService
using (MemoryStream ms = new MemoryStream())
{
await cloudBlob.DownloadToStreamAsync(ms);
timeTakenStopWatch.Stop();
DiagnosticsETWProvider.Instance.LogAzureStorageMessage(nameof(StorageService), $"Downloaded {name} to memory stream, time taken {timeTakenStopWatch.ElapsedMilliseconds}");
return ms.ToArray();
}
@ -184,5 +188,22 @@ namespace Diagnostics.RuntimeHost.Services.StorageService
return null;
}
}
public async Task<int> ListBlobsInContainer()
{
try
{
var timeTakenStopWatch = new Stopwatch();
timeTakenStopWatch.Start();
var blobsResult = await containerClient.ListBlobsSegmentedAsync(null);
timeTakenStopWatch.Stop();
DiagnosticsETWProvider.Instance.LogAzureStorageMessage(nameof(StorageService), $"Number of blobs stored in container {container} is {blobsResult.Results.Count()}, time taken {timeTakenStopWatch.ElapsedMilliseconds} milliseconds");
return blobsResult.Results != null ? blobsResult.Results.Count() : 0 ;
} catch (Exception ex)
{
DiagnosticsETWProvider.Instance.LogAzureStorageException(nameof(StorageService), ex.Message, ex.GetType().ToString(), ex.ToString());
throw ex;
}
}
}
}

Просмотреть файл

@ -72,7 +72,7 @@ namespace Diag.SourceWatcher
//First check if entity exists in blob or table
var existingDetectorEntity = await storageService.GetEntityFromTable(configFileData.PartitionKey, configFileData.RowKey, githubdir.Name);
var doesBlobExists = await storageService.CheckDetectorExists(githubdir.Name);
var doesBlobExists = await storageService.CheckDetectorExists($"{githubdir.Name}/{githubdir.Name}.dll");
//If there is no entry in table or blob or github last modifed date has been changed, upload to blob
if (existingDetectorEntity == null || !doesBlobExists || existingDetectorEntity.GithubLastModified != configFileData.GithubLastModified)
{

Просмотреть файл

@ -56,20 +56,16 @@ namespace SourceWatcherFuncApp.Services
{
try
{
if (existingDetectors.Count < 1)
var cloudBlob = blobContainer.GetBlockBlobReference(currentDetector);
var doesExist = await cloudBlob.ExistsAsync();
storageServiceLogger.LogInformation($"{currentDetector} exist in blob {doesExist.ToString()}");
if (doesExist)
{
var blobsList = await blobContainer.ListBlobsSegmentedAsync(null);
foreach (var blobItem in blobsList.Results)
{
if(blobItem is CloudBlobDirectory)
{
var directory = (CloudBlobDirectory)blobItem;
var name = directory.Prefix.Replace("/", "");
existingDetectors.Add(name);
}
}
await cloudBlob.FetchAttributesAsync();
storageServiceLogger.LogInformation($"Size of {currentDetector} is {cloudBlob.Properties.Length} bytes");
return cloudBlob.Properties.Length > 0;
}
return existingDetectors.Contains(currentDetector);
return false;
} catch(Exception ex)
{
storageServiceLogger.LogError(ex.ToString());
@ -83,7 +79,9 @@ namespace SourceWatcherFuncApp.Services
{
storageServiceLogger.LogInformation($"Uploading {name} blob");
var cloudBlob = blobContainer.GetBlockBlobReference(name);
uploadStream.Position = 0;
await cloudBlob.UploadFromStreamAsync(uploadStream);
storageServiceLogger.LogInformation($"Loaded {name} to blob");
} catch (Exception ex)
{
storageServiceLogger.LogError(ex.ToString());

Просмотреть файл

@ -1,16 +1,17 @@
{
"version": "2.0",
"version": "2.0",
"logging": {
"fileLoggingMode": "debugOnly",
"logLevel": {
"Function": "Information",
"default": "Information"
},
"fileLoggingMode": "debugOnly",
"logLevel": {
"Function": "Information",
"default": "Information"
},
"applicationInsights": {
"samplingExcludedTypes": "Request",
"samplingSettings": {
"isEnabled": true
}
}
}
},
"functionTimeout": "01:00:00"
}

Просмотреть файл

@ -53,6 +53,7 @@ namespace Diagnostics.Tests.AzureStorageTests
{
configuration["SourceWatcher:BlobContainerName"] = "detectors";
}
configuration[$"SourceWatcher:WatcherType"] = "AzureStorage";
storageService = new StorageService(configuration, environment);
tableCacheService = new DiagEntityTableCacheService(storageService);
}