Mikaël Ducharme 2023-06-28 12:41:31 -04:00 committed by GitHub
Parent e8ec66cd27
Commit 3e3acc0f69
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 849 additions and 708 deletions

View file

@@ -1,4 +1,4 @@
FROM apache/airflow:slim-2.3.3-python3.10
FROM apache/airflow:slim-2.5.3-python3.10
ARG PROJECT_DIR="/opt/airflow"

File diff not shown because it is too large.

View file

@@ -55,7 +55,7 @@ def generate_bash_command(params: dict) -> str:
doc_md = """
# Backfill DAG
#### Use with caution
#### Use with caution
#### Some tips/notes:
@@ -79,12 +79,12 @@ doc_md = """
params={
"dag_name": Param("dag_name", type="string"),
"start_date": Param(
(datetime.date.today() - datetime.timedelta(days=10)).isoformat(),
(datetime.datetime.today() - datetime.timedelta(days=10)).isoformat(),
type="string",
format="date-time",
),
"end_date": Param(
datetime.date.today().isoformat(), type="string", format="date-time"
datetime.datetime.today().isoformat(), type="string", format="date-time"
),
"clear": Param(False, type="boolean"),
"dry_run": Param(True, type="boolean"),

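The switch above from `datetime.date.today()` to `datetime.datetime.today()` matters because these params declare `format="date-time"`: Airflow validates `Param` values with JSON Schema, and the `date-time` format expects a full timestamp rather than a bare date, so a date-only default can be rejected when the DAG is triggered. A minimal sketch of the resulting pattern (param names taken from the diff above):

```python
import datetime

from airflow.models.param import Param

# A bare date like "2023-06-28" does not satisfy the JSON Schema "date-time"
# format, while a full ISO timestamp like "2023-06-28T12:41:31" does.
start_date = Param(
    (datetime.datetime.today() - datetime.timedelta(days=10)).isoformat(),
    type="string",
    format="date-time",
)
end_date = Param(
    datetime.datetime.today().isoformat(),
    type="string",
    format="date-time",
)
```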
View file

@@ -63,7 +63,6 @@ with models.DAG(
default_args=default_args,
tags=tags,
) as dag:
# This single task is responsible for sequentially running copy queries
# over all the tables in _live datasets into _stable datasets except those
# that are specifically used in another DAG.
@@ -89,8 +88,8 @@ with models.DAG(
"telemetry_live.first_shutdown_v4",
"telemetry_live.saved_session_v4",
],
node_selectors={"nodepool": "highmem"},
resources=resources,
node_selector={"nodepool": "highmem"},
container_resources=resources,
)
with TaskGroup("copy_deduplicate_all_external") as copy_deduplicate_all_external:

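The `node_selectors` → `node_selector` and `resources` → `container_resources` renames above track the newer cncf.kubernetes provider shipped with Airflow 2.5, where the old argument names were removed from `KubernetesPodOperator`. A hedged sketch of the upstream operator's new-style arguments, not this repo's `GKEPodOperator` wrapper (which, per the diff, still accepts a plain dict for resources); the task id, image, and pool label are illustrative:

```python
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s

highmem_pod = KubernetesPodOperator(
    task_id="example_highmem_pod",   # illustrative task id
    name="example-highmem-pod",
    image="python:3.10-slim",        # illustrative image
    # Singular node_selector replaces the removed node_selectors argument.
    node_selector={"nodepool": "highmem"},
    # container_resources replaces resources and takes V1ResourceRequirements upstream.
    container_resources=k8s.V1ResourceRequirements(
        requests={"memory": "13312Mi"},
        limits={"memory": "20480Mi"},
    ),
)
```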
View file

@@ -13,7 +13,7 @@ from collections import namedtuple
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
@@ -65,8 +65,7 @@ for platform, config in CONFIGS.items():
start_date=datetime(2023, 3, 20),
doc_md=__doc__,
) as dag:
run_all = DummyOperator(
run_all = EmptyOperator(
task_id="run_all",
)

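`DummyOperator` was deprecated in favor of `EmptyOperator` in Airflow 2.3, so the swap above is purely a rename of the no-op placeholder task. A minimal sketch (DAG id, date, and schedule are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    "example_empty_operator",          # illustrative DAG id
    start_date=datetime(2023, 3, 20),
    schedule=None,
) as dag:
    # EmptyOperator does nothing; it only anchors dependencies, as DummyOperator did.
    run_all = EmptyOperator(task_id="run_all")
```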
View file

@@ -1,18 +1,20 @@
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.models import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import (
BigQueryToGCSOperator,
)
from utils.gcp import bigquery_etl_query
gcp_conn_id = "google_cloud_airflow_dataproc"
project_id = "moz-fx-data-shared-prod"
glam_bucket = "moz-fx-data-glam-prod-fca7-etl-data"
def extracts_subdag(
parent_dag_name, child_dag_name, default_args, schedule_interval, dataset_id
):
dag_id = "{}.{}".format(parent_dag_name, child_dag_name)
dag_id = f"{parent_dag_name}.{child_dag_name}"
dag = DAG(
dag_id=dag_id, default_args=default_args, schedule_interval=schedule_interval
)
@@ -21,13 +23,13 @@ def extracts_subdag(
SubDagOperator(
subdag=extract_channel_subdag(
dag_id,
"extract_{}".format(channel),
f"extract_{channel}",
default_args,
schedule_interval,
dataset_id,
channel,
),
task_id="extract_{}".format(channel),
task_id=f"extract_{channel}",
dag=dag,
)
@@ -43,14 +45,14 @@ def extract_channel_subdag(
channel,
):
dag = DAG(
dag_id="{}.{}".format(parent_dag_name, child_dag_name),
dag_id=f"{parent_dag_name}.{child_dag_name}",
default_args=default_args,
schedule_interval=schedule_interval,
)
bq_extract_table = "glam_extract_firefox_{}_v1".format(channel)
bq_extract_table = f"glam_extract_firefox_{channel}_v1"
etl_query = bigquery_etl_query(
task_id="glam_client_probe_counts_{}_extract".format(channel),
task_id=f"glam_client_probe_counts_{channel}_extract",
destination_table=bq_extract_table,
dataset_id=dataset_id,
project_id=project_id,
@@ -59,14 +61,14 @@ def extract_channel_subdag(
sql_file_path="sql/moz-fx-data-shared-prod/{}/glam_client_probe_counts_extract_v1/query.sql".format(
dataset_id
),
parameters=("channel:STRING:{}".format(channel),),
parameters=(f"channel:STRING:{channel}",),
dag=dag,
)
gcs_delete = GCSDeleteObjectsOperator(
task_id="glam_gcs_delete_old_{}_extracts".format(channel),
task_id=f"glam_gcs_delete_old_{channel}_extracts",
bucket_name=glam_bucket,
prefix="aggs-desktop-{}".format(channel),
prefix=f"aggs-desktop-{channel}",
gcp_conn_id=gcp_conn_id,
dag=dag,
)
@@ -75,7 +77,7 @@ def extract_channel_subdag(
bucket=glam_bucket, channel=channel
)
bq2gcs = BigQueryToGCSOperator(
task_id="glam_extract_{}_to_csv".format(channel),
task_id=f"glam_extract_{channel}_to_csv",
source_project_dataset_table="{}.{}.{}".format(
project_id, dataset_id, bq_extract_table
),
@@ -98,17 +100,17 @@ def extract_user_counts(
schedule_interval,
dataset_id,
task_prefix,
file_prefix
file_prefix,
):
bq_extract_table="glam_{}_extract_v1".format(task_prefix)
bq_extract_table = f"glam_{task_prefix}_extract_v1"
dag = DAG(
dag_id="{}.{}".format(parent_dag_name, child_dag_name),
dag_id=f"{parent_dag_name}.{child_dag_name}",
default_args=default_args,
schedule_interval=schedule_interval,
)
etl_query = bigquery_etl_query(
task_id="glam_{}_extract".format(task_prefix),
task_id=f"glam_{task_prefix}_extract",
destination_table=bq_extract_table,
dataset_id=dataset_id,
project_id=project_id,
@@ -117,27 +119,25 @@ def extract_user_counts(
dag=dag,
)
gcs_delete = GCSDeleteObjectsOperator(
task_id="glam_gcs_delete_{}_extracts".format(task_prefix),
task_id=f"glam_gcs_delete_{task_prefix}_extracts",
bucket_name=glam_bucket,
prefix="glam-extract-firefox-{}".format(file_prefix),
prefix=f"glam-extract-firefox-{file_prefix}",
gcp_conn_id=gcp_conn_id,
dag=dag,
)
if file_prefix=="sample-counts":
if file_prefix == "sample-counts":
gcs_destination = "gs://{}/glam-extract-firefox-{}-*.csv".format(
glam_bucket, file_prefix
)
else:
glam_bucket, file_prefix
)
else:
gcs_destination = "gs://{}/glam-extract-firefox-{}.csv".format(
glam_bucket, file_prefix
)
glam_bucket, file_prefix
)
bq2gcs = BigQueryToGCSOperator(
task_id="glam_extract_{}_to_csv".format(task_prefix),
task_id=f"glam_extract_{task_prefix}_to_csv",
source_project_dataset_table="{}.{}.{}".format(
project_id, dataset_id, bq_extract_table
),

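For context on the extract-to-GCS step being reformatted above: `BigQueryToGCSOperator` exports a BigQuery table to CSV objects in a Cloud Storage bucket. A hedged sketch with placeholder table and destination names (in the DAG these are built from the project, dataset, channel, and `glam_bucket` variables shown in the diff):

```python
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import (
    BigQueryToGCSOperator,
)

bq2gcs = BigQueryToGCSOperator(
    task_id="glam_extract_example_to_csv",  # placeholder task id
    # Placeholder source table; the DAG derives this from project_id, dataset_id, channel.
    source_project_dataset_table="moz-fx-data-shared-prod.glam_etl.glam_extract_firefox_release_v1",
    # Placeholder destination; the DAG builds gcs_destination from glam_bucket and channel.
    destination_cloud_storage_uris=[
        "gs://moz-fx-data-glam-prod-fca7-etl-data/aggs-desktop-release-*.csv"
    ],
    export_format="CSV",
    print_header=False,
)
```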
View file

@@ -1,17 +0,0 @@
### Using kube_client.py from 1.10.2
We used to include the airflow/contrib/kubernetes/kube_client.py from 1.10.2
because the 1.10.7 kube_client.py has some configuration issues when
trying to push XCom from the GKEPodOperator. If do_push_xcom is set to False,
the upstream GKEPodOperator works fine.
### As of 1.10.12 I've removed the backported 1.10.7 gcp_container_operator,
kubernetes_pod_operator, and the 1.10.2 kube_client
### Fivetran operator backported from 2.0+
Fivetran provides an [operator and sensor](https://github.com/fivetran/airflow-provider-fivetran)
for integrating with the Fivetran API for Airflow version 2.0+. This was backported for
Airflow 1.10.15, and then our backport was removed after upgrading to Airflow 2.3.3.
### For 2.1.0 I've removed bigquery_operator_1_10_2.py, in favor of the new
google provider code.

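Since `airflow-provider-fivetran` is still pinned in requirements.in, here is a hedged sketch of the operator/sensor pair the deleted note refers to; the connector id and connection id are placeholders, and the import paths are the ones the community provider documents rather than anything defined in this repo:

```python
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor

# Kick off a Fivetran connector sync, then wait for it to finish.
trigger_sync = FivetranOperator(
    task_id="trigger_fivetran_sync",
    fivetran_conn_id="fivetran_default",  # placeholder connection id
    connector_id="example_connector_id",  # placeholder connector id
)
wait_for_sync = FivetranSensor(
    task_id="wait_for_fivetran_sync",
    fivetran_conn_id="fivetran_default",  # placeholder connection id
    connector_id="example_connector_id",  # placeholder connector id
    poke_interval=600,
)
trigger_sync >> wait_for_sync
```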
View file

View file

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -17,9 +16,9 @@
# specific language governing permissions and limitations
# under the License.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.sensors.base import BaseSensorOperator
class BigQuerySQLSensorOperator(BaseSensorOperator):
"""
@@ -39,42 +38,40 @@ class BigQuerySQLSensorOperator(BaseSensorOperator):
:type timeout: int
"""
template_fields = BaseSensorOperator.template_fields + (
'sql',
)
template_fields = (*BaseSensorOperator.template_fields, "sql")
def __init__(self,
sql,
gcp_conn_id='bigquery_default_conn',
use_legacy_sql=False,
timeout=60*60*24,
*args,
**kwargs):
super(BigQuerySQLSensorOperator, self).__init__(
timeout=timeout,
*args,
**kwargs)
def __init__(
self,
sql,
gcp_conn_id="bigquery_default_conn",
use_legacy_sql=False,
timeout=60 * 60 * 24,
*args,
**kwargs
):
super().__init__(timeout=timeout, *args, **kwargs) # noqa: B026
self.sql = sql
self.gcp_conn_id = gcp_conn_id
self.use_legacy_sql = use_legacy_sql
self.poke_interval = 120
self.mode = 'reschedule'
self.pool = 'DATA_ENG_EXTERNALTASKSENSOR'
self.mode = "reschedule"
self.pool = "DATA_ENG_EXTERNALTASKSENSOR"
def poke(self, context):
self.log.info('Running query: %s', self.sql)
self.log.info("Running query: %s", self.sql)
record = self.get_db_hook().get_first(self.sql)
self.log.info('Resulting Record: %s', record)
self.log.info("Resulting Record: %s", record)
if not record:
return False
else:
if str(record[0]).lower() in ('0', '', 'false', 'null',):
return False
else:
return True
if record:
return str(record[0]).lower() not in (
"0",
"",
"false",
"null",
)
return False
def get_db_hook(self):
return BigQueryHook(gcp_conn_id=self.gcp_conn_id,
use_legacy_sql=self.use_legacy_sql)
return BigQueryHook(
gcp_conn_id=self.gcp_conn_id, use_legacy_sql=self.use_legacy_sql
)

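A hedged usage sketch for the `BigQuerySQLSensorOperator` refactored above; the module path, DAG id, SQL, and table name are placeholders. The sensor pokes every 120 seconds in reschedule mode and succeeds once the first column of the first returned row is something other than 0, an empty string, false, or null:

```python
from datetime import datetime

from airflow import DAG
from utils.bq_sensor import BigQuerySQLSensorOperator  # assumed module path for this repo

with DAG(
    "example_bq_sensor",               # illustrative DAG id
    start_date=datetime(2023, 3, 20),
    schedule=None,
) as dag:
    wait_for_data = BigQuerySQLSensorOperator(
        task_id="wait_for_partition",
        # Placeholder SQL: any query whose first row/column is truthy once data has landed.
        sql=(
            "SELECT COUNT(*) > 0 FROM `moz-fx-data-shared-prod.telemetry.main` "
            "WHERE DATE(submission_timestamp) = '{{ ds }}'"
        ),
        gcp_conn_id="bigquery_default_conn",  # the sensor's default connection id
        timeout=60 * 60 * 6,
    )
```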
View file

@@ -74,7 +74,6 @@ with DAG(
schedule_interval="0 0 * * *",
tags=tags,
) as dag:
airflow_gke_prod_kwargs = {
"gcp_conn_id": "google_cloud_airflow_gke",
"project_id": "moz-fx-data-airflow-gke-prod",
@@ -112,7 +111,7 @@ with DAG(
name="probe-scraper-moz-central",
# Needed to scale the highmem pool from 0 -> 1, because cluster autoscaling
# works on pod resource requests, instead of usage
resources={
container_resources={
"request_memory": "13312Mi",
"request_cpu": None,
"limit_memory": "20480Mi",
@@ -120,7 +119,7 @@ with DAG(
"limit_gpu": None,
},
# This python job requires 13 GB of memory, thus the highmem node pool
node_selectors={"nodepool": "highmem"},
node_selector={"nodepool": "highmem"},
# Due to the nature of the container run, we set get_logs to False, to avoid
# urllib3.exceptions.ProtocolError: 'Connection broken: IncompleteRead(0 bytes
# read)' errors where the pod continues to run, but airflow loses its connection

View file

@@ -127,7 +127,7 @@ flat_rate = gke_command(
reattach_on_restart=True,
# Needed to scale the highmem pool from 0 -> 1, because cluster autoscaling
# works on pod resource requests, instead of usage
resources={
container_resources={
"request_memory": "13312Mi",
"request_cpu": None,
"limit_memory": "20480Mi",
@@ -135,7 +135,7 @@ flat_rate = gke_command(
"limit_gpu": None,
},
# This job was being killed by Kubernetes for using too much memory, thus the highmem node pool
node_selectors={"nodepool": "highmem"},
node_selector={"nodepool": "highmem"},
# Give additional time since we may need to scale up when running this job
startup_timeout_seconds=360,
dag=dag,

View file

@@ -11,7 +11,7 @@
# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed.
# Default: .
---
version: '3'
version: '3.8'
x-airflow-common:
&airflow-common
build: .
@@ -30,6 +30,11 @@ x-airflow-common:
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.default'
# yamllint disable rule:line-length
# Use simple http server on scheduler for health checks
# See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
# yamllint enable rule:line-length
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
DEPLOY_ENVIRONMENT: dev
volumes:
- ./dags:/opt/airflow/dags
@@ -57,8 +62,9 @@ services:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
interval: 10s
retries: 5
start_period: 5s
restart: always
redis:
@@ -67,21 +73,23 @@ services:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
interval: 10s
timeout: 30s
retries: 50
start_period: 30s
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- '8080:8080'
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
@@ -92,10 +100,11 @@ services:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
@@ -109,9 +118,10 @@ services:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
environment:
<<: *airflow-common-env
# Required to handle warm shutdown of the celery workers properly
@@ -128,9 +138,10 @@ services:
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
@@ -164,7 +175,7 @@ services:
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
@@ -197,7 +208,7 @@ services:
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
@@ -213,7 +224,7 @@ services:
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
volumes:
- .:/sources
- ${AIRFLOW_PROJ_DIR:-.}:/sources
airflow-cli:
<<: *airflow-common
@@ -237,12 +248,13 @@ services:
profiles:
- flower
ports:
- 5555:5555
- "5555:5555"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on

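For reference, the new scheduler healthcheck above relies on `AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK`, which makes the scheduler serve a small HTTP `/health` endpoint on port 8974 that the compose healthcheck simply curls. A quick way to poke the same endpoint from Python, assuming the default host and port used above:

```python
import urllib.request

# Returns HTTP 200 with a small body while the scheduler's health check server is up.
with urllib.request.urlopen("http://localhost:8974/health", timeout=10) as resp:
    print(resp.status, resp.read().decode())
```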
View file

@@ -1,24 +1,21 @@
# Official Airflow constraints file
# Doc: https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html#constraints-files
# File: https://raw.githubusercontent.com/apache/airflow/constraints-2.3.3/constraints-3.10.txt
# File: https://raw.githubusercontent.com/apache/airflow/constraints-2.5.3/constraints-3.10.txt
--constraint ./constraints.txt
# Airflow dependencies
apache-airflow[amazon,async,celery,cncf.kubernetes,github_enterprise,google_auth,jdbc,mysql,password,postgres,redis,statsd]==2.3.3
apache-airflow[amazon,async,celery,cncf.kubernetes,github_enterprise,google_auth,jdbc,mysql,password,postgres,redis,statsd]==2.5.3
apache-airflow-providers-google
apache-airflow-providers-http
apache-airflow-providers-slack
airflow-provider-fivetran==1.1.2
# Code quality
pytest==6.2.5
pytest==7.2.2
pytest-mock==3.10.0
black==22.6.0
black==23.1a1
ruff==0.0.269
# Misc
mozlogging
# Required for backfill UI
flask-admin
shelljob==0.5.6

File diff not shown because it is too large.