Support google-cloud-tasks>=2.0.0 (#13334)

This commit is contained in:
Kamil Breguła 2020-12-28 11:29:28 +01:00 коммит произвёл GitHub
Родитель 09c6549200
Коммит 1f712219fa
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 175 добавлений и 136 удалений

Просмотреть файл

@ -33,6 +33,7 @@ Details are covered in the UPDATING.md files for each library, but there are som
| [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) | | [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) |
| [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) | | [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) |
| [``google-cloud-kms``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) | | [``google-cloud-kms``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) |
| [``google-cloud-tasks``](https://pypi.org/project/google-cloud-tasks/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-tasks/blob/master/UPGRADING.md) |
### The field names use the snake_case convention ### The field names use the snake_case convention

Просмотреть файл

@ -21,11 +21,13 @@ This module contains a CloudTasksHook
which allows you to connect to Google Cloud Tasks service, which allows you to connect to Google Cloud Tasks service,
performing actions to queues or tasks. performing actions to queues or tasks.
""" """
from typing import Dict, List, Optional, Sequence, Tuple, Union from typing import Dict, List, Optional, Sequence, Tuple, Union
from google.api_core.retry import Retry from google.api_core.retry import Retry
from google.cloud.tasks_v2 import CloudTasksClient, enums from google.cloud.tasks_v2 import CloudTasksClient
from google.cloud.tasks_v2.types import FieldMask, Queue, Task from google.cloud.tasks_v2.types import Queue, Task
from google.protobuf.field_mask_pb2 import FieldMask
from airflow.exceptions import AirflowException from airflow.exceptions import AirflowException
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
@ -120,20 +122,19 @@ class CloudTasksHook(GoogleBaseHook):
client = self.get_conn() client = self.get_conn()
if queue_name: if queue_name:
full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name) full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
if isinstance(task_queue, Queue): if isinstance(task_queue, Queue):
task_queue.name = full_queue_name task_queue.name = full_queue_name
elif isinstance(task_queue, dict): elif isinstance(task_queue, dict):
task_queue['name'] = full_queue_name task_queue['name'] = full_queue_name
else: else:
raise AirflowException('Unable to set queue_name.') raise AirflowException('Unable to set queue_name.')
full_location_path = CloudTasksClient.location_path(project_id, location) full_location_path = f"projects/{project_id}/locations/{location}"
return client.create_queue( return client.create_queue(
parent=full_location_path, request={'parent': full_location_path, 'queue': task_queue},
queue=task_queue,
retry=retry, retry=retry,
timeout=timeout, timeout=timeout,
metadata=metadata, metadata=metadata or (),
) )
@GoogleBaseHook.fallback_to_default_project_id @GoogleBaseHook.fallback_to_default_project_id
@ -167,7 +168,7 @@ class CloudTasksHook(GoogleBaseHook):
:param update_mask: A mast used to specify which fields of the queue are being updated. :param update_mask: A mast used to specify which fields of the queue are being updated.
If empty, then all fields will be updated. If empty, then all fields will be updated.
If a dict is provided, it must be of the same form as the protobuf message. If a dict is provided, it must be of the same form as the protobuf message.
:type update_mask: dict or google.cloud.tasks_v2.types.FieldMask :type update_mask: dict or google.protobuf.field_mask_pb2.FieldMask
:param retry: (Optional) A retry object used to retry requests. :param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried. If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry :type retry: google.api_core.retry.Retry
@ -182,7 +183,7 @@ class CloudTasksHook(GoogleBaseHook):
client = self.get_conn() client = self.get_conn()
if queue_name and location: if queue_name and location:
full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name) full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
if isinstance(task_queue, Queue): if isinstance(task_queue, Queue):
task_queue.name = full_queue_name task_queue.name = full_queue_name
elif isinstance(task_queue, dict): elif isinstance(task_queue, dict):
@ -190,11 +191,10 @@ class CloudTasksHook(GoogleBaseHook):
else: else:
raise AirflowException('Unable to set queue_name.') raise AirflowException('Unable to set queue_name.')
return client.update_queue( return client.update_queue(
queue=task_queue, request={'queue': task_queue, 'update_mask': update_mask},
update_mask=update_mask,
retry=retry, retry=retry,
timeout=timeout, timeout=timeout,
metadata=metadata, metadata=metadata or (),
) )
@GoogleBaseHook.fallback_to_default_project_id @GoogleBaseHook.fallback_to_default_project_id
@ -230,8 +230,10 @@ class CloudTasksHook(GoogleBaseHook):
""" """
client = self.get_conn() client = self.get_conn()
full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name) full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
return client.get_queue(name=full_queue_name, retry=retry, timeout=timeout, metadata=metadata) return client.get_queue(
request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
)
@GoogleBaseHook.fallback_to_default_project_id @GoogleBaseHook.fallback_to_default_project_id
def list_queues( def list_queues(
@ -270,14 +272,12 @@ class CloudTasksHook(GoogleBaseHook):
""" """
client = self.get_conn() client = self.get_conn()
full_location_path = CloudTasksClient.location_path(project_id, location) full_location_path = f"projects/{project_id}/locations/{location}"
queues = client.list_queues( queues = client.list_queues(
parent=full_location_path, request={'parent': full_location_path, 'filter': results_filter, 'page_size': page_size},
filter_=results_filter,
page_size=page_size,
retry=retry, retry=retry,
timeout=timeout, timeout=timeout,
metadata=metadata, metadata=metadata or (),
) )
return list(queues) return list(queues)
@ -313,8 +313,10 @@ class CloudTasksHook(GoogleBaseHook):
""" """
client = self.get_conn() client = self.get_conn()
full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name) full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
client.delete_queue(name=full_queue_name, retry=retry, timeout=timeout, metadata=metadata) client.delete_queue(
request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
)
@GoogleBaseHook.fallback_to_default_project_id @GoogleBaseHook.fallback_to_default_project_id
def purge_queue( def purge_queue(
@ -349,8 +351,10 @@ class CloudTasksHook(GoogleBaseHook):
""" """
client = self.get_conn() client = self.get_conn()
full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name) full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
return client.purge_queue(name=full_queue_name, retry=retry, timeout=timeout, metadata=metadata) return client.purge_queue(
request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
)
@GoogleBaseHook.fallback_to_default_project_id @GoogleBaseHook.fallback_to_default_project_id
def pause_queue( def pause_queue(
@ -385,8 +389,10 @@ class CloudTasksHook(GoogleBaseHook):
""" """
client = self.get_conn() client = self.get_conn()
full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name) full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
return client.pause_queue(name=full_queue_name, retry=retry, timeout=timeout, metadata=metadata) return client.pause_queue(
request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
)
@GoogleBaseHook.fallback_to_default_project_id @GoogleBaseHook.fallback_to_default_project_id
def resume_queue( def resume_queue(
@ -421,8 +427,10 @@ class CloudTasksHook(GoogleBaseHook):
""" """
client = self.get_conn() client = self.get_conn()
full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name) full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
return client.resume_queue(name=full_queue_name, retry=retry, timeout=timeout, metadata=metadata) return client.resume_queue(
request={'name': full_queue_name}, retry=retry, timeout=timeout, metadata=metadata or ()
)
@GoogleBaseHook.fallback_to_default_project_id @GoogleBaseHook.fallback_to_default_project_id
def create_task( def create_task(
@ -432,7 +440,7 @@ class CloudTasksHook(GoogleBaseHook):
task: Union[Dict, Task], task: Union[Dict, Task],
project_id: str, project_id: str,
task_name: Optional[str] = None, task_name: Optional[str] = None,
response_view: Optional[enums.Task.View] = None, response_view: Optional = None,
retry: Optional[Retry] = None, retry: Optional[Retry] = None,
timeout: Optional[float] = None, timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None, metadata: Optional[Sequence[Tuple[str, str]]] = None,
@ -455,7 +463,7 @@ class CloudTasksHook(GoogleBaseHook):
:type task_name: str :type task_name: str
:param response_view: (Optional) This field specifies which subset of the Task will :param response_view: (Optional) This field specifies which subset of the Task will
be returned. be returned.
:type response_view: google.cloud.tasks_v2.enums.Task.View :type response_view: google.cloud.tasks_v2.Task.View
:param retry: (Optional) A retry object used to retry requests. :param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried. If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry :type retry: google.api_core.retry.Retry
@ -470,21 +478,21 @@ class CloudTasksHook(GoogleBaseHook):
client = self.get_conn() client = self.get_conn()
if task_name: if task_name:
full_task_name = CloudTasksClient.task_path(project_id, location, queue_name, task_name) full_task_name = (
f"projects/{project_id}/locations/{location}/queues/{queue_name}/tasks/{task_name}"
)
if isinstance(task, Task): if isinstance(task, Task):
task.name = full_task_name task.name = full_task_name
elif isinstance(task, dict): elif isinstance(task, dict):
task['name'] = full_task_name task['name'] = full_task_name
else: else:
raise AirflowException('Unable to set task_name.') raise AirflowException('Unable to set task_name.')
full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name) full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
return client.create_task( return client.create_task(
parent=full_queue_name, request={'parent': full_queue_name, 'task': task, 'response_view': response_view},
task=task,
response_view=response_view,
retry=retry, retry=retry,
timeout=timeout, timeout=timeout,
metadata=metadata, metadata=metadata or (),
) )
@GoogleBaseHook.fallback_to_default_project_id @GoogleBaseHook.fallback_to_default_project_id
@ -494,7 +502,7 @@ class CloudTasksHook(GoogleBaseHook):
queue_name: str, queue_name: str,
task_name: str, task_name: str,
project_id: str, project_id: str,
response_view: Optional[enums.Task.View] = None, response_view: Optional = None,
retry: Optional[Retry] = None, retry: Optional[Retry] = None,
timeout: Optional[float] = None, timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None, metadata: Optional[Sequence[Tuple[str, str]]] = None,
@ -513,7 +521,7 @@ class CloudTasksHook(GoogleBaseHook):
:type project_id: str :type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will :param response_view: (Optional) This field specifies which subset of the Task will
be returned. be returned.
:type response_view: google.cloud.tasks_v2.enums.Task.View :type response_view: google.cloud.tasks_v2.Task.View
:param retry: (Optional) A retry object used to retry requests. :param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried. If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry :type retry: google.api_core.retry.Retry
@ -527,13 +535,12 @@ class CloudTasksHook(GoogleBaseHook):
""" """
client = self.get_conn() client = self.get_conn()
full_task_name = CloudTasksClient.task_path(project_id, location, queue_name, task_name) full_task_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}/tasks/{task_name}"
return client.get_task( return client.get_task(
name=full_task_name, request={'name': full_task_name, 'response_view': response_view},
response_view=response_view,
retry=retry, retry=retry,
timeout=timeout, timeout=timeout,
metadata=metadata, metadata=metadata or (),
) )
@GoogleBaseHook.fallback_to_default_project_id @GoogleBaseHook.fallback_to_default_project_id
@ -542,7 +549,7 @@ class CloudTasksHook(GoogleBaseHook):
location: str, location: str,
queue_name: str, queue_name: str,
project_id: str, project_id: str,
response_view: Optional[enums.Task.View] = None, response_view: Optional = None,
page_size: Optional[int] = None, page_size: Optional[int] = None,
retry: Optional[Retry] = None, retry: Optional[Retry] = None,
timeout: Optional[float] = None, timeout: Optional[float] = None,
@ -560,7 +567,7 @@ class CloudTasksHook(GoogleBaseHook):
:type project_id: str :type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will :param response_view: (Optional) This field specifies which subset of the Task will
be returned. be returned.
:type response_view: google.cloud.tasks_v2.enums.Task.View :type response_view: google.cloud.tasks_v2.Task.View
:param page_size: (Optional) The maximum number of resources contained in the :param page_size: (Optional) The maximum number of resources contained in the
underlying API response. underlying API response.
:type page_size: int :type page_size: int
@ -576,14 +583,12 @@ class CloudTasksHook(GoogleBaseHook):
:rtype: list[google.cloud.tasks_v2.types.Task] :rtype: list[google.cloud.tasks_v2.types.Task]
""" """
client = self.get_conn() client = self.get_conn()
full_queue_name = CloudTasksClient.queue_path(project_id, location, queue_name) full_queue_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}"
tasks = client.list_tasks( tasks = client.list_tasks(
parent=full_queue_name, request={'parent': full_queue_name, 'response_view': response_view, 'page_size': page_size},
response_view=response_view,
page_size=page_size,
retry=retry, retry=retry,
timeout=timeout, timeout=timeout,
metadata=metadata, metadata=metadata or (),
) )
return list(tasks) return list(tasks)
@ -622,8 +627,10 @@ class CloudTasksHook(GoogleBaseHook):
""" """
client = self.get_conn() client = self.get_conn()
full_task_name = CloudTasksClient.task_path(project_id, location, queue_name, task_name) full_task_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}/tasks/{task_name}"
client.delete_task(name=full_task_name, retry=retry, timeout=timeout, metadata=metadata) client.delete_task(
request={'name': full_task_name}, retry=retry, timeout=timeout, metadata=metadata or ()
)
@GoogleBaseHook.fallback_to_default_project_id @GoogleBaseHook.fallback_to_default_project_id
def run_task( def run_task(
@ -632,7 +639,7 @@ class CloudTasksHook(GoogleBaseHook):
queue_name: str, queue_name: str,
task_name: str, task_name: str,
project_id: str, project_id: str,
response_view: Optional[enums.Task.View] = None, response_view: Optional = None,
retry: Optional[Retry] = None, retry: Optional[Retry] = None,
timeout: Optional[float] = None, timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None, metadata: Optional[Sequence[Tuple[str, str]]] = None,
@ -651,7 +658,7 @@ class CloudTasksHook(GoogleBaseHook):
:type project_id: str :type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will :param response_view: (Optional) This field specifies which subset of the Task will
be returned. be returned.
:type response_view: google.cloud.tasks_v2.enums.Task.View :type response_view: google.cloud.tasks_v2.Task.View
:param retry: (Optional) A retry object used to retry requests. :param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried. If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry :type retry: google.api_core.retry.Retry
@ -665,11 +672,10 @@ class CloudTasksHook(GoogleBaseHook):
""" """
client = self.get_conn() client = self.get_conn()
full_task_name = CloudTasksClient.task_path(project_id, location, queue_name, task_name) full_task_name = f"projects/{project_id}/locations/{location}/queues/{queue_name}/tasks/{task_name}"
return client.run_task( return client.run_task(
name=full_task_name, request={'name': full_task_name, 'response_view': response_view},
response_view=response_view,
retry=retry, retry=retry,
timeout=timeout, timeout=timeout,
metadata=metadata, metadata=metadata or (),
) )

Просмотреть файл

@ -25,9 +25,8 @@ from typing import Dict, Optional, Sequence, Tuple, Union
from google.api_core.exceptions import AlreadyExists from google.api_core.exceptions import AlreadyExists
from google.api_core.retry import Retry from google.api_core.retry import Retry
from google.cloud.tasks_v2 import enums from google.cloud.tasks_v2.types import Queue, Task
from google.cloud.tasks_v2.types import FieldMask, Queue, Task from google.protobuf.field_mask_pb2 import FieldMask
from google.protobuf.json_format import MessageToDict
from airflow.models import BaseOperator from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.tasks import CloudTasksHook from airflow.providers.google.cloud.hooks.tasks import CloudTasksHook
@ -136,7 +135,7 @@ class CloudTasksQueueCreateOperator(BaseOperator):
metadata=self.metadata, metadata=self.metadata,
) )
return MessageToDict(queue) return Queue.to_dict(queue)
class CloudTasksQueueUpdateOperator(BaseOperator): class CloudTasksQueueUpdateOperator(BaseOperator):
@ -159,7 +158,7 @@ class CloudTasksQueueUpdateOperator(BaseOperator):
:param update_mask: A mast used to specify which fields of the queue are being updated. :param update_mask: A mast used to specify which fields of the queue are being updated.
If empty, then all fields will be updated. If empty, then all fields will be updated.
If a dict is provided, it must be of the same form as the protobuf message. If a dict is provided, it must be of the same form as the protobuf message.
:type update_mask: dict or google.cloud.tasks_v2.types.FieldMask :type update_mask: dict or google.protobuf.field_mask_pb2.FieldMask
:param retry: (Optional) A retry object used to retry requests. :param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried. If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry :type retry: google.api_core.retry.Retry
@ -237,7 +236,7 @@ class CloudTasksQueueUpdateOperator(BaseOperator):
timeout=self.timeout, timeout=self.timeout,
metadata=self.metadata, metadata=self.metadata,
) )
return MessageToDict(queue) return Queue.to_dict(queue)
class CloudTasksQueueGetOperator(BaseOperator): class CloudTasksQueueGetOperator(BaseOperator):
@ -320,7 +319,7 @@ class CloudTasksQueueGetOperator(BaseOperator):
timeout=self.timeout, timeout=self.timeout,
metadata=self.metadata, metadata=self.metadata,
) )
return MessageToDict(queue) return Queue.to_dict(queue)
class CloudTasksQueuesListOperator(BaseOperator): class CloudTasksQueuesListOperator(BaseOperator):
@ -408,7 +407,7 @@ class CloudTasksQueuesListOperator(BaseOperator):
timeout=self.timeout, timeout=self.timeout,
metadata=self.metadata, metadata=self.metadata,
) )
return [MessageToDict(q) for q in queues] return [Queue.to_dict(q) for q in queues]
class CloudTasksQueueDeleteOperator(BaseOperator): class CloudTasksQueueDeleteOperator(BaseOperator):
@ -571,7 +570,7 @@ class CloudTasksQueuePurgeOperator(BaseOperator):
timeout=self.timeout, timeout=self.timeout,
metadata=self.metadata, metadata=self.metadata,
) )
return MessageToDict(queue) return Queue.to_dict(queue)
class CloudTasksQueuePauseOperator(BaseOperator): class CloudTasksQueuePauseOperator(BaseOperator):
@ -654,7 +653,7 @@ class CloudTasksQueuePauseOperator(BaseOperator):
timeout=self.timeout, timeout=self.timeout,
metadata=self.metadata, metadata=self.metadata,
) )
return MessageToDict(queue) return Queue.to_dict(queue)
class CloudTasksQueueResumeOperator(BaseOperator): class CloudTasksQueueResumeOperator(BaseOperator):
@ -737,7 +736,7 @@ class CloudTasksQueueResumeOperator(BaseOperator):
timeout=self.timeout, timeout=self.timeout,
metadata=self.metadata, metadata=self.metadata,
) )
return MessageToDict(queue) return Queue.to_dict(queue)
class CloudTasksTaskCreateOperator(BaseOperator): class CloudTasksTaskCreateOperator(BaseOperator):
@ -803,7 +802,7 @@ class CloudTasksTaskCreateOperator(BaseOperator):
task: Union[Dict, Task], task: Union[Dict, Task],
project_id: Optional[str] = None, project_id: Optional[str] = None,
task_name: Optional[str] = None, task_name: Optional[str] = None,
response_view: Optional[enums.Task.View] = None, response_view: Optional = None,
retry: Optional[Retry] = None, retry: Optional[Retry] = None,
timeout: Optional[float] = None, timeout: Optional[float] = None,
metadata: Optional[MetaData] = None, metadata: Optional[MetaData] = None,
@ -840,7 +839,7 @@ class CloudTasksTaskCreateOperator(BaseOperator):
timeout=self.timeout, timeout=self.timeout,
metadata=self.metadata, metadata=self.metadata,
) )
return MessageToDict(task) return Task.to_dict(task)
class CloudTasksTaskGetOperator(BaseOperator): class CloudTasksTaskGetOperator(BaseOperator):
@ -900,7 +899,7 @@ class CloudTasksTaskGetOperator(BaseOperator):
queue_name: str, queue_name: str,
task_name: str, task_name: str,
project_id: Optional[str] = None, project_id: Optional[str] = None,
response_view: Optional[enums.Task.View] = None, response_view: Optional = None,
retry: Optional[Retry] = None, retry: Optional[Retry] = None,
timeout: Optional[float] = None, timeout: Optional[float] = None,
metadata: Optional[MetaData] = None, metadata: Optional[MetaData] = None,
@ -935,7 +934,7 @@ class CloudTasksTaskGetOperator(BaseOperator):
timeout=self.timeout, timeout=self.timeout,
metadata=self.metadata, metadata=self.metadata,
) )
return MessageToDict(task) return Task.to_dict(task)
class CloudTasksTasksListOperator(BaseOperator): class CloudTasksTasksListOperator(BaseOperator):
@ -994,7 +993,7 @@ class CloudTasksTasksListOperator(BaseOperator):
location: str, location: str,
queue_name: str, queue_name: str,
project_id: Optional[str] = None, project_id: Optional[str] = None,
response_view: Optional[enums.Task.View] = None, response_view: Optional = None,
page_size: Optional[int] = None, page_size: Optional[int] = None,
retry: Optional[Retry] = None, retry: Optional[Retry] = None,
timeout: Optional[float] = None, timeout: Optional[float] = None,
@ -1030,7 +1029,7 @@ class CloudTasksTasksListOperator(BaseOperator):
timeout=self.timeout, timeout=self.timeout,
metadata=self.metadata, metadata=self.metadata,
) )
return [MessageToDict(t) for t in tasks] return [Task.to_dict(t) for t in tasks]
class CloudTasksTaskDeleteOperator(BaseOperator): class CloudTasksTaskDeleteOperator(BaseOperator):
@ -1134,7 +1133,7 @@ class CloudTasksTaskRunOperator(BaseOperator):
:type project_id: str :type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will :param response_view: (Optional) This field specifies which subset of the Task will
be returned. be returned.
:type response_view: google.cloud.tasks_v2.enums.Task.View :type response_view: google.cloud.tasks_v2.Task.View
:param retry: (Optional) A retry object used to retry requests. :param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried. If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry :type retry: google.api_core.retry.Retry
@ -1176,7 +1175,7 @@ class CloudTasksTaskRunOperator(BaseOperator):
queue_name: str, queue_name: str,
task_name: str, task_name: str,
project_id: Optional[str] = None, project_id: Optional[str] = None,
response_view: Optional[enums.Task.View] = None, response_view: Optional = None,
retry: Optional[Retry] = None, retry: Optional[Retry] = None,
timeout: Optional[float] = None, timeout: Optional[float] = None,
metadata: Optional[MetaData] = None, metadata: Optional[MetaData] = None,
@ -1211,4 +1210,4 @@ class CloudTasksTaskRunOperator(BaseOperator):
timeout=self.timeout, timeout=self.timeout,
metadata=self.metadata, metadata=self.metadata,
) )
return MessageToDict(task) return Task.to_dict(task)

Просмотреть файл

@ -272,7 +272,7 @@ google = [
'google-cloud-spanner>=1.10.0,<2.0.0', 'google-cloud-spanner>=1.10.0,<2.0.0',
'google-cloud-speech>=0.36.3,<2.0.0', 'google-cloud-speech>=0.36.3,<2.0.0',
'google-cloud-storage>=1.30,<2.0.0', 'google-cloud-storage>=1.30,<2.0.0',
'google-cloud-tasks>=1.2.1,<2.0.0', 'google-cloud-tasks>=2.0.0,<3.0.0',
'google-cloud-texttospeech>=0.4.0,<2.0.0', 'google-cloud-texttospeech>=0.4.0,<2.0.0',
'google-cloud-translate>=1.5.0,<2.0.0', 'google-cloud-translate>=1.5.0,<2.0.0',
'google-cloud-videointelligence>=1.7.0,<2.0.0', 'google-cloud-videointelligence>=1.7.0,<2.0.0',

Просмотреть файл

@ -72,11 +72,10 @@ class TestCloudTasksHook(unittest.TestCase):
self.assertIs(result, API_RESPONSE) self.assertIs(result, API_RESPONSE)
get_conn.return_value.create_queue.assert_called_once_with( get_conn.return_value.create_queue.assert_called_once_with(
parent=FULL_LOCATION_PATH, request=dict(parent=FULL_LOCATION_PATH, queue=Queue(name=FULL_QUEUE_PATH)),
queue=Queue(name=FULL_QUEUE_PATH),
retry=None, retry=None,
timeout=None, timeout=None,
metadata=None, metadata=(),
) )
@mock.patch( @mock.patch(
@ -94,11 +93,10 @@ class TestCloudTasksHook(unittest.TestCase):
self.assertIs(result, API_RESPONSE) self.assertIs(result, API_RESPONSE)
get_conn.return_value.update_queue.assert_called_once_with( get_conn.return_value.update_queue.assert_called_once_with(
queue=Queue(name=FULL_QUEUE_PATH, state=3), request=dict(queue=Queue(name=FULL_QUEUE_PATH, state=3), update_mask=None),
update_mask=None,
retry=None, retry=None,
timeout=None, timeout=None,
metadata=None, metadata=(),
) )
@mock.patch( @mock.patch(
@ -111,30 +109,28 @@ class TestCloudTasksHook(unittest.TestCase):
self.assertIs(result, API_RESPONSE) self.assertIs(result, API_RESPONSE)
get_conn.return_value.get_queue.assert_called_once_with( get_conn.return_value.get_queue.assert_called_once_with(
name=FULL_QUEUE_PATH, retry=None, timeout=None, metadata=None request=dict(name=FULL_QUEUE_PATH), retry=None, timeout=None, metadata=()
) )
@mock.patch( @mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn", "airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
**{"return_value.list_queues.return_value": API_RESPONSE}, # type: ignore **{"return_value.list_queues.return_value": [Queue(name=FULL_QUEUE_PATH)]}, # type: ignore
) )
def test_list_queues(self, get_conn): def test_list_queues(self, get_conn):
result = self.hook.list_queues(location=LOCATION, project_id=PROJECT_ID) result = self.hook.list_queues(location=LOCATION, project_id=PROJECT_ID)
self.assertEqual(result, list(API_RESPONSE)) self.assertEqual(result, [Queue(name=FULL_QUEUE_PATH)])
get_conn.return_value.list_queues.assert_called_once_with( get_conn.return_value.list_queues.assert_called_once_with(
parent=FULL_LOCATION_PATH, request=dict(parent=FULL_LOCATION_PATH, filter=None, page_size=None),
filter_=None,
page_size=None,
retry=None, retry=None,
timeout=None, timeout=None,
metadata=None, metadata=(),
) )
@mock.patch( @mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn", "airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
**{"return_value.delete_queue.return_value": API_RESPONSE}, # type: ignore **{"return_value.delete_queue.return_value": None}, # type: ignore
) )
def test_delete_queue(self, get_conn): def test_delete_queue(self, get_conn):
result = self.hook.delete_queue(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID) result = self.hook.delete_queue(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID)
@ -142,51 +138,51 @@ class TestCloudTasksHook(unittest.TestCase):
self.assertEqual(result, None) self.assertEqual(result, None)
get_conn.return_value.delete_queue.assert_called_once_with( get_conn.return_value.delete_queue.assert_called_once_with(
name=FULL_QUEUE_PATH, retry=None, timeout=None, metadata=None request=dict(name=FULL_QUEUE_PATH), retry=None, timeout=None, metadata=()
) )
@mock.patch( @mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn", "airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
**{"return_value.purge_queue.return_value": API_RESPONSE}, # type: ignore **{"return_value.purge_queue.return_value": Queue(name=FULL_QUEUE_PATH)}, # type: ignore
) )
def test_purge_queue(self, get_conn): def test_purge_queue(self, get_conn):
result = self.hook.purge_queue(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID) result = self.hook.purge_queue(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID)
self.assertEqual(result, API_RESPONSE) self.assertEqual(result, Queue(name=FULL_QUEUE_PATH))
get_conn.return_value.purge_queue.assert_called_once_with( get_conn.return_value.purge_queue.assert_called_once_with(
name=FULL_QUEUE_PATH, retry=None, timeout=None, metadata=None request=dict(name=FULL_QUEUE_PATH), retry=None, timeout=None, metadata=()
) )
@mock.patch( @mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn", "airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
**{"return_value.pause_queue.return_value": API_RESPONSE}, # type: ignore **{"return_value.pause_queue.return_value": Queue(name=FULL_QUEUE_PATH)}, # type: ignore
) )
def test_pause_queue(self, get_conn): def test_pause_queue(self, get_conn):
result = self.hook.pause_queue(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID) result = self.hook.pause_queue(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID)
self.assertEqual(result, API_RESPONSE) self.assertEqual(result, Queue(name=FULL_QUEUE_PATH))
get_conn.return_value.pause_queue.assert_called_once_with( get_conn.return_value.pause_queue.assert_called_once_with(
name=FULL_QUEUE_PATH, retry=None, timeout=None, metadata=None request=dict(name=FULL_QUEUE_PATH), retry=None, timeout=None, metadata=()
) )
@mock.patch( @mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn", "airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
**{"return_value.resume_queue.return_value": API_RESPONSE}, # type: ignore **{"return_value.resume_queue.return_value": Queue(name=FULL_QUEUE_PATH)}, # type: ignore
) )
def test_resume_queue(self, get_conn): def test_resume_queue(self, get_conn):
result = self.hook.resume_queue(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID) result = self.hook.resume_queue(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID)
self.assertEqual(result, API_RESPONSE) self.assertEqual(result, Queue(name=FULL_QUEUE_PATH))
get_conn.return_value.resume_queue.assert_called_once_with( get_conn.return_value.resume_queue.assert_called_once_with(
name=FULL_QUEUE_PATH, retry=None, timeout=None, metadata=None request=dict(name=FULL_QUEUE_PATH), retry=None, timeout=None, metadata=()
) )
@mock.patch( @mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn", "airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
**{"return_value.create_task.return_value": API_RESPONSE}, # type: ignore **{"return_value.create_task.return_value": Task(name=FULL_TASK_PATH)}, # type: ignore
) )
def test_create_task(self, get_conn): def test_create_task(self, get_conn):
result = self.hook.create_task( result = self.hook.create_task(
@ -197,20 +193,18 @@ class TestCloudTasksHook(unittest.TestCase):
task_name=TASK_NAME, task_name=TASK_NAME,
) )
self.assertEqual(result, API_RESPONSE) self.assertEqual(result, Task(name=FULL_TASK_PATH))
get_conn.return_value.create_task.assert_called_once_with( get_conn.return_value.create_task.assert_called_once_with(
parent=FULL_QUEUE_PATH, request=dict(parent=FULL_QUEUE_PATH, task=Task(name=FULL_TASK_PATH), response_view=None),
task=Task(name=FULL_TASK_PATH),
response_view=None,
retry=None, retry=None,
timeout=None, timeout=None,
metadata=None, metadata=(),
) )
@mock.patch( @mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn", "airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
**{"return_value.get_task.return_value": API_RESPONSE}, # type: ignore **{"return_value.get_task.return_value": Task(name=FULL_TASK_PATH)}, # type: ignore
) )
def test_get_task(self, get_conn): def test_get_task(self, get_conn):
result = self.hook.get_task( result = self.hook.get_task(
@ -220,37 +214,34 @@ class TestCloudTasksHook(unittest.TestCase):
project_id=PROJECT_ID, project_id=PROJECT_ID,
) )
self.assertEqual(result, API_RESPONSE) self.assertEqual(result, Task(name=FULL_TASK_PATH))
get_conn.return_value.get_task.assert_called_once_with( get_conn.return_value.get_task.assert_called_once_with(
name=FULL_TASK_PATH, request=dict(name=FULL_TASK_PATH, response_view=None),
response_view=None,
retry=None, retry=None,
timeout=None, timeout=None,
metadata=None, metadata=(),
) )
@mock.patch( @mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn", "airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
**{"return_value.list_tasks.return_value": API_RESPONSE}, # type: ignore **{"return_value.list_tasks.return_value": [Task(name=FULL_TASK_PATH)]}, # type: ignore
) )
def test_list_tasks(self, get_conn): def test_list_tasks(self, get_conn):
result = self.hook.list_tasks(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID) result = self.hook.list_tasks(location=LOCATION, queue_name=QUEUE_ID, project_id=PROJECT_ID)
self.assertEqual(result, list(API_RESPONSE)) self.assertEqual(result, [Task(name=FULL_TASK_PATH)])
get_conn.return_value.list_tasks.assert_called_once_with( get_conn.return_value.list_tasks.assert_called_once_with(
parent=FULL_QUEUE_PATH, request=dict(parent=FULL_QUEUE_PATH, response_view=None, page_size=None),
response_view=None,
page_size=None,
retry=None, retry=None,
timeout=None, timeout=None,
metadata=None, metadata=(),
) )
@mock.patch( @mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn", "airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
**{"return_value.delete_task.return_value": API_RESPONSE}, # type: ignore **{"return_value.delete_task.return_value": None}, # type: ignore
) )
def test_delete_task(self, get_conn): def test_delete_task(self, get_conn):
result = self.hook.delete_task( result = self.hook.delete_task(
@ -263,12 +254,12 @@ class TestCloudTasksHook(unittest.TestCase):
self.assertEqual(result, None) self.assertEqual(result, None)
get_conn.return_value.delete_task.assert_called_once_with( get_conn.return_value.delete_task.assert_called_once_with(
name=FULL_TASK_PATH, retry=None, timeout=None, metadata=None request=dict(name=FULL_TASK_PATH), retry=None, timeout=None, metadata=()
) )
@mock.patch( @mock.patch(
"airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn", "airflow.providers.google.cloud.hooks.tasks.CloudTasksHook.get_conn",
**{"return_value.run_task.return_value": API_RESPONSE}, # type: ignore **{"return_value.run_task.return_value": Task(name=FULL_TASK_PATH)}, # type: ignore
) )
def test_run_task(self, get_conn): def test_run_task(self, get_conn):
result = self.hook.run_task( result = self.hook.run_task(
@ -278,12 +269,11 @@ class TestCloudTasksHook(unittest.TestCase):
project_id=PROJECT_ID, project_id=PROJECT_ID,
) )
self.assertEqual(result, API_RESPONSE) self.assertEqual(result, Task(name=FULL_TASK_PATH))
get_conn.return_value.run_task.assert_called_once_with( get_conn.return_value.run_task.assert_called_once_with(
name=FULL_TASK_PATH, request=dict(name=FULL_TASK_PATH, response_view=None),
response_view=None,
retry=None, retry=None,
timeout=None, timeout=None,
metadata=None, metadata=(),
) )

Просмотреть файл

@ -57,7 +57,7 @@ class TestCloudTasksQueueCreate(unittest.TestCase):
result = operator.execute(context=None) result = operator.execute(context=None)
self.assertEqual({'name': FULL_QUEUE_PATH}, result) self.assertEqual({'name': FULL_QUEUE_PATH, 'state': 0}, result)
mock_hook.assert_called_once_with( mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, impersonation_chain=None,
@ -81,7 +81,7 @@ class TestCloudTasksQueueUpdate(unittest.TestCase):
result = operator.execute(context=None) result = operator.execute(context=None)
self.assertEqual({'name': FULL_QUEUE_PATH}, result) self.assertEqual({'name': FULL_QUEUE_PATH, 'state': 0}, result)
mock_hook.assert_called_once_with( mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, impersonation_chain=None,
@ -106,7 +106,7 @@ class TestCloudTasksQueueGet(unittest.TestCase):
result = operator.execute(context=None) result = operator.execute(context=None)
self.assertEqual({'name': FULL_QUEUE_PATH}, result) self.assertEqual({'name': FULL_QUEUE_PATH, 'state': 0}, result)
mock_hook.assert_called_once_with( mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, impersonation_chain=None,
@ -129,7 +129,7 @@ class TestCloudTasksQueuesList(unittest.TestCase):
result = operator.execute(context=None) result = operator.execute(context=None)
self.assertEqual([{'name': FULL_QUEUE_PATH}], result) self.assertEqual([{'name': FULL_QUEUE_PATH, 'state': 0}], result)
mock_hook.assert_called_once_with( mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, impersonation_chain=None,
@ -176,7 +176,7 @@ class TestCloudTasksQueuePurge(unittest.TestCase):
result = operator.execute(context=None) result = operator.execute(context=None)
self.assertEqual({'name': FULL_QUEUE_PATH}, result) self.assertEqual({'name': FULL_QUEUE_PATH, 'state': 0}, result)
mock_hook.assert_called_once_with( mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, impersonation_chain=None,
@ -199,7 +199,7 @@ class TestCloudTasksQueuePause(unittest.TestCase):
result = operator.execute(context=None) result = operator.execute(context=None)
self.assertEqual({'name': FULL_QUEUE_PATH}, result) self.assertEqual({'name': FULL_QUEUE_PATH, 'state': 0}, result)
mock_hook.assert_called_once_with( mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, impersonation_chain=None,
@ -222,7 +222,7 @@ class TestCloudTasksQueueResume(unittest.TestCase):
result = operator.execute(context=None) result = operator.execute(context=None)
self.assertEqual({'name': FULL_QUEUE_PATH}, result) self.assertEqual({'name': FULL_QUEUE_PATH, 'state': 0}, result)
mock_hook.assert_called_once_with( mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, impersonation_chain=None,
@ -247,7 +247,16 @@ class TestCloudTasksTaskCreate(unittest.TestCase):
result = operator.execute(context=None) result = operator.execute(context=None)
self.assertEqual({'appEngineHttpRequest': {}}, result) self.assertEqual(
{
'app_engine_http_request': {'body': '', 'headers': {}, 'http_method': 0, 'relative_uri': ''},
'dispatch_count': 0,
'name': '',
'response_count': 0,
'view': 0,
},
result,
)
mock_hook.assert_called_once_with( mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, impersonation_chain=None,
@ -275,7 +284,16 @@ class TestCloudTasksTaskGet(unittest.TestCase):
result = operator.execute(context=None) result = operator.execute(context=None)
self.assertEqual({'appEngineHttpRequest': {}}, result) self.assertEqual(
{
'app_engine_http_request': {'body': '', 'headers': {}, 'http_method': 0, 'relative_uri': ''},
'dispatch_count': 0,
'name': '',
'response_count': 0,
'view': 0,
},
result,
)
mock_hook.assert_called_once_with( mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, impersonation_chain=None,
@ -300,7 +318,23 @@ class TestCloudTasksTasksList(unittest.TestCase):
result = operator.execute(context=None) result = operator.execute(context=None)
self.assertEqual([{'appEngineHttpRequest': {}}], result) self.assertEqual(
[
{
'app_engine_http_request': {
'body': '',
'headers': {},
'http_method': 0,
'relative_uri': '',
},
'dispatch_count': 0,
'name': '',
'response_count': 0,
'view': 0,
}
],
result,
)
mock_hook.assert_called_once_with( mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, impersonation_chain=None,
@ -353,7 +387,16 @@ class TestCloudTasksTaskRun(unittest.TestCase):
result = operator.execute(context=None) result = operator.execute(context=None)
self.assertEqual({'appEngineHttpRequest': {}}, result) self.assertEqual(
{
'app_engine_http_request': {'body': '', 'headers': {}, 'http_method': 0, 'relative_uri': ''},
'dispatch_count': 0,
'name': '',
'response_count': 0,
'view': 0,
},
result,
)
mock_hook.assert_called_once_with( mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, impersonation_chain=None,