DENG-807 Added backfill complete cli command (#4040)

* Added complete command

---------

Co-authored-by: Alexander <anicholson@mozilla.com>
Winnie Chan 2023-07-13 15:44:41 -07:00 committed by GitHub
Parent 9657d2203e
Commit 97cb4117ad
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 151 additions and 4 deletions

View file

@@ -208,9 +208,12 @@ def get_backfill_entries_to_process_dict(
         )
         sys.exit(1)
-    if (len(entries)) > 1:
+    if not entries:
+        click.echo(f"No backfill to process for table: {qualified_table_name} ")
+        sys.exit(1)
+    elif (len(entries)) > 1:
         click.echo(
-            f"There should not be more than one entry with drafting status: for {qualified_table_name} "
+            f"There should not be more than one entry in backfill.yaml file with status: {BackfillStatus.DRAFTING} "
         )
         sys.exit(1)
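
The guard added above enforces a backfill.yaml invariant: for a given table, at most one entry may sit in a given in-flight status. A minimal self-contained sketch of that check, with a hypothetical entry shape and data:

from dataclasses import dataclass


@dataclass
class Entry:
    # Hypothetical stand-in for a parsed backfill.yaml entry.
    entry_date: str
    status: str


# Hypothetical parsed entries for one table, filtered to one status.
entries = [e for e in [Entry("2023-07-01", "Drafting")] if e.status == "Drafting"]

if not entries:
    raise SystemExit("No backfill to process")
elif len(entries) > 1:
    raise SystemExit("There should not be more than one Drafting entry")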

View file

@@ -2,10 +2,12 @@
 import sys
 import tempfile
-from datetime import date, datetime
+from datetime import date, datetime, timedelta
 import click
 import yaml
 from google.cloud import bigquery
+from google.cloud.exceptions import Conflict, NotFound
 from ..backfill.parse import (
     BACKFILL_FILE,
@@ -15,6 +17,8 @@ from ..backfill.parse import (
     BackfillStatus,
 )
 from ..backfill.utils import (
+    BACKFILL_DESTINATION_DATASET,
+    BACKFILL_DESTINATION_PROJECT,
     get_backfill_entries_to_process_dict,
     get_backfill_file_from_qualified_table_name,
     get_backfill_staging_qualified_table_name,
@@ -30,7 +34,7 @@ from ..backfill.validate import (
 )
 from ..cli.query import backfill as query_backfill
 from ..cli.query import deploy
-from ..cli.utils import project_id_option, sql_dir_option
+from ..cli.utils import is_authenticated, project_id_option, sql_dir_option
 from ..config import ConfigLoader
@@ -314,6 +318,7 @@ def scheduled(ctx, qualified_table_name, sql_dir, project_id):
     Examples:
     \b
     # Process backfill entry for specific table
     ./bqetl backfill process moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6
@@ -377,3 +382,142 @@ def process(ctx, qualified_table_name, sql_dir, project_id, dry_run):
# todo: send notification to watcher(s) that backfill for file has been completed
click.echo("Backfill processing completed.")
@backfill.command(
help="""Complete entry in backfill.yaml with Validated status.
Examples:
\b
# Complete backfill entry for specific table
./bqetl backfill complete moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6
Use the `--project_id` option to change the project;
default project_id is `moz-fx-data-shared-prod`.
"""
)
@click.argument("qualified_table_name")
@sql_dir_option
@project_id_option("moz-fx-data-shared-prod")
@click.pass_context
def complete(ctx, qualified_table_name, sql_dir, project_id):
"""Complete backfill entry in backfill.yaml file(s)."""
if not is_authenticated():
click.echo(
"Authentication to GCP required. Run `gcloud auth login` "
"and check that the project is set correctly."
)
sys.exit(1)
client = bigquery.Client(project=project_id)
entries = get_entries_from_qualified_table_name(
sql_dir, qualified_table_name, BackfillStatus.VALIDATED.value
)
if not entries:
click.echo(f"No backfill to complete for table: {qualified_table_name} ")
sys.exit(1)
elif len(entries) > 1:
click.echo(
f"There should not be more than one entry in backfill.yaml file with status: {BackfillStatus.VALIDATED} "
)
sys.exit(1)
entry_to_complete = entries[0]
click.echo(
f"Completing backfill for {qualified_table_name} with entry date {entry_to_complete.entry_date}:"
)
backfill_staging_qualified_table_name = get_backfill_staging_qualified_table_name(
qualified_table_name, entry_to_complete.entry_date
)
# do not complete backfill when staging table does not exist
try:
client.get_table(backfill_staging_qualified_table_name)
except NotFound:
click.echo(
f"""
Backfill staging table does not exist for {qualified_table_name}:
{backfill_staging_qualified_table_name}
"""
)
sys.exit(1)
project, dataset, table = qualified_table_name_matching(qualified_table_name)
# clone production table
cloned_table_id = f"{table}_backup_{entry_to_complete.entry_date}".replace("-", "_")
cloned_table_full_name = f"{BACKFILL_DESTINATION_PROJECT}.{BACKFILL_DESTINATION_DATASET}.{cloned_table_id}"
_copy_table(qualified_table_name, cloned_table_full_name, client, clone=True)
# copy backfill data to production data
start_date = entry_to_complete.start_date
end_date = entry_to_complete.end_date
dates = [start_date + timedelta(i) for i in range((end_date - start_date).days + 1)]
# replace partitions in production table that have been backfilled
for backfill_date in dates:
if backfill_date in entry_to_complete.excluded_dates:
click.echo(f"Skipping excluded date: {backfill_date}")
continue
partition = backfill_date.strftime("%Y%m%d")
production_table = f"{qualified_table_name}${partition}"
backfill_table = f"{backfill_staging_qualified_table_name}${partition}"
_copy_table(backfill_table, production_table, client)
# delete backfill staging table
client.delete_table(backfill_staging_qualified_table_name)
click.echo(
f"Backfill staging table deleted: {backfill_staging_qualified_table_name}"
)
click.echo(
f"Completed backfill for {qualified_table_name} with entry date {entry_to_complete.entry_date}"
)
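
The partition loop above walks an inclusive start/end window and skips excluded dates. A small sketch with hypothetical dates, showing the partition decorators the loop would produce:

from datetime import date, timedelta

# Hypothetical backfill window; note end_date is inclusive.
start_date, end_date = date(2023, 1, 1), date(2023, 1, 4)
excluded_dates = [date(2023, 1, 3)]

dates = [start_date + timedelta(i) for i in range((end_date - start_date).days + 1)]
partitions = [d.strftime("%Y%m%d") for d in dates if d not in excluded_dates]
# partitions == ['20230101', '20230102', '20230104']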


def _copy_table(
source_table: str, destination_table: str, client, clone: bool = False
) -> None:
"""
Copy and overwrite table from source to destination table.
If clone is True, clone (previous) production data for backup before swapping stage data into production.
"""
job_type_str = "copied"
if clone:
copy_config = bigquery.CopyJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_EMPTY,
operation_type=bigquery.job.copy_.OperationType.CLONE,
destination_expiration_time=(datetime.now() + timedelta(days=30)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
),
)
job_type_str = "cloned"
else:
copy_config = bigquery.CopyJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
operation_type=bigquery.job.copy_.OperationType.COPY,
)
try:
client.copy_table(
source_table,
destination_table,
job_config=copy_config,
).result()
except NotFound:
click.echo(f"Source table not found: {source_table}")
sys.exit(1)
except Conflict:
print(f"Backup table already exists: {destination_table}")
sys.exit(1)
click.echo(
f"Table {source_table} successfully {job_type_str} to {destination_table}"
)
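
For reference, a standalone sketch of the two copy modes `_copy_table` configures, using hypothetical table names: a CLONE job with WRITE_EMPTY and a 30-day expiration for the backup, and a WRITE_TRUNCATE COPY job against a $YYYYMMDD partition decorator for the swap:

from datetime import datetime, timedelta

from google.cloud import bigquery

client = bigquery.Client(project="moz-fx-data-shared-prod")

# Backup: clone the production table. WRITE_EMPTY makes the job fail if
# the backup already exists; the clone expires after 30 days.
clone_config = bigquery.CopyJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_EMPTY,
    operation_type=bigquery.job.copy_.OperationType.CLONE,
    destination_expiration_time=(datetime.now() + timedelta(days=30)).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    ),
)
client.copy_table(
    "project.dataset.some_table",                    # hypothetical source
    "project.dataset.some_table_backup_2023_07_13",  # hypothetical backup
    job_config=clone_config,
).result()

# Swap: overwrite a single day partition of the production table with
# the staged partition, addressed via the $YYYYMMDD decorator.
swap_config = bigquery.CopyJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    operation_type=bigquery.job.copy_.OperationType.COPY,
)
client.copy_table(
    "project.dataset.some_table_staging$20230101",  # hypothetical staging
    "project.dataset.some_table$20230101",          # hypothetical production
    job_config=swap_config,
).result()

Clone jobs are typically much cheaper than full copies (storage is billed only as the clone diverges from its source), which makes them a reasonable choice for a temporary pre-swap backup.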