Add amazon glacier to GCS transfer operator (#10947)

Add Amazon Glacier to GCS transfer operator, Glacier job operator and sensor.
This commit is contained in:
Michał Słowikowski 2020-09-30 14:59:26 +02:00 коммит произвёл GitHub
Родитель 9860719c72
Коммит 00ffedb8c4
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
16 изменённых файлов: 922 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,72 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from airflow import models
from airflow.providers.amazon.aws.operators.glacier import (
GlacierCreateJobOperator,
)
from airflow.providers.amazon.aws.sensors.glacier import GlacierJobOperationSensor
from airflow.providers.amazon.aws.transfers.glacier_to_gcs import GlacierToGCSOperator
from airflow.utils.dates import days_ago
VAULT_NAME = "airflow"
BUCKET_NAME = os.environ.get("GLACIER_GCS_BUCKET_NAME", "gs://glacier_bucket")
OBJECT_NAME = os.environ.get("GLACIER_OBJECT", "example-text.txt")
with models.DAG(
"example_glacier_to_gcs",
schedule_interval=None,
start_date=days_ago(1), # Override to match your needs
) as dag:
# [START howto_glacier_create_job_operator]
create_glacier_job = GlacierCreateJobOperator(
task_id="create_glacier_job",
aws_conn_id="aws_default",
vault_name=VAULT_NAME,
)
JOB_ID = '{{ task_instance.xcom_pull("create_glacier_job")["jobId"] }}'
# [END howto_glacier_create_job_operator]
# [START howto_glacier_job_operation_sensor]
wait_for_operation_complete = GlacierJobOperationSensor(
aws_conn_id="aws_default",
vault_name=VAULT_NAME,
job_id=JOB_ID,
task_id="wait_for_operation_complete",
)
# [END howto_glacier_job_operation_sensor]
# [START howto_glacier_transfer_data_to_gcs]
transfer_archive_to_gcs = GlacierToGCSOperator(
task_id="transfer_archive_to_gcs",
aws_conn_id="aws_default",
gcp_conn_id="google_cloud_default",
vault_name=VAULT_NAME,
bucket_name=BUCKET_NAME,
object_name=OBJECT_NAME,
gzip=False,
# Override to match your needs
# If chunk size is bigger than actual file size
# then whole file will be downloaded
chunk_size=1024,
delegate_to=None,
google_impersonation_chain=None,
)
# [END howto_glacier_transfer_data_to_gcs]
create_glacier_job >> wait_for_operation_complete >> transfer_archive_to_gcs

Просмотреть файл

@ -0,0 +1,73 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, Dict
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
class GlacierHook(AwsBaseHook):
"""
Hook for connection with Amazon Glacier
"""
def __init__(self, aws_conn_id: str = "aws_default") -> None:
super().__init__(client_type="glacier")
self.aws_conn_id = aws_conn_id
def retrieve_inventory(self, vault_name: str) -> Dict[str, Any]:
"""
Initiate an Amazon Glacier inventory-retrieval job
:param vault_name: the Glacier vault on which job is executed
:type vault_name: str
"""
job_params = {'Type': 'inventory-retrieval'}
self.log.info("Retrieving inventory for vault: %s", vault_name)
response = self.get_conn().initiate_job(vaultName=vault_name, jobParameters=job_params)
self.log.info("Initiated inventory-retrieval job for: %s", vault_name)
self.log.info("Retrieval Job ID: %s", response["jobId"])
return response
def retrieve_inventory_results(self, vault_name: str, job_id: str) -> Dict[str, Any]:
"""
Retrieve the results of an Amazon Glacier inventory-retrieval job
:param vault_name: the Glacier vault on which job is executed
:type vault_name: string
:param job_id: the job ID was returned by retrieve_inventory()
:type job_id: str
"""
self.log.info("Retrieving the job results for vault: %s...", vault_name)
response = self.get_conn().get_job_output(vaultName=vault_name, jobId=job_id)
return response
def describe_job(self, vault_name: str, job_id: str) -> Dict[str, Any]:
"""
Retrieve the status of an Amazon S3 Glacier job, such as an
inventory-retrieval job
:param vault_name: the Glacier vault on which job is executed
:type vault_name: string
:param job_id: the job ID was returned by retrieve_inventory()
:type job_id: str
"""
self.log.info("Retrieving status for vault: %s and job %s", vault_name, job_id)
response = self.get_conn().describe_job(vaultName=vault_name, jobId=job_id)
self.log.info("Job status: %s, code status: %s", response['Action'], response['StatusCode'])
return response

Просмотреть файл

@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.glacier import GlacierHook
from airflow.utils.decorators import apply_defaults
class GlacierCreateJobOperator(BaseOperator):
"""
Initiate an Amazon Glacier inventory-retrieval job
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GlacierCreateJobOperator`
:param aws_conn_id: The reference to the AWS connection details
:type aws_conn_id: str
:param vault_name: the Glacier vault on which job is executed
:type vault_name: str
"""
template_fields = ("vault_name",)
@apply_defaults
def __init__(
self,
*,
aws_conn_id="aws_default",
vault_name: str,
**kwargs,
):
super().__init__(**kwargs)
self.aws_conn_id = aws_conn_id
self.vault_name = vault_name
def execute(self, context):
hook = GlacierHook(aws_conn_id=self.aws_conn_id)
response = hook.retrieve_inventory(vault_name=self.vault_name)
return response

Просмотреть файл

@ -0,0 +1,99 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from enum import Enum
from typing import Any
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.glacier import GlacierHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class JobStatus(Enum):
"""
Glacier jobs description
"""
IN_PROGRESS = "InProgress"
SUCCEEDED = "Succeeded"
class GlacierJobOperationSensor(BaseSensorOperator):
"""
Glacier sensor for checking job state. This operator runs only in reschedule mode.
:param aws_conn_id: The reference to the AWS connection details
:type aws_conn_id: str
:param vault_name: name of Glacier vault on which job is executed
:type vault_name: str
:param job_id: the job ID was returned by retrieve_inventory()
:type job_id: str
:param poke_interval: Time in seconds that the job should wait in
between each tries
:type poke_interval: float
:param mode: How the sensor operates.
Options are: ``{ poke | reschedule }``, default is ``poke``.
When set to ``poke`` the sensor is taking up a worker slot for its
whole execution time and sleeps between pokes. Use this mode if the
expected runtime of the sensor is short or if a short poke interval
is required. Note that the sensor will hold onto a worker slot and
a pool slot for the duration of the sensor's runtime in this mode.
When set to ``reschedule`` the sensor task frees the worker slot when
the criteria is not yet met and it's rescheduled at a later time. Use
this mode if the time before the criteria is met is expected to be
quite long. The poke interval should be more than one minute to
prevent too much load on the scheduler.
:type mode: str
"""
template_fields = ["vault_name", "job_id"]
@apply_defaults
def __init__(
self,
*,
aws_conn_id: str = 'aws_default',
vault_name: str,
job_id: str,
poke_interval: int = 60 * 20,
mode: str = "reschedule",
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
self.aws_conn_id = aws_conn_id
self.vault_name = vault_name
self.job_id = job_id
self.poke_interval = poke_interval
self.mode = mode
def poke(self, context) -> bool:
hook = GlacierHook(aws_conn_id=self.aws_conn_id)
response = hook.describe_job(vault_name=self.vault_name, job_id=self.job_id)
if response["StatusCode"] == JobStatus.SUCCEEDED.value:
self.log.info("Job status: %s, code status: %s", response["Action"], response["StatusCode"])
self.log.info("Job finished successfully")
return True
elif response["StatusCode"] == JobStatus.IN_PROGRESS.value:
self.log.info("Processing...")
self.log.warning("Code status: %s", response["StatusCode"])
return False
else:
raise AirflowException(
f'Sensor failed. Job status: {response["Action"]}, code status: {response["StatusCode"]}'
)

Просмотреть файл

@ -0,0 +1,122 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import tempfile
from typing import Optional, Union, Sequence
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.glacier import GlacierHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.utils.decorators import apply_defaults
class GlacierToGCSOperator(BaseOperator):
"""
Transfers data from Amazon Glacier to Google Cloud Storage
.. note::
Please be warn that GlacierToGCSOperator may depends on memory usage.
Transferring big files may not working well.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GlacierToGCSOperator`
:param aws_conn_id: The reference to the AWS connection details
:type aws_conn_id: str
:param gcp_conn_id: The reference to the GCP connection details
:type gcp_conn_id: str
:param vault_name: the Glacier vault on which job is executed
:type vault_name: string
:param bucket_name: the Google Cloud Storage bucket where the data will be transferred
:type bucket_name: str
:param object_name: the name of the object to check in the Google cloud
storage bucket.
:type object_name: str
:param gzip: option to compress local file or file data for upload
:type gzip: bool
:param chunk_size: size of chunk in bytes the that will downloaded from Glacier vault
:type chunk_size: int
:param delegate_to: The account to impersonate using domain-wide delegation of authority,
if any. For this to work, the service account making the request must have
domain-wide delegation enabled.
:type delegate_to: str
:param google_impersonation_chain: Optional Google service account to impersonate using
short-term credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:type impersonation_chain: Union[str, Sequence[str]]
"""
template_fields = ("vault_name", "bucket_name", "object_name")
@apply_defaults
def __init__(
self,
*,
aws_conn_id="aws_default",
gcp_conn_id="google_cloud_default",
vault_name: str,
bucket_name: str,
object_name: str,
gzip: bool,
chunk_size=1024,
delegate_to=None,
google_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
):
super().__init__(**kwargs)
self.aws_conn_id = aws_conn_id
self.gcp_conn_id = gcp_conn_id
self.vault_name = vault_name
self.bucket_name = bucket_name
self.object_name = object_name
self.gzip = gzip
self.chunk_size = chunk_size
self.delegate_to = delegate_to
self.impersonation_chain = google_impersonation_chain
def execute(self, context):
glacier_hook = GlacierHook(aws_conn_id=self.aws_conn_id)
gcs_hook = GCSHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
job_id = glacier_hook.retrieve_inventory(vault_name=self.vault_name)
with tempfile.NamedTemporaryFile() as temp_file:
glacier_data = glacier_hook.retrieve_inventory_results(
vault_name=self.vault_name, job_id=job_id["jobId"]
)
# Read the file content in chunks using StreamingBody
# https://botocore.amazonaws.com/v1/documentation/api/latest/reference/response.html
stream = glacier_data["body"]
for chunk in stream.iter_chunk(chunk_size=self.chunk_size):
temp_file.write(chunk)
temp_file.flush()
gcs_hook.upload(
bucket_name=self.bucket_name,
object_name=self.object_name,
filename=temp_file.name,
gzip=self.gzip,
)
return f"gs://{self.bucket_name}/{self.object_name}"

Просмотреть файл

@ -0,0 +1,72 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Amazon Glacier Operator
=======================
Amazon Glacier is a secure, durable, and extremely low-cost Amazon S3 cloud storage classes for data archiving and long-term backup.
For more information about the service visit `Amazon Glacier API documentation <https://docs.aws.amazon.com/code-samples/latest/catalog/code-catalog-python-example_code-glacier.html>`_
.. _howto/operator:GlacierCreateJobOperator:
GlacierCreateJobOperator
^^^^^^^^^^^^^^^^^^^^^^^^
Operator task is to initiate an Amazon Glacier inventory-retrieval job.
The operation returns dictionary of information related to the initiated job like *jobId* what is required for subsequent tasks.
To get more information about operator visit:
:class:`~airflow.providers.amazon.aws.transfers.glacier_to_gcs.GlacierCreateJobOperator`
Example usage:
.. exampleinclude:: /../airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_glacier_create_job_operator]
:end-before: [END howto_glacier_create_job_operator]
.. _howto/operator:GlacierJobOperationSensor:
GlacierJobOperationSensor
^^^^^^^^^^^^^^^^^^^^^^^^^
Operator task is to wait until task *create_glacier_job* will be completed.
When sensor returns *true* then subsequent tasks can be executed.
In this case subsequent tasks are: *GlacierDownloadArchive* and *GlacierTransferDataToGCS*.
Job states:
* *Succeeded* – job is finished and for example archives from the vault can be downloaded
* *InProgress* – job is in progress and you have to wait until it's done (*Succeeded*)
GlacierJobOperationSensor checks the job status.
If response status code is *succeeded* then sensor returns *true* and subsequent tasks will be executed.
If response code is *InProgress* then sensor returns *false* and reschedule task with *poke_interval=60 * 20*.
Which means that every next request will be sent every 20 minutes.
To get more information about operator visit:
:class:`~airflow.providers.amazon.aws.sensors.glacier.GlacierJobOperationSensor`
Example usage:
.. exampleinclude:: /../airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_glacier_transfer_data_to_gcs]
:end-before: [END howto_glacier_transfer_data_to_gcs]

Просмотреть файл

@ -26,6 +26,7 @@ Amazon AWS Operators
:glob:
*
transfer/index
.. note::
You can learn how to use Amazon AWS integrations by analyzing the

Просмотреть файл

@ -0,0 +1,45 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Amazon Glacier Transfer Operator
================================
Amazon Glacier is a secure, durable, and extremely low-cost Amazon S3 cloud storage classes for data archiving and long-term backup.
For more information about the service visit `Amazon Glacier API documentation <https://docs.aws.amazon.com/code-samples/latest/catalog/code-catalog-python-example_code-glacier.html>`_
.. _howto/operator:GlacierToGCSOperator:
GlacierToGCSOperator
^^^^^^^^^^^^^^^^^^^^
Operator task is transfer data from Glacier vault to Google Cloud Storage.
.. note::
Please be warn that GlacierToGCSOperator may depends on memory usage.
Transferring big files may not working well.
To get more information about operator visit:
:class:`~airflow.providers.amazon.aws.transfers.glacier_to_gcs.GlacierToGCSOperator`
Example usage:
.. exampleinclude:: /../airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py
:language: python
:dedent: 4
:start-after: [START howto_glacier_transfer_data_to_gcs]
:end-before: [END howto_glacier_transfer_data_to_gcs]

Просмотреть файл

@ -0,0 +1,28 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Amazon Transfer Operators
=========================
.. toctree::
:maxdepth: 1
:glob:
*

Просмотреть файл

@ -423,6 +423,12 @@ These integrations allow you to perform various operations within the Amazon Web
- :mod:`airflow.providers.amazon.aws.operators.datasync`
-
* - `Amazon Glacier <https://aws.amazon.com/glacier/>`__
- :doc:`How to use <howto/operator/amazon/aws/glacier>`
- :mod:`airflow.providers.amazon.aws.hooks.glacier`
- :mod:`airflow.providers.amazon.aws.sensors.glacier`
- :mod:`airflow.providers.amazon.aws.operators.glacier`
* - `AWS Glue Catalog <https://aws.amazon.com/glue/>`__
-
- :mod:`airflow.providers.amazon.aws.hooks.glue_catalog`
@ -939,6 +945,11 @@ These integrations allow you to copy data from/to Google Cloud.
- :doc:`How to use <howto/operator/google/transfer/s3_to_gcs>`
- :mod:`airflow.providers.google.cloud.transfers.s3_to_gcs`
* - `Amazon Glacier <https://aws.amazon.com/glacier/>`__
- `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
- :doc:`How to use <howto/operator/amazon/aws/transfer/glacier_to_gcs>`
- :mod:`airflow.providers.amazon.aws.transfers.glacier_to_gcs`,
* - `Apache Cassandra <http://cassandra.apache.org/>`__
- `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
-

Просмотреть файл

@ -473,6 +473,7 @@ devel = [
'qds-sdk>=1.9.6',
'requests_mock',
'setuptools',
'testfixtures',
'wheel',
'yamllint',
]

Просмотреть файл

@ -0,0 +1,128 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
import mock
from testfixtures import LogCapture
from airflow.providers.amazon.aws.hooks.glacier import GlacierHook
CREDENTIALS = "aws_conn"
VAULT_NAME = "airflow"
JOB_ID = "1234abcd"
REQUEST_RESULT = {"jobId": "1234abcd"}
RESPONSE_BODY = {"body": "data"}
JOB_STATUS = {"Action": "", "StatusCode": "Succeeded"}
class TestAmazonGlacierHook(unittest.TestCase):
def setUp(self):
with mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.__init__", return_value=None):
self.hook = GlacierHook(aws_conn_id="aws_default")
@mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.get_conn")
def test_retrieve_inventory_should_return_job_id(self, mock_conn):
# Given
job_id = {"jobId": "1234abcd"}
# when
mock_conn.return_value.initiate_job.return_value = job_id
result = self.hook.retrieve_inventory(VAULT_NAME)
# then
mock_conn.assert_called_once_with()
self.assertEqual(job_id, result)
@mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.get_conn")
def test_retrieve_inventory_should_log_mgs(self, mock_conn):
# given
job_id = {"jobId": "1234abcd"}
# when
with LogCapture() as log:
mock_conn.return_value.initiate_job.return_value = job_id
self.hook.retrieve_inventory(VAULT_NAME)
# then
log.check(
(
'airflow.providers.amazon.aws.hooks.glacier.GlacierHook',
'INFO',
f"Retrieving inventory for vault: {VAULT_NAME}",
),
(
'airflow.providers.amazon.aws.hooks.glacier.GlacierHook',
'INFO',
f"Initiated inventory-retrieval job for: {VAULT_NAME}",
),
(
'airflow.providers.amazon.aws.hooks.glacier.GlacierHook',
'INFO',
f"Retrieval Job ID: {job_id.get('jobId')}",
),
)
@mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.get_conn")
def test_retrieve_inventory_results_should_return_response(self, mock_conn):
# when
mock_conn.return_value.get_job_output.return_value = RESPONSE_BODY
response = self.hook.retrieve_inventory_results(VAULT_NAME, JOB_ID)
# then
mock_conn.assert_called_once_with()
self.assertEqual(response, RESPONSE_BODY)
@mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.get_conn")
def test_retrieve_inventory_results_should_log_mgs(self, mock_conn):
# when
with LogCapture() as log:
mock_conn.return_value.get_job_output.return_value = REQUEST_RESULT
self.hook.retrieve_inventory_results(VAULT_NAME, JOB_ID)
# then
log.check(
(
'airflow.providers.amazon.aws.hooks.glacier.GlacierHook',
'INFO',
f"Retrieving the job results for vault: {VAULT_NAME}...",
),
)
@mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.get_conn")
def test_describe_job_should_return_status_succeeded(self, mock_conn):
# when
mock_conn.return_value.describe_job.return_value = JOB_STATUS
response = self.hook.describe_job(VAULT_NAME, JOB_ID)
# then
mock_conn.assert_called_once_with()
self.assertEqual(response, JOB_STATUS)
@mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.get_conn")
def test_describe_job_should_log_mgs(self, mock_conn):
# when
with LogCapture() as log:
mock_conn.return_value.describe_job.return_value = JOB_STATUS
self.hook.describe_job(VAULT_NAME, JOB_ID)
# then
log.check(
(
'airflow.providers.amazon.aws.hooks.glacier.GlacierHook',
'INFO',
f"Retrieving status for vault: {VAULT_NAME} and job {JOB_ID}",
),
(
'airflow.providers.amazon.aws.hooks.glacier.GlacierHook',
'INFO',
f"Job status: {JOB_STATUS.get('Action')}, code status: {JOB_STATUS.get('StatusCode')}",
),
)

Просмотреть файл

@ -0,0 +1,43 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest import TestCase
import mock
from airflow.providers.amazon.aws.operators.glacier import (
GlacierCreateJobOperator,
)
AWS_CONN_ID = "aws_default"
BUCKET_NAME = "airflow_bucket"
FILENAME = "path/to/file/"
GCP_CONN_ID = "google_cloud_default"
JOB_ID = "1a2b3c4d"
OBJECT_NAME = "file.csv"
TASK_ID = "glacier_job"
VAULT_NAME = "airflow"
class TestGlacierCreateJobOperator(TestCase):
@mock.patch("airflow.providers.amazon.aws.operators.glacier.GlacierHook")
def test_execute(self, hook_mock):
op = GlacierCreateJobOperator(aws_conn_id=AWS_CONN_ID, vault_name=VAULT_NAME, task_id=TASK_ID)
op.execute(mock.MagicMock())
hook_mock.assert_called_once_with(aws_conn_id=AWS_CONN_ID)
hook_mock.return_value.retrieve_inventory.assert_called_once_with(vault_name=VAULT_NAME)

Просмотреть файл

@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from tests.test_utils.amazon_system_helpers import AWS_DAG_FOLDER, AmazonSystemTest
from tests.test_utils.gcp_system_helpers import GoogleSystemTest
BUCKET = "data_from_glacier"
class GlacierSystemTest(AmazonSystemTest):
"""
System test for AWS Glacier operators
"""
def setUp(self):
super().setUp()
GoogleSystemTest.create_gcs_bucket(BUCKET)
def tearDown(self):
GoogleSystemTest.delete_gcs_bucket(BUCKET) # pylint: disable=no-member
def test_run_example_dag(self):
self.run_dag(dag_id="example_glacier_to_gcs", dag_folder=AWS_DAG_FOLDER)

Просмотреть файл

@ -0,0 +1,69 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
import mock
from airflow import AirflowException
from airflow.providers.amazon.aws.sensors.glacier import GlacierJobOperationSensor, JobStatus
SUCCEEDED = "Succeeded"
IN_PROGRESS = "InProgress"
class TestAmazonGlacierSensor(unittest.TestCase):
def setUp(self):
self.op = GlacierJobOperationSensor(
task_id='test_athena_sensor',
aws_conn_id='aws_default',
vault_name="airflow",
job_id="1a2b3c4d",
poke_interval=60 * 20,
)
@mock.patch(
"airflow.providers.amazon.aws.sensors.glacier.GlacierHook.describe_job",
side_effect=[{"Action": "", "StatusCode": JobStatus.SUCCEEDED.value}],
)
def test_poke_succeeded(self, _):
self.assertTrue(self.op.poke(None))
@mock.patch(
"airflow.providers.amazon.aws.sensors.glacier.GlacierHook.describe_job",
side_effect=[{"Action": "", "StatusCode": JobStatus.IN_PROGRESS.value}],
)
def test_poke_in_progress(self, _):
self.assertFalse(self.op.poke(None))
@mock.patch(
"airflow.providers.amazon.aws.sensors.glacier.GlacierHook.describe_job",
side_effect=[{"Action": "", "StatusCode": ""}],
)
def test_poke_fail(self, _):
with self.assertRaises(AirflowException) as context:
self.op.poke(None)
self.assertIn('Sensor failed', str(context.exception))
class TestSensorJobDescription(unittest.TestCase):
def test_job_status_success(self):
self.assertEqual(JobStatus.SUCCEEDED.value, SUCCEEDED)
def test_job_status_in_progress(self):
self.assertEqual(JobStatus.IN_PROGRESS.value, IN_PROGRESS)

Просмотреть файл

@ -0,0 +1,66 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest import TestCase
import mock
from airflow.providers.amazon.aws.transfers.glacier_to_gcs import GlacierToGCSOperator
AWS_CONN_ID = "aws_default"
BUCKET_NAME = "airflow_bucket"
FILENAME = "path/to/file/"
GCP_CONN_ID = "google_cloud_default"
JOB_ID = "1a2b3c4d"
OBJECT_NAME = "file.csv"
TASK_ID = "glacier_job"
VAULT_NAME = "airflow"
class TestGlacierToGCSOperator(TestCase):
@mock.patch("airflow.providers.amazon.aws.transfers.glacier_to_gcs.GlacierHook")
@mock.patch("airflow.providers.amazon.aws.transfers.glacier_to_gcs.GCSHook")
@mock.patch("airflow.providers.amazon.aws.transfers.glacier_to_gcs.tempfile")
def test_execute(self, mock_temp, hook_gcs_mock, hook_aws_mock):
op = GlacierToGCSOperator(
aws_conn_id=AWS_CONN_ID,
vault_name=VAULT_NAME,
gcp_conn_id=GCP_CONN_ID,
delegate_to=None,
google_impersonation_chain=None,
bucket_name=BUCKET_NAME,
object_name=OBJECT_NAME,
gzip=False,
task_id=TASK_ID,
)
op.execute(context=None)
hook_aws_mock.assert_called_once_with(aws_conn_id=AWS_CONN_ID)
hook_aws_mock.return_value.retrieve_inventory.assert_called_once_with(vault_name=VAULT_NAME)
hook_aws_mock.return_value.retrieve_inventory_results.assert_called_once_with(
vault_name=VAULT_NAME, job_id=hook_aws_mock.return_value.retrieve_inventory.return_value[JOB_ID]
)
hook_gcs_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, delegate_to=None, impersonation_chain=None
)
hook_gcs_mock.return_value.upload.assert_called_once_with(
bucket_name=BUCKET_NAME,
object_name=OBJECT_NAME,
gzip=False,
filename=mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name,
)