This commit is contained in:
Omri Mendels 2018-07-03 13:01:24 +03:00
Parent 287c0d96f3
Commit 08cd1feefa
13 changed files: 903870 additions and 55 deletions

58
.gitignore vendored
View file

@@ -3,9 +3,6 @@ __pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
@@ -35,36 +32,9 @@ MANIFEST
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
.log
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
metastore_db/
# PyBuilder
target/
@@ -75,30 +45,8 @@ target/
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.mypy_cache/

10
.travis.yml Normal file
View file

@@ -0,0 +1,10 @@
language: python
python:
- "3.6"
install:
- pip install --upgrade databricks-cli
script:
- python -c "dummy=0"
deploy:
- provider: script
script: bash databricks/deploy/1_configure/deploy_job.sh -r "westeurope" -t $DATABRICKS_TOKEN -j "*.config.json"

File diff suppressed because one or more lines are too long

View file

@@ -0,0 +1,787 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Productionization - scaling the process for 70 million events / day\n",
"\n",
"This notebook describes the driver safety estimation process in Spark. In order to use it, one needs to install pyspark or run on a spark cluster. \n",
"\n",
"Follow these instructions to install pyspark:\n",
"- Windows: https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c\n",
"- MacOS / Linux: https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f"
]
},
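{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you are not running inside the PySpark shell (where a `spark` session is already defined), the next cell is a minimal sketch for creating a local session. It assumes only that the pyspark package is installed; the app name is arbitrary."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Minimal local SparkSession -- only needed when `spark` is not already provided by the environment\n",
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.master(\"local[*]\").appName(\"driver-safety\").getOrCreate()"
]
},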
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Initiate Spark session:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
" <div>\n",
" <p><b>SparkSession - hive</b></p>\n",
" \n",
" <div>\n",
" <p><b>SparkContext</b></p>\n",
"\n",
" <p><a href=\"http://vahemesw-dt.fareast.corp.microsoft.com:4041\">Spark UI</a></p>\n",
"\n",
" <dl>\n",
" <dt>Version</dt>\n",
" <dd><code>v2.3.0</code></dd>\n",
" <dt>Master</dt>\n",
" <dd><code>local[*]</code></dd>\n",
" <dt>AppName</dt>\n",
" <dd><code>PySparkShell</code></dd>\n",
" </dl>\n",
" </div>\n",
" \n",
" </div>\n",
" "
],
"text/plain": [
"<pyspark.sql.session.SparkSession at 0x1965e95cb38>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"spark"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import pyarrow\n",
"import pandas as pd\n",
"import numpy as np"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Load data from CSV into spark:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"dfRaw = spark.createDataFrame(pd.read_csv('dataset.csv'))"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------+--------+-----------------+------------------+-------------------+----------+--------------------+\n",
"|Unnamed: 0|DriverId| EventName| Latitude| Longitude|Speed km/h| ts|\n",
"+----------+--------+-----------------+------------------+-------------------+----------+--------------------+\n",
"| 0| 0| Timed Event|34.186630640899025| -118.0881019771853| 64.0|2017-11-01 00:00:...|\n",
"| 1| 0| Distance Event| 34.18605950880732|-118.08924119475836| 53.0|2017-11-01 00:00:...|\n",
"| 2| 0| Distance Event|34.186408023905244|-118.08956044011869| 34.0|2017-11-01 00:00:...|\n",
"| 3| 0| Distance Event|34.187478566679914|-118.08891455921557| 33.0|2017-11-01 00:00:...|\n",
"| 4| 0| Distance Event| 34.18866466110383|-118.08645912965711| 32.0|2017-11-01 00:00:...|\n",
"| 5| 0| Distance Event| 34.18817105525333|-118.08727910558605| 47.0|2017-11-01 00:00:...|\n",
"| 6| 0| Distance Event| 34.18940894903005|-118.08642010277607| 24.0|2017-11-01 00:00:...|\n",
"| 7| 0| Timed Event| 34.18876479856378|-118.08608576529423| 35.0|2017-11-01 00:01:...|\n",
"| 8| 0| Distance Event| 34.189056684982| -118.0851121510116| 43.0|2017-11-01 00:01:...|\n",
"| 9| 0| Distance Event|34.190146024335306|-118.08393470374344| 48.0|2017-11-01 00:01:...|\n",
"| 10| 0| Distance Event|34.189943609632465|-118.08257096283572| 48.0|2017-11-01 00:01:...|\n",
"| 11| 0| Distance Event|34.190821516186425|-118.08359733571626| 27.0|2017-11-01 00:01:...|\n",
"| 12| 0| Distance Event|34.191282246048715|-118.08452045197686| 38.0|2017-11-01 00:01:...|\n",
"| 13| 0| Distance Event|34.191648500900946|-118.08147173394501| 17.0|2017-11-01 00:01:...|\n",
"| 14| 0| Timed Event|34.192749288162524|-118.08590807024687| 37.0|2017-11-01 00:02:...|\n",
"| 15| 0| Distance Event| 34.19068699109516|-118.08287641557119| 19.0|2017-11-01 00:02:...|\n",
"| 16| 0| Distance Event| 34.1925073821591|-118.08247811600988| 18.0|2017-11-01 00:02:...|\n",
"| 17| 0|Engine turned off| 34.19225007080113| -118.0816315782898| 0.0|2017-11-01 00:02:...|\n",
"| 18| 0| Network Event|34.190314759224016|-118.08232660944327| 0.0|2017-11-01 01:02:...|\n",
"| 19| 0| Network Event| 34.19032314285829|-118.08355986871914| 0.0|2017-11-01 02:02:...|\n",
"+----------+--------+-----------------+------------------+-------------------+----------+--------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"dfRaw.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Data opreparation"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"75654 samples after removing events that are not relevant for modeling\n",
"Removed NAs\n",
"Number of drivers = 189\n",
"74730 events remained after cleansing\n"
]
}
],
"source": [
"import pyspark.sql.functions as psf\n",
"\n",
"RELEVANT_EVENTS = ['Harsh Acceleration', 'Reached max speed', 'Out of max speed',\n",
" 'Harsh Braking', 'Harsh Turn (motion based)',\n",
" 'Harsh Braking (motion based)', 'Harsh Acceleration (motion based)',\n",
" 'Harsh Turn Left (motion based)', 'Harsh Turn Right (motion based)']\n",
"\n",
"## Filter out unwanted events\n",
"df = dfRaw.filter(dfRaw.EventName.isin(RELEVANT_EVENTS))\n",
"print (\"{} samples after removing events that are not relevant for modeling\".format(df.count()))\n",
"df = df.select(\"DriverId\",\"ts\",\"EventName\",\"Latitude\",\"Longitude\",\"Speed km/h\")\n",
"\n",
"\n",
"####### RUN PER POPULATION SEGMENT #######\n",
"\n",
"## Keep only the drivers in the current segment. This notebook runs multiple times in DataBricks, once for each population segment.\n",
"## Each time we filter the drivers in this segment and perform the analysis on these drivers. \n",
"## It is commented out since we don't have any segments in the sample data.\n",
"\n",
"#populationSegment = Segments.filter('SegmentId == \"' + segmentId + '\"')\n",
"#df = df.join(PopulationSegment,\"DriverId\")\n",
"#print ('number of drivers: {}'.format(populationSegment.count()))\n",
"#print(str(df.count()) + \" Events after segmentation\")\n",
"\n",
"##########################################\n",
"\n",
"# Remove NAs\n",
"df = df.dropna()\n",
"print(\"Removed NAs\")\n",
"\n",
"# Filter out users with too few samples\n",
"minRecordsPerDriver=50\n",
"\n",
"subs = df.groupBy('DriverId').count().filter('count>{}'.format(minRecordsPerDriver))\n",
"print('Number of drivers = {}'.format(subs.count()))\n",
"df = df.join(subs,['DriverId'])\n",
"\n",
"cnt = df.count()\n",
"print(\"{} events remained after cleansing\".format(cnt))\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Calculate total drive distance per driver"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------+--------+\n",
"| Distance|DriverId|\n",
"+---------+--------+\n",
"|1753.2018| 0|\n",
"| 1918.817| 1|\n",
"|1074.2893| 2|\n",
"| 391.8667| 3|\n",
"| 534.5035| 4|\n",
"|2273.0337| 5|\n",
"| 586.1509| 6|\n",
"|1125.1642| 7|\n",
"|1849.9005| 8|\n",
"|426.77353| 9|\n",
"|485.79166| 10|\n",
"|843.41595| 11|\n",
"|927.64954| 12|\n",
"|149.02852| 13|\n",
"| 2094.504| 14|\n",
"|1325.4692| 15|\n",
"|1341.9535| 16|\n",
"|996.11487| 17|\n",
"|1215.6517| 18|\n",
"| 510.8158| 19|\n",
"+---------+--------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"from pyspark.sql.functions import pandas_udf, PandasUDFType\n",
"import pandas as pd\n",
"import numpy as np\n",
"from pyspark.sql.types import *\n",
"\n",
"schema = StructType([\n",
" StructField(\"Distance\", FloatType()),\n",
" StructField(\"DriverId\", IntegerType())\n",
"\n",
"])\n",
"\n",
"\n",
"def haversine(lon1, lat1, lon2, lat2):\n",
" lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])\n",
"\n",
" dlon = lon2 - lon1\n",
" dlat = lat2 - lat1\n",
"\n",
" a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2\n",
"\n",
" c = 2 * np.arcsin(np.sqrt(a))\n",
" km = 6367 * c\n",
" return km\n",
"\n",
"@pandas_udf(schema,PandasUDFType.GROUPED_MAP)\n",
"def total_distance(oneDriver):\n",
" dist = haversine(oneDriver.Longitude.shift(1), oneDriver.Latitude.shift(1),\n",
" oneDriver.loc[1:, 'Longitude'], oneDriver.loc[1:, 'Latitude'])\n",
" return pd.DataFrame({\"DriverId\":oneDriver['DriverId'].iloc[0],\"Distance\":np.sum(dist)},index = [0])\n",
"\n",
"## Calculate the overall distance made by each subscriber\n",
"distancePerDriver = dfRaw.groupBy('DriverId').apply(total_distance)\n",
"#distancePerDriver = distancePerDriver[distancePerDriver.Distance > 0]\n",
"distancePerDriver.sort('DriverId').show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Feature engineering"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Starting feature engineering with 74730 samples\n",
"189 drivers after join with distances\n"
]
}
],
"source": [
"featureNames = ['Harsh Turn (motion based)','Harsh Acceleration (motion based)','Harsh Braking (motion based)','OverSpeed']\n",
"colNames = featureNames[:]\n",
"colNames.append('DriverId')\n",
"\n",
"print(\"Starting feature engineering with {} samples\".format(df.count()))\n",
"\n",
"## Pivot the data from events to features: 1. Count number of events per type per driver, 2. Pivot into a data frames of drivers X features\n",
"subs = df.groupby('DriverId').count()\n",
"\n",
"eventsPerDriver = df.groupby([\"DriverId\",\"EventName\"]).count()\n",
"\n",
"eventsPerDriver = eventsPerDriver.groupby(\"DriverId\").pivot('EventName').sum('count')\n",
"eventsPerDriver = eventsPerDriver.na.fill(0)\n",
"\n",
"\n",
"## Add total drive duration per driver\n",
"eventsPerDriver = distancePerDriver.join(eventsPerDriver,'DriverId')\n",
"print(\"{} drivers after join with distances\".format(eventsPerDriver.count()))\n",
"## Divide the number of events per subscriber with the total driver duration per subscriber\n",
"for fea in featureNames:\n",
" if fea in eventsPerDriver.columns:\n",
" eventsPerDriver = eventsPerDriver.withColumn(fea,psf.col(fea)/eventsPerDriver.Distance)\n",
" else:\n",
" eventsPerDriver = eventsPerDriver.withColumn(fea,psf.lit(0))\n",
"\n",
"## Keep only feature columns\n",
"eventsPerDriver = eventsPerDriver.select(colNames)\n"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[Harsh Turn (motion based): double, Harsh Acceleration (motion based): double, Harsh Braking (motion based): double, OverSpeed: int, DriverId: int]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"display(eventsPerDriver)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Switch from a PySpark data frame to a Pandas data frame for the rest of the analysis:"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"features = eventsPerDriver.toPandas().set_index('DriverId')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Handle outliers"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"found outlier with factor: 2.5 : [ 20 23 32 54 62 85 142]\n",
"replacing outliers [ 20 23 32 54 62 85 142] with max=0.032507542602764665\n",
"only zero valued values found\n",
"\n"
]
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Harsh Turn (motion based)</th>\n",
" <th>Harsh Acceleration (motion based)</th>\n",
" <th>Harsh Braking (motion based)</th>\n",
" <th>OverSpeed</th>\n",
" </tr>\n",
" <tr>\n",
" <th>DriverId</th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>29</th>\n",
" <td>0.000000</td>\n",
" <td>0.000000</td>\n",
" <td>0.000000</td>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>65</th>\n",
" <td>0.024376</td>\n",
" <td>0.014626</td>\n",
" <td>0.005485</td>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>191</th>\n",
" <td>0.008038</td>\n",
" <td>0.008931</td>\n",
" <td>0.032150</td>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>222</th>\n",
" <td>0.002871</td>\n",
" <td>0.000000</td>\n",
" <td>0.009569</td>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>19</th>\n",
" <td>0.000000</td>\n",
" <td>0.005873</td>\n",
" <td>0.003915</td>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>54</th>\n",
" <td>0.000000</td>\n",
" <td>0.000000</td>\n",
" <td>0.000000</td>\n",
" <td>0</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Harsh Turn (motion based) Harsh Acceleration (motion based) \\\n",
"DriverId \n",
"29 0.000000 0.000000 \n",
"65 0.024376 0.014626 \n",
"191 0.008038 0.008931 \n",
"222 0.002871 0.000000 \n",
"19 0.000000 0.005873 \n",
"54 0.000000 0.000000 \n",
"\n",
" Harsh Braking (motion based) OverSpeed \n",
"DriverId \n",
"29 0.000000 0 \n",
"65 0.005485 0 \n",
"191 0.032150 0 \n",
"222 0.009569 0 \n",
"19 0.003915 0 \n",
"54 0.000000 0 "
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import scipy.stats as st\n",
"\n",
"def transform_to_normal(x):\n",
" xt = np.zeros(len(x))\n",
" if np.count_nonzero(x) == 0:\n",
" print(\"only zero valued values found\")\n",
" return x\n",
" \n",
" valueGreaterThanZero = np.where(x<=0,0,1)\n",
" positives = x[valueGreaterThanZero == 1]\n",
" if(len(positives)> 0):\n",
" xt[valueGreaterThanZero == 1],_ = st.boxcox(positives+1)\n",
" xt = xt - np.min(xt)\n",
" return xt\n",
"\n",
"def replace_outliers_with_limit(x, stdFactor = 2.5):\n",
" x = x.values\n",
" xt = np.zeros(len(x))\n",
" if np.count_nonzero(x) == 0:\n",
" print(\"only zero valued values found\\n\")\n",
" return x\n",
" \n",
" xt = transform_to_normal(x)\n",
" \n",
" xMean, xStd = np.mean(xt), np.std(xt)\n",
" outliers = np.where(xt > xMean + stdFactor*xStd)[0]\n",
" inliers = np.where(xt <= xMean + stdFactor*xStd)[0]\n",
" if len(outliers) > 0:\n",
" print(\"found outlier with factor: \"+str(stdFactor)+\" : \"+str(outliers))\n",
" xinline = x[inliers]\n",
" maxInRange = np.max(xinline)\n",
" print(\"replacing outliers {} with max={}\".format(outliers,maxInRange))\n",
" vals = x.copy()\n",
" vals[outliers] = maxInRange\n",
" return x\n",
"\n",
"cleanFeatures = features.apply(replace_outliers_with_limit)\n",
"cleanFeatures.head(10)"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"extracting commulative probs for series: Harsh Turn (motion based) length 189\n",
"params = (), 0.0006723457225678595, 0.027154269594805047.\n",
"extracting commulative probs for series: Harsh Turn (motion based) length 189\n",
"params = (), 0.0006723457225678595, 0.027154269594805047.\n",
"extracting commulative probs for series: Harsh Acceleration (motion based) length 189\n",
"params = (), 0.00047724328580577187, 0.010651619847116538.\n",
"extracting commulative probs for series: Harsh Braking (motion based) length 189\n",
"params = (), 0.0012179933230326801, 0.028236240397887587.\n",
"extracting commulative probs for series: OverSpeed length 189\n"
]
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Harsh Turn (motion based)_CDF</th>\n",
" <th>Harsh Acceleration (motion based)_CDF</th>\n",
" <th>Harsh Braking (motion based)_CDF</th>\n",
" <th>OverSpeed_CDF</th>\n",
" </tr>\n",
" <tr>\n",
" <th>DriverId</th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>29</th>\n",
" <td>0.000000</td>\n",
" <td>0.000000</td>\n",
" <td>0.000000</td>\n",
" <td>0.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>65</th>\n",
" <td>0.582276</td>\n",
" <td>0.735071</td>\n",
" <td>0.140243</td>\n",
" <td>0.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>191</th>\n",
" <td>0.237564</td>\n",
" <td>0.547797</td>\n",
" <td>0.665622</td>\n",
" <td>0.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>222</th>\n",
" <td>0.077766</td>\n",
" <td>0.000000</td>\n",
" <td>0.256027</td>\n",
" <td>0.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>19</th>\n",
" <td>0.000000</td>\n",
" <td>0.397437</td>\n",
" <td>0.091106</td>\n",
" <td>0.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>54</th>\n",
" <td>0.000000</td>\n",
" <td>0.000000</td>\n",
" <td>0.000000</td>\n",
" <td>0.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>0.406299</td>\n",
" <td>0.008706</td>\n",
" <td>0.587733</td>\n",
" <td>0.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>113</th>\n",
" <td>0.398448</td>\n",
" <td>0.000000</td>\n",
" <td>0.441974</td>\n",
" <td>0.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>155</th>\n",
" <td>0.186887</td>\n",
" <td>0.052206</td>\n",
" <td>0.379146</td>\n",
" <td>0.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>167</th>\n",
" <td>0.275357</td>\n",
" <td>0.419959</td>\n",
" <td>0.803008</td>\n",
" <td>0.0</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Harsh Turn (motion based)_CDF \\\n",
"DriverId \n",
"29 0.000000 \n",
"65 0.582276 \n",
"191 0.237564 \n",
"222 0.077766 \n",
"19 0.000000 \n",
"54 0.000000 \n",
"0 0.406299 \n",
"113 0.398448 \n",
"155 0.186887 \n",
"167 0.275357 \n",
"\n",
" Harsh Acceleration (motion based)_CDF \\\n",
"DriverId \n",
"29 0.000000 \n",
"65 0.735071 \n",
"191 0.547797 \n",
"222 0.000000 \n",
"19 0.397437 \n",
"54 0.000000 \n",
"0 0.008706 \n",
"113 0.000000 \n",
"155 0.052206 \n",
"167 0.419959 \n",
"\n",
" Harsh Braking (motion based)_CDF OverSpeed_CDF \n",
"DriverId \n",
"29 0.000000 0.0 \n",
"65 0.140243 0.0 \n",
"191 0.665622 0.0 \n",
"222 0.256027 0.0 \n",
"19 0.091106 0.0 \n",
"54 0.000000 0.0 \n",
"0 0.587733 0.0 \n",
"113 0.441974 0.0 \n",
"155 0.379146 0.0 \n",
"167 0.803008 0.0 "
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import scipy.stats as st\n",
"\n",
"def extract_cummulative_prob_from_dist(x):\n",
" \n",
" print(\"extracting commulative probs for series:\",x.name,\"length\",len(x))\n",
" #print(x.head())\n",
" \n",
" xPositive = x[x>0]\n",
" xNonPositive = x[x<=0]\n",
" \n",
" ratioOfZeroEvents = len(xNonPositive)/len(x)\n",
" probs = np.zeros(len(x))\n",
" if(len(xPositive)>0):\n",
" params = st.expon.fit(xPositive)\n",
" arg = params[:-2]\n",
" loc = params[-2]\n",
" scale = params[-1]\n",
" print('params = {}, {}, {}.'.format(arg,loc,scale))\n",
" probs[x>0] = st.expon.cdf(xPositive, loc=loc, scale=scale, *arg)\n",
"\n",
" return probs\n",
" \n",
"cdfs = cleanFeatures.apply(extract_cummulative_prob_from_dist).add_suffix(\"_CDF\")\n",
"cdfs.head(10)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"from scipy.stats import rankdata\n",
"\n",
"NUM_OF_FEATURES = 3.0\n",
"\n",
"cdfs['metric'] = cdfs.sum(axis = 1) / NUM_OF_FEATURES\n",
"cdfs = cdfs.sort_values('metric')\n",
"cdfs['rank'] = cdfs.metric.rank(method=\"min\")/len(cdfs)*1.0\n",
"\n",
"finalDF = spark.createDataFrame(cdfs.reset_index())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"finalDF.head(10)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View file

@@ -1,3 +1,50 @@
# Driver behavior analysis
This code repository holds the Jupyter notebooks for estimating driver safety on Pointer's dataset.
It contains a sample of the dataset as well as Python (pandas) and PySpark implementations of the process.
It is best to go over the Python notebook first, as it contains more details, and then move to the PySpark notebook to see the same implementation in PySpark.
### Notebooks:
- Python: https://github.com/omri374/driver_behavior_analysis/blob/master/Driver%20safety%20estimation%20-%20pandas.ipynb
- PySpark: https://github.com/omri374/driver_behavior_analysis/blob/master/Driver%20safety%20estimation%20-%20pyspark.ipynb
### Requirements
- For the python notebook:
- Python > 3.5
- **packages**: numpy (1.14.0 and above), scipy, pandas, seaborn, matplotlib
- For PySpark:
- Python > 3.5
- **packages**: numpy (1.14.0 and above), scipy, pandas, seaborn, matplotlib, pyspark
A code story presenting the entire flow will be uploaded to https://www.microsoft.com/developerblog
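To take a quick look at the bundled sample data before opening the notebooks, here is a minimal sketch; it assumes pandas is installed and that dataset.csv sits in the repository root:

```python
import pandas as pd

# Load the sample dataset shipped with the repository
events = pd.read_csv("dataset.csv", parse_dates=["ts"])

# Columns: Unnamed: 0, DriverId, EventName, Latitude, Longitude, Speed km/h, ts
print(events[["DriverId", "EventName", "Speed km/h", "ts"]].head())
```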
### Deployment
When deploying this sample solution you should take note of a few resources provided in the repository:
- databricks/deploy
- 0_azure - an interactive, one-time script that configures your local environment and Azure resources to run this solution. It creates a Databricks workspace on Azure, installs the Databricks CLI locally on your machine and guides you through the initial configuration.
- 1_configure - Databricks job deployment script and sample job definition file. These resources are intended to be used multiple times, possibly in an automated way, e.g. in a continuous deployment (CD) pipeline (see more below).
- The shell script can be used interactively and in batch.
It requires the job definition location or pattern (to iterate over). It can also take parameters for the Databricks region and access token - these are most likely required for CD, as they need to be configured on every run.
- The JSON file specifies settings for creating a new job on Databricks: name, cluster, notebook location, library dependencies, schedule, etc.
- databricks/notebooks - a sample python notebook to be run on a Databricks cluster
- .travis.yml - an example of how to configure a CI/CD pipeline on [Travis-CI](https://www.travis-ci.org)
#### How to use Travis-CI
The example travis.yml file performs the following actions:
- Install the Databricks CLI
- Deploy to the Databricks environment from the master branch (by default) by running the deploy_job.sh script described above.
This action uses an environment variable, $DATABRICKS_TOKEN, that is injected by Travis-CI, where it is stored securely.
The following image shows where and how this is defined on the configuration screen:
<img src="assets/travis-env-vars.jpg" alt="Travis Environment Variables" width="600px"/>
Note: currently the "test" phase doesn't do much, since meaningful tests tend to be specific to the notebook code, Databricks, and related resources (file storage, event sources, etc.).
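A hypothetical starting point for a more useful test phase could be a small schema check on the sample data (the file and column names come from this repository; the test itself is not part of it):

```python
import pandas as pd

def test_dataset_schema():
    # Smoke test: the sample dataset loads and exposes the columns the notebooks rely on
    df = pd.read_csv("dataset.csv")
    expected = {"DriverId", "EventName", "Latitude", "Longitude", "Speed km/h", "ts"}
    assert expected <= set(df.columns)
```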
### License
MIT
# Contributing

Binary data
assets/travis-env-vars.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 95 KiB

View file

@@ -0,0 +1,60 @@
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"deployNs": {
"type": "string"
},
"dbricksWorkspaceName": {
"defaultValue": "[concat(parameters('deployNs'), 'dbricks', uniqueString(resourceGroup().id))]",
"type": "string"
},
"dbricksLocation": {
"defaultValue": "[resourceGroup().location]",
"type": "string",
"metadata": {
"description": "Key Vault secret name which references the AD Service Principal password"
}
},
"dbricksTier": {
"defaultValue": "premium",
"type": "string",
"allowedValues": [
"premium",
"standard"
]
}
},
"variables": {
"managedResourceGroupId": "[concat(subscription().id, '/resourceGroups/', variables('managedResourceGroupName'))]",
"managedResourceGroupName": "[concat('databricks-rg-', parameters('dbricksWorkspaceName'), '-', uniqueString(parameters('dbricksWorkspaceName'), resourceGroup().id))]"
},
"resources": [
{
"apiVersion": "2018-04-01",
"location": "[parameters('dbricksLocation')]",
"name": "[parameters('dbricksWorkspaceName')]",
"tags": {
"displayName": "Databricks Workspace"
},
"sku": {
"name": "[parameters('dbricksTier')]"
},
"properties": {
"ManagedResourceGroupId": "[variables('managedResourceGroupId')]"
},
"type": "Microsoft.Databricks/workspaces"
}
],
"outputs": {
"dbricksWorkspaceName": {
"value": "[parameters('dbricksWorkspaceName')]",
"type": "string"
},
"dbricksLocation": {
"value": "[parameters('dbricksLocation')]",
"type": "string"
}
}
}

View file

@@ -0,0 +1,9 @@
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"deployNs": {
"value": "xyz"
}
}
}

View file

@@ -0,0 +1,129 @@
#!/bin/bash
# Access granted under MIT Open Source License: https://en.wikipedia.org/wiki/MIT_License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions
# of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
# TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
# CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
#
#
# Description: Deploy ARM template which creates a Databricks account
#
# Usage: ./deploy.sh myResourceGroup "East US 2"
#
# Requirements:
# - User must be logged in to the az cli with the appropriate account set.
# - User must have appropriate permission to deploy to a resource group
# - User must have appropriate permission to create a service principal
set -o errexit
set -o pipefail
set -o nounset
#set -o xtrace
###################
# SETUP
# Check if required utilities are installed
command -v jq >/dev/null 2>&1 || { echo >&2 "I require jq but it's not installed. See https://stedolan.github.io/jq/. Aborting."; exit 1; }
command -v az >/dev/null 2>&1 || { echo >&2 "I require azure cli but it's not installed. See https://bit.ly/2Gc8IsS. Aborting."; exit 1; }
# Globals
timestamp=$(date +%s)
deploy_name="deployment${timestamp}"
env_file="../.env"
# Constants
RED='\033[0;31m'
ORANGE='\033[0;33m'
NC='\033[0m'
# Set path
parent_path=$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )
cd "$parent_path"
# Check if user is logged in
[[ -n $(az account show 2> /dev/null) ]] || { echo "Please login via the Azure CLI: "; az login; }
###################
# USER PARAMETERS
rg_name="${1-}"
rg_location="${2-}"
sub_id="${3-}"
while [[ -z $rg_name ]]; do
read -rp "$(echo -e ${ORANGE}"Enter Resource Group name: "${NC})" rg_name
done
while [[ -z $rg_location ]]; do
read -rp "$(echo -e ${ORANGE}"Enter Azure Location (ei. EAST US 2): "${NC})" rg_location
done
while [[ -z $sub_id ]]; do
# Check if user only has one sub
sub_count=$(az account list --output json | jq '. | length')
if (( $sub_count != 1 )); then
az account list -o table
read -rp "$(echo -e ${ORANGE}"Enter Azure Subscription Id you wish to deploy to (enter to use Default): "${NC})" sub_id
fi
# If still empty then user selected IsDefault
if [[ -z $sub_id ]]; then
sub_id=$(az account show --output json | jq -r '.id')
fi
done
# Set account
echo "Deploying to Subscription: $sub_id"
az account set --subscription $sub_id
#####################
# Deploy ARM template
echo "Creating resource group: $rg_name"
az group create --name "$rg_name" --location "$rg_location"
echo "Deploying resources into $rg_name"
arm_output=$(az group deployment create \
--name "$deploy_name" \
--resource-group "$rg_name" \
--template-file "./azuredeploy.json" \
--parameters @"./azuredeploy.parameters.json" \
--output json)
if [[ -z $arm_output ]]; then
echo >&2 "ARM deployment failed."
exit 1
fi
#####################
# Ask user to configure databricks cli
dbi_workspace=$(echo $arm_output | jq -r '.properties.outputs.dbricksWorkspaceName.value')
pip install --upgrade databricks-cli
echo -e "${ORANGE}"
echo "Configure your databricks cli to connect to the newly created Databricks workspace: ${dbi_workspace}. See here for more info: https://bit.ly/2GUwHcw."
databricks configure --token
echo -e "${NC}"
#####################
# Append to .env file
echo "Retrieving configuration information from newly deployed resources."
# Databricks details
dbricks_location=$(echo $arm_output | jq -r '.properties.outputs.dbricksLocation.value')
dbi_token=$(awk '/token/ && NR==3 {print $0;exit;}' ~/.databrickscfg | cut -d' ' -f3)
[[ -n $dbi_token ]] || { echo >&2 "Databricks cli not configured correctly. Please run databricks configure --token. Aborting."; exit 1; }

View file

@@ -0,0 +1,119 @@
#!/bin/bash
# Access granted under MIT Open Source License: https://en.wikipedia.org/wiki/MIT_License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions
# of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
# TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
# CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
#
#
# Description: This script is used to deploy a notebook and schedule it as a job.
# It will delete a pre-existing job with the same name and hence can be used in a CI/CD pipeline.
#
# Usage: deploy_job.sh -j "job conf file or pattern" [optional in case databricks isn't preconfigured: -r "region" -t "token"] [optional: -p "profile"]
# Example: deploy_job.sh -j "myjob.json" [-r "westeurope" -t "dapi58349058ea5230482058"] [-p "myDBWorkspace"]
set -o errexit
set -o pipefail
set -o nounset
#set -o xtrace
function config_job() {
# get values from the job config
db_job_name=$(jq -r '.name' "${jobconf}")
job_notebook_path=$(jq -r '.notebook_task.notebook_path' "${jobconf}")
job_notebook_dir=$(jq -r '.notebook_task.notebook_path' "${jobconf}" | cut -d"/" -f2)
job_notebook_name=$(jq -r '.notebook_task.notebook_path' "${jobconf}" | cut -d"/" -f3)
echo " job name: ${db_job_name}"
echo " notebook full path: ${job_notebook_path}"
echo " notebook folder: ${job_notebook_dir}"
echo " notebook name: ${job_notebook_name}"
# create the directory for the notebooks in the workspace
echo "creaing a folder in the workspace"
databricks --profile "${db_cli_profile}" workspace mkdirs "/${job_notebook_dir}/"
# upload notebook
echo "uploading notebook"
databricks --profile "${db_cli_profile}" workspace import "../../notebooks/${job_notebook_name}.py" "${job_notebook_path}" --language python --overwrite
jobs=$(set +o pipefail && databricks --profile "${db_cli_profile}" jobs list | grep "${db_job_name}" | cut -d" " -f1)
for job in $jobs
do
# if the job already exists we should delete it before recreating it.
# it's required since we don't know if the job definition has changed or not
echo "deleting existing job: $job"
databricks --profile "${db_cli_profile}" jobs delete --job-id "${job}"
done
# create the job
echo "creating a new job"
databricks --profile "${db_cli_profile}" jobs create --json-file "${jobconf}"
}
#set path
parent_path=$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )
cd "$parent_path"
echo "working direcoty is: $(pwd)"
db_region=""
db_token=""
db_job_conf=""
db_cli_profile="DEFAULT"
# assign command line options to variables
while getopts r:t:j:p: option
do
case "${option}"
in
r) db_region=${OPTARG};;
t) db_token=${OPTARG};;
j) db_job_conf=${OPTARG};;
p) db_cli_profile=${OPTARG};;
esac
done
if [ -z "$db_job_conf" ]; then
echo "Job configuration file wasn't supplied!"
exit 1
fi
# configure databricks authentication
db_conf_file=~/.databrickscfg
if [ ! -f $db_conf_file ]; then
# if the conf file doesn't exist we must have some parameters...
if [ -z "$db_region" ]; then
echo "Cluster region wasn't supplied!"
exit 1
fi
if [ -z "$db_token" ]; then
echo "Access token wasn't supplied!"
exit 1
fi
echo "configurating databricks authentication"
echo "[${db_cli_profile}]" >> $db_conf_file
echo "host = https://${db_region}.azuredatabricks.net" >> $db_conf_file
echo "token = ${db_token}" >> $db_conf_file
echo "" >> $db_conf_file
fi
for jobconf in $db_job_conf; do
echo "start process for job config: ${jobconf}"
config_job
done

View file

@@ -0,0 +1,35 @@
{
"name": "PROD - Driver Safety",
"new_cluster": {
"spark_version": "4.1.x-scala2.11",
"node_type_id": "Standard_D3_v2",
"num_workers": 3,
"spark_env_vars": {
"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
}
},
"libraries": [{
"pypi": {
"package": "numpy==1.14.0"
}
},
{
"pypi": {
"package": "scipy==1.1.0"
}
}
],
"email_notifications": {
"on_start": [],
"on_success": [],
"on_failure": []
},
"max_retries": 1,
"schedule": {
"quartz_cron_expression": "0 15 22 ? * *",
"timezone_id": "America/Los_Angeles"
},
"notebook_task": {
"notebook_path": "/PROD/driver_safety"
}
}

View file

@@ -0,0 +1,220 @@
# Databricks notebook source
## Verify we have the latest packages
## numpy 1.14.0 is required here; there's a bug when fitting a distribution with older versions of numpy.
import numpy
numpy.__version__
# COMMAND ----------
import pandas as pd
import numpy as np
dfRaw = spark.createDataFrame(pd.read_csv('https://raw.githubusercontent.com/omri374/driver_behavior_analysis/master/dataset.csv'))
# COMMAND ----------
display(dfRaw)
# COMMAND ----------
import pyspark.sql.functions as psf
RELEVANT_EVENTS = ['Harsh Acceleration', 'Reached max speed', 'Out of max speed',
'Harsh Braking', 'Harsh Turn (motion based)',
'Harsh Braking (motion based)', 'Harsh Acceleration (motion based)',
'Harsh Turn Left (motion based)', 'Harsh Turn Right (motion based)']
## Filter out unwanted events
df = dfRaw.filter(dfRaw.EventName.isin(RELEVANT_EVENTS))
print ("{} samples after removing events that are not relevant for modeling".format(df.count()))
df = df.select("DriverId","ts","EventName","Latitude","Longitude","Speed km/h")
####### RUN PER POPULATION SEGMENT #######
## Keep only the drivers in the current segment. This notebook runs multiple times in DataBricks, once for each population segment.
## Each time we filter the drivers in this segment and perform the analysis on these drivers.
## It is commented out since we don't have any segments in the sample data.
#populationSegment = Segments.filter('SegmentId == "' + segmentId + '"')
#df = df.join(PopulationSegment,"DriverId")
#print ('number of drivers: {}'.format(populationSegment.count()))
#print(str(df.count()) + " Events after segmentation")
##########################################
# Remove NAs
df = df.dropna()
print("Removed NAs")
# Filter out users with too few samples
minRecordsPerDriver=50
subs = df.groupBy('DriverId').count().filter('count>{}'.format(minRecordsPerDriver))
print('Number of drivers = {}'.format(subs.count()))
df = df.join(subs,['DriverId'])
cnt = df.count()
print("{} events remained after cleansing".format(cnt))
# COMMAND ----------
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
import numpy as np
from pyspark.sql.types import *
schema = StructType([
StructField("Distance", FloatType()),
StructField("DriverId", IntegerType())
])
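# haversine returns the great-circle distance in km between two (lon, lat) points; 6367 km is used as an approximate Earth radius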
def haversine(lon1, lat1, lon2, lat2):
lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
dlon = lon2 - lon1
dlat = lat2 - lat1
a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2
c = 2 * np.arcsin(np.sqrt(a))
km = 6367 * c
return km
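# total_distance runs as a grouped-map Pandas UDF: Spark passes each driver's rows as a pandas DataFrame and expects back a one-row DataFrame matching the schema above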
@pandas_udf(schema,PandasUDFType.GROUPED_MAP)
def total_distance(oneDriver):
dist = haversine(oneDriver.Longitude.shift(1), oneDriver.Latitude.shift(1),
oneDriver.loc[1:, 'Longitude'], oneDriver.loc[1:, 'Latitude'])
return pd.DataFrame({"DriverId":oneDriver['DriverId'].iloc[0],"Distance":np.sum(dist)},index = [0])
## Calculate the overall distance driven by each driver
distancePerDriver = dfRaw.groupBy('DriverId').apply(total_distance)
#distancePerDriver = distancePerDriver[distancePerDriver.Distance > 0]
distancePerDriver.sort('DriverId').show()
# COMMAND ----------
featureNames = ['Harsh Turn (motion based)','Harsh Acceleration (motion based)','Harsh Braking (motion based)','OverSpeed']
colNames = featureNames[:]
colNames.append('DriverId')
print("Starting feature engineering with {} samples".format(df.count()))
## Pivot the data from events to features: 1. Count the number of events per type per driver, 2. Pivot into a data frame of drivers X features
subs = df.groupby('DriverId').count()
eventsPerDriver = df.groupby(["DriverId","EventName"]).count()
eventsPerDriver = eventsPerDriver.groupby("DriverId").pivot('EventName').sum('count')
eventsPerDriver = eventsPerDriver.na.fill(0)
## Add total drive distance per driver
eventsPerDriver = distancePerDriver.join(eventsPerDriver,'DriverId')
print("{} drivers after join with distances".format(eventsPerDriver.count()))
## Divide the number of events per driver by the total distance per driver
for fea in featureNames:
if fea in eventsPerDriver.columns:
eventsPerDriver = eventsPerDriver.withColumn(fea,psf.col(fea)/eventsPerDriver.Distance)
else:
eventsPerDriver = eventsPerDriver.withColumn(fea,psf.lit(0))
## Keep only feature columns
eventsPerDriver = eventsPerDriver.select(colNames)
# COMMAND ----------
features = eventsPerDriver.toPandas().set_index('DriverId')
# COMMAND ----------
# COMMAND ----------
import scipy.stats as st
def transform_to_normal(x):
xt = np.zeros(len(x))
if np.count_nonzero(x) == 0:
print("only zero valued values found")
return x
valueGreaterThanZero = np.where(x<=0,0,1)
positives = x[valueGreaterThanZero == 1]
if(len(positives)> 0):
xt[valueGreaterThanZero == 1],_ = st.boxcox(positives+1)
xt = xt - np.min(xt)
return xt
def replace_outliers_with_limit(x, stdFactor = 2.5):
x = x.values
xt = np.zeros(len(x))
if np.count_nonzero(x) == 0:
print("only zero valued values found\n")
return x
xt = transform_to_normal(x)
xMean, xStd = np.mean(xt), np.std(xt)
outliers = np.where(xt > xMean + stdFactor*xStd)[0]
inliers = np.where(xt <= xMean + stdFactor*xStd)[0]
if len(outliers) > 0:
print("found outlier with factor: "+str(stdFactor)+" : "+str(outliers))
xinline = x[inliers]
maxInRange = np.max(xinline)
print("replacing outliers {} with max={}".format(outliers,maxInRange))
# cap outliers at the largest non-outlier value
x = x.copy()
x[outliers] = maxInRange
return x
cleanFeatures = features.apply(replace_outliers_with_limit)
cleanFeatures.head(10)
# COMMAND ----------
import scipy.stats as st
def extract_cummulative_prob_from_dist(x):
print("extracting commulative probs for series:",x.name,"length",len(x))
#print(x.head())
xPositive = x[x>0]
xNonPositive = x[x<=0]
ratioOfZeroEvents = len(xNonPositive)/len(x)
probs = np.zeros(len(x))
if(len(xPositive)>0):
params = st.expon.fit(xPositive)
arg = params[:-2]
loc = params[-2]
scale = params[-1]
#print('params = {}, {}, {}.'.format(arg,loc,scale))
probs[x>0] = st.expon.cdf(xPositive, loc=loc, scale=scale, *arg)
return probs
cdfs = cleanFeatures.apply(extract_cummulative_prob_from_dist).add_suffix("_CDF")
cleanFeatures
# COMMAND ----------
from scipy.stats import rankdata
NUM_OF_FEATURES = 3.0
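# The combined metric sums the per-feature cumulative probabilities and divides by NUM_OF_FEATURES;
# the rank column then turns it into a percentile across drivers (a higher rank means more harsh-driving events per km relative to the population)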
cdfs['metric'] = cdfs.sum(axis = 1) / NUM_OF_FEATURES
cdfs = cdfs.sort_values('metric')
cdfs['rank'] = cdfs.metric.rank(method="min")/len(cdfs)*1.0
finalDF = spark.createDataFrame(cdfs.reset_index())
# COMMAND ----------
display(finalDF)
# COMMAND ----------
display(finalDF)

899612
dataset.csv Normal file

File diff not shown because of its large size.