Generate LookML inside View classes
This commit is contained in:
Parent: be0d398121
Commit: 8bc8be6b32
@@ -1,7 +1,6 @@
"""All possible generated explores."""
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Iterator, List

from .views import PingView, View

@@ -1,9 +1,7 @@
"""Generate lookml from namespaces."""
import logging
import re
from itertools import filterfalse
from pathlib import Path
from typing import Any, Dict, Iterable, List, Tuple, TypedDict
from typing import Dict, Iterable

import click
import lkml

@@ -11,174 +9,17 @@ import yaml
from google.cloud import bigquery

from .explores import explore_types
from .views import GrowthAccountingView, View, view_types

BIGQUERY_TYPE_TO_DIMENSION_TYPE = {
    "BIGNUMERIC": "string",
    "BOOLEAN": "yesno",
    "BYTES": "string",
    "DATE": "time",
    "DATETIME": "time",
    "FLOAT": "number",
    "INTEGER": "number",
    "NUMERIC": "number",
    "STRING": "string",
    "TIME": "time",
    "TIMESTAMP": "time",
}

HIDDEN_DIMENSIONS = {
    ("document_id",),
    ("client_id",),
    ("client_info", "client_id"),
}

MAP_LAYER_NAMES = {
    ("country",): "countries",
    ("metadata", "geo", "country"): "countries",
}


def _get_dimension(path: Tuple[str, ...], field_type: str, mode: str) -> Dict[str, Any]:
    result: Dict[str, Any] = {}
    result["sql"] = "${TABLE}." + ".".join(path)
    name = path
    if mode == "REPEATED" or path in HIDDEN_DIMENSIONS:
        result["hidden"] = "yes"
    else:
        result["type"] = BIGQUERY_TYPE_TO_DIMENSION_TYPE[field_type]
        if result["type"] == "time":
            # Remove _{type} suffix from the last path element for dimension group
            # names. For example, submission_date and submission_timestamp become
            # submission, and metadata.header.parsed_date becomes
            # metadata__header__parsed. This is because the timeframe will add a _{type}
            # suffix to the individual dimension names.
            name = *path[:-1], re.sub("_(date|time(stamp)?)$", "", path[-1])
            result["timeframes"] = [
                "raw",
                "time",
                "date",
                "week",
                "month",
                "quarter",
                "year",
            ]
            if field_type == "DATE":
                result["timeframes"].remove("time")
                result["convert_tz"] = "no"
                result["datatype"] = "date"
        if len(path) > 1:
            result["group_label"] = " ".join(path[:-1]).replace("_", " ").title()
            result["group_item_label"] = path[-1].replace("_", " ").title()
        if path in MAP_LAYER_NAMES:
            result["map_layer_name"] = MAP_LAYER_NAMES[path]
    result["name"] = "__".join(name)
    return result


def _generate_dimensions_helper(
    schema: List[bigquery.SchemaField], *prefix: str
) -> Iterable[dict]:
    for field in sorted(schema, key=lambda f: f.name):
        if field.field_type == "RECORD" and not field.mode == "REPEATED":
            yield from _generate_dimensions_helper(field.fields, *prefix, field.name)
        else:
            yield _get_dimension((*prefix, field.name), field.field_type, field.mode)


def _generate_dimensions(client: bigquery.Client, table: str) -> List[Dict[str, Any]]:
    """Generate dimensions and dimension groups from a bigquery table.

    When schema contains both submission_timestamp and submission_date, only produce
    a dimension group for submission_timestamp.

    Raise ClickException if schema results in duplicate dimensions.
    """
    dimensions = {}
    for dimension in _generate_dimensions_helper(client.get_table(table).schema):
        name = dimension["name"]
        # overwrite duplicate "submission" dimension group, thus picking the
        # last value sorted by field name, which is submission_timestamp
        if name in dimensions and name != "submission":
            raise click.ClickException(
                f"duplicate dimension {name!r} for table {table!r}"
            )
        dimensions[name] = dimension
    return list(dimensions.values())


def _is_dimension_group(dimension: dict):
    """Determine if a dimension is actually a dimension group."""
    return "timeframes" in dimension or "intervals" in dimension


def _generate_measures(dimensions: List[dict], table: str) -> List[Dict[str, str]]:
    """Generate measures from a list of dimensions.

    When no dimension-specific measures are found, return a single "count" measure.

    Raise ClickException if dimensions result in duplicate measures.
    """
    measures = {}
    for dimension in dimensions:
        dimension_name = dimension["name"]
        if dimension_name in {"client_id", "client_info__client_id"}:
            measure = {
                "name": "clients",
                "type": "count_distinct",
                "sql": f"${{{dimension_name}}}",
            }
        elif dimension_name == "document_id":
            measure = {"name": "ping_count", "type": "count"}
        else:
            continue
        name = measure["name"]
        if name in measures:
            raise click.ClickException(
                f"duplicate measure {name!r} for table {table!r}"
            )
        measures[name] = measure
    # return a generic count measure if no default measures were generated
    return list(measures.values()) or [{"name": "count", "type": "count"}]
from .views import GrowthAccountingView, View, ViewDict, view_types


def _generate_views(client, out_dir: Path, views: Iterable[View]) -> Iterable[Path]:
    for view in views:
        if view.type == GrowthAccountingView.type:
        if view.view_type == GrowthAccountingView.type:
            continue

        view_defn: Dict[str, Any] = {"name": view.name}
        # use schema for the table where channel=="release" or the first one
        table = next(
            (table for table in view.tables if table.get("channel") == "release"),
            view.tables[0],
        )["table"]
        # add dimensions and dimension groups
        dimensions = _generate_dimensions(client, table)
        view_defn["dimensions"] = list(filterfalse(_is_dimension_group, dimensions))
        view_defn["dimension_groups"] = list(filter(_is_dimension_group, dimensions))
        # add measures
        view_defn["measures"] = _generate_measures(dimensions, table)
        if len(view.tables) > 1:
            # parameterize table name
            view_defn["parameters"] = [
                {
                    "name": "channel",
                    "type": "unquoted",
                    "allowed_values": [
                        {
                            "label": table["channel"].title(),
                            "value": table["table"],
                        }
                        for table in view.tables
                    ],
                }
            ]
            view_defn["sql_table_name"] = "`{% parameter channel %}`"
        else:
            view_defn["sql_table_name"] = f"`{table}`"
        path = out_dir / f"{view.name}.view.lkml"
        path.write_text(lkml.dump({"views": [view_defn]}))
        lookml = {"views": view.to_lookml(client)}
        path.write_text(lkml.dump(lookml))
        yield path

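The net effect of this hunk is that _generate_views no longer assembles dimensions, measures, and sql_table_name inline; each View builds its own definition and the generator only handles file output. A minimal sketch of the surviving loop, assuming a BigQuery client and output directory are already in hand (the function name here is illustrative, and the GrowthAccountingView skip is omitted for brevity):

from pathlib import Path

import lkml


def write_view_files(client, out_dir: Path, views) -> None:
    # views is an iterable of View instances from .views; illustrative only.
    for view in views:
        # Each View returns a list of view dicts; lkml.dump serializes the
        # {"views": [...]} structure into .view.lkml text.
        lookml = {"views": view.to_lookml(client)}
        path = out_dir / f"{view.name}.view.lkml"
        path.write_text(lkml.dump(lookml))
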
@@ -190,19 +31,18 @@ def _generate_explores(
            continue

        explore = explore_types[defn["type"]].from_dict(explore_name, defn)
        explore_lookml = explore.to_lookml()
        file_lookml = {
            "includes": f"/looker-hub/{namespace}/views/*.view.lkml",
            "explores": [explore_lookml],
            "explores": [explore.to_lookml()],
        }
        path = out_dir / (explore_name + ".explore.lkml")
        path.write_text(lkml.dump(file_lookml))
        yield path


def _get_views_from_dict(views: Dict[str, List[Dict[str, str]]]) -> Iterable[View]:
def _get_views_from_dict(views: Dict[str, ViewDict]) -> Iterable[View]:
    for view_name, view_info in views.items():
        yield view_types[view_info["type"]].from_dict(view_name, view_info)
        yield view_types[view_info["type"]].from_dict(view_name, view_info)  # type: ignore


@click.command(help=__doc__)

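For orientation, the dict handed to lkml.dump for each explore has roughly the shape below. The namespace, explore name, and the keys inside the explore are illustrative: they come from the explore type's to_lookml(), which is not shown in this diff.

file_lookml = {
    "includes": "/looker-hub/glean_app/views/*.view.lkml",  # hypothetical namespace
    "explores": [
        {"name": "baseline", "view_name": "baseline"},  # illustrative explore keys
    ],
}
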
@@ -0,0 +1,104 @@
"""Utils for generating lookml."""
import re
from typing import Any, Dict, Iterable, List, Tuple

import click
from google.cloud import bigquery

BIGQUERY_TYPE_TO_DIMENSION_TYPE = {
    "BIGNUMERIC": "string",
    "BOOLEAN": "yesno",
    "BYTES": "string",
    "DATE": "time",
    "DATETIME": "time",
    "FLOAT": "number",
    "INTEGER": "number",
    "NUMERIC": "number",
    "STRING": "string",
    "TIME": "time",
    "TIMESTAMP": "time",
}

HIDDEN_DIMENSIONS = {
    ("document_id",),
    ("client_id",),
    ("client_info", "client_id"),
}

MAP_LAYER_NAMES = {
    ("country",): "countries",
    ("metadata", "geo", "country"): "countries",
}


def _get_dimension(path: Tuple[str, ...], field_type: str, mode: str) -> Dict[str, Any]:
    result: Dict[str, Any] = {}
    result["sql"] = "${TABLE}." + ".".join(path)
    name = path
    if mode == "REPEATED" or path in HIDDEN_DIMENSIONS:
        result["hidden"] = "yes"
    else:
        result["type"] = BIGQUERY_TYPE_TO_DIMENSION_TYPE[field_type]
        if result["type"] == "time":
            # Remove _{type} suffix from the last path element for dimension group
            # names. For example, submission_date and submission_timestamp become
            # submission, and metadata.header.parsed_date becomes
            # metadata__header__parsed. This is because the timeframe will add a _{type}
            # suffix to the individual dimension names.
            name = *path[:-1], re.sub("_(date|time(stamp)?)$", "", path[-1])
            result["timeframes"] = [
                "raw",
                "time",
                "date",
                "week",
                "month",
                "quarter",
                "year",
            ]
            if field_type == "DATE":
                result["timeframes"].remove("time")
                result["convert_tz"] = "no"
                result["datatype"] = "date"
        if len(path) > 1:
            result["group_label"] = " ".join(path[:-1]).replace("_", " ").title()
            result["group_item_label"] = path[-1].replace("_", " ").title()
        if path in MAP_LAYER_NAMES:
            result["map_layer_name"] = MAP_LAYER_NAMES[path]
    result["name"] = "__".join(name)
    return result


def _generate_dimensions_helper(
    schema: List[bigquery.SchemaField], *prefix: str
) -> Iterable[dict]:
    for field in sorted(schema, key=lambda f: f.name):
        if field.field_type == "RECORD" and not field.mode == "REPEATED":
            yield from _generate_dimensions_helper(field.fields, *prefix, field.name)
        else:
            yield _get_dimension((*prefix, field.name), field.field_type, field.mode)


def _generate_dimensions(client: bigquery.Client, table: str) -> List[Dict[str, Any]]:
    """Generate dimensions and dimension groups from a bigquery table.

    When schema contains both submission_timestamp and submission_date, only produce
    a dimension group for submission_timestamp.

    Raise ClickException if schema results in duplicate dimensions.
    """
    dimensions = {}
    for dimension in _generate_dimensions_helper(client.get_table(table).schema):
        name = dimension["name"]
        # overwrite duplicate "submission" dimension group, thus picking the
        # last value sorted by field name, which is submission_timestamp
        if name in dimensions and name != "submission":
            raise click.ClickException(
                f"duplicate dimension {name!r} for table {table!r}"
            )
        dimensions[name] = dimension
    return list(dimensions.values())


def _is_dimension_group(dimension: dict):
    """Determine if a dimension is actually a dimension group."""
    return "timeframes" in dimension or "intervals" in dimension

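As a worked example of the helper above (the column name is hypothetical), a top-level TIMESTAMP field turns into a dimension-group definition whose name drops the _timestamp suffix:

# Assuming _get_dimension is imported from this new utils module.
assert _get_dimension(("submission_timestamp",), "TIMESTAMP", "NULLABLE") == {
    "sql": "${TABLE}.submission_timestamp",
    "type": "time",
    "timeframes": ["raw", "time", "date", "week", "month", "quarter", "year"],
    "name": "submission",  # the timeframes re-append _date, _time, etc.
}
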
@@ -2,11 +2,23 @@
from __future__ import annotations

from collections import defaultdict
from typing import Dict, Iterator, List
from itertools import filterfalse
from typing import Any, Dict, Iterator, List, TypedDict

import click

from . import lookml_utils

OMIT_VIEWS = {"deletion_request"}


class ViewDict(TypedDict):
    """Represent a view definition."""

    type: str
    tables: List[Dict[str, str]]


class View(object):
    """A generic Looker View."""

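ViewDict mirrors one entry under views: in the namespaces YAML that lookml.py reads. A hedged example of the shape it describes; the dataset and table names are made up:

baseline_view: ViewDict = {
    "type": "ping_view",  # key into view_types at the bottom of this module
    "tables": [
        {"channel": "release", "table": "mozdata.glean_app.baseline"},
        {"channel": "beta", "table": "mozdata.glean_app_beta.baseline"},
    ],
}
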
@@ -28,7 +40,7 @@ class View(object):
        raise NotImplementedError("Only implemented in subclass.")

    @classmethod
    def from_dict(klass, name: str, _dict: List[Dict[str, str]]) -> View:
    def from_dict(klass, name: str, _dict: ViewDict) -> View:
        """Get a view from a name and dict definition."""
        raise NotImplementedError("Only implemented in subclass.")

@@ -61,8 +73,15 @@ class View(object):
        )
        return False

    def generate_dimensions(self):
        pass
    def to_lookml(self, bq_client) -> List[dict]:
        """
        Generate LookML for this view.

        View instances can generate more than one Looker view,
        e.g. for nested fields and joins, so this returns
        a list.
        """
        raise NotImplementedError("Only implemented in subclass.")


class PingView(View):

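Because to_lookml returns a list, a subclass that expands into several Looker views (for example a main view plus one for a nested field) simply returns more than one dict. A hypothetical subclass, shown only to illustrate the contract and not part of this commit:

class ExampleView(View):
    """Illustrative subclass; the name and output shape are made up."""

    type: str = "example_view"

    def to_lookml(self, bq_client) -> List[dict]:
        # One logical view may expand into several Looker views.
        table = self.tables[0]["table"]
        return [
            {"name": self.name, "sql_table_name": f"`{table}`"},
            {"name": f"{self.name}__extra", "sql_table_name": f"`{table}`"},
        ]
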
@@ -100,14 +119,87 @@ class PingView(View):
            yield PingView(view_id, tables)

    @classmethod
    def from_dict(klass, name: str, _dict: List[Dict[str, str]]) -> PingView:
    def from_dict(klass, name: str, _dict: ViewDict) -> PingView:
        """Get a view from a name and dict definition."""
        return PingView(name, _dict["tables"])

    def generate_dimensions(self):
        pass
    def to_lookml(self, bq_client) -> List[dict]:
        """Generate LookML for this view."""
        view_defn: Dict[str, Any] = {"name": self.name}

    def generate_measures(self):
        pass
        # use schema for the table where channel=="release" or the first one
        table = next(
            (table for table in self.tables if table.get("channel") == "release"),
            self.tables[0],
        )["table"]

        # add dimensions and dimension groups
        dimensions = lookml_utils._generate_dimensions(bq_client, table)
        view_defn["dimensions"] = list(
            filterfalse(lookml_utils._is_dimension_group, dimensions)
        )
        view_defn["dimension_groups"] = list(
            filter(lookml_utils._is_dimension_group, dimensions)
        )

        # add measures
        view_defn["measures"] = self.get_measures(dimensions, table)

        # parameterize table name
        if len(self.tables) > 1:
            view_defn["parameters"] = [
                {
                    "name": "channel",
                    "type": "unquoted",
                    "allowed_values": [
                        {
                            "label": table["channel"].title(),
                            "value": table["table"],
                        }
                        for table in self.tables
                    ],
                }
            ]
            view_defn["sql_table_name"] = "`{% parameter channel %}`"
        else:
            view_defn["sql_table_name"] = f"`{table}`"

        return [view_defn]

    def get_measures(self, dimensions: List[dict], table: str) -> List[Dict[str, str]]:
        """Generate measures from a list of dimensions.

        Raise ClickException if dimensions result in duplicate measures, or if
        neither a client_id nor a document_id dimension is present.
        """
        measures = {}

        for dimension in dimensions:
            dimension_name = dimension["name"]
            if dimension_name in {"client_id", "client_info__client_id"}:
                measure = {
                    "name": "clients",
                    "type": "count_distinct",
                    "sql": f"${{{dimension_name}}}",
                }
            elif dimension_name == "document_id":
                measure = {"name": "ping_count", "type": "count"}
            else:
                continue
            name = measure["name"]
            if name in measures:
                raise click.ClickException(
                    f"duplicate measure {name!r} for table {table!r}"
                )
            measures[name] = measure

        if len(measures) == 0:
            raise click.ClickException(
                f"Missing client_id and doc_id dimensions in {table!r}"
            )

        return list(measures.values())


class GrowthAccountingView(View):

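A worked example of get_measures on dimensions like those generated above (the view, table, and dataset names are illustrative): a client_id dimension yields a distinct-count measure, document_id yields a ping count, and everything else is skipped.

view = PingView("baseline", [{"channel": "release", "table": "mozdata.glean_app.baseline"}])
dimensions = [{"name": "client_id"}, {"name": "document_id"}, {"name": "country"}]
assert view.get_measures(dimensions, "mozdata.glean_app.baseline") == [
    {"name": "clients", "type": "count_distinct", "sql": "${client_id}"},
    {"name": "ping_count", "type": "count"},
]
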
@@ -134,11 +226,14 @@ class GrowthAccountingView(View):
                yield GrowthAccountingView([{"table": f"mozdata.{dataset}.{view_id}"}])

    @classmethod
    def from_dict(
        klass, name: str, _dict: List[Dict[str, str]]
    ) -> GrowthAccountingView:
    def from_dict(klass, name: str, _dict: ViewDict) -> GrowthAccountingView:
        """Get a view from a name and dict definition."""
        return GrowthAccountingView(_dict["tables"])

    def to_lookml(self, bq_client) -> List[dict]:
        """Generate LookML for this view."""
        pass


view_types = {
    PingView.type: PingView,

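The view_types registry at the bottom of the module is what _get_views_from_dict in lookml.py dispatches on; a short sketch with a made-up namespace entry:

definition: ViewDict = {
    "type": "ping_view",
    "tables": [{"channel": "release", "table": "mozdata.glean_app.baseline"}],
}
view = view_types[definition["type"]].from_dict("baseline", definition)
assert isinstance(view, PingView)
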
@@ -365,7 +365,7 @@ def test_duplicate_dimension(runner, tmp_path):
            canonical_app_name: Custom
            views:
              baseline:
                type: ping_explore
                type: ping_view
                tables:
                - channel: release
                  table: mozdata.fail.duplicate_dimension

@@ -397,7 +397,7 @@ def test_duplicate_measure(runner, tmp_path):
            canonical_app_name: Custom
            views:
              baseline:
                type: ping_explore
                type: ping_view
                tables:
                - channel: release
                  table: mozdata.fail.duplicate_measure

@@ -406,6 +406,8 @@ def test_duplicate_measure(runner, tmp_path):
    )
    with runner.isolated_filesystem():
        with patch("google.cloud.bigquery.Client", MockClient):
            # print(f"{namespaces}")
            # lookml(str(namespaces), "looker-hub/")
            result = runner.invoke(
                lookml,
                [