Remove support for cluster handling in shredder (#733)

This commit is contained in:
Daniel Thorn 2020-02-18 21:07:40 +01:00 committed by GitHub
Parent 2590b1bd78
Commit 7864154807
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 82 additions and 269 deletions

View file

@@ -14,3 +14,13 @@ history every day, which had started to take on the order of 1 hour to run. The
v2 tables instead define a `day_0` view and a `day_13` view and rely on
the Growth and Usage Dashboard (GUD) to query them separately and join the
results together at query time.
## Shredder support for per-cluster deletes
- [Removal PR](https://github.com/mozilla/bigquery-etl/pull/733)
For `telemetry_stable.main_v4`, shredder used `SELECT` statements over single
clusters, then combined the results to remove rows from the table. This was an
attempt to improve performance so that reserved slots would be cheaper than
on-demand pricing, but it turned out to be slower than using `DELETE`
statements for whole partitions.
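
Below is a minimal sketch of the flow this commit removes, paraphrasing the deleted `delete_from_cluster`/`delete_from_partition` logic in `shredder.py` (the function name and query strings here are illustrative, not the exact removed code):

```python
from google.cloud import bigquery


def delete_via_clusters(client, dest_partition, cluster_select_queries):
    """Sketch: materialize each cluster's surviving rows into a temp table,
    then overwrite the target partition with a WRITE_TRUNCATE copy job."""
    temp_tables = []
    for query in cluster_select_queries:
        # Each query was a CREATE TABLE <temp> ... AS SELECT that kept the
        # rows matching one cluster condition and no deletion request.
        job = client.query(query)
        job.result()  # wait for the extract to finish
        temp_tables.append(job.destination)
    # Combine the per-cluster results over the partition; a row matched by
    # zero conditions would be dropped, one matched by several duplicated.
    client.copy_table(
        temp_tables,
        dest_partition,  # e.g. "project.dataset.main_v4$20200218"
        job_config=bigquery.CopyJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
        ),
    ).result()
    for table in temp_tables:
        client.delete_table(table)
```

After this commit, `main_v4` is handled like every other large table: one `DELETE` statement per partition, as shown in the `shredder.py` changes below.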

View file

@@ -4,10 +4,6 @@
from dataclasses import dataclass
from functools import partial
from itertools import chain
from typing import Optional, Tuple
from ..util.sql_table_id import sql_table_id
SHARED_PROD = "moz-fx-data-shared-prod"
@@ -31,52 +27,18 @@ class DeleteSource:
"""Dataset Id."""
return self.table.split(".", 1)[0]
@property
def sql_table_id(self):
"""Make sql_table_id available as a property for easier templating."""
return sql_table_id(self)
@dataclass(frozen=True)
class ClusterCondition:
"""Data class for cluster condition."""
condition: str
needs_clustering: bool
@dataclass(frozen=True)
class DeleteTarget:
"""Data class for deletion request target.
Without cluster conditions rows will be removed using either one DELETE
statement for the whole table, or one DELETE statement per partition if the
table is larger than some configurable threshold.
When provided, cluster conditions are used to divide up deletes into parts
smaller than partitions. This is a mitigation specifically for main_v4
because it has thousands of sparsely populated columns and partitions in
excess of 10TiB, resulting in very slow DELETE performance that could
exceed 6 hours for a single partition and make flat-rate pricing more
expensive than on-demand pricing.
To improve performance vs DELETE operations, cluster conditions can set
needs_clustering to False to avoid the overhead of clustering results when
the condition identifies a single cluster.
Each cluster condition is used with a SELECT statement to extract rows from
the target table into an intermediate table while filtering out rows with
deletion requests. The intermediate tables are then combined using a copy
operation to overwrite target table partitions.
This means that every row must be covered by precisely one cluster
condition. Any rows not covered by a cluster condition would be dropped,
and any rows covered by multiple conditions would be duplicated.
Rows will be removed using either one DELETE statement for the whole table,
or one DELETE statement per partition if the table is larger than some
configurable threshold.
"""
table: str
field: str
cluster_conditions: Optional[Tuple[ClusterCondition, ...]] = None
project: str = SHARED_PROD
@property
@@ -89,11 +51,6 @@ class DeleteTarget:
"""Dataset Id."""
return self.table.split(".", 1)[0]
@property
def sql_table_id(self):
"""Make sql_table_id available as a property for easier templating."""
return sql_table_id(self)
CLIENT_ID = "client_id"
GLEAN_CLIENT_ID = "client_info.client_id"
@@ -196,26 +153,7 @@ DELETE_TARGETS = {
client_id_target(table="telemetry_stable.frecency_update_v4"): DESKTOP_SRC,
client_id_target(table="telemetry_stable.health_v4"): DESKTOP_SRC,
client_id_target(table="telemetry_stable.heartbeat_v4"): DESKTOP_SRC,
client_id_target(
table="telemetry_stable.main_v4",
cluster_conditions=tuple(
ClusterCondition(condition, needs_clustering)
for condition, needs_clustering in chain(
{
f"sample_id = {sample_id} AND normalized_channel = 'release'": False
for sample_id in range(100)
}.items(),
[
(
"(sample_id IS NULL "
"OR normalized_channel IS NULL "
"OR normalized_channel != 'release')",
True,
)
],
)
),
): DESKTOP_SRC,
client_id_target(table="telemetry_stable.main_v4"): DESKTOP_SRC,
client_id_target(table="telemetry_stable.modules_v4"): DESKTOP_SRC,
client_id_target(table="telemetry_stable.new_profile_v4"): DESKTOP_SRC,
client_id_target(table="telemetry_stable.saved_session_v4"): DESKTOP_SRC,

View file

@@ -12,11 +12,10 @@ import warnings
from google.cloud import bigquery
from ..util.bigquery_id import FULL_JOB_ID_RE, full_job_id, sql_table_id
from ..util.client_queue import ClientQueue
from ..util.temp_table import get_temporary_table
from ..util.table_filter import add_table_filter_arguments, get_table_filter
from ..util.sql_table_id import sql_table_id
from .config import ClusterCondition, DELETE_TARGETS
from .config import DELETE_TARGETS
parser = ArgumentParser(description=__doc__)
@@ -106,57 +105,13 @@ parser.add_argument(
help="Table for recording state; Used to avoid repeating deletes if interrupted; "
"Create it if it does not exist; By default state is not recorded",
)
parser.add_argument(
"--ignore-cluster-conditions",
"--ignore_cluster_conditions",
action="store_true",
help="Ignore cluster conditions; Used to process main_v4 using DELETE queries; "
"Should be combined with --billing-projects that use on-demand pricing",
)
add_table_filter_arguments(parser)
WHERE_CLAUSE = """
WHERE
{target.field} IN (
SELECT DISTINCT
{source.field}
FROM
`{source.sql_table_id}`
WHERE
{source_condition}
)
AND {partition_condition}
"""
DELETE_TEMPLATE = f"""
DELETE
`{{target.sql_table_id}}`
{WHERE_CLAUSE.strip()}
"""
SELECT_TEMPLATE = f"""
CREATE TABLE
`{{destination.project}}.{{destination.dataset_id}}.{{destination.table_id}}`
PARTITION BY
{{partition_expr}}
{{clustering}}
OPTIONS (
expiration_timestamp = '{{expiration_timestamp}}'
)
AS
SELECT
*
FROM
`{{target.sql_table_id}}`
{WHERE_CLAUSE.strip()}
AND {{cluster_condition}}
"""
def record_state(client, state_table, task_id, job, dry_run, start_date, end_date):
def record_state(client, task_id, job, dry_run, start_date, end_date, state_table):
"""Record the job for task_id in state_table."""
if state_table is not None:
job_id = "a job_id" if dry_run else f"{job.project}.{job.location}.{job.job_id}"
job_id = "a job_id" if dry_run else full_job_id(job)
insert_tense = "Would insert" if dry_run else "Inserting"
logging.info(f"{insert_tense} {job_id} in {state_table} for task: {task_id}")
if not dry_run:
@@ -184,69 +139,69 @@ def record_state(client, state_table, task_id, job, dry_run, start_date, end_date
).result()
def wait_for_job(
client, state_table, states, task_id, dry_run, start_date, end_date, create_job
):
def wait_for_job(client, states, task_id, dry_run, create_job, **state_kwargs):
"""Get a job from state or create a new job, and wait for the job to complete."""
job = states.get(task_id)
if job:
project, location, job_id = job.split(".")
job = client.get_job(job_id, project, location)
job = None
if task_id in states:
job = client.get_job(**FULL_JOB_ID_RE.fullmatch(states[task_id]).groupdict())
if job.errors:
logging.info(f"Previous attempt failed, retrying: {task_id}")
logging.info(f"Previous attempt failed, retrying for {task_id}")
job = None
elif job.ended:
logging.info(f"Previous attempt succeeded, reusing result: {task_id}")
logging.info(f"Previous attempt succeeded, reusing result for {task_id}")
else:
logging.info(f"Previous attempt still running: {task_id}")
logging.info(f"Previous attempt still running for {task_id}")
if job is None:
job = create_job(client)
record_state(client, state_table, task_id, job, dry_run, start_date, end_date)
record_state(
client=client, task_id=task_id, dry_run=dry_run, job=job, **state_kwargs
)
if not dry_run and not job.ended:
logging.info(f"Waiting on {job.project}.{job.location}.{job.job_id}: {task_id}")
logging.info(f"Waiting on {full_job_id(job)} for {task_id}")
job.result()
return job
def get_task_id(job_type, target, partition_id=None, cluster_condition=None):
def get_task_id(target, partition_id):
"""Get unique task id for state tracking."""
task_id = f"{job_type} for {target.sql_table_id}"
task_id = sql_table_id(target)
if partition_id:
task_id += f"${partition_id}"
if cluster_condition:
task_id += f" where {cluster_condition}"
return task_id
async def delete_from_cluster(
async def delete_from_partition(
executor,
client_q,
dry_run,
priority,
states,
state_table,
target,
partition_condition,
partition_id,
cluster_condition,
clustering_fields,
update_clustering,
start_date,
end_date,
**template_kwargs,
priority,
source,
source_condition,
target,
**wait_for_job_kwargs,
):
"""Process deletion requests for a cluster condition on a partition."""
"""Process deletion requests for partitions of a target table."""
# noqa: D202
def create_job(client):
if cluster_condition is None:
query = DELETE_TEMPLATE.format(target=target, **template_kwargs)
else:
query = SELECT_TEMPLATE.format(
destination=get_temporary_table(client),
cluster_condition=cluster_condition,
target=target,
**template_kwargs,
)
query = dedent(
f"""
DELETE
`{sql_table_id(target)}`
WHERE
{target.field} IN (
SELECT DISTINCT
{source.field}
FROM
`{sql_table_id(source)}`
WHERE
{source_condition}
)
AND {partition_condition}
"""
).strip()
run_tense = "Would run" if dry_run else "Running"
logging.debug(f"{run_tense} query: {query}")
return client.query(
@@ -257,102 +212,13 @@ async def delete_from_cluster(
executor,
partial(
wait_for_job,
state_table=state_table,
states=states,
task_id=get_task_id("delete", target, partition_id, cluster_condition),
dry_run=dry_run,
start_date=start_date,
end_date=end_date,
create_job=create_job,
dry_run=dry_run,
task_id=get_task_id(target, partition_id),
**wait_for_job_kwargs,
),
)
if update_clustering:
destination = sql_table_id(job.destination)
if dry_run:
logging.debug(f"Would update clustering on {destination}")
else:
table = client_q.default_client.get_table(job.destination)
if table.clustering_fields != clustering_fields:
logging.debug(f"Updating clustering on {destination}")
table.clustering_fields = clustering_fields
client_q.default_client.update_table(table, ["clustering"])
return job
async def delete_from_partition(
client_q,
dry_run,
target,
partition_id,
clustering,
start_date,
end_date,
state_table,
states,
**kwargs,
):
"""Process deletion requests for a single partition of a target table."""
client = client_q.default_client
jobs = await asyncio.gather(
*[
delete_from_cluster(
client_q=client_q,
dry_run=dry_run,
target=target,
partition_id=partition_id,
clustering=(clustering if cluster.needs_clustering else ""),
start_date=start_date,
end_date=end_date,
state_table=state_table,
states=states,
update_clustering=cluster.needs_clustering is False,
cluster_condition=cluster.condition,
**kwargs,
)
for cluster in (target.cluster_conditions or [ClusterCondition(None, None)])
]
)
if target.cluster_conditions:
# copy results into place and delete temp tables
sources = [sql_table_id(job.destination) for job in jobs]
dest = f"{target.sql_table_id}${partition_id}"
overwrite_tense = "Would overwrite" if dry_run else "Overwriting"
logging.info(f"{overwrite_tense} {dest} by copying {len(sources)} temp tables")
logging.debug(f"{overwrite_tense} {dest} by copying {', '.join(sources)}")
if not dry_run:
def create_job(client):
return client.copy_table(
sources,
dest,
bigquery.CopyJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
),
)
wait_for_job(
client=client_q.default_client,
state_table=state_table,
states=states,
task_id=get_task_id(
job_type="copy", target=target, partition_id=partition_id
),
dry_run=dry_run,
start_date=start_date,
end_date=end_date,
create_job=create_job,
)
delete_tense = "Would delete" if dry_run else "Deleting"
logging.info(f"{delete_tense} {len(sources)} temp tables")
for table in sources:
logging.debug(f"{delete_tense} {table}")
if not dry_run:
client.delete_table(table)
return sum(
job.total_bytes_processed
for job in jobs
if job.total_bytes_processed is not None
)
return job.total_bytes_processed
def get_partition_expr(table):
@@ -375,7 +241,7 @@ def list_partitions(client, target):
SELECT
partition_id
FROM
[{target.sql_table_id.replace('.',':',1)}$__PARTITIONS_SUMMARY__]
[{sql_table_id(target)}$__PARTITIONS_SUMMARY__]
"""
).strip(),
bigquery.QueryJobConfig(use_legacy_sql=True),
@@ -387,10 +253,9 @@ async def delete_from_table(
async def delete_from_table(
client_q, target, dry_run, end_date, max_single_dml_bytes, **kwargs
):
"""Process deletion requests for a single target table."""
"""Process deletion requests for a target table."""
client = client_q.default_client
table = client.get_table(target.sql_table_id)
clustering = f"CLUSTER BY {', '.join(table.clustering_fields)}"
table = client.get_table(sql_table_id(target))
partition_expr = get_partition_expr(table)
bytes_deleted = 0
bytes_processed = sum(
@@ -398,8 +263,6 @@ async def delete_from_table(
*[
delete_from_partition(
client_q=client_q,
clustering_fields=table.clustering_fields,
clustering=clustering,
dry_run=dry_run,
partition_condition=(
f"{partition_expr} <= '{end_date}'"
@@ -415,7 +278,6 @@ async def delete_from_table(
for partition_id, partition_date in (
list_partitions(client=client, target=target)
if table.num_bytes > max_single_dml_bytes
or target.cluster_conditions
else [(None, None)]
)
if partition_date is None or partition_date < end_date
@@ -426,7 +288,7 @@ async def delete_from_table(
logging.info(f"Would scan {bytes_processed} bytes from {target.table}")
else:
bytes_deleted = (
table.num_bytes - client.get_table(target.sql_table_id).num_bytes
table.num_bytes - client.get_table(sql_table_id(target)).num_bytes
)
logging.info(
f"Scanned {bytes_processed} bytes and "
@@ -458,7 +320,7 @@ async def main():
CREATE TABLE IF NOT EXISTS
`{args.state_table}`(
task_id STRING,
job STRING,
job_id STRING,
job_started TIMESTAMP,
start_date DATE,
end_date DATE
@@ -467,13 +329,13 @@ async def main():
).strip()
)
states = {
row["task_id"]: row["job"]
row["task_id"]: row["job_id"]
for row in client_q.default_client.query(
dedent(
f"""
SELECT
task_id,
job,
job_id,
FROM
`{args.state_table}`
WHERE
@@ -492,13 +354,7 @@ async def main():
client_q=client_q,
executor=executor,
target=replace(
target,
project=args.target_project or target.project,
cluster_conditions=(
None
if args.ignore_cluster_conditions
else target.cluster_conditions
),
target, project=args.target_project or target.project
),
source=replace(
source, project=args.source_project or source.project

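The reworked state handling above records each job under the bq-style id produced by `full_job_id`; here is a minimal sketch of the resume round trip, with hypothetical id values (the real code then retries failed jobs, reuses finished ones, and waits on running ones):

```python
import re

# Mirrors FULL_JOB_ID_RE from the new bigquery_id util module added below.
FULL_JOB_ID_RE = re.compile(r"(?P<project>[^:]+):(?P<location>[^.]+)\.(?P<job_id>.+)")

# Hypothetical state-table contents: task_id -> "project:location.job_id".
states = {
    "moz-fx-data-shared-prod.telemetry_stable.main_v4$20200218":
        "my-billing-project:US.bquxjob_1234"
}

task_id = "moz-fx-data-shared-prod.telemetry_stable.main_v4$20200218"
if task_id in states:
    kwargs = FULL_JOB_ID_RE.fullmatch(states[task_id]).groupdict()
    # kwargs == {"project": "my-billing-project", "location": "US",
    #            "job_id": "bquxjob_1234"}, ready for client.get_job(**kwargs)
```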
View file

@@ -0,0 +1,15 @@
"""Get the string id in various formats for BigQuery resources."""
import re
FULL_JOB_ID_RE = re.compile(r"(?P<project>[^:]+):(?P<location>[^.]+)\.(?P<job_id>.+)")
def full_job_id(job):
"""Get the bq cli format fully qualified id for a job."""
return f"{job.project}:{job.location}.{job.job_id}"
def sql_table_id(table):
"""Get the standard sql format fully qualified id for a table."""
return f"{table.project}.{table.dataset_id}.{table.table_id}"

View file

@@ -1,6 +0,0 @@
"""Get the standard sql format fully qualified id for a table."""
def sql_table_id(table):
"""Get the standard sql format fully qualified id for a table."""
return f"{table.project}.{table.dataset_id}.{table.table_id}"