Increase task name limit from 62 characters to 250 characters (#3876)

The 62-character limit was due to a Kubernetes pod label length limit, which has been worked around as of Airflow 2.0.1.
This commit is contained in:
Sean Rose 2023-06-01 09:13:38 -07:00 коммит произвёл GitHub
Родитель 94d28a329f
Коммит 3f4f5a7f94
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 66 добавлений и 60 удалений

Просмотреть файл

@@ -29,6 +29,7 @@ QUERY_FILE_RE = re.compile(
r"([a-zA-Z0-9_]+)_(v[0-9]+)/(?:query\.sql|part1\.sql|script\.sql|query\.py)$"
)
DEFAULT_DESTINATION_TABLE_STR = "use-default-destination-table"
MAX_TASK_NAME_LENGTH = 250
class TaskParseException(Exception):
@@ -249,10 +250,10 @@ class Task:
def validate_task_name(self, attribute, value):
"""Validate the task name."""
if value is not None:
if len(value) < 1 or len(value) > 62:
if len(value) < 1 or len(value) > MAX_TASK_NAME_LENGTH:
raise ValueError(
f"Invalid task name {value}. "
+ "The task name has to be 1 to 62 characters long."
f"The task name has to be 1 to {MAX_TASK_NAME_LENGTH} characters long."
)
@retry_delay.validator
@@ -275,7 +276,9 @@ class Task:
if self.task_name is None:
# limiting task name to allow longer dataset names
self.task_name = f"{self.dataset}__{self.table}__{self.version}"[-62:]
self.task_name = f"{self.dataset}__{self.table}__{self.version}"[
-MAX_TASK_NAME_LENGTH:
]
self.validate_task_name(None, self.task_name)
if self.destination_table == DEFAULT_DESTINATION_TABLE_STR:

Просмотреть файл

@@ -344,6 +344,36 @@ with DAG(
depends_on_past=False,
)
firefox_accounts_derived__fxa_users_services_devices_first_seen__v1 = bigquery_etl_query(
task_id="firefox_accounts_derived__fxa_users_services_devices_first_seen__v1",
destination_table="fxa_users_services_devices_first_seen_v1",
dataset_id="firefox_accounts_derived",
project_id="moz-fx-data-shared-prod",
owner="kignasiak@mozilla.com",
email=[
"dthorn@mozilla.com",
"kignasiak@mozilla.com",
"telemetry-alerts@mozilla.com",
],
date_partition_parameter="submission_date",
depends_on_past=True,
)
firefox_accounts_derived__fxa_users_services_devices_last_seen__v1 = bigquery_etl_query(
task_id="firefox_accounts_derived__fxa_users_services_devices_last_seen__v1",
destination_table="fxa_users_services_devices_last_seen_v1",
dataset_id="firefox_accounts_derived",
project_id="moz-fx-data-shared-prod",
owner="kignasiak@mozilla.com",
email=[
"dthorn@mozilla.com",
"kignasiak@mozilla.com",
"telemetry-alerts@mozilla.com",
],
date_partition_parameter="submission_date",
depends_on_past=True,
)
firefox_accounts_derived__fxa_users_services_first_seen__v2 = bigquery_etl_query(
task_id="firefox_accounts_derived__fxa_users_services_first_seen__v2",
destination_table="fxa_users_services_first_seen_v2",
@@ -395,36 +425,6 @@ with DAG(
arguments=["--schema_update_option=ALLOW_FIELD_ADDITION"],
)
fox_accounts_derived__fxa_users_services_devices_last_seen__v1 = bigquery_etl_query(
task_id="fox_accounts_derived__fxa_users_services_devices_last_seen__v1",
destination_table="fxa_users_services_devices_last_seen_v1",
dataset_id="firefox_accounts_derived",
project_id="moz-fx-data-shared-prod",
owner="kignasiak@mozilla.com",
email=[
"dthorn@mozilla.com",
"kignasiak@mozilla.com",
"telemetry-alerts@mozilla.com",
],
date_partition_parameter="submission_date",
depends_on_past=True,
)
ox_accounts_derived__fxa_users_services_devices_first_seen__v1 = bigquery_etl_query(
task_id="ox_accounts_derived__fxa_users_services_devices_first_seen__v1",
destination_table="fxa_users_services_devices_first_seen_v1",
dataset_id="firefox_accounts_derived",
project_id="moz-fx-data-shared-prod",
owner="kignasiak@mozilla.com",
email=[
"dthorn@mozilla.com",
"kignasiak@mozilla.com",
"telemetry-alerts@mozilla.com",
],
date_partition_parameter="submission_date",
depends_on_past=True,
)
firefox_accounts_derived__exact_mau28__v1.set_upstream(
firefox_accounts_derived__fxa_users_last_seen__v1
)
@@ -493,14 +493,14 @@ with DAG(
firefox_accounts_derived__fxa_stdout_events__v1
)
firefox_accounts_derived__fxa_users_services_devices_first_seen__v1.set_upstream(
firefox_accounts_derived__fxa_users_services_devices_daily__v1
)
firefox_accounts_derived__fxa_users_services_devices_last_seen__v1.set_upstream(
firefox_accounts_derived__fxa_users_services_devices_daily__v1
)
firefox_accounts_derived__fxa_users_services_first_seen__v2.set_upstream(
firefox_accounts_derived__fxa_users_services_daily__v2
)
fox_accounts_derived__fxa_users_services_devices_last_seen__v1.set_upstream(
firefox_accounts_derived__fxa_users_services_devices_daily__v1
)
ox_accounts_derived__fxa_users_services_devices_first_seen__v1.set_upstream(
firefox_accounts_derived__fxa_users_services_devices_daily__v1
)

Просмотреть файл

@@ -40,15 +40,17 @@ with DAG(
doc_md=docs,
tags=tags,
) as dag:
g_mozilla_focus_beta_derived__additional_deletion_requests__v1 = bigquery_etl_query(
task_id="g_mozilla_focus_beta_derived__additional_deletion_requests__v1",
destination_table="additional_deletion_requests_v1",
dataset_id="org_mozilla_focus_beta_derived",
project_id="moz-fx-data-shared-prod",
owner="dthorn@mozilla.com",
email=["dthorn@mozilla.com", "telemetry-alerts@mozilla.com"],
date_partition_parameter="submission_date",
depends_on_past=False,
org_mozilla_focus_beta_derived__additional_deletion_requests__v1 = (
bigquery_etl_query(
task_id="org_mozilla_focus_beta_derived__additional_deletion_requests__v1",
destination_table="additional_deletion_requests_v1",
dataset_id="org_mozilla_focus_beta_derived",
project_id="moz-fx-data-shared-prod",
owner="dthorn@mozilla.com",
email=["dthorn@mozilla.com", "telemetry-alerts@mozilla.com"],
date_partition_parameter="submission_date",
depends_on_past=False,
)
)
org_mozilla_focus_derived__additional_deletion_requests__v1 = bigquery_etl_query(
@@ -62,8 +64,8 @@ with DAG(
depends_on_past=False,
)
ozilla_focus_nightly_derived__additional_deletion_requests__v1 = bigquery_etl_query(
task_id="ozilla_focus_nightly_derived__additional_deletion_requests__v1",
org_mozilla_focus_nightly_derived__additional_deletion_requests__v1 = bigquery_etl_query(
task_id="org_mozilla_focus_nightly_derived__additional_deletion_requests__v1",
destination_table="additional_deletion_requests_v1",
dataset_id="org_mozilla_focus_nightly_derived",
project_id="moz-fx-data-shared-prod",
@@ -85,7 +87,7 @@ with DAG(
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
g_mozilla_focus_beta_derived__additional_deletion_requests__v1.set_upstream(
org_mozilla_focus_beta_derived__additional_deletion_requests__v1.set_upstream(
wait_for_copy_deduplicate_all
)
@@ -93,6 +95,6 @@ with DAG(
wait_for_copy_deduplicate_all
)
ozilla_focus_nightly_derived__additional_deletion_requests__v1.set_upstream(
org_mozilla_focus_nightly_derived__additional_deletion_requests__v1.set_upstream(
wait_for_copy_deduplicate_all
)

Просмотреть файл

@@ -7,6 +7,7 @@ import pytest
from bigquery_etl.metadata.parse_metadata import Metadata
from bigquery_etl.query_scheduling.dag_collection import DagCollection
from bigquery_etl.query_scheduling.task import (
MAX_TASK_NAME_LENGTH,
Task,
TaskParseException,
TaskRef,
@@ -193,7 +194,7 @@ class TestTask:
scheduling = {
"dag_name": "bqetl_test_dag",
"default_args": {"owner": "test@example.org"},
"task_name": "a" * 63,
"task_name": "a" * (MAX_TASK_NAME_LENGTH + 1),
}
metadata = Metadata("test", "test", ["test@example.org"], {}, scheduling)
@@ -215,13 +216,13 @@ class TestTask:
scheduling = {
"dag_name": "bqetl_test_dag",
"default_args": {"owner": "test@example.org"},
"task_name": "a" * 62,
"task_name": "a" * MAX_TASK_NAME_LENGTH,
}
metadata = Metadata("test", "test", ["test@example.org"], {}, scheduling)
task = Task.of_query(query_file, metadata)
assert task.task_name == "a" * 62
assert task.task_name == "a" * MAX_TASK_NAME_LENGTH
def test_validate_task_name(self):
query_file = (
@@ -230,7 +231,7 @@ class TestTask:
/ "test_sql"
/ "moz-fx-data-test-project"
/ "test"
/ (("a" * 63) + "_v1")
/ (("a" * MAX_TASK_NAME_LENGTH) + "_v1")
/ "query.sql"
)
@@ -242,10 +243,10 @@ class TestTask:
metadata = Metadata("test", "test", ["test@example.org"], {}, scheduling)
task = Task.of_query(query_file, metadata)
assert task.task_name == "a" * 58 + "__v1"
assert task.task_name == ("a" * (MAX_TASK_NAME_LENGTH - 4)) + "__v1"
with pytest.raises(ValueError):
task.task_name = "a" * 64
task.task_name = ("a" * MAX_TASK_NAME_LENGTH) + "__v1"
Task.validate_task_name(task, "task_name", task.task_name)
def test_dag_name_validation(self):