Merge branch 'master' into users/baala/fix_issue#12
This commit is contained in:
Коммит
48cf2bdca6
|
@ -34,7 +34,7 @@ namespace CosmicCloneUI
|
|||
|
||||
public void SetStatus()
|
||||
{
|
||||
//StatusTextBlock.Text = CloneLogger.getFullLog();
|
||||
//StatusTextBlock.Text = CloneLogger.GetFullLog();
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -340,17 +340,16 @@ namespace CosmicCloneUI
|
|||
if (uiElement.GetType().Name == "TextBox")
|
||||
{
|
||||
TextBox tb = (TextBox) uiElement;
|
||||
string name = tb.Name.Substring(0, tb.Name.Length - 1);
|
||||
|
||||
if (name == "Filter")
|
||||
if (tb.Name.StartsWith("Filter"))
|
||||
{
|
||||
sr.FilterCondition = tb.Text.Trim();
|
||||
}
|
||||
else if (name == "ScrubAttribute")
|
||||
else if (tb.Name.StartsWith("ScrubAttribute"))
|
||||
{
|
||||
sr.PropertyName = tb.Text.Trim();
|
||||
}
|
||||
else if (name == "ScrubValue")
|
||||
else if (tb.Name.StartsWith("ScrubValue"))
|
||||
{
|
||||
sr.UpdateValue = tb.Text.Trim();
|
||||
}
|
||||
|
@ -359,8 +358,7 @@ namespace CosmicCloneUI
|
|||
if (uiElement.GetType().Name == "ComboBox")
|
||||
{
|
||||
ComboBox cb = (ComboBox) uiElement;
|
||||
string name = cb.Name.Substring(0, cb.Name.Length - 1);
|
||||
if (name == "ScrubType")
|
||||
if (cb.Name.StartsWith("ScrubType"))
|
||||
{
|
||||
//sr.Type = (RuleType) Enum.Parse(typeof(RuleType), cb.Text);
|
||||
RuleType rType;
|
||||
|
|
|
@ -77,7 +77,7 @@
|
|||
<Compile Include="Migrator\DocumentMigrator.cs" />
|
||||
<Compile Include="Model\RandomNumberGenerator.cs" />
|
||||
<Compile Include="Model\ScrubRule.cs" />
|
||||
<Compile Include="Model\Validationresult.cs" />
|
||||
<Compile Include="Model\ValidationResult.cs" />
|
||||
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||
<Compile Include="Utility\CloneLogger.cs" />
|
||||
<Compile Include="Utility\CloneSerializer.cs" />
|
||||
|
|
|
@ -15,195 +15,195 @@ using Microsoft.Azure.Documents.Client;
|
|||
namespace CosmosCloneCommon.Migrator
|
||||
{
|
||||
public class CodeMigrator
|
||||
{
|
||||
#region declare variables
|
||||
private CosmosDBHelper cosmosHelper;
|
||||
private CosmosBulkImporter cosmosBulkImporter;
|
||||
protected DocumentClient sourceClient;
|
||||
protected DocumentClient targetClient;
|
||||
protected DocumentCollection sourceCollection;
|
||||
protected DocumentCollection targetCollection;
|
||||
private string SourceEndpointUrl;
|
||||
private string SourceAccessKey;
|
||||
private string sourceDatabaseName;
|
||||
private string sourceCollectionName;
|
||||
|
||||
private string TargetEndpointUrl;
|
||||
private string TargetAccessKey;
|
||||
private string TargetDatabaseName;
|
||||
private string TargetCollectionName;
|
||||
//private EntitySummary summary;
|
||||
|
||||
#endregion
|
||||
|
||||
public CodeMigrator()
|
||||
{
|
||||
#region declare variables
|
||||
private CosmosDBHelper cosmosHelper;
|
||||
private CosmosBulkImporter cosmosBulkImporter;
|
||||
protected DocumentClient sourceClient;
|
||||
protected DocumentClient targetClient;
|
||||
protected DocumentCollection sourceCollection;
|
||||
protected DocumentCollection targetCollection;
|
||||
private string SourceEndpointUrl;
|
||||
private string SourceAccessKey;
|
||||
private string sourceDatabaseName;
|
||||
private string sourceCollectionName;
|
||||
//initialize settings and other utilities
|
||||
var SourceCosmosDBSettings = CloneSettings.GetConfigurationSection("SourceCosmosDBSettings");
|
||||
SourceEndpointUrl = CloneSettings.SourceSettings.EndpointUrl; ;
|
||||
SourceAccessKey = CloneSettings.SourceSettings.AccessKey;
|
||||
sourceDatabaseName = CloneSettings.SourceSettings.DatabaseName;
|
||||
sourceCollectionName = CloneSettings.SourceSettings.CollectionName;
|
||||
|
||||
private string TargetEndpointUrl;
|
||||
private string TargetAccessKey;
|
||||
private string TargetDatabaseName;
|
||||
private string TargetCollectionName;
|
||||
//private EntitySummary summary;
|
||||
|
||||
#endregion
|
||||
|
||||
public CodeMigrator()
|
||||
{
|
||||
//initialize settings and other utilities
|
||||
var SourceCosmosDBSettings = CloneSettings.GetConfigurationSection("SourceCosmosDBSettings");
|
||||
SourceEndpointUrl = CloneSettings.SourceSettings.EndpointUrl; ;
|
||||
SourceAccessKey = CloneSettings.SourceSettings.AccessKey;
|
||||
sourceDatabaseName = CloneSettings.SourceSettings.DatabaseName;
|
||||
sourceCollectionName = CloneSettings.SourceSettings.CollectionName;
|
||||
|
||||
//var TargetCosmosDBSettings = CloneSettings.GetConfigurationSection("TargetCosmosDBSettings");
|
||||
TargetEndpointUrl = CloneSettings.TargetSettings.EndpointUrl;
|
||||
TargetAccessKey = CloneSettings.TargetSettings.AccessKey;
|
||||
TargetDatabaseName = CloneSettings.TargetSettings.DatabaseName;
|
||||
TargetCollectionName = CloneSettings.TargetSettings.CollectionName;
|
||||
|
||||
cosmosHelper = new CosmosDBHelper();
|
||||
cosmosBulkImporter = new CosmosBulkImporter();
|
||||
//summary = new EntitySummary();
|
||||
//summary.EntityType = "DBCode";
|
||||
}
|
||||
private async Task Initialize()
|
||||
{
|
||||
logger.LogInfo("-----------------------------------------------");
|
||||
logger.LogInfo("Begin CosmosDBCodeMigrator");
|
||||
|
||||
sourceClient = cosmosHelper.GetSourceDocumentDbClient();
|
||||
sourceCollection = await cosmosHelper.GetSourceDocumentCollection(sourceClient);
|
||||
|
||||
targetClient = cosmosHelper.GetTargetDocumentDbClient();
|
||||
targetCollection = await cosmosHelper.CreateTargetDocumentCollection(targetClient, sourceCollection.IndexingPolicy, sourceCollection.PartitionKey);
|
||||
}
|
||||
|
||||
public async Task<bool> StartCopy()
|
||||
{
|
||||
await Initialize();
|
||||
if (CloneSettings.CopyStoredProcedures) { await CopyStoredProcedures(); }
|
||||
if (CloneSettings.CopyUDFs) { await CopyUDFs(); }
|
||||
if (CloneSettings.CopyTriggers) { await CopyTriggers(); }
|
||||
|
||||
//summary.isMigrationComplete = true;
|
||||
logger.LogInfo("CosmosDBCodeMigrator End");
|
||||
logger.LogInfo("-----------------------------------------------");
|
||||
//return summary;
|
||||
return true;
|
||||
}
|
||||
|
||||
private async Task CopyTriggers()
|
||||
{
|
||||
try
|
||||
{
|
||||
logger.LogInfo("-----------------------------------------------");
|
||||
logger.LogInfo("Begin CopyTriggers");
|
||||
FeedOptions feedOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
|
||||
var requestOptions = new RequestOptions { OfferEnableRUPerMinuteThroughput = true };
|
||||
var triggerFeedResponse = await sourceClient.ReadTriggerFeedAsync(UriFactory.CreateDocumentCollectionUri(sourceDatabaseName, sourceCollectionName), feedOptions);
|
||||
var triggerList = triggerFeedResponse.ToList();
|
||||
logger.LogInfo($"Triggers retrieved from source {triggerList.Count}");
|
||||
//summary.totalRecordsRetrieved += triggerList.Count;
|
||||
|
||||
var targetResponse = await targetClient.ReadTriggerFeedAsync(UriFactory.CreateDocumentCollectionUri(TargetDatabaseName, TargetCollectionName), feedOptions);
|
||||
var targetTriggerList = targetResponse.ToList();
|
||||
logger.LogInfo($"Triggers already in target {targetTriggerList.Count}");
|
||||
var targetTriggerIds = new HashSet<string>();
|
||||
targetTriggerList.ForEach(sp => targetTriggerIds.Add(sp.Id));
|
||||
|
||||
foreach (var trigger in triggerList)
|
||||
{
|
||||
if (targetTriggerIds.Contains(trigger.Id))
|
||||
{
|
||||
logger.LogInfo($"Trigger {trigger.Id} already Exists in destination DB");
|
||||
continue;
|
||||
}
|
||||
logger.LogInfo($"Create Trigger {trigger.Id} start");
|
||||
await targetClient.CreateTriggerAsync(UriFactory.CreateDocumentCollectionUri(TargetDatabaseName, TargetCollectionName), trigger, requestOptions);
|
||||
logger.LogInfo($"Create Trigger {trigger.Id} complete");
|
||||
//summary.totalRecordsSent++;
|
||||
}
|
||||
logger.LogInfo("Copy Triggers end.");
|
||||
logger.LogInfo("");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogInfo("Exception while CopyTriggers");
|
||||
logger.LogError(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CopyUDFs()
|
||||
{
|
||||
try
|
||||
{
|
||||
logger.LogInfo("-----------------------------------------------");
|
||||
logger.LogInfo("Begin CopyUDFs");
|
||||
FeedOptions feedOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
|
||||
var udfFeedResponse = await sourceClient.ReadUserDefinedFunctionFeedAsync(UriFactory.CreateDocumentCollectionUri(sourceDatabaseName, sourceCollectionName), feedOptions);
|
||||
var udfList = udfFeedResponse.ToList<UserDefinedFunction>();
|
||||
logger.LogInfo($"UDFs retrieved from source {udfList.Count}");
|
||||
//summary.totalRecordsRetrieved += udfList.Count;
|
||||
|
||||
var targetResponse = await targetClient.ReadUserDefinedFunctionFeedAsync(UriFactory.CreateDocumentCollectionUri(TargetDatabaseName, TargetCollectionName), feedOptions);
|
||||
var targetUdfList = targetResponse.ToList();
|
||||
logger.LogInfo($"Triggers already in target {targetUdfList.Count}");
|
||||
var targetUDFIds = new HashSet<string>();
|
||||
targetUdfList.ForEach(sp => targetUDFIds.Add(sp.Id));
|
||||
|
||||
var requestOptions = new RequestOptions { OfferEnableRUPerMinuteThroughput = true };
|
||||
foreach (var udf in udfList)
|
||||
{
|
||||
if (targetUDFIds.Contains(udf.Id))
|
||||
{
|
||||
logger.LogInfo($"UDF {udf.Id} already Exists in destination DB");
|
||||
continue;
|
||||
}
|
||||
logger.LogInfo($"Create Trigger {udf.Id} start");
|
||||
await targetClient.CreateUserDefinedFunctionAsync(UriFactory.CreateDocumentCollectionUri(TargetDatabaseName, TargetCollectionName), udf, requestOptions);
|
||||
logger.LogInfo($"Create Trigger {udf.Id} complete");
|
||||
//summary.totalRecordsSent++;
|
||||
}
|
||||
logger.LogInfo("CopyUDFs end.");
|
||||
logger.LogInfo("");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogInfo("Exception while CopyUDFs");
|
||||
logger.LogError(ex);
|
||||
}
|
||||
}
|
||||
private async Task CopyStoredProcedures()
|
||||
{
|
||||
try
|
||||
{
|
||||
logger.LogInfo("-----------------------------------------------");
|
||||
logger.LogInfo("Begin CopyStoredProcedures");
|
||||
FeedOptions feedOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
|
||||
var requestOptions = new RequestOptions { OfferEnableRUPerMinuteThroughput = true };
|
||||
var sourceResponse = await sourceClient.ReadStoredProcedureFeedAsync(UriFactory.CreateDocumentCollectionUri(sourceDatabaseName, sourceCollectionName), feedOptions);
|
||||
var splist = sourceResponse.ToList<StoredProcedure>();
|
||||
logger.LogInfo($"StoredProcedures retrieved from source {splist.Count}");
|
||||
//summary.totalRecordsRetrieved += splist.Count;
|
||||
|
||||
var targetResponse = await targetClient.ReadStoredProcedureFeedAsync(UriFactory.CreateDocumentCollectionUri(TargetDatabaseName, TargetCollectionName), feedOptions);
|
||||
var targetSPList = targetResponse.ToList();
|
||||
logger.LogInfo($"StoredProcedures already retrieved in target {targetSPList.Count}");
|
||||
var targetSPIds = new HashSet<string>();
|
||||
targetSPList.ForEach(sp => targetSPIds.Add(sp.Id));
|
||||
|
||||
foreach (var sp in splist)
|
||||
{
|
||||
if (targetSPIds.Contains(sp.Id))
|
||||
{
|
||||
logger.LogInfo($"StoredProcedure {sp.Id} already Exists in destination DB");
|
||||
continue;
|
||||
}
|
||||
logger.LogInfo($"Create StoredProcedure {sp.Id} start");
|
||||
await targetClient.CreateStoredProcedureAsync(UriFactory.CreateDocumentCollectionUri(TargetDatabaseName, TargetCollectionName), sp, requestOptions);
|
||||
logger.LogInfo($"Create StoredProcedure {sp.Id} complete");
|
||||
//summary.totalRecordsSent++;
|
||||
}
|
||||
logger.LogInfo("CopyStoredProcedures end.");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogInfo("Exception while CopyStoredProcedures");
|
||||
logger.LogError(ex);
|
||||
logger.LogInfo("");
|
||||
}
|
||||
}
|
||||
//var TargetCosmosDBSettings = CloneSettings.GetConfigurationSection("TargetCosmosDBSettings");
|
||||
TargetEndpointUrl = CloneSettings.TargetSettings.EndpointUrl;
|
||||
TargetAccessKey = CloneSettings.TargetSettings.AccessKey;
|
||||
TargetDatabaseName = CloneSettings.TargetSettings.DatabaseName;
|
||||
TargetCollectionName = CloneSettings.TargetSettings.CollectionName;
|
||||
|
||||
cosmosHelper = new CosmosDBHelper();
|
||||
cosmosBulkImporter = new CosmosBulkImporter();
|
||||
//summary = new EntitySummary();
|
||||
//summary.EntityType = "DBCode";
|
||||
}
|
||||
|
||||
|
||||
private async Task Initialize()
|
||||
{
|
||||
logger.LogInfo("-----------------------------------------------");
|
||||
logger.LogInfo("Begin CosmosDBCodeMigrator");
|
||||
|
||||
sourceClient = cosmosHelper.GetSourceDocumentDbClient();
|
||||
sourceCollection = await cosmosHelper.GetSourceDocumentCollection(sourceClient);
|
||||
|
||||
targetClient = cosmosHelper.GetTargetDocumentDbClient();
|
||||
targetCollection = await cosmosHelper.CreateTargetDocumentCollection(targetClient, sourceCollection.IndexingPolicy, sourceCollection.PartitionKey);
|
||||
}
|
||||
|
||||
public async Task<bool> StartCopy()
|
||||
{
|
||||
await Initialize();
|
||||
if (CloneSettings.CopyStoredProcedures) { await CopyStoredProcedures(); }
|
||||
if (CloneSettings.CopyUDFs) { await CopyUDFs(); }
|
||||
if (CloneSettings.CopyTriggers) { await CopyTriggers(); }
|
||||
|
||||
//summary.isMigrationComplete = true;
|
||||
logger.LogInfo("CosmosDBCodeMigrator End");
|
||||
logger.LogInfo("-----------------------------------------------");
|
||||
//return summary;
|
||||
return true;
|
||||
}
|
||||
|
||||
private async Task CopyTriggers()
|
||||
{
|
||||
try
|
||||
{
|
||||
logger.LogInfo("-----------------------------------------------");
|
||||
logger.LogInfo("Begin CopyTriggers");
|
||||
FeedOptions feedOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
|
||||
var requestOptions = new RequestOptions { OfferEnableRUPerMinuteThroughput = true };
|
||||
var triggerFeedResponse = await sourceClient.ReadTriggerFeedAsync(UriFactory.CreateDocumentCollectionUri(sourceDatabaseName, sourceCollectionName), feedOptions);
|
||||
var triggerList = triggerFeedResponse.ToList();
|
||||
logger.LogInfo($"Triggers retrieved from source {triggerList.Count}");
|
||||
//summary.totalRecordsRetrieved += triggerList.Count;
|
||||
|
||||
var targetResponse = await targetClient.ReadTriggerFeedAsync(UriFactory.CreateDocumentCollectionUri(TargetDatabaseName, TargetCollectionName), feedOptions);
|
||||
var targetTriggerList = targetResponse.ToList();
|
||||
logger.LogInfo($"Triggers already in target {targetTriggerList.Count}");
|
||||
var targetTriggerIds = new HashSet<string>();
|
||||
targetTriggerList.ForEach(sp => targetTriggerIds.Add(sp.Id));
|
||||
|
||||
foreach (var trigger in triggerList)
|
||||
{
|
||||
if (targetTriggerIds.Contains(trigger.Id))
|
||||
{
|
||||
logger.LogInfo($"Trigger {trigger.Id} already Exists in destination DB");
|
||||
continue;
|
||||
}
|
||||
logger.LogInfo($"Create Trigger {trigger.Id} start");
|
||||
await targetClient.CreateTriggerAsync(UriFactory.CreateDocumentCollectionUri(TargetDatabaseName, TargetCollectionName), trigger, requestOptions);
|
||||
logger.LogInfo($"Create Trigger {trigger.Id} complete");
|
||||
//summary.totalRecordsSent++;
|
||||
}
|
||||
logger.LogInfo("Copy Triggers end.");
|
||||
logger.LogInfo("");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogInfo("Exception while CopyTriggers");
|
||||
logger.LogError(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CopyUDFs()
|
||||
{
|
||||
try
|
||||
{
|
||||
logger.LogInfo("-----------------------------------------------");
|
||||
logger.LogInfo("Begin CopyUDFs");
|
||||
FeedOptions feedOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
|
||||
var udfFeedResponse = await sourceClient.ReadUserDefinedFunctionFeedAsync(UriFactory.CreateDocumentCollectionUri(sourceDatabaseName, sourceCollectionName), feedOptions);
|
||||
var udfList = udfFeedResponse.ToList<UserDefinedFunction>();
|
||||
logger.LogInfo($"UDFs retrieved from source {udfList.Count}");
|
||||
//summary.totalRecordsRetrieved += udfList.Count;
|
||||
|
||||
var targetResponse = await targetClient.ReadUserDefinedFunctionFeedAsync(UriFactory.CreateDocumentCollectionUri(TargetDatabaseName, TargetCollectionName), feedOptions);
|
||||
var targetUdfList = targetResponse.ToList();
|
||||
logger.LogInfo($"Triggers already in target {targetUdfList.Count}");
|
||||
var targetUDFIds = new HashSet<string>();
|
||||
targetUdfList.ForEach(sp => targetUDFIds.Add(sp.Id));
|
||||
|
||||
var requestOptions = new RequestOptions { OfferEnableRUPerMinuteThroughput = true };
|
||||
foreach (var udf in udfList)
|
||||
{
|
||||
if (targetUDFIds.Contains(udf.Id))
|
||||
{
|
||||
logger.LogInfo($"UDF {udf.Id} already Exists in destination DB");
|
||||
continue;
|
||||
}
|
||||
logger.LogInfo($"Create Trigger {udf.Id} start");
|
||||
await targetClient.CreateUserDefinedFunctionAsync(UriFactory.CreateDocumentCollectionUri(TargetDatabaseName, TargetCollectionName), udf, requestOptions);
|
||||
logger.LogInfo($"Create Trigger {udf.Id} complete");
|
||||
//summary.totalRecordsSent++;
|
||||
}
|
||||
logger.LogInfo("CopyUDFs end.");
|
||||
logger.LogInfo("");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogInfo("Exception while CopyUDFs");
|
||||
logger.LogError(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CopyStoredProcedures()
|
||||
{
|
||||
try
|
||||
{
|
||||
logger.LogInfo("-----------------------------------------------");
|
||||
logger.LogInfo("Begin CopyStoredProcedures");
|
||||
FeedOptions feedOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
|
||||
var requestOptions = new RequestOptions { OfferEnableRUPerMinuteThroughput = true };
|
||||
var sourceResponse = await sourceClient.ReadStoredProcedureFeedAsync(UriFactory.CreateDocumentCollectionUri(sourceDatabaseName, sourceCollectionName), feedOptions);
|
||||
var splist = sourceResponse.ToList<StoredProcedure>();
|
||||
logger.LogInfo($"StoredProcedures retrieved from source {splist.Count}");
|
||||
//summary.totalRecordsRetrieved += splist.Count;
|
||||
|
||||
var targetResponse = await targetClient.ReadStoredProcedureFeedAsync(UriFactory.CreateDocumentCollectionUri(TargetDatabaseName, TargetCollectionName), feedOptions);
|
||||
var targetSPList = targetResponse.ToList();
|
||||
logger.LogInfo($"StoredProcedures already retrieved in target {targetSPList.Count}");
|
||||
var targetSPIds = new HashSet<string>();
|
||||
targetSPList.ForEach(sp => targetSPIds.Add(sp.Id));
|
||||
|
||||
foreach (var sp in splist)
|
||||
{
|
||||
if (targetSPIds.Contains(sp.Id))
|
||||
{
|
||||
logger.LogInfo($"StoredProcedure {sp.Id} already Exists in destination DB");
|
||||
continue;
|
||||
}
|
||||
logger.LogInfo($"Create StoredProcedure {sp.Id} start");
|
||||
await targetClient.CreateStoredProcedureAsync(UriFactory.CreateDocumentCollectionUri(TargetDatabaseName, TargetCollectionName), sp, requestOptions);
|
||||
logger.LogInfo($"Create StoredProcedure {sp.Id} complete");
|
||||
//summary.totalRecordsSent++;
|
||||
}
|
||||
logger.LogInfo("CopyStoredProcedures end.");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogInfo("Exception while CopyStoredProcedures");
|
||||
logger.LogError(ex);
|
||||
logger.LogInfo("");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,11 +36,13 @@ namespace CosmosCloneCommon.Migrator
|
|||
public long TotalRecordsScrubbed { get; set; }
|
||||
public long TotalRecordsToScrub { get; set; }
|
||||
#endregion
|
||||
|
||||
public DataScrubMigrator()
|
||||
{
|
||||
cosmosHelper = new CosmosDBHelper();
|
||||
cosmosBulkImporter = new CosmosBulkImporter();
|
||||
}
|
||||
|
||||
public async Task<bool> StartScrub(List<ScrubRule> scrubRules)
|
||||
{
|
||||
DataScrubMigrator.scrubRules = scrubRules;
|
||||
|
@ -63,7 +65,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
logger.LogInfo($"Scrub rules found {sRules.Count}");
|
||||
long filterRecordCount = cosmosHelper.GetFilterRecordCount(filterCondition);
|
||||
ScrubDataFetchQuery = cosmosHelper.GetScrubDataDocumentQuery<string>(targetClient, filterCondition, CloneSettings.ReadBatchSize);
|
||||
await ReadUploadInbatches((IDocumentQuery<string>)ScrubDataFetchQuery, sRules);
|
||||
await ReadUploadInBatches((IDocumentQuery<string>)ScrubDataFetchQuery, sRules);
|
||||
|
||||
foreach(var srule in DataScrubMigrator.scrubRules)
|
||||
{
|
||||
|
@ -77,6 +79,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
|
||||
return true;
|
||||
}
|
||||
|
||||
public async Task InitializeMigration()
|
||||
{
|
||||
logger.LogInfo("Initialize data scrubbing");
|
||||
|
@ -85,7 +88,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
await cosmosBulkImporter.InitializeBulkExecutor(targetClient, targetCollection);
|
||||
}
|
||||
|
||||
public async Task ReadUploadInbatches(IDocumentQuery<string> query, List<ScrubRule> scrubRules)
|
||||
public async Task ReadUploadInBatches(IDocumentQuery<string> query, List<ScrubRule> scrubRules)
|
||||
{
|
||||
#region batchVariables
|
||||
//initialize Batch Process variables
|
||||
|
@ -127,7 +130,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
catch(Exception ex)
|
||||
{
|
||||
logger.LogError(ex);
|
||||
throw (ex);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
badEntities = uploadResponse.BadInputDocuments;
|
||||
|
@ -172,7 +175,6 @@ namespace CosmosCloneCommon.Migrator
|
|||
System.Threading.Thread.Sleep(ReadDelaybetweenRequestsInMs);
|
||||
return entities;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,7 +88,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
await InitializeMigration();
|
||||
if (CloneSettings.CopyDocuments)
|
||||
{
|
||||
await ReadUploadInbatches((IDocumentQuery<dynamic>)SourceCommonDataFetchQuery);
|
||||
await ReadUploadInBatches((IDocumentQuery<dynamic>)SourceCommonDataFetchQuery);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -161,9 +161,8 @@ namespace CosmosCloneCommon.Migrator
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public async Task ReadUploadInbatches(IDocumentQuery<dynamic> query)
|
||||
public async Task ReadUploadInBatches(IDocumentQuery<dynamic> query)
|
||||
{
|
||||
|
||||
#region batchVariables
|
||||
|
@ -236,7 +235,7 @@ namespace CosmosCloneCommon.Migrator
|
|||
{
|
||||
//sourceClient = cosmosHelper.GetSourceDocumentDbClient();
|
||||
//sourceCollection = await cosmosHelper.GetSourceDocumentCollection(sourceClient);
|
||||
var setCorrect = await cosmosHelper.SetTargetRestOfferThroughPut();
|
||||
var setCorrect = await cosmosHelper.SetTargetRestOfferThroughput();
|
||||
}
|
||||
public bool SetCompleteOnNoFilterRules()
|
||||
{
|
||||
|
|
|
@ -14,10 +14,12 @@ namespace CosmosCloneCommon.Model
|
|||
{
|
||||
return _random.Next(0,maxValue);
|
||||
}
|
||||
|
||||
public static int GetNext()
|
||||
{
|
||||
return _random.Next(1, 99999999);
|
||||
}
|
||||
|
||||
public static string GetRandomEntityType()
|
||||
{
|
||||
switch (RandomNumberGenerator.GetNext(5))
|
||||
|
@ -31,7 +33,6 @@ namespace CosmosCloneCommon.Model
|
|||
case 4:
|
||||
return "Partner";
|
||||
case 0:
|
||||
return "External";
|
||||
default:
|
||||
return "External";
|
||||
}
|
||||
|
|
|
@ -37,7 +37,6 @@ namespace CosmosCloneCommon.Model
|
|||
}
|
||||
|
||||
}
|
||||
public enum RuleType { SingleValue, NullValue, Shuffle, PartialMaskFromLeft, PartialMaskFromRight };//Can add random rule type later if required.
|
||||
|
||||
|
||||
}
|
||||
public enum RuleType { SingleValue, NullValue, Shuffle, PartialMaskFromLeft, PartialMaskFromRight };//Can add random rule type later if required.
|
||||
}
|
|
@ -28,20 +28,23 @@ namespace CosmosCloneCommon.Utility
|
|||
}
|
||||
}
|
||||
|
||||
public static string getFullLog()
|
||||
public static string GetFullLog()
|
||||
{
|
||||
return _logBuilder.ToString();
|
||||
}
|
||||
|
||||
public static void LogInfo(string info)
|
||||
{
|
||||
Console.WriteLine(info);
|
||||
_logBuilder.Append("\n"+info);
|
||||
}
|
||||
|
||||
public static void LogError(string s)
|
||||
{
|
||||
LogInfo("Error Occurred");
|
||||
LogInfo(s);
|
||||
}
|
||||
|
||||
public static void LogError(Exception e)
|
||||
{
|
||||
LogInfo("LogError");
|
||||
|
|
|
@ -97,7 +97,6 @@ namespace CosmosCloneCommon.Utility
|
|||
return result;
|
||||
}
|
||||
|
||||
|
||||
public DocumentClient GetSourceDocumentDbClient()
|
||||
{
|
||||
try
|
||||
|
@ -210,7 +209,6 @@ namespace CosmosCloneCommon.Utility
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public async Task<DocumentCollection> GetSourceDocumentCollection(DocumentClient sourceClient)
|
||||
{
|
||||
try
|
||||
|
@ -274,6 +272,7 @@ namespace CosmosCloneCommon.Utility
|
|||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
public IQueryable<T> GetSourceEntityDocumentQuery<T>(DocumentClient sourceClient, int batchSize = -1)
|
||||
{
|
||||
try
|
||||
|
@ -292,6 +291,7 @@ namespace CosmosCloneCommon.Utility
|
|||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
public IQueryable<T> GetScrubDataDocumentQuery<T>(DocumentClient targetClient,string filterCondition, int batchSize = -1)
|
||||
{
|
||||
try
|
||||
|
@ -321,7 +321,7 @@ namespace CosmosCloneCommon.Utility
|
|||
}
|
||||
}
|
||||
|
||||
public async Task<bool> SetTargetRestOfferThroughPut()
|
||||
public async Task<bool> SetTargetRestOfferThroughput()
|
||||
{
|
||||
using (var client = GetTargetDocumentDbClient())
|
||||
{
|
||||
|
|
|
@ -22,7 +22,7 @@ namespace CosmosCloneCommon.Utility
|
|||
{
|
||||
try
|
||||
{
|
||||
JToken jToken = getUpdatedJsonArrayValue((JToken)JObject.Parse(strObj), propNames, scrubRule.UpdateValue, scrubRule.Type);
|
||||
JToken jToken = GetUpdatedJsonArrayValue((JToken)JObject.Parse(strObj), propNames, scrubRule.UpdateValue, scrubRule.Type);
|
||||
scrubbedObjects.Add(jToken);
|
||||
}
|
||||
catch(Exception ex)
|
||||
|
@ -43,7 +43,7 @@ namespace CosmosCloneCommon.Utility
|
|||
try
|
||||
{
|
||||
List<JToken> jTokenList = new List<JToken>();
|
||||
getPropertyValues((JToken)JObject.Parse(strObj), propNames, ref jTokenList);
|
||||
GetPropertyValues((JToken)JObject.Parse(strObj), propNames, ref jTokenList);
|
||||
propertyValues.AddRange(jTokenList);
|
||||
}
|
||||
catch (Exception ex)
|
||||
|
@ -62,7 +62,7 @@ namespace CosmosCloneCommon.Utility
|
|||
{
|
||||
try
|
||||
{
|
||||
JToken jToken = getDocumentShuffledToken((JToken)JObject.Parse(strObj), propNames, ref shuffledTokenQ);
|
||||
JToken jToken = GetDocumentShuffledToken((JToken)JObject.Parse(strObj), propNames, ref shuffledTokenQ);
|
||||
scrubbedObjects.Add(jToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
|
@ -84,9 +84,7 @@ namespace CosmosCloneCommon.Utility
|
|||
return scrubbedObjects;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public List<JToken> getPropertyValues(JToken token, List<string> propNames, ref List<JToken> jTokenList)
|
||||
public List<JToken> GetPropertyValues(JToken token, List<string> propNames, ref List<JToken> jTokenList)
|
||||
{
|
||||
if(jTokenList == null)
|
||||
{
|
||||
|
@ -120,7 +118,7 @@ namespace CosmosCloneCommon.Utility
|
|||
}
|
||||
else
|
||||
{
|
||||
getPropertyValues(jArray[k], propNames.GetRange(1, propNames.Count - 1), ref jTokenList);
|
||||
GetPropertyValues(jArray[k], propNames.GetRange(1, propNames.Count - 1), ref jTokenList);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -141,14 +139,14 @@ namespace CosmosCloneCommon.Utility
|
|||
}
|
||||
else
|
||||
{
|
||||
getPropertyValues((JToken)jObj[currentProperty], propNames.GetRange(1, propNames.Count - 1), ref jTokenList);
|
||||
GetPropertyValues((JToken)jObj[currentProperty], propNames.GetRange(1, propNames.Count - 1), ref jTokenList);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return jTokenList;
|
||||
}
|
||||
public JToken getDocumentShuffledToken(JToken token, List<string> propNames, ref Queue<JToken> tokenQ)
|
||||
public JToken GetDocumentShuffledToken(JToken token, List<string> propNames, ref Queue<JToken> tokenQ)
|
||||
{
|
||||
if (token == null || token.Type == JTokenType.Null) return null;
|
||||
|
||||
|
@ -175,7 +173,7 @@ namespace CosmosCloneCommon.Utility
|
|||
}
|
||||
else
|
||||
{
|
||||
jArray[k] = getDocumentShuffledToken(jArray[k], propNames.GetRange(1, propNames.Count - 1), ref tokenQ);
|
||||
jArray[k] = GetDocumentShuffledToken(jArray[k], propNames.GetRange(1, propNames.Count - 1), ref tokenQ);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -194,7 +192,7 @@ namespace CosmosCloneCommon.Utility
|
|||
}
|
||||
else
|
||||
{
|
||||
jObj[currentProperty] = getDocumentShuffledToken((JToken)jObj[currentProperty], propNames.GetRange(1, propNames.Count - 1), ref tokenQ);
|
||||
jObj[currentProperty] = GetDocumentShuffledToken((JToken)jObj[currentProperty], propNames.GetRange(1, propNames.Count - 1), ref tokenQ);
|
||||
}
|
||||
var str3 = jObj.ToString();
|
||||
jTokenResult = (JToken)jObj;
|
||||
|
@ -207,7 +205,7 @@ namespace CosmosCloneCommon.Utility
|
|||
return jTokenResult;
|
||||
}
|
||||
|
||||
public JToken getUpdatedJsonArrayValue(JToken token, List<string> propNames, string overwritevalue, RuleType? ruleType)
|
||||
public JToken GetUpdatedJsonArrayValue(JToken token, List<string> propNames, string overwritevalue, RuleType? ruleType)
|
||||
{
|
||||
if (token == null || token.Type == JTokenType.Null) return null;
|
||||
|
||||
|
@ -237,7 +235,7 @@ namespace CosmosCloneCommon.Utility
|
|||
{
|
||||
if (jArray[k] != null && jArray[k][currentProperty].Type != JTokenType.Null)
|
||||
{
|
||||
jArray[k] = getUpdatedJsonArrayValue(jArray[k], propNames.GetRange(1, propNames.Count - 1), overwritevalue, ruleType);
|
||||
jArray[k] = GetUpdatedJsonArrayValue(jArray[k], propNames.GetRange(1, propNames.Count - 1), overwritevalue, ruleType);
|
||||
continue;
|
||||
}
|
||||
//else return null;
|
||||
|
@ -260,7 +258,7 @@ namespace CosmosCloneCommon.Utility
|
|||
{
|
||||
if (jObj[currentProperty] != null && jObj[currentProperty].Type != JTokenType.Null)
|
||||
{
|
||||
jObj[currentProperty] = getUpdatedJsonArrayValue((JToken)jObj[currentProperty], propNames.GetRange(1, propNames.Count - 1), overwritevalue, ruleType);
|
||||
jObj[currentProperty] = GetUpdatedJsonArrayValue((JToken)jObj[currentProperty], propNames.GetRange(1, propNames.Count - 1), overwritevalue, ruleType);
|
||||
}
|
||||
//else return null;
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче