Added support for ingestionMapping, ingestionMappingType and ingestionMappingReference (#168)
This commit is contained in:
Родитель
d81a549a86
Коммит
54b03b0730
|
@ -391,6 +391,7 @@ class KustoClient(object):
|
|||
:return: Kusto response data set.
|
||||
:rtype: azure.kusto.data._response.KustoResponseDataSet
|
||||
"""
|
||||
query = query.strip()
|
||||
if query.startswith("."):
|
||||
return self.execute_mgmt(database, query, properties)
|
||||
return self.execute_query(database, query, properties)
|
||||
|
|
|
@ -1,2 +0,0 @@
|
|||
include *.rst
|
||||
include azure/__init__.py
|
|
@ -6,7 +6,7 @@ Microsoft Azure Kusto Ingest Library for Python
|
|||
from azure.kusto.data.request import KustoConnectionStringBuilder
|
||||
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat
|
||||
|
||||
ingestion_props = IngestionProperties(database="{database_name}", table="{table_name}", dataFormat=DataFormat.csv)
|
||||
ingestion_props = IngestionProperties(database="{database_name}", table="{table_name}", dataFormat=DataFormat.CSV)
|
||||
client = KustoIngestClient("https://ingest-{cluster_name}.kusto.windows.net")
|
||||
|
||||
file_descriptor = FileDescriptor("{filename}.csv", 3333) # 3333 is the raw size of the data in bytes.
|
||||
|
|
|
@ -13,6 +13,7 @@ from ._ingestion_properties import (
|
|||
CsvColumnMapping,
|
||||
JsonColumnMapping,
|
||||
IngestionProperties,
|
||||
IngestionMappingType,
|
||||
)
|
||||
from .exceptions import KustoMissingMappingReferenceError
|
||||
|
||||
|
|
|
@ -43,16 +43,18 @@ class _IngestionBlobInfo:
|
|||
additional_properties["ingestIfNotExists"] = _convert_list_to_json(
|
||||
ingestion_properties.ingest_if_not_exists
|
||||
)
|
||||
if ingestion_properties.mapping:
|
||||
json_string = _convert_dict_to_json(ingestion_properties.mapping)
|
||||
additional_properties[ingestion_properties.get_mapping_format() + "Mapping"] = json_string
|
||||
if ingestion_properties.mapping_reference:
|
||||
key = ingestion_properties.get_mapping_format() + "MappingReference"
|
||||
additional_properties[key] = ingestion_properties.mapping_reference
|
||||
if ingestion_properties.ingestion_mapping:
|
||||
json_string = _convert_dict_to_json(ingestion_properties.ingestion_mapping)
|
||||
additional_properties["ingestionMapping"] = json_string
|
||||
|
||||
if ingestion_properties.ingestion_mapping_reference:
|
||||
additional_properties["ingestionMappingReference"] = ingestion_properties.ingestion_mapping_reference
|
||||
if ingestion_properties.ingestion_mapping_type:
|
||||
additional_properties["ingestionMappingType"] = ingestion_properties.ingestion_mapping_type.value
|
||||
if ingestion_properties.validation_policy:
|
||||
additional_properties["ValidationPolicy"] = _convert_dict_to_json(ingestion_properties.validation_policy)
|
||||
if ingestion_properties.format:
|
||||
additional_properties["format"] = ingestion_properties.format.name
|
||||
additional_properties["format"] = ingestion_properties.format.value
|
||||
|
||||
if additional_properties:
|
||||
self.properties["AdditionalProperties"] = additional_properties
|
||||
|
|
|
@ -1,24 +1,36 @@
|
|||
"""This file has all classes to define ingestion properties."""
|
||||
|
||||
from enum import Enum, IntEnum
|
||||
import warnings
|
||||
|
||||
from .exceptions import KustoDuplicateMappingError
|
||||
from .exceptions import (
|
||||
KustoDuplicateMappingError,
|
||||
KustoDuplicateMappingReferenceError,
|
||||
KustoMappingAndMappingReferenceError,
|
||||
)
|
||||
|
||||
|
||||
class DataFormat(Enum):
|
||||
"""All data formats supported by Kusto."""
|
||||
|
||||
csv = "csv"
|
||||
tsv = "tsv"
|
||||
scsv = "scsv"
|
||||
sohsv = "sohsv"
|
||||
psv = "psv"
|
||||
txt = "txt"
|
||||
json = "json"
|
||||
singlejson = "singlejson"
|
||||
avro = "avro"
|
||||
parquet = "parquet"
|
||||
multijson = "multijson"
|
||||
CSV = "csv"
|
||||
TSV = "tsv"
|
||||
SCSV = "scsv"
|
||||
SOHSV = "sohsv"
|
||||
PSV = "psv"
|
||||
TXT = "txt"
|
||||
JSON = "json"
|
||||
SINGLEJSON = "singlejson"
|
||||
AVRO = "avro"
|
||||
PARQUET = "parquet"
|
||||
MULTIJSON = "multijson"
|
||||
|
||||
|
||||
class IngestionMappingType(Enum):
|
||||
CSV = "Csv"
|
||||
JSON = "Json"
|
||||
AVRO = "Avro"
|
||||
PARQUET = "Parquet"
|
||||
|
||||
|
||||
class ValidationOptions(IntEnum):
|
||||
|
@ -99,9 +111,12 @@ class IngestionProperties:
|
|||
self,
|
||||
database,
|
||||
table,
|
||||
dataFormat=DataFormat.csv,
|
||||
dataFormat=DataFormat.CSV,
|
||||
mapping=None,
|
||||
ingestionMapping=None,
|
||||
mappingReference=None,
|
||||
ingestionMappingType=None,
|
||||
ingestionMappingReference=None,
|
||||
additionalTags=None,
|
||||
ingestIfNotExists=None,
|
||||
ingestByTags=None,
|
||||
|
@ -112,13 +127,44 @@ class IngestionProperties:
|
|||
validationPolicy=None,
|
||||
additionalProperties=None,
|
||||
):
|
||||
if mapping is not None and mappingReference is not None:
|
||||
# mapping_reference will be deprecated in the next major version
|
||||
if mappingReference is not None:
|
||||
warnings.warn(
|
||||
"""
|
||||
mappingReference will be deprecated in the next major version.
|
||||
Please use ingestionMappingReference instead
|
||||
""",
|
||||
PendingDeprecationWarning,
|
||||
)
|
||||
|
||||
# mapping will be deprecated in the next major version
|
||||
if mapping is not None:
|
||||
warnings.warn(
|
||||
"""
|
||||
mapping will be deprecated in the next major version.
|
||||
Please use ingestionMapping instead
|
||||
""",
|
||||
PendingDeprecationWarning,
|
||||
)
|
||||
|
||||
if mapping is not None and ingestionMapping is not None:
|
||||
raise KustoDuplicateMappingError()
|
||||
|
||||
mapping_exists = mapping is not None or ingestionMapping is not None
|
||||
if mapping_exists and (mappingReference is not None or ingestionMappingReference is not None):
|
||||
raise KustoMappingAndMappingReferenceError()
|
||||
|
||||
if mappingReference is not None and ingestionMappingReference is not None:
|
||||
raise KustoDuplicateMappingReferenceError()
|
||||
|
||||
self.database = database
|
||||
self.table = table
|
||||
self.format = dataFormat
|
||||
self.mapping = mapping
|
||||
self.mapping_reference = mappingReference
|
||||
self.ingestion_mapping = ingestionMapping if ingestionMapping is not None else mapping
|
||||
self.ingestion_mapping_type = ingestionMappingType
|
||||
self.ingestion_mapping_reference = (
|
||||
ingestionMappingReference if ingestionMappingReference is not None else mappingReference
|
||||
)
|
||||
self.additional_tags = additionalTags
|
||||
self.ingest_if_not_exists = ingestIfNotExists
|
||||
self.ingest_by_tags = ingestByTags
|
||||
|
@ -128,12 +174,3 @@ class IngestionProperties:
|
|||
self.report_method = reportMethod
|
||||
self.validation_policy = validationPolicy
|
||||
self.additional_properties = additionalProperties
|
||||
|
||||
def get_mapping_format(self):
|
||||
"""Dictating the corresponding mapping to the format."""
|
||||
if self.format in [DataFormat.json, DataFormat.singlejson, DataFormat.multijson]:
|
||||
return DataFormat.json.name
|
||||
elif self.format == DataFormat.avro:
|
||||
return DataFormat.avro.name
|
||||
else:
|
||||
return DataFormat.csv.name
|
||||
|
|
|
@ -19,7 +19,7 @@ class KustoStreamingIngestClient(object):
|
|||
Tests are run using pytest.
|
||||
"""
|
||||
|
||||
_mapping_required_formats = [DataFormat.json, DataFormat.singlejson, DataFormat.avro]
|
||||
_mapping_required_formats = {DataFormat.JSON, DataFormat.SINGLEJSON, DataFormat.AVRO}
|
||||
|
||||
def __init__(self, kcsb):
|
||||
"""Kusto Streaming Ingest Client constructor.
|
||||
|
@ -45,7 +45,7 @@ class KustoStreamingIngestClient(object):
|
|||
|
||||
fd = FileDescriptor(temp_file_path)
|
||||
|
||||
ingestion_properties.format = DataFormat.csv
|
||||
ingestion_properties.format = DataFormat.CSV
|
||||
|
||||
stream_descriptor = StreamDescriptor(fd.zipped_stream, fd.source_id, True)
|
||||
|
||||
|
@ -88,7 +88,7 @@ class KustoStreamingIngestClient(object):
|
|||
|
||||
if (
|
||||
ingestion_properties.format in self._mapping_required_formats
|
||||
and ingestion_properties.mapping_reference is None
|
||||
and ingestion_properties.ingestion_mapping_reference is None
|
||||
):
|
||||
raise KustoMissingMappingReferenceError()
|
||||
|
||||
|
@ -109,5 +109,5 @@ class KustoStreamingIngestClient(object):
|
|||
ingestion_properties.table,
|
||||
stream,
|
||||
ingestion_properties.format.name,
|
||||
mapping_name=ingestion_properties.mapping_reference,
|
||||
mapping_name=ingestion_properties.ingestion_mapping_reference,
|
||||
)
|
||||
|
|
|
@ -3,17 +3,37 @@
|
|||
from azure.kusto.data.exceptions import KustoClientError
|
||||
|
||||
|
||||
class KustoDuplicateMappingError(KustoClientError):
|
||||
class KustoMappingAndMappingReferenceError(KustoClientError):
|
||||
"""
|
||||
Error to be raised when ingestion properties has both
|
||||
ingestion mappings and ingestion mapping reference.
|
||||
Error to be raised when ingestion properties include both
|
||||
a mapping and a mapping reference
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
message = "Ingestion properties contains ingestion mapping and ingestion mapping reference."
|
||||
message = "Ingestion properties contain both an explicit mapping and a mapping reference."
|
||||
super(KustoMappingAndMappingReferenceError, self).__init__(message)
|
||||
|
||||
|
||||
class KustoDuplicateMappingError(KustoClientError):
|
||||
"""
|
||||
Error to be raised when ingestion properties include two explicit mappings.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
message = "Ingestion properties contain two explicit mappings."
|
||||
super(KustoDuplicateMappingError, self).__init__(message)
|
||||
|
||||
|
||||
class KustoDuplicateMappingReferenceError(KustoClientError):
|
||||
"""
|
||||
Error to be raised when ingestion properties include two mapping references.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
message = "Ingestion properties contain two mapping references."
|
||||
super(KustoDuplicateMappingReferenceError, self).__init__(message)
|
||||
|
||||
|
||||
class KustoMissingMappingReferenceError(KustoClientError):
|
||||
"""
|
||||
Error to be raised when ingestion properties has data format of Json, SingleJson, MultiJson or Avro
|
||||
|
|
|
@ -1,2 +0,0 @@
|
|||
[tool.black]
|
||||
line-length = 120
|
|
@ -115,7 +115,7 @@ def test_csv_ingest_non_existing_table():
|
|||
csv_ingest_props = IngestionProperties(
|
||||
db_name,
|
||||
table_name,
|
||||
dataFormat=DataFormat.csv,
|
||||
dataFormat=DataFormat.CSV,
|
||||
mapping=Helpers.create_deft_table_csv_mappings(),
|
||||
reportLevel=ReportLevel.FailuresAndSuccesses,
|
||||
)
|
||||
|
@ -156,7 +156,7 @@ def test_json_ingest_existing_table():
|
|||
json_ingestion_props = IngestionProperties(
|
||||
db_name,
|
||||
table_name,
|
||||
dataFormat=DataFormat.json,
|
||||
dataFormat=DataFormat.JSON,
|
||||
mapping=Helpers.create_deft_table_json_mappings(),
|
||||
reportLevel=ReportLevel.FailuresAndSuccesses,
|
||||
)
|
||||
|
@ -197,7 +197,7 @@ def test_ingest_complicated_props():
|
|||
json_ingestion_props = IngestionProperties(
|
||||
db_name,
|
||||
table_name,
|
||||
dataFormat=DataFormat.json,
|
||||
dataFormat=DataFormat.JSON,
|
||||
mapping=Helpers.create_deft_table_json_mappings(),
|
||||
additionalTags=["a", "b"],
|
||||
ingestIfNotExists=["aaaa", "bbbb"],
|
||||
|
@ -243,7 +243,7 @@ def test_json_ingestion_ingest_by_tag():
|
|||
json_ingestion_props = IngestionProperties(
|
||||
db_name,
|
||||
table_name,
|
||||
dataFormat=DataFormat.json,
|
||||
dataFormat=DataFormat.JSON,
|
||||
mapping=Helpers.create_deft_table_json_mappings(),
|
||||
ingestIfNotExists=["ingestByTag"],
|
||||
reportLevel=ReportLevel.FailuresAndSuccesses,
|
||||
|
@ -280,7 +280,7 @@ def test_tsv_ingestion_csv_mapping():
|
|||
tsv_ingestion_props = IngestionProperties(
|
||||
db_name,
|
||||
table_name,
|
||||
dataFormat=DataFormat.tsv,
|
||||
dataFormat=DataFormat.TSV,
|
||||
mapping=Helpers.create_deft_table_csv_mappings(),
|
||||
reportLevel=ReportLevel.FailuresAndSuccesses,
|
||||
)
|
||||
|
@ -321,7 +321,7 @@ def test_streaming_ingest_from_opened_file():
|
|||
|
||||
file_path = os.path.join(current_dir, *missing_path_parts)
|
||||
stream = open(file_path, "r")
|
||||
ingestion_properties = IngestionProperties(database=db_name, table=table_name, dataFormat=DataFormat.csv)
|
||||
ingestion_properties = IngestionProperties(database=db_name, table=table_name, dataFormat=DataFormat.CSV)
|
||||
ingest_client.ingest_from_stream(stream, ingestion_properties=ingestion_properties)
|
||||
|
||||
|
||||
|
@ -336,7 +336,7 @@ def test_streaming_ingest_form_csv_file():
|
|||
|
||||
file_path = os.path.join(current_dir, *missing_path_parts)
|
||||
|
||||
ingestion_properties = IngestionProperties(database=db_name, table=table_name, dataFormat=DataFormat.csv)
|
||||
ingestion_properties = IngestionProperties(database=db_name, table=table_name, dataFormat=DataFormat.CSV)
|
||||
ingest_client.ingest_from_file(file_path, ingestion_properties=ingestion_properties)
|
||||
|
||||
path_parts = ["azure-kusto-ingest", "tests", "input", "dataset.csv.gz"]
|
||||
|
@ -352,7 +352,7 @@ def test_streaming_ingest_form_csv_file():
|
|||
|
||||
@pytest.mark.run(order=8)
|
||||
def test_streaming_ingest_from_json_no_mapping():
|
||||
ingestion_properties = IngestionProperties(database=db_name, table=table_name, dataFormat=DataFormat.json)
|
||||
ingestion_properties = IngestionProperties(database=db_name, table=table_name, dataFormat=DataFormat.JSON)
|
||||
try:
|
||||
current_dir = os.getcwd()
|
||||
path_parts = ["azure-kusto-ingest", "tests", "input", "dataset.json"]
|
||||
|
@ -385,7 +385,7 @@ def test_streaming_ingest_from_json_file():
|
|||
|
||||
file_path = os.path.join(current_dir, *missing_path_parts)
|
||||
ingestion_properties = IngestionProperties(
|
||||
database=db_name, table=table_name, dataFormat=DataFormat.json, mappingReference="JsonMapping"
|
||||
database=db_name, table=table_name, dataFormat=DataFormat.JSON, mappingReference="JsonMapping"
|
||||
)
|
||||
ingest_client.ingest_from_file(file_path, ingestion_properties=ingestion_properties)
|
||||
|
||||
|
@ -402,7 +402,7 @@ def test_streaming_ingest_from_json_file():
|
|||
|
||||
@pytest.mark.run(order=10)
|
||||
def test_streaming_ingest_from_io_streams():
|
||||
ingestion_properties = IngestionProperties(database=db_name, table=table_name, dataFormat=DataFormat.csv)
|
||||
ingestion_properties = IngestionProperties(database=db_name, table=table_name, dataFormat=DataFormat.CSV)
|
||||
byte_sequence = b'0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,"Zero",0,00:00:00,,null'
|
||||
bytes_stream = io.BytesIO(byte_sequence)
|
||||
ingest_client.ingest_from_stream(bytes_stream, ingestion_properties=ingestion_properties)
|
||||
|
@ -413,9 +413,9 @@ def test_streaming_ingest_from_io_streams():
|
|||
|
||||
byte_sequence = b'{"rownumber": 0, "rowguid": "00000000-0000-0000-0001-020304050607", "xdouble": 0.0, "xfloat": 0.0, "xbool": 0, "xint16": 0, "xint32": 0, "xint64": 0, "xunit8": 0, "xuint16": 0, "xunit32": 0, "xunit64": 0, "xdate": "2014-01-01T01:01:01Z", "xsmalltext": "Zero", "xtext": "Zero", "xnumberAsText": "0", "xtime": "00:00:00", "xtextWithNulls": null, "xdynamicWithNulls": ""}'
|
||||
bytes_stream = io.BytesIO(byte_sequence)
|
||||
ingestion_properties.format = DataFormat.json
|
||||
ingestion_properties.format = DataFormat.JSON
|
||||
|
||||
ingestion_properties.mapping_reference = "JsonMapping"
|
||||
ingestion_properties.ingestion_mapping_reference = "JsonMapping"
|
||||
ingest_client.ingest_from_stream(bytes_stream, ingestion_properties=ingestion_properties)
|
||||
|
||||
str_sequence = u'{"rownumber": 0, "rowguid": "00000000-0000-0000-0001-020304050607", "xdouble": 0.0, "xfloat": 0.0, "xbool": 0, "xint16": 0, "xint32": 0, "xint64": 0, "xunit8": 0, "xuint16": 0, "xunit32": 0, "xunit64": 0, "xdate": "2014-01-01T01:01:01Z", "xsmalltext": "Zero", "xtext": "Zero", "xnumberAsText": "0", "xtime": "00:00:00", "xtextWithNulls": null, "xdynamicWithNulls": ""}'
|
||||
|
@ -483,5 +483,5 @@ def test_streaming_ingest_from_dataframe():
|
|||
]
|
||||
]
|
||||
df = DataFrame(data=rows, columns=fields)
|
||||
ingestion_properties = IngestionProperties(database=db_name, table=table_name, dataFormat=DataFormat.csv)
|
||||
ingestion_properties = IngestionProperties(database=db_name, table=table_name, dataFormat=DataFormat.CSV)
|
||||
ingest_client.ingest_from_dataframe(df, ingestion_properties)
|
||||
|
|
|
@ -9,6 +9,7 @@ from azure.kusto.ingest import (
|
|||
StreamDescriptor,
|
||||
DataFormat,
|
||||
ReportLevel,
|
||||
IngestionMappingType,
|
||||
KustoStreamingIngestClient,
|
||||
)
|
||||
|
||||
|
@ -61,9 +62,12 @@ client = KustoIngestClient(kcsb)
|
|||
ingestion_props = IngestionProperties(
|
||||
database="{database_name}",
|
||||
table="{table_name}",
|
||||
dataFormat=DataFormat.csv,
|
||||
# incase status update for success are also required
|
||||
dataFormat=DataFormat.CSV,
|
||||
# in case status update for success are also required
|
||||
# reportLevel=ReportLevel.FailuresAndSuccesses,
|
||||
# in case a mapping is required
|
||||
# ingestionMappingReference="{json_mapping_that_already_exists_on_table}"
|
||||
# ingestionMappingType=IngestionMappingType.Json
|
||||
)
|
||||
|
||||
# ingest from file
|
||||
|
@ -143,7 +147,7 @@ cluster = "https://{cluster_name}.kusto.windows.net"
|
|||
|
||||
client = KustoStreamingIngestClient(kcsb)
|
||||
|
||||
ingestion_props = IngestionProperties(database="{database_name}", table="{table_name}", dataFormat=DataFormat.csv)
|
||||
ingestion_props = IngestionProperties(database="{database_name}", table="{table_name}", dataFormat=DataFormat.CSV)
|
||||
|
||||
# ingest from file
|
||||
file_descriptor = FileDescriptor("{filename}.csv", 3333) # 3333 is the raw size of the data in bytes.
|
||||
|
|
|
@ -4,7 +4,11 @@ import json
|
|||
from uuid import UUID
|
||||
from six import assertRegex
|
||||
from azure.kusto.ingest._ingestion_blob_info import _IngestionBlobInfo
|
||||
from azure.kusto.ingest.exceptions import KustoDuplicateMappingError
|
||||
from azure.kusto.ingest.exceptions import (
|
||||
KustoDuplicateMappingError,
|
||||
KustoDuplicateMappingReferenceError,
|
||||
KustoMappingAndMappingReferenceError,
|
||||
)
|
||||
from azure.kusto.ingest import (
|
||||
BlobDescriptor,
|
||||
IngestionProperties,
|
||||
|
@ -32,8 +36,8 @@ class IngestionBlobInfoTest(unittest.TestCase):
|
|||
properties = IngestionProperties(
|
||||
database="database",
|
||||
table="table",
|
||||
dataFormat=DataFormat.csv,
|
||||
mapping=[CsvColumnMapping("ColumnName", "cslDataType", 1)],
|
||||
dataFormat=DataFormat.CSV,
|
||||
ingestionMapping=[CsvColumnMapping("ColumnName", "cslDataType", 1)],
|
||||
additionalTags=["tag"],
|
||||
ingestIfNotExists=["ingestIfNotExistTags"],
|
||||
ingestByTags=["ingestByTags"],
|
||||
|
@ -55,7 +59,7 @@ class IngestionBlobInfoTest(unittest.TestCase):
|
|||
properties = IngestionProperties(
|
||||
database="database",
|
||||
table="table",
|
||||
dataFormat=DataFormat.csv,
|
||||
dataFormat=DataFormat.CSV,
|
||||
mappingReference="csvMappingReference",
|
||||
additionalTags=["tag"],
|
||||
ingestIfNotExists=["ingestIfNotExistTags"],
|
||||
|
@ -78,8 +82,8 @@ class IngestionBlobInfoTest(unittest.TestCase):
|
|||
properties = IngestionProperties(
|
||||
database="database",
|
||||
table="table",
|
||||
dataFormat=DataFormat.json,
|
||||
mapping=[JsonColumnMapping("ColumnName", "jsonpath", "datatype")],
|
||||
dataFormat=DataFormat.JSON,
|
||||
ingestionMapping=[JsonColumnMapping("ColumnName", "jsonpath", "datatype")],
|
||||
additionalTags=["tag"],
|
||||
ingestIfNotExists=["ingestIfNotExistTags"],
|
||||
ingestByTags=["ingestByTags"],
|
||||
|
@ -101,7 +105,7 @@ class IngestionBlobInfoTest(unittest.TestCase):
|
|||
properties = IngestionProperties(
|
||||
database="database",
|
||||
table="table",
|
||||
dataFormat=DataFormat.json,
|
||||
dataFormat=DataFormat.JSON,
|
||||
mappingReference="jsonMappingReference",
|
||||
additionalTags=["tag"],
|
||||
ingestIfNotExists=["ingestIfNotExistTags"],
|
||||
|
@ -119,9 +123,43 @@ class IngestionBlobInfoTest(unittest.TestCase):
|
|||
def test_blob_info_csv_exceptions(self):
|
||||
"""Tests invalid ingestion properties."""
|
||||
with self.assertRaises(KustoDuplicateMappingError):
|
||||
IngestionProperties(
|
||||
database="database", table="table", mapping="mapping", ingestionMapping="ingestionMapping"
|
||||
)
|
||||
|
||||
with self.assertRaises(KustoMappingAndMappingReferenceError):
|
||||
IngestionProperties(
|
||||
database="database",
|
||||
table="table",
|
||||
mapping="mapping",
|
||||
ingestionMappingReference="ingestionMappingReference",
|
||||
)
|
||||
|
||||
with self.assertRaises(KustoMappingAndMappingReferenceError):
|
||||
IngestionProperties(
|
||||
database="database",
|
||||
table="table",
|
||||
ingestionMapping="ingestionMapping",
|
||||
ingestionMappingReference="ingestionMappingReference",
|
||||
)
|
||||
with self.assertRaises(KustoMappingAndMappingReferenceError):
|
||||
IngestionProperties(
|
||||
database="database", table="table", mapping="mapping", mappingReference="mappingReference"
|
||||
)
|
||||
with self.assertRaises(KustoMappingAndMappingReferenceError):
|
||||
IngestionProperties(
|
||||
database="database",
|
||||
table="table",
|
||||
ingestionMapping="ingestionMapping",
|
||||
mappingReference="mappingReference",
|
||||
)
|
||||
with self.assertRaises(KustoDuplicateMappingReferenceError):
|
||||
IngestionProperties(
|
||||
database="database",
|
||||
table="table",
|
||||
mappingReference="mappingReference",
|
||||
ingestionMappingReference="ingestionMappingReference",
|
||||
)
|
||||
|
||||
def _verify_ingestion_blob_info_result(self, ingestion_blob_info):
|
||||
result = json.loads(ingestion_blob_info)
|
||||
|
|
|
@ -115,7 +115,7 @@ class KustoIngestClientTests(unittest.TestCase):
|
|||
)
|
||||
|
||||
ingest_client = KustoIngestClient("https://ingest-somecluster.kusto.windows.net")
|
||||
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.csv)
|
||||
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.CSV)
|
||||
|
||||
# ensure test can work when executed from within directories
|
||||
current_dir = os.getcwd()
|
||||
|
@ -178,7 +178,7 @@ class KustoIngestClientTests(unittest.TestCase):
|
|||
)
|
||||
|
||||
ingest_client = KustoIngestClient("https://ingest-somecluster.kusto.windows.net")
|
||||
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.csv)
|
||||
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.CSV)
|
||||
|
||||
from pandas import DataFrame
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ class KustoStreamingIngestClientTests(unittest.TestCase):
|
|||
)
|
||||
|
||||
ingest_client = KustoStreamingIngestClient("https://somecluster.kusto.windows.net")
|
||||
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.csv)
|
||||
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.CSV)
|
||||
|
||||
# ensure test can work when executed from within directories
|
||||
current_dir = os.getcwd()
|
||||
|
@ -79,7 +79,7 @@ class KustoStreamingIngestClientTests(unittest.TestCase):
|
|||
ingest_client.ingest_from_file(file_path, ingestion_properties=ingestion_properties)
|
||||
|
||||
ingestion_properties = IngestionProperties(
|
||||
database="database", table="table", dataFormat=DataFormat.json, mappingReference="JsonMapping"
|
||||
database="database", table="table", dataFormat=DataFormat.JSON, mappingReference="JsonMapping"
|
||||
)
|
||||
|
||||
path_parts = ["azure-kusto-ingest", "tests", "input", "dataset.json"]
|
||||
|
@ -102,7 +102,7 @@ class KustoStreamingIngestClientTests(unittest.TestCase):
|
|||
|
||||
ingest_client.ingest_from_file(file_path, ingestion_properties=ingestion_properties)
|
||||
|
||||
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.tsv)
|
||||
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.TSV)
|
||||
|
||||
path_parts = ["azure-kusto-ingest", "tests", "input", "dataset.tsv"]
|
||||
missing_path_parts = []
|
||||
|
@ -123,7 +123,7 @@ class KustoStreamingIngestClientTests(unittest.TestCase):
|
|||
)
|
||||
|
||||
ingest_client = KustoStreamingIngestClient("https://somecluster.kusto.windows.net")
|
||||
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.csv)
|
||||
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.CSV)
|
||||
|
||||
from pandas import DataFrame
|
||||
|
||||
|
@ -142,7 +142,7 @@ class KustoStreamingIngestClientTests(unittest.TestCase):
|
|||
)
|
||||
|
||||
ingest_client = KustoStreamingIngestClient("https://somecluster.kusto.windows.net")
|
||||
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.csv)
|
||||
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.CSV)
|
||||
|
||||
byte_sequence = b"56,56,56"
|
||||
bytes_stream = io.BytesIO(byte_sequence)
|
||||
|
@ -154,13 +154,13 @@ class KustoStreamingIngestClientTests(unittest.TestCase):
|
|||
|
||||
byte_sequence = b'{"Name":"Ben","Age":"56","Weight":"75"}'
|
||||
bytes_stream = io.BytesIO(byte_sequence)
|
||||
ingestion_properties.format = DataFormat.json
|
||||
ingestion_properties.format = DataFormat.JSON
|
||||
try:
|
||||
ingest_client.ingest_from_stream(bytes_stream, ingestion_properties=ingestion_properties)
|
||||
except KustoMissingMappingReferenceError:
|
||||
pass
|
||||
|
||||
ingestion_properties.mapping_reference = "JsonMapping"
|
||||
ingestion_properties.ingestion_mapping_reference = "JsonMapping"
|
||||
ingest_client.ingest_from_stream(bytes_stream, ingestion_properties=ingestion_properties)
|
||||
|
||||
str_sequence = u'{"Name":"Ben","Age":"56","Weight":"75"}'
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
[tool.black]
|
||||
line-length = 120
|
2
test.bat
2
test.bat
|
@ -2,6 +2,6 @@ cd %PROJECTS_HOME%\azure-kusto-python
|
|||
call workon kusto
|
||||
call pip uninstall azure-kusto-data azure-kusto-ingest -y
|
||||
call pip install ./azure-kusto-data[pandas] ./azure-kusto-ingest[pandas]
|
||||
call pip install --force-reinstall azure-nspkg==1.0.0
|
||||
call pip install --force-reinstall azure-nspkg==2.0.0
|
||||
call pytest
|
||||
pause
|
Загрузка…
Ссылка в новой задаче