This commit is contained in:
Mikaël Ducharme 2023-03-01 14:47:21 -05:00 коммит произвёл GitHub
Родитель 248b35d72b
Коммит ac5f33b427
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 323 добавлений и 96 удалений

Просмотреть файл

@ -13,7 +13,6 @@ orbs:
docker: circleci/docker@2.2.0
python: circleci/python@2.1.1
jobs:
unit-tests:
executor: &python-executor
@ -30,6 +29,44 @@ jobs:
- store_test_results:
path: test-results
black:
executor: *python-executor
steps:
- checkout
- python/install-packages:
pip-dependency-file: requirements.txt
pkg-manager: pip
- run: &git-diff-py-files
name: List added, copied, modified, and renamed *py files
command: git diff --name-only --diff-filter=ACMR origin/main | grep -E "(.py$)" > diff.txt || true
- run:
name: List black errors
command: black . --check &> lint_checks.txt || true
- run:
name: 🧹 Diff-based black
command: &display-lint-errors |
grep -Ff diff.txt lint_checks.txt > lint_errors.txt || true
if [ -s lint_errors.txt ]; then
cat lint_errors.txt
printf 'Run the following command to fix your branch:\n make fixes'
exit 1
fi
ruff:
executor: *python-executor
steps:
- checkout
- python/install-packages:
pip-dependency-file: requirements.txt
pkg-manager: pip
- run: *git-diff-py-files
- run:
name: List ruff errors
command: ruff check . &> lint_checks.txt || true
- run:
name: 🧹 Diff-based ruff
command: *display-lint-errors
validate-requirements:
executor: *python-executor
steps:
@ -58,6 +95,13 @@ jobs:
workflows:
ci:
jobs:
- validate-requirements:
name: 🧪 Validate requirements
filters: &ci-filter
branches:
ignore: main
tags:
ignore: /.*/
- docker/publish:
name: 🛠️ Docker build test
before_build: &version
@ -75,20 +119,26 @@ workflows:
deploy: false
image: $CIRCLE_PROJECT_REPONAME
tag: ${CIRCLE_SHA1:0:9}
filters: &ci-filter
branches:
ignore: main
tags:
ignore: /.*/
filters: *ci-filter
requires:
- 🧪 Validate requirements
- black:
name: ⚫ black
filters: *ci-filter
requires:
- 🧪 Validate requirements
- ruff:
name: 🚓 ruff
filters: *ci-filter
requires:
- 🧪 Validate requirements
- unit-tests:
name: 🧪 Unit tests
filters: *ci-filter
- validate-requirements:
name: 🧪 Validate requirements
filters: *ci-filter
requires:
- ⚫ black
- 🚓 ruff
publish:
jobs:

Просмотреть файл

@ -21,6 +21,10 @@ build:
pip-compile:
pip-compile
fixes:
ruff check $$(git diff --name-only --diff-filter=ACMR origin/main | grep -E "(.py$$)") --fix
black $$(git diff --name-only --diff-filter=ACMR origin/main | grep -E "(.py$$)")
clean: stop
docker-compose rm -f
rm -rf logs/*

Просмотреть файл

@ -6,9 +6,8 @@ from airflow.models import DagModel
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from utils.backfill import BackfillParams
from utils.tags import Tag
@ -27,7 +26,11 @@ def dry_run_branch_callable(params: dict) -> str:
def clear_branch_callable(params: dict) -> str:
backfill_params = BackfillParams(**params)
return TaskId.clear_tasks.value if backfill_params.clear else TaskId.do_not_clear_tasks.value
return (
TaskId.clear_tasks.value
if backfill_params.clear
else TaskId.do_not_clear_tasks.value
)
def param_validation(params: dict) -> bool:
@ -65,7 +68,7 @@ doc_md = """
@dag(
dag_id='backfill',
dag_id="backfill",
schedule_interval=None,
doc_md=doc_md,
catchup=False,
@ -73,17 +76,20 @@ doc_md = """
dagrun_timeout=datetime.timedelta(days=1),
tags=[Tag.ImpactTier.tier_3, Tag.Triage.record_only],
render_template_as_native_obj=True,
params={"dag_name": Param("dag_name", type="string"),
"start_date": Param((datetime.date.today() - datetime.timedelta(days=10)).isoformat(),
type="string",
format="date-time"),
"end_date": Param(datetime.date.today().isoformat(),
type="string",
format="date-time"),
"clear": Param(False, type="boolean"),
"dry_run": Param(True, type="boolean"),
"task_regex": Param(None, type=["string", "null"]),
}
params={
"dag_name": Param("dag_name", type="string"),
"start_date": Param(
(datetime.date.today() - datetime.timedelta(days=10)).isoformat(),
type="string",
format="date-time",
),
"end_date": Param(
datetime.date.today().isoformat(), type="string", format="date-time"
),
"clear": Param(False, type="boolean"),
"dry_run": Param(True, type="boolean"),
"task_regex": Param(None, type=["string", "null"]),
},
)
def backfill_dag():
param_validation_task = PythonOperator(
@ -120,12 +126,19 @@ def backfill_dag():
)
backfill_task = BashOperator(
task_id='execute_backfill',
task_id="execute_backfill",
bash_command="{{ ti.xcom_pull(task_ids='generate_backfill_command') }}",
)
param_validation_task >> dry_run_branch_task >> [dry_run_task, real_deal_task] >> clear_branch_task >> [
clear_tasks_task, do_not_clear_tasks_task] >> generate_backfill_command_task >> backfill_task
(
param_validation_task
>> dry_run_branch_task
>> [dry_run_task, real_deal_task]
>> clear_branch_task
>> [clear_tasks_task, do_not_clear_tasks_task]
>> generate_backfill_command_task
>> backfill_task
)
dag = backfill_dag()

Просмотреть файл

@ -1,7 +1,8 @@
from __future__ import annotations
import dataclasses
import datetime
import re
from typing import Optional, List
@dataclasses.dataclass
@ -11,51 +12,58 @@ class BackfillParams:
end_date: str
clear: bool
dry_run: bool
task_regex: Optional[str]
task_regex: str | None
def validate_date_range(self) -> None:
start_date = datetime.datetime.fromisoformat(self.start_date)
end_date = datetime.datetime.fromisoformat(self.end_date)
if start_date > end_date:
raise ValueError(f"`start_date`={self.start_date} is greater than `end_date`={self.end_date}")
raise ValueError(
f"`start_date`={self.start_date} is greater than `end_date`={self.end_date}"
)
def validate_regex_pattern(self) -> None:
if self.task_regex:
try:
re.compile(self.task_regex)
except re.error:
raise ValueError(f"Invalid regex pattern for `task_regex`={self.task_regex}")
raise ValueError(
f"Invalid regex pattern for `task_regex`={self.task_regex}"
) from None
def generate_backfill_command(self) -> List[str]:
"""Backfill command based off the Airflow plugin implemented by hwoo
def generate_backfill_command(self) -> list[str]:
"""
Backfill command based off the Airflow plugin implemented by hwoo.
Original implementation in plugins/backfill/main.py
"""
# Construct the airflow command
cmd = ['airflow']
cmd = ["airflow"]
if self.clear:
cmd.extend(['tasks', 'clear'])
cmd.extend(["tasks", "clear"])
if self.dry_run:
# For dry runs we simply time out to avoid zombie procs waiting on user input.
# The output is what we're interested in
timeout_list = ['timeout', '60']
timeout_list = ["timeout", "60"]
cmd = timeout_list + cmd
else:
cmd.append('-y')
cmd.append("-y")
if self.task_regex:
cmd.extend(['-t', str(self.task_regex)])
cmd.extend(["-t", str(self.task_regex)])
else:
cmd.extend(['dags', 'backfill', '--donot-pickle'])
cmd.extend(["dags", "backfill", "--donot-pickle"])
if self.dry_run:
cmd.append('--dry-run')
cmd.append("--dry-run")
if self.task_regex:
cmd.extend(['-t', str(self.task_regex)])
cmd.extend(["-t", str(self.task_regex)])
cmd.extend(['-s', str(self.start_date), '-e', str(self.end_date), str(self.dag_name)])
cmd.extend(
["-s", str(self.start_date), "-e", str(self.end_date), str(self.dag_name)]
)
return cmd

40
pyproject.toml Normal file
Просмотреть файл

@ -0,0 +1,40 @@
[tool.ruff.isort]
known-third-party = ["airflow"]
[tool.ruff]
select = [
"E", # pycodestyle
"W", # pycodestyle
"F", # Pyflakes
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"D", # flake8-docstrings
"I", # isort
"SIM", # flake8-simplify
"TCH", # flake8-type-checking
"TID", # flake8-tidy-imports
"Q", # flake8-quotes
"UP", # pyupgrade
"PT", # flake8-pytest-style
"RUF", # Ruff-specific rules
]
ignore = [
"E501", # line too long, handled by black
# Docstring linting
"D100", # Missing docstring in public module
"D101", # Missing docstring in public class
"D102", # Missing docstring in public method
"D103", # Missing docstring in public function
"D104", # Missing docstring in public package
"D105", # Missing docstring in magic method
"D107", # Missing docstring in __init__
"D202", # No blank lines allowed after function docstring -> clashes with Black
"D203", # 1 blank line required before class docstring
"D212", # Multi-line docstring summary should start at the first line
"D415", # First line should end with a period, question mark, or exclamation point
"D416", #Section name should end with a colon ("{name}")
# flake8-pytest-style:
"PT011", # pytest.raises({exception}) is too broad, set the match parameter or use a more specific exception
# To enable when we migrate to Python 3.10
"B905", # `zip()` without an explicit `strict=` parameter
]

Просмотреть файл

@ -10,10 +10,14 @@ apache-airflow-providers-http
apache-airflow-providers-slack
airflow-provider-fivetran==1.1.2
# Code quality
pytest==6.2.5
pytest-mock==3.10.0
black==22.6.0
ruff==0.0.252
# Misc
mozlogging
pytest
pytest-mock
# Required for backfill UI
flask-admin

Просмотреть файл

@ -121,6 +121,10 @@ billiard==3.6.4.0
# via
# -c ./constraints.txt
# celery
black==22.6.0
# via
# -c ./constraints.txt
# -r requirements.in
blinker==1.4
# via
# -c ./constraints.txt
@ -176,6 +180,7 @@ charset-normalizer==2.0.12
click==8.1.3
# via
# -c ./constraints.txt
# black
# celery
# click-didyoumean
# click-plugins
@ -804,6 +809,10 @@ mypy-boto3-redshift-data==1.24.11.post3
# via
# -c ./constraints.txt
# apache-airflow-providers-amazon
mypy-extensions==0.4.3
# via
# -c ./constraints.txt
# black
mysql-connector-python==8.0.29
# via
# -c ./constraints.txt
@ -850,10 +859,15 @@ pathspec==0.9.0
# via
# -c ./constraints.txt
# apache-airflow
# black
pendulum==2.1.2
# via
# -c ./constraints.txt
# apache-airflow
platformdirs==2.5.2
# via
# -c ./constraints.txt
# black
pluggy==1.0.0
# via
# -c ./constraints.txt
@ -1096,6 +1110,8 @@ rsa==4.8
# via
# -c ./constraints.txt
# google-auth
ruff==0.0.252
# via -r requirements.in
s3transfer==0.6.0
# via
# -c ./constraints.txt
@ -1195,6 +1211,10 @@ toml==0.10.2
# via
# -c ./constraints.txt
# pytest
tomli==2.0.1
# via
# -c ./constraints.txt
# black
tornado==6.2
# via
# -c ./constraints.txt
@ -1203,6 +1223,7 @@ typing-extensions==4.3.0
# via
# -c ./constraints.txt
# apache-airflow
# black
# looker-sdk
# mypy-boto3-rds
# mypy-boto3-redshift-data

Просмотреть файл

@ -3,7 +3,6 @@ import os
import pathlib
import sys
import warnings
from typing import Dict, Union
import pytest
from airflow.models import DagBag
@ -41,15 +40,17 @@ def get_dag_bag(session_mocker) -> DagBag:
def env_load_variables_from_json(path: pathlib.Path) -> None:
"""Load Airflow Variables as environment variables from a JSON file generated
by running `airflow variables export <filename>.json`. Variable values
must be `str` or `int`.
"""
Load Airflow Variables as environment variables from a JSON file.
JSON file should be generated by running `airflow variables export <filename>.json`.
Variable values must be `str` or `int`.
See this link for more information on Airflow Variables as environment variables
https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html
"""
with open(path, "r") as file:
variables: Dict[str, Union[str, int]] = json.load(file)
with open(path) as file:
variables: dict[str, str | int] = json.load(file)
for name, value in variables.items():
formatted_variable_name = f"AIRFLOW_VAR_{name.upper()}"
@ -57,17 +58,19 @@ def env_load_variables_from_json(path: pathlib.Path) -> None:
def env_load_connections_from_json(path: pathlib.Path) -> None:
"""Load Airflow Connections as environment variables from a JSON file generated
by running `airflow connections export <filename>.json`. Uses a Connection object
to ensure correct Connection parsing.
"""
Load Airflow Connections as environment variables from a JSON file.
JSON file should be generated by running `airflow connections export <filename>.json`.
Uses a Connection object to ensure correct Connection parsing.
See this link for more information on Airflow Connections as environment variables
https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html
"""
from airflow.models import Connection
with open(path, "r") as file:
connections: Dict[str, Dict] = json.load(file)
with open(path) as file:
connections: dict[str, dict] = json.load(file)
for name, params in connections.items():
conn_instance = Connection.from_json(value=json.dumps(params), conn_id=name)

Просмотреть файл

@ -1,5 +1,6 @@
def test_dag_validity(get_dag_bag):
"""Test all DAGs can be parsed.
"""
Test all DAGs can be parsed.
This test should be equivalent to the integration test using airflow CLI.
At the moment, there is a discrepancy between this unit test and the integration
@ -20,9 +21,14 @@ def test_dag_tags(get_dag_bag):
"""Check tags in all DAGs are valid."""
valid_tags = {
"impact/tier_1", "impact/tier_2", "impact/tier_3", "repo/bigquery-etl",
"repo/telemetry-airflow", "repo/private-bigquery-etl",
"triage/no_triage", "triage/record_only"
"impact/tier_1",
"impact/tier_2",
"impact/tier_3",
"repo/bigquery-etl",
"repo/telemetry-airflow",
"repo/private-bigquery-etl",
"triage/no_triage",
"triage/record_only",
}
dagbag = get_dag_bag
@ -32,7 +38,7 @@ def test_dag_tags(get_dag_bag):
def test_dag_tags_required(get_dag_bag):
"""Check at least one tag per DAG is of the required type"""
"""Check at least one tag per DAG is of the required type."""
required_tag_type = "impact"
dagbag = get_dag_bag
@ -42,6 +48,6 @@ def test_dag_tags_required(get_dag_bag):
if dag.is_subdag:
continue
assert [tag for tag in dag.tags if
required_tag_type in tag], f"DAG: {dag_name}: Missing required tag " \
f"type `{required_tag_type}`"
assert [
tag for tag in dag.tags if required_tag_type in tag
], f"DAG: {dag_name}: Missing required tag type `{required_tag_type}`"

Просмотреть файл

@ -1,23 +1,21 @@
from typing import List
import pytest
from dags.utils.backfill import BackfillParams
@pytest.fixture(scope="function")
@pytest.fixture()
def base_params() -> dict:
return {
'clear': False,
'dry_run': True,
'dag_name': 'dag_name',
'end_date': '2022-11-10',
'start_date': '2022-10-31',
'task_regex': None,
"clear": False,
"dry_run": True,
"dag_name": "dag_name",
"end_date": "2022-11-10",
"start_date": "2022-10-31",
"task_regex": None,
}
@pytest.fixture(scope="function")
@pytest.fixture()
def base_backfill_params(base_params: dict) -> BackfillParams:
return BackfillParams(**base_params)
@ -27,7 +25,10 @@ def test_date_validation(base_backfill_params) -> None:
base_backfill_params.validate_date_range()
# invalid date range
base_backfill_params.start_date, base_backfill_params.end_date = base_backfill_params.end_date, base_backfill_params.start_date
base_backfill_params.start_date, base_backfill_params.end_date = (
base_backfill_params.end_date,
base_backfill_params.start_date,
)
with pytest.raises(ValueError):
base_backfill_params.validate_date_range()
@ -47,7 +48,8 @@ def test_validate_regex_pattern(base_backfill_params) -> None:
def test_generate_backfill_command(base_backfill_params) -> None:
"""Assert backfill commands are equivalent between the backfill plugin and backfill DAG
"""
Assert backfill commands are equivalent between the backfill plugin and backfill DAG.
Expected results were generated from the plugin implementation
@ -55,33 +57,109 @@ def test_generate_backfill_command(base_backfill_params) -> None:
test_start_date = "2022-01-01"
test_end_date = "2022-01-10"
test_params: List[BackfillParams] = [
BackfillParams(clear=True, dry_run=True, task_regex=None, dag_name="test_value", start_date=test_start_date,
end_date=test_end_date),
BackfillParams(clear=False, dry_run=True, task_regex=None, dag_name="test_value", start_date=test_start_date,
end_date=test_end_date),
BackfillParams(clear=True, dry_run=False, task_regex=None, dag_name="test_value", start_date=test_start_date,
end_date=test_end_date),
BackfillParams(clear=False, dry_run=False, task_regex=None, dag_name="test_value", start_date=test_start_date,
end_date=test_end_date),
BackfillParams(clear=False, dry_run=False, task_regex="/ab+c/", dag_name="test_value",
start_date=test_start_date,
end_date=test_end_date),
test_params: list[BackfillParams] = [
BackfillParams(
clear=True,
dry_run=True,
task_regex=None,
dag_name="test_value",
start_date=test_start_date,
end_date=test_end_date,
),
BackfillParams(
clear=False,
dry_run=True,
task_regex=None,
dag_name="test_value",
start_date=test_start_date,
end_date=test_end_date,
),
BackfillParams(
clear=True,
dry_run=False,
task_regex=None,
dag_name="test_value",
start_date=test_start_date,
end_date=test_end_date,
),
BackfillParams(
clear=False,
dry_run=False,
task_regex=None,
dag_name="test_value",
start_date=test_start_date,
end_date=test_end_date,
),
BackfillParams(
clear=False,
dry_run=False,
task_regex="/ab+c/",
dag_name="test_value",
start_date=test_start_date,
end_date=test_end_date,
),
]
expected_results = [
[
'timeout', '60', 'airflow', 'tasks', 'clear', '-s', '2022-01-01', '-e', '2022-01-10', 'test_value'],
"timeout",
"60",
"airflow",
"tasks",
"clear",
"-s",
"2022-01-01",
"-e",
"2022-01-10",
"test_value",
],
[
'airflow', 'dags', 'backfill', '--donot-pickle', '--dry-run', '-s', '2022-01-01', '-e', '2022-01-10',
'test_value'],
"airflow",
"dags",
"backfill",
"--donot-pickle",
"--dry-run",
"-s",
"2022-01-01",
"-e",
"2022-01-10",
"test_value",
],
[
'airflow', 'tasks', 'clear', '-y', '-s', '2022-01-01', '-e', '2022-01-10', 'test_value'],
"airflow",
"tasks",
"clear",
"-y",
"-s",
"2022-01-01",
"-e",
"2022-01-10",
"test_value",
],
[
'airflow', 'dags', 'backfill', '--donot-pickle', '-s', '2022-01-01', '-e', '2022-01-10', 'test_value'],
"airflow",
"dags",
"backfill",
"--donot-pickle",
"-s",
"2022-01-01",
"-e",
"2022-01-10",
"test_value",
],
[
'airflow', 'dags', 'backfill', '--donot-pickle', '-t', '/ab+c/', '-s', '2022-01-01', '-e', '2022-01-10',
'test_value']
"airflow",
"dags",
"backfill",
"--donot-pickle",
"-t",
"/ab+c/",
"-s",
"2022-01-01",
"-e",
"2022-01-10",
"test_value",
],
]
for params, result in zip(test_params, expected_results):