Add support for external data
Parent: dc69069939
Commit: 0ec61ba3a0
@@ -40,6 +40,7 @@ from ..metadata.parse_metadata import (
     BigQueryMetadata,
     ClusteringMetadata,
     DatasetMetadata,
+    ExternalDataFormat,
     Metadata,
     PartitionMetadata,
     PartitionType,
@@ -1633,6 +1634,8 @@ def deploy(
            )
            click.echo(f"Schema (and metadata) updated for {full_table_id}.")
 
+    _deploy_external_data(name, sql_dir, project_id, skip_existing)
+
 
 def _attach_metadata(query_file_path: Path, table: bigquery.Table) -> None:
     """Add metadata from query file's metadata.yaml to table object."""
@@ -1661,6 +1664,70 @@ def _attach_metadata(query_file_path: Path, table: bigquery.Table) -> None:
     table.labels = metadata.labels
 
 
+def _deploy_external_data(
+    name,
+    sql_dir,
+    project_id,
+    skip_existing,
+) -> None:
+    """Publish external data tables."""
+    metadata_files = paths_matching_name_pattern(
+        name, sql_dir, project_id, ["metadata.yaml"]
+    )
+    client = bigquery.Client()
+    for metadata_file_path in metadata_files:
+        existing_schema_path = metadata_file_path.parent / SCHEMA_FILE
+
+        if not existing_schema_path.is_file():
+            click.echo(f"No schema file found for {metadata_file_path}")
+            continue
+
+        table_name = metadata_file_path.parent.name
+        dataset_name = metadata_file_path.parent.parent.name
+        project_name = metadata_file_path.parent.parent.parent.name
+        full_table_id = f"{project_name}.{dataset_name}.{table_name}"
+
+        existing_schema = Schema.from_schema_file(existing_schema_path)
+
+        try:
+            table = client.get_table(full_table_id)
+        except NotFound:
+            table = bigquery.Table(full_table_id)
+
+        with NamedTemporaryFile(suffix=".json") as tmp_schema_file:
+            existing_schema.to_json_file(Path(tmp_schema_file.name))
+            bigquery_schema = client.schema_from_json(tmp_schema_file.name)
+
+        table.schema = bigquery_schema
+        _attach_metadata(metadata_file_path, table)
+        metadata = Metadata.from_file(metadata_file_path)
+
+        if not table.created:
+            if metadata.external_data:
+                if metadata.external_data.format == ExternalDataFormat.GOOGLE_SHEET:
+                    external_config = bigquery.ExternalConfig("GOOGLE_SHEETS")
+                    external_config.source_uris = [metadata.external_data.source_uri]
+                    external_config.options = metadata.external_data.options
+                    table.external_data_configuration = external_config
+                    table = client.create_table(table)
+                    click.echo(f"Destination table {full_table_id} created.")
+                else:
+                    click.echo(
+                        f"External data format {metadata.external_data.format} unsupported."
+                    )
+        elif not skip_existing:
+            client.update_table(
+                table,
+                [
+                    "schema",
+                    "friendly_name",
+                    "description",
+                    "labels",
+                ],
+            )
+            click.echo(f"Schema (and metadata) updated for {full_table_id}.")
+
+
 def _validate_schema_from_path(
     query_file_path, use_cloud_function=True, respect_dryrun_skip=True
 ):
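A couple of details in `_deploy_external_data` are worth noting. `client.get_table` returns a table whose `created` timestamp is populated, while a freshly constructed `bigquery.Table` leaves it as `None`, which is how the branch above distinguishes creating a new external table from updating an existing one. For the Google Sheets branch, the google-cloud-bigquery calls involved amount to roughly the following standalone sketch; the table ID, schema, sheet URL, and options are placeholders for illustration, not values from this commit:

```python
from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical destination table; the ID is a placeholder.
table = bigquery.Table("my-project.my_dataset.my_sheet_table")
table.schema = [
    bigquery.SchemaField("country", "STRING"),
    bigquery.SchemaField("population", "INTEGER"),
]

# "GOOGLE_SHEETS" selects the Sheets connector; source_uris points at the sheet.
external_config = bigquery.ExternalConfig("GOOGLE_SHEETS")
external_config.source_uris = [
    "https://docs.google.com/spreadsheets/d/EXAMPLE_SHEET_ID"
]
# Sheets-specific settings live on external_config.options (GoogleSheetsOptions).
external_config.options.skip_leading_rows = 1

# Attaching an external_data_configuration makes this an external table:
# BigQuery reads the sheet at query time instead of storing the rows itself.
table.external_data_configuration = external_config
table = client.create_table(table)
```

Because the table is external, only its schema and configuration are stored in BigQuery. The remaining hunks of the commit are in the metadata parsing module (`parse_metadata`), which the import hunk above references.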
@@ -3,7 +3,7 @@
 import enum
 import os
 import re
-from typing import Dict, List, Optional
+from typing import Any, Dict, List, Optional
 
 import attr
 import cattrs
@@ -114,6 +114,21 @@ class WorkgroupAccessMetadata:
     members: List[str]
 
 
+class ExternalDataFormat(enum.Enum):
+    """Represents the external types of data that are supported to be integrated."""
+
+    GOOGLE_SHEET = "google_sheet"
+
+
+@attr.s(auto_attribs=True)
+class ExternalDataMetadata:
+    """Metadata for specifying external data."""
+
+    format: ExternalDataFormat
+    source_uri: str
+    options: Optional[Dict[str, Any]] = attr.ib(None)
+
+
 @attr.s(auto_attribs=True)
 class Metadata:
     """
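These two classes define the shape of the new optional `external_data` block in `metadata.yaml`. A hypothetical block (the sheet URL and options are invented for illustration) parses as follows:

```python
import yaml

# Made-up metadata.yaml excerpt showing the expected external_data shape;
# the URL and the option keys are placeholders, not repository values.
parsed = yaml.safe_load(
    """
external_data:
  format: google_sheet
  source_uri: https://docs.google.com/spreadsheets/d/EXAMPLE_SHEET_ID
  options:
    skip_leading_rows: 1
"""
)
assert parsed["external_data"] == {
    "format": "google_sheet",
    "source_uri": "https://docs.google.com/spreadsheets/d/EXAMPLE_SHEET_ID",
    "options": {"skip_leading_rows": 1},
}
```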
@@ -132,6 +147,7 @@ class Metadata:
     schema: Optional[SchemaMetadata] = attr.ib(None)
     workgroup_access: Optional[List[WorkgroupAccessMetadata]] = attr.ib(None)
     references: Dict = attr.ib({})
+    external_data: Optional[ExternalDataMetadata] = attr.ib(None)
 
     @owners.validator
     def validate_owners(self, attribute, value):
@@ -205,6 +221,7 @@ class Metadata:
         schema = None
         workgroup_access = None
         references = {}
+        external_data = None
 
         with open(metadata_file, "r") as yaml_stream:
             try:
@@ -252,6 +269,12 @@ class Metadata:
                 if "references" in metadata:
                     references = metadata["references"]
 
+                if "external_data" in metadata:
+                    converter = cattrs.BaseConverter()
+                    external_data = converter.structure(
+                        metadata["external_data"], ExternalDataMetadata
+                    )
+
                 return cls(
                     friendly_name,
                     description,
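cattrs structures attrs classes by field name and enums by value, so the `format` string from the YAML becomes `ExternalDataFormat.GOOGLE_SHEET`. A minimal round trip, assuming the two classes from the earlier hunk are in scope:

```python
import cattrs

converter = cattrs.BaseConverter()

# Same hypothetical payload as the metadata.yaml excerpt shown earlier.
external_data = converter.structure(
    {
        "format": "google_sheet",
        "source_uri": "https://docs.google.com/spreadsheets/d/EXAMPLE_SHEET_ID",
        "options": {"skip_leading_rows": 1},
    },
    ExternalDataMetadata,
)
assert external_data.format is ExternalDataFormat.GOOGLE_SHEET

# unstructure() reverses the mapping, turning the enum back into its value.
assert converter.unstructure(external_data)["format"] == "google_sheet"
```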
@@ -262,6 +285,7 @@ class Metadata:
                     schema,
                     workgroup_access,
                     references,
+                    external_data,
                 )
             except yaml.YAMLError as e:
                 raise e
@@ -297,6 +321,9 @@ class Metadata:
         if metadata_dict["workgroup_access"] is None:
             del metadata_dict["workgroup_access"]
 
+        if metadata_dict["external_data"] is None:
+            del metadata_dict["external_data"]
+
         file.write_text(
             yaml.dump(
                 converter.unstructure(metadata_dict),
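Dropping the `None` entry before dumping keeps a spurious `external_data: null` line out of metadata.yaml files that don't use the feature, mirroring how `workgroup_access` is handled just above:

```python
import yaml

metadata_dict = {"friendly_name": "Example Table", "external_data": None}

# Without the deletion, yaml.dump would emit "external_data: null".
if metadata_dict["external_data"] is None:
    del metadata_dict["external_data"]

print(yaml.dump(metadata_dict))  # friendly_name: Example Table
```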