"""Parsing of metadata yaml files.""" import enum import os import re import string from datetime import date from pathlib import Path from typing import Any, Dict, List, Optional import attr import cattrs import yaml from google.cloud import bigquery from bigquery_etl.query_scheduling.utils import is_email, is_email_or_github_identity METADATA_FILE = "metadata.yaml" DATASET_METADATA_FILE = "dataset_metadata.yaml" DEFAULT_WORKGROUP_ACCESS = [ dict(role="roles/bigquery.dataViewer", members=["workgroup:mozilla-confidential"]) ] DEFAULT_TABLE_WORKGROUP_ACCESS = DEFAULT_WORKGROUP_ACCESS class Literal(str): """Represents a YAML literal.""" pass def literal_presenter(dumper, data): """Literal representer for YAML output.""" return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|") yaml.add_representer(Literal, literal_presenter) class PartitionType(enum.Enum): """Represents BigQuery table partition types.""" HOUR = "hour" DAY = "day" MONTH = "month" YEAR = "year" @property def bigquery_type(self): """Map to the BigQuery data type.""" d = { "hour": bigquery.TimePartitioningType.HOUR, "day": bigquery.TimePartitioningType.DAY, "month": bigquery.TimePartitioningType.MONTH, "year": bigquery.TimePartitioningType.YEAR, } return d[self.value] @attr.s(auto_attribs=True) class PartitionMetadata: """Metadata for defining BigQuery table partitions.""" type: PartitionType field: Optional[str] = attr.ib(None) require_partition_filter: bool = attr.ib(True) expiration_days: Optional[float] = attr.ib(None) @property def expiration_ms(self) -> Optional[float]: """Convert partition expiration from days to milliseconds.""" if self.expiration_days is None: return None return int(self.expiration_days * 86400000) @attr.s(auto_attribs=True) class PartitionRange: """Metadata for defining the partition range.""" start: int end: int interval: int @attr.s(auto_attribs=True) class RangePartitionMetadata: """Metadata for defining range partitioned tables.""" range: PartitionRange field: Optional[str] = attr.ib(None) @attr.s(auto_attribs=True) class ClusteringMetadata: """Metadata for defining BigQuery table clustering.""" fields: List[str] @attr.s(auto_attribs=True) class BigQueryMetadata: """ Metadata related to BigQuery configurations for the query. For example, partitioning or clustering of the destination table. 
""" time_partitioning: Optional[PartitionMetadata] = attr.ib(None) range_partitioning: Optional[RangePartitionMetadata] = attr.ib(None) clustering: Optional[ClusteringMetadata] = attr.ib(None) @attr.s(auto_attribs=True) class SchemaDerivedMetadata: """Metadata specifying parent schema.""" table: List[str] # list of excluded columns exclude: Optional[List[str]] = attr.ib(None) @attr.s(auto_attribs=True) class SchemaMetadata: """Metadata related to additional schema information.""" derived_from: List[SchemaDerivedMetadata] @attr.s(auto_attribs=True) class WorkgroupAccessMetadata: """Workgroup access metadata.""" role: str members: List[str] class ExternalDataFormat(enum.Enum): """Represents the external types fo data that are supported to be integrated.""" GOOGLE_SHEETS = "google_sheets" CSV = "csv" @attr.s(auto_attribs=True) class ExternalDataMetadata: """Metadata for specifying external data.""" format: ExternalDataFormat source_uris: List[str] options: Optional[Dict[str, Any]] = attr.ib(None) @attr.s(auto_attribs=True) class MonitoringMetadata: """Metadata for specifying observability and monitoring configuration.""" enabled: bool = attr.ib(True) collection: Optional[str] = attr.ib(None) partition_column: Optional[str] = attr.ib(None) partition_column_set: bool = attr.ib(False) @attr.s(auto_attribs=True) class Metadata: """ Representation of a table or view Metadata configuration. Uses attrs to simplify the class definition and provide validation. Docs: https://www.attrs.org """ friendly_name: str = attr.ib() description: str = attr.ib() owners: List[str] = attr.ib() labels: Dict = attr.ib({}) scheduling: Optional[Dict] = attr.ib({}) bigquery: Optional[BigQueryMetadata] = attr.ib(None) schema: Optional[SchemaMetadata] = attr.ib(None) workgroup_access: Optional[List[WorkgroupAccessMetadata]] = attr.ib(None) references: Dict = attr.ib({}) external_data: Optional[ExternalDataMetadata] = attr.ib(None) deprecated: bool = attr.ib(False) deletion_date: Optional[date] = attr.ib(None) monitoring: Optional[MonitoringMetadata] = attr.ib(None) @owners.validator def validate_owners(self, attribute, value): """Check that provided email addresses or github identities for owners are valid.""" if not all(map(lambda e: is_email_or_github_identity(e), value)): raise ValueError(f"Invalid email or Github identity for owners: {value}.") @labels.validator def validate_labels(self, attribute, value): """Check that labels are valid.""" for key, label in value.items(): if key == "review_bugs" and label != "": if isinstance(label, list): for bug in label: if not Metadata.is_valid_label(str(bug)): raise ValueError(f"Invalid label format: {bug}") else: raise ValueError("Error: review_bugs needs to be a list.") elif not isinstance(label, bool): if not Metadata.is_valid_label(str(key)): raise ValueError(f"Invalid label format: {key}") elif not Metadata.is_valid_label(str(label)) and label != "": raise ValueError(f"Invalid label format: {label}") @staticmethod def is_valid_label(label): """ Check if a label has the right format. Only hyphens (-), underscores (_), lowercase characters, and numbers are allowed. International characters are not allowed. Keys have a minimum length of 1 character and a maximum length of 63 characters, and cannot be empty. Values can be empty, and have a maximum length of 63 characters. """ return re.fullmatch(r"[0-9a-z-_]{1,63}", label) is not None @staticmethod def is_metadata_file(file_path): """ Check if the provided file is a metadata file. 
@attr.s(auto_attribs=True)
class Metadata:
    """
    Representation of a table or view metadata configuration.

    Uses attrs to simplify the class definition and provide validation.
    Docs: https://www.attrs.org
    """

    friendly_name: str = attr.ib()
    description: str = attr.ib()
    owners: List[str] = attr.ib()
    labels: Dict = attr.ib({})
    scheduling: Optional[Dict] = attr.ib({})
    bigquery: Optional[BigQueryMetadata] = attr.ib(None)
    schema: Optional[SchemaMetadata] = attr.ib(None)
    workgroup_access: Optional[List[WorkgroupAccessMetadata]] = attr.ib(None)
    references: Dict = attr.ib({})
    external_data: Optional[ExternalDataMetadata] = attr.ib(None)
    deprecated: bool = attr.ib(False)
    deletion_date: Optional[date] = attr.ib(None)
    monitoring: Optional[MonitoringMetadata] = attr.ib(None)

    @owners.validator
    def validate_owners(self, attribute, value):
        """Check that the provided email addresses or GitHub identities for owners are valid."""
        if not all(map(is_email_or_github_identity, value)):
            raise ValueError(f"Invalid email or GitHub identity for owners: {value}.")

    @labels.validator
    def validate_labels(self, attribute, value):
        """Check that labels are valid."""
        for key, label in value.items():
            if key == "review_bugs" and label != "":
                if isinstance(label, list):
                    for bug in label:
                        if not Metadata.is_valid_label(str(bug)):
                            raise ValueError(f"Invalid label format: {bug}")
                else:
                    raise ValueError("Error: review_bugs needs to be a list.")
            elif not isinstance(label, bool):
                if not Metadata.is_valid_label(str(key)):
                    raise ValueError(f"Invalid label format: {key}")
                elif not Metadata.is_valid_label(str(label)) and label != "":
                    raise ValueError(f"Invalid label format: {label}")

    @staticmethod
    def is_valid_label(label):
        """
        Check if a label has the right format.

        Only hyphens (-), underscores (_), lowercase characters, and numbers
        are allowed. International characters are not allowed.

        Keys have a minimum length of 1 character and a maximum length of 63
        characters, and cannot be empty. Values can be empty, and have a
        maximum length of 63 characters.
        """
        return re.fullmatch(r"[0-9a-z-_]{1,63}", label) is not None

    @staticmethod
    def is_metadata_file(file_path):
        """
        Check if the provided file is a metadata file.

        Checks if the name and file format match the metadata file requirements.
        """
        # todo: we should probably also check if the file actually exists etc.
        return os.path.basename(file_path) == METADATA_FILE

    @classmethod
    def of_table(cls, dataset, table, version, target_dir):
        """
        Return the metadata associated with the provided table.

        The provided directory is searched for metadata files and is expected
        to have the following structure:
        /<dataset>/<table>_<version>/metadata.yaml.
        """
        path = os.path.join(target_dir, dataset, table + "_" + version)
        metadata_file = os.path.join(path, METADATA_FILE)
        return Metadata.from_file(metadata_file)

    @classmethod
    def from_file(cls, metadata_file):
        """Parse metadata from the provided file and create a new Metadata instance."""
        friendly_name = None
        description = None
        owners = []
        labels = {}
        scheduling = {}
        bigquery = None
        schema = None
        workgroup_access = None
        references = {}
        external_data = None
        deprecated = False
        deletion_date = None
        monitoring = None

        with open(metadata_file, "r") as yaml_stream:
            try:
                metadata = yaml.safe_load(yaml_stream)

                table_name = str(Path(metadata_file).parent.name)
                friendly_name = metadata.get(
                    "friendly_name", string.capwords(table_name.replace("_", " "))
                )
                description = metadata.get(
                    "description",
                    "Please provide a description for the query",
                )

                if "labels" in metadata:
                    for key, label in metadata["labels"].items():
                        if isinstance(label, bool):
                            # publish key-value pairs with boolean values as tags
                            if label:
                                labels[str(key)] = ""
                        elif isinstance(label, list):
                            labels[str(key)] = list(map(str, label))
                        else:
                            # all other pairs get published as key-value pair labels
                            labels[str(key)] = str(label)

                if "scheduling" in metadata:
                    scheduling = metadata["scheduling"]
                    if "dag_name" in scheduling and cls.is_valid_label(
                        scheduling["dag_name"]
                    ):
                        labels["dag"] = scheduling["dag_name"]

                if "bigquery" in metadata and metadata["bigquery"]:
                    converter = cattrs.BaseConverter()
                    bigquery = converter.structure(
                        metadata["bigquery"], BigQueryMetadata
                    )

                if "owners" in metadata:
                    owners = metadata["owners"]
                    owner_idx = 1
                    for owner in filter(is_email, owners):
                        label = owner.split("@")[0]
                        if Metadata.is_valid_label(label):
                            labels[f"owner{owner_idx}"] = label
                            owner_idx += 1

                if "schema" in metadata:
                    converter = cattrs.BaseConverter()
                    schema = converter.structure(metadata["schema"], SchemaMetadata)

                if "workgroup_access" in metadata:
                    converter = cattrs.BaseConverter()
                    workgroup_access = converter.structure(
                        metadata["workgroup_access"], List[WorkgroupAccessMetadata]
                    )

                if "references" in metadata:
                    references = metadata["references"]

                if "external_data" in metadata:
                    converter = cattrs.BaseConverter()
                    external_data = converter.structure(
                        metadata["external_data"], ExternalDataMetadata
                    )

                if "deprecated" in metadata:
                    deprecated = metadata["deprecated"]

                if "deletion_date" in metadata:
                    deletion_date = metadata["deletion_date"]

                if "monitoring" in metadata:
                    converter = cattrs.BaseConverter()
                    monitoring = converter.structure(
                        metadata["monitoring"], MonitoringMetadata
                    )
                    if "partition_column" in metadata["monitoring"]:
                        # record that the partition column was set explicitly;
                        # needed for monitoring config validation of views, where
                        # the partition column has to be set explicitly
                        monitoring.partition_column_set = True

                return cls(
                    friendly_name,
                    description,
                    owners,
                    labels,
                    scheduling,
                    bigquery,
                    schema,
                    workgroup_access,
                    references,
                    external_data,
                    deprecated,
                    deletion_date,
                    monitoring,
                )
            except yaml.YAMLError as e:
                raise e
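
    # Illustrative sketch (not part of the original module): a minimal
    # metadata.yaml that from_file above accepts; the values are hypothetical
    # placeholders.
    #
    #     friendly_name: Example Table
    #     description: An example table.
    #     owners:
    #       - example@mozilla.com
    #     labels:
    #       incremental: true
    #
    # from_file would parse this into a Metadata instance with
    # labels == {"incremental": "", "owner1": "example"}: the boolean label is
    # published as a tag, and email owners become owner<n> labels.
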
    @classmethod
    def of_query_file(cls, sql_file):
        """Return the metadata associated with the provided SQL file."""
        path, _ = os.path.split(sql_file)
        metadata_file = os.path.join(path, METADATA_FILE)
        return Metadata.from_file(metadata_file)

    def write(self, file):
        """Write metadata information to the provided file."""
        converter = cattrs.BaseConverter()
        metadata_dict = converter.unstructure(self)

        if metadata_dict["scheduling"] == {}:
            del metadata_dict["scheduling"]

        if metadata_dict["labels"]:
            for label_key, label_value in metadata_dict["labels"].items():
                # handle tags
                if label_value == "":
                    metadata_dict["labels"][label_key] = True

        if "description" in metadata_dict:
            metadata_dict["description"] = Literal(metadata_dict["description"])

        if metadata_dict["schema"] is None:
            del metadata_dict["schema"]

        if metadata_dict["workgroup_access"] is None:
            del metadata_dict["workgroup_access"]

        if metadata_dict["external_data"] is None:
            del metadata_dict["external_data"]

        if not metadata_dict["deprecated"]:
            del metadata_dict["deprecated"]

        if not metadata_dict["deletion_date"]:
            del metadata_dict["deletion_date"]

        if not metadata_dict["monitoring"]:
            del metadata_dict["monitoring"]

        file.write_text(
            yaml.dump(
                converter.unstructure(metadata_dict),
                default_flow_style=False,
                sort_keys=False,
            )
        )

    def is_public_bigquery(self):
        """Return true if the public_bigquery flag is set."""
        return "public_bigquery" in self.labels

    def is_public_json(self):
        """Return true if the public_json flag is set."""
        return "public_json" in self.labels

    def is_incremental(self):
        """Return true if the incremental flag is set."""
        return "incremental" in self.labels

    def is_incremental_export(self):
        """Return true if the incremental_export flag is set."""
        return "incremental_export" in self.labels

    def review_bugs(self):
        """Return the bug IDs of the data review bugs in Bugzilla."""
        return self.labels.get("review_bugs", None)

    def set_bigquery_partitioning(
        self, field, partition_type, required, expiration_days=None
    ):
        """Update the BigQuery partitioning metadata."""
        clustering = None
        if self.bigquery and self.bigquery.clustering:
            clustering = self.bigquery.clustering

        self.bigquery = BigQueryMetadata(
            time_partitioning=PartitionMetadata(
                field=field,
                type=PartitionType(partition_type),
                require_partition_filter=required,
                expiration_days=expiration_days,
            ),
            clustering=clustering,
        )

    def set_bigquery_clustering(self, clustering_fields):
        """Update the BigQuery clustering metadata."""
        if self.bigquery:
            time_partitioning = self.bigquery.time_partitioning
            range_partitioning = self.bigquery.range_partitioning
            self.bigquery = BigQueryMetadata(
                time_partitioning=time_partitioning,
                range_partitioning=range_partitioning,
                clustering=ClusteringMetadata(fields=clustering_fields),
            )
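
# Illustrative sketch (not part of the original module): updating the
# partitioning of an existing Metadata instance and serializing it back to
# YAML; the owner email and column name are hypothetical placeholders.
#
#     meta = Metadata("Example", "An example table.", ["example@mozilla.com"])
#     meta.set_bigquery_partitioning(
#         field="submission_date", partition_type="day", required=True
#     )
#     meta.write(Path("metadata.yaml"))
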
@attr.s(auto_attribs=True)
class DatasetMetadata:
    """
    Representation of a dataset-level metadata configuration.

    Uses attrs to simplify the class definition and provide validation.
    Docs: https://www.attrs.org
    """

    friendly_name: str = attr.ib()
    description: str = attr.ib()
    dataset_base_acl: str = attr.ib()
    user_facing: bool = attr.ib(False)
    labels: Dict = attr.ib({})
    default_table_workgroup_access: Optional[List[Dict[str, Any]]] = attr.ib(None)
    default_table_expiration_ms: Optional[int] = attr.ib(None)
    workgroup_access: list = attr.ib(DEFAULT_WORKGROUP_ACCESS)
    syndication: Dict = attr.ib({})

    def __attrs_post_init__(self):
        """Default the table-level workgroup access to the dataset-level workgroup access."""
        if self.default_table_workgroup_access is None:
            self.default_table_workgroup_access = self.workgroup_access

    @staticmethod
    def is_dataset_metadata_file(file_path):
        """
        Check if the provided file is a dataset metadata file.

        Checks if the name and file format match the metadata file requirements.
        """
        # todo: we should probably also check if the file actually exists etc.
        return os.path.basename(file_path) == DATASET_METADATA_FILE

    def write(self, file):
        """Write dataset metadata information to the provided file."""
        metadata_dict = self.__dict__

        if metadata_dict["labels"]:
            for label_key, label_value in metadata_dict["labels"].items():
                # handle tags
                if label_value == "":
                    metadata_dict["labels"][label_key] = True

        if "description" in metadata_dict:
            metadata_dict["description"] = Literal(metadata_dict["description"])

        converter = cattrs.BaseConverter()
        file.write_text(
            yaml.dump(
                converter.unstructure(metadata_dict),
                default_flow_style=False,
                sort_keys=False,
            )
        )

    @classmethod
    def from_file(cls, metadata_file):
        """Parse dataset metadata from the provided file.

        Returns a new DatasetMetadata instance.
        """
        with open(metadata_file, "r") as yaml_stream:
            try:
                metadata = yaml.safe_load(yaml_stream)
                return cls(**metadata)
            except yaml.YAMLError as e:
                raise e
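
# Illustrative sketch (not part of the original module): a minimal
# dataset_metadata.yaml accepted by DatasetMetadata.from_file; the values are
# hypothetical placeholders.
#
#     friendly_name: Example Dataset
#     description: An example dataset.
#     dataset_base_acl: view
#     user_facing: true
#
# Since workgroup_access is not set here, the instance falls back to
# DEFAULT_WORKGROUP_ACCESS, and __attrs_post_init__ copies that value to
# default_table_workgroup_access.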