chore: Replace gke_command usage with GKEPodOperator. (#1948)

This commit is contained in:
Mikaël Ducharme 2024-03-12 15:08:53 -04:00 committed by GitHub
Parent 5166835fd5
Commit f317239ff2
No key found matching this signature
GPG key ID: B5690EEEBB952194
19 changed files: 143 additions and 245 deletions
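
Every call site follows the same mapping: the gke_command wrapper from utils.gcp is dropped in favor of calling GKEPodOperator from operators.gcp_container_operator directly, with command renamed to arguments and docker_image renamed to image. A minimal before/after sketch of the pattern (the task id, command, and image below are hypothetical placeholders, not taken from this commit):

# Before: call through the utils.gcp wrapper.
example = gke_command(
    task_id="example_task",
    command=["python", "job/main.py", "--date={{ ds }}"],
    docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/example:latest",
    gcp_conn_id="google_cloud_airflow_gke",
    dag=dag,
)

# After: call the operator directly; command -> arguments, docker_image -> image.
example = GKEPodOperator(
    task_id="example_task",
    arguments=["python", "job/main.py", "--date={{ ds }}"],
    image="gcr.io/moz-fx-data-airflow-prod-88e0/example:latest",
    gcp_conn_id="google_cloud_airflow_gke",
    dag=dag,
)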

View file

@ -1,7 +1,9 @@
from datetime import datetime, timedelta
from airflow import DAG
from utils.gcp import bigquery_etl_query, gke_command
from operators.gcp_container_operator import GKEPodOperator
from utils.gcp import bigquery_etl_query
from utils.tags import Tag
default_args = {
@ -76,10 +78,10 @@ with DAG(
if i == 0:
commands.append("--overwrite")
app_store_analytics = gke_command(
app_store_analytics = GKEPodOperator(
task_id=f"app_store_analytics_{app_name}",
command=commands,
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/app-store-analytics-export:latest",
arguments=commands,
image="gcr.io/moz-fx-data-airflow-prod-88e0/app-store-analytics-export:latest",
gcp_conn_id="google_cloud_airflow_gke",
dag=dag,
)

View file

@ -3,7 +3,7 @@ import datetime
from airflow.decorators import dag, task
from airflow.models.param import Param
from utils.gcp import gke_command
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
doc_md = """
@ -159,11 +159,11 @@ def bqetl_backfill_dag():
return cmd
gke_command(
GKEPodOperator(
reattach_on_restart=True,
task_id="bqetl_backfill",
command=generate_backfill_command(),
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
arguments=generate_backfill_command(),
image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
gcp_conn_id="google_cloud_airflow_gke",
)

View file

@ -2,7 +2,7 @@ from datetime import datetime
from airflow import DAG
from utils.gcp import gke_command
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
DOCS = """
@ -45,9 +45,9 @@ with DAG(
tags=tags,
catchup=False,
) as dag:
broken_site_report_ml = gke_command(
broken_site_report_ml = GKEPodOperator(
task_id="broken_site_report_ml",
command=[
arguments=[
"python",
"broken_site_report_ml/main.py",
"--bq_project_id",
@ -55,6 +55,6 @@ with DAG(
"--bq_dataset_id",
"webcompat_user_reports",
],
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/broken-site-report-ml_docker_etl:latest",
image="gcr.io/moz-fx-data-airflow-prod-88e0/broken-site-report-ml_docker_etl:latest",
dag=dag,
)

View file

@ -1,16 +1,16 @@
"""
Runs a Docker image that imports Quicksuggest suggestions
from Remote Settings to BigQuery.
Runs a Docker image that imports Quicksuggest suggestions from Remote Settings to BigQuery.
See the [`quicksuggest2bq`](https://github.com/mozilla/docker-etl/tree/main/jobs/quicksuggest2bq)
docker image defined in `docker-etl`.
"""
from airflow import DAG
from datetime import datetime, timedelta
from utils.gcp import gke_command
from utils.tags import Tag
from airflow import DAG
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
default_args = {
"owner": "wstuckey@mozilla.com",
@ -35,14 +35,17 @@ with DAG(
tags=tags,
) as dag:
quicksuggest2bq = gke_command(
quicksuggest2bq = GKEPodOperator(
task_id="quicksuggest2bq",
command=[
"python", "quicksuggest2bq/main.py",
"--destination-project", project_id,
"--destination-table-id", table_id,
arguments=[
"python",
"quicksuggest2bq/main.py",
"--destination-project",
project_id,
"--destination-table-id",
table_id,
],
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/quicksuggest2bq_docker_etl:latest",
image="gcr.io/moz-fx-data-airflow-prod-88e0/quicksuggest2bq_docker_etl:latest",
gcp_conn_id="google_cloud_airflow_gke",
dag=dag,
email=[

View file

@ -4,10 +4,10 @@ from airflow import models
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.utils.task_group import TaskGroup
from operators.gcp_container_operator import GKEPodOperator
from utils.gcp import (
bigquery_etl_copy_deduplicate,
bigquery_etl_query,
gke_command,
)
from utils.tags import Tag
@ -378,15 +378,15 @@ with models.DAG(
'AS "refusing to archive empty partition"'
)
main_v5 = "moz-fx-data-shared-prod:telemetry_stable.main_v5"
archive_main = gke_command(
archive_main = GKEPodOperator(
reattach_on_restart=True,
task_id="archive_main",
cmds=["bash", "-x", "-c"],
command=[
arguments=[
f"bq query '{precheck_query}' &&"
f"bq cp -f {main_v5}'$'{archive_partition_id} "
f"{main_v5}_archive'$'{archive_partition_id} && "
f"bq rm -f {main_v5}'$'{archive_partition_id}"
],
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
)

View file

@ -2,7 +2,7 @@ from datetime import datetime, timedelta
from airflow import DAG
from utils.gcp import gke_command
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
DOCS = """
@ -56,9 +56,9 @@ with DAG(
schedule_interval="@daily",
tags=tags,
) as dag:
dap_collector = gke_command(
dap_collector = GKEPodOperator(
task_id="dap_collector",
command=[
arguments=[
"python",
"dap_collector/main.py",
"--date={{ ds }}",
@ -70,6 +70,6 @@ with DAG(
"--table-id",
table_id,
],
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/dap-collector_docker_etl:latest",
image="gcr.io/moz-fx-data-airflow-prod-88e0/dap-collector_docker_etl:latest",
gcp_conn_id="google_cloud_airflow_gke",
)

View file

@ -1,6 +1,5 @@
"""
See [experiments-monitoring-data-export in the docker-etl repository]
(https://github.com/mozilla/docker-etl/tree/main/jobs/experiments-monitoring-data-export).
See [experiments-monitoring-data-export in the docker-etl repository](https://github.com/mozilla/docker-etl/tree/main/jobs/experiments-monitoring-data-export).
This DAG exports views related to experiment monitoring to GCS as JSON
every 5 minutes to power the Experimenter console.
@ -10,22 +9,22 @@ from datetime import datetime
from airflow import DAG
from utils.gcp import gke_command
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
default_args = {
'owner': 'ascholtz@mozilla.com',
'depends_on_past': False,
'start_date': datetime(2021, 1, 8),
'email_on_failure': True,
'email_on_retry': True,
"owner": "ascholtz@mozilla.com",
"depends_on_past": False,
"start_date": datetime(2021, 1, 8),
"email_on_failure": True,
"email_on_retry": True,
}
tags = [Tag.ImpactTier.tier_2]
# We rely on max_active_runs=1 at the DAG level to manage the dependency on past runs.
with DAG(
'experiments_live',
"experiments_live",
default_args=default_args,
max_active_tasks=4,
max_active_runs=1,
@ -43,15 +42,17 @@ with DAG(
"moz-fx-data-shared-prod.telemetry_derived.experiment_cumulative_ad_clicks_v1",
"moz-fx-data-shared-prod.telemetry_derived.experiment_cumulative_search_count_v1",
"moz-fx-data-shared-prod.telemetry_derived.experiment_cumulative_search_with_ads_count_v1",
"moz-fx-data-shared-prod.telemetry.experiment_enrollment_daily_active_population"
"moz-fx-data-shared-prod.telemetry.experiment_enrollment_daily_active_population",
]
experiment_enrollment_export = gke_command(
experiment_enrollment_export = GKEPodOperator(
task_id="experiment_enrollment_export",
command=[
"python", "experiments_monitoring_data_export/export.py",
arguments=[
"python",
"experiments_monitoring_data_export/export.py",
"--datasets",
] + experiment_datasets,
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/experiments-monitoring-data-export_docker_etl:latest",
*experiment_datasets,
],
image="gcr.io/moz-fx-data-airflow-prod-88e0/experiments-monitoring-data-export_docker_etl:latest",
dag=dag,
)

View file

@ -1,7 +1,8 @@
from datetime import datetime, timedelta
from airflow import DAG
from utils.gcp import gke_command
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
DOCS = """
@ -53,9 +54,9 @@ with DAG(
schedule_interval="@daily",
tags=tags,
) as dag:
contile_adm_request = gke_command(
contile_adm_request = GKEPodOperator(
task_id="contile_adm_request",
command=[
arguments=[
"python",
"influxdb_to_bigquery/main.py",
"--date={{ ds }}",
@ -67,12 +68,12 @@ with DAG(
"--bq_dataset_id=telemetry_derived",
"--bq_table_id=contile_tiles_adm_request",
],
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/influxdb-to-bigquery_docker_etl:latest",
image="gcr.io/moz-fx-data-airflow-prod-88e0/influxdb-to-bigquery_docker_etl:latest",
gcp_conn_id="google_cloud_airflow_gke",
)
adm_response_tiles_count = gke_command(
adm_response_tiles_count = GKEPodOperator(
task_id="adm_response_tiles_count",
command=[
arguments=[
"python",
"influxdb_to_bigquery/main.py",
"--date={{ ds }}",
@ -84,12 +85,12 @@ with DAG(
"--bq_dataset_id=telemetry_derived",
"--bq_table_id=contile_tiles_adm_response_tiles_count",
],
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/influxdb-to-bigquery_docker_etl:latest",
image="gcr.io/moz-fx-data-airflow-prod-88e0/influxdb-to-bigquery_docker_etl:latest",
gcp_conn_id="google_cloud_airflow_gke",
)
adm_empty_response = gke_command(
adm_empty_response = GKEPodOperator(
task_id="adm_empty_response",
command=[
arguments=[
"python",
"influxdb_to_bigquery/main.py",
"--date={{ ds }}",
@ -101,7 +102,7 @@ with DAG(
"--bq_dataset_id=telemetry_derived",
"--bq_table_id=contile_filter_adm_empty_response",
],
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/influxdb-to-bigquery_docker_etl:latest",
image="gcr.io/moz-fx-data-airflow-prod-88e0/influxdb-to-bigquery_docker_etl:latest",
gcp_conn_id="google_cloud_airflow_gke",
)

View file

@ -5,6 +5,7 @@ This DAG runs the forecast Desktop DAU and Mobile DAU. The output powers KPI das
This DAG is high priority for week 1 of the month and low priority otherwise.
"""
import os
from collections import namedtuple
from datetime import datetime, timedelta
@ -12,8 +13,8 @@ from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import gke_command
from utils.tags import Tag
default_args = {
@ -66,10 +67,10 @@ with DAG(
if not isinstance(config.wait_tasks, list):
wait_tasks = [wait_tasks]
forecast_task = gke_command(
forecast_task = GKEPodOperator(
task_id=f"kpi_forecasting_{id}",
command=["python", script_path, "-c", config_path],
docker_image=IMAGE,
arguments=["python", script_path, "-c", config_path],
image=IMAGE,
dag=dag,
)

View file

@ -19,7 +19,7 @@ from datetime import datetime, timedelta
from airflow import DAG
from utils.gcp import gke_command
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
default_args = {
@ -48,22 +48,19 @@ with DAG(
doc_md=__doc__,
tags=tags,
) as dag:
mad_server_pull = gke_command(
mad_server_pull = GKEPodOperator(
task_id="mad_server_pull",
# Controls the entrypoint of the container, which for mad-server
# defaults to bin/run rather than a shell.
cmds=[
"/bin/bash",
],
command=[
arguments=[
"bin/airflow-pull",
],
docker_image="us-west1-docker.pkg.dev/moz-fx-data-airflow-prod-88e0/data-science-artifacts/mad-server:latest",
image="us-west1-docker.pkg.dev/moz-fx-data-airflow-prod-88e0/data-science-artifacts/mad-server:latest",
startup_timeout_seconds=500,
gcp_conn_id="google_cloud_airflow_gke",
gke_project_id="moz-fx-data-airflow-gke-prod",
gke_cluster_name="workloads-prod-v1",
gke_location="us-west1",
env_vars={
"GCS_BUCKET": gcs_bucket,
"GCS_ROOT_TRAINING": gcs_root_training,
@ -77,23 +74,19 @@ with DAG(
"gleonard@mozilla.com",
],
)
mad_train_model = gke_command(
mad_train_model = GKEPodOperator(
task_id="train_model",
cmds=[
"/bin/bash",
],
command=[
arguments=[
"bin/train_model",
"--publish",
"--publish-as-latest",
"./working",
],
docker_image="us-west1-docker.pkg.dev/moz-fx-data-airflow-prod-88e0/data-science-artifacts/mad-server:latest",
image="us-west1-docker.pkg.dev/moz-fx-data-airflow-prod-88e0/data-science-artifacts/mad-server:latest",
startup_timeout_seconds=500,
gcp_conn_id="google_cloud_airflow_gke",
gke_project_id="moz-fx-data-airflow-gke-prod",
gke_cluster_name="workloads-prod-v1",
gke_location="us-west1",
env_vars={
"GCS_BUCKET": gcs_bucket,
"GCS_ROOT_TRAINING": gcs_root_training,
@ -107,23 +100,20 @@ with DAG(
"gleonard@mozilla.com",
],
)
new_data_eval = gke_command(
new_data_eval = GKEPodOperator(
task_id="evaluate_new_data",
cmds=[
"/bin/bash",
],
command=[
arguments=[
"bin/evaluate_new_data",
"--publish",
"--publish-as-latest",
"./working",
],
docker_image="us-west1-docker.pkg.dev/moz-fx-data-airflow-prod-88e0/data-science-artifacts/mad-server:latest",
image="us-west1-docker.pkg.dev/moz-fx-data-airflow-prod-88e0/data-science-artifacts/mad-server:latest",
startup_timeout_seconds=500,
gcp_conn_id="google_cloud_airflow_gke",
gke_project_id="moz-fx-data-airflow-gke-prod",
gke_cluster_name="workloads-prod-v1",
gke_location="us-west1",
env_vars={
"GCS_BUCKET": gcs_bucket,
"GCS_ROOT_TRAINING": gcs_root_training,

View file

@ -14,8 +14,9 @@ from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
from operators.gcp_container_operator import GKEPodOperator
from utils.dataproc import copy_artifacts_dev, moz_dataproc_pyspark_runner
from utils.gcp import gke_command
from utils.tags import Tag
EXPORT_TO_AVRO = True
@ -91,9 +92,7 @@ mobile_aggregate_view_dataproc = SubDagOperator(
additional_metadata={
"PIP_PACKAGES": "git+https://github.com/mozilla/python_mozaggregator.git@pbd_fix_2"
},
python_driver_code="gs://{}/jobs/mozaggregator_runner.py".format(
artifact_bucket
),
python_driver_code=f"gs://{artifact_bucket}/jobs/mozaggregator_runner.py",
py_args=[
"mobile",
"--date",
@ -123,10 +122,10 @@ mobile_aggregate_view_dataproc = SubDagOperator(
# export to avro, if necessary
if EXPORT_TO_AVRO:
gke_command(
GKEPodOperator(
task_id="export_mobile_metrics_avro",
cmds=["bash"],
command=[
arguments=[
"bin/export-avro.sh",
"moz-fx-data-shared-prod",
"moz-fx-data-shared-prod:analysis",
@ -135,7 +134,7 @@ if EXPORT_TO_AVRO:
'""',
"{{ ds }}",
],
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/python_mozaggregator:latest",
image="gcr.io/moz-fx-data-airflow-prod-88e0/python_mozaggregator:latest",
dag=dag,
).set_downstream(mobile_aggregate_view_dataproc)

View file

@ -5,11 +5,12 @@ The container is defined in
[docker-etl](https://github.com/mozilla/docker-etl/tree/main/jobs/play-store-export)
"""
from airflow import DAG
from datetime import datetime, timedelta
from utils.gcp import gke_command
from utils.tags import Tag
from airflow import DAG
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
default_args = {
"owner": "akomar@mozilla.com",
@ -25,23 +26,26 @@ project_id = "moz-fx-data-marketing-prod"
tags = [Tag.ImpactTier.tier_3]
with DAG("play_store_export",
with DAG(
"play_store_export",
default_args=default_args,
doc_md=__doc__,
schedule_interval="@daily",
tags=tags,
) as dag:
play_store_export = gke_command(
play_store_export = GKEPodOperator(
task_id="play_store_export",
command=[
"python", "play_store_export/export.py",
arguments=[
"python",
"play_store_export/export.py",
"--date={{ yesterday_ds }}",
"--backfill-day-count=60",
"--project", project_id,
"--project",
project_id,
"--transfer-config={{ var.value.play_store_transfer_config_id }}",
],
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/play-store-export:latest",
image="gcr.io/moz-fx-data-airflow-prod-88e0/play-store-export:latest",
gcp_conn_id="google_cloud_airflow_gke",
dag=dag,
email=[

View file

@ -4,9 +4,11 @@ Daily deployment of static bigquery-etl data to various projects.
See the publish command [here](https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/static/__init__.py).
"""
from datetime import datetime, timedelta
from airflow import DAG
from datetime import timedelta, datetime
from utils.gcp import gke_command
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
IMAGE = "gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest"
@ -35,20 +37,20 @@ with DAG(
tags=tags,
) as dag:
publish_static_mozdata = gke_command(
publish_static_mozdata = GKEPodOperator(
task_id="publish_static_mozdata",
command=[
"script/bqetl", "static", "publish",
"--project_id", "mozdata"
],
docker_image=IMAGE,
arguments=["script/bqetl", "static", "publish", "--project_id", "mozdata"],
image=IMAGE,
)
publish_static_shared_prod = gke_command(
publish_static_shared_prod = GKEPodOperator(
task_id="publish_static_shared_prod",
command=[
"script/bqetl", "static", "publish",
"--project_id", "moz-fx-data-shared-prod"
arguments=[
"script/bqetl",
"static",
"publish",
"--project_id",
"moz-fx-data-shared-prod",
],
docker_image=IMAGE,
image=IMAGE,
)

View file

@ -5,13 +5,14 @@ The container is defined in
[docker-etl](https://github.com/mozilla/docker-etl/tree/main/jobs/search-alert)
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import gke_command
from utils.tags import Tag
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.tags import Tag
default_args = {
"owner": "akomar@mozilla.com",
@ -29,7 +30,8 @@ default_args = {
tags = [Tag.ImpactTier.tier_2]
with DAG("search_alert",
with DAG(
"search_alert",
default_args=default_args,
doc_md=__doc__,
schedule_interval="0 4 * * *",
@ -53,14 +55,15 @@ with DAG("search_alert",
dag=dag,
)
search_alert = gke_command(
search_alert = GKEPodOperator(
task_id="search_alert",
command=[
"python", "search_alert/main.py",
arguments=[
"python",
"search_alert/main.py",
"--submission_date={{ ds }}",
"--project_id=mozdata",
],
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/search-alert_docker_etl:latest",
image="gcr.io/moz-fx-data-airflow-prod-88e0/search-alert_docker_etl:latest",
gcp_conn_id="google_cloud_airflow_gke",
)

View file

@ -12,7 +12,7 @@ from datetime import datetime, timedelta
from airflow import DAG
from utils.gcp import gke_command
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
default_args = {
@ -39,9 +39,9 @@ with DAG(
doc_md=__doc__,
tags=tags,
) as dag:
search_term_data_validation = gke_command(
search_term_data_validation = GKEPodOperator(
task_id="search_term_data_validation_v2",
command=[
arguments=[
"python",
"search_term_data_validation_v2/main.py",
"--data_validation_origin",
@ -49,6 +49,6 @@ with DAG(
"--data_validation_reporting_destination",
"moz-fx-data-shared-prod.search_terms_derived.search_term_data_validation_reports_v1",
],
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/search-term-data-validation-v2_docker_etl:latest",
image="gcr.io/moz-fx-data-airflow-prod-88e0/search-term-data-validation-v2_docker_etl:latest",
dag=dag,
)

View file

@ -3,7 +3,7 @@ from datetime import datetime, timedelta
from airflow import DAG
from timetable import MultiWeekTimetable
from utils.gcp import gke_command
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
docs = """
@ -83,41 +83,41 @@ base_command = [
# and don't slow down other tables. run them in a separate project with their own slot
# reservation to ensure they can finish on time, because they use more slots than
# everything else combined
telemetry_main = gke_command(
telemetry_main = GKEPodOperator(
task_id="telemetry_main",
name="shredder-telemetry-main",
command=[
arguments=[
*base_command,
"--parallelism=2",
"--billing-project=moz-fx-data-shredder",
"--only=telemetry_stable.main_v5",
],
docker_image=docker_image,
image=docker_image,
is_delete_operator_pod=True,
reattach_on_restart=True,
dag=dag,
)
telemetry_main_use_counter = gke_command(
telemetry_main_use_counter = GKEPodOperator(
task_id="telemetry_main_use_counter",
name="shredder-telemetry-main-use-counter",
command=[
arguments=[
*base_command,
"--parallelism=2",
"--billing-project=moz-fx-data-shredder",
"--only=telemetry_stable.main_use_counter_v4",
],
docker_image=docker_image,
image=docker_image,
is_delete_operator_pod=True,
reattach_on_restart=True,
dag=dag,
)
# everything else
flat_rate = gke_command(
flat_rate = GKEPodOperator(
task_id="all",
name="shredder-all",
command=[
arguments=[
*base_command,
"--parallelism=4",
"--billing-project=moz-fx-data-bq-batch-prod",
@ -125,7 +125,7 @@ flat_rate = gke_command(
"telemetry_stable.main_v5",
"telemetry_stable.main_use_counter_v4",
],
docker_image=docker_image,
image=docker_image,
is_delete_operator_pod=True,
reattach_on_restart=True,
# Needed to scale the highmem pool from 0 -> 1, because cluster autoscaling

View file

@ -1,7 +1,8 @@
from datetime import datetime
from airflow import DAG
from utils.gcp import gke_command
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
DOCS = """
@ -44,9 +45,9 @@ with DAG(
tags=tags,
catchup=False,
) as dag:
webcompat_kb_import = gke_command(
webcompat_kb_import = GKEPodOperator(
task_id="webcompat_kb",
command=[
arguments=[
"python",
"webcompat_kb/main.py",
"--bq_project_id",
@ -54,6 +55,6 @@ with DAG(
"--bq_dataset_id",
"webcompat_knowledge_base",
],
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/webcompat-kb_docker_etl:latest",
image="gcr.io/moz-fx-data-airflow-prod-88e0/webcompat-kb_docker_etl:latest",
dag=dag,
)

View file

@ -1,8 +1,6 @@
import json
import re
from airflow import models
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.google.cloud.operators.dataproc import (
ClusterGenerator,
DataprocCreateClusterOperator,
@ -490,70 +488,3 @@ def normalize_table_id(table_name):
raise ValueError("table_name cannot contain more than 1024 characters")
else:
return re.sub("\\W+", "_", table_name).lower()
def gke_command(
task_id,
command,
docker_image,
aws_conn_id="aws_dev_iam_s3",
gcp_conn_id="google_cloud_airflow_gke",
gke_project_id=GCP_PROJECT_ID,
gke_location="us-west1",
gke_cluster_name="workloads-prod-v1",
gke_namespace="default",
xcom_push=False,
reattach_on_restart=False,
env_vars=None,
is_delete_operator_pod=False,
**kwargs,
):
"""
Run a docker command on GKE.
:param str task_id: [Required] ID for the task
:param List[str] command: [Required] Command to run
:param str docker_image: [Required] docker image to use
:param str aws_conn_id: Airflow connection id for AWS access
:param str gcp_conn_id: Airflow connection id for GCP access
:param str gke_project_id: GKE cluster project id
:param str gke_location: GKE cluster location
:param str gke_cluster_name: GKE cluster name
:param str gke_namespace: GKE cluster namespace
:param bool xcom_push: Return the output of this command as an xcom
:param Dict[str, Any] kwargs: Additional keyword arguments for
GKEPodOperator
:return: GKEPodOperator
"""
kwargs["name"] = kwargs.get("name", task_id.replace("_", "-"))
context_env_vars = {
key: value
for key, value in zip(
("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN"),
AwsBaseHook(aws_conn_id=aws_conn_id, client_type="s3").get_credentials()
if aws_conn_id
else (),
)
if value is not None
}
context_env_vars["XCOM_PUSH"] = json.dumps(xcom_push)
if env_vars:
context_env_vars.update(env_vars)
return GKEPodOperator(
task_id=task_id,
gcp_conn_id=gcp_conn_id,
project_id=gke_project_id,
location=gke_location,
cluster_name=gke_cluster_name,
namespace=gke_namespace,
image=docker_image,
arguments=command,
do_xcom_push=xcom_push,
reattach_on_restart=reattach_on_restart,
env_vars=context_env_vars,
is_delete_operator_pod=is_delete_operator_pod,
**kwargs,
)
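
For reference, the removed wrapper above filled in cluster defaults and context env vars before delegating to GKEPodOperator. A sketch of what a typical call resolved to, based on the removed body (the task id, image, and arguments are hypothetical placeholders); direct call sites now rely on the operator's own defaults instead:

GKEPodOperator(
    task_id="example_task",
    name="example-task",                    # task_id with "_" replaced by "-"
    gcp_conn_id="google_cloud_airflow_gke",
    project_id=GCP_PROJECT_ID,              # module-level default in utils.gcp
    location="us-west1",
    cluster_name="workloads-prod-v1",
    namespace="default",
    image=IMAGE,
    arguments=["python", "job/main.py"],
    do_xcom_push=False,
    reattach_on_restart=False,
    env_vars={"XCOM_PUSH": "false"},        # plus AWS credentials when aws_conn_id was set
    is_delete_operator_pod=False,
)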

View file

@ -1,40 +0,0 @@
from utils.gcp import gke_command
def update_hotlist(
task_id,
project_id,
source_dataset_id,
destination_dataset_id=None,
docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
gcp_conn_id="google_cloud_airflow_gke",
**kwargs,
):
"""
Update hotlist.
:param task_id: Airflow task id
:param project_id: GCP project to write to
:param source_dataset_id: Bigquery dataset to read from in queries
:param destination_dataset_id: Bigquery dataset to write results to. Defaults to source_dataset_id
:param docker_image: Docker image
:param gcp_conn_id: Airflow GCP connection
"""
if destination_dataset_id is None:
destination_dataset_id = source_dataset_id
env_vars = {
"PROJECT": project_id,
"PROD_DATASET": source_dataset_id,
"DATASET": destination_dataset_id,
"SUBMISSION_DATE": "{{ ds }}",
}
command = ["script/glam/update_probe_hotlist"]
return gke_command(
task_id=task_id,
cmds=["bash"],
env_vars=env_vars,
command=command,
docker_image=docker_image,
gcp_conn_id=gcp_conn_id,
**kwargs,
)
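
The removed update_hotlist helper was a thin layer over gke_command; this diff does not show whether it was rewritten elsewhere, but an equivalent built directly on GKEPodOperator, following the same parameter mapping used throughout this commit, might look roughly like the following sketch:

from operators.gcp_container_operator import GKEPodOperator


def update_hotlist(
    task_id,
    project_id,
    source_dataset_id,
    destination_dataset_id=None,
    docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
    gcp_conn_id="google_cloud_airflow_gke",
    **kwargs,
):
    """Update the GLAM probe hotlist (sketch of a GKEPodOperator-based equivalent)."""
    if destination_dataset_id is None:
        destination_dataset_id = source_dataset_id
    return GKEPodOperator(
        task_id=task_id,
        cmds=["bash"],
        arguments=["script/glam/update_probe_hotlist"],
        env_vars={
            "PROJECT": project_id,
            "PROD_DATASET": source_dataset_id,
            "DATASET": destination_dataset_id,
            "SUBMISSION_DATE": "{{ ds }}",
        },
        image=docker_image,
        gcp_conn_id=gcp_conn_id,
        **kwargs,
    )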