diff --git a/.circleci/config.yml b/.circleci/config.yml index 36d40239..4e538b80 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -13,7 +13,6 @@ orbs: docker: circleci/docker@2.2.0 python: circleci/python@2.1.1 - jobs: unit-tests: executor: &python-executor @@ -30,6 +29,44 @@ jobs: - store_test_results: path: test-results + black: + executor: *python-executor + steps: + - checkout + - python/install-packages: + pip-dependency-file: requirements.txt + pkg-manager: pip + - run: &git-diff-py-files + name: List added, copied, modified, and renamed *py files + command: git diff --name-only --diff-filter=ACMR origin/main | grep -E "\.py$" > diff.txt || true + - run: + name: List black errors + command: black . --check &> lint_checks.txt || true + - run: + name: 🧹 Diff-based black + command: &display-lint-errors | + grep -Ff diff.txt lint_checks.txt > lint_errors.txt || true + if [ -s lint_errors.txt ]; then + cat lint_errors.txt + printf 'Run the following command to fix your branch:\n make fixes\n' + exit 1 + fi + + ruff: + executor: *python-executor + steps: + - checkout + - python/install-packages: + pip-dependency-file: requirements.txt + pkg-manager: pip + - run: *git-diff-py-files + - run: + name: List ruff errors + command: ruff check . 
&> lint_checks.txt || true + - run: + name: 🧹 Diff-based ruff + command: *display-lint-errors + validate-requirements: executor: *python-executor steps: @@ -58,6 +95,13 @@ jobs: workflows: ci: jobs: + - validate-requirements: + name: 🧪 Validate requirements + filters: &ci-filter + branches: + ignore: main + tags: + ignore: /.*/ - docker/publish: name: 🛠️ Docker build test before_build: &version @@ -75,20 +119,26 @@ workflows: deploy: false image: $CIRCLE_PROJECT_REPONAME tag: ${CIRCLE_SHA1:0:9} - filters: &ci-filter - branches: - ignore: main - tags: - ignore: /.*/ + filters: *ci-filter + requires: + - 🧪 Validate requirements + - black: + name: ⚫ black + filters: *ci-filter + requires: + - 🧪 Validate requirements + - ruff: + name: 🚓 ruff + filters: *ci-filter + requires: + - 🧪 Validate requirements - unit-tests: name: 🧪 Unit tests filters: *ci-filter - - - validate-requirements: - name: 🧪 Validate requirements - filters: *ci-filter - + requires: + - ⚫ black + - 🚓 ruff publish: jobs: diff --git a/Makefile b/Makefile index b1caeb4d..dd010841 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,10 @@ build: pip-compile: pip-compile +fixes: + ruff check $$(git diff --name-only --diff-filter=ACMR origin/main | grep -E "\.py$$") --fix + black $$(git diff --name-only --diff-filter=ACMR origin/main | grep -E "\.py$$") + clean: stop docker-compose rm -f rm -rf logs/* diff --git a/dags/backfill.py b/dags/backfill.py index 91cfe877..82333e7c 100644 --- a/dags/backfill.py +++ b/dags/backfill.py @@ -6,9 +6,8 @@ from airflow.models import DagModel from airflow.models.param import Param from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator, BranchPythonOperator +from airflow.operators.python import BranchPythonOperator, PythonOperator from airflow.utils.trigger_rule import TriggerRule - from utils.backfill import BackfillParams from utils.tags import Tag @@ -27,7 +26,11 @@ def 
dry_run_branch_callable(params: dict) -> str: def clear_branch_callable(params: dict) -> str: backfill_params = BackfillParams(**params) - return TaskId.clear_tasks.value if backfill_params.clear else TaskId.do_not_clear_tasks.value + return ( + TaskId.clear_tasks.value + if backfill_params.clear + else TaskId.do_not_clear_tasks.value + ) def param_validation(params: dict) -> bool: @@ -65,7 +68,7 @@ doc_md = """ @dag( - dag_id='backfill', + dag_id="backfill", schedule_interval=None, doc_md=doc_md, catchup=False, @@ -73,17 +76,20 @@ doc_md = """ dagrun_timeout=datetime.timedelta(days=1), tags=[Tag.ImpactTier.tier_3, Tag.Triage.record_only], render_template_as_native_obj=True, - params={"dag_name": Param("dag_name", type="string"), - "start_date": Param((datetime.date.today() - datetime.timedelta(days=10)).isoformat(), - type="string", - format="date-time"), - "end_date": Param(datetime.date.today().isoformat(), - type="string", - format="date-time"), - "clear": Param(False, type="boolean"), - "dry_run": Param(True, type="boolean"), - "task_regex": Param(None, type=["string", "null"]), - } + params={ + "dag_name": Param("dag_name", type="string"), + "start_date": Param( + (datetime.date.today() - datetime.timedelta(days=10)).isoformat(), + type="string", + format="date-time", + ), + "end_date": Param( + datetime.date.today().isoformat(), type="string", format="date-time" + ), + "clear": Param(False, type="boolean"), + "dry_run": Param(True, type="boolean"), + "task_regex": Param(None, type=["string", "null"]), + }, ) def backfill_dag(): param_validation_task = PythonOperator( @@ -120,12 +126,19 @@ def backfill_dag(): ) backfill_task = BashOperator( - task_id='execute_backfill', + task_id="execute_backfill", bash_command="{{ ti.xcom_pull(task_ids='generate_backfill_command') }}", ) - param_validation_task >> dry_run_branch_task >> [dry_run_task, real_deal_task] >> clear_branch_task >> [ - clear_tasks_task, do_not_clear_tasks_task] >> generate_backfill_command_task >> 
backfill_task + ( + param_validation_task + >> dry_run_branch_task + >> [dry_run_task, real_deal_task] + >> clear_branch_task + >> [clear_tasks_task, do_not_clear_tasks_task] + >> generate_backfill_command_task + >> backfill_task + ) dag = backfill_dag() diff --git a/dags/utils/backfill.py b/dags/utils/backfill.py index b250845f..36f435c3 100644 --- a/dags/utils/backfill.py +++ b/dags/utils/backfill.py @@ -1,7 +1,8 @@ +from __future__ import annotations + import dataclasses import datetime import re -from typing import Optional, List @dataclasses.dataclass @@ -11,51 +12,58 @@ class BackfillParams: end_date: str clear: bool dry_run: bool - task_regex: Optional[str] + task_regex: str | None def validate_date_range(self) -> None: start_date = datetime.datetime.fromisoformat(self.start_date) end_date = datetime.datetime.fromisoformat(self.end_date) if start_date > end_date: - raise ValueError(f"`start_date`={self.start_date} is greater than `end_date`={self.end_date}") + raise ValueError( + f"`start_date`={self.start_date} is greater than `end_date`={self.end_date}" + ) def validate_regex_pattern(self) -> None: if self.task_regex: try: re.compile(self.task_regex) except re.error: - raise ValueError(f"Invalid regex pattern for `task_regex`={self.task_regex}") + raise ValueError( + f"Invalid regex pattern for `task_regex`={self.task_regex}" + ) from None - def generate_backfill_command(self) -> List[str]: - """Backfill command based off the Airflow plugin implemented by hwoo + def generate_backfill_command(self) -> list[str]: + """ + Backfill command based off the Airflow plugin implemented by hwoo. Original implementation in plugins/backfill/main.py """ # Construct the airflow command - cmd = ['airflow'] + cmd = ["airflow"] if self.clear: - cmd.extend(['tasks', 'clear']) + cmd.extend(["tasks", "clear"]) if self.dry_run: # For dry runs we simply time out to avoid zombie procs waiting on user input. 
# The output is what we're interested in - timeout_list = ['timeout', '60'] + timeout_list = ["timeout", "60"] cmd = timeout_list + cmd else: - cmd.append('-y') + cmd.append("-y") if self.task_regex: - cmd.extend(['-t', str(self.task_regex)]) + cmd.extend(["-t", str(self.task_regex)]) else: - cmd.extend(['dags', 'backfill', '--donot-pickle']) + cmd.extend(["dags", "backfill", "--donot-pickle"]) if self.dry_run: - cmd.append('--dry-run') + cmd.append("--dry-run") if self.task_regex: - cmd.extend(['-t', str(self.task_regex)]) + cmd.extend(["-t", str(self.task_regex)]) - cmd.extend(['-s', str(self.start_date), '-e', str(self.end_date), str(self.dag_name)]) + cmd.extend( + ["-s", str(self.start_date), "-e", str(self.end_date), str(self.dag_name)] + ) return cmd diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..f61c90e9 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,40 @@ +[tool.ruff.isort] +known-third-party = ["airflow"] + +[tool.ruff] +select = [ + "E", # pycodestyle + "W", # pycodestyle + "F", # Pyflakes + "B", # flake8-bugbear + "C4", # flake8-comprehensions + "D", # flake8-docstrings + "I", # isort + "SIM", # flake8-simplify + "TCH", # flake8-type-checking + "TID", # flake8-tidy-imports + "Q", # flake8-quotes + "UP", # pyupgrade + "PT", # flake8-pytest-style + "RUF", # Ruff-specific rules +] +ignore = [ + "E501", # line too long, handled by black + # Docstring linting + "D100", # Missing docstring in public module + "D101", # Missing docstring in public class + "D102", # Missing docstring in public method + "D103", # Missing docstring in public function + "D104", # Missing docstring in public package + "D105", # Missing docstring in magic method + "D107", # Missing docstring in __init__ + "D202", # No blank lines allowed after function docstring -> clashes with Black + "D203", # 1 blank line required before class docstring + "D212", # Multi-line docstring summary should start at the first line + "D415", # First line should end with a 
period, question mark, or exclamation point + "D416", #Section name should end with a colon ("{name}") + # flake8-pytest-style: + "PT011", # pytest.raises({exception}) is too broad, set the match parameter or use a more specific exception + # To enable when we migrate to Python 3.10 + "B905", # `zip()` without an explicit `strict=` parameter +] diff --git a/requirements.in b/requirements.in index bef38e0a..69aff59c 100644 --- a/requirements.in +++ b/requirements.in @@ -10,10 +10,14 @@ apache-airflow-providers-http apache-airflow-providers-slack airflow-provider-fivetran==1.1.2 +# Code quality +pytest==6.2.5 +pytest-mock==3.10.0 +black==22.6.0 +ruff==0.0.252 + # Misc mozlogging -pytest -pytest-mock # Required for backfill UI flask-admin diff --git a/requirements.txt b/requirements.txt index aac556ca..a855f50e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -121,6 +121,10 @@ billiard==3.6.4.0 # via # -c ./constraints.txt # celery +black==22.6.0 + # via + # -c ./constraints.txt + # -r requirements.in blinker==1.4 # via # -c ./constraints.txt @@ -176,6 +180,7 @@ charset-normalizer==2.0.12 click==8.1.3 # via # -c ./constraints.txt + # black # celery # click-didyoumean # click-plugins @@ -804,6 +809,10 @@ mypy-boto3-redshift-data==1.24.11.post3 # via # -c ./constraints.txt # apache-airflow-providers-amazon +mypy-extensions==0.4.3 + # via + # -c ./constraints.txt + # black mysql-connector-python==8.0.29 # via # -c ./constraints.txt @@ -850,10 +859,15 @@ pathspec==0.9.0 # via # -c ./constraints.txt # apache-airflow + # black pendulum==2.1.2 # via # -c ./constraints.txt # apache-airflow +platformdirs==2.5.2 + # via + # -c ./constraints.txt + # black pluggy==1.0.0 # via # -c ./constraints.txt @@ -1096,6 +1110,8 @@ rsa==4.8 # via # -c ./constraints.txt # google-auth +ruff==0.0.252 + # via -r requirements.in s3transfer==0.6.0 # via # -c ./constraints.txt @@ -1195,6 +1211,10 @@ toml==0.10.2 # via # -c ./constraints.txt # pytest +tomli==2.0.1 + # via + # -c 
./constraints.txt + # black tornado==6.2 # via # -c ./constraints.txt @@ -1203,6 +1223,7 @@ typing-extensions==4.3.0 # via # -c ./constraints.txt # apache-airflow + # black # looker-sdk # mypy-boto3-rds # mypy-boto3-redshift-data diff --git a/tests/conftest.py b/tests/conftest.py index 96760fb6..37902afa 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,7 +3,6 @@ import os import pathlib import sys import warnings -from typing import Dict, Union import pytest from airflow.models import DagBag @@ -41,15 +40,17 @@ def get_dag_bag(session_mocker) -> DagBag: def env_load_variables_from_json(path: pathlib.Path) -> None: - """Load Airflow Variables as environment variables from a JSON file generated - by running `airflow variables export .json`. Variable values - must be `str` or `int`. + """ + Load Airflow Variables as environment variables from a JSON file. + + JSON file should be generated by running `airflow variables export <filename>.json`. + Variable values must be `str` or `int`. See this link for more information on Airflow Variables as environment variables https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html """ - with open(path, "r") as file: - variables: Dict[str, Union[str, int]] = json.load(file) + with open(path) as file: + variables: dict[str, str | int] = json.load(file) for name, value in variables.items(): formatted_variable_name = f"AIRFLOW_VAR_{name.upper()}" @@ -57,17 +58,19 @@ def env_load_variables_from_json(path: pathlib.Path) -> None: def env_load_connections_from_json(path: pathlib.Path) -> None: - """Load Airflow Connections as environment variables from a JSON file generated - by running `airflow connections export .json`. Uses a Connection object - to ensure correct Connection parsing. + """ + Load Airflow Connections as environment variables from a JSON file. + + JSON file should be generated by running `airflow connections export <filename>.json`. + Uses a Connection object to ensure correct Connection parsing. 
See this link for more information on Airflow Connections as environment variables https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html """ from airflow.models import Connection - with open(path, "r") as file: - connections: Dict[str, Dict] = json.load(file) + with open(path) as file: + connections: dict[str, dict] = json.load(file) for name, params in connections.items(): conn_instance = Connection.from_json(value=json.dumps(params), conn_id=name) diff --git a/tests/dags/test_dag_validity.py b/tests/dags/test_dag_validity.py index b167868e..a273dd75 100644 --- a/tests/dags/test_dag_validity.py +++ b/tests/dags/test_dag_validity.py @@ -1,5 +1,6 @@ def test_dag_validity(get_dag_bag): - """Test all DAGs can be parsed. + """ + Test all DAGs can be parsed. This test should be equivalent to the integration test using airflow CLI. At the moment, there is a discrepancy between this unit test and the integration @@ -20,9 +21,14 @@ def test_dag_tags(get_dag_bag): """Check tags in all DAGs are valid.""" valid_tags = { - "impact/tier_1", "impact/tier_2", "impact/tier_3", "repo/bigquery-etl", - "repo/telemetry-airflow", "repo/private-bigquery-etl", - "triage/no_triage", "triage/record_only" + "impact/tier_1", + "impact/tier_2", + "impact/tier_3", + "repo/bigquery-etl", + "repo/telemetry-airflow", + "repo/private-bigquery-etl", + "triage/no_triage", + "triage/record_only", } dagbag = get_dag_bag @@ -32,7 +38,7 @@ def test_dag_tags(get_dag_bag): def test_dag_tags_required(get_dag_bag): - """Check at least one tag per DAG is of the required type""" + """Check at least one tag per DAG is of the required type.""" required_tag_type = "impact" dagbag = get_dag_bag @@ -42,6 +48,6 @@ def test_dag_tags_required(get_dag_bag): if dag.is_subdag: continue - assert [tag for tag in dag.tags if - required_tag_type in tag], f"DAG: {dag_name}: Missing required tag " \ - f"type `{required_tag_type}`" + assert [ + tag for tag in dag.tags if required_tag_type in tag + ], 
f"DAG: {dag_name}: Missing required tag type `{required_tag_type}`" diff --git a/tests/utils/test_backfill.py b/tests/utils/test_backfill.py index 2c9e8c10..95bdedd1 100644 --- a/tests/utils/test_backfill.py +++ b/tests/utils/test_backfill.py @@ -1,23 +1,21 @@ -from typing import List - import pytest from dags.utils.backfill import BackfillParams -@pytest.fixture(scope="function") +@pytest.fixture() def base_params() -> dict: return { - 'clear': False, - 'dry_run': True, - 'dag_name': 'dag_name', - 'end_date': '2022-11-10', - 'start_date': '2022-10-31', - 'task_regex': None, + "clear": False, + "dry_run": True, + "dag_name": "dag_name", + "end_date": "2022-11-10", + "start_date": "2022-10-31", + "task_regex": None, } -@pytest.fixture(scope="function") +@pytest.fixture() def base_backfill_params(base_params: dict) -> BackfillParams: return BackfillParams(**base_params) @@ -27,7 +25,10 @@ def test_date_validation(base_backfill_params) -> None: base_backfill_params.validate_date_range() # invalid date range - base_backfill_params.start_date, base_backfill_params.end_date = base_backfill_params.end_date, base_backfill_params.start_date + base_backfill_params.start_date, base_backfill_params.end_date = ( + base_backfill_params.end_date, + base_backfill_params.start_date, + ) with pytest.raises(ValueError): base_backfill_params.validate_date_range() @@ -47,7 +48,8 @@ def test_validate_regex_pattern(base_backfill_params) -> None: def test_generate_backfill_command(base_backfill_params) -> None: - """Assert backfill commands are equivalent between the backfill plugin and backfill DAG + """ + Assert backfill commands are equivalent between the backfill plugin and backfill DAG. 
Expected results were generated from the plugin implementation @@ -55,33 +57,109 @@ def test_generate_backfill_command(base_backfill_params) -> None: test_start_date = "2022-01-01" test_end_date = "2022-01-10" - test_params: List[BackfillParams] = [ - BackfillParams(clear=True, dry_run=True, task_regex=None, dag_name="test_value", start_date=test_start_date, - end_date=test_end_date), - BackfillParams(clear=False, dry_run=True, task_regex=None, dag_name="test_value", start_date=test_start_date, - end_date=test_end_date), - BackfillParams(clear=True, dry_run=False, task_regex=None, dag_name="test_value", start_date=test_start_date, - end_date=test_end_date), - BackfillParams(clear=False, dry_run=False, task_regex=None, dag_name="test_value", start_date=test_start_date, - end_date=test_end_date), - BackfillParams(clear=False, dry_run=False, task_regex="/ab+c/", dag_name="test_value", - start_date=test_start_date, - end_date=test_end_date), + test_params: list[BackfillParams] = [ + BackfillParams( + clear=True, + dry_run=True, + task_regex=None, + dag_name="test_value", + start_date=test_start_date, + end_date=test_end_date, + ), + BackfillParams( + clear=False, + dry_run=True, + task_regex=None, + dag_name="test_value", + start_date=test_start_date, + end_date=test_end_date, + ), + BackfillParams( + clear=True, + dry_run=False, + task_regex=None, + dag_name="test_value", + start_date=test_start_date, + end_date=test_end_date, + ), + BackfillParams( + clear=False, + dry_run=False, + task_regex=None, + dag_name="test_value", + start_date=test_start_date, + end_date=test_end_date, + ), + BackfillParams( + clear=False, + dry_run=False, + task_regex="/ab+c/", + dag_name="test_value", + start_date=test_start_date, + end_date=test_end_date, + ), ] expected_results = [ [ - 'timeout', '60', 'airflow', 'tasks', 'clear', '-s', '2022-01-01', '-e', '2022-01-10', 'test_value'], + "timeout", + "60", + "airflow", + "tasks", + "clear", + "-s", + "2022-01-01", + "-e", + "2022-01-10", 
+ "test_value", + ], [ - 'airflow', 'dags', 'backfill', '--donot-pickle', '--dry-run', '-s', '2022-01-01', '-e', '2022-01-10', - 'test_value'], + "airflow", + "dags", + "backfill", + "--donot-pickle", + "--dry-run", + "-s", + "2022-01-01", + "-e", + "2022-01-10", + "test_value", + ], [ - 'airflow', 'tasks', 'clear', '-y', '-s', '2022-01-01', '-e', '2022-01-10', 'test_value'], + "airflow", + "tasks", + "clear", + "-y", + "-s", + "2022-01-01", + "-e", + "2022-01-10", + "test_value", + ], [ - 'airflow', 'dags', 'backfill', '--donot-pickle', '-s', '2022-01-01', '-e', '2022-01-10', 'test_value'], + "airflow", + "dags", + "backfill", + "--donot-pickle", + "-s", + "2022-01-01", + "-e", + "2022-01-10", + "test_value", + ], [ - 'airflow', 'dags', 'backfill', '--donot-pickle', '-t', '/ab+c/', '-s', '2022-01-01', '-e', '2022-01-10', - 'test_value'] + "airflow", + "dags", + "backfill", + "--donot-pickle", + "-t", + "/ab+c/", + "-s", + "2022-01-01", + "-e", + "2022-01-10", + "test_value", + ], ] for params, result in zip(test_params, expected_results):