diff --git a/analysis/TAARLogMunge.ipynb b/analysis/TAARLogMunge.ipynb index 4bd5580..f432dde 100644 --- a/analysis/TAARLogMunge.ipynb +++ b/analysis/TAARLogMunge.ipynb @@ -1 +1 @@ -{"cells":[{"cell_type":"code","source":["import pyspark.sql.functions as F\nimport datetime as dt\n\nimport ast\nimport boto3\nimport json\nimport re"],"metadata":{},"outputs":[],"execution_count":1},{"cell_type":"code","source":["DATA_LOCATION = \"s3://net-mozaws-prod-us-west-2-pipeline-analysis/taar-api-logs-daily/\""],"metadata":{},"outputs":[],"execution_count":2},{"cell_type":"code","source":["# Parse the TAAR application logs from s3 source.\ntaar_logs = sqlContext\\\n .read.format(\"com.databricks.spark.csv\")\\\n .option(\"header\", \"true\")\\\n .option(\"inferschema\", \"true\")\\\n .option(\"mode\", \"DROPMALFORMED\")\\\n .load(DATA_LOCATION)"],"metadata":{},"outputs":[],"execution_count":3},{"cell_type":"code","source":["# Display log file schema.\nprint(taar_logs.schema)\n# Display one exampel row of log data.\nprint(\"\\n\" + str(taar_logs.take(1)))"],"metadata":{},"outputs":[],"execution_count":4},{"cell_type":"code","source":["# Convert text timestamp to actual timestamp object.\ntime_format = \"yyyy-MM-dd HH:mm:ss.SSS\"\ntaar_logs_timestamps = taar_logs.withColumn(\"parsed_time\", F.to_timestamp(\"timestamp\", time_format)\n .cast(\"double\")\n .cast(\"timestamp\")).drop(\"timestamp\")\n\nprint(taar_logs_timestamps.schema)\nprint(\"\\n\")\nprint(taar_logs_timestamps.take(1))"],"metadata":{},"outputs":[],"execution_count":5},{"cell_type":"code","source":["# Define a utility for writing results of this analysis to an accessible s3 bucket.\ndef write_to_s3(bucket_name, filename, data, aws_access_key_id=None, aws_secret_access_key=None):\n \"\"\" write list as CSV to s3\n params: bucket_name, str, name of bucket\n filename, str, name of file (prefix + file name)\n return: nothing\n \"\"\"\n s3 = boto3.Session(aws_access_key_id=aws_access_key_id,\n aws_secret_access_key=aws_secret_access_key).resource('s3')\n obj = s3.Object(bucket_name, filename)\n obj.put(Body=json.dumps(data, ensure_ascii=False).encode('utf8'))"],"metadata":{},"outputs":[],"execution_count":6},{"cell_type":"code","source":["def is_log_type_recommendation(r):\n return \"taar.recommenders.\" in r[\"type\"]\n \ndef is_log_type_ensemble(r):\n return \"ensemble_recommender\" in r[\"type\"]\n\ndef valid_uuid_as_field(r):\n reg_comp = re.compile(\"[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}\");\n return reg_comp.findall(r['fields'])\n\ndef manual_dedup(p):\n zes = \"00000000-0000-0000-0000-000000000000\"\n a = set()\n for c in p:\n if len(c) == 1:\n if c != zes:\n a |= set(c)\n else:\n for g in c:\n if g != zes:\n a |= set(g)\n uuid_list = list(a)\n return uuid_list"],"metadata":{},"outputs":[],"execution_count":7},{"cell_type":"code","source":["# Filter out log data from outside experiment time\n# 2018-03-12 begin date\n# 2018-04-18 end date\nprint(\"lines of log data for TAAR service: \" + str(taar_logs_timestamps.count()))\ntaar_logs_time_filtered = taar_logs_timestamps.where((taar_logs_timestamps.parsed_time > dt.datetime(2018, 3, 12, 0, 0, 0)) & (taar_logs_timestamps.parsed_time < dt.datetime(2018, 4, 23, 0, 0, 0)))\nprint(\"lines of log data after date filtering to study period: \" + str(taar_logs_time_filtered.count()))"],"metadata":{},"outputs":[],"execution_count":8},{"cell_type":"code","source":["# Find clients that had data retrieval failures\ndef is_dynamo_interaction(p):\n return 
'taar.adapters.dynamo' in p[\"type\"]\n\ndef is_client_data_fail(p):\n return \"message=Error loading client data for\" in p[\"fields\"]\n \nclients_with_lookup_fail = taar_logs_time_filtered.rdd\\\n .filter(lambda p: is_dynamo_interaction(p))\\\n .filter(lambda p: is_client_data_fail(p))\\\n .map(lambda p: valid_uuid_as_field(p))\n\nprint(\"number of failed client lookups: \" + str(clients_with_lookup_fail.count()))\n\nunique_output_failed_lookup_clientIDs = clients_with_lookup_fail.toDF().distinct().collect()\nprint(\"post deduplication: \" + str(len(unique_output_failed_lookup_clientIDs)))\n\n# write the blacklist\nwrite_to_s3(\"net-mozaws-prod-us-west-2-pipeline-analysis\", \"failed_dynamo_clients.csv\", unique_output_failed_lookup_clientIDs)"],"metadata":{},"outputs":[],"execution_count":9},{"cell_type":"code","source":["def is_linear_recomender(p):\n return 'taar.recommenders.recommendation_manager' in p[\"type\"]\n\n# Find clients successfully served by linear\nclient_ids_linear_serves = taar_logs_time_filtered.rdd\\\n .filter(lambda p: not is_dynamo_interaction(p))\\\n .filter(lambda p: not is_client_data_fail(p))\\\n .filter(lambda p: is_linear_recomender(p))\\\n .map(lambda p: valid_uuid_as_field(p))\n \nprint(\"number of linear taar service events: \" + str(client_ids_linear_serves.count()))\nunique_client_ids_linear_serves = client_ids_linear_serves.collect()\n\nunique_client_ids_linear_serves = manual_dedup(unique_client_ids_linear_serves)\nprint(\"unique clients served by linear taar: \" + str(len(unique_client_ids_linear_serves)))\n\nwrite_to_s3(\"net-mozaws-prod-us-west-2-pipeline-analysis\", \"clients_served_linear.csv\", unique_client_ids_linear_serves)"],"metadata":{},"outputs":[],"execution_count":10},{"cell_type":"code","source":["def is_ensemble_recommender(p):\n return 'recommenders.ensemble_recommender' in p[\"type\"]\n\ndef valid_ensemble_uuid(p):\n reg_comp = re.compile(\"message=client_id: \\\\[\")\n txt = reg_comp.split(p['fields'])\n return txt[1][0:36]\n \n# find clients successfully served by ensemble\nclient_ids_ensemble_serves = taar_logs_time_filtered.rdd\\\n .filter(lambda p: not is_dynamo_interaction(p))\\\n .filter(lambda p: not is_client_data_fail(p))\\\n .filter(lambda p: is_ensemble_recommender(p))\\\n .map(lambda p: valid_ensemble_uuid(p))\n \nprint(\"number of ensemble taar service events: \" + str(client_ids_ensemble_serves.count()))\n\nunique_client_ids_ensemble_serves = list(set(client_ids_ensemble_serves.collect()))\nprint(\"unique clients served by ensemble taar: \" + str(len(unique_client_ids_ensemble_serves)))\n\nwrite_to_s3(\"net-mozaws-prod-us-west-2-pipeline-analysis\", \"clients_served_ensemble.csv\", unique_client_ids_ensemble_serves)"],"metadata":{},"outputs":[],"execution_count":11},{"cell_type":"code","source":["def extract_rec_guids(r): \n if \"message=client_id:\" in r['fields']:\n l1 = re.compile(\"\\\\[\\\\[\").split(r['fields'])\n if len(l1[1]) >= 1:\n l2 = re.compile(\"\\\\]\\\\]\").split(l1[1])\n return ast.literal_eval(\"[\" + l2[0] + \"]\")\n else:\n return None\n else:\n return None\n\ndef has_guid_recs(r):\n guids = extract_rec_guids(r)\n return guids\n"],"metadata":{},"outputs":[],"execution_count":12},{"cell_type":"code","source":["taar_logs_time_filtered.take(100)"],"metadata":{},"outputs":[],"execution_count":13},{"cell_type":"code","source":[""],"metadata":{},"outputs":[],"execution_count":14}],"metadata":{"name":"taar_log_munge","notebookId":10421},"nbformat":4,"nbformat_minor":0} 
+{"cells":[{"cell_type":"code","source":["import pyspark.sql.functions as F\nimport datetime as dt\n\nimport ast\nimport boto3\nimport json\nimport re"],"metadata":{},"outputs":[],"execution_count":1},{"cell_type":"code","source":["DATA_LOCATION = \"s3://net-mozaws-prod-us-west-2-pipeline-analysis/taar-api-logs-daily/\""],"metadata":{},"outputs":[],"execution_count":2},{"cell_type":"code","source":["# Parse the TAAR application logs from s3 source.\ntaar_logs = sqlContext\\\n .read.format(\"com.databricks.spark.csv\")\\\n .option(\"header\", \"true\")\\\n .option(\"inferschema\", \"true\")\\\n .option(\"mode\", \"DROPMALFORMED\")\\\n .load(DATA_LOCATION)"],"metadata":{},"outputs":[],"execution_count":3},{"cell_type":"code","source":["# Display log file schema.\nprint(taar_logs.schema)\n# Display one exampel row of log data.\nprint(\"\\n\" + str(taar_logs.take(1)))"],"metadata":{},"outputs":[],"execution_count":4},{"cell_type":"code","source":["# Convert text timestamp to actual timestamp object.\ntime_format = \"yyyy-MM-dd HH:mm:ss.SSS\"\ntaar_logs_timestamps = taar_logs.withColumn(\"parsed_time\", F.to_timestamp(\"timestamp\", time_format)\n .cast(\"double\")\n .cast(\"timestamp\")).drop(\"timestamp\")\n\nprint(taar_logs_timestamps.schema)\nprint(\"\\n\")\nprint(taar_logs_timestamps.take(1))"],"metadata":{},"outputs":[],"execution_count":5},{"cell_type":"code","source":["# Define a utility for writing results of this analysis to an accessible s3 bucket.\ndef write_to_s3(bucket_name, filename, data, aws_access_key_id=None, aws_secret_access_key=None):\n \"\"\" write list as CSV to s3\n params: bucket_name, str, name of bucket\n filename, str, name of file (prefix + file name)\n return: nothing\n \"\"\"\n s3 = boto3.Session(aws_access_key_id=aws_access_key_id,\n aws_secret_access_key=aws_secret_access_key).resource('s3')\n obj = s3.Object(bucket_name, filename)\n obj.put(Body=json.dumps(data, ensure_ascii=False).encode('utf8'))"],"metadata":{},"outputs":[],"execution_count":6},{"cell_type":"code","source":["def is_log_type_recommendation(r):\n return \"taar.recommenders.\" in r[\"type\"]\n \ndef is_log_type_ensemble(r):\n return \"ensemble_recommender\" in r[\"type\"]\n\ndef valid_uuid_as_field(r):\n reg_comp = re.compile(\"[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}\");\n return reg_comp.findall(r['fields'])\n\ndef manual_dedup(p):\n zes = \"00000000-0000-0000-0000-000000000000\"\n a = set()\n for c in p:\n if len(c) == 1:\n if c != zes:\n a |= set(c)\n else:\n for g in c:\n if g != zes:\n a |= set(g)\n uuid_list = list(a)\n return uuid_list"],"metadata":{},"outputs":[],"execution_count":7},{"cell_type":"code","source":["# Filter out log data from outside experiment time\n# 2018-03-12 begin date\n# 2018-04-23 end date\nprint(\"lines of log data for TAAR service: \" + str(taar_logs_timestamps.count()))\ntaar_logs_time_filtered = taar_logs_timestamps.where((taar_logs_timestamps.parsed_time > dt.datetime(2018, 3, 12, 0, 0, 0)) & (taar_logs_timestamps.parsed_time < dt.datetime(2018, 4, 23, 0, 0, 0)))\nprint(\"lines of log data after date filtering to study period: \" + str(taar_logs_time_filtered.count()))"],"metadata":{},"outputs":[],"execution_count":8},{"cell_type":"code","source":["# Find clients that had data retrieval failures\ndef is_dynamo_interaction(p):\n return 'taar.adapters.dynamo' in p[\"type\"]\n\ndef is_client_data_fail(p):\n return \"message=Error loading client data for\" in p[\"fields\"]\n \nclients_with_lookup_fail = 
taar_logs_time_filtered.rdd\\\n .filter(lambda p: is_dynamo_interaction(p))\\\n .filter(lambda p: is_client_data_fail(p))\\\n .map(lambda p: valid_uuid_as_field(p))\n\nprint(\"number of failed client lookups: \" + str(clients_with_lookup_fail.count()))\n\nunique_output_failed_lookup_clientIDs = clients_with_lookup_fail.toDF().distinct().collect()\nprint(\"post deduplication: \" + str(len(unique_output_failed_lookup_clientIDs)))\n\n# Write the blacklist of clients whose Dynamo lookups failed.\nwrite_to_s3(\"net-mozaws-prod-us-west-2-pipeline-analysis\", \"failed_dynamo_clients.csv\", unique_output_failed_lookup_clientIDs)"],"metadata":{},"outputs":[],"execution_count":9},{"cell_type":"code","source":["def is_linear_recommender(p):\n return 'taar.recommenders.recommendation_manager' in p[\"type\"]\n\n# Find clients successfully served by the linear recommender.\nclient_ids_linear_serves = taar_logs_time_filtered.rdd\\\n .filter(lambda p: not is_dynamo_interaction(p))\\\n .filter(lambda p: not is_client_data_fail(p))\\\n .filter(lambda p: is_linear_recommender(p))\\\n .map(lambda p: valid_uuid_as_field(p))\n \nprint(\"number of linear taar service events: \" + str(client_ids_linear_serves.count()))\nunique_client_ids_linear_serves = client_ids_linear_serves.collect()\n\nunique_client_ids_linear_serves = manual_dedup(unique_client_ids_linear_serves)\nprint(\"unique clients served by linear taar: \" + str(len(unique_client_ids_linear_serves)))\n\nwrite_to_s3(\"net-mozaws-prod-us-west-2-pipeline-analysis\", \"clients_served_linear.csv\", unique_client_ids_linear_serves)"],"metadata":{},"outputs":[],"execution_count":10},{"cell_type":"code","source":["def is_ensemble_recommender(p):\n return 'recommenders.ensemble_recommender' in p[\"type\"]\n\ndef valid_ensemble_uuid(p):\n # The client UUID is the 36 characters following \"message=client_id: [\".\n reg_comp = re.compile(\"message=client_id: \\\\[\")\n txt = reg_comp.split(p['fields'])\n return txt[1][0:36]\n \n# Find clients successfully served by the ensemble recommender.\nclient_ids_ensemble_serves = taar_logs_time_filtered.rdd\\\n .filter(lambda p: not is_dynamo_interaction(p))\\\n .filter(lambda p: not is_client_data_fail(p))\\\n .filter(lambda p: is_ensemble_recommender(p))\\\n .map(lambda p: valid_ensemble_uuid(p))\n \nprint(\"number of ensemble taar service events: \" + str(client_ids_ensemble_serves.count()))\n\nunique_client_ids_ensemble_serves = list(set(client_ids_ensemble_serves.collect()))\nprint(\"unique clients served by ensemble taar: \" + str(len(unique_client_ids_ensemble_serves)))\n\nwrite_to_s3(\"net-mozaws-prod-us-west-2-pipeline-analysis\", \"clients_served_ensemble.csv\", unique_client_ids_ensemble_serves)"],"metadata":{},"outputs":[],"execution_count":11},{"cell_type":"code","source":["def extract_rec_guids(r):\n # Parse the list of recommended add-on GUIDs out of a log line.\n if \"message=client_id:\" in r['fields']:\n l1 = re.compile(\"\\\\[\\\\[\").split(r['fields'])\n if len(l1[1]) >= 1:\n l2 = re.compile(\"\\\\]\\\\]\").split(l1[1])\n return ast.literal_eval(\"[\" + l2[0] + \"]\")\n return None\n\ndef has_guid_recs(r):\n # True when the log line contains a parseable recommendation list.\n return extract_rec_guids(r) is not None\n"],"metadata":{},"outputs":[],"execution_count":12},{"cell_type":"code","source":["# Spot-check a sample of rows from the filtered logs.\ntaar_logs_time_filtered.take(100)"],"metadata":{},"outputs":[],"execution_count":13},{"cell_type":"code","source":[""],"metadata":{},"outputs":[],"execution_count":14}],"metadata":{"name":"taar_log_munge","notebookId":10421},"nbformat":4,"nbformat_minor":0}