DENG-2950 - Rename drafting -> initiate, modify json output of scheduled command (#5164)
* Update json write for scheduled commands to include date and watchers for DAG * Rename drafting to initiate
This commit is contained in:
Родитель
efc0862ff3
Коммит
027eb69562
|
@ -49,7 +49,7 @@ yaml.add_representer(Literal, literal_presenter)
|
|||
class BackfillStatus(enum.Enum):
|
||||
"""Represents backfill status types."""
|
||||
|
||||
DRAFTING = "Drafting"
|
||||
INITIATE = "Initiate"
|
||||
VALIDATED = "Validated"
|
||||
COMPLETE = "Complete"
|
||||
|
||||
|
|
|
@ -181,10 +181,10 @@ def qualified_table_name_matching(qualified_table_name) -> Tuple[str, str, str]:
|
|||
return project_id, dataset_id, table_id
|
||||
|
||||
|
||||
def get_backfill_entries_to_process_dict(
|
||||
def get_backfill_entries_to_initiate(
|
||||
sql_dir, project, qualified_table_name=None
|
||||
) -> Dict[str, Backfill]:
|
||||
"""Return backfill entries that require processing."""
|
||||
"""Return backfill entries to initiate."""
|
||||
try:
|
||||
bigquery.Client(project="")
|
||||
except DefaultCredentialsError:
|
||||
|
@ -198,12 +198,12 @@ def get_backfill_entries_to_process_dict(
|
|||
if qualified_table_name:
|
||||
backfills_dict = {
|
||||
qualified_table_name: get_entries_from_qualified_table_name(
|
||||
sql_dir, qualified_table_name, BackfillStatus.DRAFTING.value
|
||||
sql_dir, qualified_table_name, BackfillStatus.INITIATE.value
|
||||
)
|
||||
}
|
||||
else:
|
||||
backfills_dict = get_qualified_table_name_to_entries_map_by_project(
|
||||
sql_dir, project, BackfillStatus.DRAFTING.value
|
||||
sql_dir, project, BackfillStatus.INITIATE.value
|
||||
)
|
||||
|
||||
backfills_to_process_dict = {}
|
||||
|
@ -221,7 +221,7 @@ def get_backfill_entries_to_process_dict(
|
|||
sys.exit(1)
|
||||
elif (len(entries)) > 1:
|
||||
click.echo(
|
||||
f"There should not be more than one entry in backfill.yaml file with status: {BackfillStatus.DRAFTING} "
|
||||
f"There should not be more than one entry in backfill.yaml file with status: {BackfillStatus.INITIATE} "
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
|
|
|
@ -68,10 +68,10 @@ def validate_entries(backfills: list) -> None:
|
|||
validate_reason(backfill_entry_1)
|
||||
validate_excluded_dates(backfill_entry_1)
|
||||
|
||||
# validate against other entries with drafting status
|
||||
if backfill_entry_1.status == BackfillStatus.DRAFTING:
|
||||
# validate against other entries with initiate status
|
||||
if backfill_entry_1.status == BackfillStatus.INITIATE:
|
||||
for backfill_entry_2 in backfills[i + 1 :]:
|
||||
if backfill_entry_2.status == BackfillStatus.DRAFTING:
|
||||
if backfill_entry_2.status == BackfillStatus.INITIATE:
|
||||
validate_duplicate_entry_dates(backfill_entry_1, backfill_entry_2)
|
||||
validate_overlap_dates(backfill_entry_1, backfill_entry_2)
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ from ..backfill.parse import (
|
|||
from ..backfill.utils import (
|
||||
BACKFILL_DESTINATION_DATASET,
|
||||
BACKFILL_DESTINATION_PROJECT,
|
||||
get_backfill_entries_to_process_dict,
|
||||
get_backfill_entries_to_initiate,
|
||||
get_backfill_file_from_qualified_table_name,
|
||||
get_backfill_staging_qualified_table_name,
|
||||
get_entries_from_qualified_table_name,
|
||||
|
@ -124,12 +124,12 @@ def create(
|
|||
excluded_dates=[e.date() for e in list(exclude)],
|
||||
reason=DEFAULT_REASON,
|
||||
watchers=[watcher],
|
||||
status=BackfillStatus.DRAFTING,
|
||||
status=BackfillStatus.INITIATE,
|
||||
)
|
||||
|
||||
for existing_entry in existing_backfills:
|
||||
validate_duplicate_entry_dates(new_entry, existing_entry)
|
||||
if existing_entry.status == BackfillStatus.DRAFTING:
|
||||
if existing_entry.status == BackfillStatus.INITIATE:
|
||||
validate_overlap_dates(new_entry, existing_entry)
|
||||
|
||||
existing_backfills.insert(0, new_entry)
|
||||
|
@ -228,7 +228,7 @@ def validate(
|
|||
|
||||
\b
|
||||
# Get info from all tables with specific status.
|
||||
./bqetl backfill info --status=Drafting
|
||||
./bqetl backfill info --status=Initiate
|
||||
""",
|
||||
)
|
||||
@click.argument("qualified_table_name", required=False)
|
||||
|
@ -292,31 +292,42 @@ def info(ctx, qualified_table_name, sql_dir, project_id, status):
|
|||
@project_id_option(
|
||||
ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod")
|
||||
)
|
||||
@click.option(
|
||||
"--status",
|
||||
type=click.Choice([s.value for s in BackfillStatus]),
|
||||
default=BackfillStatus.INITIATE.value,
|
||||
help="Whether to get backfills to process or to complete.",
|
||||
)
|
||||
@click.option("--json_path", type=click.Path())
|
||||
@click.pass_context
|
||||
def scheduled(ctx, qualified_table_name, sql_dir, project_id, json_path=None):
|
||||
def scheduled(ctx, qualified_table_name, sql_dir, project_id, status, json_path=None):
|
||||
"""Return list of backfill(s) that require processing."""
|
||||
total_backfills_count = 0
|
||||
match status:
|
||||
case BackfillStatus.INITIATE.value:
|
||||
backfills = get_backfill_entries_to_initiate(
|
||||
sql_dir, project_id, qualified_table_name
|
||||
)
|
||||
case BackfillStatus.COMPLETE.value:
|
||||
raise NotImplementedError("Placeholder - TODO")
|
||||
case _:
|
||||
raise ValueError(f"Invalid status status {status}.")
|
||||
|
||||
backfills_to_process_dict = get_backfill_entries_to_process_dict(
|
||||
sql_dir, project_id, qualified_table_name
|
||||
)
|
||||
for qualified_table_name, entry in backfills.items():
|
||||
click.echo(f"Backfill scheduled for {qualified_table_name}:\n{entry}")
|
||||
|
||||
for qualified_table_name, entry_to_process in backfills_to_process_dict.items():
|
||||
total_backfills_count += 1
|
||||
click.echo(f"{len(backfills)} backfill(s) require processing.")
|
||||
|
||||
click.echo(f"Backfill entry scheduled for {qualified_table_name}:")
|
||||
if backfills and json_path is not None:
|
||||
formatted_backfills = [
|
||||
{
|
||||
"qualified_table_name": qualified_table_name,
|
||||
"entry_date": entry.entry_date.strftime("%Y-%m-%d"),
|
||||
"watchers": entry.watchers,
|
||||
}
|
||||
for qualified_table_name, entry in backfills.items()
|
||||
]
|
||||
|
||||
# For future us: this will probably end up being a write to something machine-readable for automation to pick up
|
||||
click.echo(str(entry_to_process))
|
||||
|
||||
click.echo(
|
||||
f"\nThere are a total of {total_backfills_count} backfill(s) that require processing."
|
||||
)
|
||||
|
||||
if backfills_to_process_dict and json_path is not None:
|
||||
scheduled_backfills_json = json.dumps(list(backfills_to_process_dict.keys()))
|
||||
Path(json_path).write_text(scheduled_backfills_json)
|
||||
Path(json_path).write_text(json.dumps(formatted_backfills))
|
||||
|
||||
|
||||
@backfill.command(
|
||||
|
@ -348,7 +359,7 @@ def process(ctx, qualified_table_name, sql_dir, project_id, dry_run):
|
|||
"""Process backfill entry with drafting status in backfill.yaml file(s)."""
|
||||
click.echo("Backfill processing initiated....")
|
||||
|
||||
backfills_to_process_dict = get_backfill_entries_to_process_dict(
|
||||
backfills_to_process_dict = get_backfill_entries_to_initiate(
|
||||
sql_dir, project_id, qualified_table_name
|
||||
)
|
||||
|
||||
|
|
|
@ -4,4 +4,4 @@
|
|||
reason: This is mostly a test backfill, this table will replace newtab_interactions_v1 eventually
|
||||
watchers:
|
||||
- anicholson@mozilla.com
|
||||
status: Drafting
|
||||
status: Initiate
|
||||
|
|
|
@ -6,4 +6,4 @@
|
|||
reason: Please provide a reason for the backfill and links to any related bugzilla or jira tickets
|
||||
watchers:
|
||||
- nobody@mozilla.com
|
||||
status: Drafting
|
||||
status: Initiate
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
reason: Please provide a reason for the backfill and links to any related bugzilla or jira tickets
|
||||
watchers:
|
||||
- nobody@mozilla.com
|
||||
status: Drafting
|
||||
status: Initiate
|
||||
|
||||
2021-05-03:
|
||||
start_date: 2021-01-03
|
||||
|
@ -16,4 +16,4 @@
|
|||
reason: Please provide a reason for the backfill and links to any related bugzilla or jira tickets
|
||||
watchers:
|
||||
- nobody@mozilla.com
|
||||
status: Drafting
|
||||
status: Initiate
|
||||
|
|
|
@ -6,4 +6,4 @@
|
|||
reason: no_reason
|
||||
watchers:
|
||||
- test@example.org
|
||||
status: Drafting
|
||||
status: Initiate
|
||||
|
|
|
@ -11,7 +11,7 @@ from bigquery_etl.backfill.parse import (
|
|||
BackfillStatus,
|
||||
)
|
||||
|
||||
DEFAULT_STATUS = BackfillStatus.DRAFTING
|
||||
DEFAULT_STATUS = BackfillStatus.INITIATE
|
||||
|
||||
TEST_DIR = Path(__file__).parent.parent
|
||||
|
||||
|
@ -333,7 +333,7 @@ class TestParseBackfill(object):
|
|||
" or jira tickets\n"
|
||||
" watchers:\n"
|
||||
" - nobody@mozilla.com\n"
|
||||
" status: Drafting\n"
|
||||
" status: Initiate\n"
|
||||
)
|
||||
|
||||
results = TEST_BACKFILL_1.to_yaml()
|
||||
|
@ -349,7 +349,7 @@ class TestParseBackfill(object):
|
|||
excluded_dates = [2021-02-03]
|
||||
reason = Please provide a reason for the backfill and links to any related bugzilla or jira tickets
|
||||
watcher(s) = [nobody@mozilla.com]
|
||||
status = Drafting
|
||||
status = Initiate
|
||||
"""
|
||||
|
||||
assert actual_backfill_str == expected_backfill_str
|
||||
|
@ -364,7 +364,7 @@ class TestParseBackfill(object):
|
|||
" or jira tickets\n"
|
||||
" watchers:\n"
|
||||
" - nobody@mozilla.com\n"
|
||||
" status: Drafting\n"
|
||||
" status: Initiate\n"
|
||||
)
|
||||
|
||||
TEST_BACKFILL_1.excluded_dates = []
|
||||
|
|
|
@ -27,7 +27,7 @@ from bigquery_etl.backfill.utils import (
|
|||
)
|
||||
from bigquery_etl.cli.backfill import create, info, scheduled, validate
|
||||
|
||||
DEFAULT_STATUS = BackfillStatus.DRAFTING
|
||||
DEFAULT_STATUS = BackfillStatus.INITIATE
|
||||
VALID_REASON = "test_reason"
|
||||
VALID_WATCHER = "test@example.org"
|
||||
VALID_BACKFILL = Backfill(
|
||||
|
@ -48,7 +48,7 @@ BACKFILL_YAML_TEMPLATE = (
|
|||
" reason: test_reason\n"
|
||||
" watchers:\n"
|
||||
" - test@example.org\n"
|
||||
" status: Drafting\n"
|
||||
" status: Initiate\n"
|
||||
)
|
||||
|
||||
VALID_WORKGROUP_ACCESS = [
|
||||
|
@ -830,7 +830,7 @@ class TestBackfill:
|
|||
+ " reason: test_reason\n"
|
||||
" watchers:\n"
|
||||
" - test@example.org\n"
|
||||
" status: Drafting\n"
|
||||
" status: Initiate\n"
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -877,7 +877,7 @@ class TestBackfill:
|
|||
+ " reason: test_reason\n"
|
||||
" watchers:\n"
|
||||
" - test@example.org\n"
|
||||
" status: Drafting\n"
|
||||
" status: Initiate\n"
|
||||
)
|
||||
|
||||
assert BACKFILL_FILE in os.listdir(SQL_DIR)
|
||||
|
@ -921,7 +921,7 @@ class TestBackfill:
|
|||
" reason: test_reason\n"
|
||||
" watchers:\n"
|
||||
" - test@example.org\n"
|
||||
" status: Drafting\n"
|
||||
" status: Initiate\n"
|
||||
)
|
||||
|
||||
assert BACKFILL_FILE in os.listdir(SQL_DIR)
|
||||
|
@ -965,7 +965,7 @@ class TestBackfill:
|
|||
" reason: test_reason\n"
|
||||
" watchers:\n"
|
||||
" - test@example.org\n"
|
||||
" status: Drafting\n"
|
||||
" status: Initiate\n"
|
||||
)
|
||||
|
||||
assert BACKFILL_FILE in os.listdir(SQL_DIR)
|
||||
|
@ -1098,7 +1098,7 @@ class TestBackfill:
|
|||
|
||||
assert result.exit_code == 0
|
||||
assert qualified_table_name_1 in result.output
|
||||
assert BackfillStatus.DRAFTING.value in result.output
|
||||
assert BackfillStatus.INITIATE.value in result.output
|
||||
assert BackfillStatus.VALIDATED.value in result.output
|
||||
assert "total of 2 backfill(s)" in result.output
|
||||
assert qualified_table_name_2 not in result.output
|
||||
|
@ -1126,12 +1126,12 @@ class TestBackfill:
|
|||
|
||||
result = runner.invoke(
|
||||
info,
|
||||
[qualified_table_name, "--status=drafting"],
|
||||
[qualified_table_name, "--status=initiate"],
|
||||
)
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert qualified_table_name in result.output
|
||||
assert BackfillStatus.DRAFTING.value in result.output
|
||||
assert BackfillStatus.INITIATE.value in result.output
|
||||
assert "total of 1 backfill(s)" in result.output
|
||||
assert BackfillStatus.VALIDATED.value not in result.output
|
||||
assert BackfillStatus.COMPLETE.value not in result.output
|
||||
|
@ -1213,7 +1213,7 @@ class TestBackfill:
|
|||
assert result.exit_code == 0
|
||||
assert qualified_table_name_1 in result.output
|
||||
assert qualified_table_name_2 in result.output
|
||||
assert BackfillStatus.DRAFTING.value in result.output
|
||||
assert BackfillStatus.INITIATE.value in result.output
|
||||
assert BackfillStatus.VALIDATED.value in result.output
|
||||
assert "total of 3 backfill(s)" in result.output
|
||||
assert BackfillStatus.COMPLETE.value not in result.output
|
||||
|
@ -1268,7 +1268,7 @@ class TestBackfill:
|
|||
assert qualified_table_name_2 in result.output
|
||||
assert BackfillStatus.VALIDATED.value in result.output
|
||||
assert "total of 2 backfill(s)" in result.output
|
||||
assert BackfillStatus.DRAFTING.value not in result.output
|
||||
assert BackfillStatus.INITIATE.value not in result.output
|
||||
assert BackfillStatus.COMPLETE.value not in result.output
|
||||
|
||||
def test_backfill_info_with_invalid_path(self, runner):
|
||||
|
@ -1824,8 +1824,5 @@ class TestBackfill:
|
|||
)
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert (
|
||||
"There are a total of 1 backfill(s) that require processing."
|
||||
in result.output
|
||||
)
|
||||
assert "1 backfill(s) require processing." in result.output
|
||||
assert Path("tmp.json").exists()
|
||||
|
|
Загрузка…
Ссылка в новой задаче