зеркало из
1
0
Форкнуть 0
This commit is contained in:
Vimalraj Anbarasu 2023-07-06 08:31:02 -07:00
Родитель 39ca566048
Коммит f17b35c78c
2 изменённых файлов: 184 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,162 @@
{
"$schema": "http://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"factoryName": {
"type": "string",
"metadata": "Data Factory name"
}
},
"variables": {
"factoryId": "[concat('Microsoft.DataFactory/factories/', parameters('factoryName'))]"
},
"resources": [
{
"name": "[concat(parameters('factoryName'), '/DCSForAzure_SQL_Pipeline')]",
"type": "Microsoft.DataFactory/factories/pipelines",
"apiVersion": "2018-06-01",
"properties": {
"activities": [
{
"name": "DCS For Azure SQL Data Flow",
"type": "ExecuteDataFlow",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataflow": {
"referenceName": "DCSForAzure_SQL_Dataflow",
"type": "DataFlowReference",
"parameters": {
"runId": {
"value": "'@{pipeline().RunId}'",
"type": "Expression"
}
}
},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "Fine",
"runConcurrently": true,
"continueOnError": true
}
}
],
"annotations": [],
"lastPublishTime": "2023-06-01T14:03:55Z"
},
"dependsOn": [
"[concat(variables('factoryId'), '/dataflows/DCSForAzure_SQL_Dataflow')]"
]
},
{
"name": "[concat(parameters('factoryName'), '/DCSForAzure_SQL_Dataflow')]",
"type": "Microsoft.DataFactory/factories/dataflows",
"apiVersion": "2018-06-01",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"name": "AzureSQLSource",
"description": "Source table that will be masked"
}
],
"sinks": [
{
"name": "AzureSQLSink",
"description": "Destination table for masked data"
}
],
"transformations": [
{
"name": "DCSForAzureAPI"
},
{
"name": "AggregateColumnsByBatch",
"description": "Aggregate columns to be masked by Batch size."
},
{
"name": "FlattenAggregateData",
"description": "Flatten the masked aggregated data"
},
{
"name": "SelectColumnsUnmasked",
"description": "Get all unmasked columns from the source table"
},
{
"name": "JoinMaskedAndUnmaskedData",
"description": "Join the masked and unmasked columns"
},
{
"name": "SortByKeyColumn",
"description": "Select a key column to sort"
},
{
"name": "CreateSurrogateKey",
"description": "Add new key DELPHIX_COMPLIANCE_SERVICE_BATCH_ID starting from 1 with step 1"
},
{
"name": "AssertNoFailures",
"description": "Check if there are any failed request"
}
],
"scriptLines": [
"parameters{",
" runId as string (\"\")",
"}",
"source(allowSchemaDrift: true,",
" validateSchema: false,",
" partitionBy('roundRobin', 32)) ~> AzureSQLSource",
"AggregateColumnsByBatch call(mapColumn(",
" items = All",
" ),",
" skipDuplicateMapInputs: false,",
" skipDuplicateMapOutputs: false,",
" allowSchemaDrift: true) ~> DCSForAzureAPI",
"CreateSurrogateKey aggregate(groupBy(Batch = DELPHIX_COMPLIANCE_SERVICE_BATCH_ID%4),",
" All = collect(@(",
"DELPHIX_COMPLIANCE_SERVICE_BATCH_ID = DELPHIX_COMPLIANCE_SERVICE_BATCH_ID))) ~> AggregateColumnsByBatch",
"AssertNoFailures foldDown(unroll(body.items),",
" mapColumn(",
" DELPHIX_COMPLIANCE_SERVICE_BATCH_ID = body.items.DELPHIX_COMPLIANCE_SERVICE_BATCH_ID",
" ),",
" skipDuplicateMapInputs: false,",
" skipDuplicateMapOutputs: false) ~> FlattenAggregateData",
"CreateSurrogateKey select(mapColumn(",
" DELPHIX_COMPLIANCE_SERVICE_BATCH_ID",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> SelectColumnsUnmasked",
"SelectColumnsUnmasked, FlattenAggregateData join(SelectColumnsUnmasked@DELPHIX_COMPLIANCE_SERVICE_BATCH_ID == FlattenAggregateData@DELPHIX_COMPLIANCE_SERVICE_BATCH_ID,",
" joinType:'inner',",
" matchType:'exact',",
" ignoreSpaces: false,",
" broadcast: 'auto')~> JoinMaskedAndUnmaskedData",
"AzureSQLSource sort(asc(ERROR_FUNCTION(''), true)) ~> SortByKeyColumn",
"SortByKeyColumn keyGenerate(output(DELPHIX_COMPLIANCE_SERVICE_BATCH_ID as long),",
" startAt: 1L,",
" stepValue: 1L) ~> CreateSurrogateKey",
"DCSForAzureAPI assert(expectTrue(toInteger(regexExtract(status, '(\\\\d+)', 1)) == 200, false, 'Failed_request', null, iif(isNull(body.message), status, concatWS(', ', 'timestamp: ' + toString(body.timestamp), 'status: ' + body.status, 'message: ' + body.message, 'trace_id: ' + body.trace_id))),",
" abort: true) ~> AssertNoFailures",
"JoinMaskedAndUnmaskedData sink(allowSchemaDrift: true,",
" validateSchema: false,",
" truncate: true,",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" format: 'table') ~> AzureSQLSink"
]
}
},
"dependsOn": []
}
]
}

Различия файлов скрыты, потому что одна или несколько строк слишком длинны