Fix issues with duplicate edges in querylog etl

Anthony Miyaguchi 2020-12-23 15:56:49 -08:00
Parent 0921e9f9ac
Commit f3116621bb
3 changed files: 37 additions and 15 deletions

View File

@@ -5,11 +5,7 @@ from pathlib import Path
 import click
 from .config import *
-from .crawler import (
-    fetch_dataset_listing,
-    fetch_table_listing,
-    resolve_view_references,
-)
+from .crawler import fetch_dataset_listing, fetch_table_listing, resolve_view_references
 from .utils import ensure_folder, ndjson_load, print_json, qualify, run, run_query
 ROOT = Path(__file__).parent.parent
@@ -52,6 +48,17 @@ def query_logs():
 )
+def _get_name(obj):
+    """Assumes structure in views_references.ndjson"""
+    return qualify(obj["projectId"], obj["datasetId"], obj["tableId"])
+
+def _deduplicate_edges(edges):
+    """Given a list of flat dictionaries, return a deduplicated list."""
+    edgeset = set([tuple(d.items()) for d in edges])
+    return [dict(tup) for tup in edgeset]
 @cli.command()
 def index():
     """Combine all of the files together."""
@@ -59,10 +66,6 @@ def index():
     data_root = ROOT / "data"
     edges = []
-    def get_name(obj):
-        """Assumes structure in views_references.ndjson"""
-        return qualify(obj["projectId"], obj["datasetId"], obj["tableId"])
     for view_ref in data_root.glob("**/*views_references.ndjson"):
         rows = ndjson_load(view_ref)
         logging.info(
@@ -70,10 +73,10 @@ def index():
         )
         for row in rows:
             # TODO: this needs a schema
-            destination = get_name(row)
+            destination = _get_name(row)
             for referenced in row["query"].get("referencedTables", []):
                 edges.append(
-                    dict(destination=destination, referenced=get_name(referenced))
+                    dict(destination=destination, referenced=_get_name(referenced))
                 )
     # TODO: hardcoded artifact tied to query_logs command
@@ -90,6 +93,8 @@ def index():
         )
     )
+    edges = _deduplicate_edges(edges)
+
     # write the file to disk as both csv and json, csv target is gephi compatible
     with (data_root / "edges.json").open("w") as fp:
         json.dump(edges, fp, indent=2)
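
Note: the dedup helper works by a round-trip through hashable tuples; each flat edge dict becomes a tuple of its items, the set drops exact duplicates, and each surviving tuple is rebuilt into a dict. A minimal sketch of that behavior with made-up edge names (this assumes flat dicts with hashable values built in a consistent key order; set iteration order is arbitrary, so output order is not stable):

def _deduplicate_edges(edges):
    """Given a list of flat dictionaries, return a deduplicated list."""
    edgeset = set([tuple(d.items()) for d in edges])
    return [dict(tup) for tup in edgeset]

# Hypothetical sample edges; the first two collapse into one.
edges = [
    dict(destination="proj:ds.view_v1", referenced="proj:ds.table_v1"),
    dict(destination="proj:ds.view_v1", referenced="proj:ds.table_v1"),
    dict(destination="proj:ds.view_v1", referenced="proj:ds.other_v2"),
]
assert len(_deduplicate_edges(edges)) == 2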

View File

@@ -10,10 +10,11 @@ RETURNS string AS (
   -- case: moz-fx-data-shared-prod:tmp.active_profiles_v1_2020_04_27_6e09b8
   -- case: moz-fx-data-shared-prod:telemetry_derived.attitudes_daily_v1$20200314
   -- case: moz-fx-data-shared-prod:telemetry_derived.attitudes_daily$20200314
-  -- Get rid of the date partition if it exists, and then extract everything up to the version part.
+  -- case: moz-fx-data-shared-prod:telemetry_derived_core_clients_daily_v1_test_aggregation.telemetry_core
+  -- Get rid of the date partition if it exists in the table name, and then extract everything up to the version part.
   -- If the regex fails, just return the name without the partition.
   coalesce(
-    REGEXP_EXTRACT(SPLIT(name, "$")[OFFSET(0)], r"^(.*_v[0-9]+)"),
+    REGEXP_EXTRACT(SPLIT(name, "$")[OFFSET(0)], r"^(.*:.*\..*_v[0-9]+)"),
     SPLIT(name, "$")[OFFSET(0)]
   )
 );
@@ -72,3 +73,12 @@ WHERE
 ORDER BY
     destination_table,
     creation_time
+-- SELECT
+--     strip_suffix(test)
+-- FROM
+--     UNNEST([
+--         "moz-fx-data-shared-prod:tmp.active_profiles_v1_2020_04_27_6e09b8",
+--         "moz-fx-data-shared-prod:telemetry_derived.attitudes_daily_v1$20200314",
+--         "moz-fx-data-shared-prod:telemetry_derived.attitudes_daily$20200314",
+--         "moz-fx-data-shared-prod:telemetry_derived_core_clients_daily_v1_test_aggregation.telemetry_core"
+--     ]) AS test
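
Note: the commented-out query above exercises the UDF against the four documented cases. For intuition, a hypothetical Python mirror of the revised logic (re instead of BigQuery SQL, same split-then-match-then-fallback shape):

import re

def strip_suffix(name: str) -> str:
    # Drop the "$YYYYMMDD" date partition, if any.
    base = name.split("$")[0]
    # Keep everything up to the "_v<N>" version part, but only when the
    # name still looks like project:dataset.table after that point;
    # otherwise fall back to the partition-free name, like the coalesce.
    match = re.match(r"^(.*:.*\..*_v[0-9]+)", base)
    return match.group(1) if match else base

assert strip_suffix(
    "moz-fx-data-shared-prod:telemetry_derived.attitudes_daily_v1$20200314"
) == "moz-fx-data-shared-prod:telemetry_derived.attitudes_daily_v1"
# The stricter regex no longer truncates a dataset name containing "_v1_".
assert strip_suffix(
    "moz-fx-data-shared-prod:telemetry_derived_core_clients_daily_v1_test_aggregation.telemetry_core"
) == "moz-fx-data-shared-prod:telemetry_derived_core_clients_daily_v1_test_aggregation.telemetry_core"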

View File

@@ -26,17 +26,24 @@
 async function main() {
   let edges = await fetch("data/edges.json").then((resp) => resp.json());
-  var nodes = new Set();
+  let nodes = new Set();
   for (i = 0; i < edges.length; i++) {
     nodes.add(edges[i].destination);
     nodes.add(edges[i].referenced);
   }
   // datasets are nodes too now, but assigned to a different group
-  var datasets = new Set();
+  let datasets = new Set();
   nodes.forEach((name) => {
     datasets.add(getDatasetId(name));
   });
+  // check for intersection, which will break network visualization
+  let intersect = new Set([...nodes].filter(n => datasets.has(n)));
+  if (intersect.size > 0) {
+    console.log("intersection between nodes and datasets found");
+    console.log(intersect);
+  }
   let nodeMap = new Map(
     [...nodes, ...datasets].map((el, idx) => [el, idx])
   );
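
Note: the new guard matters because node and dataset names are merged into a single Map, and Map keys are unique; a table name that also appeared as a dataset name would silently collapse into one graph id. A made-up illustration of the check, in Python terms:

nodes = {"proj:ds.table_v1", "proj:other.view_v2"}
datasets = {"proj:ds", "proj:other", "proj:ds.table_v1"}  # hypothetical collision
intersect = nodes & datasets
if intersect:
    print("intersection between nodes and datasets found", intersect)

# Merging both sets into one id map silently drops the duplicate key:
node_map = {name: idx for idx, name in enumerate([*nodes, *datasets])}
assert len(node_map) < len(nodes) + len(datasets)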