taar/analysis/TAARLogMunge.ipynb

{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import pyspark.sql.functions as F\n",
"import datetime as dt\n",
"import ast\n",
"import boto3\n",
"import json\n",
"import re"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"DATA_LOCATION = \"s3://net-mozaws-prod-us-west-2-pipeline-analysis/taar-api-logs-daily/\""
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Parse the TAAR application logs from s3 source.\n",
"taar_logs = sqlContext\\\n",
" .read.format(\"com.databricks.spark.csv\")\\\n",
" .option(\"header\", \"true\")\\\n",
" .option(\"inferschema\", \"true\")\\\n",
" .option(\"mode\", \"DROPMALFORMED\")\\\n",
" .load(DATA_LOCATION)"
]
},
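{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Hedged sketch (not executed), shown for reference: on a Spark 2.x cluster\n",
"# the built-in csv reader should load the same files as the\n",
"# com.databricks.spark.csv format used above. taar_logs_alt is an\n",
"# illustrative name only.\n",
"taar_logs_alt = sqlContext.read\\\n",
"    .option(\"header\", \"true\")\\\n",
"    .option(\"inferSchema\", \"true\")\\\n",
"    .option(\"mode\", \"DROPMALFORMED\")\\\n",
"    .csv(DATA_LOCATION)"
]
},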
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"StructType(List(StructField(timestamp,StringType,true),StructField(severity,IntegerType,true),StructField(type,StringType,true),StructField(fields,StringType,true),StructField(date,StringType,true)))\n",
"\n",
"[Row(timestamp=u'2018-03-30 00:00:23.000', severity=6, type=u'taar.recommenders.ensemble_recommender', fields=u\"{message=client_id: [00000000-0000-0000-0000-000000000000], ensemble_weight: [{'similarity': 0.09216174, 'collaborative': 2.16759527, 'legacy': 0.05516607, 'locale': 2.09866473}], guids: [['uBlock0@raymondhill.net', '{73a6fe31-595d-460b-a920-fcc0f8843232}', 'firefox@ghostery.com', 'firefoxdav@icloud.com', 'ich@maltegoetz.de', 'idsafe@norton.com', 'nortonsafeweb@symantec.com', '{d04b0b40-3dab-4f0b-97a6-04ec3eddbfb0}', 'artur.dubovoy@gmail.com', '{a0d7ccb3-214d-498b-b4aa-0e8fda9a7bf7}']], recommender=null, client_id=null, lang=null, limit=null, num_recommendations=null, maximum_similarity=null}\", date=u'2018-03-29')]\n"
]
}
],
"source": [
"# Display log file schema.\n",
"print(taar_logs.schema)\n",
"# Display one exampel row of log data.\n",
"print(\"\\n\" + str(taar_logs.take(1)))"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"StructType(List(StructField(severity,IntegerType,true),StructField(type,StringType,true),StructField(fields,StringType,true),StructField(date,StringType,true),StructField(parsed_time,TimestampType,true)))\n",
"\n",
"\n",
"[Row(severity=6, type=u'taar.recommenders.ensemble_recommender', fields=u\"{message=client_id: [00000000-0000-0000-0000-000000000000], ensemble_weight: [{'similarity': 0.09216174, 'collaborative': 2.16759527, 'legacy': 0.05516607, 'locale': 2.09866473}], guids: [['uBlock0@raymondhill.net', '{73a6fe31-595d-460b-a920-fcc0f8843232}', 'firefox@ghostery.com', 'firefoxdav@icloud.com', 'ich@maltegoetz.de', 'idsafe@norton.com', 'nortonsafeweb@symantec.com', '{d04b0b40-3dab-4f0b-97a6-04ec3eddbfb0}', 'artur.dubovoy@gmail.com', '{a0d7ccb3-214d-498b-b4aa-0e8fda9a7bf7}']], recommender=null, client_id=null, lang=null, limit=null, num_recommendations=null, maximum_similarity=null}\", date=u'2018-03-29', parsed_time=datetime.datetime(2018, 3, 30, 0, 0, 23))]\n"
]
}
],
"source": [
"# Convert text timestamp to actual timestamp object.\n",
"time_format = \"yyyy-MM-dd HH:mm:ss.SSS\"\n",
"taar_logs_timestamps = taar_logs.withColumn(\"parsed_time\", F.to_timestamp(\"timestamp\", time_format)\n",
" .cast(\"double\")\n",
" .cast(\"timestamp\")).drop(\"timestamp\")\n",
"\n",
"print(taar_logs_timestamps.schema)\n",
"print(\"\\n\")\n",
"print(taar_logs_timestamps.take(1))"
]
},
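{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Hedged sanity-check sketch (not executed): parse the sample timestamp\n",
"# string shown in the log row above with the same pattern used to build\n",
"# parsed_time. The one-row DataFrame is a hypothetical fixture.\n",
"sample = sqlContext.createDataFrame([(\"2018-03-30 00:00:23.000\",)], [\"timestamp\"])\n",
"sample.select(F.to_timestamp(\"timestamp\", time_format).alias(\"parsed_time\")).show(truncate=False)"
]
},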
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Define a utility for writing results of this analysis to an accessible s3 bucket.\n",
"def write_to_s3(bucket_name, filename, data, aws_access_key_id=None, aws_secret_access_key=None):\n",
" \"\"\" write list as CSV to s3\n",
" params: bucket_name, str, name of bucket\n",
" filename, str, name of file (prefix + file name)\n",
" return: nothing\n",
" \"\"\"\n",
" s3 = boto3.Session(aws_access_key_id=aws_access_key_id,\n",
" aws_secret_access_key=aws_secret_access_key).resource('s3')\n",
" obj = s3.Object(bucket_name, filename)\n",
" obj.put(Body=json.dumps(data, ensure_ascii=False).encode('utf8'))"
]
},
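{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Hedged sketch (not executed, no S3 call): despite the .csv object names\n",
"# used further down, write_to_s3 serializes its payload with json.dumps,\n",
"# so the uploaded body for a small list looks like this. example_ids is a\n",
"# hypothetical fixture.\n",
"example_ids = [\"00000000-0000-0000-0000-000000000000\"]\n",
"print(json.dumps(example_ids, ensure_ascii=False))"
]
},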
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def is_log_type_recommendation(r):\n",
" return \"taar.recommenders.\" in r[\"type\"]\n",
" \n",
"def is_log_type_ensemble(r):\n",
" return \"ensemble_recommender\" in r[\"type\"]\n",
"\n",
"def valid_uuid_as_field(r):\n",
" reg_comp = re.compile(\"[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}\");\n",
" return reg_comp.findall(r['fields'])\n",
"\n",
"def manual_dedup(p):\n",
" zes = \"00000000-0000-0000-0000-000000000000\"\n",
" a = set()\n",
" for c in p:\n",
" if len(c) == 1:\n",
" if c != zes:\n",
" a |= set(c)\n",
" else:\n",
" for g in c:\n",
" if g != zes:\n",
" a |= set(g)\n",
" uuid_list = list(a)\n",
" return uuid_list"
]
},
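{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Hedged sanity-check sketch (not executed): exercise the UUID regex and\n",
"# the dedup helper on a hypothetical record. fake_record and the non-zero\n",
"# UUID are made up; manual_dedup should drop the all-zero placeholder\n",
"# client_id and keep the other ID.\n",
"fake_record = {\"fields\": \"message=client_id: [00000000-0000-0000-0000-000000000000], guid: [a1b2c3d4-0000-1111-2222-333344445555]\"}\n",
"extracted = valid_uuid_as_field(fake_record)\n",
"print(extracted)\n",
"print(manual_dedup([extracted]))"
]
},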
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"lines of log data for TAAR service: 903766\n",
"lines of log data after date filtering to study period: 807734\n"
]
}
],
"source": [
"# Filter out log data from outside experiment time\n",
"# 2018-03-12 begin date\n",
"# 2018-04-18 end date\n",
"print(\"lines of log data for TAAR service: \" + str(taar_logs_timestamps.count()))\n",
"taar_logs_time_filtered = taar_logs_timestamps.where((taar_logs_timestamps.parsed_time > dt.datetime(2018, 3, 12, 0, 0, 0)) & (taar_logs_timestamps.parsed_time < dt.datetime(2018, 4, 23, 0, 0, 0)))\n",
"print(\"lines of log data after date filtering to study period: \" + str(taar_logs_time_filtered.count()))"
]
},
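{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Hedged sanity-check sketch (not executed): count log lines per day to\n",
"# confirm the filtered data covers the expected window and to spot gaps.\n",
"taar_logs_time_filtered.groupBy(\"date\").count().orderBy(\"date\").show(50, truncate=False)"
]
},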
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"number of failed client lookups: 24470\n",
"post deduplication: 21859\n"
]
}
],
"source": [
"# Find clients that had data retrieval failures\n",
"def is_dynamo_interaction(p):\n",
" return 'taar.adapters.dynamo' in p[\"type\"]\n",
"\n",
"def is_client_data_fail(p):\n",
" return \"message=Error loading client data for\" in p[\"fields\"]\n",
" \n",
"clients_with_lookup_fail = taar_logs_time_filtered.rdd\\\n",
" .filter(lambda p: is_dynamo_interaction(p))\\\n",
" .filter(lambda p: is_client_data_fail(p))\\\n",
" .map(lambda p: valid_uuid_as_field(p))\n",
"\n",
"print(\"number of failed client lookups: \" + str(clients_with_lookup_fail.count()))\n",
"\n",
"unique_output_failed_lookup_clientIDs = clients_with_lookup_fail.toDF().distinct().collect()\n",
"print(\"post deduplication: \" + str(len(unique_output_failed_lookup_clientIDs)))\n",
"\n",
"# write the blacklist\n",
"write_to_s3(\"net-mozaws-prod-us-west-2-pipeline-analysis\", \"failed_dynamo_clients.csv\", unique_output_failed_lookup_clientIDs)"
]
},
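{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Hedged sanity-check sketch (not executed): inspect a few of the raw\n",
"# 'fields' strings behind the failed-lookup count above to confirm the\n",
"# substring match in is_client_data_fail picks up the intended lines.\n",
"sample_failures = taar_logs_time_filtered.rdd\\\n",
"    .filter(lambda p: is_dynamo_interaction(p))\\\n",
"    .filter(lambda p: is_client_data_fail(p))\\\n",
"    .map(lambda p: p[\"fields\"])\\\n",
"    .take(3)\n",
"for f in sample_failures:\n",
"    print(f)"
]
},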
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"number of linear taar service events: 471583\n",
"unique clients served by linear taar: 175911\n"
]
}
],
"source": [
"def is_linear_recomender(p):\n",
" return 'taar.recommenders.recommendation_manager' in p[\"type\"]\n",
"\n",
"# Find clients successfully served by linear\n",
"client_ids_linear_serves = taar_logs_time_filtered.rdd\\\n",
" .filter(lambda p: not is_dynamo_interaction(p))\\\n",
" .filter(lambda p: not is_client_data_fail(p))\\\n",
" .filter(lambda p: is_linear_recomender(p))\\\n",
" .map(lambda p: valid_uuid_as_field(p))\n",
" \n",
"print(\"number of linear taar service events: \" + str(client_ids_linear_serves.count()))\n",
"unique_client_ids_linear_serves = client_ids_linear_serves.collect()\n",
"\n",
"unique_client_ids_linear_serves = manual_dedup(unique_client_ids_linear_serves)\n",
"print(\"unique clients served by linear taar: \" + str(len(unique_client_ids_linear_serves)))\n",
"\n",
"write_to_s3(\"net-mozaws-prod-us-west-2-pipeline-analysis\", \"clients_served_linear.csv\", unique_client_ids_linear_serves)"
]
},
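{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Hedged sanity-check sketch (not executed): the all-zero placeholder\n",
"# client_id should have been dropped by manual_dedup before the list was\n",
"# written to s3.\n",
"assert \"00000000-0000-0000-0000-000000000000\" not in unique_client_ids_linear_serves"
]
},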
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"number of ensemble taar service events: 287211\n",
"unique clients served by ensemble taar: 175321\n"
]
}
],
"source": [
"def is_ensemble_recommender(p):\n",
" return 'recommenders.ensemble_recommender' in p[\"type\"]\n",
"\n",
"def valid_ensemble_uuid(p):\n",
" reg_comp = re.compile(\"message=client_id: \\\\[\")\n",
" txt = reg_comp.split(p['fields'])\n",
" return txt[1][0:36]\n",
" \n",
"# find clients successfully served by ensemble\n",
"client_ids_ensemble_serves = taar_logs_time_filtered.rdd\\\n",
" .filter(lambda p: not is_dynamo_interaction(p))\\\n",
" .filter(lambda p: not is_client_data_fail(p))\\\n",
" .filter(lambda p: is_ensemble_recommender(p))\\\n",
" .map(lambda p: valid_ensemble_uuid(p))\n",
" \n",
"print(\"number of ensemble taar service events: \" + str(client_ids_ensemble_serves.count()))\n",
"\n",
"unique_client_ids_ensemble_serves = list(set(client_ids_ensemble_serves.collect()))\n",
"print(\"unique clients served by ensemble taar: \" + str(len(unique_client_ids_ensemble_serves)))\n",
"\n",
"write_to_s3(\"net-mozaws-prod-us-west-2-pipeline-analysis\", \"clients_served_ensemble.csv\", unique_client_ids_ensemble_serves)"
]
}
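,
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Hedged sanity-check sketch (not executed): apply valid_ensemble_uuid to\n",
"# a hypothetical record shaped like the ensemble log line shown earlier;\n",
"# the regex split plus 36-character slice should recover the client_id\n",
"# that follows 'message=client_id: ['.\n",
"fake_ensemble_record = {\"fields\": \"{message=client_id: [00000000-0000-0000-0000-000000000000], ensemble_weight: [...]}\"}\n",
"print(valid_ensemble_uuid(fake_ensemble_record))"
]
}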
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
},
"name": "taar_log_munge",
"notebookId": 10421
},
"nbformat": 4,
"nbformat_minor": 1
}