bigquery-etl/bigquery_etl/metadata/parse_metadata.py

538 строки
18 KiB
Python
Исходник Обычный вид История

"""Parsing of metadata yaml files."""
2020-02-28 23:16:20 +03:00
2021-02-12 04:26:26 +03:00
import enum
import os
import re
import string
from datetime import date
from pathlib import Path
2022-12-07 21:47:47 +03:00
from typing import Any, Dict, List, Optional
import attr
import cattrs
import yaml
from google.cloud import bigquery
from bigquery_etl.query_scheduling.utils import is_email, is_email_or_github_identity
METADATA_FILE = "metadata.yaml"
DATASET_METADATA_FILE = "dataset_metadata.yaml"
DEFAULT_WORKGROUP_ACCESS = [
dict(role="roles/bigquery.dataViewer", members=["workgroup:mozilla-confidential"])
]
DEFAULT_TABLE_WORKGROUP_ACCESS = DEFAULT_WORKGROUP_ACCESS
2020-02-28 23:16:20 +03:00
class Literal(str):
"""Represents a YAML literal."""
pass
def literal_presenter(dumper, data):
"""Literal representer for YAML output."""
return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
yaml.add_representer(Literal, literal_presenter)
2021-02-12 04:26:26 +03:00
class PartitionType(enum.Enum):
"""Represents BigQuery table partition types."""
HOUR = "hour"
DAY = "day"
MONTH = "month"
YEAR = "year"
@property
def bigquery_type(self):
2021-02-17 02:10:28 +03:00
"""Map to the BigQuery data type."""
2021-02-12 04:26:26 +03:00
d = {
"hour": bigquery.TimePartitioningType.HOUR,
"day": bigquery.TimePartitioningType.DAY,
"month": bigquery.TimePartitioningType.MONTH,
"year": bigquery.TimePartitioningType.YEAR,
}
return d[self.value]
@attr.s(auto_attribs=True)
class PartitionMetadata:
"""Metadata for defining BigQuery table partitions."""
type: PartitionType
field: Optional[str] = attr.ib(None)
require_partition_filter: bool = attr.ib(True)
2021-12-02 01:55:56 +03:00
expiration_days: Optional[float] = attr.ib(None)
@property
def expiration_ms(self) -> Optional[float]:
"""Convert partition expiration from days to milliseconds."""
if self.expiration_days is None:
return None
return int(self.expiration_days * 86400000)
2021-02-12 04:26:26 +03:00
@attr.s(auto_attribs=True)
class PartitionRange:
"""Metadata for defining the partition range."""
start: int
end: int
interval: int
@attr.s(auto_attribs=True)
class RangePartitionMetadata:
"""Metadata for defining range partitioned tables."""
range: PartitionRange
field: Optional[str] = attr.ib(None)
2021-02-17 02:10:28 +03:00
@attr.s(auto_attribs=True)
class ClusteringMetadata:
"""Metadata for defining BigQuery table clustering."""
fields: List[str]
2021-02-12 04:26:26 +03:00
@attr.s(auto_attribs=True)
class BigQueryMetadata:
"""
Metadata related to BigQuery configurations for the query.
For example, partitioning or clustering of the destination table.
"""
2021-02-17 02:10:28 +03:00
time_partitioning: Optional[PartitionMetadata] = attr.ib(None)
range_partitioning: Optional[RangePartitionMetadata] = attr.ib(None)
2021-02-17 02:10:28 +03:00
clustering: Optional[ClusteringMetadata] = attr.ib(None)
2021-02-12 04:26:26 +03:00
2021-05-11 01:25:07 +03:00
@attr.s(auto_attribs=True)
class SchemaDerivedMetadata:
"""Metadata specifying parent schema."""
2021-05-18 18:39:35 +03:00
table: List[str]
2021-05-11 01:25:07 +03:00
# list of excluded columns
exclude: Optional[List[str]] = attr.ib(None)
@attr.s(auto_attribs=True)
class SchemaMetadata:
"""Metadata related to additional schema information."""
2021-05-18 18:39:35 +03:00
derived_from: List[SchemaDerivedMetadata]
@attr.s(auto_attribs=True)
class WorkgroupAccessMetadata:
"""Workgroup access metadata."""
role: str
members: List[str]
2021-05-11 01:25:07 +03:00
2022-12-07 21:47:47 +03:00
class ExternalDataFormat(enum.Enum):
"""Represents the external types fo data that are supported to be integrated."""
GOOGLE_SHEETS = "google_sheets"
CSV = "csv"
2022-12-07 21:47:47 +03:00
@attr.s(auto_attribs=True)
class ExternalDataMetadata:
"""Metadata for specifying external data."""
format: ExternalDataFormat
source_uris: List[str]
2022-12-07 21:47:47 +03:00
options: Optional[Dict[str, Any]] = attr.ib(None)
@attr.s(auto_attribs=True)
class MonitoringMetadata:
"""Metadata for specifying observability and monitoring configuration."""
enabled: bool = attr.ib(True)
collection: Optional[str] = attr.ib(None)
partition_column: Optional[str] = attr.ib(None)
partition_column_set: bool = attr.ib(False)
@attr.s(auto_attribs=True)
2020-02-28 23:16:20 +03:00
class Metadata:
"""
Representation of a table or view Metadata configuration.
Uses attrs to simplify the class definition and provide validation.
Docs: https://www.attrs.org
"""
friendly_name: str = attr.ib()
description: str = attr.ib()
owners: List[str] = attr.ib()
labels: Dict = attr.ib({})
scheduling: Optional[Dict] = attr.ib({})
2021-02-12 04:26:26 +03:00
bigquery: Optional[BigQueryMetadata] = attr.ib(None)
2021-05-11 01:25:07 +03:00
schema: Optional[SchemaMetadata] = attr.ib(None)
2021-05-18 18:39:35 +03:00
workgroup_access: Optional[List[WorkgroupAccessMetadata]] = attr.ib(None)
references: Dict = attr.ib({})
2022-12-07 21:47:47 +03:00
external_data: Optional[ExternalDataMetadata] = attr.ib(None)
deprecated: bool = attr.ib(False)
deletion_date: Optional[date] = attr.ib(None)
monitoring: Optional[MonitoringMetadata] = attr.ib(None)
@owners.validator
def validate_owners(self, attribute, value):
"""Check that provided email addresses or github identities for owners are valid."""
if not all(map(lambda e: is_email_or_github_identity(e), value)):
raise ValueError(f"Invalid email or Github identity for owners: {value}.")
@labels.validator
def validate_labels(self, attribute, value):
"""Check that labels are valid."""
for key, label in value.items():
2020-10-27 00:48:09 +03:00
if key == "review_bugs" and label != "":
if isinstance(label, list):
for bug in label:
if not Metadata.is_valid_label(str(bug)):
raise ValueError(f"Invalid label format: {bug}")
else:
raise ValueError("Error: review_bugs needs to be a list.")
elif not isinstance(label, bool):
if not Metadata.is_valid_label(str(key)):
2020-10-27 00:48:09 +03:00
raise ValueError(f"Invalid label format: {key}")
elif not Metadata.is_valid_label(str(label)) and label != "":
raise ValueError(f"Invalid label format: {label}")
2020-02-28 23:42:10 +03:00
2020-02-28 23:16:20 +03:00
@staticmethod
def is_valid_label(label):
"""
Check if a label has the right format.
2020-02-28 23:52:25 +03:00
Only hyphens (-), underscores (_), lowercase characters, and
numbers are allowed. International characters are not allowed.
2020-03-09 22:08:26 +03:00
Keys have a minimum length of 1 character and a maximum length of
63 characters, and cannot be empty. Values can be empty, and have
a maximum length of 63 characters.
2020-02-28 23:16:20 +03:00
"""
return re.fullmatch(r"[0-9a-z-_]{1,63}", label) is not None
2020-02-28 23:16:20 +03:00
2020-04-29 21:05:21 +03:00
@staticmethod
def is_metadata_file(file_path):
"""
Check if the provided file is a metadata file.
2020-04-29 21:05:21 +03:00
Checks if the name and file format match the metadata file requirements.
"""
2020-04-29 21:33:10 +03:00
# todo: we should probably also check if the file actually exists etc.
2020-04-29 21:05:21 +03:00
return os.path.basename(file_path) == METADATA_FILE
@classmethod
def of_table(cls, dataset, table, version, target_dir):
"""
Return metadata that is associated with the provided table.
The provided directory is searched for metadata files and is expected to
have the following structure: /<dataset>/<table>_<version>/metadata.yaml.
"""
path = os.path.join(target_dir, dataset, table + "_" + version)
metadata_file = os.path.join(path, METADATA_FILE)
cls = Metadata.from_file(metadata_file)
return cls
2020-02-28 23:16:20 +03:00
@classmethod
2020-02-28 23:42:10 +03:00
def from_file(cls, metadata_file):
"""Parse metadata from the provided file and create a new Metadata instance."""
2020-02-28 23:42:10 +03:00
friendly_name = None
description = None
2020-05-22 00:42:31 +03:00
owners = []
2020-02-28 23:42:10 +03:00
labels = {}
2020-04-21 01:01:30 +03:00
scheduling = {}
2021-02-17 02:10:28 +03:00
bigquery = None
2021-05-11 01:25:07 +03:00
schema = None
2021-05-18 18:39:35 +03:00
workgroup_access = None
references = {}
2022-12-07 21:47:47 +03:00
external_data = None
deprecated = False
deletion_date = None
monitoring = None
2020-02-28 23:42:10 +03:00
2020-02-28 23:16:20 +03:00
with open(metadata_file, "r") as yaml_stream:
try:
metadata = yaml.safe_load(yaml_stream)
table_name = str(Path(metadata_file).parent.name)
friendly_name = metadata.get(
"friendly_name", string.capwords(table_name.replace("_", " "))
)
description = metadata.get(
"description",
"Please provide a description for the query",
)
2020-02-28 23:16:20 +03:00
if "labels" in metadata:
for key, label in metadata["labels"].items():
if isinstance(label, bool):
2020-02-28 23:52:25 +03:00
# publish key-value pair with bool value as tag
2020-02-28 23:16:20 +03:00
if label:
2020-02-28 23:42:10 +03:00
labels[str(key)] = ""
2020-10-27 00:48:09 +03:00
elif isinstance(label, list):
labels[str(key)] = list(map(str, label))
else:
2020-02-28 23:16:20 +03:00
# all other pairs get published as key-value pair label
2020-02-28 23:42:10 +03:00
labels[str(key)] = str(label)
2020-02-28 23:16:20 +03:00
2020-04-21 01:01:30 +03:00
if "scheduling" in metadata:
scheduling = metadata["scheduling"]
2023-07-17 18:58:20 +03:00
if "dag_name" in scheduling and cls.is_valid_label(
scheduling["dag_name"]
):
labels["dag"] = scheduling["dag_name"]
2020-04-21 01:01:30 +03:00
2021-02-17 02:10:28 +03:00
if "bigquery" in metadata and metadata["bigquery"]:
converter = cattrs.BaseConverter()
2021-02-12 04:26:26 +03:00
bigquery = converter.structure(
metadata["bigquery"], BigQueryMetadata
)
2020-05-22 00:42:31 +03:00
if "owners" in metadata:
owners = metadata["owners"]
owner_idx = 1
for owner in filter(is_email, owners):
label = owner.split("@")[0]
if Metadata.is_valid_label(label):
labels[f"owner{owner_idx}"] = label
owner_idx += 1
2020-05-22 00:42:31 +03:00
2021-05-11 01:25:07 +03:00
if "schema" in metadata:
converter = cattrs.BaseConverter()
2021-05-11 02:21:23 +03:00
schema = converter.structure(metadata["schema"], SchemaMetadata)
2021-05-11 01:25:07 +03:00
2021-05-18 18:39:35 +03:00
if "workgroup_access" in metadata:
converter = cattrs.BaseConverter()
2021-05-18 18:39:35 +03:00
workgroup_access = converter.structure(
metadata["workgroup_access"], List[WorkgroupAccessMetadata]
)
if "references" in metadata:
references = metadata["references"]
2022-12-07 21:47:47 +03:00
if "external_data" in metadata:
converter = cattrs.BaseConverter()
external_data = converter.structure(
metadata["external_data"], ExternalDataMetadata
)
if "deprecated" in metadata:
deprecated = metadata["deprecated"]
if "deletion_date" in metadata:
deletion_date = metadata["deletion_date"]
2022-12-07 21:47:47 +03:00
if "monitoring" in metadata:
converter = cattrs.BaseConverter()
monitoring = converter.structure(
metadata["monitoring"], MonitoringMetadata
)
if "partition_column" in metadata["monitoring"]:
# check if partition column metadata has been set explicitly;
# needed for monitoring config validation for views where partition
# column needs to be set explicitly
monitoring.partition_column_set = True
2021-02-12 04:26:26 +03:00
return cls(
2021-05-11 02:21:23 +03:00
friendly_name,
description,
owners,
labels,
scheduling,
bigquery,
schema,
2021-05-18 18:39:35 +03:00
workgroup_access,
references,
2022-12-07 21:47:47 +03:00
external_data,
deprecated,
deletion_date,
monitoring,
2021-02-12 04:26:26 +03:00
)
2020-02-28 23:16:20 +03:00
except yaml.YAMLError as e:
raise e
2020-02-28 23:42:10 +03:00
@classmethod
2020-11-10 02:55:41 +03:00
def of_query_file(cls, sql_file):
2020-03-06 21:37:33 +03:00
"""Return the metadata that is associated with the provided SQL file."""
path, _ = os.path.split(sql_file)
metadata_file = os.path.join(path, METADATA_FILE)
cls = Metadata.from_file(metadata_file)
return cls
2020-07-27 22:25:59 +03:00
def write(self, file):
"""Write metadata information to the provided file."""
converter = cattrs.BaseConverter()
2021-05-18 18:39:35 +03:00
metadata_dict = converter.unstructure(self)
2020-07-27 22:25:59 +03:00
if metadata_dict["scheduling"] == {}:
del metadata_dict["scheduling"]
2020-08-21 00:39:22 +03:00
if metadata_dict["labels"]:
for label_key, label_value in metadata_dict["labels"].items():
# handle tags
if label_value == "":
metadata_dict["labels"][label_key] = True
2021-02-18 01:10:07 +03:00
if "description" in metadata_dict:
metadata_dict["description"] = Literal(metadata_dict["description"])
2021-05-11 02:21:23 +03:00
if metadata_dict["schema"] is None:
del metadata_dict["schema"]
2021-05-18 18:39:35 +03:00
if metadata_dict["workgroup_access"] is None:
del metadata_dict["workgroup_access"]
2022-12-07 21:47:47 +03:00
if metadata_dict["external_data"] is None:
del metadata_dict["external_data"]
if not metadata_dict["deprecated"]:
del metadata_dict["deprecated"]
if not metadata_dict["deletion_date"]:
del metadata_dict["deletion_date"]
if not metadata_dict["monitoring"]:
del metadata_dict["monitoring"]
2021-02-18 01:10:07 +03:00
file.write_text(
yaml.dump(
converter.unstructure(metadata_dict),
default_flow_style=False,
sort_keys=False,
)
)
2020-07-27 22:25:59 +03:00
2020-02-28 23:42:10 +03:00
def is_public_bigquery(self):
"""Return true if the public_bigquery flag is set."""
2020-02-28 23:42:10 +03:00
return "public_bigquery" in self.labels
def is_public_json(self):
"""Return true if the public_json flag is set."""
return "public_json" in self.labels
2020-03-19 00:48:27 +03:00
def is_incremental(self):
"""Return true if the incremental flag is set."""
return "incremental" in self.labels
2020-04-03 01:23:37 +03:00
def is_incremental_export(self):
"""Return true if the incremental_export flag is set."""
return "incremental_export" in self.labels
2020-10-27 00:48:09 +03:00
def review_bugs(self):
"""Return the bug ID of the data review bug in bugzilla."""
2020-10-27 00:48:09 +03:00
return self.labels.get("review_bugs", None)
2021-02-12 04:26:26 +03:00
def set_bigquery_partitioning(
2021-12-02 01:55:56 +03:00
self, field, partition_type, required, expiration_days=None
):
2021-02-12 04:26:26 +03:00
"""Update the BigQuery partitioning metadata."""
clustering = None
2021-02-17 02:10:28 +03:00
if self.bigquery and self.bigquery.clustering:
clustering = self.bigquery.clustering
2021-02-12 04:26:26 +03:00
self.bigquery = BigQueryMetadata(
2021-02-17 02:10:28 +03:00
time_partitioning=PartitionMetadata(
field=field,
type=PartitionType(partition_type),
require_partition_filter=required,
2021-12-02 01:55:56 +03:00
expiration_days=expiration_days,
2021-02-12 04:26:26 +03:00
),
2021-02-17 02:10:28 +03:00
clustering=clustering,
2021-02-12 04:26:26 +03:00
)
def set_bigquery_clustering(self, clustering_fields):
"""Update the BigQuery partitioning metadata."""
if self.bigquery:
time_partitioning = self.bigquery.time_partitioning
range_partitioning = self.bigquery.range_partitioning
2021-02-12 04:26:26 +03:00
self.bigquery = BigQueryMetadata(
time_partitioning=time_partitioning,
range_partitioning=range_partitioning,
clustering=ClusteringMetadata(fields=clustering_fields),
)
@attr.s(auto_attribs=True)
class DatasetMetadata:
"""
Representation of a dataset-level metadata configuration.
Uses attrs to simplify the class definition and provide validation.
Docs: https://www.attrs.org
"""
friendly_name: str = attr.ib()
description: str = attr.ib()
dataset_base_acl: str = attr.ib()
user_facing: bool = attr.ib(False)
labels: Dict = attr.ib({})
default_table_workgroup_access: Optional[List[Dict[str, Any]]] = attr.ib(None)
default_table_expiration_ms: str = attr.ib(None)
workgroup_access: list = attr.ib(DEFAULT_WORKGROUP_ACCESS)
syndication: Dict = attr.ib({})
def __attrs_post_init__(self):
"""Set default table workgroup access to workgroup access."""
if self.default_table_workgroup_access is None:
self.default_table_workgroup_access = self.workgroup_access
@staticmethod
def is_dataset_metadata_file(file_path):
"""
Check if the provided file is a metadata file.
Checks if the name and file format match the metadata file requirements.
"""
# todo: we should probably also check if the file actually exists etc.
return os.path.basename(file_path) == DATASET_METADATA_FILE
def write(self, file):
"""Write dataset metadata information to the provided file."""
metadata_dict = self.__dict__
if metadata_dict["labels"]:
for label_key, label_value in metadata_dict["labels"].items():
# handle tags
if label_value == "":
metadata_dict["labels"][label_key] = True
if "description" in metadata_dict:
metadata_dict["description"] = Literal(metadata_dict["description"])
if "default_table_workgroup_access" in metadata_dict:
metadata_dict["default_table_workgroup_access"] = metadata_dict[
"default_table_workgroup_access"
]
converter = cattrs.BaseConverter()
file.write_text(
yaml.dump(
converter.unstructure(metadata_dict),
default_flow_style=False,
sort_keys=False,
)
)
@classmethod
def from_file(cls, metadata_file):
"""Parse dataset metadata from the provided file.
Returns a new DatasetMetadata instance.
"""
with open(metadata_file, "r") as yaml_stream:
try:
metadata = yaml.safe_load(yaml_stream)
return cls(**metadata)
except yaml.YAMLError as e:
raise e