[AIRFLOW-5631] Change way of running GCP system tests (#6299)

* [AIRFLOW-5631] Change way of running GCP system tests

This commit proposes a new way of running GCP related system tests.
It uses SystemTests base class and authentication is provided by a
context manager thus it's easier to understand what's going on.
This commit is contained in:
Tomek 2019-10-28 14:26:38 +01:00 коммит произвёл Jarek Potiuk
Родитель 3fb9d4935f
Коммит ffe7ba9819
39 изменённых файлов: 835 добавлений и 860 удалений

Просмотреть файл

@ -17,7 +17,6 @@
# specific language governing permissions and limitations
# under the License.
"""This module is deprecated. Please use `airflow.gcp.hooks.base`."""
import warnings
# pylint: disable=unused-import

Просмотреть файл

@ -0,0 +1,186 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains a mechanism for providing temporary
Google Cloud Platform authentication.
"""
import json
import os
import tempfile
from contextlib import contextmanager
from typing import Dict, Optional, Sequence
from urllib.parse import urlencode
from google.auth.environment_vars import CREDENTIALS
from airflow.exceptions import AirflowException
AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT = "AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT"
def build_gcp_conn(
key_file_path: Optional[str] = None,
scopes: Optional[Sequence[str]] = None,
project_id: Optional[str] = None,
) -> str:
"""
Builds a variable that can be used as ``AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT`` with provided service key,
scopes and project id.
:param key_file_path: Path to service key.
:type key_file_path: Optional[str]
:param scopes: Required OAuth scopes.
:type scopes: Optional[List[str]]
:param project_id: The GCP project id to be used for the connection.
:type project_id: Optional[str]
:return: String representing Airflow connection.
"""
conn = "google-cloud-platform://?{}"
extras = "extra__google_cloud_platform"
query_params = dict()
if key_file_path:
query_params["{}__key_path".format(extras)] = key_file_path
if scopes:
scopes_string = ",".join(scopes)
query_params["{}__scope".format(extras)] = scopes_string
if project_id:
query_params["{}__projects".format(extras)] = project_id
query = urlencode(query_params)
return conn.format(query)
@contextmanager
def temporary_environment_variable(variable_name: str, value: str):
"""
Context manager that set up temporary value for a given environment
variable and the restore initial state.
:param variable_name: Name of the environment variable
:type variable_name: str
:param value: The temporary value
:type value: str
"""
# Save initial value
init_value = os.environ.get(variable_name)
try:
# set temporary value
os.environ[variable_name] = value
yield
finally:
# Restore initial state (remove or restore)
if variable_name in os.environ:
del os.environ[variable_name]
if init_value:
os.environ[variable_name] = init_value
@contextmanager
def provide_gcp_credentials(
key_file_path: Optional[str] = None, key_file_dict: Optional[Dict] = None
):
"""
Context manager that provides a GCP credentials for application supporting `Application
Default Credentials (ADC) strategy <https://cloud.google.com/docs/authentication/production>`__.
It can be used to provide credentials for external programs (e.g. gcloud) that expect authorization
file in ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable.
:param key_file_path: Path to file with GCP credentials .json file.
:type key_file_path: str
:param key_file_dict: Dictionary with credentials.
:type key_file_dict: Dict
"""
msg = "Please provide `key_file_path` or `key_file_dict`."
assert key_file_path or key_file_dict, msg
if key_file_path and key_file_path.endswith(".p12"):
raise AirflowException(
"Legacy P12 key file are not supported, use a JSON key file."
)
with tempfile.NamedTemporaryFile(mode="w+t") as conf_file:
if not key_file_path and key_file_dict:
conf_file.write(json.dumps(key_file_dict))
conf_file.flush()
key_file_path = conf_file.name
if key_file_path:
with temporary_environment_variable(CREDENTIALS, key_file_path):
yield
else:
# We will use the default service account credentials.
yield
@contextmanager
def provide_gcp_connection(
key_file_path: Optional[str] = None,
scopes: Optional[Sequence] = None,
project_id: Optional[str] = None,
):
"""
Context manager that provides a temporary value of AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT
connection. It build a new connection that includes path to provided service json,
required scopes and project id.
:param key_file_path: Path to file with GCP credentials .json file.
:type key_file_path: str
:param scopes: OAuth scopes for the connection
:type scopes: Sequence
:param project_id: The id of GCP project for the connection.
:type project_id: str
"""
if key_file_path and key_file_path.endswith(".p12"):
raise AirflowException(
"Legacy P12 key file are not supported, use a JSON key file."
)
conn = build_gcp_conn(
scopes=scopes, key_file_path=key_file_path, project_id=project_id
)
with temporary_environment_variable(AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT, conn):
yield
@contextmanager
def provide_gcp_conn_and_credentials(
key_file_path: Optional[str] = None,
scopes: Optional[Sequence] = None,
project_id: Optional[str] = None,
):
"""
Context manager that provides both:
- GCP credentials for application supporting `Application Default Credentials (ADC)
strategy <https://cloud.google.com/docs/authentication/production>`__.
- temporary value of ``AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT`` connection
:param key_file_path: Path to file with GCP credentials .json file.
:type key_file_path: str
:param scopes: OAuth scopes for the connection
:type scopes: Sequence
:param project_id: The id of GCP project for the connection.
:type project_id: str
"""
with provide_gcp_credentials(key_file_path), provide_gcp_connection(
key_file_path, scopes, project_id
):
yield

Просмотреть файл

@ -16,39 +16,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import (
SKIP_LONG_TEST_WARNING, SKIP_TEST_WARNING, TestDagGcpSystem,
)
from tests.gcp.utils.gcp_authenticator import GCP_AUTOML_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_AUTOML_KEY), SKIP_TEST_WARNING)
class AutoMLDatasetOperationsSystemTest(TestDagGcpSystem):
def __init__(self, method_name="runTest"):
super().__init__(
method_name,
dag_id="example_automl_dataset",
require_local_executor=True,
gcp_key=GCP_AUTOML_KEY,
)
@skip_gcp_system(GCP_AUTOML_KEY, require_local_executor=True, long_lasting=True)
class AutoMLDatasetOperationsSystemTest(SystemTest):
@provide_gcp_context(GCP_AUTOML_KEY)
def test_run_example_dag(self):
self._run_dag()
self.run_dag('example_automl_dataset', GCP_DAG_FOLDER)
@unittest.skipIf(
TestDagGcpSystem.skip_long(GCP_AUTOML_KEY), SKIP_LONG_TEST_WARNING
)
class AutoMLModelOperationsSystemTest(TestDagGcpSystem):
def __init__(self, method_name="runTest"):
super().__init__(
method_name,
dag_id="example_create_and_deploy",
require_local_executor=True,
gcp_key=GCP_AUTOML_KEY,
)
@skip_gcp_system(GCP_AUTOML_KEY, require_local_executor=True, long_lasting=True)
class AutoMLModelOperationsSystemTest(SystemTest):
@provide_gcp_context(GCP_AUTOML_KEY)
def test_run_example_dag(self):
self._run_dag()
self.run_dag('example_create_and_deploy', GCP_DAG_FOLDER)

Просмотреть файл

@ -17,42 +17,36 @@
# specific language governing permissions and limitations
# under the License.
import unittest
from airflow.gcp.example_dags.example_bigquery_dts import (
BUCKET_URI, GCP_DTS_BQ_DATASET, GCP_DTS_BQ_TABLE, GCP_PROJECT_ID,
)
from tests.gcp.operators.test_bigquery_dts_system_helper import GcpBigqueryDtsTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_BIGQUERY_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_BIGQUERY_KEY), SKIP_TEST_WARNING)
class GcpBigqueryDtsSystemTest(TestDagGcpSystem):
def __init__(self, method_name="runTest"):
super(GcpBigqueryDtsSystemTest, self).__init__(
method_name, dag_id="example_gcp_bigquery_dts", gcp_key=GCP_BIGQUERY_KEY
)
self.helper = GcpBigqueryDtsTestHelper()
@skip_gcp_system(GCP_BIGQUERY_KEY, require_local_executor=True)
class GcpBigqueryDtsSystemTest(SystemTest):
helper = GcpBigqueryDtsTestHelper()
@provide_gcp_context(GCP_BIGQUERY_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
self.helper.create_dataset(
project_id=GCP_PROJECT_ID,
dataset=GCP_DTS_BQ_DATASET,
table=GCP_DTS_BQ_TABLE,
)
self.helper.upload_data(dataset=GCP_DTS_BQ_DATASET, table=GCP_DTS_BQ_TABLE, gcs_file=BUCKET_URI)
self.gcp_authenticator.gcp_revoke_authentication()
@provide_gcp_context(GCP_BIGQUERY_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
self.helper.delete_dataset(
project_id=GCP_PROJECT_ID, dataset=GCP_DTS_BQ_DATASET
)
self.gcp_authenticator.gcp_revoke_authentication()
super().tearDown()
@provide_gcp_context(GCP_BIGQUERY_KEY)
def test_run_example_dag_function(self):
self._run_dag()
self.run_dag('example_gcp_bigquery_dts', GCP_DAG_FOLDER)

Просмотреть файл

@ -17,40 +17,32 @@
# specific language governing permissions and limitations
# under the License.
"""System tests for Google Cloud Build operators"""
import unittest
from tests.gcp.operators.test_bigquery_system_helper import GCPBigQueryTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_BIGQUERY_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_BIGQUERY_KEY), SKIP_TEST_WARNING)
class BigQueryExampleDagsSystemTest(TestDagGcpSystem):
@skip_gcp_system(GCP_BIGQUERY_KEY, require_local_executor=True)
class BigQueryExampleDagsSystemTest(SystemTest):
"""
System tests for Google BigQuery operators
It use a real service.
"""
helper = GCPBigQueryTestHelper()
def __init__(self, method_name="runTest"):
super().__init__(method_name, dag_id="example_bigquery", gcp_key=GCP_BIGQUERY_KEY)
self.helper = GCPBigQueryTestHelper()
@provide_gcp_context(GCP_BIGQUERY_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.create_repository_and_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.create_repository_and_bucket()
@provide_gcp_context(GCP_BIGQUERY_KEY)
def test_run_example_dag(self):
self._run_dag()
self.run_dag('example_bigquery', GCP_DAG_FOLDER)
@provide_gcp_context(GCP_BIGQUERY_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.delete_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.delete_bucket()
super().tearDown()

Просмотреть файл

@ -16,30 +16,23 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.operators.test_bigtable_system_helper import GCPBigtableTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_BIGTABLE_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_BIGTABLE_KEY), SKIP_TEST_WARNING)
class BigTableExampleDagsSystemTest(TestDagGcpSystem):
def __init__(self, method_name='runTest'):
super().__init__(
method_name,
dag_id='example_gcp_bigtable_operators',
require_local_executor=True,
gcp_key=GCP_BIGTABLE_KEY)
self.helper = GCPBigtableTestHelper()
@skip_gcp_system(GCP_BIGTABLE_KEY, require_local_executor=True)
class BigTableExampleDagsSystemTest(SystemTest):
helper = GCPBigtableTestHelper()
@provide_gcp_context(GCP_BIGTABLE_KEY)
def test_run_example_dag_gcs_bigtable(self):
self._run_dag()
self.run_dag('example_gcp_bigtable_operators', GCP_DAG_FOLDER)
@provide_gcp_context(GCP_BIGTABLE_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.delete_instance()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.delete_instance()
super().tearDown()

Просмотреть файл

@ -17,47 +17,34 @@
# specific language governing permissions and limitations
# under the License.
"""System tests for Google Cloud Build operators"""
import unittest
from tests.gcp.operators.test_cloud_build_system_helper import GCPCloudBuildTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_CLOUD_BUILD_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_CLOUD_BUILD_KEY), SKIP_TEST_WARNING)
class CloudBuildExampleDagsSystemTest(TestDagGcpSystem):
@skip_gcp_system(GCP_CLOUD_BUILD_KEY, require_local_executor=True)
class CloudBuildExampleDagsSystemTest(SystemTest):
"""
System tests for Google Cloud Build operators
It use a real service.
"""
helper = GCPCloudBuildTestHelper()
def __init__(self, method_name="runTest"):
super().__init__(
method_name,
dag_id="example_gcp_cloud_build",
require_local_executor=True,
gcp_key=GCP_CLOUD_BUILD_KEY,
)
self.helper = GCPCloudBuildTestHelper()
@provide_gcp_context(GCP_CLOUD_BUILD_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.create_repository_and_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.create_repository_and_bucket()
@provide_gcp_context(GCP_CLOUD_BUILD_KEY)
def test_run_example_dag(self):
self._run_dag()
self.run_dag("example_gcp_cloud_build", GCP_DAG_FOLDER)
@provide_gcp_context(GCP_CLOUD_BUILD_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.delete_bucket()
self.helper.delete_docker_images()
self.helper.delete_repo()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.delete_bucket()
self.helper.delete_docker_images()
self.helper.delete_repo()
super().tearDown()

Просмотреть файл

@ -17,46 +17,32 @@
# specific language governing permissions and limitations
# under the License.
"""System tests for Google Cloud Memorystore operators"""
import unittest
from tests.gcp.operators.test_cloud_memorystore_system_helper import GCPCloudMemorystoreTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_MEMORYSTORE # TODO: Update it
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_MEMORYSTORE), SKIP_TEST_WARNING)
class CloudBuildExampleDagsSystemTest(TestDagGcpSystem):
@skip_gcp_system(GCP_MEMORYSTORE, require_local_executor=True)
class CloudBuildExampleDagsSystemTest(SystemTest):
"""
System tests for Google Cloud Memorystore operators
It use a real service.
"""
helper = GCPCloudMemorystoreTestHelper()
def __init__(self, method_name="runTest"):
super().__init__(
method_name,
dag_id="gcp_cloud_memorystore",
dag_name="example_cloud_memorystore.py",
require_local_executor=False,
gcp_key=GCP_MEMORYSTORE,
)
self.helper = GCPCloudMemorystoreTestHelper()
@provide_gcp_context(GCP_MEMORYSTORE)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.create_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.create_bucket()
@provide_gcp_context(GCP_MEMORYSTORE)
def test_run_example_dag(self):
self._run_dag()
self.run_dag('gcp_cloud_memorystore', GCP_DAG_FOLDER)
@provide_gcp_context(GCP_MEMORYSTORE)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.delete_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.delete_bucket()
super().tearDown()

Просмотреть файл

@ -20,49 +20,25 @@ import os
import random
import string
import time
import unittest
from os.path import dirname
from airflow import AirflowException
from airflow.gcp.hooks.cloud_sql import CloudSqlProxyRunner
from tests.gcp.operators.test_cloud_sql_system_helper import CloudSqlQueryTestHelper
from tests.gcp.utils.base_gcp_system_test_case import TestBaseGcpSystem, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_CLOUDSQL_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'project-id')
SKIP_CLOUDSQL_QUERY_WARNING = """
This test is skipped from automated runs intentionally
as creating databases in Google Cloud SQL takes a very
long time. You can still set GCP_ENABLE_CLOUDSQL_QUERY_TEST
environment variable to 'True' and then you should be able to
run it manually after you create the database
Creating the database can be done by running
`{}/test_cloud_sql_operatorquery_system_helper.py \
--action=before-tests`
(you should remember to delete the database with --action=after-tests flag)
""".format(dirname(__file__))
GCP_ENABLE_CLOUDSQL_QUERY_TEST = os.environ.get('GCP_ENABLE_CLOUDSQL_QUERY_TEST')
enable_cloudsql_query_test = bool(GCP_ENABLE_CLOUDSQL_QUERY_TEST == 'True')
SQL_QUERY_TEST_HELPER = CloudSqlQueryTestHelper()
@unittest.skipIf(not enable_cloudsql_query_test, SKIP_CLOUDSQL_QUERY_WARNING)
class CloudSqlProxySystemTest(TestBaseGcpSystem):
def __init__(self, method_name='runTest'):
super().__init__(
method_name,
gcp_key='gcp_cloudsql.json')
@skip_gcp_system(GCP_CLOUDSQL_KEY)
class CloudSqlProxySystemTest(SystemTest):
@provide_gcp_context(GCP_CLOUDSQL_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
SQL_QUERY_TEST_HELPER.check_if_instances_are_up(instance_suffix="_QUERY")
self.gcp_authenticator.gcp_revoke_authentication()
@staticmethod
def generate_unique_path():
@ -94,8 +70,8 @@ class CloudSqlProxySystemTest(TestBaseGcpSystem):
runner.stop_proxy()
self.assertIsNone(runner.sql_proxy_process)
@provide_gcp_context(GCP_CLOUDSQL_KEY)
def test_start_proxy_with_all_instances_generated_credential_file(self):
self.gcp_authenticator.set_dictionary_in_airflow_connection()
runner = CloudSqlProxyRunner(path_prefix='/tmp/' + self.generate_unique_path(),
project_id=GCP_PROJECT_ID,
instance_specification='')
@ -120,21 +96,14 @@ class CloudSqlProxySystemTest(TestBaseGcpSystem):
self.assertEqual(runner.get_proxy_version(), "1.13")
@unittest.skipIf(not enable_cloudsql_query_test, SKIP_CLOUDSQL_QUERY_WARNING)
class CloudSqlQueryExampleDagsSystemTest(TestDagGcpSystem):
def __init__(self, method_name='runTest'):
super().__init__(
method_name,
dag_id='example_gcp_sql_query',
gcp_key=GCP_CLOUDSQL_KEY)
@skip_gcp_system(GCP_CLOUDSQL_KEY)
class CloudSqlQueryExampleDagsSystemTest(SystemTest):
@provide_gcp_context(GCP_CLOUDSQL_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
SQL_QUERY_TEST_HELPER.check_if_instances_are_up(instance_suffix="_QUERY")
SQL_QUERY_TEST_HELPER.setup_instances(instance_suffix="_QUERY")
self.gcp_authenticator.gcp_revoke_authentication()
@provide_gcp_context(GCP_CLOUDSQL_KEY)
def test_run_example_dag_cloudsql_query(self):
self._run_dag()
self.run_dag('example_gcp_sql_query', GCP_DAG_FOLDER)

Просмотреть файл

@ -17,42 +17,34 @@
# specific language governing permissions and limitations
# under the License.
import os
import unittest
from airflow import AirflowException
from tests.gcp.operators.test_cloud_sql_system_helper import CloudSqlQueryTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_CLOUDSQL_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'project-id')
SQL_QUERY_TEST_HELPER = CloudSqlQueryTestHelper()
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
class CloudSqlExampleDagsIntegrationTest(TestDagGcpSystem):
def __init__(self, method_name='runTest'):
super().__init__(
method_name,
dag_id='example_gcp_sql',
gcp_key=GCP_CLOUDSQL_KEY)
@skip_gcp_system(GCP_CLOUDSQL_KEY, require_local_executor=True)
class CloudSqlExampleDagsIntegrationTest(SystemTest):
@provide_gcp_context(GCP_CLOUDSQL_KEY)
def tearDown(self):
# Delete instances just in case the test failed and did not cleanup after itself
self.gcp_authenticator.gcp_authenticate()
try:
SQL_QUERY_TEST_HELPER.delete_instances(instance_suffix="-failover-replica")
SQL_QUERY_TEST_HELPER.delete_instances(instance_suffix="-read-replica")
SQL_QUERY_TEST_HELPER.delete_instances()
SQL_QUERY_TEST_HELPER.delete_instances(instance_suffix="2")
SQL_QUERY_TEST_HELPER.delete_service_account_acls()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
SQL_QUERY_TEST_HELPER.delete_instances(instance_suffix="-failover-replica")
SQL_QUERY_TEST_HELPER.delete_instances(instance_suffix="-read-replica")
SQL_QUERY_TEST_HELPER.delete_instances()
SQL_QUERY_TEST_HELPER.delete_instances(instance_suffix="2")
SQL_QUERY_TEST_HELPER.delete_service_account_acls()
super().tearDown()
@provide_gcp_context(GCP_CLOUDSQL_KEY)
def test_run_example_dag_cloudsql(self):
try:
self._run_dag()
self.run_dag('example_gcp_sql', GCP_DAG_FOLDER)
except AirflowException as e:
self.log.warning(
"In case you see 'The instance or operation is not in an appropriate "

Просмотреть файл

@ -27,8 +27,8 @@ from threading import Thread
from urllib.parse import urlsplit
from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
from tests.gcp.utils.base_gcp_system_test_case import RetrieveVariables
from tests.gcp.utils.gcp_authenticator import GCP_CLOUDSQL_KEY, GcpAuthenticator
from tests.test_utils.gcp_system_helpers import RetrieveVariables
retrieve_variables = RetrieveVariables()

Просмотреть файл

@ -16,32 +16,27 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.operators.test_cloud_storage_transfer_service_system_helper import GCPTransferTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_GCS_TRANSFER_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_GCS_TRANSFER_KEY), SKIP_TEST_WARNING)
class GcpTransferExampleDagsSystemTest(TestDagGcpSystem):
@skip_gcp_system(GCP_GCS_TRANSFER_KEY, require_local_executor=True)
class GcpTransferExampleDagsSystemTest(SystemTest):
helper = GCPTransferTestHelper()
@provide_gcp_context(GCP_GCS_TRANSFER_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
self.helper.create_gcs_buckets()
self.gcp_authenticator.gcp_revoke_authentication()
@provide_gcp_context(GCP_GCS_TRANSFER_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
self.helper.delete_gcs_buckets()
self.gcp_authenticator.gcp_revoke_authentication()
super().tearDown()
def __init__(self, method_name='runTest'):
super().__init__(
method_name, dag_id='example_gcp_transfer', gcp_key=GCP_GCS_TRANSFER_KEY
)
self.helper = GCPTransferTestHelper()
@provide_gcp_context(GCP_GCS_TRANSFER_KEY)
def test_run_example_dag_compute(self):
self._run_dag()
self.run_dag('example_gcp_transfer', GCP_DAG_FOLDER)

Просмотреть файл

@ -25,8 +25,8 @@ from googleapiclient import discovery
from googleapiclient._auth import default_credentials, with_scopes
from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
from tests.gcp.utils.base_gcp_system_test_case import RetrieveVariables
from tests.gcp.utils.gcp_authenticator import GCP_GCS_TRANSFER_KEY, GcpAuthenticator
from tests.test_utils.gcp_system_helpers import RetrieveVariables
retrieve_variables = RetrieveVariables()

Просмотреть файл

@ -16,66 +16,48 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.operators.test_compute_system_helper import GCPComputeTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_COMPUTE_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_COMPUTE_KEY), SKIP_TEST_WARNING)
class GcpComputeExampleDagsSystemTest(TestDagGcpSystem):
@skip_gcp_system(GCP_COMPUTE_KEY, require_local_executor=True)
class GcpComputeExampleDagsSystemTest(SystemTest):
helper = GCPComputeTestHelper()
@provide_gcp_context(GCP_COMPUTE_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
self.helper.delete_instance()
self.helper.create_instance()
self.gcp_authenticator.gcp_revoke_authentication()
@provide_gcp_context(GCP_COMPUTE_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
self.helper.delete_instance()
self.gcp_authenticator.gcp_revoke_authentication()
super().tearDown()
def __init__(self, method_name='runTest'):
super().__init__(
method_name,
dag_id='example_gcp_compute',
gcp_key=GCP_COMPUTE_KEY)
self.helper = GCPComputeTestHelper()
@provide_gcp_context(GCP_COMPUTE_KEY)
def test_run_example_dag_compute(self):
self._run_dag()
self.run_dag('example_gcp_compute', GCP_DAG_FOLDER)
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_COMPUTE_KEY), SKIP_TEST_WARNING)
class GcpComputeIgmExampleDagsSystemTest(TestDagGcpSystem):
@skip_gcp_system(GCP_COMPUTE_KEY, require_local_executor=True)
class GcpComputeIgmExampleDagsSystemTest(SystemTest):
helper = GCPComputeTestHelper()
@provide_gcp_context(GCP_COMPUTE_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.delete_instance_group_and_template(silent=True)
self.helper.create_instance_group_and_template()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.delete_instance_group_and_template(silent=True)
self.helper.create_instance_group_and_template()
@provide_gcp_context(GCP_COMPUTE_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.delete_instance_group_and_template()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.delete_instance_group_and_template()
super().tearDown()
def __init__(self, method_name='runTest'):
super().__init__(
method_name,
dag_id='example_gcp_compute_igm',
gcp_key=GCP_COMPUTE_KEY)
self.helper = GCPComputeTestHelper()
@provide_gcp_context(GCP_COMPUTE_KEY)
def test_run_example_dag_compute_igm(self):
self._run_dag()
self.run_dag('example_gcp_compute_igm', GCP_DAG_FOLDER)

Просмотреть файл

@ -17,18 +17,14 @@
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_DATAFLOW_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_DATAFLOW_KEY), SKIP_TEST_WARNING)
class CloudDataflowExampleDagsSystemTest(TestDagGcpSystem):
def __init__(self, method_name='runTest'):
super().__init__(
method_name, dag_id='example_gcp_dataflow', gcp_key=GCP_DATAFLOW_KEY
)
@skip_gcp_system(GCP_DATAFLOW_KEY, require_local_executor=True)
class CloudDataflowExampleDagsSystemTest(SystemTest):
@provide_gcp_context(GCP_DATAFLOW_KEY)
def test_run_example_dag_function(self):
self._run_dag()
self.run_dag('example_gcp_dataflow', GCP_DAG_FOLDER)

Просмотреть файл

@ -17,37 +17,32 @@
# specific language governing permissions and limitations
# under the License.
import os
import unittest
from tests.contrib.operators.test_dataproc_operator_system_helper import DataprocTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_DATAPROC_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
BUCKET = os.environ.get("GCP_DATAPROC_BUCKET", "dataproc-system-tests")
PYSPARK_MAIN = os.environ.get("PYSPARK_MAIN", "hello_world.py")
PYSPARK_URI = "gs://{}/{}".format(BUCKET, PYSPARK_MAIN)
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_DATAPROC_KEY), SKIP_TEST_WARNING)
class DataprocExampleDagsTest(TestDagGcpSystem):
@skip_gcp_system(GCP_DATAPROC_KEY, require_local_executor=True)
class DataprocExampleDagsTest(SystemTest):
helper = DataprocTestHelper()
@provide_gcp_context(GCP_DATAPROC_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
self.helper.create_test_bucket(BUCKET)
self.helper.upload_test_file(PYSPARK_URI, PYSPARK_MAIN)
self.gcp_authenticator.gcp_revoke_authentication()
@provide_gcp_context(GCP_DATAPROC_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
self.helper.delete_gcs_bucket_elements(BUCKET)
self.gcp_authenticator.gcp_revoke_authentication()
super().tearDown()
def __init__(self, method_name="runTest"):
super().__init__(
method_name, dag_id="example_gcp_dataproc", gcp_key=GCP_DATAPROC_KEY
)
self.helper = DataprocTestHelper()
@provide_gcp_context(GCP_DATAPROC_KEY)
def test_run_example_dag(self):
self._run_dag()
self.run_dag(dag_id="example_gcp_dataproc", dag_folder=GCP_DAG_FOLDER)

Просмотреть файл

@ -17,32 +17,27 @@
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.operators.test_datastore_system_helper import GcpDatastoreSystemTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_DATASTORE_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_DATASTORE_KEY), SKIP_TEST_WARNING)
class GcpDatastoreSystemTest(TestDagGcpSystem):
@skip_gcp_system(GCP_DATASTORE_KEY, require_local_executor=True)
class GcpDatastoreSystemTest(SystemTest):
helper = GcpDatastoreSystemTestHelper()
@provide_gcp_context(GCP_DATASTORE_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
self.helper.create_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
@provide_gcp_context(GCP_DATASTORE_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
self.helper.delete_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
super().tearDown()
def __init__(self, method_name="runTest"):
super().__init__(
method_name, dag_id="example_gcp_datastore", gcp_key=GCP_DATASTORE_KEY
)
self.helper = GcpDatastoreSystemTestHelper()
@provide_gcp_context(GCP_DATASTORE_KEY)
def test_run_example_dag(self):
self._run_dag()
self.run_dag('example_gcp_datastore', GCP_DAG_FOLDER)

Просмотреть файл

@ -23,19 +23,13 @@ This module contains various unit tests for
example_gcp_dlp DAG
"""
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_DLP_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_DLP_KEY), SKIP_TEST_WARNING)
class GcpDLPExampleDagsSystemTest(TestDagGcpSystem):
def __init__(self, method_name='runTest'):
super().__init__(
method_name,
dag_id='example_gcp_dlp',
gcp_key=GCP_DLP_KEY)
@skip_gcp_system(GCP_DLP_KEY, require_local_executor=True)
class GcpDLPExampleDagsSystemTest(SystemTest):
@provide_gcp_context(GCP_DLP_KEY)
def test_run_example_dag_function(self):
self._run_dag()
self.run_dag('example_gcp_dlp', GCP_DAG_FOLDER)

Просмотреть файл

@ -17,19 +17,13 @@
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_FUNCTION_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_FUNCTION_KEY), SKIP_TEST_WARNING)
class GcpFunctionExampleDagsSystemTest(TestDagGcpSystem):
def __init__(self, method_name='runTest'):
super().__init__(
method_name,
dag_id='example_gcp_function',
gcp_key=GCP_FUNCTION_KEY)
@skip_gcp_system(GCP_FUNCTION_KEY, require_local_executor=True)
class GcpFunctionExampleDagsSystemTest(SystemTest):
@provide_gcp_context(GCP_FUNCTION_KEY)
def test_run_example_dag_function(self):
self._run_dag()
self.run_dag('example_gcp_function', GCP_DAG_FOLDER)

Просмотреть файл

@ -16,29 +16,29 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.operators.test_gcs_system_helper import GcsSystemTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_GCS_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_GCS_KEY), SKIP_TEST_WARNING)
class GoogleCloudStorageExampleDagsTest(TestDagGcpSystem):
@skip_gcp_system(GCP_GCS_KEY, require_local_executor=True)
class GoogleCloudStorageExampleDagsTest(SystemTest):
helper = GcsSystemTestHelper()
@provide_gcp_context(GCP_GCS_KEY)
def setUp(self):
super().setUp()
self.helper.create_test_file()
@provide_gcp_context(GCP_GCS_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
self.helper.remove_test_files()
self.helper.remove_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
super().tearDown()
def __init__(self, method_name="runTest"):
super().__init__(method_name, dag_id="example_gcs", gcp_key=GCP_GCS_KEY)
self.helper = GcsSystemTestHelper()
@provide_gcp_context(GCP_GCS_KEY)
def test_run_example_dag(self):
self._run_dag()
self.run_dag('example_gcs', GCP_DAG_FOLDER)

Просмотреть файл

@ -16,18 +16,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_GKE_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_GKE_KEY), SKIP_TEST_WARNING)
class KubernetesEngineExampleDagTest(TestDagGcpSystem):
def __init__(self, method_name="runTest"):
super().__init__(method_name, dag_id="example_gcp_gke", gcp_key=GCP_GKE_KEY)
@skip_gcp_system(GCP_GKE_KEY, require_local_executor=True)
class KubernetesEngineExampleDagTest(SystemTest):
@provide_gcp_context(GCP_GKE_KEY)
def test_run_example_dag(self):
self.gcp_authenticator.gcp_authenticate()
self._run_dag()
self.gcp_authenticator.gcp_revoke_authentication()
self.run_dag('example_gcp_gke', GCP_DAG_FOLDER)

Просмотреть файл

@ -16,32 +16,25 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.operators.test_mlengine_system_helper import MlEngineSystemTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_AI_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_AI_KEY), SKIP_TEST_WARNING)
class MlEngineExampleDagTest(TestDagGcpSystem):
def __init__(self, method_name="runTest"):
super().__init__(method_name, dag_id="example_gcp_mlengine", gcp_key=GCP_AI_KEY)
self.helper = MlEngineSystemTestHelper()
@skip_gcp_system(GCP_AI_KEY)
class MlEngineExampleDagTest(SystemTest):
helper = MlEngineSystemTestHelper()
@provide_gcp_context(GCP_AI_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
self.helper.create_gcs_buckets()
self.gcp_authenticator.gcp_revoke_authentication()
@provide_gcp_context(GCP_AI_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
self.helper.delete_gcs_buckets()
self.gcp_authenticator.gcp_revoke_authentication()
super().tearDown()
@provide_gcp_context(GCP_AI_KEY)
def test_run_example_dag(self):
self.gcp_authenticator.gcp_authenticate()
self._run_dag()
self.gcp_authenticator.gcp_revoke_authentication()
self.run_dag('example_gcp_mlengine', GCP_DAG_FOLDER)

Просмотреть файл

@ -16,18 +16,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_AI_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_AI_KEY), SKIP_TEST_WARNING)
class CloudNaturalLanguageExampleDagsTest(TestDagGcpSystem):
def __init__(self, method_name="runTest"):
super().__init__(
method_name, dag_id="example_gcp_natural_language", gcp_key=GCP_AI_KEY
)
def test_run_example_dagr(self):
self._run_dag()
@skip_gcp_system(GCP_AI_KEY, require_local_executor=True)
class CloudNaturalLanguageExampleDagsTest(SystemTest):
@provide_gcp_context(GCP_AI_KEY)
def test_run_example_dag(self):
self.run_dag('example_gcp_natural_language', GCP_DAG_FOLDER)

Просмотреть файл

@ -16,21 +16,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_PUBSUB_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_PUBSUB_KEY), SKIP_TEST_WARNING)
class PubSubSystemTest(TestDagGcpSystem):
def __init__(self, method_name="runTest"):
super().__init__(
method_name,
dag_id="example_gcp_pubsub",
require_local_executor=True,
gcp_key=GCP_PUBSUB_KEY,
)
@skip_gcp_system(GCP_PUBSUB_KEY, require_local_executor=True)
class PubSubSystemTest(SystemTest):
@provide_gcp_context(GCP_PUBSUB_KEY)
def test_run_example_dag(self):
self._run_dag()
self.run_dag(dag_id="example_gcp_pubsub", dag_folder=GCP_DAG_FOLDER)

Просмотреть файл

@ -16,29 +16,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.operators.test_spanner_system_helper import GCPSpannerTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_SPANNER_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_SPANNER_KEY), SKIP_TEST_WARNING)
class CloudSpannerExampleDagsTest(TestDagGcpSystem):
def __init__(self, method_name='runTest'):
super().__init__(
method_name,
dag_id='example_gcp_spanner',
gcp_key=GCP_SPANNER_KEY)
self.helper = GCPSpannerTestHelper()
@skip_gcp_system(GCP_SPANNER_KEY, require_local_executor=True)
class CloudSpannerExampleDagsTest(SystemTest):
helper = GCPSpannerTestHelper()
@provide_gcp_context(GCP_SPANNER_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.delete_instance()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.delete_instance()
super().tearDown()
@provide_gcp_context(GCP_SPANNER_KEY)
def test_run_example_dag_spanner(self):
self._run_dag()
self.run_dag('example_gcp_spanner', GCP_DAG_FOLDER)

Просмотреть файл

@ -17,32 +17,27 @@
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.operators.test_speech_system_helper import GCPTextToSpeechTestHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_GCS_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_GCS_KEY), SKIP_TEST_WARNING)
class GCPTextToSpeechExampleDagSystemTest(TestDagGcpSystem):
@skip_gcp_system(GCP_GCS_KEY, require_local_executor=True)
class GCPTextToSpeechExampleDagSystemTest(SystemTest):
helper = GCPTextToSpeechTestHelper()
@provide_gcp_context(GCP_GCS_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
self.helper.create_target_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
@provide_gcp_context(GCP_GCS_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
self.helper.delete_target_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
super().tearDown()
def __init__(self, method_name="runTest"):
super().__init__(
method_name, dag_id="example_gcp_speech", gcp_key=GCP_GCS_KEY
)
self.helper = GCPTextToSpeechTestHelper()
@provide_gcp_context(GCP_GCS_KEY)
def test_run_example_dag_gcp_text_to_speech(self):
self._run_dag()
self.run_dag("example_gcp_speech", GCP_DAG_FOLDER)

Просмотреть файл

@ -22,8 +22,8 @@ import argparse
import os
from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
from tests.gcp.utils.base_gcp_system_test_case import RetrieveVariables
from tests.gcp.utils.gcp_authenticator import GCP_GCS_KEY, GcpAuthenticator
from tests.test_utils.gcp_system_helpers import RetrieveVariables
retrieve_variables = RetrieveVariables()

Просмотреть файл

@ -17,19 +17,13 @@
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_TASKS_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_TASKS_KEY), SKIP_TEST_WARNING)
class GcpTasksExampleDagsSystemTest(TestDagGcpSystem):
def __init__(self, method_name='runTest'):
super().__init__(
method_name,
dag_id='example_gcp_tasks',
gcp_key=GCP_TASKS_KEY)
@skip_gcp_system(GCP_TASKS_KEY)
class GcpTasksExampleDagsSystemTest(SystemTest):
@provide_gcp_context(GCP_TASKS_KEY)
def test_run_example_dag_function(self):
self._run_dag()
self.run_dag('example_gcp_tasks', GCP_DAG_FOLDER)

Просмотреть файл

@ -17,18 +17,14 @@
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_AI_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_AI_KEY), SKIP_TEST_WARNING)
class CloudTranslateExampleDagsSystemTest(TestDagGcpSystem):
def __init__(self, method_name='runTest'):
super().__init__(
method_name, dag_id='example_gcp_translate', gcp_key=GCP_AI_KEY
)
@skip_gcp_system(GCP_AI_KEY, require_local_executor=True)
class CloudTranslateExampleDagsSystemTest(SystemTest):
@provide_gcp_context(GCP_AI_KEY)
def test_run_example_dag_function(self):
self._run_dag()
self.run_dag('example_gcp_translate', GCP_DAG_FOLDER)

Просмотреть файл

@ -16,37 +16,28 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.operators.test_video_intelligence_system_helper import GCPVideoIntelligenceHelper
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_AI_KEY
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_AI_KEY), SKIP_TEST_WARNING)
class CloudVideoIntelligenceExampleDagsTest(TestDagGcpSystem):
def __init__(self, method_name="runTest"):
super().__init__(
method_name, dag_id="example_gcp_video_intelligence", gcp_key=GCP_AI_KEY
)
self.helper = GCPVideoIntelligenceHelper()
@skip_gcp_system(GCP_AI_KEY, require_local_executor=True)
class CloudVideoIntelligenceExampleDagsTest(SystemTest):
helper = GCPVideoIntelligenceHelper()
@provide_gcp_context(GCP_AI_KEY)
def setUp(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.create_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
finally:
pass
self.helper.create_bucket()
super().setUp()
@provide_gcp_context(GCP_AI_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.delete_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.delete_bucket()
super().tearDown()
@provide_gcp_context(GCP_AI_KEY)
def test_run_example_dag_spanner(self):
self._run_dag()
self.run_dag('example_gcp_video_intelligence', GCP_DAG_FOLDER)

Просмотреть файл

@ -1,277 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import subprocess
import unittest
from glob import glob
from shutil import move
from tempfile import mkdtemp
from airflow import AirflowException, LoggingMixin, models
from airflow.utils import db as db_utils
from airflow.utils.timezone import datetime
from tests.contrib.utils.run_once_decorator import run_once
from tests.gcp.utils.gcp_authenticator import GcpAuthenticator
AIRFLOW_MAIN_FOLDER = os.path.realpath(os.path.join(
os.path.dirname(os.path.realpath(__file__)),
os.pardir, os.pardir, os.pardir))
AIRFLOW_PARENT_FOLDER = os.path.realpath(os.path.join(AIRFLOW_MAIN_FOLDER,
os.pardir, os.pardir, os.pardir))
ENV_FILE_RETRIEVER = os.path.join(AIRFLOW_PARENT_FOLDER,
"get_system_test_environment_variables.py")
# Retrieve environment variables from parent directory retriever - it should be
# in the path ${AIRFLOW_SOURCES}/../../get_system_test_environment_variables.py
# and it should print all the variables in form of key=value to the stdout
class RetrieveVariables:
@staticmethod
@run_once
def retrieve_variables():
if os.path.isfile(ENV_FILE_RETRIEVER):
if os.environ.get('AIRFLOW__CORE__UNIT_TEST_MODE'):
raise Exception("Please unset the AIRFLOW__CORE__UNIT_TEST_MODE")
variables = subprocess.check_output([ENV_FILE_RETRIEVER]).decode("utf-8")
print("Applying variables retrieved")
for line in variables.split("\n"):
try:
variable, key = line.split("=")
except ValueError:
continue
print("{}={}".format(variable, key))
os.environ[variable] = key
RetrieveVariables.retrieve_variables()
DEFAULT_DATE = datetime(2015, 1, 1)
GCP_OPERATORS_EXAMPLES_DAG_FOLDER = os.path.join(
AIRFLOW_MAIN_FOLDER, "airflow", "gcp", "example_dags")
OPERATORS_EXAMPLES_DAG_FOLDER = os.path.join(
AIRFLOW_MAIN_FOLDER, "airflow", "example_dags")
AIRFLOW_HOME = os.environ.get('AIRFLOW_HOME',
os.path.join(os.path.expanduser('~'), 'airflow'))
DAG_FOLDER = os.path.join(AIRFLOW_HOME, "dags")
SKIP_TEST_WARNING = """
The test is only run when the test is run in with GCP-system-tests enabled
environment. You can enable it in one of two ways:
* Set GCP_CONFIG_DIR environment variable to point to the GCP configuration
directory which keeps variables.env file with environment variables to set
and keys directory which keeps service account keys in .json format
* Run this test within automated environment variable workspace where
config directory is checked out next to the airflow one.
""".format(__file__)
SKIP_LONG_TEST_WARNING = """
The test is only run when the test is run in with GCP-system-tests enabled
environment. And environment variable GCP_ENABLE_LONG_TESTS is set to True.
You can enable it in one of two ways:
* Set GCP_CONFIG_DIR environment variable to point to the GCP configuration
directory which keeps variables.env file with environment variables to set
and keys directory which keeps service account keys in .json format and
set GCP_ENABLE_LONG_TESTS to True
* Run this test within automated environment variable workspace where
config directory is checked out next to the airflow one.
""".format(__file__)
class TestBaseGcpSystem(unittest.TestCase, LoggingMixin):
def __init__(self,
method_name,
gcp_key,
project_extra=None):
super().__init__(methodName=method_name)
self.gcp_authenticator = GcpAuthenticator(gcp_key=gcp_key,
project_extra=project_extra)
self.setup_called = False
@staticmethod
def skip_check(key_name):
return GcpAuthenticator(key_name).full_key_path is None
@staticmethod
def skip_long(key_name):
if os.environ.get('GCP_ENABLE_LONG_TESTS') == 'True':
return GcpAuthenticator(key_name).full_key_path is None
return True
def setUp(self):
self.gcp_authenticator.gcp_store_authentication()
self.gcp_authenticator.gcp_authenticate()
# We checked that authentication works. Ne we revoke it to make
# sure we are not relying on the default authentication
self.gcp_authenticator.gcp_revoke_authentication()
self.setup_called = True
# noinspection PyPep8Naming
def tearDown(self):
self.gcp_authenticator.gcp_restore_authentication()
class TestDagGcpSystem(TestBaseGcpSystem):
def __init__(self,
method_name,
gcp_key,
dag_id=None,
dag_name=None,
require_local_executor=False,
example_dags_folder=GCP_OPERATORS_EXAMPLES_DAG_FOLDER,
project_extra=None):
super().__init__(method_name=method_name,
gcp_key=gcp_key,
project_extra=project_extra)
self.dag_id = dag_id
self.dag_name = self.dag_id + '.py' if not dag_name else dag_name
self.example_dags_folder = example_dags_folder
self.require_local_executor = require_local_executor
self.temp_dir = None
self.args = {}
@staticmethod
def _get_dag_folder():
return DAG_FOLDER
@staticmethod
def _get_files_to_link(path):
"""
Returns all file names (note - file names not paths)
that have the same base name as the .py dag file (for example dag_name.sql etc.)
:param path: path to the dag file.
:return: list of files matching the base name
"""
prefix, ext = os.path.splitext(path)
assert ext == '.py', "Dag name should be a .py file and is {} file".format(ext)
files_to_link = []
for file in glob(prefix + ".*"):
files_to_link.append(os.path.basename(file))
return files_to_link
def _symlink_dag_and_associated_files(self, remove=False):
target_folder = self._get_dag_folder()
source_path = os.path.join(self.example_dags_folder, self.dag_name)
for file_name in self._get_files_to_link(source_path):
source_path = os.path.join(self.example_dags_folder, file_name)
target_path = os.path.join(target_folder, file_name)
if remove:
try:
self.log.info("Remove symlink: %s -> %s", target_path, source_path)
os.remove(target_path)
except OSError:
pass
else:
if not os.path.exists(target_path):
self.log.info("Symlink: %s -> %s ", target_path, source_path)
os.symlink(source_path, target_path)
else:
self.log.info("Symlink %s already exists. Not symlinking it.", target_path)
def _store_dags_to_temporary_directory(self):
dag_folder = self._get_dag_folder()
self.temp_dir = mkdtemp()
self.log.info("Storing DAGS from %s to temporary directory %s", dag_folder, self.temp_dir)
try:
os.mkdir(dag_folder)
except OSError:
pass
for file in os.listdir(dag_folder):
move(os.path.join(dag_folder, file), os.path.join(self.temp_dir, file))
def _restore_dags_from_temporary_directory(self):
dag_folder = self._get_dag_folder()
self.log.info("Restoring DAGS to %s from temporary directory %s", dag_folder, self.temp_dir)
for file in os.listdir(self.temp_dir):
move(os.path.join(self.temp_dir, file), os.path.join(dag_folder, file))
def _run_dag(self, dag_id=None):
self.log.info("Attempting to run DAG: %s", self.dag_id)
if not self.setup_called:
raise AirflowException("Please make sure to call super.setUp() in your "
"test class!")
dag_folder = self._get_dag_folder()
dag_bag = models.DagBag(dag_folder=dag_folder, include_examples=False)
self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
dag = dag_bag.get_dag(self.dag_id or dag_id)
if dag is None:
raise AirflowException(
"The Dag {} could not be found. It's either an import problem or "
"the dag {} was not symlinked to the DAGs folder. "
"The content of the {} folder is {}".
format(self.dag_id,
self.dag_name,
dag_folder,
os.listdir(dag_folder)))
dag.clear(reset_dag_runs=True)
dag.run(ignore_first_depends_on_past=True, verbose=True)
@staticmethod
def _check_local_executor_setup():
postgres_path = os.path.realpath(os.path.join(
AIRFLOW_MAIN_FOLDER,
"tests", "contrib", "operators", "postgres_local_executor.cfg"))
if postgres_path != os.environ.get('AIRFLOW_CONFIG'):
raise AirflowException(
"""
Please set AIRFLOW_CONFIG variable to '{}'
and make sure you have a Postgres server running locally and
airflow/airflow.db database created.
You can create the database via these commands:
'createuser root'
'createdb airflow/airflow.db`
""".format(postgres_path))
# noinspection PyPep8Naming
def setUp(self):
if self.require_local_executor:
self._check_local_executor_setup()
try:
# We want to avoid random errors while database got reset - those
# Are apparently triggered by parser trying to parse DAGs while
# The tables are dropped. We move the dags temporarily out of the dags folder
# and move them back after reset
self._store_dags_to_temporary_directory()
try:
db_utils.upgradedb()
db_utils.resetdb()
finally:
self._restore_dags_from_temporary_directory()
self._symlink_dag_and_associated_files()
super().setUp()
except Exception as e:
# In case of any error during setup - restore the authentication
self.gcp_authenticator.gcp_restore_authentication()
raise e
def tearDown(self):
self._symlink_dag_and_associated_files(remove=True)
super().tearDown()

Просмотреть файл

@ -0,0 +1,147 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import os
import unittest
from io import StringIO
from uuid import uuid4
from google.auth.environment_vars import CREDENTIALS
from airflow.gcp.utils.credentials_provider import (
AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT, build_gcp_conn, provide_gcp_conn_and_credentials,
provide_gcp_connection, provide_gcp_credentials, temporary_environment_variable,
)
from tests.compat import mock
ENV_VALUE = "test_env"
TEMP_VARIABLE = "temp_variable"
KEY = str(uuid4())
ENV_CRED = "temp_cred"
class TestHelper(unittest.TestCase):
def test_build_gcp_conn_path(self):
value = "test"
conn = build_gcp_conn(key_file_path=value)
self.assertEqual(
"google-cloud-platform://?extra__google_cloud_platform__key_path=test", conn
)
def test_build_gcp_conn_scopes(self):
value = ["test", "test2"]
conn = build_gcp_conn(scopes=value)
self.assertEqual(
"google-cloud-platform://?extra__google_cloud_platform__scope=test%2Ctest2",
conn,
)
def test_build_gcp_conn_project(self):
value = "test"
conn = build_gcp_conn(project_id=value)
self.assertEqual(
"google-cloud-platform://?extra__google_cloud_platform__projects=test", conn
)
class TestTemporaryEnvironmentVariable(unittest.TestCase):
@mock.patch.dict(os.environ, clear=True)
def test_temporary_environment_variable_delete(self):
with temporary_environment_variable(KEY, ENV_VALUE):
self.assertEqual(os.environ.get(KEY), ENV_VALUE)
self.assertNotIn(KEY, os.environ)
@mock.patch.dict(os.environ, {KEY: ENV_VALUE})
def test_temporary_environment_variable_restore(self):
with temporary_environment_variable(KEY, TEMP_VARIABLE):
self.assertEqual(os.environ.get(KEY), TEMP_VARIABLE)
self.assertEqual(os.environ.get(KEY), ENV_VALUE)
@mock.patch.dict(os.environ, clear=True)
def test_temporary_environment_variable_error(self):
with self.assertRaises(Exception):
with temporary_environment_variable(KEY, ENV_VALUE):
self.assertEqual(os.environ.get(KEY), ENV_VALUE)
raise Exception("test")
self.assertNotIn(KEY, os.environ)
class TestProvideGcpCredentials(unittest.TestCase):
@mock.patch.dict(os.environ, {CREDENTIALS: ENV_VALUE})
@mock.patch("tempfile.NamedTemporaryFile")
def test_provide_gcp_credentials_key_content(self, mock_file):
file_dict = {"foo": "bar"}
string_file = StringIO()
file_content = json.dumps(file_dict)
file_name = "/test/mock-file"
mock_file_handler = mock_file.return_value.__enter__.return_value
mock_file_handler.name = file_name
mock_file_handler.write = string_file.write
with provide_gcp_credentials(key_file_dict=file_dict):
self.assertEqual(os.environ[CREDENTIALS], file_name)
self.assertEqual(file_content, string_file.getvalue())
self.assertEqual(os.environ[CREDENTIALS], ENV_VALUE)
@mock.patch.dict(os.environ, {CREDENTIALS: ENV_VALUE})
def test_provide_gcp_credentials_keep_environment(self):
key_path = "/test/key-path"
with provide_gcp_credentials(key_file_path=key_path):
self.assertEqual(os.environ[CREDENTIALS], key_path)
self.assertEqual(os.environ[CREDENTIALS], ENV_VALUE)
class TestProvideGcpConnection(unittest.TestCase):
@mock.patch.dict(os.environ, {AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: ENV_VALUE})
@mock.patch("airflow.gcp.utils.credentials_provider.build_gcp_conn")
def test_provide_gcp_connection(self, mock_builder):
mock_builder.return_value = TEMP_VARIABLE
path = "path/to/file.json"
scopes = ["scopes"]
project_id = "project_id"
with provide_gcp_connection(path, scopes, project_id):
mock_builder.assert_called_once_with(
key_file_path=path, scopes=scopes, project_id=project_id
)
self.assertEqual(
os.environ[AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT], TEMP_VARIABLE
)
self.assertEqual(os.environ[AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT], ENV_VALUE)
class TestProvideGcpConnAndCredentials(unittest.TestCase):
@mock.patch.dict(
os.environ,
{AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: ENV_VALUE, CREDENTIALS: ENV_VALUE},
)
@mock.patch("airflow.gcp.utils.credentials_provider.build_gcp_conn")
def test_provide_gcp_conn_and_credentials(self, mock_builder):
mock_builder.return_value = TEMP_VARIABLE
path = "path/to/file.json"
scopes = ["scopes"]
project_id = "project_id"
with provide_gcp_conn_and_credentials(path, scopes, project_id):
mock_builder.assert_called_once_with(
key_file_path=path, scopes=scopes, project_id=project_id
)
self.assertEqual(
os.environ[AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT], TEMP_VARIABLE
)
self.assertEqual(os.environ[CREDENTIALS], path)
self.assertEqual(os.environ[AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT], ENV_VALUE)
self.assertEqual(os.environ[CREDENTIALS], ENV_VALUE)

Просмотреть файл

@ -17,48 +17,33 @@
# specific language governing permissions and limitations
# under the License.
"""System tests for Google Cloud Build operators"""
import unittest
from tests.gcp.utils.base_gcp_system_test_case import (
OPERATORS_EXAMPLES_DAG_FOLDER, SKIP_TEST_WARNING, TestDagGcpSystem,
)
from tests.gcp.utils.gcp_authenticator import GCP_GCS_KEY
from tests.operators.test_gcs_to_gcs_system_helper import GcsToGcsTestHelper
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_GCS_KEY), SKIP_TEST_WARNING)
class GcsToGcsExampleDagsSystemTest(TestDagGcpSystem):
@skip_gcp_system(GCP_GCS_KEY, require_local_executor=True)
class GcsToGcsExampleDagsSystemTest(SystemTest):
"""
System tests for Google Cloud Storage to Google Cloud Storage transfer operators
It use a real service.
"""
helper = GcsToGcsTestHelper()
def __init__(self, method_name="runTest"):
super().__init__(
method_name,
dag_id="example_gcs_to_gcs",
dag_name="example_gcs_to_gcs.py",
example_dags_folder=OPERATORS_EXAMPLES_DAG_FOLDER,
gcp_key=GCP_GCS_KEY,
)
self.helper = GcsToGcsTestHelper()
@provide_gcp_context(GCP_GCS_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.create_buckets()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.create_buckets()
@provide_gcp_context(GCP_GCS_KEY)
def test_run_example_dag(self):
self._run_dag()
self.run_dag('example_gcs_to_gcs', GCP_DAG_FOLDER)
@provide_gcp_context(GCP_GCS_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.delete_buckets()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.delete_buckets()
super().tearDown()

Просмотреть файл

@ -17,48 +17,33 @@
# specific language governing permissions and limitations
# under the License.
"""System tests for Google Cloud Build operators"""
import unittest
from tests.gcp.utils.base_gcp_system_test_case import (
OPERATORS_EXAMPLES_DAG_FOLDER, SKIP_TEST_WARNING, TestDagGcpSystem,
)
from tests.gcp.utils.gcp_authenticator import GCP_GCS_KEY
from tests.operators.test_gcs_to_sftp_system_helper import GcsToSFTPTestHelper
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_GCS_KEY), SKIP_TEST_WARNING)
class GcsToSftpExampleDagsSystemTest(TestDagGcpSystem):
@skip_gcp_system(GCP_GCS_KEY)
class GcsToSftpExampleDagsSystemTest(SystemTest):
"""
System tests for Google Cloud Storage to SFTP transfer operator
It use a real service.
"""
def __init__(self, method_name="runTest"):
super().__init__(
method_name,
dag_id="example_gcs_to_sftp",
dag_name="example_gcs_to_sftp.py",
example_dags_folder=OPERATORS_EXAMPLES_DAG_FOLDER,
gcp_key=GCP_GCS_KEY,
)
self.helper = GcsToSFTPTestHelper()
helper = GcsToSFTPTestHelper()
@provide_gcp_context(GCP_GCS_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.create_buckets()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.create_buckets()
@provide_gcp_context(GCP_GCS_KEY)
def test_run_example_dag(self):
self._run_dag()
self.run_dag("example_gcs_to_sftp", GCP_DAG_FOLDER)
@provide_gcp_context(GCP_GCS_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.delete_buckets()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.delete_buckets()
super().tearDown()

Просмотреть файл

@ -17,43 +17,34 @@
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_AI_KEY
from tests.providers.google.cloud.operators.test_vision_system_helper import GCPVisionTestHelper
from tests.test_utils.gcp_system_helpers import GCP_DAG_FOLDER, provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
VISION_HELPER = GCPVisionTestHelper()
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_AI_KEY), SKIP_TEST_WARNING)
class CloudVisionExampleDagsSystemTest(TestDagGcpSystem):
def __init__(self, method_name='runTest'):
super().__init__(
method_name, dag_name='example_vision.py', gcp_key=GCP_AI_KEY
)
@skip_gcp_system(GCP_AI_KEY)
class CloudVisionExampleDagsSystemTest(SystemTest):
@provide_gcp_context(GCP_AI_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
try:
VISION_HELPER.create_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
VISION_HELPER.create_bucket()
@provide_gcp_context(GCP_AI_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
VISION_HELPER.delete_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
VISION_HELPER.delete_bucket()
super().tearDown()
@provide_gcp_context(GCP_AI_KEY)
def test_run_example_gcp_vision_autogenerated_id_dag(self):
self._run_dag('example_gcp_vision_autogenerated_id')
self.run_dag('example_gcp_vision_autogenerated_id', GCP_DAG_FOLDER)
@provide_gcp_context(GCP_AI_KEY)
def test_run_example_gcp_vision_explicit_id_dag(self):
self._run_dag('example_gcp_vision_explicit_id')
self.run_dag('example_gcp_vision_explicit_id', GCP_DAG_FOLDER)
@provide_gcp_context(GCP_AI_KEY)
def test_run_example_gcp_vision_annotate_image_dag(self):
self._run_dag('example_gcp_vision_annotate_image')
self.run_dag('example_gcp_vision_annotate_image', GCP_DAG_FOLDER)

Просмотреть файл

@ -16,47 +16,36 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GOOGLE_CAMPAIGN_MANAGER_KEY
from tests.providers.google.marketing_platform.operators.test_campaign_manager_system_helper import (
GoogleCampaignManagerTestHelper,
)
from tests.test_utils.gcp_system_helpers import provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
# Required scopes for future tests
# ['https://www.googleapis.com/auth/dfatrafficking',
# 'https://www.googleapis.com/auth/dfareporting',
# 'https://www.googleapis.com/auth/ddmconversions']
# Required scopes
SCOPES = [
'https://www.googleapis.com/auth/dfatrafficking',
'https://www.googleapis.com/auth/dfareporting',
'https://www.googleapis.com/auth/ddmconversions'
]
@unittest.skipIf(
TestDagGcpSystem.skip_check(GOOGLE_CAMPAIGN_MANAGER_KEY), SKIP_TEST_WARNING
)
class CampaignManagerSystemTest(TestDagGcpSystem):
def __init__(self, method_name="runTest"):
super().__init__(
method_name,
dag_id="example_campaign_manager",
gcp_key=GOOGLE_CAMPAIGN_MANAGER_KEY,
)
self.helper = GoogleCampaignManagerTestHelper()
@skip_gcp_system(GOOGLE_CAMPAIGN_MANAGER_KEY)
class CampaignManagerSystemTest(SystemTest):
helper = GoogleCampaignManagerTestHelper()
@provide_gcp_context(GOOGLE_CAMPAIGN_MANAGER_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.create_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.create_bucket()
@provide_gcp_context(GOOGLE_CAMPAIGN_MANAGER_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.delete_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
self.helper.delete_bucket()
super().tearDown()
@provide_gcp_context(GOOGLE_CAMPAIGN_MANAGER_KEY, scopes=SCOPES)
def test_run_example_dag(self):
self._run_dag()
self.run_dag('example_campaign_manager', "airflow/providers/google/marketing_platform/example_dags")

Просмотреть файл

@ -15,37 +15,31 @@
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_DISPLAY_VIDEO_KEY
from tests.providers.google.marketing_platform.operators.test_display_video_system_helper import (
GcpDisplayVideoSystemTestHelper,
)
from tests.test_utils.gcp_system_helpers import provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
# Requires the following scope:
# https://www.googleapis.com/auth/doubleclickbidmanager
SCOPES = ["https://www.googleapis.com/auth/doubleclickbidmanager"]
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_DISPLAY_VIDEO_KEY), SKIP_TEST_WARNING)
class DisplayVideoSystemTest(TestDagGcpSystem):
@skip_gcp_system(GCP_DISPLAY_VIDEO_KEY)
class DisplayVideoSystemTest(SystemTest):
helper = GcpDisplayVideoSystemTestHelper()
@provide_gcp_context(GCP_DISPLAY_VIDEO_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
self.helper.create_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
@provide_gcp_context(GCP_DISPLAY_VIDEO_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
self.helper.delete_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
super().tearDown()
def __init__(self, method_name="runTest"):
super().__init__(
method_name, dag_id="example_display_video", gcp_key=GCP_DISPLAY_VIDEO_KEY
)
self.helper = GcpDisplayVideoSystemTestHelper()
@provide_gcp_context(GCP_DISPLAY_VIDEO_KEY, scopes=SCOPES)
def test_run_example_dag(self):
self._run_dag()
self.run_dag('example_display_video', "airflow/providers/google/marketing_platform/example_dags")

Просмотреть файл

@ -16,34 +16,29 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_SEARCHADS_KEY
from tests.providers.google.marketing_platform.operators.test_search_ads_system_helper import (
GoogleSearchAdsSystemTestHelper,
)
from tests.test_utils.gcp_system_helpers import provide_gcp_context, skip_gcp_system
from tests.test_utils.system_tests_class import SystemTest
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_SEARCHADS_KEY), SKIP_TEST_WARNING)
class SearchAdsSystemTest(TestDagGcpSystem):
def __init__(self, method_name="runTest"):
super().__init__(
method_name, dag_id="example_search_ads", gcp_key=GCP_SEARCHADS_KEY
)
self.helper = GoogleSearchAdsSystemTestHelper()
@skip_gcp_system(GCP_SEARCHADS_KEY)
class SearchAdsSystemTest(SystemTest):
helper = GoogleSearchAdsSystemTestHelper()
@provide_gcp_context(GCP_SEARCHADS_KEY)
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
self.helper.create_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
@provide_gcp_context(GCP_SEARCHADS_KEY)
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
self.helper.delete_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
super().tearDown()
@provide_gcp_context(GCP_SEARCHADS_KEY)
def test_run_example_dag(self):
self._run_dag()
self.run_dag("example_search_ads", "airflow/providers/google/marketing_platform/example_dags")

Просмотреть файл

@ -0,0 +1,185 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import subprocess
import unittest
from typing import Optional, Sequence
from airflow.gcp.utils.credentials_provider import provide_gcp_conn_and_credentials
from tests.contrib.utils.run_once_decorator import run_once
from tests.gcp.utils.gcp_authenticator import GcpAuthenticator
GCP_DAG_FOLDER = "airflow/gcp/example_dags"
AIRFLOW_MAIN_FOLDER = os.path.realpath(
os.path.join(
os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir, os.pardir
)
)
AIRFLOW_PARENT_FOLDER = os.path.realpath(
os.path.join(AIRFLOW_MAIN_FOLDER, os.pardir, os.pardir, os.pardir)
)
ENV_FILE_RETRIEVER = os.path.join(
AIRFLOW_PARENT_FOLDER, "get_system_test_environment_variables.py"
)
AIRFLOW_HOME = os.environ.get(
"AIRFLOW_HOME", os.path.join(os.path.expanduser("~"), "airflow")
)
POSTGRES_LOCAL_EXECUTOR = os.path.realpath(
os.path.join(
AIRFLOW_HOME, "tests", "contrib", "operators", "postgres_local_executor.cfg"
)
)
SKIP_TEST_WARNING = """
The test is only run when the test is run in environment with GCP-system-tests enabled
environment. You can enable it in one of two ways:
* Set GCP_CONFIG_DIR environment variable to point to the GCP configuration
directory which keeps variables.env file with environment variables to set
and keys directory which keeps service account keys in .json format
* Run this test within automated environment variable workspace where
config directory is checked out next to the airflow one.
"""
SKIP_LONG_TEST_WARNING = """
The test is only run when the test is run in with GCP-system-tests enabled
environment. And environment variable GCP_ENABLE_LONG_TESTS is set to True.
You can enable it in one of two ways:
* Set GCP_CONFIG_DIR environment variable to point to the GCP configuration
directory which keeps variables.env file with environment variables to set
and keys directory which keeps service account keys in .json format and
set GCP_ENABLE_LONG_TESTS to True
* Run this test within automated environment variable workspace where
config directory is checked out next to the airflow one.
"""
LOCAL_EXECUTOR_WARNING = """
The test requires local executor. Please set AIRFLOW_CONFIG variable to '{}'
and make sure you have a Postgres server running locally and
airflow/airflow.db database created.
You can create the database via these commands:
'createuser root'
'createdb airflow/airflow.db`
"""
class RetrieveVariables:
"""
Retrieve environment variables from parent directory retriever - it should be
in the path ${AIRFLOW_SOURCES}/../../get_system_test_environment_variables.py
and it should print all the variables in form of key=value to the stdout
"""
@staticmethod
@run_once
def retrieve_variables():
if os.path.isfile(ENV_FILE_RETRIEVER):
if os.environ.get("AIRFLOW__CORE__UNIT_TEST_MODE"):
raise Exception("Please unset the AIRFLOW__CORE__UNIT_TEST_MODE")
variables = subprocess.check_output([ENV_FILE_RETRIEVER]).decode("utf-8")
print("Applying variables retrieved")
for line in variables.split("\n"):
try:
variable, key = line.split("=")
except ValueError:
continue
print("{}={}".format(variable, key))
os.environ[variable] = key
RetrieveVariables.retrieve_variables()
def skip_gcp_system(
service_key: str, long_lasting: bool = False, require_local_executor: bool = False
):
"""
Decorator for skipping GCP system tests.
:param service_key: name of the service key that will be used to provide credentials
:type service_key: str
:param long_lasting: set True if a test take relatively long time
:type long_lasting: bool
:param require_local_executor: set True if test config must use local executor
:type require_local_executor: bool
"""
if GcpAuthenticator(service_key).full_key_path is None:
return unittest.skip(SKIP_TEST_WARNING)
if long_lasting and os.environ.get("GCP_ENABLE_LONG_TESTS") == "True":
return unittest.skip(SKIP_LONG_TEST_WARNING)
if require_local_executor and POSTGRES_LOCAL_EXECUTOR != os.environ.get(
"AIRFLOW_CONFIG"
):
return unittest.skip(LOCAL_EXECUTOR_WARNING.format(POSTGRES_LOCAL_EXECUTOR))
return lambda cls: cls
def resolve_full_gcp_key_path(key: str) -> str:
"""
Returns path full path to provided GCP key.
:param key: Name of the GCP key, for example ``my_service.json``
:type key: str
:returns: Full path to the key
"""
if "/" not in key:
path = os.environ.get("GCP_CONFIG_DIR", "/config")
key = os.path.join(path, "keys", key)
return key
def provide_gcp_context(
key_file_path: Optional[str] = None,
scopes: Optional[Sequence] = None,
project_id: Optional[str] = None,
):
"""
Context manager that provides both:
- GCP credentials for application supporting `Application Default Credentials (ADC)
strategy <https://cloud.google.com/docs/authentication/production>`__.
- temporary value of ``AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT`` connection
Moreover it resolves full path to service keys so user can pass ``myservice.json``
as ``key_file_path``.
:param key_file_path: Path to file with GCP credentials .json file.
:type key_file_path: str
:param scopes: OAuth scopes for the connection
:type scopes: Sequence
:param project_id: The id of GCP project for the connection.
:type project_id: str
"""
key_file_path = resolve_full_gcp_key_path(key_file_path) # type: ignore
return provide_gcp_conn_and_credentials(
key_file_path=key_file_path, scopes=scopes, project_id=project_id
)