зеркало из
1
0
Форкнуть 0
A static method within a class should be called with the class name. Or it should be excluded from the class.
This commit is contained in:
toshetah 2018-08-06 16:38:39 +03:00 коммит произвёл GitHub
Родитель 9fb686987d
Коммит dcdd27c89f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 36 добавлений и 41 удалений

Просмотреть файл

@ -61,7 +61,6 @@ class _AadHelper(object):
return _get_header(token)
@staticmethod
def _get_header(token):
return "{0} {1}".format(
token[TokenResponseFields.TOKEN_TYPE], token[TokenResponseFields.ACCESS_TOKEN]

Просмотреть файл

@ -2,18 +2,18 @@
import re
_URI_FORMAT = re.compile("https://(\\w+).(queue|blob|table).core.windows.net/([\\w,-]+)\\?(.*)")
class _ConnectionString:
def __init__(self, storage_account_name, objectType, objectName, sas):
def __init__(self, storage_account_name, object_type, object_name, sas):
self.storage_account_name = storage_account_name
self.object_type = objectType
self.object_name = objectName
self.object_type = object_type
self.object_name = object_name
self.sas = sas
@staticmethod
def parse(uri):
""" Parses uri into a _ConnectionString object """
match = re.search(
"https://(\\w+).(queue|blob|table).core.windows.net/([\\w,-]+)\\?(.*)", uri
)
return _ConnectionString(match.group(1), match.group(2), match.group(3), match.group(4))
@classmethod
def parse(cls, uri):
"""Parses uri into a _ConnectionString object"""
match = _URI_FORMAT.search(uri)
return cls(match.group(1), match.group(2), match.group(3), match.group(4))

Просмотреть файл

@ -1,4 +1,4 @@
"""A file contains all descriptors ingest command should work with."""
"""Descriptors the ingest command should work with."""
import os
from io import BytesIO
@ -8,7 +8,7 @@ import tempfile
class FileDescriptor(object):
"""A class that defines a file to ingest."""
"""A file to ingest."""
def __init__(self, path, size=0, deleteSourcesOnSuccess=False):
self.path = path
@ -39,7 +39,7 @@ class FileDescriptor(object):
class BlobDescriptor(object):
"""A class that defines a blob to ingest."""
"""A blob to ingest."""
def __init__(self, path, size):
self.path = path

Просмотреть файл

@ -9,8 +9,6 @@ from .descriptors import BlobDescriptor
class _IngestionBlobInfo:
def __init__(self, blob, ingestionProperties, deleteSourcesOnSuccess=True, authContext=None):
if not isinstance(blob, BlobDescriptor):
raise TypeError("blob must be of type BlobDescriptor")
self.properties = dict()
self.properties["BlobPath"] = blob.path
self.properties["RawDataSize"] = blob.size
@ -26,10 +24,10 @@ class _IngestionBlobInfo:
# TODO: Add support for ingestion statuses
# self.properties["IngestionStatusInTable"] = None
# self.properties["BlobPathEncrypted"] = None
additional_properties = ingestionProperties.additional_properties or dict()
additional_properties = ingestionProperties.additional_properties or {}
additional_properties["authorizationContext"] = authContext
tags = list()
tags = []
if ingestionProperties.additional_tags:
tags.extend(ingestionProperties.additional_tags)
if ingestionProperties.drop_by_tags:
@ -37,13 +35,13 @@ class _IngestionBlobInfo:
if ingestionProperties.ingest_by_tags:
tags.extend(["ingest-by:" + ingest for ingest in ingestionProperties.ingest_by_tags])
if tags:
additional_properties["tags"] = _IngestionBlobInfo._convert_list_to_json(tags)
additional_properties["tags"] = _convert_list_to_json(tags)
if ingestionProperties.ingest_if_not_exists:
additional_properties["ingestIfNotExists"] = _IngestionBlobInfo._convert_list_to_json(
additional_properties["ingestIfNotExists"] = _convert_list_to_json(
ingestionProperties.ingest_if_not_exists
)
if ingestionProperties.mapping:
json_string = _IngestionBlobInfo._convert_dict_to_json(ingestionProperties.mapping)
json_string = _convert_dict_to_json(ingestionProperties.mapping)
additional_properties[
ingestionProperties.get_mapping_format() + "Mapping"
] = json_string
@ -51,7 +49,7 @@ class _IngestionBlobInfo:
key = ingestionProperties.get_mapping_format() + "MappingReference"
additional_properties[key] = ingestionProperties.mapping_reference
if ingestionProperties.validation_policy:
additional_properties["ValidationPolicy"] = _IngestionBlobInfo._convert_dict_to_json(
additional_properties["ValidationPolicy"] = _convert_dict_to_json(
ingestionProperties.validation_policy
)
if ingestionProperties.format:
@ -62,24 +60,22 @@ class _IngestionBlobInfo:
def to_json(self):
""" Converts this object to a json string """
return _IngestionBlobInfo._convert_list_to_json(self.properties)
return _convert_list_to_json(self.properties)
@staticmethod
def _convert_list_to_json(array):
""" Converts array to a json string """
return json.dumps(
array, skipkeys=False, allow_nan=False, indent=None, separators=(",", ":")
)
@staticmethod
def _convert_dict_to_json(array):
""" Converts array to a json string """
return json.dumps(
array,
skipkeys=False,
allow_nan=False,
indent=None,
separators=(",", ":"),
sort_keys=True,
default=lambda o: o.__dict__,
)
def _convert_list_to_json(array):
""" Converts array to a json string """
return json.dumps(array, skipkeys=False, allow_nan=False, indent=None, separators=(",", ":"))
def _convert_dict_to_json(array):
""" Converts array to a json string """
return json.dumps(
array,
skipkeys=False,
allow_nan=False,
indent=None,
separators=(",", ":"),
sort_keys=True,
default=lambda o: o.__dict__,
)