[AIRFLOW-4758] Add GcsToGDriveOperator operator (#5822)

* [AIRFLOW-4758] Add GcsToGDriveOperator operator
Kamil Breguła 2019-09-06 12:45:15 +02:00, committed by Jarek Potiuk
Parent 6b82b9ef91
Commit 0076e17a91
7 changed files: 809 additions, 1 deletion

View file: airflow/contrib/example_dags/example_gcs_to_gdrive.py

@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example DAG using GoogleCloudStorageToGoogleDriveOperator.
"""
import os

import airflow
from airflow import models
from airflow.contrib.operators.gcs_to_gdrive_operator import GcsToGDriveOperator

GCS_TO_GDRIVE_BUCKET = os.environ.get("GCS_TO_GDRIVE_BUCKET", "example-bucket")

default_args = {"start_date": airflow.utils.dates.days_ago(1)}

with models.DAG(
    "example_gcs_to_gdrive", default_args=default_args, schedule_interval=None  # Override to match your needs
) as dag:
# [START howto_operator_gcs_to_gdrive_copy_single_file]
copy_single_file = GcsToGDriveOperator(
task_id="copy_single_file",
source_bucket=GCS_TO_GDRIVE_BUCKET,
source_object="sales/january.avro",
destination_object="copied_sales/january-backup.avro",
)
# [END howto_operator_gcs_to_gdrive_copy_single_file]
# [START howto_operator_gcs_to_gdrive_copy_files]
copy_files = GcsToGDriveOperator(
task_id="copy_files",
source_bucket=GCS_TO_GDRIVE_BUCKET,
source_object="sales/*",
destination_object="copied_sales/",
)
# [END howto_operator_gcs_to_gdrive_copy_files]
# [START howto_operator_gcs_to_gdrive_move_files]
move_files = GcsToGDriveOperator(
task_id="move_files",
source_bucket=GCS_TO_GDRIVE_BUCKET,
source_object="sales/*.avro",
move_object=True,
)
# [END howto_operator_gcs_to_gdrive_move_files]
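
    # The three tasks above have no dependencies on one another; if sequential
    # execution were desired, Airflow's bitshift syntax could impose an order,
    # e.g. copy_single_file >> copy_files >> move_files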

View file: airflow/contrib/hooks/gdrive_hook.py

@@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for Google Drive service"""
from typing import Any
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
# noinspection PyAbstractClass
class GoogleDriveHook(GoogleCloudBaseHook):
"""
Hook for the Google Drive APIs.
:param api_version: API version used (for example v3).
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide delegation enabled.
:type delegate_to: str
"""
_conn = None
def __init__(
        self, api_version: str = "v3", gcp_conn_id: str = "google_cloud_default",
        delegate_to: Optional[str] = None
) -> None:
super().__init__(gcp_conn_id, delegate_to)
self.api_version = api_version
self.num_retries = self._get_field("num_retries", 5)
def get_conn(self) -> Any:
"""
Retrieves the connection to Google Drive.
:return: Google Drive services object.
"""
if not self._conn:
http_authorized = self._authorize()
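            # cache_discovery=False avoids googleapiclient's discovery-document
            # file cache (and its noisy "file_cache is unavailable" warning),
            # which does not work with newer oauth2client versions.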
self._conn = build("drive", self.api_version, http=http_authorized, cache_discovery=False)
return self._conn
def _ensure_folders_exists(self, path: str) -> str:
service = self.get_conn()
current_parent = "root"
folders = path.split("/")
depth = 0
        # First, try to descend into directories that already exist
for current_folder in folders:
self.log.debug("Looking for %s directory with %s parent", current_folder, current_parent)
conditions = [
"mimeType = 'application/vnd.google-apps.folder'",
"name='{}'".format(current_folder),
"'{}' in parents".format(current_parent),
]
result = (
service.files() # pylint: disable=no-member
.list(q=" and ".join(conditions), spaces="drive", fields="files(id, name)")
.execute(num_retries=self.num_retries)
)
files = result.get("files", [])
            if not files:
                self.log.info("Directory %s not found", current_folder)
                # If the directory does not exist, stop descending; any
                # remaining directories are created below
                break
depth += 1
current_parent = files[0].get("id")
# Check if there are directories to process
if depth != len(folders):
# Create missing directories
for current_folder in folders[depth:]:
file_metadata = {
"name": current_folder,
"mimeType": "application/vnd.google-apps.folder",
"parents": [current_parent],
}
file = (
service.files() # pylint: disable=no-member
.create(body=file_metadata, fields="id")
.execute(num_retries=self.num_retries)
)
self.log.info("Created %s directory", current_folder)
current_parent = file.get("id")
# Return the ID of the last directory
return current_parent
def upload_file(self, local_location: str, remote_location: str) -> str:
"""
Uploads a file that is available locally to a Google Drive service.
:param local_location: The path where the file is available.
:type local_location: str
        :param remote_location: The path where the file will be sent
:type remote_location: str
:return: File ID
:rtype: str
"""
service = self.get_conn()
directory_path, _, filename = remote_location.rpartition("/")
if directory_path:
parent = self._ensure_folders_exists(directory_path)
else:
parent = "root"
file_metadata = {"name": filename, "parents": [parent]}
media = MediaFileUpload(local_location)
file = (
service.files() # pylint: disable=no-member
.create(body=file_metadata, media_body=media, fields="id")
.execute(num_retries=self.num_retries)
)
self.log.info("File %s uploaded to gdrive://%s.", local_location, remote_location)
return file.get("id")

View file: airflow/contrib/operators/gcs_to_gdrive_operator.py

@@ -0,0 +1,147 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains a Google Cloud Storage operator.
"""
import tempfile
from typing import Optional

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.hooks.gdrive_hook import GoogleDriveHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

WILDCARD = "*"


class GcsToGDriveOperator(BaseOperator):
"""
    Copies objects from a Google Cloud Storage service to a Google Drive service, with renaming
    if requested.
Using this operator requires the following OAuth 2.0 scope:
.. code-block:: none
https://www.googleapis.com/auth/drive
:param source_bucket: The source Google Cloud Storage bucket where the object is. (templated)
:type source_bucket: str
:param source_object: The source name of the object to copy in the Google cloud
storage bucket. (templated)
You can use only one wildcard for objects (filenames) within your bucket. The wildcard can appear
inside the object name or at the end of the object name. Appending a wildcard to the bucket name
is unsupported.
:type source_object: str
:param destination_object: The destination name of the object in the destination Google Drive
service. (templated)
If a wildcard is supplied in the source_object argument, this is the prefix that will be prepended
to the final destination objects' paths.
Note that the source path's part before the wildcard will be removed;
if it needs to be retained it should be appended to destination_object.
For example, with prefix ``foo/*`` and destination_object ``blah/``, the file ``foo/baz`` will be
copied to ``blah/baz``; to retain the prefix write the destination_object as e.g. ``blah/foo``, in
which case the copied file will be named ``blah/foo/baz``.
:type destination_object: str
    :param move_object: When ``move_object`` is True, the object is moved instead of copied to the
        new location. This is the equivalent of a ``mv`` command as opposed to a ``cp`` command.
:type move_object: bool
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have domain-wide delegation enabled.
:type delegate_to: str
"""
template_fields = ("source_bucket", "source_object", "destination_object")
ui_color = "#f0eee4"
@apply_defaults
def __init__(
self,
source_bucket: str,
source_object: str,
        destination_object: Optional[str] = None,
move_object: bool = False,
gcp_conn_id: str = "google_cloud_default",
        delegate_to: Optional[str] = None,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.source_bucket = source_bucket
self.source_object = source_object
self.destination_object = destination_object
self.move_object = move_object
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.gcs_hook = None # type: Optional[GoogleCloudStorageHook]
self.gdrive_hook = None # type: Optional[GoogleDriveHook]
def execute(self, context):
self.gcs_hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
)
self.gdrive_hook = GoogleDriveHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
if WILDCARD in self.source_object:
total_wildcards = self.source_object.count(WILDCARD)
if total_wildcards > 1:
error_msg = (
"Only one wildcard '*' is allowed in source_object parameter. "
"Found {} in {}.".format(total_wildcards, self.source_object)
)
raise AirflowException(error_msg)
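            # Split on the single wildcard: for "sales/*.avro" this yields
            # prefix "sales/" and delimiter ".avro", which map directly onto
            # the prefix/delimiter arguments of the GCS list() call below.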
prefix, delimiter = self.source_object.split(WILDCARD, 1)
objects = self.gcs_hook.list(self.source_bucket, prefix=prefix, delimiter=delimiter)
for source_object in objects:
if self.destination_object is None:
destination_object = source_object
else:
destination_object = source_object.replace(prefix, self.destination_object, 1)
self._copy_single_object(source_object=source_object, destination_object=destination_object)
else:
self._copy_single_object(
source_object=self.source_object, destination_object=self.destination_object
)
def _copy_single_object(self, source_object, destination_object):
self.log.info(
"Executing copy of gs://%s/%s to gdrive://%s",
self.source_bucket,
source_object,
destination_object,
)
with tempfile.NamedTemporaryFile() as file:
filename = file.name
self.gcs_hook.download(
bucket_name=self.source_bucket, object_name=source_object, filename=filename
)
self.gdrive_hook.upload_file(local_location=filename, remote_location=destination_object)
if self.move_object:
self.gcs_hook.delete(self.source_bucket, source_object)

View file

@@ -0,0 +1,90 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Google Cloud Storage to Google Drive Transfer Operators
=======================================================
Google has two services that store data. `Google Cloud Storage <https://cloud.google.com/storage/>`__ is
used to store large amounts of data from various applications and integrates tightly with other Google
Cloud Platform services. `Google Drive <https://www.google.com/drive/>`__ is used to store everyday data,
including documents and photos, and offers built-in collaboration features such as a document editor and
file sharing.
.. contents::
:depth: 1
:local:
Prerequisite Tasks
^^^^^^^^^^^^^^^^^^
.. include:: _partials/prerequisite_tasks.rst
.. _howto/operator:GcsToGDriveOperator:
Operator
^^^^^^^^
Transferring files between Google Cloud Storage and Google Drive is performed with the
:class:`~airflow.contrib.operators.gcs_to_gdrive_operator.GcsToGDriveOperator` operator.
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.contrib.operators.gcs_to_gdrive_operator.GcsToGDriveOperator`
parameters, which allow you to determine values dynamically.
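
As a minimal sketch (the task id, bucket, and paths below are illustrative assumptions, not part of
this commit), the templated parameters can reference Jinja macros such as the execution date:

.. code-block:: python

    templated_copy = GcsToGDriveOperator(
        task_id="templated_copy",
        source_bucket="my-example-bucket",  # hypothetical bucket name
        # {{ ds }} is rendered by Jinja to the execution date, e.g. "2019-09-06"
        source_object="sales/{{ ds }}/*.avro",
        destination_object="drive_backups/{{ ds }}/",
    )
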
Copy single files
-----------------
The following operator copies a single file.
.. exampleinclude:: ../../../../airflow/contrib/example_dags/example_gcs_to_gdrive.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcs_to_gdrive_copy_single_file]
:end-before: [END howto_operator_gcs_to_gdrive_copy_single_file]
Copy multiple files
-------------------
The following operator copies all files matching the wildcard.
.. exampleinclude:: ../../../../airflow/contrib/example_dags/example_gcs_to_gdrive.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcs_to_gdrive_copy_files]
:end-before: [END howto_operator_gcs_to_gdrive_copy_files]
Move files
----------
Setting the ``move_object`` parameter to ``True`` moves the files instead of copying them: after a file
is copied to Google Drive, the original file is deleted from the bucket.
.. exampleinclude:: ../../../../airflow/contrib/example_dags/example_gcs_to_gdrive.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcs_to_gdrive_move_files]
:end-before: [END howto_operator_gcs_to_gdrive_move_files]
Reference
^^^^^^^^^
For further information, look at:
* `Google Drive API Documentation <https://developers.google.com/drive/api/v3/about-sdk>`__
* `Google Cloud Storage Documentation <https://cloud.google.com/storage/>`__

View file

@@ -0,0 +1,215 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest

import mock

from airflow.contrib.hooks.gdrive_hook import GoogleDriveHook
from tests.contrib.utils.base_gcp_mock import GCP_CONNECTION_WITH_PROJECT_ID


class TestGoogleDriveHook(unittest.TestCase):
def setUp(self):
self.patcher_get_connections = mock.patch(
"airflow.hooks.base_hook.BaseHook.get_connections", return_value=[GCP_CONNECTION_WITH_PROJECT_ID]
)
self.patcher_get_connections.start()
self.gdrive_hook = GoogleDriveHook(gcp_conn_id="test")
def tearDown(self) -> None:
self.patcher_get_connections.stop()
@mock.patch(
"airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook._authorize", return_value="AUTHORIZE"
)
@mock.patch("airflow.contrib.hooks.gdrive_hook.build")
def test_get_conn(self, mock_discovery_build, mock_authorize):
self.gdrive_hook.get_conn()
mock_discovery_build.assert_called_once_with("drive", "v3", cache_discovery=False, http="AUTHORIZE")
@mock.patch("airflow.contrib.hooks.gdrive_hook.GoogleDriveHook.get_conn")
def test_ensure_folders_exists_when_no_folder_exists(self, mock_get_conn):
mock_get_conn.return_value.files.return_value.list.return_value.execute.return_value = {"files": []}
mock_get_conn.return_value.files.return_value.create.return_value.execute.side_effect = [
{"id": "ID_1"},
{"id": "ID_2"},
{"id": "ID_3"},
{"id": "ID_4"},
]
result_value = self.gdrive_hook._ensure_folders_exists("AAA/BBB/CCC/DDD")
mock_get_conn.assert_has_calls(
[
mock.call()
.files()
.create(
body={
"name": "AAA",
"mimeType": "application/vnd.google-apps.folder",
"parents": ["root"],
},
fields="id",
),
mock.call()
.files()
.create(
body={
"name": "BBB",
"mimeType": "application/vnd.google-apps.folder",
"parents": ["ID_1"],
},
fields="id",
),
mock.call()
.files()
.create(
body={
"name": "CCC",
"mimeType": "application/vnd.google-apps.folder",
"parents": ["ID_2"],
},
fields="id",
),
mock.call()
.files()
.create(
body={
"name": "DDD",
"mimeType": "application/vnd.google-apps.folder",
"parents": ["ID_3"],
},
fields="id",
),
],
any_order=True,
)
self.assertEqual("ID_4", result_value)
@mock.patch("airflow.contrib.hooks.gdrive_hook.GoogleDriveHook.get_conn")
def test_ensure_folders_exists_when_some_folders_exists(self, mock_get_conn):
mock_get_conn.return_value.files.return_value.list.return_value.execute.side_effect = [
{"files": [{"id": "ID_1"}]},
{"files": [{"id": "ID_2"}]},
{"files": []},
]
mock_get_conn.return_value.files.return_value.create.return_value.execute.side_effect = [
{"id": "ID_3"},
{"id": "ID_4"},
]
result_value = self.gdrive_hook._ensure_folders_exists("AAA/BBB/CCC/DDD")
mock_get_conn.assert_has_calls(
[
mock.call()
.files()
.create(
body={
"name": "CCC",
"mimeType": "application/vnd.google-apps.folder",
"parents": ["ID_2"],
},
fields="id",
),
mock.call()
.files()
.create(
body={
"name": "DDD",
"mimeType": "application/vnd.google-apps.folder",
"parents": ["ID_3"],
},
fields="id",
),
],
any_order=True,
)
self.assertEqual("ID_4", result_value)
@mock.patch("airflow.contrib.hooks.gdrive_hook.GoogleDriveHook.get_conn")
def test_ensure_folders_exists_when_all_folders_exists(self, mock_get_conn):
mock_get_conn.return_value.files.return_value.list.return_value.execute.side_effect = [
{"files": [{"id": "ID_1"}]},
{"files": [{"id": "ID_2"}]},
{"files": [{"id": "ID_3"}]},
{"files": [{"id": "ID_4"}]},
]
result_value = self.gdrive_hook._ensure_folders_exists("AAA/BBB/CCC/DDD")
mock_get_conn.return_value.files.return_value.create.assert_not_called()
self.assertEqual("ID_4", result_value)
@mock.patch("airflow.contrib.hooks.gdrive_hook.MediaFileUpload")
@mock.patch("airflow.contrib.hooks.gdrive_hook.GoogleDriveHook.get_conn")
@mock.patch("airflow.contrib.hooks.gdrive_hook.GoogleDriveHook._ensure_folders_exists")
def test_upload_file_to_root_directory(
self, mock_ensure_folders_exists, mock_get_conn, mock_media_file_upload
):
mock_get_conn.return_value.files.return_value.create.return_value.execute.return_value = {
"id": "FILE_ID"
}
return_value = self.gdrive_hook.upload_file("local_path", "remote_path")
mock_ensure_folders_exists.assert_not_called()
mock_get_conn.assert_has_calls(
[
mock.call()
.files()
.create(
body={"name": "remote_path", "parents": ["root"]},
fields="id",
media_body=mock_media_file_upload.return_value,
)
]
)
self.assertEqual(return_value, "FILE_ID")
@mock.patch("airflow.contrib.hooks.gdrive_hook.MediaFileUpload")
@mock.patch("airflow.contrib.hooks.gdrive_hook.GoogleDriveHook.get_conn")
@mock.patch(
"airflow.contrib.hooks.gdrive_hook.GoogleDriveHook._ensure_folders_exists", return_value="PARENT_ID"
)
def test_upload_file_to_subdirectory(
self, mock_ensure_folders_exists, mock_get_conn, mock_media_file_upload
):
mock_get_conn.return_value.files.return_value.create.return_value.execute.return_value = {
"id": "FILE_ID"
}
return_value = self.gdrive_hook.upload_file("local_path", "AA/BB/CC/remote_path")
mock_ensure_folders_exists.assert_called_once_with("AA/BB/CC")
mock_get_conn.assert_has_calls(
[
mock.call()
.files()
.create(
body={"name": "remote_path", "parents": ["PARENT_ID"]},
fields="id",
media_body=mock_media_file_upload.return_value,
)
]
)
self.assertEqual(return_value, "FILE_ID")

View file

@@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from unittest import mock

from airflow import AirflowException
from airflow.contrib.operators.gcs_to_gdrive_operator import GcsToGDriveOperator

MODULE = "airflow.contrib.operators.gcs_to_gdrive_operator"


class TestGcsToGDriveOperator(unittest.TestCase):
@mock.patch(MODULE + ".GoogleCloudStorageHook")
@mock.patch(MODULE + ".GoogleDriveHook")
@mock.patch(MODULE + ".tempfile.NamedTemporaryFile")
def test_should_copy_single_file(self, mock_named_temporary_file, mock_gdrive, mock_gcs_hook):
type(mock_named_temporary_file.return_value.__enter__.return_value).name = mock.PropertyMock(
side_effect=["TMP1"]
)
task = GcsToGDriveOperator(
task_id="copy_single_file",
source_bucket="data",
source_object="sales/sales-2017/january.avro",
destination_object="copied_sales/2017/january-backup.avro",
)
task.execute(mock.MagicMock())
mock_gcs_hook.assert_has_calls(
[
mock.call(delegate_to=None, google_cloud_storage_conn_id="google_cloud_default"),
mock.call().download(
bucket_name="data", filename="TMP1", object_name="sales/sales-2017/january.avro"
),
]
)
mock_gdrive.assert_has_calls(
[
mock.call(delegate_to=None, gcp_conn_id="google_cloud_default"),
mock.call().upload_file(
local_location="TMP1", remote_location="copied_sales/2017/january-backup.avro"
),
]
)
@mock.patch(MODULE + ".GoogleCloudStorageHook")
@mock.patch(MODULE + ".GoogleDriveHook")
@mock.patch(MODULE + ".tempfile.NamedTemporaryFile")
def test_should_copy_files(self, mock_named_temporary_file, mock_gdrive, mock_gcs_hook):
mock_gcs_hook.return_value.list.return_value = ["sales/A.avro", "sales/B.avro", "sales/C.avro"]
type(mock_named_temporary_file.return_value.__enter__.return_value).name = mock.PropertyMock(
side_effect=["TMP1", "TMP2", "TMP3"]
)
task = GcsToGDriveOperator(
task_id="copy_files",
source_bucket="data",
source_object="sales/sales-2017/*.avro",
destination_object="copied_sales/2017/",
)
task.execute(mock.MagicMock())
mock_gcs_hook.assert_has_calls(
[
mock.call(delegate_to=None, google_cloud_storage_conn_id="google_cloud_default"),
mock.call().list("data", delimiter=".avro", prefix="sales/sales-2017/"),
mock.call().download(bucket_name="data", filename="TMP1", object_name="sales/A.avro"),
mock.call().download(bucket_name="data", filename="TMP2", object_name="sales/B.avro"),
mock.call().download(bucket_name="data", filename="TMP3", object_name="sales/C.avro"),
]
)
mock_gdrive.assert_has_calls(
[
mock.call(delegate_to=None, gcp_conn_id="google_cloud_default"),
mock.call().upload_file(local_location="TMP1", remote_location="sales/A.avro"),
mock.call().upload_file(local_location="TMP2", remote_location="sales/B.avro"),
mock.call().upload_file(local_location="TMP3", remote_location="sales/C.avro"),
]
)
@mock.patch(MODULE + ".GoogleCloudStorageHook")
@mock.patch(MODULE + ".GoogleDriveHook")
@mock.patch(MODULE + ".tempfile.NamedTemporaryFile")
def test_should_move_files(self, mock_named_temporary_file, mock_gdrive, mock_gcs_hook):
type(mock_named_temporary_file.return_value.__enter__.return_value).name = mock.PropertyMock(
side_effect=["TMP1", "TMP2", "TMP3"]
)
mock_gcs_hook.return_value.list.return_value = ["sales/A.avro", "sales/B.avro", "sales/C.avro"]
task = GcsToGDriveOperator(
task_id="move_files",
source_bucket="data",
source_object="sales/sales-2017/*.avro",
move_object=True,
)
task.execute(mock.MagicMock())
mock_gcs_hook.assert_has_calls(
[
mock.call(delegate_to=None, google_cloud_storage_conn_id="google_cloud_default"),
mock.call().list("data", delimiter=".avro", prefix="sales/sales-2017/"),
mock.call().download(bucket_name="data", filename="TMP1", object_name="sales/A.avro"),
mock.call().delete("data", "sales/A.avro"),
mock.call().download(bucket_name="data", filename="TMP2", object_name="sales/B.avro"),
mock.call().delete("data", "sales/B.avro"),
mock.call().download(bucket_name="data", filename="TMP3", object_name="sales/C.avro"),
mock.call().delete("data", "sales/C.avro"),
]
)
mock_gdrive.assert_has_calls(
[
mock.call(delegate_to=None, gcp_conn_id="google_cloud_default"),
mock.call().upload_file(local_location="TMP1", remote_location="sales/A.avro"),
mock.call().upload_file(local_location="TMP2", remote_location="sales/B.avro"),
mock.call().upload_file(local_location="TMP3", remote_location="sales/C.avro"),
]
)
@mock.patch(MODULE + ".GoogleCloudStorageHook")
@mock.patch(MODULE + ".GoogleDriveHook")
@mock.patch(MODULE + ".tempfile.NamedTemporaryFile")
def test_should_raise_exception_on_multiple_wildcard(
self, mock_named_temporary_file, mock_gdrive, mock_gcs_hook
):
task = GcsToGDriveOperator(
task_id="move_files", source_bucket="data", source_object="sales/*/*.avro", move_object=True
)
with self.assertRaisesRegex(AirflowException, "Only one wildcard"):
task.execute(mock.MagicMock())

View file: tests/contrib/utils/base_gcp_mock.py

@@ -16,9 +16,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from unittest import mock
from airflow.models import Connection
GCP_PROJECT_ID_HOOK_UNIT_TEST = 'example-project'
@@ -40,6 +42,17 @@ def mock_base_gcp_hook_no_default_project_id(self, gcp_conn_id, delegate_to=None
self._conn = None
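# Connection fixtures for GCP hook tests. GoogleCloudBaseHook reads the
# project id from the "extra__google_cloud_platform__project" extra field.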
GCP_CONNECTION_WITH_PROJECT_ID = Connection(
    extra=json.dumps({
        'extra__google_cloud_platform__project': GCP_PROJECT_ID_HOOK_UNIT_TEST
    })
)

GCP_CONNECTION_WITHOUT_PROJECT_ID = Connection(
    extra=json.dumps({})
)
def get_open_mock():
mck = mock.mock_open()
open_module = 'builtins'