Merge pull request #5 from Microsoft/users/krankm/initial

Users/krankm/initial
This commit is contained in:
Kranthi Kumar Medam 2019-01-16 23:36:36 +05:30 коммит произвёл GitHub
Родитель 0b68be68df 0a79b0f589
Коммит 25218fe012
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 145 добавлений и 22 удалений

Просмотреть файл

@ -243,6 +243,7 @@ namespace CosmicCloneUI
void Worker_DoWork2(object sender, DoWorkEventArgs e)
{
Task.Delay(3000).Wait();
long readPercentProgress = 0;
long writePercentProgress = 0;
long scrubPercentProgress = 0;
@ -262,12 +263,22 @@ namespace CosmicCloneUI
readPercentProgress = (DocumentMigrator.TotalRecordsRetrieved * 100) / DocumentMigrator.TotalRecordsInSource;
writePercentProgress = (DocumentMigrator.TotalRecordsSent * 100) / DocumentMigrator.TotalRecordsInSource;
}
if(CloneSettings.ScrubbingRequired && DocumentMigrator.scrubRules!=null && DocumentMigrator.scrubRules.Count>0)
else
{
readPercentProgress = 100;
writePercentProgress = 100;
}
if (CloneSettings.ScrubbingRequired && DocumentMigrator.scrubRules != null && DocumentMigrator.scrubRules.Count > 0)
{
scrubPercentProgress = DocumentMigrator.ScrubPercentProgress;
}
else
{
if (DocumentMigrator.scrubRules == null || DocumentMigrator.scrubRules.Count == 0) scrubPercentProgress = 100;
else scrubPercentProgress = 0;
}
else scrubPercentProgress = 100;
}
@ -280,14 +291,15 @@ namespace CosmicCloneUI
{
int receivePercent = e.ProgressPercentage;
int writePercent = (receivePercent % 1000);
int readPercent = (receivePercent % 1000000) / 1000;
int scrubPercent = receivePercent/1000000;
int scrubPercent = receivePercent / 1000000;
((ProgressBar)pages[4].FindName("ReadProgress")).Value = readPercent;
((ProgressBar)pages[4].FindName("WriteProgress")).Value = writePercent;
((ProgressBar)pages[4].FindName("ScrubProgress")).Value = scrubPercent;
var statustextbox = ((TextBox)pages[4].FindName("StatusTextBlock"));
statustextbox.Text = logger.FullLog;
statustextbox.ScrollToEnd();

Просмотреть файл

@ -61,6 +61,7 @@ namespace CosmosCloneCommon.Migrator
logger.LogInfo($"Initialize process for scrub rule on filter {filterCondition}");
var sRules = scrubRules.Where(o => o.FilterCondition.Equals(filterCondition)).ToList();
logger.LogInfo($"Scrub rules found {sRules.Count}");
long filterRecordCount = cosmosHelper.GetFilterRecordCount(filterCondition);
ScrubDataFetchQuery = cosmosHelper.GetScrubDataDocumentQuery<string>(targetClient, filterCondition, CloneSettings.ReadBatchSize);
await ReadUploadInbatches((IDocumentQuery<string>)ScrubDataFetchQuery, sRules);
@ -69,6 +70,7 @@ namespace CosmosCloneCommon.Migrator
if(srule.FilterCondition.Equals(filterCondition))
{
srule.IsComplete = true;
srule.RecordsByFilter = filterRecordCount;
}
}
}
@ -115,6 +117,7 @@ namespace CosmosCloneCommon.Migrator
nentities.Add(JsonConvert.SerializeObject(jobj));
}
scrubbedEntities = nentities;
scrubRule.RecordsUpdated += jEntities.Count;
}
var objEntities = jEntities.Cast<Object>().ToList();
try
@ -125,8 +128,7 @@ namespace CosmosCloneCommon.Migrator
{
logger.LogError(ex);
throw (ex);
}
}
}
badEntities = uploadResponse.BadInputDocuments;
TotalRecordsScrubbed += uploadResponse.NumberOfDocumentsImported;

Просмотреть файл

@ -47,6 +47,7 @@ namespace CosmosCloneCommon.Migrator
public static long TotalRecordsSent { get; set; }
public static long TotalRecordsInSource { get; set; }
public static bool IsCodeMigrationComplete { get; set; }
protected static bool IsInitialized { get; set; }
public static int ScrubPercentProgress
{
@ -55,18 +56,19 @@ namespace CosmosCloneCommon.Migrator
if (scrubRules != null && scrubRules.Count > 0)
{
int totalRules = scrubRules.Count();
int noFilterRuleCompleteCount = scrubRules.Where(x => x.IsComplete == true && string.IsNullOrEmpty(x.FilterCondition) ).ToList().Count();
int noFilterRuleCompleteCount = scrubRules.Where(x => x.IsComplete == true && string.IsNullOrEmpty(x.FilterCondition)).ToList().Count();
int filterRuleCompleteCount = 0;
if(DataScrubMigrator.scrubRules!=null && DataScrubMigrator.scrubRules.Count>0)
if (DataScrubMigrator.scrubRules != null && DataScrubMigrator.scrubRules.Count > 0)
{
filterRuleCompleteCount = DataScrubMigrator.scrubRules.Where(x => x.IsComplete == true).ToList().Count();
}
var completedRules = scrubRules.Where(x => x.IsComplete == true).ToList().Count();
int percent = (int)((noFilterRuleCompleteCount + filterRuleCompleteCount) * 100 / scrubRules.Count());
return percent;
}
else return 100;
else if (IsInitialized) return 100;
else return 0;
}
}
@ -80,6 +82,7 @@ namespace CosmosCloneCommon.Migrator
}
public async Task<bool> StartCopy(List<ScrubRule> scrubRules = null)
{
IsCodeMigrationComplete = false;
DocumentMigrator.scrubRules = scrubRules;
await InitializeMigration();
@ -94,17 +97,18 @@ namespace CosmosCloneCommon.Migrator
if (CloneSettings.ScrubbingRequired && noFilterScrubRules != null && noFilterScrubRules.Count > 0)
{
var dcs = new DataScrubMigrator();
var result = dcs.StartScrub(noFilterScrubRules);
var result = await dcs.StartScrub(noFilterScrubRules);
}
}
if (CloneSettings.ScrubbingRequired && filteredScrubRules != null && filteredScrubRules.Count > 0)
{
var dcs = new DataScrubMigrator();
var result = dcs.StartScrub(filteredScrubRules);
var result = await dcs.StartScrub(filteredScrubRules);
}
IsCodeMigrationComplete = false;
logger.LogScrubRulesInformation(DocumentMigrator.scrubRules);
if (CloneSettings.CopyStoredProcedures) { await CopyStoredProcedures(); }
if (CloneSettings.CopyUDFs) { await CopyUDFs(); }
if (CloneSettings.CopyTriggers) { await CopyTriggers(); }
@ -118,6 +122,7 @@ namespace CosmosCloneCommon.Migrator
logger.LogInfo($"Source Database: {CloneSettings.SourceSettings.DatabaseName} Source Collection: {CloneSettings.SourceSettings.CollectionName}");
logger.LogInfo($"Target Database: {CloneSettings.TargetSettings.DatabaseName} Target Collection: {CloneSettings.TargetSettings.CollectionName}");
IsInitialized = true;
sourceClient = cosmosHelper.GetSourceDocumentDbClient();
sourceCollection = await cosmosHelper.GetSourceDocumentCollection(sourceClient);
@ -145,6 +150,7 @@ namespace CosmosCloneCommon.Migrator
{
if(string.IsNullOrEmpty(sRule.FilterCondition))
{
sRule.RecordsByFilter = TotalRecordsInSource;
noFilterScrubRules.Add(sRule);
}
else
@ -193,7 +199,7 @@ namespace CosmosCloneCommon.Migrator
}
else
{
var jEntities = new List<JToken>();
var jEntities = new List<JToken>();
foreach (var sRule in noFilterScrubRules)
{
jEntities = objectScrubber.ScrubObjectList(scrubbedEntities, sRule);
@ -203,6 +209,7 @@ namespace CosmosCloneCommon.Migrator
nentities.Add(JsonConvert.SerializeObject(jobj));
}
scrubbedEntities = nentities;
sRule.RecordsUpdated += jEntities.Count;
}
var objDocuments = jEntities.Cast<Object>().ToList();
uploadResponse = await cosmosBulkImporter.BulkSendToNewCollection<dynamic>(objDocuments);

Просмотреть файл

@ -19,9 +19,9 @@ namespace CosmosCloneCommon.Model
public RuleType? Type { get; set; }
private int RecordsByFilter { get; set; }
private int RecordsUpdated { get; set; }
private int CountNullAttributes { get; set; }
public long RecordsByFilter { get; set; }
public long RecordsUpdated { get; set; }
private long CountNullAttributes { get; set; }
public bool IsComplete { get; set; }
public ScrubRule() { }
@ -35,8 +35,9 @@ namespace CosmosCloneCommon.Model
this.RuleId = ruleId;
this.IsComplete = false;
}
}
public enum RuleType { SingleValue, NullValue, Shuffle };//Can add random later if required.
public enum RuleType { SingleValue, NullValue, Shuffle };//Can add random rule type later if required.
}

Просмотреть файл

@ -8,6 +8,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
using CosmosCloneCommon.Model;
namespace CosmosCloneCommon.Utility
{
@ -47,5 +48,22 @@ namespace CosmosCloneCommon.Utility
Exception baseException = e.GetBaseException();
LogInfo($"Error: {e.Message}, Message: {baseException.Message}");
}
public static void LogScrubRulesInformation(List<ScrubRule> scrubRules)
{
if (scrubRules!=null && scrubRules.Count>0 && CloneSettings.ScrubbingRequired)
{
long totalRecords=0;
foreach (var rule in scrubRules)
{
LogInfo($"Rule Id: {rule.RuleId}. Attribute: {rule.PropertyName}");
LogInfo($"Rule filter:{(string.IsNullOrEmpty(rule.FilterCondition) ? "None":rule.FilterCondition)}");
LogInfo($"Rule Type: {rule.Type.ToString()}");
LogInfo($"Records by filter: {rule.RecordsByFilter}. Records updated: {rule.RecordsUpdated}");
totalRecords += rule.RecordsUpdated;
}
LogInfo($"Total records scrubbed: {totalRecords}");
}
}
}
}

Просмотреть файл

@ -18,6 +18,7 @@ namespace CosmosCloneCommon.Utility
public class CosmosBulkImporter
{
private IBulkExecutor bulkExecutor;
private static readonly ConnectionPolicy ConnectionPolicy = new ConnectionPolicy
{
ConnectionMode = ConnectionMode.Direct,

Просмотреть файл

@ -118,10 +118,19 @@ namespace CosmosCloneCommon.Utility
{
try
{
var newConnectionPolicy = new ConnectionPolicy
{
ConnectionMode = ConnectionMode.Direct,
ConnectionProtocol = Protocol.Tcp,
RetryOptions = new RetryOptions()
};
newConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 20;
newConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 600;
string targetEndpointUrl = CloneSettings.TargetSettings.EndpointUrl;
string targetAccessKey = CloneSettings.TargetSettings.AccessKey;
OfferThroughput = CloneSettings.TargetMigrationOfferThroughputRUs;
var targetDocumentClient = new DocumentClient(new Uri(targetEndpointUrl), targetAccessKey, ConnectionPolicy);
var targetDocumentClient = new DocumentClient(new Uri(targetEndpointUrl), targetAccessKey, newConnectionPolicy);
return targetDocumentClient;
}
catch (Exception ex)
@ -130,7 +139,33 @@ namespace CosmosCloneCommon.Utility
throw;
}
}
public DocumentClient GetTargetDocumentDbClient(ConnectionPolicy connectionPolicy)
{
try
{
var newConnectionPolicy = new ConnectionPolicy
{
ConnectionMode = ConnectionMode.Direct,
ConnectionProtocol = Protocol.Tcp,
RetryOptions = new RetryOptions()
};
newConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 20;
newConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 600;
string targetEndpointUrl = CloneSettings.TargetSettings.EndpointUrl;
string targetAccessKey = CloneSettings.TargetSettings.AccessKey;
OfferThroughput = CloneSettings.TargetMigrationOfferThroughputRUs;
var targetDocumentClient = new DocumentClient(new Uri(targetEndpointUrl), targetAccessKey, newConnectionPolicy);
return targetDocumentClient;
}
catch (Exception ex)
{
logger.LogError(ex);
throw;
}
}
public async Task<DocumentCollection> GetTargetDocumentCollection(DocumentClient targetClient)
{
try
@ -307,6 +342,53 @@ namespace CosmosCloneCommon.Utility
}
return true;
}
public bool CheckSourceReadability()
{
string sourceDatabaseName = CloneSettings.SourceSettings.DatabaseName;
string sourceCollectionName = CloneSettings.SourceSettings.CollectionName;
string topOneRecordQuery = "SELECT TOP 1 * FROM c";
FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
//long sourceTotalRecordCount, targetTotalRecordCount;
try {
using (var cosmosClient = GetSourceDocumentDbClient())
{
var document = cosmosClient.CreateDocumentQuery<dynamic>(
UriFactory.CreateDocumentCollectionUri(sourceDatabaseName, sourceCollectionName), topOneRecordQuery, queryOptions)
.AsEnumerable().First();
if (document != null)
return true;
}
}
catch(Exception ex)
{
logger.LogInfo("Exception during CheckSource Readability");
logger.LogError(ex);
}
return false;
}
public long GetFilterRecordCount(string filterCondition)
{
//var TargetCosmosDBSettings = CloneSettings.GetConfigurationSection("SourceCosmosDBSettings");
string targetDatabaseName = CloneSettings.TargetSettings.DatabaseName;
string targetCollectionName = CloneSettings.TargetSettings.CollectionName;
string totalCountQuery = "SELECT VALUE COUNT(1) FROM c";
if(!string.IsNullOrEmpty(filterCondition))
{
totalCountQuery = "SELECT VALUE COUNT(1) FROM c WHERE "+ filterCondition;
}
FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
long totalRecordCount;
using (var cosmosClient = GetTargetDocumentDbClient())
{
totalRecordCount = cosmosClient.CreateDocumentQuery<long>(
UriFactory.CreateDocumentCollectionUri(targetDatabaseName, targetCollectionName), totalCountQuery, queryOptions)
.AsEnumerable().First();
}
return totalRecordCount;
}
public bool CompareRecordCount()
{
//var SourceCosmosDBSettings = CloneSettings.GetConfigurationSection("SourceCosmosDBSettings");
@ -317,7 +399,7 @@ namespace CosmosCloneCommon.Utility
string targetDatabaseName = CloneSettings.TargetSettings.DatabaseName;
string targetCollectionName = CloneSettings.TargetSettings.CollectionName;
string totalCountQuery = "SELECT VALUE COUNT(1) FROM p";
string totalCountQuery = "SELECT VALUE COUNT(1) FROM c";
FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
long sourceTotalRecordCount, targetTotalRecordCount;
using (var cosmosClient = GetSourceDocumentDbClient())