From bab668c8f68ef66e8a81a58cd32f0c0d23918e90 Mon Sep 17 00:00:00 2001 From: Kranthi Kumar Medam Date: Wed, 16 Jan 2019 21:03:09 +0530 Subject: [PATCH] Added scrub metrics calculation --- .../Migrator/DataScrubMigrator.cs | 6 +- .../Migrator/DocumentMigrator.cs | 10 ++- .../CosmosCloneCommon/Model/ScrubRule.cs | 9 +- .../CosmosCloneCommon/Utility/CloneLogger.cs | 18 ++++ .../Utility/CosmosBulkImporter.cs | 1 + .../Utility/CosmosDBHelper.cs | 88 ++++++++++++++++++- 6 files changed, 120 insertions(+), 12 deletions(-) diff --git a/CosmosClone/CosmosCloneCommon/Migrator/DataScrubMigrator.cs b/CosmosClone/CosmosCloneCommon/Migrator/DataScrubMigrator.cs index 0244a13..f8b8b8f 100644 --- a/CosmosClone/CosmosCloneCommon/Migrator/DataScrubMigrator.cs +++ b/CosmosClone/CosmosCloneCommon/Migrator/DataScrubMigrator.cs @@ -61,6 +61,7 @@ namespace CosmosCloneCommon.Migrator logger.LogInfo($"Initialize process for scrub rule on filter {filterCondition}"); var sRules = scrubRules.Where(o => o.FilterCondition.Equals(filterCondition)).ToList(); logger.LogInfo($"Scrub rules found {sRules.Count}"); + long filterRecordCount = cosmosHelper.GetFilterRecordCount(filterCondition); ScrubDataFetchQuery = cosmosHelper.GetScrubDataDocumentQuery(targetClient, filterCondition, CloneSettings.ReadBatchSize); await ReadUploadInbatches((IDocumentQuery)ScrubDataFetchQuery, sRules); @@ -69,6 +70,7 @@ namespace CosmosCloneCommon.Migrator if(srule.FilterCondition.Equals(filterCondition)) { srule.IsComplete = true; + srule.RecordsByFilter = filterRecordCount; } } } @@ -115,6 +117,7 @@ namespace CosmosCloneCommon.Migrator nentities.Add(JsonConvert.SerializeObject(jobj)); } scrubbedEntities = nentities; + scrubRule.RecordsUpdated += jEntities.Count; } var objEntities = jEntities.Cast().ToList(); try @@ -125,8 +128,7 @@ namespace CosmosCloneCommon.Migrator { logger.LogError(ex); throw (ex); - } - + } } badEntities = uploadResponse.BadInputDocuments; TotalRecordsScrubbed += uploadResponse.NumberOfDocumentsImported; diff --git a/CosmosClone/CosmosCloneCommon/Migrator/DocumentMigrator.cs b/CosmosClone/CosmosCloneCommon/Migrator/DocumentMigrator.cs index cbb6fe1..42934b8 100644 --- a/CosmosClone/CosmosCloneCommon/Migrator/DocumentMigrator.cs +++ b/CosmosClone/CosmosCloneCommon/Migrator/DocumentMigrator.cs @@ -94,16 +94,18 @@ namespace CosmosCloneCommon.Migrator if (CloneSettings.ScrubbingRequired && noFilterScrubRules != null && noFilterScrubRules.Count > 0) { var dcs = new DataScrubMigrator(); - var result = dcs.StartScrub(noFilterScrubRules); + var result = await dcs.StartScrub(noFilterScrubRules); } } if (CloneSettings.ScrubbingRequired && filteredScrubRules != null && filteredScrubRules.Count > 0) { var dcs = new DataScrubMigrator(); - var result = dcs.StartScrub(filteredScrubRules); + var result = await dcs.StartScrub(filteredScrubRules); } + logger.LogScrubRulesInformation(DocumentMigrator.scrubRules); + IsCodeMigrationComplete = false; if (CloneSettings.CopyStoredProcedures) { await CopyStoredProcedures(); } if (CloneSettings.CopyUDFs) { await CopyUDFs(); } @@ -145,6 +147,7 @@ namespace CosmosCloneCommon.Migrator { if(string.IsNullOrEmpty(sRule.FilterCondition)) { + sRule.RecordsByFilter = TotalRecordsInSource; noFilterScrubRules.Add(sRule); } else @@ -193,7 +196,7 @@ namespace CosmosCloneCommon.Migrator } else { - var jEntities = new List(); + var jEntities = new List(); foreach (var sRule in noFilterScrubRules) { jEntities = objectScrubber.ScrubObjectList(scrubbedEntities, sRule); @@ -203,6 +206,7 @@ namespace CosmosCloneCommon.Migrator nentities.Add(JsonConvert.SerializeObject(jobj)); } scrubbedEntities = nentities; + sRule.RecordsUpdated += jEntities.Count; } var objDocuments = jEntities.Cast().ToList(); uploadResponse = await cosmosBulkImporter.BulkSendToNewCollection(objDocuments); diff --git a/CosmosClone/CosmosCloneCommon/Model/ScrubRule.cs b/CosmosClone/CosmosCloneCommon/Model/ScrubRule.cs index b7d3053..97c7895 100644 --- a/CosmosClone/CosmosCloneCommon/Model/ScrubRule.cs +++ b/CosmosClone/CosmosCloneCommon/Model/ScrubRule.cs @@ -19,9 +19,9 @@ namespace CosmosCloneCommon.Model public RuleType? Type { get; set; } - private int RecordsByFilter { get; set; } - private int RecordsUpdated { get; set; } - private int CountNullAttributes { get; set; } + public long RecordsByFilter { get; set; } + public long RecordsUpdated { get; set; } + private long CountNullAttributes { get; set; } public bool IsComplete { get; set; } public ScrubRule() { } @@ -35,8 +35,9 @@ namespace CosmosCloneCommon.Model this.RuleId = ruleId; this.IsComplete = false; } + } - public enum RuleType { SingleValue, NullValue, Shuffle };//Can add random later if required. + public enum RuleType { SingleValue, NullValue, Shuffle };//Can add random rule type later if required. } diff --git a/CosmosClone/CosmosCloneCommon/Utility/CloneLogger.cs b/CosmosClone/CosmosCloneCommon/Utility/CloneLogger.cs index 288bb71..3c6cb4b 100644 --- a/CosmosClone/CosmosCloneCommon/Utility/CloneLogger.cs +++ b/CosmosClone/CosmosCloneCommon/Utility/CloneLogger.cs @@ -8,6 +8,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using System.Diagnostics; +using CosmosCloneCommon.Model; namespace CosmosCloneCommon.Utility { @@ -47,5 +48,22 @@ namespace CosmosCloneCommon.Utility Exception baseException = e.GetBaseException(); LogInfo($"Error: {e.Message}, Message: {baseException.Message}"); } + + public static void LogScrubRulesInformation(List scrubRules) + { + if (scrubRules!=null && scrubRules.Count>0 && CloneSettings.ScrubbingRequired) + { + long totalRecords=0; + foreach (var rule in scrubRules) + { + LogInfo($"Rule Id: {rule.RuleId}. Attribute: {rule.PropertyName}"); + LogInfo($"Rule filter:{(string.IsNullOrEmpty(rule.FilterCondition) ? "None":rule.FilterCondition)}"); + LogInfo($"Rule Type: {rule.Type.ToString()}"); + LogInfo($"Records by filter: {rule.RecordsByFilter}. Records updated: {rule.RecordsUpdated}"); + totalRecords += rule.RecordsUpdated; + } + LogInfo($"Total records scrubbed: {totalRecords}"); + } + } } } diff --git a/CosmosClone/CosmosCloneCommon/Utility/CosmosBulkImporter.cs b/CosmosClone/CosmosCloneCommon/Utility/CosmosBulkImporter.cs index de3564c..2305139 100644 --- a/CosmosClone/CosmosCloneCommon/Utility/CosmosBulkImporter.cs +++ b/CosmosClone/CosmosCloneCommon/Utility/CosmosBulkImporter.cs @@ -18,6 +18,7 @@ namespace CosmosCloneCommon.Utility public class CosmosBulkImporter { private IBulkExecutor bulkExecutor; + private static readonly ConnectionPolicy ConnectionPolicy = new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, diff --git a/CosmosClone/CosmosCloneCommon/Utility/CosmosDBHelper.cs b/CosmosClone/CosmosCloneCommon/Utility/CosmosDBHelper.cs index 057a7f2..62b6066 100644 --- a/CosmosClone/CosmosCloneCommon/Utility/CosmosDBHelper.cs +++ b/CosmosClone/CosmosCloneCommon/Utility/CosmosDBHelper.cs @@ -118,10 +118,19 @@ namespace CosmosCloneCommon.Utility { try { + var newConnectionPolicy = new ConnectionPolicy + { + ConnectionMode = ConnectionMode.Direct, + ConnectionProtocol = Protocol.Tcp, + RetryOptions = new RetryOptions() + }; + newConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 20; + newConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 600; + string targetEndpointUrl = CloneSettings.TargetSettings.EndpointUrl; string targetAccessKey = CloneSettings.TargetSettings.AccessKey; OfferThroughput = CloneSettings.TargetMigrationOfferThroughputRUs; - var targetDocumentClient = new DocumentClient(new Uri(targetEndpointUrl), targetAccessKey, ConnectionPolicy); + var targetDocumentClient = new DocumentClient(new Uri(targetEndpointUrl), targetAccessKey, newConnectionPolicy); return targetDocumentClient; } catch (Exception ex) @@ -130,7 +139,33 @@ namespace CosmosCloneCommon.Utility throw; } } - + + public DocumentClient GetTargetDocumentDbClient(ConnectionPolicy connectionPolicy) + { + try + { + var newConnectionPolicy = new ConnectionPolicy + { + ConnectionMode = ConnectionMode.Direct, + ConnectionProtocol = Protocol.Tcp, + RetryOptions = new RetryOptions() + }; + newConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 20; + newConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 600; + + string targetEndpointUrl = CloneSettings.TargetSettings.EndpointUrl; + string targetAccessKey = CloneSettings.TargetSettings.AccessKey; + OfferThroughput = CloneSettings.TargetMigrationOfferThroughputRUs; + var targetDocumentClient = new DocumentClient(new Uri(targetEndpointUrl), targetAccessKey, newConnectionPolicy); + return targetDocumentClient; + } + catch (Exception ex) + { + logger.LogError(ex); + throw; + } + } + public async Task GetTargetDocumentCollection(DocumentClient targetClient) { try @@ -307,6 +342,53 @@ namespace CosmosCloneCommon.Utility } return true; } + public bool CheckSourceReadability() + { + string sourceDatabaseName = CloneSettings.SourceSettings.DatabaseName; + string sourceCollectionName = CloneSettings.SourceSettings.CollectionName; + string topOneRecordQuery = "SELECT TOP 1 * FROM c"; + FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true }; + //long sourceTotalRecordCount, targetTotalRecordCount; + try { + using (var cosmosClient = GetSourceDocumentDbClient()) + { + var document = cosmosClient.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(sourceDatabaseName, sourceCollectionName), topOneRecordQuery, queryOptions) + .AsEnumerable().First(); + if (document != null) + return true; + } + } + catch(Exception ex) + { + logger.LogInfo("Exception during CheckSource Readability"); + logger.LogError(ex); + } + return false; + } + + public long GetFilterRecordCount(string filterCondition) + { + //var TargetCosmosDBSettings = CloneSettings.GetConfigurationSection("SourceCosmosDBSettings"); + string targetDatabaseName = CloneSettings.TargetSettings.DatabaseName; + string targetCollectionName = CloneSettings.TargetSettings.CollectionName; + + string totalCountQuery = "SELECT VALUE COUNT(1) FROM c"; + if(!string.IsNullOrEmpty(filterCondition)) + { + totalCountQuery = "SELECT VALUE COUNT(1) FROM c WHERE "+ filterCondition; + } + FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true }; + long totalRecordCount; + using (var cosmosClient = GetTargetDocumentDbClient()) + { + totalRecordCount = cosmosClient.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(targetDatabaseName, targetCollectionName), totalCountQuery, queryOptions) + .AsEnumerable().First(); + } + return totalRecordCount; + } + public bool CompareRecordCount() { //var SourceCosmosDBSettings = CloneSettings.GetConfigurationSection("SourceCosmosDBSettings"); @@ -317,7 +399,7 @@ namespace CosmosCloneCommon.Utility string targetDatabaseName = CloneSettings.TargetSettings.DatabaseName; string targetCollectionName = CloneSettings.TargetSettings.CollectionName; - string totalCountQuery = "SELECT VALUE COUNT(1) FROM p"; + string totalCountQuery = "SELECT VALUE COUNT(1) FROM c"; FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true }; long sourceTotalRecordCount, targetTotalRecordCount; using (var cosmosClient = GetSourceDocumentDbClient())