Added scrub metrics calculation
This commit is contained in:
Родитель
34eb31b688
Коммит
bab668c8f6
|
@ -61,6 +61,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
logger.LogInfo($"Initialize process for scrub rule on filter {filterCondition}");
|
||||
var sRules = scrubRules.Where(o => o.FilterCondition.Equals(filterCondition)).ToList();
|
||||
logger.LogInfo($"Scrub rules found {sRules.Count}");
|
||||
long filterRecordCount = cosmosHelper.GetFilterRecordCount(filterCondition);
|
||||
ScrubDataFetchQuery = cosmosHelper.GetScrubDataDocumentQuery<string>(targetClient, filterCondition, CloneSettings.ReadBatchSize);
|
||||
await ReadUploadInbatches((IDocumentQuery<string>)ScrubDataFetchQuery, sRules);
|
||||
|
||||
|
@ -69,6 +70,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
if(srule.FilterCondition.Equals(filterCondition))
|
||||
{
|
||||
srule.IsComplete = true;
|
||||
srule.RecordsByFilter = filterRecordCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -115,6 +117,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
nentities.Add(JsonConvert.SerializeObject(jobj));
|
||||
}
|
||||
scrubbedEntities = nentities;
|
||||
scrubRule.RecordsUpdated += jEntities.Count;
|
||||
}
|
||||
var objEntities = jEntities.Cast<Object>().ToList();
|
||||
try
|
||||
|
@ -125,8 +128,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
{
|
||||
logger.LogError(ex);
|
||||
throw (ex);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
badEntities = uploadResponse.BadInputDocuments;
|
||||
TotalRecordsScrubbed += uploadResponse.NumberOfDocumentsImported;
|
||||
|
|
|
@ -94,16 +94,18 @@ namespace CosmosCloneCommon.Migrator
|
|||
if (CloneSettings.ScrubbingRequired && noFilterScrubRules != null && noFilterScrubRules.Count > 0)
|
||||
{
|
||||
var dcs = new DataScrubMigrator();
|
||||
var result = dcs.StartScrub(noFilterScrubRules);
|
||||
var result = await dcs.StartScrub(noFilterScrubRules);
|
||||
}
|
||||
}
|
||||
|
||||
if (CloneSettings.ScrubbingRequired && filteredScrubRules != null && filteredScrubRules.Count > 0)
|
||||
{
|
||||
var dcs = new DataScrubMigrator();
|
||||
var result = dcs.StartScrub(filteredScrubRules);
|
||||
var result = await dcs.StartScrub(filteredScrubRules);
|
||||
}
|
||||
|
||||
logger.LogScrubRulesInformation(DocumentMigrator.scrubRules);
|
||||
|
||||
IsCodeMigrationComplete = false;
|
||||
if (CloneSettings.CopyStoredProcedures) { await CopyStoredProcedures(); }
|
||||
if (CloneSettings.CopyUDFs) { await CopyUDFs(); }
|
||||
|
@ -145,6 +147,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
{
|
||||
if(string.IsNullOrEmpty(sRule.FilterCondition))
|
||||
{
|
||||
sRule.RecordsByFilter = TotalRecordsInSource;
|
||||
noFilterScrubRules.Add(sRule);
|
||||
}
|
||||
else
|
||||
|
@ -193,7 +196,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
}
|
||||
else
|
||||
{
|
||||
var jEntities = new List<JToken>();
|
||||
var jEntities = new List<JToken>();
|
||||
foreach (var sRule in noFilterScrubRules)
|
||||
{
|
||||
jEntities = objectScrubber.ScrubObjectList(scrubbedEntities, sRule);
|
||||
|
@ -203,6 +206,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
nentities.Add(JsonConvert.SerializeObject(jobj));
|
||||
}
|
||||
scrubbedEntities = nentities;
|
||||
sRule.RecordsUpdated += jEntities.Count;
|
||||
}
|
||||
var objDocuments = jEntities.Cast<Object>().ToList();
|
||||
uploadResponse = await cosmosBulkImporter.BulkSendToNewCollection<dynamic>(objDocuments);
|
||||
|
|
|
@ -19,9 +19,9 @@ namespace CosmosCloneCommon.Model
|
|||
|
||||
public RuleType? Type { get; set; }
|
||||
|
||||
private int RecordsByFilter { get; set; }
|
||||
private int RecordsUpdated { get; set; }
|
||||
private int CountNullAttributes { get; set; }
|
||||
public long RecordsByFilter { get; set; }
|
||||
public long RecordsUpdated { get; set; }
|
||||
private long CountNullAttributes { get; set; }
|
||||
public bool IsComplete { get; set; }
|
||||
|
||||
public ScrubRule() { }
|
||||
|
@ -35,8 +35,9 @@ namespace CosmosCloneCommon.Model
|
|||
this.RuleId = ruleId;
|
||||
this.IsComplete = false;
|
||||
}
|
||||
|
||||
}
|
||||
public enum RuleType { SingleValue, NullValue, Shuffle };//Can add random later if required.
|
||||
public enum RuleType { SingleValue, NullValue, Shuffle };//Can add random rule type later if required.
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ using System.Linq;
|
|||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using System.Diagnostics;
|
||||
using CosmosCloneCommon.Model;
|
||||
|
||||
namespace CosmosCloneCommon.Utility
|
||||
{
|
||||
|
@ -47,5 +48,22 @@ namespace CosmosCloneCommon.Utility
|
|||
Exception baseException = e.GetBaseException();
|
||||
LogInfo($"Error: {e.Message}, Message: {baseException.Message}");
|
||||
}
|
||||
|
||||
/// <summary>
/// Writes a per-rule summary of scrub metrics to the log, followed by the
/// sum of <c>RecordsUpdated</c> across all rules.
/// </summary>
/// <param name="scrubRules">Rules to report on. Nothing is logged when this
/// is null or empty, or when <c>CloneSettings.ScrubbingRequired</c> is false.</param>
public static void LogScrubRulesInformation(List<ScrubRule> scrubRules)
{
    // Guard clause: same checks, in the same order, as the positive condition it replaces.
    if (scrubRules == null || scrubRules.Count == 0 || !CloneSettings.ScrubbingRequired)
    {
        return;
    }

    long updatedSum = 0;
    foreach (var scrubRule in scrubRules)
    {
        // A rule without a filter is reported with the literal text "None".
        string filterText = string.IsNullOrEmpty(scrubRule.FilterCondition)
            ? "None"
            : scrubRule.FilterCondition;

        LogInfo($"Rule Id: {scrubRule.RuleId}. Attribute: {scrubRule.PropertyName}");
        LogInfo($"Rule filter:{filterText}");
        LogInfo($"Rule Type: {scrubRule.Type.ToString()}");
        LogInfo($"Records by filter: {scrubRule.RecordsByFilter}. Records updated: {scrubRule.RecordsUpdated}");

        updatedSum += scrubRule.RecordsUpdated;
    }

    LogInfo($"Total records scrubbed: {updatedSum}");
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ namespace CosmosCloneCommon.Utility
|
|||
public class CosmosBulkImporter
|
||||
{
|
||||
private IBulkExecutor bulkExecutor;
|
||||
|
||||
private static readonly ConnectionPolicy ConnectionPolicy = new ConnectionPolicy
|
||||
{
|
||||
ConnectionMode = ConnectionMode.Direct,
|
||||
|
|
|
@ -118,10 +118,19 @@ namespace CosmosCloneCommon.Utility
|
|||
{
|
||||
try
|
||||
{
|
||||
var newConnectionPolicy = new ConnectionPolicy
|
||||
{
|
||||
ConnectionMode = ConnectionMode.Direct,
|
||||
ConnectionProtocol = Protocol.Tcp,
|
||||
RetryOptions = new RetryOptions()
|
||||
};
|
||||
newConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 20;
|
||||
newConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 600;
|
||||
|
||||
string targetEndpointUrl = CloneSettings.TargetSettings.EndpointUrl;
|
||||
string targetAccessKey = CloneSettings.TargetSettings.AccessKey;
|
||||
OfferThroughput = CloneSettings.TargetMigrationOfferThroughputRUs;
|
||||
var targetDocumentClient = new DocumentClient(new Uri(targetEndpointUrl), targetAccessKey, ConnectionPolicy);
|
||||
var targetDocumentClient = new DocumentClient(new Uri(targetEndpointUrl), targetAccessKey, newConnectionPolicy);
|
||||
return targetDocumentClient;
|
||||
}
|
||||
catch (Exception ex)
|
||||
|
@ -130,7 +139,33 @@ namespace CosmosCloneCommon.Utility
|
|||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
/// Creates a <see cref="DocumentClient"/> for the target account using the
/// endpoint URL and access key from <c>CloneSettings.TargetSettings</c>.
/// </summary>
/// <param name="connectionPolicy">Connection policy for the new client. When
/// null, a Direct/TCP policy with 20 retries on throttled requests and a
/// 600 second maximum retry wait is used.</param>
/// <returns>The newly constructed client.</returns>
/// <remarks>
/// Also assigns <c>CloneSettings.TargetMigrationOfferThroughputRUs</c> to
/// <c>OfferThroughput</c>. Any exception raised while building the client is
/// logged and rethrown.
/// </remarks>
public DocumentClient GetTargetDocumentDbClient(ConnectionPolicy connectionPolicy)
{
    try
    {
        // Honor the caller's policy; build the default only when none is supplied.
        var effectivePolicy = connectionPolicy ?? new ConnectionPolicy
        {
            ConnectionMode = ConnectionMode.Direct,
            ConnectionProtocol = Protocol.Tcp,
            RetryOptions = new RetryOptions
            {
                MaxRetryAttemptsOnThrottledRequests = 20,
                MaxRetryWaitTimeInSeconds = 600
            }
        };

        string targetEndpointUrl = CloneSettings.TargetSettings.EndpointUrl;
        string targetAccessKey = CloneSettings.TargetSettings.AccessKey;
        OfferThroughput = CloneSettings.TargetMigrationOfferThroughputRUs;

        return new DocumentClient(new Uri(targetEndpointUrl), targetAccessKey, effectivePolicy);
    }
    catch (Exception ex)
    {
        logger.LogError(ex);
        throw; // rethrow without resetting the stack trace
    }
}
|
||||
|
||||
public async Task<DocumentCollection> GetTargetDocumentCollection(DocumentClient targetClient)
|
||||
{
|
||||
try
|
||||
|
@ -307,6 +342,53 @@ namespace CosmosCloneCommon.Utility
|
|||
}
|
||||
return true;
|
||||
}
|
||||
/// <summary>
/// Checks whether the source collection named in
/// <c>CloneSettings.SourceSettings</c> can be read, by querying for a single
/// document.
/// </summary>
/// <returns>
/// <c>true</c> when the query yields a document; <c>false</c> when it yields
/// none or when the read throws (the exception is logged, not propagated).
/// </returns>
public bool CheckSourceReadability()
{
    string sourceDatabaseName = CloneSettings.SourceSettings.DatabaseName;
    string sourceCollectionName = CloneSettings.SourceSettings.CollectionName;
    const string topOneRecordQuery = "SELECT TOP 1 * FROM c";
    var queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };

    try
    {
        using (var cosmosClient = GetSourceDocumentDbClient())
        {
            // FirstOrDefault rather than First: an empty collection is an
            // expected outcome and should not surface as a logged exception.
            var document = cosmosClient.CreateDocumentQuery<dynamic>(
                    UriFactory.CreateDocumentCollectionUri(sourceDatabaseName, sourceCollectionName),
                    topOneRecordQuery,
                    queryOptions)
                .AsEnumerable()
                .FirstOrDefault();

            if (document != null)
            {
                return true;
            }

            logger.LogInfo("CheckSource Readability: source query returned no documents");
        }
    }
    catch (Exception ex)
    {
        // Best-effort probe: any failure is reported as "not readable".
        logger.LogInfo("Exception during CheckSource Readability");
        logger.LogError(ex);
    }

    return false;
}
|
||||
|
||||
/// <summary>
/// Runs a <c>COUNT(1)</c> query against the collection named in
/// <c>CloneSettings.TargetSettings</c>, optionally restricted by a filter.
/// </summary>
/// <param name="filterCondition">Text appended verbatim after <c>WHERE</c>.
/// When null or empty the query has no <c>WHERE</c> clause.</param>
/// <returns>The first value yielded by the count query.</returns>
public long GetFilterRecordCount(string filterCondition)
{
    string databaseName = CloneSettings.TargetSettings.DatabaseName;
    string collectionName = CloneSettings.TargetSettings.CollectionName;

    // NOTE(review): the filter is concatenated into the query unescaped;
    // presumably it comes from trusted configuration - confirm against callers.
    string countQuery = string.IsNullOrEmpty(filterCondition)
        ? "SELECT VALUE COUNT(1) FROM c"
        : "SELECT VALUE COUNT(1) FROM c WHERE "+ filterCondition;

    var feedOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };

    using (var targetClient = GetTargetDocumentDbClient())
    {
        var collectionUri = UriFactory.CreateDocumentCollectionUri(databaseName, collectionName);
        return targetClient
            .CreateDocumentQuery<long>(collectionUri, countQuery, feedOptions)
            .AsEnumerable()
            .First();
    }
}
|
||||
|
||||
public bool CompareRecordCount()
|
||||
{
|
||||
//var SourceCosmosDBSettings = CloneSettings.GetConfigurationSection("SourceCosmosDBSettings");
|
||||
|
@ -317,7 +399,7 @@ namespace CosmosCloneCommon.Utility
|
|||
string targetDatabaseName = CloneSettings.TargetSettings.DatabaseName;
|
||||
string targetCollectionName = CloneSettings.TargetSettings.CollectionName;
|
||||
|
||||
string totalCountQuery = "SELECT VALUE COUNT(1) FROM p";
|
||||
string totalCountQuery = "SELECT VALUE COUNT(1) FROM c";
|
||||
FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
|
||||
long sourceTotalRecordCount, targetTotalRecordCount;
|
||||
using (var cosmosClient = GetSourceDocumentDbClient())
|
||||
|
|
Загрузка…
Ссылка в новой задаче