Merge pull request #6 from microsoft/synapse_first

Synapse first
This commit is contained in:
gpabon 2022-06-21 11:55:40 -04:00 committed by GitHub
Parent 0562d75852 5aff78cc29
Commit e858bdaef7
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 1819 additions and 54 deletions

View File

@@ -648,4 +648,4 @@
},
"nbformat": 4,
"nbformat_minor": 4
}
}

View File

@@ -399,4 +399,4 @@
},
"nbformat": 4,
"nbformat_minor": 2
}
}

Binary data
db_notebooks/iJungle Evaluation.dbc

Binary file not shown.

Binary data
db_notebooks/iJungle Training.dbc

Binary file not shown.

View File

@@ -1,16 +0,0 @@
{
"name": "AutoResolveIntegrationRuntime",
"properties": {
"type": "Managed",
"typeProperties": {
"computeProperties": {
"location": "AutoResolve",
"dataFlowProperties": {
"computeType": "General",
"coreCount": 8,
"timeToLive": 0
}
}
}
}
}

View File

@@ -1,20 +0,0 @@
{
"name": "syw-guspabon-001-WorkspaceDefaultSqlServer",
"type": "Microsoft.Synapse/workspaces/linkedservices",
"properties": {
"typeProperties": {
"connectionString": "Data Source=tcp:syw-guspabon-001.sql.azuresynapse.net,1433;Initial Catalog=@{linkedService().DBName}"
},
"parameters": {
"DBName": {
"type": "String"
}
},
"type": "AzureSqlDW",
"connectVia": {
"referenceName": "AutoResolveIntegrationRuntime",
"type": "IntegrationRuntimeReference"
},
"annotations": []
}
}

View File

@@ -1,15 +0,0 @@
{
"name": "syw-guspabon-001-WorkspaceDefaultStorage",
"type": "Microsoft.Synapse/workspaces/linkedservices",
"properties": {
"typeProperties": {
"url": "https://dlsguspabon001.dfs.core.windows.net"
},
"type": "AzureBlobFS",
"connectVia": {
"referenceName": "AutoResolveIntegrationRuntime",
"type": "IntegrationRuntimeReference"
},
"annotations": []
}
}

View File

@@ -1 +0,0 @@
{"publishBranch":"workspace_publish"}

View File

@@ -0,0 +1,499 @@
{
"name": "01 - ijungle - common - feature engineering",
"properties": {
"nbformat": 4,
"nbformat_minor": 2,
"bigDataPool": {
"referenceName": "small",
"type": "BigDataPoolReference"
},
"sessionProperties": {
"driverMemory": "56g",
"driverCores": 8,
"executorMemory": "56g",
"executorCores": 8,
"numExecutors": 2,
"conf": {
"spark.dynamicAllocation.enabled": "false",
"spark.dynamicAllocation.minExecutors": "2",
"spark.dynamicAllocation.maxExecutors": "2",
"spark.autotune.trackingId": "29868e2b-e68d-4a40-b3e6-abf08d632430"
}
},
"metadata": {
"saveOutput": true,
"enableDebugMode": false,
"kernelspec": {
"name": "synapse_pyspark",
"display_name": "Synapse PySpark"
},
"language_info": {
"name": "python"
},
"a365ComputeOptions": {
"id": "/subscriptions/024dc50c-b360-41e5-9702-f88c150130de/resourceGroups/rg-guspabon-sbx/providers/Microsoft.Synapse/workspaces/syw-guspabon-001/bigDataPools/small",
"name": "small",
"type": "Spark",
"endpoint": "https://syw-guspabon-001.dev.azuresynapse.net/livyApi/versions/2019-11-01-preview/sparkPools/small",
"auth": {
"type": "AAD",
"authResource": "https://dev.azuresynapse.net"
},
"sparkVersion": "3.1",
"nodeCount": 10,
"cores": 8,
"memory": 56,
"automaticScaleJobs": false
},
"sessionKeepAliveTimeout": 30
},
"cells": [
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"from pyspark.sql.types import StructType, StructField, IntegerType, DateType, FloatType, StringType, BooleanType\r\n",
"import pyspark.sql.functions as F\r\n",
"from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, StandardScalerModel, StringIndexerModel"
],
"execution_count": 1
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"parameters"
]
},
"source": [
"transformed_data_path = 'abfss://transformeddata@eiadapstoreworking.dfs.core.windows.net/TransformedData'\r\n",
"# prepped_data_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/02-Prepped_Data'\r\n",
"prepped_data_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/07-Predict_Prepped_Data'\r\n",
"features_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/02-Features'\r\n",
"id_feat = \"['issuer_id','issued_date']\"\r\n",
"date_feat = 'issued_date'\r\n",
"first_year = \"1950\"\r\n",
"allowed_null_pct = \"0.051\"\r\n",
"# training=\"True\"\r\n",
"training=\"False\""
],
"execution_count": 2
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"# Casting parameters\r\n",
"\r\n",
"id_feat = eval(id_feat)\r\n",
"first_year = int(first_year)\r\n",
"allowed_null_pct = float(allowed_null_pct)\r\n",
"training = (training == 'True')  # bool('False') would be True, so compare the string directly"
],
"execution_count": 3
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"schema = StructType([\r\n",
" StructField('issuer_id', IntegerType(), False),\r\n",
" StructField('issued_date', DateType(), False),\r\n",
" StructField('number_of_sales', IntegerType(), False),\r\n",
" StructField('count_buyers', IntegerType(), False),\r\n",
" StructField('sum_document_type_C',FloatType(), False),\r\n",
" StructField('sum_document_type_D',FloatType(), False),\r\n",
" StructField('sum_document_type_F',FloatType(), False),\r\n",
" StructField('sum_document_type_P',FloatType(), False),\r\n",
" StructField('sum_document_type_X',FloatType(), False),\r\n",
" StructField('sum_self_transactions',FloatType(), False),\r\n",
" StructField('sum_total_voucher_to_self',FloatType(), False),\r\n",
" StructField('sum_total_taxable_services',FloatType(), False),\r\n",
" StructField('sum_total_non_taxable_services',FloatType(), False),\r\n",
" StructField('sum_total_taxable_goods',FloatType(), False),\r\n",
" StructField('sum_total_non_taxable_goods',FloatType(), False),\r\n",
" StructField('sum_total_taxable',FloatType(), False),\r\n",
" StructField('sum_total_non_taxable',FloatType(), False),\r\n",
" StructField('sum_total_sales',FloatType(), False),\r\n",
" StructField('sum_total_discounts',FloatType(), False),\r\n",
" StructField('sum_total_voucher',FloatType(), False),\r\n",
" StructField('sum_total_tax',FloatType(), False),\r\n",
" StructField('number_of_purchases',IntegerType(), False),\r\n",
" StructField('count_suppliers',FloatType(), False),\r\n",
" StructField('sum_total_purchases',FloatType(), False),\r\n",
" StructField('pagerank_score',FloatType(), False),\r\n",
" StructField('taxpayer_type',StringType(), False),\r\n",
" StructField('taxpayer_size',StringType(), False),\r\n",
" StructField('main_activity',StringType(), False),\r\n",
" StructField('sec1_activity',StringType(), False),\r\n",
" StructField('sec2_activity',StringType(), False),\r\n",
" StructField('employees_number',IntegerType(), False),\r\n",
" StructField('legal_reg_date',DateType(), False),\r\n",
" StructField('tax_reg_date',DateType(), False),\r\n",
" StructField('e_inv_enroll_date',DateType(), False),\r\n",
" StructField('total_capital',FloatType(), False),\r\n",
" StructField('reported_assets',BooleanType(), False),\r\n",
" StructField('social_capital',FloatType(), False),\r\n",
" StructField('total_assets',FloatType(), False),\r\n",
" StructField('total_fixed_assets',FloatType(), False),\r\n",
" StructField('total_liabilities',FloatType(), False),\r\n",
" StructField('gross_income',FloatType(), False),\r\n",
" StructField('net_income',FloatType(), False),\r\n",
" StructField('total_vat_sales',FloatType(), False),\r\n",
" StructField('credited_einvoicing_value',FloatType(), False),\r\n",
" StructField('state',StringType(), False),\r\n",
" StructField('municipality',StringType(), False),\r\n",
" StructField('city',StringType(), False),\r\n",
" StructField('ratio_sales_purchases',FloatType(), False),\r\n",
" StructField('ratio_tax_sales',FloatType(), False),\r\n",
" StructField('ratio_sales_employees',FloatType(), False),\r\n",
" StructField('ratio_buyers_suppliers',FloatType(), False),\r\n",
" StructField('ratio_in_out',FloatType(), False),\r\n",
" StructField('act01',FloatType(), False),\r\n",
" StructField('total_voucher_act01',FloatType(), False),\r\n",
" StructField('act02',FloatType(), False),\r\n",
" StructField('total_voucher_act02',FloatType(), False),\r\n",
" StructField('act03',FloatType(), False),\r\n",
" StructField('total_voucher_act03',FloatType(), False),\r\n",
" StructField('act04',FloatType(), False),\r\n",
" StructField('total_voucher_act04',FloatType(), False),\r\n",
" StructField('act05',FloatType(), False),\r\n",
" StructField('total_voucher_act05',FloatType(), False),\r\n",
" StructField('depth_s',IntegerType(), False),\r\n",
" StructField('depth_r',IntegerType(), False),\r\n",
" StructField('min_depth_of_supply_chain',IntegerType(), False),\r\n",
" StructField('place_in_supply_chain',FloatType(), False)\r\n",
"])"
],
"execution_count": 4
},
{
"cell_type": "code",
"metadata": {
"collapsed": false
},
"source": [
"df = spark.read.schema(schema).csv(\r\n",
" path=transformed_data_path,\r\n",
" header=True)\r\n",
"m = df.count()\r\n",
"print('Number of records {:,}'.format(m))"
],
"execution_count": 5
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"# Removing features with high percentage of null values\r\n",
"\r\n",
"allowed_null_feats = []\r\n",
"for feat in df.columns:\r\n",
" null_pct = df.where(F.isnull(feat)).count()/m \r\n",
" if null_pct <= allowed_null_pct:\r\n",
" allowed_null_feats.append(feat)\r\n",
" else:\r\n",
" print(\"Feature {} has {:.2f}% of null values\".format(feat, null_pct*100))\r\n",
"\r\n",
"df_allowed_null = df.select(allowed_null_feats)"
],
"execution_count": 6
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"# Removing null values\r\n",
"\r\n",
"df_notnull = df_allowed_null\r\n",
"\r\n",
"for feat in df_notnull.schema.fieldNames():\r\n",
" df_notnull = df_notnull.where(~F.isnull(feat))\r\n",
"\r\n",
"print(\"Not null records: {:,}\".format(df_notnull.count()))"
],
"execution_count": 7
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"# Removing records previous to first year parameter\r\n",
"df_recent = df_notnull.where(F.year(date_feat) >= first_year)\r\n",
"print(\"Number of records since {}: {:,}\".format(first_year, df_recent.count()))"
],
"execution_count": 8
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"# Date data augmentation\r\n",
"\r\n",
"df_augmented = df_recent.withColumn('_dayofweek', F.dayofweek(date_feat))\r\n",
"df_augmented = df_augmented.withColumn('_dayofmonth', F.dayofmonth(date_feat))\r\n",
"df_augmented = df_augmented.withColumn('_dayofyear', F.dayofyear(date_feat))\r\n",
"df_augmented = df_augmented.withColumn('_weekofyear', F.weekofyear(date_feat))\r\n",
"df_augmented = df_augmented.withColumn('_month', F.month(date_feat))\r\n",
"df_augmented = df_augmented.withColumn('_quarter', F.quarter(date_feat))\r\n",
"df_augmented = df_augmented.withColumn('_year', F.year(date_feat))"
],
"execution_count": 9
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"# Date to int\r\n",
"\r\n",
"date_feats = [x['name'] for x in df_augmented.schema.jsonValue()['fields'] if x['type']=='date']\r\n",
"\r\n",
"df_date_int = df_augmented\r\n",
"\r\n",
"for feat in date_feats:\r\n",
" print(\"Casting date feature {} to int ...\".format(feat))\r\n",
" df_date_int = df_date_int.withColumn(feat+'_int', F.unix_timestamp(feat))\r\n",
""
],
"execution_count": 10
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"# String indexers\r\n",
"\r\n",
"string_feats = [x['name'] for x in df_date_int.schema.jsonValue()['fields'] if x['type']=='string']\r\n",
"\r\n",
"df_string_indexed = df_date_int\r\n",
"\r\n",
"for feat in string_feats:\r\n",
" print(\"Indexing string feature {} ...\".format(feat))\r\n",
" if training:\r\n",
" indexer = StringIndexer(inputCol=feat, outputCol=feat+'_indexed', stringOrderType='frequencyDesc')\r\n",
" model = indexer.fit(df_string_indexed)\r\n",
" model.write().overwrite().save('_ijungle_indexer_'+feat+'.pkl')\r\n",
" else:\r\n",
" model = StringIndexerModel.load('_ijungle_indexer_'+feat+'.pkl')\r\n",
" df_string_indexed = model.transform(df_string_indexed)"
],
"execution_count": 11
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"# Assemble features to scale\r\n",
"\r\n",
"columns = df_string_indexed.schema.fieldNames()\r\n",
"feats_to_remove = id_feat + date_feats + string_feats\r\n",
"feats = [feat for feat in columns if not feat in feats_to_remove]\r\n",
"assembler = VectorAssembler(inputCols=feats, outputCol='feats')\r\n",
"df_assembled = assembler.transform(df_string_indexed).select(id_feat + ['feats'])"
],
"execution_count": 12
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"# Store features \r\n",
"if training:\r\n",
" spark.createDataFrame(zip(range(len(feats)), feats),['id','feat']).write.mode('overwrite').parquet(features_path)"
],
"execution_count": 13
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"# Scale features\r\n",
"if training:\r\n",
" scaler = StandardScaler(inputCol='feats', outputCol='scaled')\r\n",
" model = scaler.fit(df_assembled)\r\n",
" model.write().overwrite().save('_ijungle_scaler.pkl')\r\n",
"else:\r\n",
" model = StandardScalerModel.load('_ijungle_scaler.pkl')\r\n",
"df_scaled = model.transform(df_assembled).select(id_feat+['scaled'])"
],
"execution_count": 14
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"# Write scaled data as parquet files\r\n",
"\r\n",
"df_scaled.write.mode('overwrite').parquet(prepped_data_path)"
],
"execution_count": 15
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
""
],
"execution_count": null
}
]
}
}
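
The feature-engineering notebook above switches between fitting and loading the StringIndexer and StandardScaler models with a string `training` flag, saving each fitted model under a `_ijungle_*` name. A minimal sketch of that fit-or-load pattern follows; the toy DataFrame, local Spark session, and relative save path are illustrative assumptions, not the notebook's actual data or storage.

# Sketch of the fit-or-load indexer pattern used in the notebook (toy data, hypothetical path).
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, StringIndexerModel

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("A",), ("B",), ("A",)], ["taxpayer_type"])

training = True  # set to False to reuse a previously saved model, as the notebook does at predict time
path = "_ijungle_indexer_taxpayer_type.pkl"  # same naming convention as the notebook

if training:
    model = StringIndexer(inputCol="taxpayer_type", outputCol="taxpayer_type_indexed",
                          stringOrderType="frequencyDesc").fit(df)
    model.write().overwrite().save(path)
else:
    model = StringIndexerModel.load(path)

model.transform(df).show()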

View File

@@ -0,0 +1,289 @@
{
"name": "02 - ijungle - training - iforest training",
"properties": {
"nbformat": 4,
"nbformat_minor": 2,
"sessionProperties": {
"driverMemory": "28g",
"driverCores": 4,
"executorMemory": "28g",
"executorCores": 4,
"numExecutors": 2,
"conf": {
"spark.dynamicAllocation.enabled": "false",
"spark.dynamicAllocation.minExecutors": "2",
"spark.dynamicAllocation.maxExecutors": "2",
"spark.autotune.trackingId": "d52c46c4-6394-436d-ac00-29fa77db5c5d"
}
},
"metadata": {
"saveOutput": true,
"enableDebugMode": false,
"kernelspec": {
"name": "synapse_pyspark",
"display_name": "Synapse PySpark"
},
"language_info": {
"name": "python"
},
"sessionKeepAliveTimeout": 30
},
"cells": [
{
"cell_type": "code",
"source": [
"import pyspark.sql.functions as F\r\n",
"from pyspark.sql.window import Window\r\n",
"from pyspark.ml.functions import vector_to_array\r\n",
"import numpy as np\r\n",
"import pandas as pd"
],
"execution_count": 1
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"parameters"
]
},
"source": [
"prepped_data_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/02-Prepped_Data'\r\n",
"iFor_data_prefix = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/03-iFor'\r\n",
"subsample_list = \"[4096, 2048, 1024, 512]\"\r\n",
"trees_list = \"[500, 100, 20, 10]\"\r\n",
"train_size = \"0.01\"\r\n",
"id_feat = \"['issuer_id','issued_date']\"\r\n",
"seed = \"42\""
],
"execution_count": 2
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"# Cast parameters\r\n",
"subsample_list = eval(subsample_list)\r\n",
"trees_list = eval(trees_list)\r\n",
"train_size = float(train_size)\r\n",
"id_feat = eval(id_feat)\r\n",
"seed = int(seed)"
],
"execution_count": 3
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"max_subsample_size = max(subsample_list)\r\n",
"w = Window().orderBy(F.lit('A'))"
],
"execution_count": 4
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"df = spark.read.parquet(prepped_data_path)\r\n",
"m = df.count()\r\n",
"print(\"Number of records: {:,}\".format(m))"
],
"execution_count": 5
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"num_groups = int(np.ceil(m*train_size/max_subsample_size))"
],
"execution_count": 6
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"# Add id to join with group table\r\n",
"df_id = df.withColumn('_id',F.row_number().over(w))"
],
"execution_count": 7
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"def ijungle_train(id_feat, seed, subsample_size, trees):\r\n",
" def _fun(key, pdf):\r\n",
" from sklearn.ensemble import IsolationForest\r\n",
" import joblib\r\n",
"\r\n",
" group = key[0]\r\n",
" pdf.set_index(id_feat, inplace=True)\r\n",
" feats = list(pdf.columns)\r\n",
" feats.remove('_group')\r\n",
" pdf = pdf[feats]\r\n",
"\r\n",
" clf = IsolationForest(\r\n",
" n_estimators = trees, \r\n",
" max_samples=min(subsample_size, pdf.shape[0]), \r\n",
" random_state=seed, n_jobs=-1)\r\n",
" clf.fit(pdf)\r\n",
"\r\n",
" model_filename = 'iJungle_' + str(group) + '_' + str(trees) + '_' + str(subsample_size) + '.pkl'\r\n",
" joblib.dump(clf, model_filename)\r\n",
"\r\n",
" with open(model_filename, 'rb') as model_file:\r\n",
" model_bytes = model_file.read()\r\n",
"\r\n",
" return(pd.DataFrame([(group, model_bytes)]))\r\n",
" return(_fun)"
],
"execution_count": 8
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"\r\n",
"for trees in trees_list:\r\n",
" for subsample_size in subsample_list:\r\n",
"\r\n",
" print(\"Training iJungle models for {} number of trees and {} subsample size ...\".format(trees, subsample_size))\r\n",
"\r\n",
" # Random selection of records in groups of subsample size\r\n",
" group_array = np.array([])\r\n",
" for group in range(num_groups):\r\n",
" group_array = np.concatenate([group_array, group * np.ones(subsample_size)])\r\n",
"\r\n",
" group_array = np.concatenate([group_array, -1*np.ones(m-(num_groups*subsample_size))])\r\n",
"\r\n",
" np.random.shuffle(group_array)\r\n",
"\r\n",
" pdf_id_group = pd.DataFrame(group_array, columns=['_group']).reset_index()\r\n",
" pdf_id_group.columns = ['_id', '_group']\r\n",
"\r\n",
" df_id_group = spark.createDataFrame(pdf_id_group)\r\n",
"\r\n",
" # Join of random selection of groups with training data\r\n",
" df_subsamples = df_id.join(df_id_group, on='_id').where(F.col('_group')>=0).select(id_feat+['scaled','_group'])\r\n",
" df_subsamples = df_subsamples.cache()\r\n",
"\r\n",
" # Vector to individual columns to prepare for parallel training\r\n",
" num_feats = len(df_subsamples.head(1)[0]['scaled'])\r\n",
" df_unassembled = df_subsamples.withColumn('f', vector_to_array(\"scaled\")).select(id_feat + ['_group'] + [F.col(\"f\")[i] for i in range(num_feats)])\r\n",
"\r\n",
" # Parallel training using applyInPandas function\r\n",
" df_iFor = df_unassembled.groupBy('_group').applyInPandas(\r\n",
" ijungle_train(id_feat, seed, subsample_size, trees), \r\n",
" schema=\"id long, model binary\"\r\n",
" )\r\n",
"\r\n",
" # Save DataFrame with trained models\r\n",
" iFor_data_path = iFor_data_prefix + '_' + str(trees) + '_' + str(subsample_size)\r\n",
" df_iFor.write.mode('overwrite').parquet(iFor_data_path)"
],
"execution_count": 9
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
""
],
"execution_count": null
}
]
}
}
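
The training notebook relies on `groupBy('_group').applyInPandas(...)` so that each subsample group is fitted with its own scikit-learn IsolationForest on an executor and returned as a pickled binary column. A minimal sketch of that grouped-training pattern, using a toy two-group DataFrame (the data and column names are assumptions for illustration):

# Sketch of grouped IsolationForest training with applyInPandas (toy data).
import pandas as pd
import pickle
from pyspark.sql import SparkSession
from sklearn.ensemble import IsolationForest

spark = SparkSession.builder.getOrCreate()
pdf = pd.DataFrame({"_group": [0, 0, 1, 1],
                    "f0": [0.1, 0.2, 5.0, 5.1],
                    "f1": [1.0, 1.1, 9.0, 9.2]})
df = spark.createDataFrame(pdf)

def train(key, pdf):
    # key is the grouping tuple; pdf holds all rows of that group as a pandas DataFrame
    group = key[0]
    clf = IsolationForest(n_estimators=10, random_state=42).fit(pdf[["f0", "f1"]])
    return pd.DataFrame([(group, pickle.dumps(clf))], columns=["id", "model"])

# One serialized model per group, matching the "id long, model binary" schema used above.
models = df.groupBy("_group").applyInPandas(train, schema="id long, model binary")
models.show()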

View File

@@ -0,0 +1,261 @@
{
"name": "03 - ijungle - training - overhead",
"properties": {
"nbformat": 4,
"nbformat_minor": 2,
"sessionProperties": {
"driverMemory": "28g",
"driverCores": 4,
"executorMemory": "28g",
"executorCores": 4,
"numExecutors": 2,
"conf": {
"spark.dynamicAllocation.enabled": "false",
"spark.dynamicAllocation.minExecutors": "2",
"spark.dynamicAllocation.maxExecutors": "2",
"spark.autotune.trackingId": "17d09700-3932-4386-ae95-5fcc8f410a21"
}
},
"metadata": {
"saveOutput": true,
"enableDebugMode": false,
"kernelspec": {
"name": "synapse_pyspark",
"display_name": "Synapse PySpark"
},
"language_info": {
"name": "python"
},
"sessionKeepAliveTimeout": 30
},
"cells": [
{
"cell_type": "code",
"source": [
"import pyspark.sql.functions as F\r\n",
"from pyspark.ml.functions import vector_to_array\r\n",
"import numpy as np"
],
"execution_count": 10
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"parameters"
]
},
"source": [
"prepped_data_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/02-Prepped_Data'\r\n",
"iFor_data_prefix = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/03-iFor'\r\n",
"overhead_data_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/04-Overhead_Data'\r\n",
"overhead_results_prefix = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/05-iFor_results'\r\n",
"\r\n",
"subsample_list = \"[4096, 2048, 1024, 512]\"\r\n",
"trees_list = \"[500, 100, 20, 10]\"\r\n",
"train_size = \"0.01\"\r\n",
"id_feat = \"['issuer_id','issued_date']\"\r\n",
"id_feat_types = \"['int', 'date']\"\r\n",
"seed = \"42\"\r\n",
"overhead_size = \"0.01\""
],
"execution_count": 11
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"# Casting parameters\r\n",
"\r\n",
"subsample_list = eval(subsample_list)\r\n",
"trees_list = eval(trees_list)\r\n",
"train_size = float(train_size)\r\n",
"id_feat = eval(id_feat)\r\n",
"id_feat_types = eval(id_feat_types)\r\n",
"seed = int(seed)\r\n",
"overhead_size = float(overhead_size)"
],
"execution_count": 12
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"df = spark.read.parquet(prepped_data_path)\r\n",
"m = df.count()\r\n",
"print(\"Number of records: {:,}\".format(m))"
],
"execution_count": 13
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"max_subsample_size = max(subsample_list)\r\n",
"num_groups = int(np.ceil(m*train_size/max_subsample_size))\r\n",
"print(\"Num groups: \", num_groups)"
],
"execution_count": 14
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"# Creation of overhead sample\r\n",
"\r\n",
"df_W = df.sample(withReplacement=False, fraction=overhead_size)\r\n",
"df_W.write.mode('overwrite').parquet(overhead_data_path)\r\n",
"df_W = spark.read.parquet(overhead_data_path)\r\n",
"\r\n",
"num_feats = len(df_W.head(1)[0]['scaled'])\r\n",
"df_W_unassembled = df_W.withColumn('f', vector_to_array(\"scaled\")).select(id_feat + [F.col(\"f\")[i] for i in range(num_feats)])\r\n",
"print(\"Number of records of overhead dataset: {:,}\".format(df_W_unassembled.count()))"
],
"execution_count": 15
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"def ijungle_overhead(id_feat, subsample_size, trees, model_bytes, group):\r\n",
" def _fun(iterator):\r\n",
" import pandas as pd\r\n",
" import joblib\r\n",
"\r\n",
" model_filename = 'iJungle_' + str(group) + '_' + str(trees) + '_' + str(subsample_size) + '.pkl'\r\n",
"\r\n",
" with open(model_filename, 'wb') as model_file:\r\n",
" model_file.write(model_bytes)\r\n",
"\r\n",
" clf = joblib.load(model_filename)\r\n",
"\r\n",
" for pdf in iterator:\r\n",
" pdf.set_index(id_feat, inplace=True)\r\n",
" _predict = clf.predict(pdf)\r\n",
" pdf.reset_index(drop=False, inplace=True)\r\n",
" pdf_out = pd.DataFrame()\r\n",
" pdf_out[id_feat] = pdf[id_feat]\r\n",
" pdf_out['predict_'+str(group)] = _predict\r\n",
" yield(pdf_out)\r\n",
" return(_fun)"
],
"execution_count": 16
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"dcc_str = \", \".join([x[0]+\" \"+x[1] for x in zip(id_feat, id_feat_types)])\r\n",
"\r\n",
"\r\n",
"for trees in trees_list:\r\n",
" for subsample_size in subsample_list:\r\n",
" print(\"Overhead calculation for trees {}, and subsample size {} ...\".format(trees, subsample_size))\r\n",
" iFor_data_path = iFor_data_prefix + '_' + str(trees) + '_' + str(subsample_size)\r\n",
" df_iFor = spark.read.parquet(iFor_data_path)\r\n",
" df_predict = df_W_unassembled.select(id_feat)\r\n",
" for group in range(num_groups):\r\n",
" model_bytes = df_iFor.where(F.col('id')==group).select('model').collect()[0]['model']\r\n",
" df_predict_group =df_W_unassembled.mapInPandas(\r\n",
" ijungle_overhead(id_feat, subsample_size, trees, model_bytes, group), \r\n",
" schema=dcc_str + \", predict_\"+str(group)+\" float\"\r\n",
" )\r\n",
" df_predict = df_predict.join(df_predict_group, on=id_feat)\r\n",
" df_predict.write.mode('overwrite').parquet(overhead_results_prefix + '_' + str(trees) + '_' + str(subsample_size))\r\n",
""
],
"execution_count": 17
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
""
],
"execution_count": null
}
]
}
}
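
The overhead notebook scores the sample with each candidate forest via `mapInPandas`, shipping the deserialized scikit-learn model inside the mapped function. A minimal sketch of that partition-wise scoring pattern (toy data; the identifier and feature columns are assumptions):

# Sketch of partition-wise scoring with mapInPandas (toy data).
import pandas as pd
from pyspark.sql import SparkSession
from sklearn.ensemble import IsolationForest

spark = SparkSession.builder.getOrCreate()
clf = IsolationForest(n_estimators=10, random_state=42).fit([[0.0], [0.1], [10.0]])

df = spark.createDataFrame(pd.DataFrame({"issuer_id": [1, 2, 3], "f0": [0.05, 0.2, 9.5]}))

def score(iterator):
    # iterator yields pandas DataFrames, one batch at a time
    for pdf in iterator:
        out = pdf[["issuer_id"]].copy()
        out["predict"] = clf.predict(pdf[["f0"]])  # -1 = anomaly, 1 = normal
        yield out

df.mapInPandas(score, schema="issuer_id long, predict long").show()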

View File

@@ -0,0 +1,223 @@
{
"name": "04 - ijungle - training - best iforest",
"properties": {
"nbformat": 4,
"nbformat_minor": 2,
"sessionProperties": {
"driverMemory": "28g",
"driverCores": 4,
"executorMemory": "28g",
"executorCores": 4,
"numExecutors": 2,
"conf": {
"spark.dynamicAllocation.enabled": "false",
"spark.dynamicAllocation.minExecutors": "2",
"spark.dynamicAllocation.maxExecutors": "2",
"spark.autotune.trackingId": "d4734a04-fba7-48e0-ac7f-0058d606f4e5"
}
},
"metadata": {
"saveOutput": true,
"enableDebugMode": false,
"kernelspec": {
"name": "synapse_pyspark",
"display_name": "Synapse PySpark"
},
"language_info": {
"name": "python"
},
"sessionKeepAliveTimeout": 30
},
"cells": [
{
"cell_type": "code",
"source": [
"import pyspark.sql.functions as F\r\n",
"import numpy as np"
],
"execution_count": 1
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"parameters"
]
},
"source": [
"iFor_data_prefix = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/03-iFor'\r\n",
"overhead_data_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/04-Overhead_Data'\r\n",
"overhead_results_prefix = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/05-iFor_results'\r\n",
"best_iforest_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/06-best_iforest'\r\n",
"subsample_list = \"[4096, 2048, 1024, 512]\"\r\n",
"trees_list = \"[500, 100, 20, 10]\"\r\n",
"id_feat = \"['issuer_id','issued_date']\""
],
"execution_count": 2
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"# Casting parameters\r\n",
"\r\n",
"subsample_list = eval(subsample_list)\r\n",
"trees_list = eval(trees_list)\r\n",
"id_feat = eval(id_feat)"
],
"execution_count": 3
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"df = spark.read.parquet(overhead_data_path).select(id_feat)\r\n",
"print(\"Overhead dataset, number of records: {:,}\".format(df.count()))\r\n",
"avg_feat_names = []\r\n",
"for trees in trees_list:\r\n",
" for subsample_size in subsample_list:\r\n",
" print(\"Calculating average result for {} trees and {} subsample size\".format(trees, subsample_size))\r\n",
" overhead_results_path = overhead_results_prefix + '_' + str(trees) + '_' + str(subsample_size)\r\n",
" df_predict = spark.read.parquet(overhead_results_path)\r\n",
" feats = [feat for feat in df_predict.columns if not feat in id_feat] \r\n",
" num_groups = len(feats)\r\n",
" average_fun = sum(map(F.col, feats))/len(feats)\r\n",
" avg_feat_name = 'avg' + '_' + str(trees) + '_' + str(subsample_size)\r\n",
" avg_feat_names.append(avg_feat_name)\r\n",
" df_average = df_predict.withColumn(avg_feat_name,average_fun).select(id_feat + [avg_feat_name])\r\n",
" df = df.join(df_average, on=id_feat)\r\n",
"average_fun = sum(map(F.col, avg_feat_names))/len(avg_feat_names)\r\n",
"df = df.withColumn('avg',average_fun).select(id_feat + ['avg'])"
],
"execution_count": 4
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"pdf = df.toPandas().set_index(id_feat)\r\n",
"best_l2 = np.inf\r\n",
"\r\n",
"for trees in trees_list:\r\n",
" for subsample_size in subsample_list:\r\n",
" print(\"Calculating the best iForest analyzing {} trees and {} subsample size\".format(trees, subsample_size))\r\n",
" overhead_results_path = overhead_results_prefix + '_' + str(trees) + '_' + str(subsample_size)\r\n",
" df_predict = spark.read.parquet(overhead_results_path)\r\n",
" for group in range(num_groups):\r\n",
" predict_feat = 'predict_'+str(group)\r\n",
" pdf_predict = df_predict.select(id_feat + [predict_feat]).toPandas().set_index(id_feat)\r\n",
" pdf_joined = pdf.join(pdf_predict)\r\n",
" l2 = np.linalg.norm(pdf_joined[predict_feat]-pdf_joined['avg'])\r\n",
" if l2 < best_l2:\r\n",
" best_l2 = l2\r\n",
" best_trees = trees\r\n",
" best_subsample_size = subsample_size\r\n",
" best_group = group\r\n",
"print(\"Best trees\", best_trees)\r\n",
"print(\"Best subsample size\", best_subsample_size)\r\n",
"print(\"Best group\", best_group)\r\n",
""
],
"execution_count": 5
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"iFor_data_path = iFor_data_prefix + '_' + str(best_trees) + '_' + str(best_subsample_size)\r\n",
"df_iFor = spark.read.parquet(iFor_data_path)\r\n",
"model_bytes = df_iFor.where(F.col('id')==best_group).select('model').collect()[0]['model']"
],
"execution_count": 6
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"spark.createDataFrame([('best_iforest',model_bytes)],schema=['id','model']).write.mode('overwrite').parquet(best_iforest_path)"
],
"execution_count": 7
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
""
],
"execution_count": null
}
]
}
}
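
The notebook above selects the "best" iForest as the candidate whose predictions on the overhead sample sit closest, in L2 norm, to the average prediction of all candidates. A small numpy sketch of that selection rule, with a hypothetical 3-record-by-3-forest prediction matrix standing in for the Parquet results:

# Sketch of the best-iForest selection rule (toy prediction matrix).
import numpy as np

# rows = overhead records, columns = candidate forests; entries are -1 (anomaly) or 1 (normal)
preds = np.array([[ 1,  1, -1],
                  [ 1, -1, -1],
                  [-1, -1, -1]])

consensus = preds.mean(axis=1)                            # average vote per record (the 'avg' column above)
l2 = np.linalg.norm(preds - consensus[:, None], axis=0)   # distance of each forest to the consensus
best = int(l2.argmin())
print("best candidate:", best, "distances:", l2)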

View File

@@ -0,0 +1,240 @@
{
"name": "05 - ijungle - predict - anomaly detection",
"properties": {
"nbformat": 4,
"nbformat_minor": 2,
"sessionProperties": {
"driverMemory": "28g",
"driverCores": 4,
"executorMemory": "28g",
"executorCores": 4,
"numExecutors": 2,
"conf": {
"spark.dynamicAllocation.enabled": "false",
"spark.dynamicAllocation.minExecutors": "2",
"spark.dynamicAllocation.maxExecutors": "2",
"spark.autotune.trackingId": "7b21d62e-eff9-46ea-bc18-1d6088b60e23"
}
},
"metadata": {
"saveOutput": true,
"enableDebugMode": false,
"kernelspec": {
"name": "synapse_pyspark",
"display_name": "Synapse PySpark"
},
"language_info": {
"name": "python"
},
"sessionKeepAliveTimeout": 30
},
"cells": [
{
"cell_type": "code",
"source": [
"import joblib\r\n",
"from pyspark.ml.functions import vector_to_array\r\n",
"import pyspark.sql.functions as F"
],
"execution_count": 1
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"parameters"
]
},
"source": [
"best_iforest_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/06-best_iforest'\r\n",
"predict_prepped_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/07-Predict_Prepped_Data'\r\n",
"results_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/08-Result_Data'\r\n",
"id_feat = \"['issuer_id','issued_date']\"\r\n",
"id_feat_types = \"['int', 'date']\""
],
"execution_count": 2
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"# Casting parameters\r\n",
"\r\n",
"id_feat = eval(id_feat)\r\n",
"id_feat_types = eval(id_feat_types)"
],
"execution_count": 3
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"model_bytes = spark.read.parquet(best_iforest_path).head(1)[0]['model']\r\n",
"with open('best_iforest.pkl', 'wb') as model_file:\r\n",
" model_file.write(model_bytes)\r\n",
"clf = joblib.load('best_iforest.pkl')\r\n",
"clf"
],
"execution_count": 4
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"df = spark.read.parquet(predict_prepped_path)\r\n",
"print(\"Number of records: {:,}\".format(df.count()))"
],
"execution_count": 5
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"num_feats = len(df.head(1)[0]['scaled'])\r\n",
"print(\"Number of features:\", num_feats)\r\n",
"df_unassembled = df.withColumn('f', vector_to_array(\"scaled\")).select(id_feat + [F.col(\"f\")[i] for i in range(num_feats)])\r\n",
"print(\"Number of records of overhead dataset: {:,}\".format(df_unassembled.count()))"
],
"execution_count": 6
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"def ijungle_predict(id_feat, clf):\r\n",
" def _fun(iterator):\r\n",
" import pandas as pd\r\n",
" for pdf in iterator:\r\n",
" pdf.set_index(id_feat, inplace=True)\r\n",
" _predict = clf.predict(pdf)\r\n",
" _score = clf.score_samples(pdf)\r\n",
" pdf.reset_index(drop=False, inplace=True)\r\n",
" pdf_out = pd.DataFrame()\r\n",
" pdf_out[id_feat] = pdf[id_feat]\r\n",
" pdf_out['predict'] = _predict\r\n",
" pdf_out['score'] = _score\r\n",
" yield(pdf_out)\r\n",
" return(_fun)"
],
"execution_count": 7
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"dcc_str = \", \".join([x[0]+\" \"+x[1] for x in zip(id_feat, id_feat_types)]) + \", predict int, score float\"\r\n",
"df_results = df_unassembled.mapInPandas(ijungle_predict(id_feat, clf),dcc_str)"
],
"execution_count": 8
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"df_results.write.mode('overwrite').parquet(results_path)"
],
"execution_count": 9
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
""
],
"execution_count": null
}
]
}
}

View File

@@ -0,0 +1,305 @@
{
"name": "06 - ijungle - predict - interpret",
"properties": {
"nbformat": 4,
"nbformat_minor": 2,
"sessionProperties": {
"driverMemory": "28g",
"driverCores": 4,
"executorMemory": "28g",
"executorCores": 4,
"numExecutors": 2,
"conf": {
"spark.dynamicAllocation.enabled": "false",
"spark.dynamicAllocation.minExecutors": "2",
"spark.dynamicAllocation.maxExecutors": "2",
"spark.autotune.trackingId": "8d8beece-cf5b-4830-95bb-9732514b8bb5"
}
},
"metadata": {
"saveOutput": true,
"enableDebugMode": false,
"kernelspec": {
"name": "synapse_pyspark",
"display_name": "Synapse PySpark"
},
"language_info": {
"name": "python"
},
"sessionKeepAliveTimeout": 30
},
"cells": [
{
"cell_type": "code",
"source": [
"import pyspark.sql.functions as F\r\n",
"from pyspark.ml.functions import vector_to_array\r\n",
"import joblib\r\n",
"import numpy as np"
],
"execution_count": 61
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"parameters"
]
},
"source": [
"best_iforest_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/06-best_iforest'\r\n",
"predict_prepped_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/07-Predict_Prepped_Data'\r\n",
"results_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/08-Result_Data'\r\n",
"features_path = 'abfss://pridevsynapseworkingfs@eiadapstoreworking.dfs.core.windows.net/ijungle/02-Features'\r\n",
"interpret_path = 'abfss://anomalydetectoroutput@eiadapstoreworking.dfs.core.windows.net/Results/iJungle_Output'\r\n",
"id_feat = \"['issuer_id','issued_date']\"\r\n",
"id_feat_types = \"['int', 'date']\"\r\n",
"score_threshold = \"-0.8\""
],
"execution_count": 114
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"# Cast parameters\r\n",
"id_feat = eval(id_feat)\r\n",
"id_feat_types = eval(id_feat_types)\r\n",
"score_threshold = float(score_threshold)"
],
"execution_count": 115
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"df_results = spark.read.parquet(results_path).where(F.col('score')<=score_threshold)\r\n",
"print(\"Number of anomalies found: {:,}\".format(df_results.count()))"
],
"execution_count": 41
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"df_assembled = df_results.join(spark.read.parquet(predict_prepped_path), on=id_feat)\r\n",
"print(\"Number assembled records: {:,}\".format(df_assembled.count()))"
],
"execution_count": 42
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"num_feats = len(df_assembled.head(1)[0]['scaled'])\r\n",
"print(\"Number of features:\", num_feats)\r\n",
"df_unassembled = df_assembled.withColumn('f', vector_to_array(\"scaled\")).select(id_feat + ['score'] + [F.col(\"f\")[i] for i in range(num_feats)])\r\n",
"print(\"Number of records of overhead dataset: {:,}\".format(df_unassembled.count()))"
],
"execution_count": 43
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"model_bytes = spark.read.parquet(best_iforest_path).head(1)[0]['model']\r\n",
"with open('best_iforest.pkl', 'wb') as model_file:\r\n",
" model_file.write(model_bytes)\r\n",
"clf = joblib.load('best_iforest.pkl')\r\n",
"clf"
],
"execution_count": 44
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"feats = np.array([row['feat'] for row in spark.read.parquet(features_path).orderBy('id').collect()])\r\n",
"feats.shape"
],
"execution_count": 63
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"feat3_lst = ['feat1', 'feat2', 'feat3']\r\n",
"score3_lst = ['score1', 'score2', 'score3']\r\n",
"feat_score_lst = list(np.array(list(zip(feat3_lst, score3_lst))).reshape(6))\r\n",
"dcc_str = \", \".join([x[0]+\" \"+x[1] for x in zip(id_feat, id_feat_types)]) + \", score float, \"\r\n",
"dcc_str += \", \".join([x[0]+\" \"+x[1] for x in zip(feat_score_lst, ['string', 'float', 'string', 'float', 'string', 'float'])])"
],
"execution_count": 110
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"def ijungle_interpret(id_feat, clf, feats, feat3_lst, score3_lst):\r\n",
" def _fun(iterator):\r\n",
" import shap\r\n",
" explainer = shap.TreeExplainer(clf)\r\n",
" feat_score_lst = list(np.array(list(zip(feat3_lst, score3_lst))).reshape(6))\r\n",
" for pdf in iterator:\r\n",
" pdf.set_index(id_feat + ['score'], inplace=True)\r\n",
" shap_values = explainer.shap_values(pdf)\r\n",
" top_feats = feats[shap_values.argsort()[:,:3]]\r\n",
" top_scores = np.sort(shap_values)[:,:3]\r\n",
" pdf_out = pdf.reset_index()\r\n",
" pdf_out = pdf_out[id_feat + ['score']]\r\n",
" pdf_out[feat3_lst] = top_feats\r\n",
" pdf_out[score3_lst] = top_scores\r\n",
" pdf_out = pdf_out[id_feat + ['score'] + feat_score_lst]\r\n",
" yield(pdf_out)\r\n",
" return(_fun)"
],
"execution_count": 104
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": false
},
"source": [
"df_result = df_unassembled.mapInPandas(ijungle_interpret(id_feat, clf, feats, feat3_lst, score3_lst),dcc_str)\r\n",
"print(\"Number of anomalies with interpretation: {:,}\".format(df_result.count()))"
],
"execution_count": 113
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"df_result.write.mode('overwrite').parquet(interpret_path)"
],
"execution_count": 116
},
{
"cell_type": "code",
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
""
],
"execution_count": null
}
]
}
}
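
The interpret notebook attributes each anomaly to its three most influential features by ranking the most negative SHAP values per row. A minimal sketch of that ranking step with scikit-learn and shap (toy data with two features, so the top two are taken instead of three):

# Sketch of ranking the strongest negative SHAP contributions per record (toy data).
import numpy as np
import shap
from sklearn.ensemble import IsolationForest

X = np.array([[0.0, 1.0], [0.1, 1.1], [9.0, 0.0]])
feats = np.array(["f0", "f1"])

clf = IsolationForest(n_estimators=10, random_state=42).fit(X)
shap_values = shap.TreeExplainer(clf).shap_values(X)

# The most negative SHAP values push a record toward "anomalous"; ascending argsort puts them first.
top_idx = shap_values.argsort()[:, :2]
print(feats[top_idx])                       # names of the strongest contributors per row
print(np.sort(shap_values, axis=1)[:, :2])  # their SHAP values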