Merged PR 96: LocalPath Files Time Filtering

I added changes primarily in Collector.cs (UploadLocalData() function and other helper functions) to grab files from the LocalPath field location specified in the configuration file. Then go through those files and filter out those whose 'LastModified' time does not fit the StartTimeStamp/EndTimeStamp times. Then, with the files that fit the time range, I created a copy of those files and put them in the CacheLocation path specified in the configuration file. The files in the CacheLocation are used later for formatting and local ingestion.

I added changes to the other files to support the LocalPath field and allow you to test the code I put in the Collector.cs file. Those changes are the same as what is in my prior PR but doesn't include all of them. I only added what was necessary to reach the new functionality I described above and test that the file copies were created in the CacheLocation. You won't be able to actually completely do local ingestion with my changes in this PR, only in the prior PR.
This commit is contained in:
Victoria Chin 2024-07-27 07:07:00 +00:00
Родитель 356debf019 15c2a1fa5e
Коммит f007c8382f
2 изменённых файлов: 145 добавлений и 67 удалений

Просмотреть файл

@ -39,9 +39,9 @@ Options:
if collectsfdata.options.json exists, it will be used for configuration.
-cf|--containerFilter [string] string / regex to filter container names
-dp|--databasePersistence [bool] default false to create a volatile database. a value of true will persist
the database a given path in your container.
the database on a given path in your container.
-dpp|--databasePersistencePath [string] path where you want your database to be persisted in for local ingestion.
path much be in the format: "@'C:\\kustodata\\MyDatabaseName\\md',@'C:\\kustodata\\MyDatabaseName\\data'"
path must be in the format: "@'C:\\kustodata\\MyDatabaseName\\md',@'C:\\kustodata\\MyDatabaseName\\data'"
-dc|--deleteCache [bool] delete downloaded blobs from local disk at end of execution.
-to|--stop [DateTime] end time range to collect data to. default is now.
example: "06/01/2021 09:01:23 -04:00"

Просмотреть файл

@ -437,87 +437,165 @@ namespace CollectSFData
}
}
}
else if (Config.IsCacheLocationPreConfigured())
else if (Config.IsCacheLocationPreConfigured() && !Config.IsIngestionLocal)
{
switch (Config.FileType)
{
case FileTypesEnum.counter:
files = Instance.FileMgr.GetFilesByExtension(Config.CacheLocation, Constants.PerfCtrExtension);
if (files.Count < 1)
{
files = Instance.FileMgr.GetFilesByExtension(Config.CacheLocation, Constants.PerfCsvExtension);
}
break;
case FileTypesEnum.setup:
files = Instance.FileMgr.GetFilesByExtension(Config.CacheLocation, Constants.SetupExtension);
break;
case FileTypesEnum.table:
files = Instance.FileMgr.GetFilesByExtension(Config.CacheLocation, Constants.TableExtension);
break;
case FileTypesEnum.trace:
files = Instance.FileMgr.GetFilesByExtension(Config.CacheLocation, Constants.DtrExtension + Constants.ZipExtension);
if (files.Count < 1)
{
files = Instance.FileMgr.GetFilesByExtension(Config.CacheLocation, Constants.DtrExtension);
}
if (files.Count < 1)
{
files = Instance.FileMgr.GetFilesByExtension(Config.CacheLocation, Constants.EtlExtension + Constants.ZipExtension);
}
if (files.Count < 1)
{
files = Instance.FileMgr.GetFilesByExtension(Config.CacheLocation, Constants.EtlExtension);
}
break;
case FileTypesEnum.sfextlog:
files = Instance.FileMgr.GetFilesByExtension(Config.CacheLocation, Constants.LogExtension);
break;
default:
Log.Warning($"configured filetype:{Config.FileType} not valid for cache upload. returning.");
return;
}
files = GetFilesWithNecessaryExtension(files, Config.CacheLocation);
if (files.Count < 1)
{
Log.Error($"configuration set to upload cache files from 'cachelocation' {Config.CacheLocation} but no files found");
}
}
else if (Config.IsCacheLocationPreConfigured() && Config.IsIngestionLocal)
{
files = GetFilesWithNecessaryExtension(files, Config.LocalPath);
if (files.Count < 1)
{
Log.Error($"configuration set to upload cache files from 'LocalPath' {Config.LocalPath} but no files found");
}
}
Instance.TotalFilesEnumerated += files.Count;
foreach (string file in files)
if (Config.IsIngestionLocal)
{
FileObject fileObject = new FileObject(file, Config.CacheLocation) { Status = FileStatus.enumerated };
// only queue if file not already in FileObjects list
if (Instance.FileObjects.Add(fileObject))
List<FileObject> fileObjects = PrepareFiles(files);
foreach (FileObject fileObject in fileObjects)
{
Log.Info($"adding file: {fileObject.FileUri}", ConsoleColor.Green);
if (!Config.List)
{
QueueForIngest(fileObject);
}
QueueAndAddToFileObjects(fileObject);
}
else
}
else
{
foreach (string file in files)
{
Log.Debug($"file {fileObject.FileUri} already in FileObjects. not queueing for ingest.");
FileObject fileObject = new FileObject(file, Config.CacheLocation) { Status = FileStatus.enumerated };
QueueAndAddToFileObjects(fileObject);
}
}
}
private void QueueAndAddToFileObjects(FileObject fileObject)
{
// only queue if file not already in FileObjects list
if (Instance.FileObjects.Add(fileObject))
{
Log.Info($"adding file: {fileObject.FileUri}", ConsoleColor.Green);
if (!Config.List)
{
QueueForIngest(fileObject);
}
}
else
{
Log.Debug($"file {fileObject.FileUri} already in FileObjects. not queueing for ingest.");
}
}
private List<string> GetFilesWithNecessaryExtension(List<string> files, string location)
{
switch (Config.FileType)
{
case FileTypesEnum.counter:
files = Instance.FileMgr.GetFilesByExtension(location, Constants.PerfCtrExtension);
if (files.Count < 1)
{
files = Instance.FileMgr.GetFilesByExtension(location, Constants.PerfCsvExtension);
}
break;
case FileTypesEnum.setup:
files = Instance.FileMgr.GetFilesByExtension(location, Constants.SetupExtension);
break;
case FileTypesEnum.table:
files = Instance.FileMgr.GetFilesByExtension(location, Constants.TableExtension);
break;
case FileTypesEnum.trace:
files = Instance.FileMgr.GetFilesByExtension(location, Constants.DtrExtension + Constants.ZipExtension);
if (files.Count < 1)
{
files = Instance.FileMgr.GetFilesByExtension(location, Constants.DtrExtension);
}
if (files.Count < 1)
{
files = Instance.FileMgr.GetFilesByExtension(location, Constants.EtlExtension + Constants.ZipExtension);
}
if (files.Count < 1)
{
files = Instance.FileMgr.GetFilesByExtension(location, Constants.EtlExtension);
}
break;
case FileTypesEnum.sfextlog:
files = Instance.FileMgr.GetFilesByExtension(location, Constants.LogExtension);
break;
default:
Log.Warning($"configured filetype:{Config.FileType} not valid for cache upload. returning.");
return null;
}
return files;
}
private List<string> FilterFilesByTimeStamp(List<string> files)
{
List<string> filteredFilesByTime = new List<string>();
foreach (string file in files)
{
DateTimeOffset lastWriteTime = File.GetLastWriteTimeUtc(file);
if (lastWriteTime >= Config.StartTimeUtc && lastWriteTime <= Config.EndTimeUtc)
{
filteredFilesByTime.Add(file);
Instance.TotalFilesMatched++;
}
else
{
Instance.TotalFilesSkipped++;
}
}
return filteredFilesByTime;
}
private FileObject CopyLocalFileToCacheLocation(string fileLocalPath)
{
// remove path from the file name
string fileName = fileLocalPath.Substring(Config.LocalPath.Length + 1);
// create copy of this file in the cache location
string fileCacheLocationPath = Path.Combine(Config.CacheLocation, fileName);
File.Copy(fileLocalPath, fileCacheLocationPath, true);
FileObject fileObject = new FileObject(fileCacheLocationPath, Config.CacheLocation) { Status = FileStatus.enumerated };
return fileObject;
}
private List<FileObject> PrepareFiles(List<string> files)
{
List<string> filteredFilesByTime = FilterFilesByTimeStamp(files);
List<FileObject> fileObjects = new List<FileObject>();
foreach (string file in filteredFilesByTime)
{
FileObject fileObject = CopyLocalFileToCacheLocation(file);
fileObjects.Add(fileObject);
}
return fileObjects;
}
}
}