Step to validate airflow dags as part of CI (#1446)

* script to validate dag tags and step to circle ci

* trying out dag tagg validation through parsing

* added missing tag so that tag check does not fail

* Using SQL approach for validation, added extra logging and clean up

* added check to make sure all DAGs have tags

* fixed 3 DAGs missing tags

* implemented suggestions by @haroldwoo in #1446

Co-authored-by: = <=>
This commit is contained in:
kik-kik 2022-01-14 13:04:32 +01:00 коммит произвёл GitHub
Родитель 6ee60b4c44
Коммит 1dbfb6ad5c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 141 добавлений и 7 удалений

Просмотреть файл

@ -58,8 +58,10 @@ jobs:
docker-compose build
# now take ownership of the folder
sudo chown -R 10001:10001 .
- run:
name: Test if dag scripts can be parsed
name: Test if dag scripts can be parsed by Airflow and that tags have been correctly set
# Valid DAG tags are defined in: `bin/test-dag-tags.py`
command: bash bin/test-parse
deploy:

3
.gitignore поставляемый
Просмотреть файл

@ -9,7 +9,10 @@ unittests.cfg
airflow-webserver.pid
airflow-worker.pid
.config
.viminfo
.credentials
.bash_history
.mysql_history
/dags/bigquery-etl-dags
/dags/bigquery-etl-dags/*

Просмотреть файл

@ -17,7 +17,7 @@ RUN apt-get update && \
apt-get install -y --no-install-recommends \
apt-transport-https build-essential curl git libpq-dev python-dev \
default-libmysqlclient-dev gettext sqlite3 libffi-dev libsasl2-dev \
lsb-release gnupg vim screen procps && \
lsb-release gnupg vim screen procps default-mysql-client && \
CLOUD_SDK_REPO="cloud-sdk-$(lsb_release -c -s)" && \
echo "deb http://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list && \
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - && \

111
bin/test-dag-tags.py Normal file
Просмотреть файл

@ -0,0 +1,111 @@
#!/bin/python3
import subprocess
from collections import defaultdict
from typing import Dict, List, Optional
# The complete set of tags a DAG is allowed to carry; any other tag fails validation.
VALID_TAGS = (
    "impact/tier_1",
    "impact/tier_2",
    "impact/tier_3",
    "repo/bigquery-etl",
    "repo/telemetry-airflow",
)
# Tag categories (the part before "/") that every DAG must include at least once.
REQUIRED_TAG_TYPES = ("impact",)
class DagValidationError(Exception):
    """Raised when a loaded DAG is missing tags or carries an unknown tag."""
def _execute_shell_cmd(cmd: str, cmd_params: Dict[str, dict] = dict()) -> str:
mask_string = "###"
cmd_params_log = {
key: val["value"] if not val["value"] else mask_string
for key, val in cmd_params.items()
}
print("Executing command: %s" % (cmd.format(**cmd_params_log)))
cmd_params_formatted = {
key: val["value"]
for key, val in cmd_params.items()
}
cmd_output = subprocess.run(
cmd.format(**cmd_params_formatted),
shell=True,
capture_output=True,
text=True,
)
try:
cmd_output.check_returncode()
except subprocess.CalledProcessError as _err:
print(cmd_output.stdout)
print(cmd_output.stderr)
raise _err
print("Command executed successfully, processing output...")
return cmd_output.stdout.strip().replace("\r", "").split("\n")
def get_loaded_airflow_dag_tags_from_db(pswd: str) -> Dict[str, List[str]]:
    """Return a mapping of dag_id -> list of tag names from the Airflow metadata DB.

    Queries the `dag_tag` table in the `db` container through the `web`
    container's mysql client. Assumes both containers are running.

    Args:
        pswd: MySQL root password (masked in logged command output).

    Returns:
        dag_id mapped to the list of tags attached to that DAG.
        Note: the original annotation `Dict[str, str]` was wrong — each
        DAG can carry multiple tags.
    """
    shell_cmd = "docker-compose exec web mysql -Ns -h db -u root -p{pswd} airflow -e 'SELECT dag_id, name FROM dag_tag;'"
    cmd_params = {
        "pswd": {
            "value": pswd,
            "is_sensitive": True,
        }
    }
    cmd_output = _execute_shell_cmd(shell_cmd, cmd_params)
    dags = defaultdict(list)
    # mysql -Ns emits tab-separated rows: "<dag_id>\t<tag_name>"
    for dag in cmd_output:
        dag_name, tag_name = dag.split("\t")
        # append() uses the defaultdict as intended instead of rebuilding
        # the list with `dags[k] = dags[k] + [tag]` on every row.
        dags[dag_name].append(tag_name)
    print("Number of DAGs with tags found: %s" % (len(dags)))
    return dags
def get_loaded_dags_from_db(pswd: str) -> List[str]:
    """Return the dag_ids of all non-subdag DAGs loaded in the Airflow metadata DB.

    Args:
        pswd: MySQL root password (masked in logged command output).

    Returns:
        List of dag_id strings, one per output line. Note: the original
        annotation `-> int` was wrong — this returns the query rows, and
        callers take `len()` of the result.
    """
    cmd_params = {
        "pswd": {
            "value": pswd,
            "is_sensitive": True,
        }
    }
    return _execute_shell_cmd(
        "docker-compose exec web mysql -Ns -h db -u root -p{pswd} airflow -e 'SELECT dag_id from dag WHERE is_subdag = 0;'",
        cmd_params,
    )
def main() -> None:
    """Validate tags for every DAG loaded in the local Airflow metadata DB.

    Fails (raises DagValidationError) if any DAG has no tags at all, is
    missing a required tag category, or carries a tag outside VALID_TAGS.
    """
    # Assumes the web and db containers are already running
    db_pass = "secret"
    tag_errors = 0

    dags = get_loaded_dags_from_db(db_pass)
    num_of_dags = len(dags)
    dags_with_tags = get_loaded_airflow_dag_tags_from_db(db_pass)
    num_of_dags_with_tags = len(dags_with_tags)

    # Every DAG must appear in the tag table; a count mismatch means at
    # least one DAG has no tags at all.
    if num_of_dags != num_of_dags_with_tags:
        print("Num of DAGs in `dag` table: %s, num of dags with tags: %s" % (num_of_dags, num_of_dags_with_tags))
        dags_with_missing_tags = set(dags).difference(set(dags_with_tags))
        raise DagValidationError("The following DAGs are missing tags entirely: %s" % (dags_with_missing_tags))

    for file_name, tags in dags_with_tags.items():
        # Tag category is the part before "/", e.g. "impact" in "impact/tier_1".
        tag_categories = [category.split("/")[0] for category in tags]
        missing_required = [req for req in REQUIRED_TAG_TYPES if req not in tag_categories]
        if missing_required:
            tag_errors += 1
            print('%s is missing a required tag. Required tags include: %s. Please refer to: https://mozilla.github.io/bigquery-etl/reference/airflow_tags/ for more information.' % (file_name, REQUIRED_TAG_TYPES))
        invalid_tags = [tag for tag in tags if tag not in VALID_TAGS]
        if invalid_tags:
            tag_errors += 1
            print('DAG file: %s contains an invalid tag. Tags specified: %s, valid tags: %s.' % (file_name, tags, VALID_TAGS))

    if tag_errors:
        raise DagValidationError("DAG tags validation failed.")
    print("DAG tags validation for %s DAGs successful." % (len(dags)))


if __name__ == "__main__":
    main()

Просмотреть файл

@ -53,7 +53,7 @@ function main() {
echo "Existing container is up, please bring it down and re-run."
exit 1
fi
# Register the cleanup function, note that the testing function may override
# the trap on the exit signal.
function cleanup {
@ -88,6 +88,15 @@ function main() {
echo "Unit tests passed!"
fi
# Validate Airflow tags
# see Github Issue: https://github.com/mozilla/telemetry-airflow/issues/1443
python3 bin/test-dag-tags.py
if [[ $? -ne 0 ]]; then
echo "Invalid tag configuration detected."
exit 1
fi
# END Validate Airflow tags
echo "All checks passed, the dags can be parsed locally."
}

Просмотреть файл

@ -36,6 +36,7 @@ with DAG(
default_args=default_args,
doc_md=DOCS,
schedule_interval="0 5 * * *",
tags=tags,
) as dag:
casa_sync_start = FivetranOperator(

Просмотреть файл

@ -43,6 +43,7 @@ with DAG(
default_args=default_args,
# dag runs daily but tasks only run on certain days
schedule_interval="0 5 * * *",
tags=tags,
doc_md=__doc__,
) as dag:
# top_signatures_correlations uploads results to public analysis bucket

Просмотреть файл

@ -4,6 +4,7 @@ from airflow import DAG
from operators.backport.fivetran.operator import FivetranOperator
from operators.backport.fivetran.sensor import FivetranSensor
from airflow.operators.dummy import DummyOperator
from utils.tags import Tag
docs = """
### fivetran_intacct
@ -27,6 +28,9 @@ default_args = {
"retries": 2,
"retry_delay": timedelta(minutes=30),
}
tags = [Tag.ImpactTier.tier_1, "repo/telemetry-airflow", ]
list_of_connectors ={
"moz": "decently_wouldst",
"germany": "backslid_mumps",
@ -47,10 +51,13 @@ list_of_connectors ={
"uk": "toy_tribute"
}
with DAG('fivetran_intacct_historical',
default_args=default_args,
doc_md=docs,
schedule_interval="0 5 * * *") as dag:
with DAG(
'fivetran_intacct_historical',
default_args=default_args,
doc_md=docs,
schedule_interval="0 5 * * *",
tags=tags,
) as dag:
fivetran_sensors_complete = DummyOperator(
task_id='intacct-fivetran-sensors-complete',