From 9143893c991ae69891908ab3e5ade95702740492 Mon Sep 17 00:00:00 2001
From: Tomek
Date: Wed, 4 Sep 2019 22:21:15 +0200
Subject: [PATCH] [AIRFLOW-5347] Add system tests for GoogleCloudStorage
 (#5951)

---
 airflow/contrib/example_dags/example_gcs.py   | 144 +++++++++++++++++
 .../contrib/example_dags/example_gcs_acl.py   |  77 ----------
 docs/howto/operator/gcp/gcs.rst               |   8 +-
 ..._operator_system.py => test_gcs_system.py} |  29 ++--
 .../operators/test_gcs_system_helper.py       |  48 ++++++
 5 files changed, 217 insertions(+), 89 deletions(-)
 create mode 100644 airflow/contrib/example_dags/example_gcs.py
 delete mode 100644 airflow/contrib/example_dags/example_gcs_acl.py
 rename tests/contrib/operators/{test_gcs_acl_operator_system.py => test_gcs_system.py} (56%)
 create mode 100644 tests/contrib/operators/test_gcs_system_helper.py

diff --git a/airflow/contrib/example_dags/example_gcs.py b/airflow/contrib/example_dags/example_gcs.py
new file mode 100644
index 0000000000..2512fc5b96
--- /dev/null
+++ b/airflow/contrib/example_dags/example_gcs.py
@@ -0,0 +1,144 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG for Google Cloud Storage operators.
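+
+This DAG relies on the following OS environment variables
+
+* GCP_PROJECT_ID - Google Cloud project in which the buckets are created.
+* GCP_GCS_BUCKET_1 - name of the bucket to which the local file is uploaded.
+* GCP_GCS_BUCKET_2 - name of the bucket to which the uploaded file is copied.
+* GCS_ACL_ENTITY - entity that is granted the new ACL entries, e.g. allUsers.
+* GCP_GCS_PATH_TO_UPLOAD_FILE - path to the local file to be uploaded.
+* GCP_GCS_PATH_TO_SAVED_FILE - path under which the downloaded file is saved.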
+"""
+
+import os
+import airflow
+from airflow import models
+from airflow.operators.bash_operator import BashOperator
+from airflow.contrib.operators.gcs_operator import (
+    GoogleCloudStorageCreateBucketOperator,
+)
+from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
+from airflow.contrib.operators.gcs_download_operator import (
+    GoogleCloudStorageDownloadOperator,
+)
+from airflow.contrib.operators.gcs_delete_operator import (
+    GoogleCloudStorageDeleteOperator,
+)
+from airflow.operators.local_to_gcs import FileToGoogleCloudStorageOperator
+from airflow.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
+from airflow.contrib.operators.gcs_acl_operator import (
+    GoogleCloudStorageBucketCreateAclEntryOperator,
+    GoogleCloudStorageObjectCreateAclEntryOperator,
+)
+
+default_args = {"start_date": airflow.utils.dates.days_ago(1)}
+
+# [START howto_operator_gcs_acl_args_common]
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+GCS_ACL_BUCKET_ROLE = "OWNER"
+GCS_ACL_OBJECT_ROLE = "OWNER"
+# [END howto_operator_gcs_acl_args_common]
+
+BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+
+PATH_TO_UPLOAD_FILE = os.environ.get(
+    "GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt"
+)
+PATH_TO_SAVED_FILE = os.environ.get(
+    "GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt"
+)
+
+BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
+
+
+with models.DAG(
+    "example_gcs", default_args=default_args, schedule_interval=None
+) as dag:
+    create_bucket1 = GoogleCloudStorageCreateBucketOperator(
+        task_id="create_bucket1", bucket_name=BUCKET_1, project_id=PROJECT_ID
+    )
+
+    create_bucket2 = GoogleCloudStorageCreateBucketOperator(
+        task_id="create_bucket2", bucket_name=BUCKET_2, project_id=PROJECT_ID
+    )
+
+    list_buckets = GoogleCloudStorageListOperator(
+        task_id="list_buckets", bucket=BUCKET_1
+    )
+
+    list_buckets_result = BashOperator(
+        task_id="list_buckets_result",
+        bash_command="echo \"{{ task_instance.xcom_pull('list_buckets') }}\"",
+    )
+
+    upload_file = FileToGoogleCloudStorageOperator(
+        task_id="upload_file",
+        src=PATH_TO_UPLOAD_FILE,
+        dst=BUCKET_FILE_LOCATION,
+        bucket=BUCKET_1,
+    )
+
+    # [START howto_operator_gcs_bucket_create_acl_entry_task]
+    gcs_bucket_create_acl_entry_task = GoogleCloudStorageBucketCreateAclEntryOperator(
+        bucket=BUCKET_1,
+        entity=GCS_ACL_ENTITY,
+        role=GCS_ACL_BUCKET_ROLE,
+        task_id="gcs_bucket_create_acl_entry_task",
+    )
+    # [END howto_operator_gcs_bucket_create_acl_entry_task]
+
+    # [START howto_operator_gcs_object_create_acl_entry_task]
+    gcs_object_create_acl_entry_task = GoogleCloudStorageObjectCreateAclEntryOperator(
+        bucket=BUCKET_1,
+        object_name=BUCKET_FILE_LOCATION,
+        entity=GCS_ACL_ENTITY,
+        role=GCS_ACL_OBJECT_ROLE,
+        task_id="gcs_object_create_acl_entry_task",
+    )
+    # [END howto_operator_gcs_object_create_acl_entry_task]
+
+    download_file = GoogleCloudStorageDownloadOperator(
+        task_id="download_file",
+        object_name=BUCKET_FILE_LOCATION,
+        bucket=BUCKET_1,
+        filename=PATH_TO_SAVED_FILE,
+    )
+
+    copy_file = GoogleCloudStorageToGoogleCloudStorageOperator(
+        task_id="copy_file",
+        source_bucket=BUCKET_1,
+        source_object=BUCKET_FILE_LOCATION,
+        destination_bucket=BUCKET_2,
+        destination_object=BUCKET_FILE_LOCATION,
+    )
+
+    delete_files = GoogleCloudStorageDeleteOperator(
+        task_id="delete_files",
+        bucket_name=BUCKET_1, prefix=""
+    )
+
+    [create_bucket1, create_bucket2] >> list_buckets >> list_buckets_result
+    [create_bucket1, create_bucket2] >> upload_file
+    upload_file >> [download_file, copy_file]
+    upload_file >> gcs_bucket_create_acl_entry_task >> gcs_object_create_acl_entry_task >> delete_files
diff --git a/airflow/contrib/example_dags/example_gcs_acl.py b/airflow/contrib/example_dags/example_gcs_acl.py
deleted file mode 100644
index 27713eb9ed..0000000000
--- a/airflow/contrib/example_dags/example_gcs_acl.py
+++ /dev/null
@@ -1,77 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Example Airflow DAG that creates a new ACL entry on the specified bucket and object.
-
-This DAG relies on the following OS environment variables
-
-* GCS_ACL_BUCKET - Name of a bucket.
-* GCS_ACL_OBJECT - Name of the object. For information about how to URL encode object
-  names to be path safe, see:
-  https://cloud.google.com/storage/docs/json_api/#encoding
-* GCS_ACL_ENTITY - The entity holding the permission.
-* GCS_ACL_BUCKET_ROLE - The access permission for the entity for the bucket.
-* GCS_ACL_OBJECT_ROLE - The access permission for the entity for the object.
-"""
-
-import os
-
-import airflow
-from airflow import models
-from airflow.contrib.operators.gcs_acl_operator import \
-    GoogleCloudStorageBucketCreateAclEntryOperator, \
-    GoogleCloudStorageObjectCreateAclEntryOperator
-
-# [START howto_operator_gcs_acl_args_common]
-GCS_ACL_BUCKET = os.environ.get('GCS_ACL_BUCKET', 'example-bucket')
-GCS_ACL_OBJECT = os.environ.get('GCS_ACL_OBJECT', 'example-object')
-GCS_ACL_ENTITY = os.environ.get('GCS_ACL_ENTITY', 'example-entity')
-GCS_ACL_BUCKET_ROLE = os.environ.get('GCS_ACL_BUCKET_ROLE', 'example-bucket-role')
-GCS_ACL_OBJECT_ROLE = os.environ.get('GCS_ACL_OBJECT_ROLE', 'example-object-role')
-# [END howto_operator_gcs_acl_args_common]
-
-default_args = {
-    'start_date': airflow.utils.dates.days_ago(1)
-}
-
-with models.DAG(
-    'example_gcs_acl',
-    default_args=default_args,
-    schedule_interval=None  # Change to match your use case
-) as dag:
-    # [START howto_operator_gcs_bucket_create_acl_entry_task]
-    gcs_bucket_create_acl_entry_task = GoogleCloudStorageBucketCreateAclEntryOperator(
-        bucket=GCS_ACL_BUCKET,
-        entity=GCS_ACL_ENTITY,
-        role=GCS_ACL_BUCKET_ROLE,
-        task_id="gcs_bucket_create_acl_entry_task"
-    )
-    # [END howto_operator_gcs_bucket_create_acl_entry_task]
-    # [START howto_operator_gcs_object_create_acl_entry_task]
-    gcs_object_create_acl_entry_task = GoogleCloudStorageObjectCreateAclEntryOperator(
-        bucket=GCS_ACL_BUCKET,
-        object_name=GCS_ACL_OBJECT,
-        entity=GCS_ACL_ENTITY,
-        role=GCS_ACL_OBJECT_ROLE,
-        task_id="gcs_object_create_acl_entry_task"
-    )
-    # [END howto_operator_gcs_object_create_acl_entry_task]
-
-    gcs_bucket_create_acl_entry_task >> gcs_object_create_acl_entry_task
diff --git a/docs/howto/operator/gcp/gcs.rst b/docs/howto/operator/gcp/gcs.rst
index 9ec6210fb0..4b21c31d5b 100644
--- a/docs/howto/operator/gcp/gcs.rst
+++ b/docs/howto/operator/gcp/gcs.rst
@@ -53,7 +53,7 @@ Arguments
 
 Some arguments in the example DAG are taken from the OS environment variables:
 
-.. exampleinclude:: ../../../../airflow/contrib/example_dags/example_gcs_acl.py
+.. exampleinclude:: ../../../../airflow/contrib/example_dags/example_gcs.py
     :language: python
     :start-after: [START howto_operator_gcs_acl_args_common]
     :end-before: [END howto_operator_gcs_acl_args_common]
@@ -61,7 +61,7 @@ Some arguments in the example DAG are taken from the OS environment variables:
 Using the operator
 """"""""""""""""""
 
-.. exampleinclude:: ../../../../airflow/contrib/example_dags/example_gcs_acl.py
+.. exampleinclude:: ../../../../airflow/contrib/example_dags/example_gcs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcs_bucket_create_acl_entry_task]
@@ -97,7 +97,7 @@ Arguments
 
 Some arguments in the example DAG are taken from the OS environment variables:
 
-.. exampleinclude:: ../../../../airflow/contrib/example_dags/example_gcs_acl.py
+.. exampleinclude:: ../../../../airflow/contrib/example_dags/example_gcs.py
     :language: python
     :start-after: [START howto_operator_gcs_acl_args_common]
     :end-before: [END howto_operator_gcs_acl_args_common]
@@ -105,7 +105,7 @@ Some arguments in the example DAG are taken from the OS environment variables:
 Using the operator
 """"""""""""""""""
 
-.. exampleinclude:: ../../../../airflow/contrib/example_dags/example_gcs_acl.py
+.. exampleinclude:: ../../../../airflow/contrib/example_dags/example_gcs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcs_object_create_acl_entry_task]
diff --git a/tests/contrib/operators/test_gcs_acl_operator_system.py b/tests/contrib/operators/test_gcs_system.py
similarity index 56%
rename from tests/contrib/operators/test_gcs_acl_operator_system.py
rename to tests/contrib/operators/test_gcs_system.py
index e29984ee3e..3dbac392fa 100644
--- a/tests/contrib/operators/test_gcs_acl_operator_system.py
+++ b/tests/contrib/operators/test_gcs_system.py
@@ -18,17 +18,30 @@
 # under the License.
 import unittest
 
-from tests.contrib.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
+from tests.contrib.utils.base_gcp_system_test_case import (
+    SKIP_TEST_WARNING,
+    TestDagGcpSystem,
+)
 from tests.contrib.utils.gcp_authenticator import GCP_GCS_KEY
+from tests.contrib.operators.test_gcs_system_helper import GcsSystemTestHelper
 
 
 @unittest.skipIf(TestDagGcpSystem.skip_check(GCP_GCS_KEY), SKIP_TEST_WARNING)
-class CloudStorageExampleDagsSystemTest(TestDagGcpSystem):
-    def __init__(self, method_name='runTest'):
-        super().__init__(
-            method_name,
-            dag_id='example_gcs_acl',
-            gcp_key=GCP_GCS_KEY)
+class GoogleCloudStorageExampleDagsTest(TestDagGcpSystem):
+    def setUp(self):
+        super().setUp()
+        self.helper.create_test_file()
 
-    def test_run_example_dag_gcs_acl(self):
+    def tearDown(self):
+        self.gcp_authenticator.gcp_authenticate()
+        self.helper.remove_test_files()
+        self.helper.remove_bucket()
+        self.gcp_authenticator.gcp_revoke_authentication()
+        super().tearDown()
+
+    def __init__(self, method_name="runTest"):
+        super().__init__(method_name, dag_id="example_gcs", gcp_key=GCP_GCS_KEY)
+        self.helper = GcsSystemTestHelper()
+
+    def test_run_example_dag(self):
         self._run_dag()
diff --git a/tests/contrib/operators/test_gcs_system_helper.py b/tests/contrib/operators/test_gcs_system_helper.py
new file mode 100644
index 0000000000..379ac24d08
--- /dev/null
+++ b/tests/contrib/operators/test_gcs_system_helper.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
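+"""Helper for the GoogleCloudStorage system test: creates the local file
+that the example DAG uploads and removes the local files and the example
+buckets once the test run finishes."""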
+import os
+
+from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
+
+BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+
+PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt")
+PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt")
+
+
+class GcsSystemTestHelper(LoggingCommandExecutor):
+    @staticmethod
+    def create_test_file():
+        # Create test file for upload
+        with open(PATH_TO_UPLOAD_FILE, "w+") as file:
+            file.writelines(["This is a test file"])
+
+    @staticmethod
+    def remove_test_files():
+        os.remove(PATH_TO_UPLOAD_FILE)
+        os.remove(PATH_TO_SAVED_FILE)
+
+    def remove_bucket(self):
+        self.execute_cmd(["gsutil", "rm", "-r", "gs://{bucket}".format(bucket=BUCKET_1)])
+        self.execute_cmd(["gsutil", "rm", "-r", "gs://{bucket}".format(bucket=BUCKET_2)])