diff --git a/GRAVEYARD.md b/GRAVEYARD.md
index 9f09cd7d29..09fe1fac54 100644
--- a/GRAVEYARD.md
+++ b/GRAVEYARD.md
@@ -14,3 +14,13 @@ history every day, which had started to take on order 1 hour to run.
 The v2 tables instead define a `day_0` view and a `day_13` view and relies on
 the Growth and Usage Dashboard (GUD) to query them separately and join the
 results together at query time.
+
+## Shredder support for per-cluster deletes
+
+- [Removal PR](https://github.com/mozilla/bigquery-etl/pull/733)
+
+For `telemetry_stable.main_v4`, shredder used `SELECT` statements over single
+clusters, then combined the results to remove rows from the table. This was an
+attempt to improve performance enough that flat-rate pricing with reserved
+slots would be cheaper than on-demand pricing, but it turned out to be slower
+than using `DELETE` statements for whole partitions.
diff --git a/bigquery_etl/shredder/config.py b/bigquery_etl/shredder/config.py
index ce33ac8926..5cd9756826 100644
--- a/bigquery_etl/shredder/config.py
+++ b/bigquery_etl/shredder/config.py
@@ -4,10 +4,6 @@ from dataclasses import dataclass
 from functools import partial
-from itertools import chain
-from typing import Optional, Tuple
-
-from ..util.sql_table_id import sql_table_id
 
 
 SHARED_PROD = "moz-fx-data-shared-prod"
 
 
@@ -31,52 +27,18 @@ class DeleteSource:
         """Dataset Id."""
         return self.table.split(".", 1)[0]
 
-    @property
-    def sql_table_id(self):
-        """Make sql_table_id available as a property for easier templating."""
-        return sql_table_id(self)
-
-
-@dataclass(frozen=True)
-class ClusterCondition:
-    """Data class for cluster condition."""
-
-    condition: str
-    needs_clustering: bool
-
 
 @dataclass(frozen=True)
 class DeleteTarget:
     """Data class for deletion request target.
 
-    Without cluster conditions rows will be removed using either one DELETE
-    statement for the whole table, or one DELETE statement per partition if the
-    table is larger than some configurable threshold.
-
-    When provided cluster conditions are used to divide up deletes into parts
-    smaller than partitions. This is a mitigation specifically for main_v4
-    because it has thousands of sparsely populated columns and partitions in
-    excess of 10TiB, resulting in very slow DELETE performance that could
-    exceed 6 hours for a single partition and makes flat-rate pricing more
-    expensive than on-demand pricing.
-
-    To improve performance vs DELETE operations, cluster conditions can set
-    needs_clustering to False to avoid the overhead of clustering results when
-    the condition identifies a single cluster.
-
-    Each cluster condition is used with a SELECT statement to extract rows from
-    the target table into an intermediate table while filtering out rows with
-    deletion requests. The intermediate tables are then combined using a copy
-    operation to overwrite target table partitions.
-
-    This means that every row must be covered by precisely one cluster
-    condition. Any rows not covered by a cluster condition would be dropped,
-    and any rows covered by multiple conditions would be duplicated.
+    Rows will be removed using either one DELETE statement for the whole table,
+    or one DELETE statement per partition if the table is larger than some
+    configurable threshold.
""" table: str field: str - cluster_conditions: Optional[Tuple[ClusterCondition, ...]] = None project: str = SHARED_PROD @property @@ -89,11 +51,6 @@ class DeleteTarget: """Dataset Id.""" return self.table.split(".", 1)[0] - @property - def sql_table_id(self): - """Make sql_table_id available as a property for easier templating.""" - return sql_table_id(self) - CLIENT_ID = "client_id" GLEAN_CLIENT_ID = "client_info.client_id" @@ -196,26 +153,7 @@ DELETE_TARGETS = { client_id_target(table="telemetry_stable.frecency_update_v4"): DESKTOP_SRC, client_id_target(table="telemetry_stable.health_v4"): DESKTOP_SRC, client_id_target(table="telemetry_stable.heartbeat_v4"): DESKTOP_SRC, - client_id_target( - table="telemetry_stable.main_v4", - cluster_conditions=tuple( - ClusterCondition(condition, needs_clustering) - for condition, needs_clustering in chain( - { - f"sample_id = {sample_id} AND normalized_channel = 'release'": False - for sample_id in range(100) - }.items(), - [ - ( - "(sample_id IS NULL " - "OR normalized_channel IS NULL " - "OR normalized_channel != 'release')", - True, - ) - ], - ) - ), - ): DESKTOP_SRC, + client_id_target(table="telemetry_stable.main_v4"): DESKTOP_SRC, client_id_target(table="telemetry_stable.modules_v4"): DESKTOP_SRC, client_id_target(table="telemetry_stable.new_profile_v4"): DESKTOP_SRC, client_id_target(table="telemetry_stable.saved_session_v4"): DESKTOP_SRC, diff --git a/bigquery_etl/shredder/delete.py b/bigquery_etl/shredder/delete.py index 5175c8ef9b..20c62b6715 100644 --- a/bigquery_etl/shredder/delete.py +++ b/bigquery_etl/shredder/delete.py @@ -12,11 +12,10 @@ import warnings from google.cloud import bigquery +from ..util.bigquery_id import FULL_JOB_ID_RE, full_job_id, sql_table_id from ..util.client_queue import ClientQueue -from ..util.temp_table import get_temporary_table from ..util.table_filter import add_table_filter_arguments, get_table_filter -from ..util.sql_table_id import sql_table_id -from .config import ClusterCondition, DELETE_TARGETS +from .config import DELETE_TARGETS parser = ArgumentParser(description=__doc__) @@ -106,57 +105,13 @@ parser.add_argument( help="Table for recording state; Used to avoid repeating deletes if interrupted; " "Create it if it does not exist; By default state is not recorded", ) -parser.add_argument( - "--ignore-cluster-conditions", - "--ignore_cluster_conditions", - action="store_true", - help="Ignore cluster conditions; Used to process main_v4 using DELETE queries; " - "Should be combined with --billing-projects that use on-demand pricing", -) add_table_filter_arguments(parser) -WHERE_CLAUSE = """ -WHERE - {target.field} IN ( - SELECT DISTINCT - {source.field} - FROM - `{source.sql_table_id}` - WHERE - {source_condition} - ) - AND {partition_condition} -""" -DELETE_TEMPLATE = f""" -DELETE - `{{target.sql_table_id}}` -{WHERE_CLAUSE.strip()} -""" - -SELECT_TEMPLATE = f""" -CREATE TABLE - `{{destination.project}}.{{destination.dataset_id}}.{{destination.table_id}}` -PARTITION BY - {{partition_expr}} -{{clustering}} -OPTIONS ( - expiration_timestamp = '{{expiration_timestamp}}' -) -AS -SELECT - * -FROM - `{{target.sql_table_id}}` -{WHERE_CLAUSE.strip()} - AND {{cluster_condition}} -""" - - -def record_state(client, state_table, task_id, job, dry_run, start_date, end_date): +def record_state(client, task_id, job, dry_run, start_date, end_date, state_table): """Record the job for task_id in state_table.""" if state_table is not None: - job_id = "a job_id" if dry_run else f"{job.project}.{job.location}.{job.job_id}" + 
job_id = "a job_id" if dry_run else full_job_id(job) insert_tense = "Would insert" if dry_run else "Inserting" logging.info(f"{insert_tense} {job_id} in {state_table} for task: {task_id}") if not dry_run: @@ -184,69 +139,69 @@ def record_state(client, state_table, task_id, job, dry_run, start_date, end_dat ).result() -def wait_for_job( - client, state_table, states, task_id, dry_run, start_date, end_date, create_job -): +def wait_for_job(client, states, task_id, dry_run, create_job, **state_kwargs): """Get a job from state or create a new job, and wait for the job to complete.""" - job = states.get(task_id) - if job: - project, location, job_id = job.split(".") - job = client.get_job(job_id, project, location) + job = None + if task_id in states: + job = client.get_job(**FULL_JOB_ID_RE.fullmatch(states[task_id]).groupdict()) if job.errors: - logging.info(f"Previous attempt failed, retrying: {task_id}") + logging.info(f"Previous attempt failed, retrying for {task_id}") job = None elif job.ended: - logging.info(f"Previous attempt succeeded, reusing result: {task_id}") + logging.info(f"Previous attempt succeeded, reusing result for {task_id}") else: - logging.info(f"Previous attempt still running: {task_id}") + logging.info(f"Previous attempt still running for {task_id}") if job is None: job = create_job(client) - record_state(client, state_table, task_id, job, dry_run, start_date, end_date) + record_state( + client=client, task_id=task_id, dry_run=dry_run, job=job, **state_kwargs + ) if not dry_run and not job.ended: - logging.info(f"Waiting on {job.project}.{job.location}.{job.job_id}: {task_id}") + logging.info(f"Waiting on {full_job_id(job)} for {task_id}") job.result() return job -def get_task_id(job_type, target, partition_id=None, cluster_condition=None): +def get_task_id(target, partition_id): """Get unique task id for state tracking.""" - task_id = f"{job_type} for {target.sql_table_id}" + task_id = sql_table_id(target) if partition_id: task_id += f"${partition_id}" - if cluster_condition: - task_id += f" where {cluster_condition}" return task_id -async def delete_from_cluster( +async def delete_from_partition( executor, client_q, dry_run, - priority, - states, - state_table, - target, + partition_condition, partition_id, - cluster_condition, - clustering_fields, - update_clustering, - start_date, - end_date, - **template_kwargs, + priority, + source, + source_condition, + target, + **wait_for_job_kwargs, ): - """Process deletion requests for a cluster condition on a partition.""" + """Process deletion requests for partitions of a target table.""" # noqa: D202 def create_job(client): - if cluster_condition is None: - query = DELETE_TEMPLATE.format(target=target, **template_kwargs) - else: - query = SELECT_TEMPLATE.format( - destination=get_temporary_table(client), - cluster_condition=cluster_condition, - target=target, - **template_kwargs, - ) + query = dedent( + f""" + DELETE + `{sql_table_id(target)}` + WHERE + {target.field} IN ( + SELECT DISTINCT + {source.field} + FROM + `{sql_table_id(source)}` + WHERE + {source_condition} + ) + AND {partition_condition} + """ + ).strip() run_tense = "Would run" if dry_run else "Running" logging.debug(f"{run_tense} query: {query}") return client.query( @@ -257,102 +212,13 @@ async def delete_from_cluster( executor, partial( wait_for_job, - state_table=state_table, - states=states, - task_id=get_task_id("delete", target, partition_id, cluster_condition), - dry_run=dry_run, - start_date=start_date, - end_date=end_date, create_job=create_job, + 
+            dry_run=dry_run,
+            task_id=get_task_id(target, partition_id),
+            **wait_for_job_kwargs,
         ),
     )
-    if update_clustering:
-        destination = sql_table_id(job.destination)
-        if dry_run:
-            logging.debug(f"Would update clustering on {destination}")
-        else:
-            table = client_q.default_client.get_table(job.destination)
-            if table.clustering_fields != clustering_fields:
-                logging.debug(f"Updating clustering on {destination}")
-                table.clustering_fields = clustering_fields
-                client_q.default_client.update_table(table, ["clustering"])
-    return job
-
-
-async def delete_from_partition(
-    client_q,
-    dry_run,
-    target,
-    partition_id,
-    clustering,
-    start_date,
-    end_date,
-    state_table,
-    states,
-    **kwargs,
-):
-    """Process deletion requests for a single partition of a target table."""
-    client = client_q.default_client
-    jobs = await asyncio.gather(
-        *[
-            delete_from_cluster(
-                client_q=client_q,
-                dry_run=dry_run,
-                target=target,
-                partition_id=partition_id,
-                clustering=(clustering if cluster.needs_clustering else ""),
-                start_date=start_date,
-                end_date=end_date,
-                state_table=state_table,
-                states=states,
-                update_clustering=cluster.needs_clustering is False,
-                cluster_condition=cluster.condition,
-                **kwargs,
-            )
-            for cluster in (target.cluster_conditions or [ClusterCondition(None, None)])
-        ]
-    )
-    if target.cluster_conditions:
-        # copy results into place and delete temp tables
-        sources = [sql_table_id(job.destination) for job in jobs]
-        dest = f"{target.sql_table_id}${partition_id}"
-        overwrite_tense = "Would overwrite" if dry_run else "Overwriting"
-        logging.info(f"{overwrite_tense} {dest} by copying {len(sources)} temp tables")
-        logging.debug(f"{overwrite_tense} {dest} by copying {', '.join(sources)}")
-        if not dry_run:
-
-            def create_job(client):
-                return client.copy_table(
-                    sources,
-                    dest,
-                    bigquery.CopyJobConfig(
-                        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
-                    ),
-                )
-
-            wait_for_job(
-                client=client_q.default_client,
-                state_table=state_table,
-                states=states,
-                task_id=get_task_id(
-                    job_type="copy", target=target, partition_id=partition_id
-                ),
-                dry_run=dry_run,
-                start_date=start_date,
-                end_date=end_date,
-                create_job=create_job,
-            )
-        delete_tense = "Would delete" if dry_run else "Deleting"
-        logging.info(f"{delete_tense} {len(sources)} temp tables")
-        for table in sources:
-            logging.debug(f"{delete_tense} {table}")
-            if not dry_run:
-                client.delete_table(table)
-    return sum(
-        job.total_bytes_processed
-        for job in jobs
-        if job.total_bytes_processed is not None
-    )
+    return job.total_bytes_processed
 
 
 def get_partition_expr(table):
@@ -375,7 +241,7 @@ def list_partitions(client, target):
                 SELECT
                   partition_id
                 FROM
-                  [{target.sql_table_id.replace('.',':',1)}$__PARTITIONS_SUMMARY__]
+                  [{sql_table_id(target)}$__PARTITIONS_SUMMARY__]
                 """
            ).strip(),
            bigquery.QueryJobConfig(use_legacy_sql=True),
@@ -387,10 +253,9 @@ async def delete_from_table(
     client_q, target, dry_run, end_date, max_single_dml_bytes, **kwargs
 ):
-    """Process deletion requests for a single target table."""
+    """Process deletion requests for a target table."""
     client = client_q.default_client
-    table = client.get_table(target.sql_table_id)
-    clustering = f"CLUSTER BY {', '.join(table.clustering_fields)}"
+    table = client.get_table(sql_table_id(target))
     partition_expr = get_partition_expr(table)
     bytes_deleted = 0
     bytes_processed = sum(
@@ -398,8 +263,6 @@ async def delete_from_table(
             *[
                 delete_from_partition(
                     client_q=client_q,
-                    clustering_fields=table.clustering_fields,
-                    clustering=clustering,
                     dry_run=dry_run,
                     partition_condition=(
                         f"{partition_expr} <= '{end_date}'"
@@ -415,7 +278,6 @@ async def delete_from_table(
                 for partition_id, partition_date in (
                     list_partitions(client=client, target=target)
                     if table.num_bytes > max_single_dml_bytes
-                    or target.cluster_conditions
                     else [(None, None)]
                 )
                 if partition_date is None or partition_date < end_date
@@ -426,7 +288,7 @@ async def delete_from_table(
         logging.info(f"Would scan {bytes_processed} bytes from {target.table}")
     else:
         bytes_deleted = (
-            table.num_bytes - client.get_table(target.sql_table_id).num_bytes
+            table.num_bytes - client.get_table(sql_table_id(target)).num_bytes
         )
         logging.info(
             f"Scanned {bytes_processed} bytes and "
@@ -458,7 +320,7 @@ async def main():
                     CREATE TABLE IF NOT EXISTS `{args.state_table}`(
                       task_id STRING,
-                      job STRING,
+                      job_id STRING,
                      job_started TIMESTAMP,
                      start_date DATE,
                      end_date DATE
@@ -467,13 +329,13 @@ async def main():
                 ).strip()
             )
         states = {
-            row["task_id"]: row["job"]
+            row["task_id"]: row["job_id"]
             for row in client_q.default_client.query(
                 dedent(
                     f"""
                     SELECT
                       task_id,
-                      job,
+                      job_id,
                     FROM
                      `{args.state_table}`
                     WHERE
@@ -492,13 +354,7 @@ async def main():
                 client_q=client_q,
                 executor=executor,
                 target=replace(
-                    target,
-                    project=args.target_project or target.project,
-                    cluster_conditions=(
-                        None
-                        if args.ignore_cluster_conditions
-                        else target.cluster_conditions
-                    ),
+                    target, project=args.target_project or target.project
                 ),
                 source=replace(
                     source, project=args.source_project or source.project
diff --git a/bigquery_etl/util/bigquery_id.py b/bigquery_etl/util/bigquery_id.py
new file mode 100644
index 0000000000..94264ae438
--- /dev/null
+++ b/bigquery_etl/util/bigquery_id.py
@@ -0,0 +1,15 @@
+"""Get the string id in various formats for BigQuery resources."""
+
+import re
+
+FULL_JOB_ID_RE = re.compile(r"(?P<project>[^:]+):(?P<location>[^.]+).(?P<job_id>.+)")
+
+
+def full_job_id(job):
+    """Get the bq cli format fully qualified id for a job."""
+    return f"{job.project}:{job.location}.{job.job_id}"
+
+
+def sql_table_id(table):
+    """Get the standard sql format fully qualified id for a table."""
+    return f"{table.project}.{table.dataset_id}.{table.table_id}"
diff --git a/bigquery_etl/util/sql_table_id.py b/bigquery_etl/util/sql_table_id.py
deleted file mode 100644
index b9177a1195..0000000000
--- a/bigquery_etl/util/sql_table_id.py
+++ /dev/null
@@ -1,6 +0,0 @@
-"""Get the standard sql format fully qualified id for a table."""
-
-
-def sql_table_id(table):
-    """Get the standard sql format fully qualified id for a table."""
-    return f"{table.project}.{table.dataset_id}.{table.table_id}"
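A minimal sketch (not part of the diff) of how the new `bigquery_etl.util.bigquery_id` helpers round-trip ids: `record_state` stores `full_job_id(job)` in the state table, and on a resumed run `wait_for_job` parses it back with `FULL_JOB_ID_RE` and passes the named groups to `client.get_job`. The `FakeJob` dataclass and the `US`/`abc123` values are hypothetical stand-ins used only to keep the example self-contained and runnable.

```python
import re
from dataclasses import dataclass

# Mirrors bigquery_etl/util/bigquery_id.py: ids look like "project:location.job_id".
FULL_JOB_ID_RE = re.compile(r"(?P<project>[^:]+):(?P<location>[^.]+).(?P<job_id>.+)")


def full_job_id(job):
    """Get the bq cli format fully qualified id for a job."""
    return f"{job.project}:{job.location}.{job.job_id}"


@dataclass
class FakeJob:
    """Hypothetical stand-in for a google.cloud.bigquery job object."""

    project: str
    location: str
    job_id: str


# What record_state writes to the state table's job_id column.
stored = full_job_id(FakeJob("moz-fx-data-shared-prod", "US", "abc123"))

# What wait_for_job does on a resumed run before calling client.get_job(**groups).
groups = FULL_JOB_ID_RE.fullmatch(stored).groupdict()
assert groups == {
    "project": "moz-fx-data-shared-prod",
    "location": "US",
    "job_id": "abc123",
}
```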