[AIRFLOW-5422] Add type annotations to GCP operators

Tobiasz Kędzierski 2019-08-31 13:58:07 +02:00, committed by Jarek Potiuk
Parent 497c02a2d2
Commit ceb4c086b1
39 changed files: 1285 additions and 1175 deletions
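Every file in this diff applies the same annotation pattern: parameters defaulting to None become Optional[...], parameters with concrete defaults get concrete types, every __init__ gains an explicit -> None return annotation, and class-level attributes get PEP 484 type comments (which, unlike variable annotations, also work on Python 3.5, still supported by Airflow at the time). A minimal sketch of the pattern, using a hypothetical operator name:

from typing import Optional

from airflow.models import BaseOperator


class ExampleGcsOperator(BaseOperator):  # hypothetical operator, for illustration only
    def __init__(self,
                 bucket: str,                                # required, concrete type
                 prefix: Optional[str] = None,               # None default -> Optional[...]
                 gcp_conn_id: str = 'google_cloud_default',  # concrete default -> str
                 *args, **kwargs) -> None:                   # explicit None return
        super().__init__(*args, **kwargs)
        self.bucket = bucket
        self.prefix = prefix
        self.gcp_conn_id = gcp_conn_id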

View file

@ -20,6 +20,7 @@
This module contains the Google BigQuery check operator.
"""
import warnings
from typing import Any, SupportsAbs, Optional
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.operators.check_operator import \
@ -72,17 +73,17 @@ class BigQueryCheckOperator(CheckOperator):
@apply_defaults
def __init__(self,
sql,
gcp_conn_id='google_cloud_default',
bigquery_conn_id=None,
use_legacy_sql=True,
*args, **kwargs):
sql: str,
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
use_legacy_sql: bool = True,
*args, **kwargs) -> None:
super().__init__(sql=sql, *args, **kwargs)
if bigquery_conn_id:
warnings.warn(
"The bigquery_conn_id parameter has been deprecated. You should pass "
"the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3)
gcp_conn_id = bigquery_conn_id
gcp_conn_id = bigquery_conn_id # type: ignore
self.gcp_conn_id = gcp_conn_id
self.sql = sql
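This deprecation shim recurs throughout the commit: the old bigquery_conn_id keyword is still accepted, but it raises a DeprecationWarning and is forwarded to gcp_conn_id. A self-contained sketch of the idea (the function name is illustrative, not from the diff):

import warnings
from typing import Optional


def resolve_conn_id(gcp_conn_id: str = 'google_cloud_default',
                    bigquery_conn_id: Optional[str] = None) -> str:
    # Honour the deprecated keyword, but steer callers to the new one.
    if bigquery_conn_id:
        warnings.warn(
            "The bigquery_conn_id parameter has been deprecated. You should pass "
            "the gcp_conn_id parameter.", DeprecationWarning, stacklevel=2)
        gcp_conn_id = bigquery_conn_id
    return gcp_conn_id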
@ -113,13 +114,13 @@ class BigQueryValueCheckOperator(ValueCheckOperator):
template_ext = ('.sql', )
@apply_defaults
def __init__(self, sql,
pass_value,
tolerance=None,
gcp_conn_id='google_cloud_default',
bigquery_conn_id=None,
use_legacy_sql=True,
*args, **kwargs):
def __init__(self, sql: str,
pass_value: Any,
tolerance: Any = None,
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
use_legacy_sql: bool = True,
*args, **kwargs) -> None:
super().__init__(
sql=sql, pass_value=pass_value, tolerance=tolerance,
*args, **kwargs)
@ -171,13 +172,15 @@ class BigQueryIntervalCheckOperator(IntervalCheckOperator):
@apply_defaults
def __init__(self,
table,
metrics_thresholds,
date_filter_column='ds',
days_back=-7,
gcp_conn_id='google_cloud_default',
bigquery_conn_id=None,
use_legacy_sql=True, *args, **kwargs):
table: str,
metrics_thresholds: dict,
date_filter_column: str = 'ds',
days_back: SupportsAbs[int] = -7,
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
use_legacy_sql: bool = True,
*args,
**kwargs) -> None:
super().__init__(
table=table, metrics_thresholds=metrics_thresholds,
date_filter_column=date_filter_column, days_back=days_back,
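Note that days_back is annotated SupportsAbs[int] rather than int: the interval check only ever takes the absolute value of it, so any object whose __abs__ returns an int is acceptable. A small sketch of what the protocol buys (the helper is illustrative, not the operator's code):

from typing import SupportsAbs


def normalize_days_back(days_back: SupportsAbs[int]) -> int:
    # Accepts -7, 7, or any custom type implementing __abs__ -> int.
    return -abs(days_back)


print(normalize_days_back(-7))  # -7
print(normalize_days_back(7))   # -7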

View file

@ -20,6 +20,7 @@
This module contains a Google BigQuery data operator.
"""
import warnings
from typing import Optional
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
@ -79,15 +80,15 @@ class BigQueryGetDataOperator(BaseOperator):
@apply_defaults
def __init__(self,
dataset_id,
table_id,
max_results='100',
selected_fields=None,
gcp_conn_id='google_cloud_default',
bigquery_conn_id=None,
delegate_to=None,
dataset_id: str,
table_id: str,
max_results: str = '100',
selected_fields: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
if bigquery_conn_id:

View file

@ -22,7 +22,7 @@ This module contains Google BigQuery operators.
import json
import warnings
from typing import Iterable, List, Optional, Union
from typing import Iterable, List, Optional, Union, Dict
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook, _parse_gcs_url
@ -204,9 +204,9 @@ class BigQueryOperator(BaseOperator):
api_resource_configs: Optional[dict] = None,
cluster_fields: Optional[List[str]] = None,
location: Optional[str] = None,
encryption_configuration=None,
encryption_configuration: Optional[dict] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
if bigquery_conn_id:
@ -406,18 +406,18 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
# pylint: disable=too-many-arguments
@apply_defaults
def __init__(self,
dataset_id,
table_id,
project_id=None,
schema_fields=None,
gcs_schema_object=None,
time_partitioning=None,
bigquery_conn_id='google_cloud_default',
google_cloud_storage_conn_id='google_cloud_default',
delegate_to=None,
labels=None,
encryption_configuration=None,
*args, **kwargs):
dataset_id: str,
table_id: str,
project_id: Optional[str] = None,
schema_fields: Optional[List] = None,
gcs_schema_object: Optional[str] = None,
time_partitioning: Optional[Dict] = None,
bigquery_conn_id: str = 'google_cloud_default',
google_cloud_storage_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
labels: Optional[Dict] = None,
encryption_configuration: Optional[Dict] = None,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@ -553,26 +553,26 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
# pylint: disable=too-many-arguments
@apply_defaults
def __init__(self,
bucket,
source_objects,
destination_project_dataset_table,
schema_fields=None,
schema_object=None,
source_format='CSV',
compression='NONE',
skip_leading_rows=0,
field_delimiter=',',
max_bad_records=0,
quote_character=None,
allow_quoted_newlines=False,
allow_jagged_rows=False,
bigquery_conn_id='google_cloud_default',
google_cloud_storage_conn_id='google_cloud_default',
delegate_to=None,
src_fmt_configs=None,
labels=None,
encryption_configuration=None,
*args, **kwargs):
bucket: str,
source_objects: List,
destination_project_dataset_table: str,
schema_fields: Optional[List] = None,
schema_object: Optional[str] = None,
source_format: str = 'CSV',
compression: str = 'NONE',
skip_leading_rows: int = 0,
field_delimiter: str = ',',
max_bad_records: int = 0,
quote_character: Optional[str] = None,
allow_quoted_newlines: bool = False,
allow_jagged_rows: bool = False,
bigquery_conn_id: str = 'google_cloud_default',
google_cloud_storage_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
src_fmt_configs: Optional[dict] = None,
labels: Optional[Dict] = None,
encryption_configuration: Optional[Dict] = None,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@ -675,13 +675,13 @@ class BigQueryDeleteDatasetOperator(BaseOperator):
@apply_defaults
def __init__(self,
dataset_id,
project_id=None,
delete_contents=False,
gcp_conn_id='google_cloud_default',
bigquery_conn_id=None,
delegate_to=None,
*args, **kwargs):
dataset_id: str,
project_id: Optional[str] = None,
delete_contents: bool = False,
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
*args, **kwargs) -> None:
if bigquery_conn_id:
warnings.warn(
@ -754,14 +754,14 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
@apply_defaults
def __init__(self,
dataset_id,
project_id=None,
dataset_reference=None,
location=None,
gcp_conn_id='google_cloud_default',
bigquery_conn_id=None,
delegate_to=None,
*args, **kwargs):
dataset_id: str,
project_id: Optional[str] = None,
dataset_reference: Optional[Dict] = None,
location: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
*args, **kwargs) -> None:
if bigquery_conn_id:
warnings.warn(
@ -816,11 +816,11 @@ class BigQueryGetDatasetOperator(BaseOperator):
@apply_defaults
def __init__(self,
dataset_id,
project_id=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
*args, **kwargs):
dataset_id: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args, **kwargs) -> None:
self.dataset_id = dataset_id
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
@ -865,12 +865,12 @@ class BigQueryPatchDatasetOperator(BaseOperator):
@apply_defaults
def __init__(self,
dataset_id,
dataset_resource,
project_id=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
*args, **kwargs):
dataset_id: str,
dataset_resource: dict,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args, **kwargs) -> None:
self.dataset_id = dataset_id
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
@ -919,12 +919,12 @@ class BigQueryUpdateDatasetOperator(BaseOperator):
@apply_defaults
def __init__(self,
dataset_id,
dataset_resource,
project_id=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
*args, **kwargs):
dataset_id: str,
dataset_resource: dict,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args, **kwargs) -> None:
self.dataset_id = dataset_id
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
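The hunks above mix Optional[dict] and Optional[Dict]. For an unparameterized annotation the two are interchangeable, since typing.Dict is a generic alias of the builtin, but only the typing form can carry key and value types on the Python versions this code targets (dict[str, str] requires 3.9+). A short illustration:

from typing import Dict, Optional


def set_labels_plain(labels: Optional[dict] = None) -> None:
    print(labels)


def set_labels_typed(labels: Optional[Dict[str, str]] = None) -> None:
    print(labels)


# Both accept the same value; only Dict[...] lets mypy check the contents.
set_labels_plain({'team': 'data-eng'})
set_labels_typed({'team': 'data-eng'})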

View file

@ -20,6 +20,7 @@
This module contains the Google BigQuery table delete operator.
"""
import warnings
from typing import Optional
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
@ -52,13 +53,13 @@ class BigQueryTableDeleteOperator(BaseOperator):
@apply_defaults
def __init__(self,
deletion_dataset_table,
gcp_conn_id='google_cloud_default',
bigquery_conn_id=None,
delegate_to=None,
ignore_if_missing=False,
deletion_dataset_table: str,
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
ignore_if_missing: bool = False,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
if bigquery_conn_id:

View file

@ -20,6 +20,7 @@
This module contains a Google BigQuery to BigQuery operator.
"""
import warnings
from typing import Dict, Union, List, Optional
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
@ -74,17 +75,17 @@ class BigQueryToBigQueryOperator(BaseOperator):
@apply_defaults
def __init__(self,
source_project_dataset_tables,
destination_project_dataset_table,
write_disposition='WRITE_EMPTY',
create_disposition='CREATE_IF_NEEDED',
gcp_conn_id='google_cloud_default',
bigquery_conn_id=None,
delegate_to=None,
labels=None,
encryption_configuration=None,
source_project_dataset_tables: Union[List[str], str],
destination_project_dataset_table: str,
write_disposition: str = 'WRITE_EMPTY',
create_disposition: str = 'CREATE_IF_NEEDED',
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
labels: Optional[Dict] = None,
encryption_configuration: Optional[Dict] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
if bigquery_conn_id:

View file

@ -20,6 +20,7 @@
This module contains a Google BigQuery to GCS operator.
"""
import warnings
from typing import Dict, List, Optional
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
@ -43,7 +44,7 @@ class BigQueryToCloudStorageOperator(BaseOperator):
Storage URI (e.g. gs://some-bucket/some-file.txt). (templated) Follows
convention defined here:
https://cloud.google.com/bigquery/exporting-data-from-bigquery#exportingmultiple
:type destination_cloud_storage_uris: list
:type destination_cloud_storage_uris: List[str]
:param compression: Type of compression to use.
:type compression: str
:param export_format: File format to export.
@ -72,18 +73,18 @@ class BigQueryToCloudStorageOperator(BaseOperator):
@apply_defaults
def __init__(self, # pylint: disable=too-many-arguments
source_project_dataset_table,
destination_cloud_storage_uris,
compression='NONE',
export_format='CSV',
field_delimiter=',',
print_header=True,
gcp_conn_id='google_cloud_default',
bigquery_conn_id=None,
delegate_to=None,
labels=None,
source_project_dataset_table: str,
destination_cloud_storage_uris: List[str],
compression: str = 'NONE',
export_format: str = 'CSV',
field_delimiter: str = ',',
print_header: bool = True,
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
labels: Optional[Dict] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
if bigquery_conn_id:

View file

@ -19,6 +19,7 @@
"""
This module contains a Google BigQuery to MySQL operator.
"""
from typing import Optional
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
@ -77,17 +78,17 @@ class BigQueryToMySqlOperator(BaseOperator):
@apply_defaults
def __init__(self,
dataset_table,
mysql_table,
selected_fields=None,
gcp_conn_id='google_cloud_default',
mysql_conn_id='mysql_default',
database=None,
delegate_to=None,
replace=False,
batch_size=1000,
dataset_table: str,
mysql_table: str,
selected_fields: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
mysql_conn_id: str = 'mysql_default',
database: Optional[str] = None,
delegate_to: Optional[str] = None,
replace: bool = False,
batch_size: int = 1000,
*args,
**kwargs):
**kwargs) -> None:
super(BigQueryToMySqlOperator, self).__init__(*args, **kwargs)
self.selected_fields = selected_fields
self.gcp_conn_id = gcp_conn_id

View file

@ -26,6 +26,7 @@ from base64 import b64encode
from datetime import datetime
from decimal import Decimal
from tempfile import NamedTemporaryFile
from typing import Optional
from uuid import UUID
from cassandra.util import Date, Time, SortedSet, OrderedMapSerializedKey
@ -82,18 +83,18 @@ class CassandraToGoogleCloudStorageOperator(BaseOperator):
@apply_defaults
def __init__(self,
cql,
bucket,
filename,
schema_filename=None,
approx_max_file_size_bytes=1900000000,
gzip=False,
cassandra_conn_id='cassandra_default',
gcp_conn_id='google_cloud_default',
google_cloud_storage_conn_id=None,
delegate_to=None,
cql: str,
bucket: str,
filename: str,
schema_filename: Optional[str] = None,
approx_max_file_size_bytes: int = 1900000000,
gzip: bool = False,
cassandra_conn_id: str = 'cassandra_default',
gcp_conn_id: str = 'google_cloud_default',
google_cloud_storage_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
if google_cloud_storage_conn_id:

View file

@ -20,6 +20,7 @@
This module contains the Google Cloud Storage ACL entry operators.
"""
import warnings
from typing import Optional
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
@ -59,17 +60,16 @@ class GoogleCloudStorageBucketCreateAclEntryOperator(BaseOperator):
@apply_defaults
def __init__(
self,
bucket,
entity,
role,
user_project=None,
gcp_conn_id='google_cloud_default',
google_cloud_storage_conn_id=None,
bucket: str,
entity: str,
role: str,
user_project: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
google_cloud_storage_conn_id: Optional[str] = None,
*args,
**kwargs
):
super().__init__(*args,
**kwargs)
) -> None:
super().__init__(*args, **kwargs)
if google_cloud_storage_conn_id:
warnings.warn(
@ -129,17 +129,16 @@ class GoogleCloudStorageObjectCreateAclEntryOperator(BaseOperator):
@apply_defaults
def __init__(self,
bucket,
object_name,
entity,
role,
generation=None,
user_project=None,
gcp_conn_id='google_cloud_default',
google_cloud_storage_conn_id=None,
*args, **kwargs):
super().__init__(*args,
**kwargs)
bucket: str,
object_name: str,
entity: str,
role: str,
generation: Optional[int] = None,
user_project: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
google_cloud_storage_conn_id: Optional[str] = None,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
if google_cloud_storage_conn_id:
warnings.warn(

View file

@ -61,7 +61,7 @@ class GoogleCloudStorageDeleteOperator(BaseOperator):
gcp_conn_id: str = 'google_cloud_default',
google_cloud_storage_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
*args, **kwargs):
*args, **kwargs) -> None:
if google_cloud_storage_conn_id:
warnings.warn(

View file

@ -22,6 +22,7 @@ This module contains the Google Cloud Storage download operator.
import sys
import warnings
from typing import Optional
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
@ -67,15 +68,15 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
@apply_defaults
def __init__(self,
bucket,
object_name=None,
filename=None,
store_to_xcom_key=None,
gcp_conn_id='google_cloud_default',
google_cloud_storage_conn_id=None,
delegate_to=None,
bucket: str,
object_name: Optional[str] = None,
filename: Optional[str] = None,
store_to_xcom_key: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
google_cloud_storage_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
# To preserve backward compatibility
# TODO: Remove one day
if object_name is None:

View file

@ -20,7 +20,7 @@
This module contains a Google Cloud Storage list operator.
"""
import warnings
from typing import Iterable
from typing import Iterable, Optional
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
@ -71,14 +71,14 @@ class GoogleCloudStorageListOperator(BaseOperator):
@apply_defaults
def __init__(self,
bucket,
prefix=None,
delimiter=None,
gcp_conn_id='google_cloud_default',
google_cloud_storage_conn_id=None,
delegate_to=None,
bucket: str,
prefix: Optional[str] = None,
delimiter: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
google_cloud_storage_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
if google_cloud_storage_conn_id:

View file

@ -20,6 +20,7 @@
This module contains a Google Cloud Storage Bucket operator.
"""
import warnings
from typing import Dict, Optional
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
@ -95,17 +96,17 @@ class GoogleCloudStorageCreateBucketOperator(BaseOperator):
@apply_defaults
def __init__(self,
bucket_name,
resource=None,
storage_class='MULTI_REGIONAL',
location='US',
project_id=None,
labels=None,
gcp_conn_id='google_cloud_default',
google_cloud_storage_conn_id=None,
delegate_to=None,
bucket_name: str,
resource: Optional[Dict] = None,
storage_class: str = 'MULTI_REGIONAL',
location: str = 'US',
project_id: Optional[str] = None,
labels: Optional[Dict] = None,
gcp_conn_id: str = 'google_cloud_default',
google_cloud_storage_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
if google_cloud_storage_conn_id:

View file

@ -188,7 +188,7 @@ class GcpBodyFieldValidator(LoggingMixin):
:type api_version: str
"""
def __init__(self, validation_specs: Sequence[str], api_version: str) -> None:
def __init__(self, validation_specs: Sequence[Dict], api_version: str) -> None:
super().__init__()
self._validation_specs = validation_specs
self._api_version = api_version
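The corrected annotation reflects that each validation spec is a dict describing one body field, not a bare string; this matches the dict(name=..., optional=True) entries in the Compute Engine specification later in this commit. A sketch of the expected shape (the spec contents here are illustrative):

from typing import Any, Dict, List

# Each entry names a body field and says how to validate it.
VALIDATION_SPECIFICATION = [
    dict(name="name", allow_empty=False),
    dict(name="description", optional=True),
]  # type: List[Dict[str, Any]]

for spec in VALIDATION_SPECIFICATION:
    print(spec["name"], spec.get("optional", False))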

View file

@ -466,7 +466,7 @@ class CloudTasksHook(GoogleCloudBaseHook):
:type task_name: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
:type response_view: google.cloud.tasks_v2.types.Task.View
:type response_view: google.cloud.tasks_v2.enums.Task.View
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
@ -528,7 +528,7 @@ class CloudTasksHook(GoogleCloudBaseHook):
:type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
:type response_view: google.cloud.tasks_v2.types.Task.View
:type response_view: google.cloud.tasks_v2.enums.Task.View
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
@ -577,7 +577,7 @@ class CloudTasksHook(GoogleCloudBaseHook):
:type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
:type response_view: google.cloud.tasks_v2.types.Task.View
:type response_view: google.cloud.tasks_v2.enums.Task.View
:param page_size: (Optional) The maximum number of resources contained in the
underlying API response.
:type page_size: int
@ -674,7 +674,7 @@ class CloudTasksHook(GoogleCloudBaseHook):
:type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
:type response_view: google.cloud.tasks_v2.types.Task.View
:type response_view: google.cloud.tasks_v2.enums.Task.View
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
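The docstring fixes above point response_view at the enums module, where the 1.x google-cloud-tasks client exposes the view constants. Assuming that client generation is installed, usage looks roughly like this:

from google.cloud.tasks_v2 import enums

# Request the full task payload rather than the BASIC subset.
response_view = enums.Task.View.FULL
print(response_view)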

View file

@ -22,7 +22,7 @@
This module contains Google AutoML operators.
"""
import ast
from typing import Sequence, Tuple, Union, List, Dict
from typing import Sequence, Tuple, Union, List, Dict, Optional
from google.api_core.retry import Retry
from google.protobuf.json_format import MessageToDict
@ -31,6 +31,8 @@ from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.gcp.hooks.automl import CloudAutoMLHook
MetaData = Sequence[Tuple[str, str]]
class AutoMLTrainModelOperator(BaseOperator):
"""
@ -66,14 +68,14 @@ class AutoMLTrainModelOperator(BaseOperator):
self,
model: dict,
location: str,
project_id: str = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
project_id: Optional[str] = None,
metadata: Optional[MetaData] = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.model = model
@ -142,15 +144,15 @@ class AutoMLPredictOperator(BaseOperator):
model_id: str,
location: str,
payload: dict,
params: Dict[str, str] = None,
project_id: str = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
params: Optional[Dict[str, str]] = None,
project_id: Optional[str] = None,
metadata: Optional[MetaData] = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.model_id = model_id
@ -236,15 +238,15 @@ class AutoMLBatchPredictOperator(BaseOperator):
input_config: dict,
output_config: dict,
location: str,
project_id: str = None,
params: Dict[str, str] = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
project_id: Optional[str] = None,
params: Optional[Dict[str, str]] = None,
metadata: Optional[MetaData] = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.model_id = model_id
@ -314,14 +316,14 @@ class AutoMLCreateDatasetOperator(BaseOperator):
self,
dataset: dict,
location: str,
project_id: str = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
project_id: Optional[str] = None,
metadata: Optional[MetaData] = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.dataset = dataset
@ -391,14 +393,14 @@ class AutoMLImportDataOperator(BaseOperator):
dataset_id: str,
location: str,
input_config: dict,
project_id: str = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
project_id: Optional[str] = None,
metadata: Optional[MetaData] = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.dataset_id = dataset_id
@ -482,17 +484,17 @@ class AutoMLTablesListColumnSpecsOperator(BaseOperator):
dataset_id: str,
table_spec_id: str,
location: str,
field_mask: dict = None,
filter_: str = None,
page_size: int = None,
project_id: str = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
field_mask: Optional[dict] = None,
filter_: Optional[str] = None,
page_size: Optional[int] = None,
project_id: Optional[str] = None,
metadata: Optional[MetaData] = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.dataset_id = dataset_id
self.table_spec_id = table_spec_id
@ -567,15 +569,15 @@ class AutoMLTablesUpdateDatasetOperator(BaseOperator):
self,
dataset: dict,
location: str,
project_id: str = None,
update_mask: dict = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
project_id: Optional[str] = None,
update_mask: Optional[dict] = None,
metadata: Optional[MetaData] = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.dataset = dataset
@ -638,14 +640,14 @@ class AutoMLGetModelOperator(BaseOperator):
self,
model_id: str,
location: str,
project_id: str = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
project_id: Optional[str] = None,
metadata: Optional[MetaData] = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.model_id = model_id
@ -705,14 +707,14 @@ class AutoMLDeleteModelOperator(BaseOperator):
self,
model_id: str,
location: str,
project_id: str = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
project_id: Optional[str] = None,
metadata: Optional[MetaData] = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.model_id = model_id
@ -782,15 +784,15 @@ class AutoMLDeployModelOperator(BaseOperator):
self,
model_id: str,
location: str,
project_id: str = None,
image_detection_metadata: dict = None,
project_id: Optional[str] = None,
image_detection_metadata: Optional[dict] = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.model_id = model_id
@ -862,16 +864,16 @@ class AutoMLTablesListTableSpecsOperator(BaseOperator):
self,
dataset_id: str,
location: str,
page_size: int = None,
filter_: str = None,
project_id: str = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
page_size: Optional[int] = None,
filter_: Optional[str] = None,
project_id: Optional[str] = None,
metadata: Optional[MetaData] = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.dataset_id = dataset_id
self.filter_ = filter_
@ -933,14 +935,14 @@ class AutoMLListDatasetOperator(BaseOperator):
def __init__(
self,
location: str,
project_id: str = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
project_id: Optional[str] = None,
metadata: Optional[MetaData] = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.project_id = project_id
@ -1005,14 +1007,14 @@ class AutoMLDeleteDatasetOperator(BaseOperator):
self,
dataset_id: Union[str, List[str]],
location: str,
project_id: str = None,
metadata: Sequence[Tuple[str, str]] = None,
timeout: float = None,
retry: Retry = None,
project_id: Optional[str] = None,
metadata: Optional[MetaData] = None,
timeout: Optional[float] = None,
retry: Optional[Retry] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.dataset_id = dataset_id
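The MetaData alias introduced at the top of this module keeps the long Sequence[Tuple[str, str]] spelling out of a dozen signatures. A standalone sketch of the alias in use (the function is illustrative):

from typing import Optional, Sequence, Tuple

MetaData = Sequence[Tuple[str, str]]  # gRPC metadata: (key, value) string pairs


def call_with_metadata(metadata: Optional[MetaData] = None) -> None:
    for key, value in metadata or ():
        print(key, value)


call_with_metadata([("x-goog-request-params", "parent=projects/example")])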

View file

@ -19,10 +19,11 @@
"""
This module contains Google Cloud Bigtable operators.
"""
from typing import Iterable
from enum import IntEnum
from typing import Iterable, List, Optional, Dict
import google.api_core.exceptions
from google.cloud.bigtable.column_family import GarbageCollectionRule
from airflow import AirflowException
from airflow.models import BaseOperator
@ -86,29 +87,31 @@ class BigtableInstanceCreateOperator(BaseOperator, BigtableValidationMixin):
:type timeout: int
:param timeout: (optional) timeout (in seconds) for instance creation.
If not specified, the operator will wait indefinitely.
:param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
REQUIRED_ATTRIBUTES = ('instance_id', 'main_cluster_id',
'main_cluster_zone')
'main_cluster_zone') # type: Iterable[str]
template_fields = ['project_id', 'instance_id', 'main_cluster_id',
'main_cluster_zone']
'main_cluster_zone'] # type: Iterable[str]
@apply_defaults
def __init__(self, # pylint: disable=too-many-arguments
instance_id,
main_cluster_id,
main_cluster_zone,
project_id=None,
replica_cluster_id=None,
replica_cluster_zone=None,
instance_display_name=None,
instance_type=None,
instance_labels=None,
cluster_nodes=None,
cluster_storage_type=None,
timeout=None,
gcp_conn_id='google_cloud_default',
*args, **kwargs):
instance_id: str,
main_cluster_id: str,
main_cluster_zone: str,
project_id: Optional[str] = None,
replica_cluster_id: Optional[str] = None,
replica_cluster_zone: Optional[str] = None,
instance_display_name: Optional[str] = None,
instance_type: Optional[IntEnum] = None,
instance_labels: Optional[Dict] = None,
cluster_nodes: Optional[int] = None,
cluster_storage_type: Optional[IntEnum] = None,
timeout: Optional[float] = None,
gcp_conn_id: str = 'google_cloud_default',
*args, **kwargs) -> None:
self.project_id = project_id
self.instance_id = instance_id
self.main_cluster_id = main_cluster_id
@ -174,16 +177,18 @@ class BigtableInstanceDeleteOperator(BaseOperator, BigtableValidationMixin):
:param project_id: Optional, the ID of the GCP project. If set to None or missing,
the default project_id from the GCP connection is used.
:type project_id: str
:param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
REQUIRED_ATTRIBUTES = ('instance_id',)
template_fields = ['project_id', 'instance_id']
REQUIRED_ATTRIBUTES = ('instance_id',) # type: Iterable[str]
template_fields = ['project_id', 'instance_id'] # type: Iterable[str]
@apply_defaults
def __init__(self,
instance_id,
project_id=None,
gcp_conn_id='google_cloud_default',
*args, **kwargs):
instance_id: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
*args, **kwargs) -> None:
self.project_id = project_id
self.instance_id = instance_id
self._validate_inputs()
@ -232,19 +237,21 @@ class BigtableTableCreateOperator(BaseOperator, BigtableValidationMixin):
:param column_families: (Optional) A map of columns to create.
The key is the column_id str and the value is a
:class:`google.cloud.bigtable.column_family.GarbageCollectionRule`
:param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
REQUIRED_ATTRIBUTES = ('instance_id', 'table_id')
template_fields = ['project_id', 'instance_id', 'table_id']
REQUIRED_ATTRIBUTES = ('instance_id', 'table_id') # type: Iterable[str]
template_fields = ['project_id', 'instance_id', 'table_id'] # type: Iterable[str]
@apply_defaults
def __init__(self,
instance_id,
table_id,
project_id=None,
initial_split_keys=None,
column_families=None,
gcp_conn_id='google_cloud_default',
*args, **kwargs):
instance_id: str,
table_id: str,
project_id: Optional[str] = None,
initial_split_keys: Optional[List] = None,
column_families: Optional[Dict[str, GarbageCollectionRule]] = None,
gcp_conn_id: str = 'google_cloud_default',
*args, **kwargs) -> None:
self.project_id = project_id
self.instance_id = instance_id
self.table_id = table_id
@ -320,18 +327,20 @@ class BigtableTableDeleteOperator(BaseOperator, BigtableValidationMixin):
the default project_id from the GCP connection is used.
:type app_profile_id: str
:param app_profile_id: Application profile.
:param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
REQUIRED_ATTRIBUTES = ('instance_id', 'table_id')
template_fields = ['project_id', 'instance_id', 'table_id']
REQUIRED_ATTRIBUTES = ('instance_id', 'table_id') # type: Iterable[str]
template_fields = ['project_id', 'instance_id', 'table_id'] # type: Iterable[str]
@apply_defaults
def __init__(self,
instance_id,
table_id,
project_id=None,
app_profile_id=None,
gcp_conn_id='google_cloud_default',
*args, **kwargs):
instance_id: str,
table_id: str,
project_id: Optional[str] = None,
app_profile_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
*args, **kwargs) -> None:
self.project_id = project_id
self.instance_id = instance_id
self.table_id = table_id
@ -383,18 +392,20 @@ class BigtableClusterUpdateOperator(BaseOperator, BigtableValidationMixin):
:param nodes: The desired number of nodes for the Cloud Bigtable cluster.
:type project_id: str
:param project_id: Optional, the ID of the GCP project.
:param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
REQUIRED_ATTRIBUTES = ('instance_id', 'cluster_id', 'nodes')
template_fields = ['project_id', 'instance_id', 'cluster_id', 'nodes']
REQUIRED_ATTRIBUTES = ('instance_id', 'cluster_id', 'nodes') # type: Iterable[str]
template_fields = ['project_id', 'instance_id', 'cluster_id', 'nodes'] # type: Iterable[str]
@apply_defaults
def __init__(self,
instance_id,
cluster_id,
nodes,
project_id=None,
gcp_conn_id='google_cloud_default',
*args, **kwargs):
instance_id: str,
cluster_id: str,
nodes: int,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
*args, **kwargs) -> None:
self.project_id = project_id
self.instance_id = instance_id
self.cluster_id = cluster_id
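The # type: Iterable[str] comments on REQUIRED_ATTRIBUTES and template_fields use PEP 484's comment syntax because annotations on class-level assignments require Python 3.6, and Airflow still supported 3.5. A minimal sketch of the two equivalent spellings (the class is illustrative):

from typing import Iterable


class ExampleOperator:
    # Comment form: valid on every Python 3 version.
    template_fields = ['project_id', 'instance_id']  # type: Iterable[str]

    # Annotation form: equivalent, but requires Python 3.6+.
    # template_fields: Iterable[str] = ['project_id', 'instance_id']


print(ExampleOperator.template_fields)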

View file

@ -19,6 +19,7 @@
"""Operators that integrat with Google Cloud Build service."""
from copy import deepcopy
import re
from typing import Dict, Iterable, Any, Optional
from urllib.parse import urlparse, unquote
from airflow import AirflowException
@ -42,7 +43,7 @@ class BuildProcessor:
See: https://cloud.google.com/cloud-build/docs/api/reference/rest/Shared.Types/Build
:type body: dict
"""
def __init__(self, body):
def __init__(self, body: Dict) -> None:
self.body = deepcopy(body)
def _verify_source(self):
@ -127,7 +128,7 @@ class BuildProcessor:
return source_dict
@staticmethod
def _convert_storage_url_to_dict(storage_url):
def _convert_storage_url_to_dict(storage_url: str) -> Dict[str, Any]:
"""
Convert a Google Cloud Storage object URL to the format supported by the API
@ -165,18 +166,24 @@ class CloudBuildCreateBuildOperator(BaseOperator):
:param body: The request body.
See: https://cloud.google.com/cloud-build/docs/api/reference/rest/Shared.Types/Build
:type body: dict
:param project_id: ID of the Google Cloud project; if None, the
default project_id is used.
:type project_id: str
:param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
:type gcp_conn_id: str
:param api_version: API version used (for example v1 or v1beta1).
:type api_version: str
"""
template_fields = ("body", "gcp_conn_id", "api_version")
template_fields = ("body", "gcp_conn_id", "api_version") # type: Iterable[str]
@apply_defaults
def __init__(
self, body, project_id=None, gcp_conn_id="google_cloud_default", api_version="v1", *args, **kwargs
):
def __init__(self,
body: dict,
project_id: Optional[str] = None,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1",
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.body = body
self.project_id = project_id

View file

@ -19,6 +19,7 @@
"""
This module contains Google Cloud SQL operators.
"""
from typing import Union, List, Optional, Iterable, Dict
from googleapiclient.errors import HttpError
@ -153,18 +154,18 @@ class CloudSqlBaseOperator(BaseOperator):
"""
@apply_defaults
def __init__(self,
instance,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1beta4',
*args, **kwargs):
instance: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1beta4',
*args, **kwargs) -> None:
self.project_id = project_id
self.instance = instance
self.gcp_conn_id = gcp_conn_id
self.api_version = api_version
self._validate_inputs()
self._hook = CloudSqlHook(gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version)
api_version=self.api_version) # type: CloudSqlHook
super().__init__(*args, **kwargs)
def _validate_inputs(self):
@ -235,13 +236,13 @@ class CloudSqlInstanceCreateOperator(CloudSqlBaseOperator):
@apply_defaults
def __init__(self,
body,
instance,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1beta4',
validate_body=True,
*args, **kwargs):
body: dict,
instance: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1beta4',
validate_body: bool = True,
*args, **kwargs) -> None:
self.body = body
self.validate_body = validate_body
super().__init__(
@ -309,12 +310,12 @@ class CloudSqlInstancePatchOperator(CloudSqlBaseOperator):
@apply_defaults
def __init__(self,
body,
instance,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1beta4',
*args, **kwargs):
body: dict,
instance: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1beta4',
*args, **kwargs) -> None:
self.body = body
super().__init__(
project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
@ -361,11 +362,11 @@ class CloudSqlInstanceDeleteOperator(CloudSqlBaseOperator):
@apply_defaults
def __init__(self,
instance,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1beta4',
*args, **kwargs):
instance: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1beta4',
*args, **kwargs) -> None:
super().__init__(
project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
api_version=api_version, *args, **kwargs)
@ -410,13 +411,13 @@ class CloudSqlInstanceDatabaseCreateOperator(CloudSqlBaseOperator):
@apply_defaults
def __init__(self,
instance,
body,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1beta4',
validate_body=True,
*args, **kwargs):
instance: str,
body: dict,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1beta4',
validate_body: bool = True,
*args, **kwargs) -> None:
self.body = body
self.validate_body = validate_body
super().__init__(
@ -483,14 +484,14 @@ class CloudSqlInstanceDatabasePatchOperator(CloudSqlBaseOperator):
@apply_defaults
def __init__(self,
instance,
database,
body,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1beta4',
validate_body=True,
*args, **kwargs):
instance: str,
database: str,
body: dict,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1beta4',
validate_body: bool = True,
*args, **kwargs) -> None:
self.database = database
self.body = body
self.validate_body = validate_body
@ -552,12 +553,12 @@ class CloudSqlInstanceDatabaseDeleteOperator(CloudSqlBaseOperator):
@apply_defaults
def __init__(self,
instance,
database,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1beta4',
*args, **kwargs):
instance: str,
database: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1beta4',
*args, **kwargs) -> None:
self.database = database
super().__init__(
project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
@ -614,13 +615,13 @@ class CloudSqlInstanceExportOperator(CloudSqlBaseOperator):
@apply_defaults
def __init__(self,
instance,
body,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1beta4',
validate_body=True,
*args, **kwargs):
instance: str,
body: dict,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1beta4',
validate_body: bool = True,
*args, **kwargs) -> None:
self.body = body
self.validate_body = validate_body
super().__init__(
@ -690,13 +691,13 @@ class CloudSqlInstanceImportOperator(CloudSqlBaseOperator):
@apply_defaults
def __init__(self,
instance,
body,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1beta4',
validate_body=True,
*args, **kwargs):
instance: str,
body: dict,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1beta4',
validate_body: bool = True,
*args, **kwargs) -> None:
self.body = body
self.validate_body = validate_body
super().__init__(
@ -737,7 +738,7 @@ class CloudSqlQueryOperator(BaseOperator):
you can use CREATE TABLE IF NOT EXISTS to create a table.
:type sql: str or list[str]
:param parameters: (optional) the parameters to render the SQL query with.
:type parameters: mapping or iterable
:type parameters: dict or iterable
:param autocommit: if True, each command is automatically committed.
(default value: False)
:type autocommit: bool
@ -757,12 +758,12 @@ class CloudSqlQueryOperator(BaseOperator):
@apply_defaults
def __init__(self,
sql,
autocommit=False,
parameters=None,
gcp_conn_id='google_cloud_default',
gcp_cloudsql_conn_id='google_cloud_sql_default',
*args, **kwargs):
sql: Union[List[str], str],
autocommit: bool = False,
parameters: Optional[Union[Dict, Iterable]] = None,
gcp_conn_id: str = 'google_cloud_default',
gcp_cloudsql_conn_id: str = 'google_cloud_sql_default',
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.sql = sql
self.gcp_conn_id = gcp_conn_id
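The sql parameter is annotated Union[List[str], str] because CloudSqlQueryOperator accepts either a single statement or a batch. A small sketch of how such a union is typically normalized (this helper is illustrative, not the operator's code):

from typing import List, Union


def normalize_sql(sql: Union[List[str], str]) -> List[str]:
    # isinstance narrows the Union, so each branch is fully typed.
    return [sql] if isinstance(sql, str) else sql


print(normalize_sql("SELECT 1"))
print(normalize_sql(["SELECT 1", "SELECT 2"]))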

View file

@ -23,6 +23,7 @@ This module contains Google Cloud Transfer operators.
from copy import deepcopy
from datetime import date, time
from typing import Optional, Dict
from airflow import AirflowException
from airflow.gcp.hooks.cloud_storage_transfer_service import (
@ -67,7 +68,7 @@ class TransferJobPreprocessor:
Helper class for preprocess of transfer job body.
"""
def __init__(self, body, aws_conn_id='aws_default', default_schedule=False):
def __init__(self, body: dict, aws_conn_id: str = 'aws_default', default_schedule: bool = False) -> None:
self.body = body
self.aws_conn_id = aws_conn_id
self.default_schedule = default_schedule
@ -142,7 +143,7 @@ class TransferJobValidator:
"""
Helper class for validating transfer job body.
"""
def __init__(self, body):
def __init__(self, body: dict) -> None:
if not body:
raise AirflowException("The required parameter 'body' is empty or None")
@ -220,13 +221,13 @@ class GcpTransferServiceJobCreateOperator(BaseOperator):
@apply_defaults
def __init__(
self,
body,
aws_conn_id='aws_default',
gcp_conn_id='google_cloud_default',
api_version='v1',
body: dict,
aws_conn_id: str = 'aws_default',
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.body = deepcopy(body)
self.aws_conn_id = aws_conn_id
@ -279,14 +280,14 @@ class GcpTransferServiceJobUpdateOperator(BaseOperator):
@apply_defaults
def __init__(
self,
job_name,
body,
aws_conn_id='aws_default',
gcp_conn_id='google_cloud_default',
api_version='v1',
job_name: str,
body: dict,
aws_conn_id: str = 'aws_default',
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.job_name = job_name
self.body = body
@ -335,8 +336,14 @@ class GcpTransferServiceJobDeleteOperator(BaseOperator):
@apply_defaults
def __init__(
self, job_name, gcp_conn_id='google_cloud_default', api_version='v1', project_id=None, *args, **kwargs
):
self,
job_name: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1",
project_id: Optional[str] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.job_name = job_name
self.project_id = project_id
@ -376,7 +383,14 @@ class GcpTransferServiceOperationGetOperator(BaseOperator):
# [END gcp_transfer_operation_get_template_fields]
@apply_defaults
def __init__(self, operation_name, gcp_conn_id='google_cloud_default', api_version='v1', *args, **kwargs):
def __init__(
self,
operation_name: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1",
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.operation_name = operation_name
self.gcp_conn_id = gcp_conn_id
@ -416,11 +430,11 @@ class GcpTransferServiceOperationsListOperator(BaseOperator):
# [END gcp_transfer_operations_list_template_fields]
def __init__(self,
request_filter=None,
gcp_conn_id='google_cloud_default',
api_version='v1',
request_filter: Optional[Dict] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
*args,
**kwargs):
**kwargs) -> None:
# To preserve backward compatibility
# TODO: remove one day
if request_filter is None:
@ -467,7 +481,14 @@ class GcpTransferServiceOperationPauseOperator(BaseOperator):
# [END gcp_transfer_operation_pause_template_fields]
@apply_defaults
def __init__(self, operation_name, gcp_conn_id='google_cloud_default', api_version='v1', *args, **kwargs):
def __init__(
self,
operation_name: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1",
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.operation_name = operation_name
self.gcp_conn_id = gcp_conn_id
@ -503,7 +524,14 @@ class GcpTransferServiceOperationResumeOperator(BaseOperator):
# [END gcp_transfer_operation_resume_template_fields]
@apply_defaults
def __init__(self, operation_name, gcp_conn_id='google_cloud_default', api_version='v1', *args, **kwargs):
def __init__(
self,
operation_name: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1",
*args,
**kwargs
) -> None:
self.operation_name = operation_name
self.gcp_conn_id = gcp_conn_id
self.api_version = api_version
@ -540,7 +568,14 @@ class GcpTransferServiceOperationCancelOperator(BaseOperator):
# [END gcp_transfer_operation_cancel_template_fields]
@apply_defaults
def __init__(self, operation_name, api_version='v1', gcp_conn_id='google_cloud_default', *args, **kwargs):
def __init__(
self,
operation_name: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1",
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.operation_name = operation_name
self.api_version = api_version
@ -624,21 +659,21 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
@apply_defaults
def __init__( # pylint: disable=too-many-arguments
self,
s3_bucket,
gcs_bucket,
project_id=None,
aws_conn_id='aws_default',
gcp_conn_id='google_cloud_default',
delegate_to=None,
description=None,
schedule=None,
object_conditions=None,
transfer_options=None,
wait=True,
timeout=None,
s3_bucket: str,
gcs_bucket: str,
project_id: Optional[str] = None,
aws_conn_id: str = 'aws_default',
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
description: Optional[str] = None,
schedule: Optional[Dict] = None,
object_conditions: Optional[Dict] = None,
transfer_options: Optional[Dict] = None,
wait: bool = True,
timeout: Optional[float] = None,
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.s3_bucket = s3_bucket
@ -763,20 +798,20 @@ class GoogleCloudStorageToGoogleCloudStorageTransferOperator(BaseOperator):
@apply_defaults
def __init__( # pylint: disable=too-many-arguments
self,
source_bucket,
destination_bucket,
project_id=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
description=None,
schedule=None,
object_conditions=None,
transfer_options=None,
wait=True,
timeout=None,
source_bucket: str,
destination_bucket: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
description: Optional[str] = None,
schedule: Optional[Dict] = None,
object_conditions: Optional[Dict] = None,
transfer_options: Optional[Dict] = None,
wait: bool = True,
timeout: Optional[float] = None,
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.source_bucket = source_bucket
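The one-line constructors above were reflowed so every parameter could be annotated, and each gained -> None. That return annotation is not decoration: mypy only type-checks a function body once its signature is annotated, PEP 484 prescribes spelling __init__'s return type as None explicitly, and strict mypy settings flag constructors that omit it. A minimal contrast (illustrative classes):

class Unchecked:
    def __init__(self, wait=True):  # unannotated: mypy skips this body
        self.wait = wait


class Checked:
    def __init__(self, wait: bool = True) -> None:  # annotated: body is checked
        self.wait = wait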

View file

@ -21,7 +21,7 @@ This module contains Google Compute Engine operators.
"""
from copy import deepcopy
from typing import Dict
from typing import Dict, Optional, List, Any
from json_merge_patch import merge
from googleapiclient.errors import HttpError
@ -41,12 +41,12 @@ class GceBaseOperator(BaseOperator):
@apply_defaults
def __init__(self,
zone,
resource_id,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1',
*args, **kwargs):
zone: str,
resource_id: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
*args, **kwargs) -> None:
self.project_id = project_id
self.zone = zone
self.resource_id = resource_id
@ -98,12 +98,12 @@ class GceInstanceStartOperator(GceBaseOperator):
@apply_defaults
def __init__(self,
zone,
resource_id,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1',
*args, **kwargs):
zone: str,
resource_id: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
*args, **kwargs) -> None:
super().__init__(
project_id=project_id, zone=zone, resource_id=resource_id,
gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs)
@ -146,12 +146,12 @@ class GceInstanceStopOperator(GceBaseOperator):
@apply_defaults
def __init__(self,
zone,
resource_id,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1',
*args, **kwargs):
zone: str,
resource_id: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
*args, **kwargs) -> None:
super().__init__(
project_id=project_id, zone=zone, resource_id=resource_id,
gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs)
@ -204,16 +204,16 @@ class GceSetMachineTypeOperator(GceBaseOperator):
@apply_defaults
def __init__(self,
zone,
resource_id,
body,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1',
validate_body=True,
*args, **kwargs):
zone: str,
resource_id: str,
body: dict,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
validate_body: bool = True,
*args, **kwargs) -> None:
self.body = body
self._field_validator = None
self._field_validator = None # type: Optional[GcpBodyFieldValidator]
if validate_body:
self._field_validator = GcpBodyFieldValidator(
SET_MACHINE_TYPE_VALIDATION_SPECIFICATION, api_version=api_version)
@ -262,7 +262,7 @@ GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION = [
dict(name="guestAccelerators", optional=True), # not validating deeper
dict(name="minCpuPlatform", optional=True),
]),
]
] # type: List[Dict[str, Any]]
GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE = [
"kind",
@ -326,17 +326,17 @@ class GceInstanceTemplateCopyOperator(GceBaseOperator):
@apply_defaults
def __init__(self,
resource_id,
body_patch,
project_id=None,
resource_id: str,
body_patch: dict,
project_id: Optional[str] = None,
request_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1',
validate_body=True,
*args, **kwargs):
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
validate_body: bool = True,
*args, **kwargs) -> None:
self.body_patch = body_patch
self.request_id = request_id
self._field_validator = None
self._field_validator = None  # type: Optional[GcpBodyFieldValidator]
if 'name' not in self.body_patch:
raise AirflowException("The body '{}' should contain at least "
"name for the new operator in the 'name' field".
@ -436,16 +436,16 @@ class GceInstanceGroupManagerUpdateTemplateOperator(GceBaseOperator):
@apply_defaults
def __init__(self,
resource_id,
zone,
source_template,
destination_template,
project_id=None,
resource_id: str,
zone: str,
source_template: str,
destination_template: str,
project_id: Optional[str] = None,
update_policy=None,
request_id=None,
gcp_conn_id='google_cloud_default',
gcp_conn_id: str = 'google_cloud_default',
api_version='beta',
*args, **kwargs):
*args, **kwargs) -> None:
self.zone = zone
self.source_template = source_template
self.destination_template = destination_template
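_field_validator starts out as None and is only constructed when validate_body is set, so the attribute's type must be widened to Optional[GcpBodyFieldValidator] by a type comment; otherwise mypy pins it to the type of the initial None assignment. A self-contained sketch of the pattern (Validator stands in for GcpBodyFieldValidator):

from typing import Optional


class Validator:  # stand-in for GcpBodyFieldValidator
    def validate(self, body: dict) -> None:
        print("validated", body)


class Example:
    def __init__(self, validate_body: bool = True) -> None:
        # Without the comment, mypy would infer the attribute as None.
        self._field_validator = None  # type: Optional[Validator]
        if validate_body:
            self._field_validator = Validator()


example = Example()
if example._field_validator:
    example._field_validator.validate({"name": "example"})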

View file

@ -25,6 +25,7 @@ import re
import uuid
import copy
from enum import Enum
from typing import List, Optional
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.gcp.hooks.dataflow import DataFlowHook
@ -167,18 +168,18 @@ class DataFlowJavaOperator(BaseOperator):
@apply_defaults
def __init__(
self,
jar,
job_name='{{task.task_id}}',
dataflow_default_options=None,
options=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
poll_sleep=10,
job_class=None,
check_if_running=CheckJobRunning.WaitForRun,
multiple_jobs=None,
jar: str,
job_name: str = '{{task.task_id}}',
dataflow_default_options: Optional[dict] = None,
options: Optional[dict] = None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
poll_sleep: int = 10,
job_class: Optional[str] = None,
check_if_running: CheckJobRunning = CheckJobRunning.WaitForRun,
multiple_jobs: Optional[bool] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
dataflow_default_options = dataflow_default_options or {}
@ -295,15 +296,15 @@ class DataflowTemplateOperator(BaseOperator):
@apply_defaults
def __init__(
self,
template,
job_name='{{task.task_id}}',
dataflow_default_options=None,
parameters=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
poll_sleep=10,
template: str,
job_name: str = '{{task.task_id}}',
dataflow_default_options: Optional[dict] = None,
parameters: Optional[dict] = None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
poll_sleep: int = 10,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
dataflow_default_options = dataflow_default_options or {}
@ -368,16 +369,16 @@ class DataFlowPythonOperator(BaseOperator):
@apply_defaults
def __init__(
self,
py_file,
job_name='{{task.task_id}}',
py_options=None,
dataflow_default_options=None,
options=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
poll_sleep=10,
py_file: str,
job_name: str = '{{task.task_id}}',
py_options: Optional[List[str]] = None,
dataflow_default_options: Optional[dict] = None,
options: Optional[dict] = None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
poll_sleep: int = 10,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
@ -417,11 +418,11 @@ class GoogleCloudBucketHelper:
GCS_PREFIX_LENGTH = 5
def __init__(self,
gcp_conn_id='google_cloud_default',
delegate_to=None):
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None) -> None:
self._gcs_hook = GoogleCloudStorageHook(gcp_conn_id, delegate_to)
def google_cloud_to_local(self, file_name):
def google_cloud_to_local(self, file_name: str) -> str:
"""
Checks whether the file specified by file_name is stored in Google Cloud
Storage (GCS); if so, downloads the file and saves it locally. The full

View file

@ -27,9 +27,10 @@ import os
import re
import time
import uuid
from datetime import timedelta
from datetime import datetime, timedelta
from typing import List, Dict, Set, Optional
from airflow.gcp.hooks.dataproc import DataProcHook
from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
@ -44,12 +45,12 @@ class DataprocOperationBaseOperator(BaseOperator):
"""
@apply_defaults
def __init__(self,
project_id,
region='global',
gcp_conn_id='google_cloud_default',
delegate_to=None,
project_id: str,
region: str = 'global',
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@ -194,42 +195,42 @@ class DataprocClusterCreateOperator(DataprocOperationBaseOperator):
# pylint: disable=too-many-arguments,too-many-locals
@apply_defaults
def __init__(self,
project_id,
cluster_name,
num_workers,
zone=None,
network_uri=None,
subnetwork_uri=None,
internal_ip_only=None,
tags=None,
storage_bucket=None,
init_actions_uris=None,
init_action_timeout="10m",
metadata=None,
custom_image=None,
custom_image_project_id=None,
image_version=None,
autoscaling_policy=None,
properties=None,
optional_components=None,
num_masters=1,
master_machine_type='n1-standard-4',
master_disk_type='pd-standard',
master_disk_size=1024,
worker_machine_type='n1-standard-4',
worker_disk_type='pd-standard',
worker_disk_size=1024,
num_preemptible_workers=0,
labels=None,
region='global',
service_account=None,
service_account_scopes=None,
idle_delete_ttl=None,
auto_delete_time=None,
auto_delete_ttl=None,
customer_managed_key=None,
project_id: str,
cluster_name: str,
num_workers: int,
zone: Optional[str] = None,
network_uri: Optional[str] = None,
subnetwork_uri: Optional[str] = None,
internal_ip_only: Optional[bool] = None,
tags: Optional[List[str]] = None,
storage_bucket: Optional[str] = None,
init_actions_uris: Optional[List[str]] = None,
init_action_timeout: str = "10m",
metadata: Optional[Dict] = None,
custom_image: Optional[str] = None,
custom_image_project_id: Optional[str] = None,
image_version: Optional[str] = None,
autoscaling_policy: Optional[str] = None,
properties: Optional[Dict] = None,
optional_components: Optional[List[str]] = None,
num_masters: int = 1,
master_machine_type: str = 'n1-standard-4',
master_disk_type: str = 'pd-standard',
master_disk_size: int = 1024,
worker_machine_type: str = 'n1-standard-4',
worker_disk_type: str = 'pd-standard',
worker_disk_size: int = 1024,
num_preemptible_workers: int = 0,
labels: Optional[Dict] = None,
region: str = 'global',
service_account: Optional[str] = None,
service_account_scopes: Optional[List[str]] = None,
idle_delete_ttl: Optional[int] = None,
auto_delete_time: Optional[datetime] = None,
auto_delete_ttl: Optional[int] = None,
customer_managed_key: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(project_id=project_id, region=region, *args, **kwargs)
self.cluster_name = cluster_name
@@ -506,21 +507,21 @@ class DataprocClusterScaleOperator(DataprocOperationBaseOperator):
@apply_defaults
def __init__(self,
cluster_name,
project_id,
region='global',
num_workers=2,
num_preemptible_workers=0,
graceful_decommission_timeout=None,
cluster_name: str,
project_id: str,
region: str = 'global',
num_workers: int = 2,
num_preemptible_workers: int = 0,
graceful_decommission_timeout: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(project_id=project_id, region=region, *args, **kwargs)
self.cluster_name = cluster_name
self.num_workers = num_workers
self.num_preemptible_workers = num_preemptible_workers
# Optional
self.optional_arguments = {}
self.optional_arguments = {} # type: Dict
if graceful_decommission_timeout:
self.optional_arguments['gracefulDecommissionTimeout'] = \
self._get_graceful_decommission_timeout(
@@ -606,11 +607,11 @@ class DataprocClusterDeleteOperator(DataprocOperationBaseOperator):
@apply_defaults
def __init__(self,
cluster_name,
project_id,
region='global',
cluster_name: str,
project_id: str,
region: str = 'global',
*args,
**kwargs):
**kwargs) -> None:
super().__init__(project_id=project_id, region=region, *args, **kwargs)
self.cluster_name = cluster_name
@@ -674,17 +675,17 @@ class DataProcJobBaseOperator(BaseOperator):
@apply_defaults
def __init__(self,
job_name='{{task.task_id}}_{{ds_nodash}}',
cluster_name="cluster-1",
dataproc_properties=None,
dataproc_jars=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
labels=None,
region='global',
job_error_states=None,
job_name: str = '{{task.task_id}}_{{ds_nodash}}',
cluster_name: str = "cluster-1",
dataproc_properties: Optional[Dict] = None,
dataproc_jars: Optional[List[str]] = None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
labels: Optional[Dict] = None,
region: str = 'global',
job_error_states: Optional[Set[str]] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -780,11 +781,11 @@ class DataProcPigOperator(DataProcJobBaseOperator):
@apply_defaults
def __init__(
self,
query=None,
query_uri=None,
variables=None,
query: Optional[str] = None,
query_uri: Optional[str] = None,
variables: Optional[Dict] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.query = query
@@ -823,11 +824,11 @@ class DataProcHiveOperator(DataProcJobBaseOperator):
@apply_defaults
def __init__(
self,
query=None,
query_uri=None,
variables=None,
query: Optional[str] = None,
query_uri: Optional[str] = None,
variables: Optional[Dict] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.query = query
@@ -867,11 +868,11 @@ class DataProcSparkSqlOperator(DataProcJobBaseOperator):
@apply_defaults
def __init__(
self,
query=None,
query_uri=None,
variables=None,
query: Optional[str] = None,
query_uri: Optional[str] = None,
variables: Optional[Dict] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.query = query
@@ -918,13 +919,13 @@ class DataProcSparkOperator(DataProcJobBaseOperator):
@apply_defaults
def __init__(
self,
main_jar=None,
main_class=None,
arguments=None,
archives=None,
files=None,
main_jar: Optional[str] = None,
main_class: Optional[str] = None,
arguments: Optional[List] = None,
archives: Optional[List] = None,
files: Optional[List] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.main_jar = main_jar
@@ -970,13 +971,13 @@ class DataProcHadoopOperator(DataProcJobBaseOperator):
@apply_defaults
def __init__(
self,
main_jar=None,
main_class=None,
arguments=None,
archives=None,
files=None,
main_jar: Optional[str] = None,
main_class: Optional[str] = None,
arguments: Optional[List] = None,
archives: Optional[List] = None,
files: Optional[List] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.main_jar = main_jar
@@ -1049,13 +1050,13 @@ class DataProcPySparkOperator(DataProcJobBaseOperator):
@apply_defaults
def __init__(
self,
main,
arguments=None,
archives=None,
pyfiles=None,
files=None,
main: str,
arguments: Optional[List] = None,
archives: Optional[List] = None,
pyfiles: Optional[List] = None,
files: Optional[List] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.main = main
@@ -1118,7 +1119,7 @@ class DataprocWorkflowTemplateInstantiateOperator(DataprocOperationBaseOperator)
template_fields = ['template_id']
@apply_defaults
def __init__(self, template_id, parameters, *args, **kwargs):
def __init__(self, template_id: str, parameters: Dict[str, str], *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.template_id = template_id
self.parameters = parameters
@@ -1148,7 +1149,7 @@ class DataprocWorkflowTemplateInstantiateInlineOperator(
https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline
:param template: The template contents. (templated)
:type template: map
:type template: dict
:param project_id: The ID of the google cloud project in which
the template runs
:type project_id: str
@@ -1165,7 +1166,7 @@ class DataprocWorkflowTemplateInstantiateInlineOperator(
template_fields = ['template']
@apply_defaults
def __init__(self, template, *args, **kwargs):
def __init__(self, template: Dict, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.template = template
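The workflow-template operators above are the one place in this file where dicts are fully parameterized (Dict[str, str]), while most other signatures in the commit use a bare Dict. A bare Dict is read by type checkers as Dict[Any, Any], so both spellings are accepted, but only the parameterized form lets mypy reject wrongly typed keys or values. A small sketch of the difference:

    from typing import Any, Dict

    def run_template(parameters: Dict[str, str]) -> None:
        for name, value in parameters.items():
            print(name.upper(), value.lower())  # both known to be str

    def run_loose(parameters: Dict) -> None:
        print(parameters)  # treated as Dict[Any, Any]

    run_template({'CLUSTER': 'cluster-1'})  # OK
    run_loose({1: object()})                # also accepted by mypy
    # run_template({1: object()})           # rejected by mypy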
View file
@@ -20,6 +20,7 @@
"""
This module contains Google Datastore operators.
"""
from typing import Optional
from airflow.gcp.hooks.datastore import DatastoreHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
@@ -63,18 +64,18 @@ class DatastoreExportOperator(BaseOperator):
@apply_defaults
def __init__(self, # pylint:disable=too-many-arguments
bucket,
namespace=None,
datastore_conn_id='google_cloud_default',
cloud_storage_conn_id='google_cloud_default',
delegate_to=None,
entity_filter=None,
labels=None,
polling_interval_in_seconds=10,
overwrite_existing=False,
project_id=None,
bucket: str,
namespace: Optional[str] = None,
datastore_conn_id: str = 'google_cloud_default',
cloud_storage_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
entity_filter: Optional[dict] = None,
labels: Optional[dict] = None,
polling_interval_in_seconds: int = 10,
overwrite_existing: bool = False,
project_id: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.datastore_conn_id = datastore_conn_id
self.cloud_storage_conn_id = cloud_storage_conn_id
@@ -141,24 +142,24 @@ class DatastoreImportOperator(BaseOperator):
:type delegate_to: str
:param polling_interval_in_seconds: number of seconds to wait before polling for
execution status again
:type polling_interval_in_seconds: int
:type polling_interval_in_seconds: float
"""
template_fields = ['bucket', 'file', 'namespace', 'entity_filter', 'labels']
@apply_defaults
def __init__(self,
bucket,
file,
namespace=None,
entity_filter=None,
labels=None,
datastore_conn_id='google_cloud_default',
delegate_to=None,
polling_interval_in_seconds=10,
project_id=None,
bucket: str,
file: str,
namespace: Optional[str] = None,
entity_filter: Optional[dict] = None,
labels: Optional[dict] = None,
datastore_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
polling_interval_in_seconds: float = 10,
project_id: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.datastore_conn_id = datastore_conn_id
self.delegate_to = delegate_to
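The import operator's docstring change (int to float for polling_interval_in_seconds) pairs with the float = 10 annotation above; PEP 484's numeric compatibility rule lets an int value stand in wherever float is expected, so the integer default needs no rewrite to 10.0. Illustration:

    def poll(interval: float = 10) -> float:
        # int is acceptable where float is expected (PEP 484),
        # so both calls below type-check.
        return interval / 2

    poll()      # int default
    poll(2.5)   # explicit float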
View file
@@ -23,6 +23,7 @@ This module contains various GCP Cloud DLP operators
which allow you to perform basic operations using
Cloud DLP.
"""
from typing import Optional
from airflow.gcp.hooks.dlp import CloudDLPHook
from airflow.models import BaseOperator
@@ -58,11 +59,11 @@ class CloudDLPCancelDLPJobOperator(BaseOperator):
def __init__(
self,
dlp_job_id,
project_id=None,
project_id: Optional[str] = None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -127,13 +128,13 @@ class CloudDLPCreateDeidentifyTemplateOperator(BaseOperator):
def __init__(
self,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
deidentify_template=None,
template_id=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -196,15 +197,15 @@ class CloudDLPCreateDLPJobOperator(BaseOperator):
@apply_defaults
def __init__(
self,
project_id=None,
project_id: Optional[str] = None,
inspect_job=None,
risk_job=None,
job_id=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
wait_until_finished=True,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -275,13 +276,13 @@ class CloudDLPCreateInspectTemplateOperator(BaseOperator):
def __init__(
self,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
inspect_template=None,
template_id=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -340,13 +341,13 @@ class CloudDLPCreateJobTriggerOperator(BaseOperator):
@apply_defaults
def __init__(
self,
project_id=None,
project_id: Optional[str] = None,
job_trigger=None,
trigger_id=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -412,13 +413,13 @@ class CloudDLPCreateStoredInfoTypeOperator(BaseOperator):
def __init__(
self,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
config=None,
stored_info_type_id=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -497,16 +498,16 @@ class CloudDLPDeidentifyContentOperator(BaseOperator):
@apply_defaults
def __init__(
self,
project_id=None,
project_id: Optional[str] = None,
deidentify_config=None,
inspect_config=None,
item=None,
inspect_template_name=None,
deidentify_template_name=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -570,11 +571,11 @@ class CloudDLPDeleteDeidentifyTemplateOperator(BaseOperator):
self,
template_id,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -629,11 +630,11 @@ class CloudDLPDeleteDlpJobOperator(BaseOperator):
def __init__(
self,
dlp_job_id,
project_id=None,
project_id: Optional[str] = None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -689,11 +690,11 @@ class CloudDLPDeleteInspectTemplateOperator(BaseOperator):
self,
template_id,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -747,11 +748,11 @@ class CloudDLPDeleteJobTriggerOperator(BaseOperator):
def __init__(
self,
job_trigger_id,
project_id=None,
project_id: Optional[str] = None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -812,11 +813,11 @@ class CloudDLPDeleteStoredInfoTypeOperator(BaseOperator):
self,
stored_info_type_id,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -875,11 +876,11 @@ class CloudDLPGetDeidentifyTemplateOperator(BaseOperator):
self,
template_id,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -934,11 +935,11 @@ class CloudDLPGetDlpJobOperator(BaseOperator):
def __init__(
self,
dlp_job_id,
project_id=None,
project_id: Optional[str] = None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -995,11 +996,11 @@ class CloudDLPGetInspectTemplateOperator(BaseOperator):
self,
template_id,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1054,11 +1055,11 @@ class CloudDLPGetJobTripperOperator(BaseOperator):
def __init__(
self,
job_trigger_id,
project_id=None,
project_id: Optional[str] = None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1120,11 +1121,11 @@ class CloudDLPGetStoredInfoTypeOperator(BaseOperator):
self,
stored_info_type_id,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1191,14 +1192,14 @@ class CloudDLPInspectContentOperator(BaseOperator):
@apply_defaults
def __init__(
self,
project_id=None,
project_id: Optional[str] = None,
inspect_config=None,
item=None,
inspect_template_name=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1262,13 +1263,13 @@ class CloudDLPListDeidentifyTemplatesOperator(BaseOperator):
def __init__(
self,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
page_size=None,
order_by=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1332,15 +1333,15 @@ class CloudDLPListDlpJobsOperator(BaseOperator):
@apply_defaults
def __init__(
self,
project_id=None,
project_id: Optional[str] = None,
results_filter=None,
page_size=None,
job_type=None,
order_by=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1401,9 +1402,9 @@ class CloudDLPListInfoTypesOperator(BaseOperator):
language_code=None,
results_filter=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1463,13 +1464,13 @@ class CloudDLPListInspectTemplatesOperator(BaseOperator):
def __init__(
self,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
page_size=None,
order_by=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1531,14 +1532,14 @@ class CloudDLPListJobTriggersOperator(BaseOperator):
@apply_defaults
def __init__(
self,
project_id=None,
project_id: Optional[str] = None,
page_size=None,
order_by=None,
results_filter=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1602,13 +1603,13 @@ class CloudDLPListStoredInfoTypesOperator(BaseOperator):
def __init__(
self,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
page_size=None,
order_by=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1681,15 +1682,15 @@ class CloudDLPRedactImageOperator(BaseOperator):
@apply_defaults
def __init__(
self,
project_id=None,
project_id: Optional[str] = None,
inspect_config=None,
image_redaction_configs=None,
include_findings=None,
byte_item=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1767,16 +1768,16 @@ class CloudDLPReidentifyContentOperator(BaseOperator):
@apply_defaults
def __init__(
self,
project_id=None,
project_id: Optional[str] = None,
reidentify_config=None,
inspect_config=None,
item=None,
inspect_template_name=None,
reidentify_template_name=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1852,13 +1853,13 @@ class CloudDLPUpdateDeidentifyTemplateOperator(BaseOperator):
self,
template_id,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
deidentify_template=None,
update_mask=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -1932,13 +1933,13 @@ class CloudDLPUpdateInspectTemplateOperator(BaseOperator):
self,
template_id,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
inspect_template=None,
update_mask=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -2007,13 +2008,13 @@ class CloudDLPUpdateJobTriggerOperator(BaseOperator):
def __init__(
self,
job_trigger_id,
project_id=None,
project_id: Optional[str] = None,
job_trigger=None,
update_mask=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
@@ -2086,13 +2087,13 @@ class CloudDLPUpdateStoredInfoTypeOperator(BaseOperator):
self,
stored_info_type_id,
organization_id=None,
project_id=None,
project_id: Optional[str] = None,
config=None,
update_mask=None,
retry=None,
timeout=None,
timeout: Optional[float] = None,
metadata=None,
gcp_conn_id="google_cloud_default",
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
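Across the DLP hunks only a subset of each signature is annotated (project_id, timeout, gcp_conn_id); retry, metadata, and the template/config bodies are left bare. That is ordinary gradual typing: an unannotated parameter is implicitly Any, so mypy checks what is annotated and stays quiet about the rest. A sketch of the mixed form:

    from typing import Optional

    def cancel_job(dlp_job_id,                        # implicitly Any
                   project_id: Optional[str] = None,  # checked
                   timeout: Optional[float] = None):  # checked
        # mypy validates uses of project_id and timeout; dlp_job_id
        # is unconstrained until it gains an annotation.
        return dlp_job_id, project_id, timeout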
View file
@@ -21,7 +21,7 @@ This module contains Google Cloud Functions operators.
"""
import re
from typing import Optional, Dict
from typing import Optional, List, Dict, Any
from googleapiclient.errors import HttpError
@@ -80,7 +80,7 @@ CLOUD_FUNCTION_VALIDATION = [
])
])
]),
]
] # type: List[Dict[str, Any]]
class GcfFunctionDeployOperator(BaseOperator):
@@ -123,14 +123,14 @@ class GcfFunctionDeployOperator(BaseOperator):
@apply_defaults
def __init__(self,
location,
body,
project_id=None,
gcp_conn_id='google_cloud_default',
api_version='v1',
zip_path=None,
validate_body=True,
*args, **kwargs):
location: str,
body: Dict,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
zip_path: Optional[str] = None,
validate_body: bool = True,
*args, **kwargs) -> None:
self.project_id = project_id
self.location = location
self.body = body
@@ -138,7 +138,7 @@ class GcfFunctionDeployOperator(BaseOperator):
self.api_version = api_version
self.zip_path = zip_path
self.zip_path_preprocessor = ZipPathPreprocessor(body, zip_path)
self._field_validator = None
self._field_validator = None # type: Optional[GcpBodyFieldValidator]
if validate_body:
self._field_validator = GcpBodyFieldValidator(CLOUD_FUNCTION_VALIDATION,
api_version=api_version)
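Two assignments in this file use "# type:" comments rather than inline annotations: the module-level CLOUD_FUNCTION_VALIDATION list and the self._field_validator = None attribute. The comment form is the PEP 484 fallback spelling for variables; PEP 526 inline annotations express the same thing. A sketch of the two equivalent spellings (names illustrative):

    from typing import Any, Dict, List, Optional

    # PEP 484 type comment, as in the diff:
    VALIDATION = []  # type: List[Dict[str, Any]]

    # PEP 526 inline annotation, the equivalent form:
    field_validator: Optional[str] = None

Both tell the checker the intended type even though the initializer ([] or None) carries no type information of its own.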
@@ -223,13 +223,16 @@ class ZipPathPreprocessor:
:param body: Body passed to the create/update method calls.
:type body: dict
:param zip_path: path to the zip file containing source code.
:type body: dict
:param zip_path: (optional) Path to zip file containing source code of the function. If the path
is set, the sourceUploadUrl should not be specified in the body or it should
be empty. Then the zip file will be uploaded using the upload URL generated
via generateUploadUrl from the Cloud Functions API.
:type zip_path: str
"""
upload_function = None # type: Optional[bool]
def __init__(self, body, zip_path):
def __init__(self, body: dict, zip_path: Optional[str] = None) -> None:
self.body = body
self.zip_path = zip_path
@@ -312,10 +315,10 @@ class GcfFunctionDeleteOperator(BaseOperator):
@apply_defaults
def __init__(self,
name,
gcp_conn_id='google_cloud_default',
api_version='v1',
*args, **kwargs):
name: str,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
*args, **kwargs) -> None:
self.name = name
self.gcp_conn_id = gcp_conn_id
self.api_version = api_version
View file
@@ -24,8 +24,10 @@ This module contains Google Kubernetes Engine operators.
import os
import subprocess
import tempfile
from typing import Union, Dict, Optional
from google.auth.environment_vars import CREDENTIALS
from google.cloud.container_v1.types import Cluster
from airflow import AirflowException
from airflow.gcp.hooks.kubernetes_engine import GKEClusterHook
@@ -69,13 +71,13 @@ class GKEClusterDeleteOperator(BaseOperator):
@apply_defaults
def __init__(self,
project_id,
name,
location,
gcp_conn_id='google_cloud_default',
api_version='v2',
project_id: str,
name: str,
location: str,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v2',
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.project_id = project_id
@@ -144,13 +146,13 @@ class GKEClusterCreateOperator(BaseOperator):
@apply_defaults
def __init__(self,
project_id,
location,
body,
gcp_conn_id='google_cloud_default',
api_version='v2',
project_id: str,
location: str,
body: Optional[Union[Dict, Cluster]],
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v2',
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.project_id = project_id
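Note that body: Optional[Union[Dict, Cluster]] carries no default: Optional only widens the accepted values to include None; it does not make the argument omissible at the call site. A sketch distinguishing the two notions:

    from typing import Optional

    def create(body: Optional[dict]) -> None:
        print(body)  # body is required, but None is a legal value

    def create_defaulted(body: Optional[dict] = None) -> None:
        print(body)  # body may be omitted entirely

    create(None)        # OK
    create_defaulted()  # OK
    # create()          # TypeError: missing required argument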
@@ -224,10 +226,10 @@ class GKEPodOperator(KubernetesPodOperator):
@apply_defaults
def __init__(self,
project_id,
location,
cluster_name,
gcp_conn_id='google_cloud_default',
project_id: str,
location: str,
cluster_name: str,
gcp_conn_id: str = 'google_cloud_default',
*args,
**kwargs):
super().__init__(*args, **kwargs)
View file
@@ -17,7 +17,7 @@
This module contains GCP MLEngine operators.
"""
import re
from typing import List, Optional
from airflow.gcp.hooks.mlengine import MLEngineHook
from airflow.exceptions import AirflowException
@@ -28,7 +28,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
log = LoggingMixin().log
def _normalize_mlengine_job_id(job_id):
def _normalize_mlengine_job_id(job_id: str) -> str:
"""
Replaces invalid MLEngine job_id characters with '_'.
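The helper's body is outside this hunk; a minimal implementation consistent with the docstring, assuming a simple character-class substitution (which may differ from the real function), could be:

    import re

    def _normalize_mlengine_job_id(job_id: str) -> str:
        # Illustrative only: keep letters, digits and underscores,
        # replace anything else with '_'.
        return re.sub(r'[^0-9a-zA-Z_]+', '_', job_id)

    assert _normalize_mlengine_job_id('train-2019.08') == 'train_2019_08'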
@@ -173,22 +173,22 @@ class MLEngineBatchPredictionOperator(BaseOperator):
@apply_defaults
def __init__(self, # pylint:disable=too-many-arguments
project_id,
job_id,
region,
data_format,
input_paths,
output_path,
model_name=None,
version_name=None,
uri=None,
max_worker_count=None,
runtime_version=None,
signature_name=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
project_id: str,
job_id: str,
region: str,
data_format: str,
input_paths: List[str],
output_path: str,
model_name: Optional[str] = None,
version_name: Optional[str] = None,
uri: Optional[str] = None,
max_worker_count: Optional[int] = None,
runtime_version: Optional[str] = None,
signature_name: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self._project_id = project_id
@@ -317,13 +317,13 @@ class MLEngineModelOperator(BaseOperator):
@apply_defaults
def __init__(self,
project_id,
model,
operation='create',
gcp_conn_id='google_cloud_default',
delegate_to=None,
project_id: str,
model: dict,
operation: str = 'create',
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self._project_id = project_id
self._model = model
@@ -406,15 +406,15 @@ class MLEngineVersionOperator(BaseOperator):
@apply_defaults
def __init__(self,
project_id,
model_name,
version_name=None,
version=None,
operation='create',
gcp_conn_id='google_cloud_default',
delegate_to=None,
project_id: str,
model_name: str,
version_name: Optional[str] = None,
version: Optional[dict] = None,
operation: str = 'create',
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self._project_id = project_id
@@ -528,22 +528,22 @@ class MLEngineTrainingOperator(BaseOperator):
@apply_defaults
def __init__(self, # pylint:disable=too-many-arguments
project_id,
job_id,
package_uris,
training_python_module,
training_args,
region,
scale_tier=None,
master_type=None,
runtime_version=None,
python_version=None,
job_dir=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
mode='PRODUCTION',
project_id: str,
job_id: str,
package_uris: List[str],
training_python_module: str,
training_args: List[str],
region: str,
scale_tier: Optional[str] = None,
master_type: Optional[str] = None,
runtime_version: Optional[str] = None,
python_version: Optional[str] = None,
job_dir: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
mode: str = 'PRODUCTION',
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self._project_id = project_id
self._job_id = job_id
View file
@@ -19,11 +19,18 @@
"""
This module contains Google Cloud Language operators.
"""
from typing import Union, Tuple, Sequence, Optional
from google.protobuf.json_format import MessageToDict
from google.cloud.language_v1.types import Document
from google.cloud.language_v1 import enums
from google.api_core.retry import Retry
from airflow.gcp.hooks.natural_language import CloudNaturalLanguageHook
from airflow.models import BaseOperator
MetaData = Sequence[Tuple[str, str]]
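MetaData here is a plain type alias: binding a typing expression to a module-level name lets every operator below write Optional[MetaData] instead of repeating Sequence[Tuple[str, str]] (the tasks and vision modules further down introduce the same alias). Aliases are interchangeable with their definition:

    from typing import Optional, Sequence, Tuple

    MetaData = Sequence[Tuple[str, str]]  # an alias, not a new type

    def call_api(metadata: Optional[MetaData] = None) -> None:
        for key, value in metadata or ():
            print(key, value)

    call_api([('request-id', 'abc-123')])  # illustrative metadata pair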
class CloudLanguageAnalyzeEntitiesOperator(BaseOperator):
"""
@@ -38,14 +45,14 @@ class CloudLanguageAnalyzeEntitiesOperator(BaseOperator):
If a dict is provided, it must be of the same form as the protobuf message Document
:type document: dict or google.cloud.language_v1.types.Document
:param encoding_type: The encoding type used by the API to calculate offsets.
:type encoding_type: google.cloud.language_v1.types.EncodingType
:type encoding_type: google.cloud.language_v1.enums.EncodingType
:param retry: A retry object used to retry requests. If None is specified, requests will not be
retried.
:param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
retry is specified, the timeout applies to each individual attempt.
:type timeout: float
:param metadata: Additional metadata that is provided to the method.
:type metadata: seq[tuple[str, str]]]
:type metadata: Sequence[Tuple[str, str]]
:param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
:type gcp_conn_id: str
"""
@@ -55,15 +62,15 @@ class CloudLanguageAnalyzeEntitiesOperator(BaseOperator):
def __init__(
self,
document,
encoding_type=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
document: Union[dict, Document],
encoding_type: Optional[enums.EncodingType] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.document = document
self.encoding_type = encoding_type
@@ -97,14 +104,14 @@ class CloudLanguageAnalyzeEntitySentimentOperator(BaseOperator):
If a dict is provided, it must be of the same form as the protobuf message Document
:type document: dict or google.cloud.language_v1.types.Document
:param encoding_type: The encoding type used by the API to calculate offsets.
:type encoding_type: google.cloud.language_v1.types.EncodingType
:type encoding_type: google.cloud.language_v1.enums.EncodingType
:param retry: A retry object used to retry requests. If None is specified, requests will not be
retried.
:param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
retry is specified, the timeout applies to each individual attempt.
:type timeout: float
:param metadata: Additional metadata that is provided to the method.
:type metadata: seq[tuple[str, str]]]
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.language_v1.types.AnalyzeEntitySentimentResponse
:param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
:type gcp_conn_id: str
@@ -115,15 +122,15 @@ class CloudLanguageAnalyzeEntitySentimentOperator(BaseOperator):
def __init__(
self,
document,
encoding_type=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
document: Union[dict, Document],
encoding_type: Optional[enums.EncodingType] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.document = document
self.encoding_type = encoding_type
@@ -160,7 +167,7 @@ class CloudLanguageAnalyzeSentimentOperator(BaseOperator):
If a dict is provided, it must be of the same form as the protobuf message Document
:type document: dict or google.cloud.language_v1.types.Document
:param encoding_type: The encoding type used by the API to calculate offsets.
:type encoding_type: google.cloud.language_v1.types.EncodingType
:type encoding_type: google.cloud.language_v1.enums.EncodingType
:param retry: A retry object used to retry requests. If None is specified, requests will not be
retried.
:param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
@@ -178,15 +185,15 @@ class CloudLanguageAnalyzeSentimentOperator(BaseOperator):
def __init__(
self,
document,
encoding_type=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
document: Union[dict, Document],
encoding_type: Optional[enums.EncodingType] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.document = document
self.encoding_type = encoding_type
@@ -234,14 +241,14 @@ class CloudLanguageClassifyTextOperator(BaseOperator):
def __init__(
self,
document,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
document: Union[dict, Document],
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.document = document
self.retry = retry
View file
@@ -19,6 +19,8 @@
"""
This module contains Google PubSub operators.
"""
from typing import List, Optional
from airflow.gcp.hooks.pubsub import PubSubHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
@@ -75,13 +77,13 @@ class PubSubTopicCreateOperator(BaseOperator):
@apply_defaults
def __init__(
self,
project,
topic,
fail_if_exists=False,
gcp_conn_id='google_cloud_default',
delegate_to=None,
project: str,
topic: str,
fail_if_exists: bool = False,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.project = project
@@ -177,15 +179,15 @@ class PubSubSubscriptionCreateOperator(BaseOperator):
def __init__(
self,
topic_project,
topic,
topic: str,
subscription=None,
subscription_project=None,
ack_deadline_secs=10,
fail_if_exists=False,
gcp_conn_id='google_cloud_default',
delegate_to=None,
ack_deadline_secs: int = 10,
fail_if_exists: bool = False,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.topic_project = topic_project
self.topic = topic
@@ -256,13 +258,13 @@ class PubSubTopicDeleteOperator(BaseOperator):
@apply_defaults
def __init__(
self,
project,
topic,
project: str,
topic: str,
fail_if_not_exists=False,
gcp_conn_id='google_cloud_default',
delegate_to=None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.project = project
@@ -331,13 +333,13 @@ class PubSubSubscriptionDeleteOperator(BaseOperator):
@apply_defaults
def __init__(
self,
project,
subscription,
project: str,
subscription: str,
fail_if_not_exists=False,
gcp_conn_id='google_cloud_default',
delegate_to=None,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.project = project
@@ -408,13 +410,13 @@ class PubSubPublishOperator(BaseOperator):
@apply_defaults
def __init__(
self,
project,
topic,
messages,
gcp_conn_id='google_cloud_default',
delegate_to=None,
project: str,
topic: str,
messages: List,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
View file
@@ -19,6 +19,7 @@
"""
This module contains Google Spanner operators.
"""
from typing import List, Optional
from airflow import AirflowException
from airflow.gcp.hooks.spanner import CloudSpannerHook
@@ -57,13 +58,13 @@ class CloudSpannerInstanceDeployOperator(BaseOperator):
@apply_defaults
def __init__(self,
instance_id,
configuration_name,
node_count,
display_name,
project_id=None,
gcp_conn_id='google_cloud_default',
*args, **kwargs):
instance_id: str,
configuration_name: str,
node_count: int,
display_name: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
*args, **kwargs) -> None:
self.instance_id = instance_id
self.project_id = project_id
self.configuration_name = configuration_name
@@ -118,10 +119,10 @@ class CloudSpannerInstanceDeleteOperator(BaseOperator):
@apply_defaults
def __init__(self,
instance_id,
project_id=None,
gcp_conn_id='google_cloud_default',
*args, **kwargs):
instance_id: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
*args, **kwargs) -> None:
self.instance_id = instance_id
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
@@ -174,12 +175,12 @@ class CloudSpannerInstanceDatabaseQueryOperator(BaseOperator):
@apply_defaults
def __init__(self,
instance_id,
instance_id: str,
database_id,
query,
project_id=None,
gcp_conn_id='google_cloud_default',
*args, **kwargs):
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
*args, **kwargs) -> None:
self.instance_id = instance_id
self.project_id = project_id
self.database_id = database_id
@@ -257,12 +258,12 @@ class CloudSpannerInstanceDatabaseDeployOperator(BaseOperator):
@apply_defaults
def __init__(self,
instance_id,
database_id,
ddl_statements,
project_id=None,
gcp_conn_id='google_cloud_default',
*args, **kwargs):
instance_id: str,
database_id: str,
ddl_statements: List[str],
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
*args, **kwargs) -> None:
self.instance_id = instance_id
self.project_id = project_id
self.database_id = database_id
@@ -331,13 +332,13 @@ class CloudSpannerInstanceDatabaseUpdateOperator(BaseOperator):
@apply_defaults
def __init__(self,
instance_id,
database_id,
ddl_statements,
project_id=None,
operation_id=None,
gcp_conn_id='google_cloud_default',
*args, **kwargs):
instance_id: str,
database_id: str,
ddl_statements: List[str],
project_id: Optional[str] = None,
operation_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
*args, **kwargs) -> None:
self.instance_id = instance_id
self.project_id = project_id
self.database_id = database_id
@@ -403,11 +404,11 @@ class CloudSpannerInstanceDatabaseDeleteOperator(BaseOperator):
@apply_defaults
def __init__(self,
instance_id,
database_id,
project_id=None,
gcp_conn_id='google_cloud_default',
*args, **kwargs):
instance_id: str,
database_id: str,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
*args, **kwargs) -> None:
self.instance_id = instance_id
self.project_id = project_id
self.database_id = database_id
View file
@@ -19,9 +19,13 @@
"""
This module contains a Google Speech to Text operator.
"""
from typing import Optional
from google.api_core.retry import Retry
from google.cloud.speech_v1.types import RecognitionConfig
from airflow import AirflowException
from airflow.gcp.hooks.speech_to_text import GCPSpeechToTextHook
from airflow.gcp.hooks.speech_to_text import RecognitionAudio, GCPSpeechToTextHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
@@ -61,15 +65,15 @@ class GcpSpeechToTextRecognizeSpeechOperator(BaseOperator):
@apply_defaults
def __init__(
self,
audio,
config,
project_id=None,
gcp_conn_id="google_cloud_default",
retry=None,
timeout=None,
audio: RecognitionAudio,
config: RecognitionConfig,
project_id: Optional[str] = None,
gcp_conn_id: str = "google_cloud_default",
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
*args,
**kwargs
):
) -> None:
self.audio = audio
self.config = config
self.project_id = project_id
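Retry, RecognitionConfig, and RecognitionAudio are imported above purely so the new signature can name them. When an import is wanted only for annotations (for instance, to break an import cycle), an alternative spelling, not the one this commit uses, is the typing.TYPE_CHECKING guard with string annotations:

    from typing import TYPE_CHECKING, Optional

    if TYPE_CHECKING:
        # evaluated by the type checker only, never at runtime
        from google.api_core.retry import Retry

    def recognize_speech(retry: Optional['Retry'] = None) -> None:
        print(retry)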
View file
@@ -22,11 +22,18 @@ This module contains various GCP Cloud Tasks operators
which allow you to perform basic operations using
Cloud Tasks queues/tasks.
"""
from typing import Tuple, Sequence, Union, Dict, Optional
from google.api_core.retry import Retry
from google.cloud.tasks_v2.types import Queue, FieldMask, Task
from google.cloud.tasks_v2 import enums
from airflow.gcp.hooks.tasks import CloudTasksHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
MetaData = Sequence[Tuple[str, str]]
class CloudTasksQueueCreateOperator(BaseOperator):
"""
@@ -69,17 +76,17 @@ class CloudTasksQueueCreateOperator(BaseOperator):
@apply_defaults
def __init__(
self,
location,
task_queue,
project_id=None,
queue_name=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
location: str,
task_queue: Queue,
project_id: Optional[str] = None,
queue_name: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.task_queue = task_queue
@@ -150,18 +157,18 @@ class CloudTasksQueueUpdateOperator(BaseOperator):
@apply_defaults
def __init__(
self,
task_queue,
project_id=None,
location=None,
queue_name=None,
update_mask=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
task_queue: Queue,
project_id: Optional[str] = None,
location: Optional[str] = None,
queue_name: Optional[str] = None,
update_mask: Optional[Union[Dict, FieldMask]] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.task_queue = task_queue
self.project_id = project_id
@@ -217,16 +224,16 @@ class CloudTasksQueueGetOperator(BaseOperator):
@apply_defaults
def __init__(
self,
location,
queue_name,
project_id=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
location: str,
queue_name: str,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.queue_name = queue_name
@@ -282,17 +289,17 @@ class CloudTasksQueuesListOperator(BaseOperator):
@apply_defaults
def __init__(
self,
location,
project_id=None,
location: str,
project_id: Optional[str] = None,
results_filter=None,
page_size=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
page_size: Optional[int] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.project_id = project_id
@@ -345,16 +352,16 @@ class CloudTasksQueueDeleteOperator(BaseOperator):
@apply_defaults
def __init__(
self,
location,
queue_name,
project_id=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
location: str,
queue_name: str,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.queue_name = queue_name
@@ -406,16 +413,16 @@ class CloudTasksQueuePurgeOperator(BaseOperator):
@apply_defaults
def __init__(
self,
location,
queue_name,
project_id=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
location: str,
queue_name: str,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.queue_name = queue_name
@@ -467,16 +474,16 @@ class CloudTasksQueuePauseOperator(BaseOperator):
@apply_defaults
def __init__(
self,
location,
queue_name,
project_id=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
location: str,
queue_name: str,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.queue_name = queue_name
@@ -528,16 +535,16 @@ class CloudTasksQueueResumeOperator(BaseOperator):
@apply_defaults
def __init__(
self,
location,
queue_name,
project_id=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
location: str,
queue_name: str,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.queue_name = queue_name
@@ -578,7 +585,7 @@ class CloudTasksTaskCreateOperator(BaseOperator):
:type task_name: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
:type response_view: google.cloud.tasks_v2.types.Task.View
:type response_view: google.cloud.tasks_v2.enums.Task.View
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
@@ -603,21 +610,21 @@ class CloudTasksTaskCreateOperator(BaseOperator):
)
@apply_defaults
def __init__(
def __init__( # pylint: disable=too-many-arguments
self,
location,
queue_name,
task,
project_id=None,
task_name=None,
response_view=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
location: str,
queue_name: str,
task: Union[Dict, Task],
project_id: Optional[str] = None,
task_name: Optional[str] = None,
response_view: Optional[enums.Task.View] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
): # pylint: disable=too-many-arguments
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.queue_name = queue_name
@@ -660,7 +667,7 @@ class CloudTasksTaskGetOperator(BaseOperator):
:type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
:type response_view: google.cloud.tasks_v2.types.Task.View
:type response_view: google.cloud.tasks_v2.enums.Task.View
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
@@ -686,18 +693,18 @@ class CloudTasksTaskGetOperator(BaseOperator):
@apply_defaults
def __init__(
self,
location,
queue_name,
task_name,
project_id=None,
response_view=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
location: str,
queue_name: str,
task_name: str,
project_id: Optional[str] = None,
response_view: Optional[enums.Task.View] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.queue_name = queue_name
@@ -736,7 +743,7 @@ class CloudTasksTasksListOperator(BaseOperator):
:type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
:type response_view: google.cloud.tasks_v2.types.Task.View
:type response_view: google.cloud.tasks_v2.enums.Task.View
:param page_size: (Optional) The maximum number of resources contained in the
underlying API response.
:type page_size: int
@@ -759,18 +766,18 @@ class CloudTasksTasksListOperator(BaseOperator):
@apply_defaults
def __init__(
self,
location,
queue_name,
project_id=None,
response_view=None,
page_size=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
location: str,
queue_name: str,
project_id: Optional[str] = None,
response_view: Optional[enums.Task.View] = None,
page_size: Optional[int] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.queue_name = queue_name
@@ -833,17 +840,17 @@ class CloudTasksTaskDeleteOperator(BaseOperator):
@apply_defaults
def __init__(
self,
location,
queue_name,
task_name,
project_id=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
location: str,
queue_name: str,
task_name: str,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.queue_name = queue_name
@@ -882,7 +889,7 @@ class CloudTasksTaskRunOperator(BaseOperator):
:type project_id: str
:param response_view: (Optional) This field specifies which subset of the Task will
be returned.
:type response_view: google.cloud.tasks_v2.types.Task.View
:type response_view: google.cloud.tasks_v2.enums.Task.View
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
@@ -908,18 +915,18 @@ class CloudTasksTaskRunOperator(BaseOperator):
@apply_defaults
def __init__(
self,
location,
queue_name,
task_name,
project_id=None,
response_view=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
location: str,
queue_name: str,
task_name: str,
project_id: Optional[str] = None,
response_view: Optional[enums.Task.View] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.queue_name = queue_name
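The docstring corrections in this file (types.Task.View to enums.Task.View) line up with the new Optional[enums.Task.View] annotations: in the google-cloud-tasks client of this era the view constants live in the enums module, not in types. A hedged usage sketch under that assumption:

    from typing import Optional
    from google.cloud.tasks_v2 import enums

    def get_view(response_view: Optional[enums.Task.View] = None):
        # e.g. enums.Task.View.BASIC or enums.Task.View.FULL
        return response_view

    get_view(enums.Task.View.FULL)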
View file
@@ -21,6 +21,10 @@ This module contains a Google Text to Speech operator.
"""
from tempfile import NamedTemporaryFile
from typing import Dict, Union, Optional
from google.api_core.retry import Retry
from google.cloud.texttospeech_v1.types import SynthesisInput, VoiceSelectionParams, AudioConfig
from airflow import AirflowException
from airflow.gcp.hooks.text_to_speech import GCPTextToSpeechHook
@@ -79,18 +83,18 @@ class GcpTextToSpeechSynthesizeOperator(BaseOperator):
@apply_defaults
def __init__(
self,
input_data,
voice,
audio_config,
target_bucket_name,
target_filename,
project_id=None,
gcp_conn_id="google_cloud_default",
retry=None,
timeout=None,
input_data: Union[Dict, SynthesisInput],
voice: Union[Dict, VoiceSelectionParams],
audio_config: Union[Dict, AudioConfig],
target_bucket_name: str,
target_filename: str,
project_id: Optional[str] = None,
gcp_conn_id: str = "google_cloud_default",
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
*args,
**kwargs
):
) -> None:
self.input_data = input_data
self.voice = voice
self.audio_config = audio_config
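Union[Dict, SynthesisInput] (and the matching unions for the voice and audio config) reflects the google-cloud client convention of accepting either a protobuf message or a plain dict of the same shape. A hedged sketch of normalizing such a parameter, assuming protobuf's json_format.ParseDict helper:

    from typing import Dict, Union
    from google.cloud.texttospeech_v1.types import SynthesisInput
    from google.protobuf.json_format import ParseDict

    def to_message(input_data: Union[Dict, SynthesisInput]) -> SynthesisInput:
        # Convert the dict form into the message form
        if isinstance(input_data, dict):
            return ParseDict(input_data, SynthesisInput())
        return input_data

    to_message({'text': 'hello'})             # dict form
    to_message(SynthesisInput(text='hello'))  # message form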
View file
@@ -19,6 +19,7 @@
"""
This module contains Google Translate operators.
"""
from typing import List, Union
from airflow import AirflowException
from airflow.gcp.hooks.translate import CloudTranslateHook
@@ -80,15 +81,15 @@ class CloudTranslateTextOperator(BaseOperator):
@apply_defaults
def __init__(
self,
values,
target_language,
format_,
source_language,
model,
values: Union[List[str], str],
target_language: str,
format_: str,
source_language: str,
model: str,
gcp_conn_id='google_cloud_default',
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.values = values
self.target_language = target_language
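values: Union[List[str], str] mirrors the translate client, which takes either one string or a list of strings. A common normalization for such a parameter (a sketch, not code from this commit) wraps the lone string:

    from typing import List, Union

    def normalize_values(values: Union[List[str], str]) -> List[str]:
        if isinstance(values, str):
            return [values]   # promote a single string to a list
        return list(values)

    assert normalize_values('hello') == ['hello']
    assert normalize_values(['a', 'b']) == ['a', 'b']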
View file
@@ -19,7 +19,10 @@
"""
This module contains a Google Cloud Translate Speech operator.
"""
from typing import Optional
from google.protobuf.json_format import MessageToDict
from google.cloud.speech_v1.types import RecognitionAudio, RecognitionConfig
from airflow import AirflowException
from airflow.gcp.hooks.speech_to_text import GCPSpeechToTextHook
@@ -102,17 +105,17 @@ class GcpTranslateSpeechOperator(BaseOperator):
@apply_defaults
def __init__(
self,
audio,
config,
target_language,
format_,
source_language,
model,
project_id=None,
audio: RecognitionAudio,
config: RecognitionConfig,
target_language: str,
format_: str,
source_language: str,
model: str,
project_id: Optional[str] = None,
gcp_conn_id='google_cloud_default',
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.audio = audio
self.config = config
View file
@@ -19,9 +19,12 @@
"""
This module contains Google Cloud Video Intelligence operators.
"""
from typing import Dict, Union, Optional
from google.api_core.retry import Retry
from google.protobuf.json_format import MessageToDict
from google.cloud.videointelligence_v1 import enums
from google.cloud.videointelligence_v1.types import VideoContext
from airflow.gcp.hooks.video_intelligence import CloudVideoIntelligenceHook
from airflow.models import BaseOperator
@@ -68,17 +71,17 @@ class CloudVideoIntelligenceDetectVideoLabelsOperator(BaseOperator):
def __init__(
self,
input_uri,
input_content=None,
output_uri=None,
video_context=None,
location=None,
retry=None,
timeout=None,
gcp_conn_id="google_cloud_default",
input_uri: str,
input_content: Optional[bytes] = None,
output_uri: Optional[str] = None,
video_context: Optional[Union[Dict, VideoContext]] = None,
location: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.input_uri = input_uri
self.input_content = input_content
@@ -147,17 +150,17 @@ class CloudVideoIntelligenceDetectVideoExplicitContentOperator(BaseOperator):
def __init__(
self,
input_uri,
output_uri=None,
input_content=None,
video_context=None,
location=None,
retry=None,
timeout=None,
gcp_conn_id="google_cloud_default",
input_uri: str,
output_uri: Optional[str] = None,
input_content: Optional[bytes] = None,
video_context: Optional[Union[Dict, VideoContext]] = None,
location: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.input_uri = input_uri
self.output_uri = output_uri
@@ -226,17 +229,17 @@ class CloudVideoIntelligenceDetectVideoShotsOperator(BaseOperator):
def __init__(
self,
input_uri,
output_uri=None,
input_content=None,
video_context=None,
location=None,
retry=None,
timeout=None,
gcp_conn_id="google_cloud_default",
input_uri: str,
output_uri: Optional[str] = None,
input_content: Optional[bytes] = None,
video_context: Optional[Union[Dict, VideoContext]] = None,
location: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.input_uri = input_uri
self.output_uri = output_uri
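These signatures also show why the commit spells nullable defaults as Optional[...] = None rather than a bare Union[...] = None: PEP 484's implicit-Optional behaviour is rejected under strict checker settings (mypy's --no-implicit-optional), so the explicit form is the safe one. Illustration with a stand-in type:

    from typing import Dict, Optional, Union

    class VideoContext:  # stand-in for the client library message
        pass

    # Explicit, strict-mode friendly:
    def annotate(video_context: Optional[Union[Dict, VideoContext]] = None):
        return video_context

    # A bare `Union[Dict, VideoContext] = None` parameter relies on
    # implicit Optional and fails under --no-implicit-optional.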
View file
@@ -38,6 +38,8 @@ from airflow.gcp.hooks.vision import CloudVisionHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
MetaData = Sequence[Tuple[str, str]]
class CloudVisionProductSetCreateOperator(BaseOperator):
"""
@@ -82,15 +84,15 @@ class CloudVisionProductSetCreateOperator(BaseOperator):
self,
product_set: Union[dict, ProductSet],
location: str,
project_id: str = None,
product_set_id: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
project_id: Optional[str] = None,
product_set_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.project_id = project_id
@@ -158,14 +160,14 @@ class CloudVisionProductSetGetOperator(BaseOperator):
self,
location: str,
product_set_id: str,
project_id: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = 'google_cloud_default',
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.project_id = project_id
@@ -242,17 +244,17 @@ class CloudVisionProductSetUpdateOperator(BaseOperator):
def __init__(
self,
product_set: Union[Dict, ProductSet],
location: str = None,
product_set_id: str = None,
project_id: str = None,
location: Optional[str] = None,
product_set_id: Optional[str] = None,
project_id: Optional[str] = None,
update_mask: Optional[Union[Dict, FieldMask]] = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = 'google_cloud_default',
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.product_set = product_set
self.update_mask = update_mask
@@ -317,14 +319,14 @@ class CloudVisionProductSetDeleteOperator(BaseOperator):
self,
location: str,
product_set_id: str,
project_id: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = 'google_cloud_default',
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.project_id = project_id
@@ -395,15 +397,15 @@ class CloudVisionProductCreateOperator(BaseOperator):
self,
location: str,
product: str,
project_id: str = None,
product_id: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
project_id: Optional[str] = None,
product_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = 'google_cloud_default',
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.product = product
@@ -474,14 +476,14 @@ class CloudVisionProductGetOperator(BaseOperator):
self,
location: str,
product_id: str,
project_id: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.product_id = product_id
@@ -569,17 +571,17 @@ class CloudVisionProductUpdateOperator(BaseOperator):
def __init__(
self,
product: Union[Dict, Product],
location: str = None,
product_id: str = None,
project_id: str = None,
location: Optional[str] = None,
product_id: Optional[str] = None,
project_id: Optional[str] = None,
update_mask: Optional[Union[Dict, FieldMask]] = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = 'google_cloud_default',
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.product = product
self.location = location
@@ -649,14 +651,14 @@ class CloudVisionProductDeleteOperator(BaseOperator):
self,
location: str,
product_id: str,
project_id: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = 'google_cloud_default',
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.product_id = product_id
@@ -709,12 +711,12 @@ class CloudVisionAnnotateImageOperator(BaseOperator):
def __init__(
self,
request: Union[Dict, AnnotateImageRequest],
retry: Retry = None,
timeout: float = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
gcp_conn_id: str = 'google_cloud_default',
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.request = request
self.retry = retry
@@ -791,15 +793,15 @@ class CloudVisionReferenceImageCreateOperator(BaseOperator):
location: str,
reference_image: Union[Dict, ReferenceImage],
product_id: str,
reference_image_id: str = None,
project_id: str = None,
retry: Retry = None,
timeout: str = None,
metadata: Sequence[Tuple[str, str]] = None,
reference_image_id: Optional[str] = None,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = 'google_cloud_default',
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.location = location
self.product_id = product_id
@@ -878,14 +880,14 @@ class CloudVisionAddProductToProductSetOperator(BaseOperator):
product_set_id: str,
product_id: str,
location: str,
project_id: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.product_set_id = product_set_id
self.product_id = product_id
@@ -949,14 +951,14 @@ class CloudVisionRemoveProductFromProductSetOperator(BaseOperator):
product_set_id: str,
product_id: str,
location: str,
project_id: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
project_id: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[MetaData] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.product_set_id = product_set_id
self.product_id = product_id
@@ -1015,16 +1017,16 @@ class CloudVisionDetectTextOperator(BaseOperator):
def __init__(
self,
image: Union[Dict, Image],
max_results: int = None,
retry: Retry = None,
timeout: float = None,
language_hints: Union[str, List[str]] = None,
web_detection_params: Dict = None,
additional_properties: Dict = None,
max_results: Optional[int] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
language_hints: Optional[Union[str, List[str]]] = None,
web_detection_params: Optional[Dict] = None,
additional_properties: Optional[Dict] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.image = image
self.max_results = max_results
@@ -1078,22 +1080,22 @@ class CloudVisionDetectDocumentTextOperator(BaseOperator):
:type additional_properties: dict
"""
# [START vision_document_detect_text_set_template_fields]
template_fields = ("image", "max_results", "timeout", "gcp_conn_id")
template_fields = ("image", "max_results", "timeout", "gcp_conn_id") # Iterable[str]
# [END vision_document_detect_text_set_template_fields]
def __init__(
self,
image: Union[Dict, Image],
max_results: int = None,
retry: Retry = None,
timeout: float = None,
language_hints: Union[str, List[str]] = None,
web_detection_params: Dict = None,
additional_properties: Dict = None,
max_results: Optional[int] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
language_hints: Optional[Union[str, List[str]]] = None,
web_detection_params: Optional[Dict] = None,
additional_properties: Optional[Dict] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.image = image
self.max_results = max_results
@@ -1146,14 +1148,14 @@ class CloudVisionDetectImageLabelsOperator(BaseOperator):
def __init__(
self,
image: Union[Dict, Image],
max_results: int = None,
retry: Retry = None,
timeout: float = None,
additional_properties: Dict = None,
max_results: Optional[int] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
additional_properties: Optional[Dict] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.image = image
self.max_results = max_results
@@ -1202,14 +1204,14 @@ class CloudVisionDetectImageSafeSearchOperator(BaseOperator):
def __init__(
self,
image: Union[Dict, Image],
max_results: int = None,
retry: Retry = None,
timeout: float = None,
additional_properties: Dict = None,
max_results: Optional[int] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
additional_properties: Optional[Dict] = None,
gcp_conn_id: str = "google_cloud_default",
*args,
**kwargs
):
) -> None:
super().__init__(*args, **kwargs)
self.image = image
self.max_results = max_results
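The `MetaData = Sequence[Tuple[str, str]]` alias introduced at the top of this file exists so the gRPC metadata type is spelled out once instead of in every signature. A short sketch of the equivalence, with a hypothetical call site; the function names and the header value below are examples, not taken from the commit:

from typing import Optional, Sequence, Tuple

MetaData = Sequence[Tuple[str, str]]

# Identical to a type checker; the alias just keeps a dozen operator
# signatures readable.
def get_product_set_verbose(
        metadata: Optional[Sequence[Tuple[str, str]]] = None) -> None:
    pass

def get_product_set(metadata: Optional[MetaData] = None) -> None:
    pass

# Any sequence of (key, value) string pairs satisfies the annotation.
get_product_set(metadata=[("x-goog-request-params", "parent=projects/example")])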


@@ -23,6 +23,7 @@ Google Cloud Storage operator.
import os
import warnings
from tempfile import NamedTemporaryFile
from typing import Optional
from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook
from airflow.contrib.operators.adls_list_operator import AzureDataLakeStorageListOperator
@@ -99,16 +100,16 @@ class AdlsToGoogleCloudStorageOperator(AzureDataLakeStorageListOperator):
@apply_defaults
def __init__(self,
src_adls,
dest_gcs,
azure_data_lake_conn_id,
gcp_conn_id='google_cloud_default',
google_cloud_storage_conn_id=None,
delegate_to=None,
replace=False,
gzip=False,
src_adls: str,
dest_gcs: str,
azure_data_lake_conn_id: str,
gcp_conn_id: str = 'google_cloud_default',
google_cloud_storage_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
replace: bool = False,
gzip: bool = False,
*args,
**kwargs):
**kwargs) -> None:
super().__init__(
path=src_adls,
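The hunk ends inside `__init__`, but the signature already shows the convention for a deprecated connection parameter: the old `google_cloud_storage_conn_id` name is typed `Optional[str]` so callers may omit it, while `gcp_conn_id` carries the real default. A hedged sketch of one way such a fallback can be resolved; the helper below is hypothetical and not the committed body:

import warnings
from typing import Optional

def resolve_conn_id(gcp_conn_id: str = 'google_cloud_default',
                    google_cloud_storage_conn_id: Optional[str] = None) -> str:
    # Honor the deprecated alias if the caller still passes it, but warn.
    if google_cloud_storage_conn_id:
        warnings.warn(
            "google_cloud_storage_conn_id is deprecated; pass gcp_conn_id instead.",
            DeprecationWarning, stacklevel=2)
        return google_cloud_storage_conn_id
    return gcp_conn_id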