Stop using start_date in default_args in example_dags (#9982)

Kaxil Naik 2020-07-25 00:16:25 +01:00, committed by GitHub
Parent: fc033048f7
Commit: 8b10a4b35e
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
69 changed files: 95 additions, 185 deletions
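Every file below applies the same mechanical change: the example DAG's start_date moves out of default_args (the dict of per-task defaults) and becomes an explicit argument to the DAG constructor. A minimal before/after sketch of the pattern; the dag_ids are illustrative, not taken from any single file in this commit:

from airflow import models
from airflow.utils.dates import days_ago

# Before: start_date buried in default_args, the per-task defaults dict.
default_args = {"start_date": days_ago(1)}
with models.DAG(
    "example_before",
    default_args=default_args,
    schedule_interval=None,
) as dag:
    ...

# After: start_date passed directly to the DAG constructor.
with models.DAG(
    "example_after",
    schedule_interval=None,
    start_date=days_ago(1),
) as dag:
    ...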

@@ -323,6 +323,12 @@ repos:
         entry: "(airflow\\.){0,1}utils\\.dates\\.days_ago"
         files: \.py$
         pass_filenames: true
+      - id: restrict-start_date-in-default_args-in-example_dags
+        language: pygrep
+        name: "'start_date' should not be defined in default_args in example_dags"
+        entry: "default_args\\s*=\\s*{\\s*(\"|')start_date(\"|')"
+        files: \.*example_dags.*.py$
+        pass_filenames: true
       - id: check-integrations
         name: Check if integration list is aligned
         entry: ./scripts/ci/pre_commit/pre_commit_check_integrations.sh
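For reference: pre-commit's pygrep language applies the entry regex to each line of the matched files, so the hook above catches the single-line form of the pattern. Hypothetical examples of what it does and does not flag (not lines from this commit):

default_args = {"start_date": days_ago(1)}                      # flagged: start_date is the first key
default_args = {'start_date': days_ago(2), 'owner': 'airflow'}  # flagged
default_args = {"owner": "airflow", "start_date": days_ago(1)}  # not flagged: a different first key
default_args = {                                                # not flagged: dict opens on its own line
    "start_date": days_ago(1),
}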

@@ -26,12 +26,10 @@ from airflow.operators.bash import BashOperator
 from airflow.operators.python import PythonOperator
 from airflow.utils.dates import days_ago
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     dag_id="example_complex",
-    default_args=default_args,
     schedule_interval=None,
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:

@@ -29,7 +29,8 @@ from airflow.utils.dates import days_ago
 dag = DAG(
     dag_id="example_trigger_target_dag",
-    default_args={"start_date": days_ago(2), "owner": "airflow"},
+    default_args={"owner": "airflow"},
+    start_date=days_ago(2),
     schedule_interval=None,
     tags=['example']
 )

@@ -45,12 +45,11 @@ DESTINATION_LOCATION_URI = getenv(
     "DESTINATION_LOCATION_URI", "s3://mybucket/prefix")
 # [END howto_operator_datasync_1_args_2]
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_datasync_1_1",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
@@ -64,7 +63,7 @@ with models.DAG(
 with models.DAG(
     "example_datasync_1_2",
-    default_args=default_args,
+    start_date=days_ago(1),
     schedule_interval=None,  # Override to match your needs
 ) as dag:
     # [START howto_operator_datasync_1_2]

@@ -77,13 +77,12 @@ UPDATE_TASK_KWARGS = json.loads(
     getenv("UPDATE_TASK_KWARGS", default_update_task_kwargs)
 )
-default_args = {"start_date": days_ago(1)}
 # [END howto_operator_datasync_2_args]
 with models.DAG(
     "example_datasync_2",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:

@@ -51,8 +51,6 @@ YOUTUBE_VIDEO_PARTS = getenv("YOUTUBE_VIDEO_PARTS", "snippet")
 YOUTUBE_VIDEO_FIELDS = getenv("YOUTUBE_VIDEO_FIELDS", "items(id,snippet(description,publishedAt,tags,title))")
 # [END howto_operator_google_api_to_s3_transfer_advanced_env_variables]
-default_args = {"start_date": days_ago(1)}
 # pylint: disable=unused-argument
 # [START howto_operator_google_api_to_s3_transfer_advanced_task_1_2]
@@ -74,8 +72,8 @@ s3_file_name, _ = s3_file.rsplit('.', 1)
 with DAG(
     dag_id="example_google_api_to_s3_transfer_advanced",
-    default_args=default_args,
     schedule_interval=None,
+    start_date=days_ago(1),
     tags=['example']
 ) as dag:
     # [START howto_operator_google_api_to_s3_transfer_advanced_task_1]

@@ -32,12 +32,11 @@ GOOGLE_SHEET_RANGE = getenv("GOOGLE_SHEET_RANGE")
 S3_DESTINATION_KEY = getenv("S3_DESTINATION_KEY", "s3://bucket/key.json")
 # [END howto_operator_google_api_to_s3_transfer_basic_env_variables]
-default_args = {"start_date": days_ago(1)}
 with DAG(
     dag_id="example_google_api_to_s3_transfer_basic",
-    default_args=default_args,
     schedule_interval=None,
+    start_date=days_ago(1),
     tags=['example']
 ) as dag:
     # [START howto_operator_google_api_to_s3_transfer_basic_task_1]

@@ -33,11 +33,9 @@ IMAP_MAIL_FILTER = getenv("IMAP_MAIL_FILTER", "All")
 S3_DESTINATION_KEY = getenv("S3_DESTINATION_KEY", "s3://bucket/key.json")
 # [END howto_operator_imap_attachment_to_s3_env_variables]
-default_args = {"start_date": days_ago(1)}
 with DAG(
     dag_id="example_imap_attachment_to_s3",
-    default_args=default_args,
+    start_date=days_ago(1),
     schedule_interval=None,
     tags=['example']
 ) as dag:

@@ -34,8 +34,6 @@ S3_KEY = getenv("S3_KEY", "key")
 REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "test_table")
 # [END howto_operator_s3_to_redshift_env_variables]
-default_args = {"start_date": days_ago(1)}
 def _add_sample_data_to_s3():
     s3_hook = S3Hook()
@@ -50,7 +48,7 @@ def _remove_sample_data_from_s3():
 with DAG(
     dag_id="example_s3_to_redshift",
-    default_args=default_args,
+    start_date=days_ago(1),
     schedule_interval=None,
     tags=['example']
 ) as dag:

@@ -64,12 +64,10 @@ FIELDS_TO_EXTRACT = [
 # [END howto_google_ads_env_variables]
-default_args = {"start_date": dates.days_ago(1)}
 with models.DAG(
     "example_google_ads",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=dates.days_ago(1),
 ) as dag:
     # [START howto_google_ads_to_gcs_operator]
     run_operator = GoogleAdsToGcsOperator(

@@ -53,14 +53,13 @@ DATASET = {
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_TEXT_CLS_BUCKET]}}
-default_args = {"start_date": days_ago(1)}
 extract_object_id = CloudAutoMLHook.extract_object_id
 # Example DAG for AutoML Natural Language Text Classification
 with models.DAG(
     "example_automl_text_cls",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as example_dag:
     create_dataset_task = AutoMLCreateDatasetOperator(

@@ -50,14 +50,13 @@ DATASET = {"display_name": "test_text_dataset", "text_extraction_dataset_metadat
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_TEXT_BUCKET]}}
-default_args = {"start_date": days_ago(1)}
 extract_object_id = CloudAutoMLHook.extract_object_id
 # Example DAG for AutoML Natural Language Entities Extraction
 with models.DAG(
     "example_automl_text",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:

@@ -53,14 +53,13 @@ DATASET = {
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_SENTIMENT_BUCKET]}}
-default_args = {"start_date": days_ago(1)}
 extract_object_id = CloudAutoMLHook.extract_object_id
 # Example DAG for AutoML Natural Language Text Sentiment
 with models.DAG(
     "example_automl_text_sentiment",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:

@@ -59,7 +59,6 @@ DATASET = {
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_DATASET_BUCKET]}}
-default_args = {"start_date": days_ago(1)}
 extract_object_id = CloudAutoMLHook.extract_object_id
@@ -76,8 +75,8 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:
 # Example DAG to create dataset, train model_id and deploy it.
 with models.DAG(
     "example_create_and_deploy",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     user_defined_macros={
         "get_target_column_spec": get_target_column_spec,
         "target": TARGET,
@@ -184,8 +183,8 @@ with models.DAG(
 # Example DAG for AutoML datasets operations
 with models.DAG(
     "example_automl_dataset",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     user_defined_macros={"extract_object_id": extract_object_id},
 ) as example_dag:
     create_dataset_task = AutoMLCreateDatasetOperator(
@@ -249,8 +248,9 @@ with models.DAG(
 with models.DAG(
     "example_gcp_get_deploy",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
+    tags=["example"],
 ) as get_deploy_dag:
     # [START howto_operator_get_model]
     get_model_task = AutoMLGetModelOperator(
@@ -273,8 +273,9 @@ with models.DAG(
 with models.DAG(
     "example_gcp_predict",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
+    tags=["example"],
 ) as predict_dag:
     # [START howto_operator_prediction]
     predict_task = AutoMLPredictOperator(

@@ -56,15 +56,14 @@ DATASET = {
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_TRANSLATION_BUCKET]}}
-default_args = {"start_date": days_ago(1)}
 extract_object_id = CloudAutoMLHook.extract_object_id
 # Example DAG for AutoML Translation
 with models.DAG(
     "example_automl_translation",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:

@@ -53,15 +53,14 @@ DATASET = {
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_VIDEO_BUCKET]}}
-default_args = {"start_date": days_ago(1)}
 extract_object_id = CloudAutoMLHook.extract_object_id
 # Example DAG for AutoML Video Intelligence Classification
 with models.DAG(
     "example_automl_video",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:

@@ -54,15 +54,14 @@ DATASET = {
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_TRACKING_BUCKET]}}
-default_args = {"start_date": days_ago(1)}
 extract_object_id = CloudAutoMLHook.extract_object_id
 # Example DAG for AutoML Video Intelligence Object Tracking
 with models.DAG(
     "example_automl_video_tracking",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:

@@ -53,15 +53,14 @@ DATASET = {
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_VISION_BUCKET]}}
-default_args = {"start_date": days_ago(1)}
 extract_object_id = CloudAutoMLHook.extract_object_id
 # Example DAG for AutoML Vision Classification
 with models.DAG(
     "example_automl_vision",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:

@@ -53,15 +53,14 @@ DATASET = {
 IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [GCP_AUTOML_DETECTION_BUCKET]}}
-default_args = {"start_date": days_ago(1)}
 extract_object_id = CloudAutoMLHook.extract_object_id
 # Example DAG for AutoML Vision Object Detection
 with models.DAG(
     "example_automl_vision_detection",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:

@@ -69,12 +69,10 @@ TRANSFER_CONFIG = ParseDict(
 # [END howto_bigquery_dts_create_args]
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_gcp_bigquery_dts",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     # [START howto_bigquery_create_data_transfer]

@@ -33,8 +33,6 @@ from airflow.providers.google.cloud.operators.bigquery import (
 )
 from airflow.utils.dates import days_ago
-default_args = {"start_date": days_ago(1)}
 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
 BQ_LOCATION = "europe-north1"
@@ -52,8 +50,8 @@ DATA_SAMPLE_GCS_OBJECT_NAME = DATA_SAMPLE_GCS_URL_PARTS.path[1:]
 with models.DAG(
     "example_bigquery_operations",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=["example"],
 ) as dag:
     # [START howto_operator_bigquery_create_table]
@@ -176,8 +174,8 @@ with models.DAG(
 with models.DAG(
     "example_bigquery_operations_location",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=["example"],
 ):
     create_dataset_with_location = BigQueryCreateEmptyDatasetOperator(

@@ -54,15 +54,13 @@ SCHEMA = [
     {"name": "ds", "type": "DATE", "mode": "NULLABLE"},
 ]
-default_args = {"start_date": days_ago(1)}
 for location in [None, LOCATION]:
     dag_id = "example_bigquery_queries_location" if location else "example_bigquery_queries"
     with models.DAG(
         dag_id,
-        default_args=default_args,
         schedule_interval=None,  # Override to match your needs
+        start_date=days_ago(1),
         tags=["example"],
         user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_1}
     ) as dag_with_locations:

@@ -33,12 +33,10 @@ DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset_transfe
 ORIGIN = "origin"
 TARGET = "target"
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_bigquery_to_bigquery",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=["example"],
 ) as dag:
     copy_selected_data = BigQueryToBigQueryOperator(

@@ -35,12 +35,10 @@ DATA_EXPORT_BUCKET_NAME = os.environ.get(
 )
 TABLE = "table_42"
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_bigquery_to_gcs",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=["example"],
 ) as dag:
     bigquery_to_gcs = BigQueryToGCSOperator(

@@ -37,12 +37,10 @@ DATA_EXPORT_BUCKET_NAME = os.environ.get(
 ORIGIN = "origin"
 TARGET = "target"
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_bigquery_transfer",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=["example"],
 ) as dag:
     copy_selected_data = BigQueryToBigQueryOperator(

@@ -75,8 +75,8 @@ default_args = {
 with models.DAG(
     'example_gcp_bigtable_operators',
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     # [START howto_operator_gcp_bigtable_instance_create]

@@ -53,12 +53,10 @@ FIRST_INSTANCE = {"tier": Instance.Tier.BASIC, "memory_size_gb": 1}
 SECOND_INSTANCE = {"tier": Instance.Tier.STANDARD_HA, "memory_size_gb": 3}
-default_args = {"start_date": dates.days_ago(1)}
 with models.DAG(
     "gcp_cloud_memorystore",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=dates.days_ago(1),
     tags=['example'],
 ) as dag:
     # [START howto_operator_create_instance]

@@ -174,8 +174,8 @@ default_args = {
 with models.DAG(
     'example_gcp_sql',
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     # ############################################## #

@@ -87,10 +87,6 @@ SQL = [
     'DROP TABLE TABLE_TEST2',
 ]
-default_args = {
-    'start_date': days_ago(1)
-}
 # [START howto_operator_cloudsql_query_connections]
@@ -272,8 +268,8 @@ tasks = []
 with models.DAG(
     dag_id='example_gcp_sql_query',
-    default_args=default_args,
     schedule_interval=None,
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     prev_task = None

@@ -92,13 +92,13 @@ aws_to_gcs_transfer_body = {
 # [START howto_operator_gcp_transfer_default_args]
-default_args = {'start_date': days_ago(1)}
+default_args = {'owner': 'airflow'}
 # [END howto_operator_gcp_transfer_default_args]
 with models.DAG(
     'example_gcp_transfer_aws',
     default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:

@@ -82,12 +82,10 @@ update_body = {
 }
 # [END howto_operator_gcp_transfer_update_job_body]
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_gcp_transfer",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=["example"],
 ) as dag:

@@ -53,8 +53,8 @@ GCE_SHORT_MACHINE_TYPE_NAME = os.environ.get('GCE_SHORT_MACHINE_TYPE_NAME', 'n1-
 with models.DAG(
     'example_gcp_compute',
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     # [START howto_operator_gce_start]

@@ -95,8 +95,8 @@ UPDATE_POLICY = {
 with models.DAG(
     'example_gcp_compute_igm',
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     # [START howto_operator_gce_igm_copy_template]

@@ -39,8 +39,6 @@ from airflow.providers.google.cloud.operators.datacatalog import (
 from airflow.utils.dates import days_ago
 from airflow.utils.helpers import chain
-default_args = {"start_date": days_ago(1)}
 PROJECT_ID = "polidea-airflow"
 LOCATION = "us-central1"
 ENTRY_GROUP_ID = "important_data_jan_2019"
@@ -50,7 +48,7 @@ FIELD_NAME_1 = "first"
 FIELD_NAME_2 = "second"
 FIELD_NAME_3 = "first-rename"
-with models.DAG("example_gcp_datacatalog", default_args=default_args, schedule_interval=None) as dag:
+with models.DAG("example_gcp_datacatalog", start_date=days_ago(1), schedule_interval=None) as dag:
     # Create
     # [START howto_operator_gcp_datacatalog_create_entry_group]
     create_entry_group = CloudDataCatalogCreateEntryGroupOperator(

@@ -41,7 +41,6 @@ GCS_JAR_BUCKET_NAME = GCS_JAR_PARTS.netloc
 GCS_JAR_OBJECT_NAME = GCS_JAR_PARTS.path[1:]
 default_args = {
-    "start_date": days_ago(1),
     'dataflow_default_options': {
         'tempLocation': GCS_TMP,
         'stagingLocation': GCS_STAGING,
@@ -50,8 +49,8 @@ default_args = {
 with models.DAG(
     "example_gcp_dataflow_native_java",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag_native_java:
@@ -93,6 +92,7 @@ with models.DAG(
 with models.DAG(
     "example_gcp_dataflow_native_python",
     default_args=default_args,
+    start_date=days_ago(1),
     schedule_interval=None,  # Override to match your needs
     tags=['example'],
 ) as dag_native_python:
@@ -133,6 +133,7 @@ with models.DAG(
 with models.DAG(
     "example_gcp_dataflow_template",
     default_args=default_args,
+    start_date=days_ago(1),
     schedule_interval=None,  # Override to match your needs
     tags=['example'],
 ) as dag_template:

@@ -133,12 +133,11 @@ PIPELINE = {
 }
 # [END howto_data_fusion_env_variables]
-default_args = {"start_date": dates.days_ago(1)}
 with models.DAG(
     "example_data_fusion",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=dates.days_ago(1)
 ) as dag:
     # [START howto_cloud_data_fusion_create_instance_operator]
     create_instance = CloudDataFusionCreateInstanceOperator(

@@ -146,7 +146,7 @@ HADOOP_JOB = {
 with models.DAG(
     "example_gcp_dataproc",
-    default_args={"start_date": days_ago(1)},
+    start_date=days_ago(1),
     schedule_interval=None,
 ) as dag:
     # [START how_to_cloud_dataproc_create_cluster_operator]

@@ -33,12 +33,10 @@ from airflow.utils import dates
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
 BUCKET = os.environ.get("GCP_DATASTORE_BUCKET", "datastore-system-test")
-default_args = {"start_date": dates.days_ago(1)}
 with models.DAG(
     "example_gcp_datastore",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=dates.days_ago(1),
     tags=['example'],
 ) as dag:
     export_task = CloudDatastoreExportEntitiesOperator(

@@ -35,9 +35,6 @@ from airflow.providers.google.cloud.operators.dlp import (
 )
 from airflow.utils.dates import days_ago
-default_args = {"start_date": days_ago(1)}
 GCP_PROJECT = os.environ.get("GCP_PROJECT_ID", "example-project")
 TEMPLATE_ID = "dlp-inspect-838746"
 ITEM = ContentItem(
@@ -54,8 +51,8 @@ INSPECT_TEMPLATE = InspectTemplate(inspect_config=INSPECT_CONFIG)
 with models.DAG(
     "example_gcp_dlp",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     create_template = CloudDLPCreateInspectTemplateOperator(

@@ -55,12 +55,10 @@ PARAMS = {
 }
 # [END howto_FB_ADS_variables]
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_facebook_ads_to_gcs",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1)
 ) as dag:
     create_bucket = GCSCreateBucketOperator(

@@ -78,7 +78,7 @@ body = {
 # [START howto_operator_gcf_default_args]
 default_args = {
-    'start_date': dates.days_ago(1)
+    'owner': 'airflow'
 }
 # [END howto_operator_gcf_default_args]
@@ -101,8 +101,8 @@ else:
 with models.DAG(
     'example_gcp_function',
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=dates.days_ago(1),
     tags=['example'],
 ) as dag:
     # [START howto_operator_gcf_deploy]

@@ -34,8 +34,6 @@ from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesyste
 from airflow.utils.dates import days_ago
 from airflow.utils.state import State
-default_args = {"start_date": days_ago(1)}
 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
 BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
 GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
@@ -57,7 +55,7 @@ PATH_TO_SAVED_FILE = os.environ.get(
 BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
 with models.DAG(
-    "example_gcs", default_args=default_args, schedule_interval=None, tags=['example'],
+    "example_gcs", start_date=days_ago(1), schedule_interval=None, tags=['example'],
 ) as dag:
     create_bucket1 = GCSCreateBucketOperator(
         task_id="create_bucket1", bucket_name=BUCKET_1, project_id=PROJECT_ID

@@ -26,8 +26,6 @@ from airflow.providers.google.cloud.operators.gcs import GCSSynchronizeBucketsOp
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
 from airflow.utils.dates import days_ago
-default_args = {"start_date": days_ago(1)}
 BUCKET_1_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-gcs-sync-1-src")
 BUCKET_1_DST = os.environ.get("GCP_GCS_BUCKET_1_DST", "test-gcs-sync-1-dst")
@@ -41,7 +39,7 @@ OBJECT_1 = os.environ.get("GCP_GCS_OBJECT_1", "test-gcs-to-gcs-1")
 OBJECT_2 = os.environ.get("GCP_GCS_OBJECT_2", "test-gcs-to-gcs-2")
 with models.DAG(
-    "example_gcs_to_gcs", default_args=default_args, schedule_interval=None, tags=['example']
+    "example_gcs_to_gcs", start_date=days_ago(1), schedule_interval=None, tags=['example']
 ) as dag:
     # [START howto_synch_bucket]
     sync_bucket = GCSSynchronizeBucketsOperator(

@@ -25,8 +25,6 @@ from airflow import models
 from airflow.providers.google.cloud.transfers.gcs_to_sftp import GCSToSFTPOperator
 from airflow.utils.dates import days_ago
-default_args = {"start_date": days_ago(1)}
 BUCKET_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-gcs-sftp")
 OBJECT_SRC_1 = "parent-1.bin"
 OBJECT_SRC_2 = "parent-2.bin"
@@ -36,7 +34,7 @@ DESTINATION_PATH_2 = "/tmp/dirs/"
 with models.DAG(
-    "example_gcs_to_sftp", default_args=default_args, schedule_interval=None, tags=['example']
+    "example_gcs_to_sftp", start_date=days_ago(1), schedule_interval=None, tags=['example']
 ) as dag:
     # [START howto_operator_gcs_to_sftp_copy_single_file]
     copy_file_from_gcs_to_sftp = GCSToSFTPOperator(

@@ -36,12 +36,10 @@ CLUSTER_NAME = os.environ.get("GCP_GKE_CLUSTER_NAME", "cluster-name")
 CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}
 # [END howto_operator_gcp_gke_create_cluster_definition]
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_gcp_gke",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     # [START howto_operator_gke_create_cluster]

@@ -58,8 +58,8 @@ default_args = {
 with models.DAG(
     "example_gcp_mlengine",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     # [START howto_operator_gcp_mlengine_training]

@@ -47,12 +47,10 @@ document_gcs = Document(gcs_content_uri=GCS_CONTENT_URI, type="PLAIN_TEXT")
 # [END howto_operator_gcp_natural_language_document_gcs]
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_gcp_natural_language",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1)
 ) as dag:
     # [START howto_operator_gcp_natural_language_analyze_entities]

@@ -26,12 +26,10 @@ GCS_BUCKET = "postgres_to_gcs_example"
 FILENAME = "test_file"
 SQL_QUERY = "select * from test_table;"
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     dag_id='example_postgres_to_gcs',
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     upload_data = PostgresToGCSOperator(

@@ -44,12 +44,10 @@ def safe_name(s: str) -> str:
     return re.sub("[^0-9a-zA-Z_]+", "_", s)
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     dag_id="example_presto_to_gcs",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=["example"],
 ) as dag:

@@ -35,8 +35,6 @@ TOPIC_FOR_SENSOR_DAG = "PubSubSensorTestTopic"
 TOPIC_FOR_OPERATOR_DAG = "PubSubOperatorTestTopic"
 MESSAGE = {"data": b"Tool", "attributes": {"name": "wrench", "mass": "1.3kg", "count": "3"}}
-default_args = {"start_date": days_ago(1)}
 # [START howto_operator_gcp_pubsub_pull_messages_result_cmd]
 echo_cmd = """
 {% for m in task_instance.xcom_pull('pull_messages') %}
@@ -47,8 +45,8 @@ echo_cmd = """
 with models.DAG(
     "example_gcp_pubsub_sensor",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1)
 ) as example_sensor_dag:
     # [START howto_operator_gcp_pubsub_create_topic]
     create_topic = PubSubCreateTopicOperator(
@@ -108,8 +106,8 @@ with models.DAG(
 with models.DAG(
     "example_gcp_pubsub_operator",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1)
 ) as example_operator_dag:
     # [START howto_operator_gcp_pubsub_create_topic]
     create_topic = PubSubCreateTopicOperator(

@@ -25,8 +25,6 @@ from airflow import models
 from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator
 from airflow.utils.dates import days_ago
-default_args = {"start_date": days_ago(1)}
 BUCKET_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-sftp-gcs")
 TMP_PATH = "/tmp"
@@ -39,7 +37,7 @@ OBJECT_SRC_3 = "parent-3.txt"
 with models.DAG(
-    "example_sftp_to_gcs", default_args=default_args, schedule_interval=None
+    "example_sftp_to_gcs", start_date=days_ago(1), schedule_interval=None
 ) as dag:
     # [START howto_operator_sftp_to_gcs_copy_single_file]
     copy_file_from_sftp_to_gcs = SFTPToGCSOperator(

@@ -25,11 +25,9 @@ from airflow.utils.dates import days_ago
 BUCKET = os.environ.get("GCP_GCS_BUCKET", "test28397yeo")
 SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "1234567890qwerty")
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_sheets_to_gcs",
-    default_args=default_args,
+    start_date=days_ago(1),
     schedule_interval=None,  # Override to match your needs
     tags=["example"],
 ) as dag:

@@ -58,8 +58,8 @@ default_args = {
 with models.DAG(
     'example_gcp_spanner',
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     # Create

@@ -41,11 +41,9 @@ CONFIG = {"encoding": "LINEAR16", "language_code": "en_US"}
 AUDIO = {"uri": "gs://{bucket}/{object}".format(bucket=BUCKET_NAME, object=FILENAME)}
 # [END howto_operator_speech_to_text_api_arguments]
-default_args = {"start_date": dates.days_ago(1)}
 with models.DAG(
     "example_gcp_speech_to_text",
-    default_args=default_args,
+    start_date=dates.days_ago(1),
     schedule_interval=None,  # Override to match your needs
     tags=['example'],
 ) as dag:

@@ -106,12 +106,10 @@ TEST_NOTIFICATION_CHANNEL_2 = {
     "type": "slack"
 }
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     'example_stackdriver',
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example']
 ) as dag:
     # [START howto_operator_gcp_stackdriver_upsert_notification_channel]
# [START howto_operator_gcp_stackdriver_upsert_notification_channel]

@@ -35,7 +35,6 @@ from airflow.providers.google.cloud.operators.tasks import (
 )
 from airflow.utils.dates import days_ago
-default_args = {"start_date": days_ago(1)}
 timestamp = timestamp_pb2.Timestamp()
 timestamp.FromDatetime(datetime.now() + timedelta(hours=12))  # pylint: disable=no-member
@@ -55,8 +54,8 @@ TASK = {
 with models.DAG(
     "example_gcp_tasks",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:

@@ -35,11 +35,9 @@ VOICE = {"language_code": "en-US", "ssml_gender": "FEMALE"}
 AUDIO_CONFIG = {"audio_encoding": "LINEAR16"}
 # [END howto_operator_text_to_speech_api_arguments]
-default_args = {"start_date": dates.days_ago(1)}
 with models.DAG(
     "example_gcp_text_to_speech",
-    default_args=default_args,
+    start_date=dates.days_ago(1),
     schedule_interval=None,  # Override to match your needs
     tags=['example'],
 ) as dag:

@@ -27,12 +27,10 @@ from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.translate import CloudTranslateTextOperator
 from airflow.utils.dates import days_ago
-default_args = {'start_date': days_ago(1)}
 with models.DAG(
     'example_gcp_translate',
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     # [START howto_operator_translate_text]

@@ -45,12 +45,11 @@ MODEL = 'base'
 SOURCE_LANGUAGE = None  # type: None
 # [END howto_operator_translate_speech_arguments]
-default_args = {"start_date": dates.days_ago(1)}
 with models.DAG(
     "example_gcp_translate_speech",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=dates.days_ago(1),
     tags=['example'],
 ) as dag:
     text_to_speech_synthesize_task = CloudTextToSpeechSynthesizeOperator(

@@ -36,8 +36,6 @@ from airflow.providers.google.cloud.operators.video_intelligence import (
 )
 from airflow.utils.dates import days_ago
-default_args = {"start_date": days_ago(1)}
 # [START howto_operator_video_intelligence_os_args]
 GCP_BUCKET_NAME = os.environ.get(
     "GCP_VIDEO_INTELLIGENCE_BUCKET_NAME", "test-bucket-name"
@@ -52,8 +50,8 @@ INPUT_URI = "gs://{}/video.mp4".format(GCP_BUCKET_NAME)
 with models.DAG(
     "example_gcp_video_intelligence",
-    default_args=default_args,
     schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:

@@ -64,8 +64,6 @@ from google.cloud.vision import enums  # isort:skip pylint: disable=wrong-import
 # [END howto_operator_vision_enums_import]
-default_args = {'start_date': days_ago(1)}
 GCP_VISION_LOCATION = os.environ.get('GCP_VISION_LOCATION', 'europe-west1')
 GCP_VISION_PRODUCT_SET_ID = os.environ.get('GCP_VISION_PRODUCT_SET_ID', 'product_set_explicit_id')
@@ -98,7 +96,7 @@ DETECT_IMAGE = {"source": {"image_uri": GCP_VISION_ANNOTATE_IMAGE_URL}}
 # [END howto_operator_vision_detect_image_param]
 with models.DAG(
-    'example_gcp_vision_autogenerated_id', default_args=default_args, schedule_interval=None
+    'example_gcp_vision_autogenerated_id', start_date=days_ago(1), schedule_interval=None
 ) as dag_autogenerated_id:
     # ################################## #
     # ### Autogenerated IDs examples ### #
@@ -236,7 +234,7 @@ with models.DAG(
     remove_product_from_product_set >> product_set_delete
 with models.DAG(
-    'example_gcp_vision_explicit_id', default_args=default_args, schedule_interval=None
+    'example_gcp_vision_explicit_id', start_date=days_ago(1), schedule_interval=None
 ) as dag_explicit_id:
     # ############################# #
     # ### Explicit IDs examples ### #
@@ -401,7 +399,7 @@ with models.DAG(
     remove_product_from_product_set_2 >> product_delete_2
 with models.DAG(
-    'example_gcp_vision_annotate_image', default_args=default_args, schedule_interval=None
+    'example_gcp_vision_annotate_image', start_date=days_ago(1), schedule_interval=None
 ) as dag_annotate_image:
     # ############################## #
     # ### Annotate image example ### #

@@ -37,12 +37,10 @@ WEB_PROPERTY_AD_WORDS_LINK_ID = os.environ.get(
 )
 DATA_ID = "kjdDu3_tQa6n8Q1kXFtSmg"
-default_args = {"start_date": dates.days_ago(1)}
 with models.DAG(
     "example_google_analytics",
-    default_args=default_args,
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval=None,  # Override to match your needs,
+    start_date=dates.days_ago(1),
 ) as dag:
     # [START howto_marketing_platform_list_accounts_operator]
     list_account = GoogleAnalyticsListAccountsOperator(task_id="list_account")

@@ -83,12 +83,10 @@ CONVERSION_UPDATE = {
     "value": 123.4,
 }
-default_args = {"start_date": dates.days_ago(1)}
 with models.DAG(
     "example_campaign_manager",
-    default_args=default_args,
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval=None,  # Override to match your needs,
+    start_date=dates.days_ago(1)
 ) as dag:
     # [START howto_campaign_manager_insert_report_operator]
     create_report = GoogleCampaignManagerInsertReportOperator(

@@ -80,12 +80,10 @@ DOWNLOAD_LINE_ITEMS_REQUEST: Dict = {
     "fileSpec": "EWF"}
 # [END howto_display_video_env_variables]
-default_args = {"start_date": dates.days_ago(1)}
 with models.DAG(
     "example_display_video",
-    default_args=default_args,
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval=None,  # Override to match your needs,
+    start_date=dates.days_ago(1)
 ) as dag:
     # [START howto_google_display_video_createquery_report_operator]
     create_report = GoogleDisplayVideo360CreateReportOperator(

@@ -43,12 +43,10 @@ REPORT = {
 }
 # [END howto_search_ads_env_variables]
-default_args = {"start_date": dates.days_ago(1)}
 with models.DAG(
     "example_search_ads",
-    default_args=default_args,
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval=None,  # Override to match your needs,
+    start_date=dates.days_ago(1)
 ) as dag:
     # [START howto_search_ads_generate_report_operator]
     generate_report = GoogleSearchAdsInsertReportOperator(

@@ -26,12 +26,10 @@ from airflow.utils.dates import days_ago
 GCS_TO_GDRIVE_BUCKET = os.environ.get("GCS_TO_DRIVE_BUCKET", "example-object")
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_gcs_to_gdrive",
-    default_args=default_args,
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval=None,  # Override to match your needs,
+    start_date=days_ago(1),
     tags=['example'],
 ) as dag:
     # [START howto_operator_gcs_to_gdrive_copy_single_file]

@@ -27,11 +27,9 @@ BUCKET = os.environ.get("GCP_GCS_BUCKET", "example-test-bucket3")
 SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "example-spreadsheetID")
 NEW_SPREADSHEET_ID = os.environ.get("NEW_SPREADSHEET_ID", "1234567890qwerty")
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_gcs_to_sheets",
-    default_args=default_args,
+    start_date=days_ago(1),
     schedule_interval=None,  # Override to match your needs
     tags=["example"],
 ) as dag:

@@ -34,12 +34,10 @@ SPREADSHEET = {
     "sheets": [{"properties": {"title": "Sheet1"}}],
 }
-default_args = {"start_date": days_ago(1)}
 with models.DAG(
     "example_sheets_gcs",
-    default_args=default_args,
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval=None,  # Override to match your needs,
+    start_date=days_ago(1),
     tags=["example"],
 ) as dag:
     # [START upload_sheet_to_gcs]

@@ -50,7 +50,7 @@ class TestMarkTasks(unittest.TestCase):
         cls.dag3 = dagbag.dags['example_trigger_target_dag']
         cls.dag3.sync_to_db()
         cls.execution_dates = [days_ago(2), days_ago(1)]
-        start_date3 = cls.dag3.default_args["start_date"]
+        start_date3 = cls.dag3.start_date
         cls.dag3_execution_dates = [start_date3, start_date3 + timedelta(days=1),
                                     start_date3 + timedelta(days=2)]
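The updated test relies on the DAG object exposing the constructor argument directly. A minimal sketch of that assumption (using the Airflow API of this era; the dag_id is illustrative):

from airflow import DAG
from airflow.utils.dates import days_ago

# start_date passed to the DAG constructor is available as dag.start_date,
# so the test no longer needs to reach into default_args.
dag = DAG(dag_id="demo", start_date=days_ago(2), schedule_interval=None)
assert dag.start_date == days_ago(2)  # days_ago() pins to midnight, so repeated calls agree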