DENG-3719: Allow setting billing project for managed backfills (#5605)

* Added default billing project and param
Winnie Chan 2024-05-21 12:27:55 -07:00, committed by GitHub
Parent c98b5ddd81
Commit ce9b8c40c1
No key matching this signature was found
GPG key ID: B5690EEEBB952194
4 changed files with 420 additions and 42 deletions
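For reference, a backfill.yaml entry using the new optional billing_project field looks like the test fixture later in this diff (the value must start with the moz-fx-data-backfill- prefix, otherwise validation fails with "Invalid billing project"):

2021-05-03:
  start_date: 2021-01-03
  end_date: 2021-01-08
  reason: test_reason
  watchers:
  - test@example.org
  status: Initiate
  billing_project: moz-fx-data-backfill-1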

View file

@@ -14,6 +14,7 @@ from bigquery_etl.query_scheduling.utils import is_email_or_github_identity
BACKFILL_FILE = "backfill.yaml"
DEFAULT_WATCHER = "nobody@mozilla.com"
DEFAULT_REASON = "Please provide a reason for the backfill and links to any related bugzilla or jira tickets"
DEFAULT_BILLING_PROJECT = "moz-fx-data-backfill-slots"
class UniqueKeyLoader(yaml.SafeLoader):
@@ -69,6 +70,7 @@ class Backfill:
reason: str = attr.ib()
watchers: List[str] = attr.ib()
status: BackfillStatus = attr.ib()
billing_project: Optional[str] = attr.ib(None)
def __str__(self):
"""Return print friendly string of backfill object."""
@@ -147,6 +149,14 @@ class Backfill:
if not hasattr(BackfillStatus, value.name):
raise ValueError(f"Invalid status: {value.name}.")
@billing_project.validator
def validate_billing_project(self, attribute, value):
"""Check that billing project is valid."""
if value and not value.startswith("moz-fx-data-backfill-"):
raise ValueError(
f"Invalid billing project: {value}. Please use one of the projects assigned to backfills."
)
@staticmethod
def is_backfill_file(file_path: Path) -> bool:
"""Check if the provided file is a backfill file."""
@@ -177,18 +187,15 @@ class Backfill:
if status is not None and entry["status"] != status:
continue
excluded_dates = []
if "excluded_dates" in entry:
excluded_dates = entry["excluded_dates"]
backfill = cls(
entry_date=entry_date,
start_date=entry["start_date"],
end_date=entry["end_date"],
excluded_dates=excluded_dates,
excluded_dates=entry.get("excluded_dates", []),
reason=entry["reason"],
watchers=entry["watchers"],
status=BackfillStatus[entry["status"].upper()],
billing_project=entry.get("billing_project", None),
)
backfill_entries.append(backfill)
@@ -208,12 +215,16 @@ class Backfill:
"reason": self.reason,
"watchers": self.watchers,
"status": self.status.value,
"billing_project": self.billing_project,
}
}
if yaml_dict[self.entry_date]["excluded_dates"] == []:
del yaml_dict[self.entry_date]["excluded_dates"]
if yaml_dict[self.entry_date]["billing_project"] is None:
del yaml_dict[self.entry_date]["billing_project"]
return yaml.dump(
yaml_dict,
sort_keys=False,

View file

@@ -1,6 +1,7 @@
"""bigquery-etl CLI backfill command."""
import json
import logging
import subprocess
import sys
import tempfile
@@ -15,6 +16,7 @@ from google.cloud.exceptions import Conflict, NotFound
from ..backfill.date_range import BackfillDateRange, get_backfill_partition
from ..backfill.parse import (
BACKFILL_FILE,
DEFAULT_BILLING_PROJECT,
DEFAULT_REASON,
DEFAULT_WATCHER,
Backfill,
@@ -46,6 +48,9 @@ from ..cli.utils import (
from ..config import ConfigLoader
from ..metadata.parse_metadata import METADATA_FILE, Metadata
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
@click.group(help="Commands for managing backfills.")
@click.pass_context
@@ -102,6 +107,8 @@ def backfill(ctx):
help="Watcher of the backfill (email address)",
default=DEFAULT_WATCHER,
)
# If not specified, the billing project will be set to the default billing project when the backfill is initiated.
@billing_project_option()
@click.pass_context
def create(
ctx,
@@ -111,6 +118,7 @@ def create(
end_date,
exclude,
watcher,
billing_project,
):
"""CLI command for creating a new backfill entry in backfill.yaml file.
@@ -136,6 +144,7 @@ def create(
reason=DEFAULT_REASON,
watchers=[watcher],
status=BackfillStatus.INITIATE,
billing_project=billing_project,
)
validate_duplicate_entry_with_initiate_status(new_entry, existing_backfills)
@@ -364,10 +373,13 @@ def scheduled(ctx, qualified_table_name, sql_dir, project_id, status, json_path=
@project_id_option(
ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod")
)
@billing_project_option()
@click.pass_context
def initiate(
ctx, qualified_table_name, parallelism, sql_dir, project_id, billing_project
ctx,
qualified_table_name,
parallelism,
sql_dir,
project_id,
):
"""Process backfill entry with initiate status in backfill.yaml file(s)."""
click.echo("Backfill processing (initiate) started....")
@@ -396,9 +408,21 @@ def initiate(
destination_table=backfill_staging_qualified_table_name,
)
billing_project = DEFAULT_BILLING_PROJECT
# override with billing project from backfill entry
if entry_to_initiate.billing_project is not None:
billing_project = entry_to_initiate.billing_project
elif not billing_project.startswith("moz-fx-data-backfill-"):
raise ValueError(
f"Invalid billing project: {billing_project}. Please use one of the projects assigned to backfills."
)
sys.exit(1)
click.echo(
f"\nInitiating backfill for {qualified_table_name} with entry date {entry_to_initiate.entry_date} via dry run:"
)
_initiate_backfill(
ctx,
qualified_table_name,
@@ -433,7 +457,7 @@ def _initiate_backfill(
entry: Backfill,
parallelism: int = 16,
dry_run: bool = False,
billing_project=None,
billing_project=DEFAULT_BILLING_PROJECT,
):
if not is_authenticated():
click.echo(
@@ -444,6 +468,14 @@ def _initiate_backfill(
project, dataset, table = qualified_table_name_matching(qualified_table_name)
logging_str = f"""Initiating backfill for {qualified_table_name} (destination: {backfill_staging_qualified_table_name}).
Query will be executed in {billing_project}."""
if dry_run:
logging_str += " This is a dry run."
log.info(logging_str)
# backfill table
# in the long-run we should remove the query backfill command and require a backfill entry for all backfills
try:

View file

@@ -12,6 +12,7 @@ from bigquery_etl.backfill.parse import (
)
DEFAULT_STATUS = BackfillStatus.INITIATE
DEFAULT_BILLING_PROJECT = "moz-fx-data-backfill-slots"
TEST_DIR = Path(__file__).parent.parent
@@ -35,6 +36,17 @@ TEST_BACKFILL_2 = Backfill(
DEFAULT_STATUS,
)
TEST_BACKFILL_3 = Backfill(
date(2021, 5, 3),
date(2021, 1, 3),
date(2021, 5, 3),
[date(2021, 2, 3)],
DEFAULT_REASON,
[DEFAULT_WATCHER],
DEFAULT_STATUS,
DEFAULT_BILLING_PROJECT,
)
class TestParseBackfill(object):
def test_backfill_instantiation(self):
@@ -47,6 +59,35 @@ class TestParseBackfill(object):
assert backfill.reason == DEFAULT_REASON
assert backfill.watchers == [DEFAULT_WATCHER]
assert backfill.status == DEFAULT_STATUS
assert backfill.billing_project is None
def test_backfill_instantiation_with_billing_project(self):
backfill = TEST_BACKFILL_3
assert backfill.entry_date == date(2021, 5, 3)
assert backfill.start_date == date(2021, 1, 3)
assert backfill.end_date == date(2021, 5, 3)
assert backfill.excluded_dates == [date(2021, 2, 3)]
assert backfill.reason == DEFAULT_REASON
assert backfill.watchers == [DEFAULT_WATCHER]
assert backfill.status == DEFAULT_STATUS
assert backfill.billing_project == DEFAULT_BILLING_PROJECT
def test_invalid_billing_project(self):
with pytest.raises(ValueError) as e:
invalid_billing_project = "mozdata"
Backfill(
TEST_BACKFILL_1.entry_date,
TEST_BACKFILL_1.start_date,
TEST_BACKFILL_1.end_date,
TEST_BACKFILL_1.excluded_dates,
TEST_BACKFILL_1.reason,
TEST_BACKFILL_1.watchers,
TEST_BACKFILL_1.status,
invalid_billing_project,
)
assert "Invalid billing project" in str(e.value)
def test_invalid_watcher(self):
with pytest.raises(ValueError) as e:

View file

@@ -103,6 +103,24 @@ DATASET_METADATA_CONF_EMPTY_WORKGROUP = {
"default_table_workgroup_access": VALID_WORKGROUP_ACCESS,
}
PARTITIONED_TABLE_METADATA = {
"friendly_name": "test",
"description": "test",
"owners": ["test@example.org"],
"workgroup_access": VALID_WORKGROUP_ACCESS,
"bigquery": {
"time_partitioning": {
"type": "day",
"field": "submission_date",
"require_partition_filter": True,
}
},
}
DEFAULT_BILLING_PROJECT = "moz-fx-data-backfill-slots"
VALID_BILLING_PROJECT = "moz-fx-data-backfill-1"
INVALID_BILLING_PROJECT = "mozdata"
class TestBackfill:
@pytest.fixture
@@ -152,6 +170,86 @@ class TestBackfill:
assert backfill.watchers == [DEFAULT_WATCHER]
assert backfill.reason == DEFAULT_REASON
assert backfill.status == DEFAULT_STATUS
assert backfill.billing_project is None
def test_create_backfill_with_billing_project(self, runner):
with runner.isolated_filesystem():
SQL_DIR = "sql/moz-fx-data-shared-prod/test/test_query_v1"
os.makedirs(SQL_DIR)
with open(
"sql/moz-fx-data-shared-prod/test/test_query_v1/query.sql", "w"
) as f:
f.write("SELECT 1")
with open(
"sql/moz-fx-data-shared-prod/test/test_query_v1/metadata.yaml",
"w",
) as f:
f.write(yaml.dump(TABLE_METADATA_CONF))
with open(
"sql/moz-fx-data-shared-prod/test/dataset_metadata.yaml", "w"
) as f:
f.write(yaml.dump(DATASET_METADATA_CONF))
result = runner.invoke(
create,
[
"moz-fx-data-shared-prod.test.test_query_v1",
"--start_date=2021-03-01",
f"--billing_project={VALID_BILLING_PROJECT}",
],
)
assert result.exit_code == 0
assert BACKFILL_FILE in os.listdir(
"sql/moz-fx-data-shared-prod/test/test_query_v1"
)
backfill_file = SQL_DIR + "/" + BACKFILL_FILE
backfill = Backfill.entries_from_file(backfill_file)[0]
assert backfill.entry_date == date.today()
assert backfill.start_date == date(2021, 3, 1)
assert backfill.end_date == date.today()
assert backfill.watchers == [DEFAULT_WATCHER]
assert backfill.reason == DEFAULT_REASON
assert backfill.status == DEFAULT_STATUS
assert backfill.billing_project == VALID_BILLING_PROJECT
def test_create_backfill_with_invalid_billing_project_should_fail(self, runner):
with runner.isolated_filesystem():
SQL_DIR = "sql/moz-fx-data-shared-prod/test/test_query_v1"
os.makedirs(SQL_DIR)
with open(
"sql/moz-fx-data-shared-prod/test/test_query_v1/query.sql", "w"
) as f:
f.write("SELECT 1")
with open(
"sql/moz-fx-data-shared-prod/test/test_query_v1/metadata.yaml",
"w",
) as f:
f.write(yaml.dump(TABLE_METADATA_CONF))
with open(
"sql/moz-fx-data-shared-prod/test/dataset_metadata.yaml", "w"
) as f:
f.write(yaml.dump(DATASET_METADATA_CONF))
result = runner.invoke(
create,
[
"moz-fx-data-shared-prod.test.test_query_v1",
"--start_date=2021-03-01",
f"--billing_project={INVALID_BILLING_PROJECT}",
],
)
assert result.exit_code == 1
assert "Invalid billing project" in str(result.exception)
def test_create_backfill_depends_on_past_should_fail(self, runner):
with runner.isolated_filesystem():
@@ -442,7 +540,7 @@ class TestBackfill:
assert backfills[1] == backfill_entry_1
assert backfills[0] == backfill_entry_2
def test_create_backfill_with_exsting_entry_with_initiate_status_should_fail(
def test_create_backfill_with_existing_entry_with_initiate_status_should_fail(
self, runner
):
with runner.isolated_filesystem():
@@ -535,6 +633,79 @@ class TestBackfill:
)
assert result.exit_code == 0
def test_validate_backfill_with_billing_project(self, runner):
with runner.isolated_filesystem():
SQL_DIR = "sql/moz-fx-data-shared-prod/test/test_query_v1"
os.makedirs(SQL_DIR)
with open(
"sql/moz-fx-data-shared-prod/test/test_query_v1/query.sql", "w"
) as f:
f.write("SELECT 1")
with open(
"sql/moz-fx-data-shared-prod/test/test_query_v1/metadata.yaml",
"w",
) as f:
f.write(yaml.dump(TABLE_METADATA_CONF))
with open(
"sql/moz-fx-data-shared-prod/test/dataset_metadata.yaml", "w"
) as f:
f.write(yaml.dump(DATASET_METADATA_CONF))
backfill_file = Path(SQL_DIR) / BACKFILL_FILE
backfill_text = (
BACKFILL_YAML_TEMPLATE + f" billing_project: {VALID_BILLING_PROJECT}"
)
backfill_file.write_text(backfill_text)
assert BACKFILL_FILE in os.listdir(SQL_DIR)
result = runner.invoke(
validate,
[
"moz-fx-data-shared-prod.test.test_query_v1",
],
)
assert result.exit_code == 0
def test_validate_backfill_with_invalid_billing_project_should_fail(self, runner):
with runner.isolated_filesystem():
SQL_DIR = "sql/moz-fx-data-shared-prod/test/test_query_v1"
os.makedirs(SQL_DIR)
with open(
"sql/moz-fx-data-shared-prod/test/test_query_v1/query.sql", "w"
) as f:
f.write("SELECT 1")
with open(
"sql/moz-fx-data-shared-prod/test/test_query_v1/metadata.yaml",
"w",
) as f:
f.write(yaml.dump(TABLE_METADATA_CONF))
with open(
"sql/moz-fx-data-shared-prod/test/dataset_metadata.yaml", "w"
) as f:
f.write(yaml.dump(DATASET_METADATA_CONF))
backfill_file = Path(SQL_DIR) / BACKFILL_FILE
backfill_text = (
BACKFILL_YAML_TEMPLATE + f" billing_project: {INVALID_BILLING_PROJECT}"
)
backfill_file.write_text(backfill_text)
assert BACKFILL_FILE in os.listdir(SQL_DIR)
result = runner.invoke(
validate,
[
"moz-fx-data-shared-prod.test.test_query_v1",
],
)
assert result.exit_code == 1
assert "Invalid billing project" in str(result.exception)
def test_validate_backfill_depends_on_past_should_fail(self, runner):
with runner.isolated_filesystem():
SQL_DIR = "sql/moz-fx-data-shared-prod/test/test_query_v1"
@@ -1993,20 +2164,6 @@ class TestBackfill:
copy_table.side_effect = None
delete_table.side_effect = None
partitioned_table_metadata = {
"friendly_name": "test",
"description": "test",
"owners": ["test@example.org"],
"workgroup_access": VALID_WORKGROUP_ACCESS,
"bigquery": {
"time_partitioning": {
"type": "day",
"field": "submission_date",
"require_partition_filter": True,
}
},
}
with runner.isolated_filesystem():
SQL_DIR = "sql/moz-fx-data-shared-prod/test/test_query_v1"
os.makedirs(SQL_DIR)
@@ -2020,7 +2177,7 @@ class TestBackfill:
"sql/moz-fx-data-shared-prod/test/test_query_v1/metadata.yaml",
"w",
) as f:
f.write(yaml.dump(partitioned_table_metadata))
f.write(yaml.dump(PARTITIONED_TABLE_METADATA))
with open(
"sql/moz-fx-data-shared-prod/test/dataset_metadata.yaml", "w"
@@ -2056,10 +2213,10 @@ class TestBackfill:
)
assert delete_table.call_count == 1
@patch("google.cloud.bigquery.Client.get_table")
@patch("google.cloud.bigquery.Client")
@patch("subprocess.check_call")
def test_initiate_partitioned_backfill(self, check_call, get_table, runner):
get_table.side_effect = [
def test_initiate_partitioned_backfill(self, check_call, mock_client, runner):
mock_client().get_table.side_effect = [
NotFound( # Check that staging data does not exist
"moz-fx-data-shared-prod.backfills_staging_derived.test_query_v1_backup_2021_05_03"
"not found"
@@ -2068,20 +2225,6 @@ class TestBackfill:
None, # Check that production data exists
]
partitioned_table_metadata = {
"friendly_name": "test",
"description": "test",
"owners": ["test@example.org"],
"workgroup_access": VALID_WORKGROUP_ACCESS,
"bigquery": {
"time_partitioning": {
"type": "day",
"field": "submission_date",
"require_partition_filter": True,
}
},
}
with runner.isolated_filesystem():
SQL_DIR = "sql/moz-fx-data-shared-prod/test/test_query_v1"
os.makedirs(SQL_DIR)
@@ -2095,7 +2238,7 @@ class TestBackfill:
"sql/moz-fx-data-shared-prod/test/test_query_v1/metadata.yaml",
"w",
) as f:
f.write(yaml.dump(partitioned_table_metadata))
f.write(yaml.dump(PARTITIONED_TABLE_METADATA))
with open(
"sql/moz-fx-data-shared-prod/test/dataset_metadata.yaml", "w"
@@ -2126,6 +2269,12 @@ class TestBackfill:
for day in range(3, 9)
]
expected_destination_table_params = [
f"--destination_table=moz-fx-data-shared-prod:backfills_staging_derived.test_query_v1_2021_05_03$2021010{day}"
for day in range(3, 9)
]
# this is inspecting calls to the underlying subprocess.check_call(["bq", ...])
assert check_call.call_count == 12 # 6 for dry run, 6 for backfill
for call in check_call.call_args_list:
submission_date_params = [
@@ -2133,3 +2282,148 @@ class TestBackfill:
]
assert len(submission_date_params) == 1
assert submission_date_params[0] in expected_submission_date_params
assert f"--project_id={DEFAULT_BILLING_PROJECT}" in call.args[0]
destination_table_params = [
arg for arg in call.args[0] if "--destination_table" in arg
]
assert destination_table_params[0] in expected_destination_table_params
@patch("google.cloud.bigquery.Client")
@patch("subprocess.check_call")
def test_initiate_partitioned_backfill_with_valid_billing_project_from_entry(
self, check_call, mock_client, runner
):
mock_client().get_table.side_effect = [
NotFound( # Check that staging data does not exist
"moz-fx-data-shared-prod.backfills_staging_derived.test_query_v1_backup_2021_05_03"
"not found"
),
None, # Check that production data exists during dry run
None, # Check that production data exists
]
with runner.isolated_filesystem():
SQL_DIR = "sql/moz-fx-data-shared-prod/test/test_query_v1"
os.makedirs(SQL_DIR)
with open(
"sql/moz-fx-data-shared-prod/test/test_query_v1/query.sql", "w"
) as f:
f.write("SELECT 1")
with open(
"sql/moz-fx-data-shared-prod/test/test_query_v1/metadata.yaml",
"w",
) as f:
f.write(yaml.dump(PARTITIONED_TABLE_METADATA))
with open(
"sql/moz-fx-data-shared-prod/test/dataset_metadata.yaml", "w"
) as f:
f.write(yaml.dump(DATASET_METADATA_CONF_EMPTY_WORKGROUP))
backfill_file = Path(SQL_DIR) / BACKFILL_FILE
backfill_file.write_text(
f"""
2021-05-03:
start_date: 2021-01-03
end_date: 2021-01-08
reason: test_reason
watchers:
- test@example.org
status: Initiate
billing_project: {VALID_BILLING_PROJECT}
"""
)
result = runner.invoke(
initiate,
[
"moz-fx-data-shared-prod.test.test_query_v1",
"--parallelism=0",
],
)
assert result.exit_code == 0
expected_submission_date_params = [
f"--parameter=submission_date:DATE:2021-01-0{day}"
for day in range(3, 9)
]
expected_destination_table_params = [
f"--destination_table=moz-fx-data-shared-prod:backfills_staging_derived.test_query_v1_2021_05_03$2021010{day}"
for day in range(3, 9)
]
# this is inspecting calls to the underlying subprocess.check_call(["bq", ...])
assert check_call.call_count == 12 # 6 for dry run, 6 for backfill
for call in check_call.call_args_list:
submission_date_params = [
arg for arg in call.args[0] if "--parameter=submission_date" in arg
]
assert len(submission_date_params) == 1
assert submission_date_params[0] in expected_submission_date_params
assert f"--project_id={VALID_BILLING_PROJECT}" in call.args[0]
destination_table_params = [
arg for arg in call.args[0] if "--destination_table" in arg
]
assert destination_table_params[0] in expected_destination_table_params
@patch("google.cloud.bigquery.Client")
def test_initiate_partitioned_backfill_with_invalid_billing_project_from_entry_should_fail(
self, mock_client, runner
):
mock_client().get_table.side_effect = [
NotFound( # Check that staging data does not exist
"moz-fx-data-shared-prod.backfills_staging_derived.test_query_v1_backup_2021_05_03"
"not found"
),
None, # Check that production data exists during dry run
None, # Check that production data exists
]
with runner.isolated_filesystem():
SQL_DIR = "sql/moz-fx-data-shared-prod/test/test_query_v1"
os.makedirs(SQL_DIR)
with open(
"sql/moz-fx-data-shared-prod/test/test_query_v1/query.sql", "w"
) as f:
f.write("SELECT 1")
with open(
"sql/moz-fx-data-shared-prod/test/test_query_v1/metadata.yaml",
"w",
) as f:
f.write(yaml.dump(PARTITIONED_TABLE_METADATA))
with open(
"sql/moz-fx-data-shared-prod/test/dataset_metadata.yaml", "w"
) as f:
f.write(yaml.dump(DATASET_METADATA_CONF_EMPTY_WORKGROUP))
backfill_file = Path(SQL_DIR) / BACKFILL_FILE
backfill_file.write_text(
f"""
2021-05-03:
start_date: 2021-01-03
end_date: 2021-01-08
reason: test_reason
watchers:
- test@example.org
status: Initiate
billing_project: {INVALID_BILLING_PROJECT}
"""
)
result = runner.invoke(
initiate,
[
"moz-fx-data-shared-prod.test.test_query_v1",
"--parallelism=0",
],
)
assert result.exit_code == 1
assert "Invalid billing project" in str(result.exception)