Add BigQuery schema conversion util (#5034)

* Add BigQuery schema conversion util

* Update bigquery_etl/schema/__init__.py

Co-authored-by: Sean Rose <1994030+sean-rose@users.noreply.github.com>

---------

Co-authored-by: Sean Rose <1994030+sean-rose@users.noreply.github.com>
This commit is contained in:
Alexander 2024-02-13 16:46:46 -05:00 committed by GitHub
Parent ee28007cfb
Commit a5c6c91bb1
No known key found for this signature
GPG key ID: B5690EEEBB952194
3 changed files with 86 additions and 9 deletions

View file

@ -15,7 +15,6 @@ from functools import partial
from glob import glob
from multiprocessing.pool import Pool, ThreadPool
from pathlib import Path
from tempfile import NamedTemporaryFile
from traceback import print_exc
from typing import Optional
@ -2127,10 +2126,7 @@ def deploy(
)
sys.exit(1)
with NamedTemporaryFile(suffix=".json") as tmp_schema_file:
existing_schema.to_json_file(Path(tmp_schema_file.name))
bigquery_schema = client.schema_from_json(tmp_schema_file.name)
bigquery_schema = existing_schema.to_bigquery_schema()
try:
table = client.get_table(full_table_id)
except NotFound:
@ -2249,10 +2245,7 @@ def _deploy_external_data(
except NotFound:
table = bigquery.Table(full_table_id)
with NamedTemporaryFile(suffix=".json") as tmp_schema_file:
existing_schema.to_json_file(Path(tmp_schema_file.name))
bigquery_schema = client.schema_from_json(tmp_schema_file.name)
bigquery_schema = existing_schema.to_bigquery_schema()
table.schema = bigquery_schema
_attach_metadata(metadata_file_path, table)

View file

@ -10,6 +10,7 @@ import attr
import yaml
from google.api_core.exceptions import NotFound
from google.cloud import bigquery
from google.cloud.bigquery import SchemaField
from .. import dryrun
@ -282,3 +283,12 @@ class Schema:
def to_json(self):
"""Return the schema data as JSON."""
return json.dumps(self.schema)
def to_bigquery_schema(self) -> List[SchemaField]:
    """Return this schema as a list of BigQuery ``SchemaField`` objects.

    Each entry under ``self.schema["fields"]`` is an API-representation
    dict and is converted via ``SchemaField.from_api_repr``.
    """
    field_reprs = self.schema["fields"]
    return list(map(SchemaField.from_api_repr, field_reprs))
@classmethod
def from_bigquery_schema(cls, fields: List[SchemaField]) -> "Schema":
    """Build a ``Schema`` from a list of BigQuery ``SchemaField`` objects.

    The fields are converted back to their API-representation dicts and
    wrapped in the ``{"fields": [...]}`` structure this class stores.
    """
    api_reprs = [bq_field.to_api_repr() for bq_field in fields]
    return cls({"fields": api_reprs})

View file

@ -2,6 +2,7 @@ from pathlib import Path
from textwrap import dedent
import yaml
from google.cloud.bigquery import SchemaField
from bigquery_etl.schema import Schema
@ -379,3 +380,76 @@ class TestQuerySchema:
schema_1.merge(schema_2)
assert schema_1.schema["fields"][0]["description"] == "Date of the submission"
def test_bigquery_conversion(self):
    """Round-trip schemas between Schema objects and BigQuery SchemaFields."""
    # BigQuery -> Schema: nested RECORD fields and descriptions must survive.
    nested_fields = (
        SchemaField(
            "nested_field_1",
            "STRING",
            "NULLABLE",
            description="Nested Field 1",
        ),
        SchemaField(
            "nested_field_2",
            "INTEGER",
            "NULLABLE",
            description="Nested Field 2",
        ),
    )
    bq_schema = [
        SchemaField(
            "record_field",
            "RECORD",
            "REPEATED",
            description="Record field",
            fields=nested_fields,
        ),
        SchemaField("normal_field", "STRING"),
    ]
    converted = Schema.from_bigquery_schema(bq_schema)
    record_field = converted.schema["fields"][0]
    assert record_field["name"] == "record_field"
    assert record_field["description"] == "Record field"
    assert record_field["fields"][0]["type"] == "STRING"
    assert record_field["fields"][0]["description"] == "Nested Field 1"
    assert converted.schema["fields"][1]["name"] == "normal_field"

    # Schema -> BigQuery: a YAML-defined schema converts to SchemaFields.
    schema_yaml = dedent(
        """
        fields:
        - mode: NULLABLE
          name: submission_date
          type: DATE
        - mode: NULLABLE
          name: client_id
          type: STRING
        - fields:
          - mode: NULLABLE
            name: campaign
            type: STRING
            description: attribution campaign
          mode: NULLABLE
          name: attribution
          type: RECORD
        """
    )
    schema = Schema.from_json(yaml.safe_load(schema_yaml))
    bq_schema = schema.to_bigquery_schema()
    assert len(bq_schema) == 3
    assert bq_schema[0] == SchemaField(
        name="submission_date", field_type="DATE", mode="NULLABLE"
    )
    assert bq_schema[2] == SchemaField(
        name="attribution",
        field_type="RECORD",
        mode="NULLABLE",
        fields=(
            SchemaField(
                name="campaign",
                field_type="STRING",
                description="attribution campaign",
            ),
        ),
    )