зеркало из
1
0
Форкнуть 0

Adding support for dataframes (#59)

* rebased

* remove headers from csv file

* fixed most of tests and updated travis-ci to match other ms python SDKs

* fixing tests and travis file

* minor change to travis

* added test for dataframes
also, improved writing dataframes to file with gzip in one step instead of two

* removing old commented out code

* removed resolved TODO's

* forgot minor change

* bad .travis.yml

* tests depending on content size are fragile due to different OSs, relaxed assert

* further relaxed tests

* python < 3.6 json.loads bytes differently

* pr fixes

* minor fixes

* todo uppercase

* add black validation

* re-added black to installation

* black formatting

* final fix?
This commit is contained in:
Daniel Dubovski 2018-08-29 11:27:06 +03:00 коммит произвёл GitHub
Родитель 1008c0fd5e
Коммит d530c9c33f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
30 изменённых файлов: 440 добавлений и 785 удалений

3
.pylintrc Normal file
Просмотреть файл

@ -0,0 +1,3 @@
# https://github.com/getsentry/responses/issues/74
[TYPECHECK]
ignored-classes= responses

Просмотреть файл

@ -1,17 +1,33 @@
dist: trusty
sudo: required
cache: pip
language: python
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
matrix:
include:
- os: linux
python: "2.7"
- os: linux
python: "3.4"
- os: linux
python: "3.5"
- os: linux
python: "3.6"
fast_finish: true
before_install:
- if [[ -n "$TRAVIS_TAG" && "$TRAVIS_PYTHON_VERSION" != "3.6" ]]; then travis_terminate 0; fi; # Deploy on 3.6
install:
- pip install -U pip
- pip install -r dev_requirements.txt
- pip install codecov coverage
- pip install ./azure-kusto-data ./azure-kusto-ingest
- pip install --force-reinstall azure-nspkg==1.0.0
before_script:
- if [[ "$TRAVIS_PYTHON_VERSION" == "3.6" ]]; then black . --line-length 100 --check; fi
- if [[ "$TRAVIS_PYTHON_VERSION" == "3.6" ]]; then black . --line-length 120 --check; fi
script:
- pytest azure-kusto-data
- pytest azure-kusto-ingest
- pytest
after_success:
- coverage report
- codecov
deploy:
provider: pypi
user: microsoftkusto

Просмотреть файл

@ -12,9 +12,7 @@ import pandas
import six
# Regex for TimeSpan
_TIMESPAN_PATTERN = re.compile(
r"(-?)((?P<d>[0-9]*).)?(?P<h>[0-9]{2}):(?P<m>[0-9]{2}):(?P<s>[0-9]{2}(\.[0-9]+)?$)"
)
_TIMESPAN_PATTERN = re.compile(r"(-?)((?P<d>[0-9]*).)?(?P<h>[0-9]{2}):(?P<m>[0-9]{2}):(?P<s>[0-9]{2}(\.[0-9]+)?$)")
class WellKnownDataSet(Enum):
@ -95,9 +93,7 @@ class _KustoResultRow(object):
class _KustoResultColumn(object):
def __init__(self, json_column, ordianl):
self.column_name = json_column["ColumnName"]
self.column_type = (
json_column["ColumnType"] if "ColumnType" in json_column else json_column["DataType"]
)
self.column_type = json_column["ColumnType"] if "ColumnType" in json_column else json_column["DataType"]
self.ordinal = ordianl
@ -107,9 +103,7 @@ class _KustoResultTable(object):
def __init__(self, json_table):
self.table_name = json_table["TableName"]
self.table_id = json_table["TableId"] if "TableId" in json_table else None
self.table_kind = (
WellKnownDataSet[json_table["TableKind"]] if "TableKind" in json_table else None
)
self.table_kind = WellKnownDataSet[json_table["TableKind"]] if "TableKind" in json_table else None
self.columns = []
ordinal = 0
for column in json_table["Columns"]:
@ -134,18 +128,14 @@ class _KustoResultTable(object):
if not self.columns or not self._rows:
return pandas.DataFrame()
frame = pandas.DataFrame(
self._rows, columns=[column.column_name for column in self.columns]
)
frame = pandas.DataFrame(self._rows, columns=[column.column_name for column in self.columns])
for column in self.columns:
col_name = column.column_name
col_type = column.column_type
if col_type.lower() == "timespan":
frame[col_name] = pandas.to_timedelta(
frame[col_name].apply(
lambda t: t.replace(".", " days ") if t and "." in t.split(":")[0] else t
)
frame[col_name].apply(lambda t: t.replace(".", " days ") if t and "." in t.split(":")[0] else t)
)
elif col_type.lower() == "dynamic":
frame[col_name] = frame[col_name].apply(lambda x: json.loads(x) if x else None)
@ -213,9 +203,7 @@ class _KustoResponseDataSet:
"""Returns primary results. If there is more than one returns a list."""
if self.tables_count == 1:
return self.tables
primary = list(
filter(lambda x: x.table_kind == WellKnownDataSet.PrimaryResult, self.tables)
)
primary = list(filter(lambda x: x.table_kind == WellKnownDataSet.PrimaryResult, self.tables))
return primary
@ -223,8 +211,7 @@ class _KustoResponseDataSet:
def errors_count(self):
"""Checks whether an exception was thrown."""
query_status_table = next(
(t for t in self.tables if t.table_kind == WellKnownDataSet.QueryCompletionInformation),
None,
(t for t in self.tables if t.table_kind == WellKnownDataSet.QueryCompletionInformation), None
)
if not query_status_table:
return 0
@ -243,8 +230,7 @@ class _KustoResponseDataSet:
def get_exceptions(self):
"""Gets the excpetions retrieved from Kusto if exists."""
query_status_table = next(
(t for t in self.tables if t.table_kind == WellKnownDataSet.QueryCompletionInformation),
None,
(t for t in self.tables if t.table_kind == WellKnownDataSet.QueryCompletionInformation), None
)
if not query_status_table:
return []
@ -309,6 +295,4 @@ class _KustoResponseDataSetV2(_KustoResponseDataSet):
_crid_column = "ClientRequestId"
def __init__(self, json_response):
super(_KustoResponseDataSetV2, self).__init__(
[t for t in json_response if t["FrameType"] == "DataTable"]
)
super(_KustoResponseDataSetV2, self).__init__([t for t in json_response if t["FrameType"] == "DataTable"])

Просмотреть файл

@ -107,9 +107,7 @@ class KustoConnectionStringBuilder(object):
return kcsb
@classmethod
def with_aad_application_certificate_authentication(
cls, connection_string, aad_app_id, certificate, thumbprint
):
def with_aad_application_certificate_authentication(cls, connection_string, aad_app_id, certificate, thumbprint):
"""Creates a KustoConnection string builder that will authenticate with AAD application and
a certificate credentials.
:param str connection_string: Kusto connection string should by of the format:
@ -220,14 +218,7 @@ class KustoClient(object):
self._query_endpoint = "{0}/v2/rest/query".format(kusto_cluster)
self._aad_helper = _AadHelper(kcsb)
def execute(
self,
kusto_database,
query,
accept_partial_results=False,
timeout=None,
get_raw_response=False,
):
def execute(self, kusto_database, query, accept_partial_results=False, timeout=None, get_raw_response=False):
"""Executes a query or management command.
:param str kusto_database: Database against query will be executed.
:param str query: Query to be executed.
@ -240,21 +231,10 @@ class KustoClient(object):
Whether to get a raw response, or a parsed one.
"""
if query.startswith("."):
return self.execute_mgmt(
kusto_database, query, accept_partial_results, timeout, get_raw_response
)
return self.execute_query(
kusto_database, query, accept_partial_results, timeout, get_raw_response
)
return self.execute_mgmt(kusto_database, query, accept_partial_results, timeout, get_raw_response)
return self.execute_query(kusto_database, query, accept_partial_results, timeout, get_raw_response)
def execute_query(
self,
kusto_database,
query,
accept_partial_results=False,
timeout=None,
get_raw_response=False,
):
def execute_query(self, kusto_database, query, accept_partial_results=False, timeout=None, get_raw_response=False):
"""Executes a query.
:param str kusto_database: Database against query will be executed.
:param str query: Query to be executed.
@ -267,22 +247,10 @@ class KustoClient(object):
Whether to get a raw response, or a parsed one.
"""
return self._execute(
self._query_endpoint,
kusto_database,
query,
accept_partial_results,
timeout,
get_raw_response,
self._query_endpoint, kusto_database, query, accept_partial_results, timeout, get_raw_response
)
def execute_mgmt(
self,
kusto_database,
query,
accept_partial_results=False,
timeout=None,
get_raw_response=False,
):
def execute_mgmt(self, kusto_database, query, accept_partial_results=False, timeout=None, get_raw_response=False):
"""Executes a management command.
:param str kusto_database: Database against query will be executed.
:param str query: Query to be executed.
@ -295,22 +263,11 @@ class KustoClient(object):
Whether to get a raw response, or a parsed one.
"""
return self._execute(
self._mgmt_endpoint,
kusto_database,
query,
accept_partial_results,
timeout,
get_raw_response,
self._mgmt_endpoint, kusto_database, query, accept_partial_results, timeout, get_raw_response
)
def _execute(
self,
endpoint,
kusto_database,
kusto_query,
accept_partial_results=False,
timeout=None,
get_raw_response=False,
self, endpoint, kusto_database, kusto_query, accept_partial_results=False, timeout=None, get_raw_response=False
):
"""Executes given query against this client"""
@ -327,9 +284,7 @@ class KustoClient(object):
"x-ms-client-request-id": "KPC.execute;" + str(uuid.uuid4()),
}
response = requests.post(
endpoint, headers=request_headers, json=request_payload, timeout=timeout
)
response = requests.post(endpoint, headers=request_headers, json=request_payload, timeout=timeout)
if response.status_code == 200:
if get_raw_response:

Просмотреть файл

@ -26,9 +26,7 @@ class _AadHelper(object):
def __init__(self, kcsb):
authority = kcsb.authority_id or "common"
self._kusto_cluster = "{0.scheme}://{0.hostname}".format(urlparse(kcsb.data_source))
self._adal_context = AuthenticationContext(
"https://login.microsoftonline.com/{0}".format(authority)
)
self._adal_context = AuthenticationContext("https://login.microsoftonline.com/{0}".format(authority))
self._username = None
if all([kcsb.aad_user_id, kcsb.password]):
self._authentication_method = AuthenticationMethod.aad_username_password
@ -39,13 +37,7 @@ class _AadHelper(object):
self._authentication_method = AuthenticationMethod.aad_application_key
self._client_id = kcsb.application_client_id
self._client_secret = kcsb.application_key
elif all(
[
kcsb.application_client_id,
kcsb.application_certificate,
kcsb.application_certificate_thumbprint,
]
):
elif all([kcsb.application_client_id, kcsb.application_certificate, kcsb.application_certificate_thumbprint]):
self._authentication_method = AuthenticationMethod.aad_application_certificate
self._client_id = kcsb.application_client_id
self._certificate = kcsb.application_certificate
@ -56,9 +48,7 @@ class _AadHelper(object):
def acquire_token(self):
"""Acquire tokens from AAD."""
token = self._adal_context.acquire_token(
self._kusto_cluster, self._username, self._client_id
)
token = self._adal_context.acquire_token(self._kusto_cluster, self._username, self._client_id)
if token is not None:
expiration_date = dateutil.parser.parse(token[TokenResponseFields.EXPIRES_ON])
if expiration_date > datetime.now() + timedelta(minutes=1):
@ -82,9 +72,7 @@ class _AadHelper(object):
code = self._adal_context.acquire_user_code(self._kusto_cluster, self._client_id)
print(code[OAuth2DeviceCodeResponseParameters.MESSAGE])
webbrowser.open(code[OAuth2DeviceCodeResponseParameters.VERIFICATION_URL])
token = self._adal_context.acquire_token_with_device_code(
self._kusto_cluster, code, self._client_id
)
token = self._adal_context.acquire_token_with_device_code(self._kusto_cluster, code, self._client_id)
elif self._authentication_method is AuthenticationMethod.aad_application_certificate:
token = self._adal_context.acquire_token_with_client_certificate(
self._kusto_cluster, self._client_id, self._certificate, self._thumbprint
@ -98,6 +86,4 @@ class _AadHelper(object):
def _get_header(token):
return "{0} {1}".format(
token[TokenResponseFields.TOKEN_TYPE], token[TokenResponseFields.ACCESS_TOKEN]
)
return "{0} {1}".format(token[TokenResponseFields.TOKEN_TYPE], token[TokenResponseFields.ACCESS_TOKEN])

Просмотреть файл

@ -18,9 +18,7 @@ class azure_bdist_wheel(bdist_wheel):
description = "Create an Azure wheel distribution"
user_options = bdist_wheel.user_options + [
("azure-namespace-package=", None, "Name of the deepest nspkg used")
]
user_options = bdist_wheel.user_options + [("azure-namespace-package=", None, "Name of the deepest nspkg used")]
def initialize_options(self):
bdist_wheel.initialize_options(self)
@ -48,9 +46,7 @@ class azure_bdist_wheel(bdist_wheel):
logger.info("manually remove {} while building the wheel".format(init_file))
os.remove(init_file)
else:
raise ValueError(
"Unable to find {}. Are you sure of your namespace package?".format(init_file)
)
raise ValueError("Unable to find {}. Are you sure of your namespace package?".format(init_file))
bdist_wheel.write_record(self, bdist_dir, distinfo_dir)

Просмотреть файл

@ -1 +0,0 @@

Просмотреть файл

@ -10,9 +10,7 @@ KUSTO_CLUSTER = "https://help.kusto.windows.net"
# In case you want to authenticate with AAD application.
CLIENT_ID = "<insert here your AAD application id>"
CLIENT_SECRET = "<insert here your AAD application key>"
KCSB = KustoConnectionStringBuilder.with_aad_application_key_authentication(
KUSTO_CLUSTER, CLIENT_ID, CLIENT_SECRET
)
KCSB = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_CLUSTER, CLIENT_ID, CLIENT_SECRET)
# In case you want to authenticate with AAD application certificate.
FILENAME = "path to a PEM certificate"

Просмотреть файл

@ -16,26 +16,19 @@ class ConverterTests(unittest.TestCase):
self.assertEqual(_KustoResultRow.to_timedelta("00:00:00"), timedelta(seconds=0))
self.assertEqual(_KustoResultRow.to_timedelta("00:00:03"), timedelta(seconds=3))
self.assertEqual(_KustoResultRow.to_timedelta("00:04:03"), timedelta(minutes=4, seconds=3))
self.assertEqual(
_KustoResultRow.to_timedelta("02:04:03"), timedelta(hours=2, minutes=4, seconds=3)
)
self.assertEqual(_KustoResultRow.to_timedelta("02:04:03"), timedelta(hours=2, minutes=4, seconds=3))
# Test milliseconds
self.assertEqual(_KustoResultRow.to_timedelta("00:00:00.099"), timedelta(milliseconds=99))
self.assertEqual(
_KustoResultRow.to_timedelta("02:04:03.0123"),
timedelta(hours=2, minutes=4, seconds=3, microseconds=12300),
_KustoResultRow.to_timedelta("02:04:03.0123"), timedelta(hours=2, minutes=4, seconds=3, microseconds=12300)
)
# Test days
self.assertEqual(_KustoResultRow.to_timedelta("01.00:00:00"), timedelta(days=1))
self.assertEqual(
_KustoResultRow.to_timedelta("02.04:05:07"),
timedelta(days=2, hours=4, minutes=5, seconds=7),
)
self.assertEqual(_KustoResultRow.to_timedelta("02.04:05:07"), timedelta(days=2, hours=4, minutes=5, seconds=7))
# Test negative
self.assertEqual(_KustoResultRow.to_timedelta("-01.00:00:00"), -timedelta(days=1))
self.assertEqual(
_KustoResultRow.to_timedelta("-02.04:05:07"),
-timedelta(days=2, hours=4, minutes=5, seconds=7),
_KustoResultRow.to_timedelta("-02.04:05:07"), -timedelta(days=2, hours=4, minutes=5, seconds=7)
)
# Test all together
self.assertEqual(_KustoResultRow.to_timedelta("00.00:00:00.000"), timedelta(seconds=0))
@ -43,9 +36,7 @@ class ConverterTests(unittest.TestCase):
_KustoResultRow.to_timedelta("02.04:05:07.789"),
timedelta(days=2, hours=4, minutes=5, seconds=7, milliseconds=789),
)
self.assertEqual(
_KustoResultRow.to_timedelta("03.00:00:00.111"), timedelta(days=3, milliseconds=111)
)
self.assertEqual(_KustoResultRow.to_timedelta("03.00:00:00.111"), timedelta(days=3, milliseconds=111))
# Test from Ticks
self.assertEqual(_KustoResultRow.to_timedelta(-80080008), timedelta(microseconds=-8008001))
self.assertEqual(_KustoResultRow.to_timedelta(10010001), timedelta(microseconds=1001000))

Просмотреть файл

@ -41,9 +41,7 @@ def mocked_requests_post(*args, **kwargs):
file_name = "querypartialresults.json"
elif "Deft" in kwargs["json"]["csl"]:
file_name = "deft.json"
with open(
os.path.join(os.path.dirname(__file__), "input", file_name), "r"
) as response_file:
with open(os.path.join(os.path.dirname(__file__), "input", file_name), "r") as response_file:
data = response_file.read()
return MockResponse(json.loads(data), 200)
@ -52,9 +50,7 @@ def mocked_requests_post(*args, **kwargs):
file_name = "versionshowcommandresult.json"
else:
file_name = "adminthenquery.json"
with open(
os.path.join(os.path.dirname(__file__), "input", file_name), "r"
) as response_file:
with open(os.path.join(os.path.dirname(__file__), "input", file_name), "r") as response_file:
data = response_file.read()
return MockResponse(json.loads(data), 200)
@ -148,18 +144,10 @@ class KustoClientTests(unittest.TestCase):
self.assertEqual(type(row["xtextWithNulls"]), type(expected["xtextWithNulls"]))
self.assertEqual(type(row["xdynamicWithNulls"]), type(expected["xdynamicWithNulls"]))
expected["rownumber"] = (
0 if expected["rownumber"] is None else expected["rownumber"] + 1
)
expected["rowguid"] = text_type(
"0000000{0}-0000-0000-0001-020304050607".format(expected["rownumber"])
)
expected["xdouble"] = round(
float(0) if expected["xdouble"] is None else expected["xdouble"] + 1.0001, 4
)
expected["xfloat"] = round(
float(0) if expected["xfloat"] is None else expected["xfloat"] + 1.01, 2
)
expected["rownumber"] = 0 if expected["rownumber"] is None else expected["rownumber"] + 1
expected["rowguid"] = text_type("0000000{0}-0000-0000-0001-020304050607".format(expected["rownumber"]))
expected["xdouble"] = round(float(0) if expected["xdouble"] is None else expected["xdouble"] + 1.0001, 4)
expected["xfloat"] = round(float(0) if expected["xfloat"] is None else expected["xfloat"] + 1.01, 2)
expected["xbool"] = False if expected["xbool"] is None else not expected["xbool"]
expected["xint16"] = 0 if expected["xint16"] is None else expected["xint16"] + 1
expected["xint32"] = 0 if expected["xint32"] is None else expected["xint32"] + 1
@ -168,9 +156,7 @@ class KustoClientTests(unittest.TestCase):
expected["xuint16"] = 0 if expected["xuint16"] is None else expected["xuint16"] + 1
expected["xuint32"] = 0 if expected["xuint32"] is None else expected["xuint32"] + 1
expected["xuint64"] = 0 if expected["xuint64"] is None else expected["xuint64"] + 1
expected["xdate"] = expected["xdate"] or datetime(
2013, 1, 1, 1, 1, 1, 0, tzinfo=tzutc()
)
expected["xdate"] = expected["xdate"] or datetime(2013, 1, 1, 1, 1, 1, 0, tzinfo=tzutc())
expected["xdate"] = expected["xdate"].replace(year=expected["xdate"].year + 1)
expected["xsmalltext"] = DIGIT_WORDS[int(expected["xint16"])]
expected["xtext"] = DIGIT_WORDS[int(expected["xint16"])]
@ -183,9 +169,7 @@ class KustoClientTests(unittest.TestCase):
* (-1) ** (expected["rownumber"] + 1)
)
if expected["xint16"] > 0:
expected["xdynamicWithNulls"] = text_type(
'{{"rowId":{0},"arr":[0,{0}]}}'.format(expected["xint16"])
)
expected["xdynamicWithNulls"] = text_type('{{"rowId":{0},"arr":[0,{0}]}}'.format(expected["xint16"]))
@patch("requests.post", side_effect=mocked_requests_post)
@patch("azure.kusto.data.security._AadHelper.acquire_token", side_effect=mocked_aad_helper)
@ -202,8 +186,7 @@ class KustoClientTests(unittest.TestCase):
result = primary_table[0]
self.assertEqual(result["BuildVersion"], "1.0.6693.14577")
self.assertEqual(
result["BuildTime"],
datetime(year=2018, month=4, day=29, hour=8, minute=5, second=54, tzinfo=tzutc()),
result["BuildTime"], datetime(year=2018, month=4, day=29, hour=8, minute=5, second=54, tzinfo=tzutc())
)
self.assertEqual(result["ServiceType"], "Engine")
self.assertEqual(result["ProductVersion"], "KustoMain_2018.04.29.5")
@ -213,11 +196,7 @@ class KustoClientTests(unittest.TestCase):
def test_sanity_data_frame(self, mock_post, mock_aad):
"""Tests KustoResponse to pandas.DataFrame."""
client = KustoClient("https://somecluster.kusto.windows.net")
data_frame = (
client.execute_query("PythonTest", "Deft")
.primary_results[0]
.to_dataframe(errors="ignore")
)
data_frame = client.execute_query("PythonTest", "Deft").primary_results[0].to_dataframe(errors="ignore")
self.assertEqual(len(data_frame.columns), 19)
expected_dict = {
"rownumber": Series([None, 0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]),
@ -237,13 +216,9 @@ class KustoClientTests(unittest.TestCase):
],
dtype=object,
),
"xdouble": Series(
[None, 0., 1.0001, 2.0002, 3.0003, 4.0004, 5.0005, 6.0006, 7.0007, 8.0008, 9.0009]
),
"xdouble": Series([None, 0., 1.0001, 2.0002, 3.0003, 4.0004, 5.0005, 6.0006, 7.0007, 8.0008, 9.0009]),
"xfloat": Series([None, 0., 1.01, 2.02, 3.03, 4.04, 5.05, 6.06, 7.07, 8.08, 9.09]),
"xbool": Series(
[None, False, True, False, True, False, True, False, True, False, True], dtype=bool
),
"xbool": Series([None, False, True, False, True, False, True, False, True, False, True], dtype=bool),
"xint16": Series([None, 0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]),
"xint32": Series([None, 0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]),
"xint64": Series([None, 0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]),
@ -268,40 +243,12 @@ class KustoClientTests(unittest.TestCase):
dtype="datetime64[ns]",
),
"xsmalltext": Series(
[
"",
"Zero",
"One",
"Two",
"Three",
"Four",
"Five",
"Six",
"Seven",
"Eight",
"Nine",
],
dtype=object,
["", "Zero", "One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine"], dtype=object
),
"xtext": Series(
[
"",
"Zero",
"One",
"Two",
"Three",
"Four",
"Five",
"Six",
"Seven",
"Eight",
"Nine",
],
dtype=object,
),
"xnumberAsText": Series(
["", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"], dtype=object
["", "Zero", "One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine"], dtype=object
),
"xnumberAsText": Series(["", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"], dtype=object),
"xtime": Series(
[
"NaT",

Просмотреть файл

@ -34,21 +34,15 @@ class KustoConnectionStringBuilderTests(unittest.TestCase):
uuid = str(uuid4())
key = "key of application"
kcsbs = [
KustoConnectionStringBuilder(
"localhost;Application client Id={0};application Key={1}".format(uuid, key)
),
KustoConnectionStringBuilder("localhost;Application client Id={0};application Key={1}".format(uuid, key)),
KustoConnectionStringBuilder(
"Data Source=localhost ; Application Client Id={0}; Appkey ={1}".format(uuid, key)
),
KustoConnectionStringBuilder(
" Addr = localhost ; AppClientId = {0} ; AppKey ={1}".format(uuid, key)
),
KustoConnectionStringBuilder(" Addr = localhost ; AppClientId = {0} ; AppKey ={1}".format(uuid, key)),
KustoConnectionStringBuilder(
"Network Address = localhost; AppClientId = {0} ; AppKey ={1}".format(uuid, key)
),
KustoConnectionStringBuilder.with_aad_application_key_authentication(
"localhost", uuid, key
),
KustoConnectionStringBuilder.with_aad_application_key_authentication("localhost", uuid, key),
]
kcsb1 = KustoConnectionStringBuilder("server=localhost")
@ -77,21 +71,15 @@ class KustoConnectionStringBuilderTests(unittest.TestCase):
user = "test"
password = "Pa$$w0rd"
kcsbs = [
KustoConnectionStringBuilder(
"localhost;AAD User ID={0};password={1}".format(user, password)
),
KustoConnectionStringBuilder("localhost;AAD User ID={0};password={1}".format(user, password)),
KustoConnectionStringBuilder(
"Data Source=localhost ; AaD User ID={0}; Password ={1}".format(user, password)
),
KustoConnectionStringBuilder(
" Addr = localhost ; AAD User ID = {0} ; Pwd ={1}".format(user, password)
),
KustoConnectionStringBuilder(" Addr = localhost ; AAD User ID = {0} ; Pwd ={1}".format(user, password)),
KustoConnectionStringBuilder(
"Network Address = localhost; AAD User iD = {0} ; Pwd = {1} ".format(user, password)
),
KustoConnectionStringBuilder.with_aad_user_password_authentication(
"localhost", user, password
),
KustoConnectionStringBuilder.with_aad_user_password_authentication("localhost", user, password),
]
kcsb1 = KustoConnectionStringBuilder("Server=localhost")

Просмотреть файл

@ -10,14 +10,16 @@ import tempfile
class FileDescriptor(object):
"""A file to ingest."""
def __init__(self, path, size=0, deleteSourcesOnSuccess=False):
# TODO: this should be changed. holding zipped data in memory isn't efficient
# also, init should be a lean method, not potentially reading and writing files
def __init__(self, path, size=0):
self.path = path
self.size = size
self.delete_sources_on_success = deleteSourcesOnSuccess
self.stream_name = os.path.basename(self.path)
if self.path.endswith(".gz") or self.path.endswith(".zip"):
self.zipped_stream = open(self.path, "rb")
if self.size <= 0:
# TODO: this can be improved by reading last 4 bytes
self.size = int(os.path.getsize(self.path)) * 5
else:
self.size = int(os.path.getsize(self.path))
@ -29,13 +31,11 @@ class FileDescriptor(object):
shutil.copyfileobj(f_in, f_out)
self.zipped_stream.seek(0)
def delete_files(self, success):
def delete_files(self):
"""Deletes the gz file if the original file was not zipped.
In case of success deletes the original file as well."""
if self.zipped_stream is not None:
self.zipped_stream.close()
if success and self.delete_sources_on_success:
os.remove(self.path)
class BlobDescriptor(object):

Просмотреть файл

@ -9,52 +9,48 @@ from ._descriptors import BlobDescriptor
class _IngestionBlobInfo:
def __init__(self, blob, ingestionProperties, deleteSourcesOnSuccess=True, authContext=None):
def __init__(self, blob, ingestion_properties, auth_context=None):
self.properties = dict()
self.properties["BlobPath"] = blob.path
self.properties["RawDataSize"] = blob.size
self.properties["DatabaseName"] = ingestionProperties.database
self.properties["TableName"] = ingestionProperties.table
self.properties["RetainBlobOnSuccess"] = not deleteSourcesOnSuccess
self.properties["FlushImmediately"] = ingestionProperties.flush_immediately
self.properties["DatabaseName"] = ingestion_properties.database
self.properties["TableName"] = ingestion_properties.table
self.properties["RetainBlobOnSuccess"] = True
self.properties["FlushImmediately"] = ingestion_properties.flush_immediately
self.properties["IgnoreSizeLimit"] = False
self.properties["ReportLevel"] = ingestionProperties.report_level.value
self.properties["ReportMethod"] = ingestionProperties.report_method.value
self.properties["ReportLevel"] = ingestion_properties.report_level.value
self.properties["ReportMethod"] = ingestion_properties.report_method.value
self.properties["SourceMessageCreationTime"] = datetime.utcnow().isoformat()
self.properties["Id"] = text_type(uuid.uuid4())
# TODO: Add support for ingestion statuses
# self.properties["IngestionStatusInTable"] = None
# self.properties["BlobPathEncrypted"] = None
additional_properties = ingestionProperties.additional_properties or {}
additional_properties["authorizationContext"] = authContext
additional_properties = ingestion_properties.additional_properties or {}
additional_properties["authorizationContext"] = auth_context
tags = []
if ingestionProperties.additional_tags:
tags.extend(ingestionProperties.additional_tags)
if ingestionProperties.drop_by_tags:
tags.extend(["drop-by:" + drop for drop in ingestionProperties.drop_by_tags])
if ingestionProperties.ingest_by_tags:
tags.extend(["ingest-by:" + ingest for ingest in ingestionProperties.ingest_by_tags])
if ingestion_properties.additional_tags:
tags.extend(ingestion_properties.additional_tags)
if ingestion_properties.drop_by_tags:
tags.extend(["drop-by:" + drop for drop in ingestion_properties.drop_by_tags])
if ingestion_properties.ingest_by_tags:
tags.extend(["ingest-by:" + ingest for ingest in ingestion_properties.ingest_by_tags])
if tags:
additional_properties["tags"] = _convert_list_to_json(tags)
if ingestionProperties.ingest_if_not_exists:
if ingestion_properties.ingest_if_not_exists:
additional_properties["ingestIfNotExists"] = _convert_list_to_json(
ingestionProperties.ingest_if_not_exists
ingestion_properties.ingest_if_not_exists
)
if ingestionProperties.mapping:
json_string = _convert_dict_to_json(ingestionProperties.mapping)
additional_properties[
ingestionProperties.get_mapping_format() + "Mapping"
] = json_string
if ingestionProperties.mapping_reference:
key = ingestionProperties.get_mapping_format() + "MappingReference"
additional_properties[key] = ingestionProperties.mapping_reference
if ingestionProperties.validation_policy:
additional_properties["ValidationPolicy"] = _convert_dict_to_json(
ingestionProperties.validation_policy
)
if ingestionProperties.format:
additional_properties["format"] = ingestionProperties.format.name
if ingestion_properties.mapping:
json_string = _convert_dict_to_json(ingestion_properties.mapping)
additional_properties[ingestion_properties.get_mapping_format() + "Mapping"] = json_string
if ingestion_properties.mapping_reference:
key = ingestion_properties.get_mapping_format() + "MappingReference"
additional_properties[key] = ingestion_properties.mapping_reference
if ingestion_properties.validation_policy:
additional_properties["ValidationPolicy"] = _convert_dict_to_json(ingestion_properties.validation_policy)
if ingestion_properties.format:
additional_properties["format"] = ingestion_properties.format.name
if additional_properties:
self.properties["AdditionalProperties"] = additional_properties

Просмотреть файл

@ -3,7 +3,9 @@
import base64
import random
import uuid
from six import text_type
import os
import time
import tempfile
from azure.storage.common import CloudStorageAccount
@ -28,68 +30,81 @@ class KustoIngestClient(object):
kusto_client = KustoClient(kcsb)
self._resource_manager = _ResourceManager(kusto_client)
def ingest_from_multiple_files(self, files, delete_sources_on_success, ingestion_properties):
def ingest_from_dataframe(self, df, ingestion_properties):
file_name = "df_{timestamp}_{pid}.csv.gz".format(timestamp=int(time.time()), pid=os.getpid())
temp_file_path = os.path.join(tempfile.gettempdir(), file_name)
df.to_csv(temp_file_path, index=False, encoding="utf-8", header=False, compression="gzip")
fd = FileDescriptor(temp_file_path)
blob_name = "{db}__{table}__{guid}__{file}".format(
db=ingestion_properties.database, table=ingestion_properties.table, guid=uuid.uuid4(), file=file_name
)
containers = self._resource_manager.get_containers()
container_details = random.choice(containers)
storage_client = CloudStorageAccount(container_details.storage_account_name, sas_token=container_details.sas)
blob_service = storage_client.create_block_blob_service()
blob_service.create_blob_from_path(
container_name=container_details.object_name, blob_name=blob_name, file_path=temp_file_path
)
url = blob_service.make_blob_url(container_details.object_name, blob_name, sas_token=container_details.sas)
self.ingest_from_blob(BlobDescriptor(url, fd.size), ingestion_properties=ingestion_properties)
fd.delete_files()
os.unlink(temp_file_path)
def ingest_from_file(self, file, ingestion_properties):
"""Enqueuing an ingest command from local files.
:param files: List of FileDescriptor or file paths. The list of files to be ingested.
:param bool delete_sources_on_success: After a successful ingest,
whether to delete the origin files.
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
"""
blobs = list()
file_descriptors = list()
for file in files:
if isinstance(file, FileDescriptor):
descriptor = file
else:
descriptor = FileDescriptor(file, deleteSourcesOnSuccess=delete_sources_on_success)
file_descriptors.append(descriptor)
blob_name = (
ingestion_properties.database
+ "__"
+ ingestion_properties.table
+ "__"
+ text_type(uuid.uuid4())
+ "__"
+ descriptor.stream_name
)
containers = self._resource_manager.get_containers()
container_details = random.choice(containers)
storage_client = CloudStorageAccount(
container_details.storage_account_name, sas_token=container_details.sas
)
blob_service = storage_client.create_block_blob_service()
blob_service.create_blob_from_stream(
container_name=container_details.object_name,
blob_name=blob_name,
stream=descriptor.zipped_stream,
)
url = blob_service.make_blob_url(
container_details.object_name, blob_name, sas_token=container_details.sas
)
blobs.append(BlobDescriptor(url, descriptor.size))
self.ingest_from_multiple_blobs(blobs, delete_sources_on_success, ingestion_properties)
for descriptor in file_descriptors:
descriptor.delete_files(True)
containers = self._resource_manager.get_containers()
def ingest_from_multiple_blobs(self, blobs, delete_sources_on_success, ingestion_properties):
if isinstance(file, FileDescriptor):
descriptor = file
else:
descriptor = FileDescriptor(file)
file_descriptors.append(descriptor)
blob_name = "{db}__{table}__{guid}__{file}".format(
db=ingestion_properties.database,
table=ingestion_properties.table,
guid=uuid.uuid4(),
file=descriptor.stream_name,
)
container_details = random.choice(containers)
storage_client = CloudStorageAccount(container_details.storage_account_name, sas_token=container_details.sas)
blob_service = storage_client.create_block_blob_service()
blob_service.create_blob_from_stream(
container_name=container_details.object_name, blob_name=blob_name, stream=descriptor.zipped_stream
)
url = blob_service.make_blob_url(container_details.object_name, blob_name, sas_token=container_details.sas)
self.ingest_from_blob(BlobDescriptor(url, descriptor.size), ingestion_properties=ingestion_properties)
def ingest_from_blob(self, blob, ingestion_properties):
"""Enqueuing an ingest command from azure blobs.
:param files: List of BlobDescriptor. The list of blobs to be ingested. Please provide the
raw blob size to each of the descriptors.
:param bool delete_sources_on_success: After a successful ingest,
whether to delete the origin files.
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
"""
for blob in blobs:
queues = self._resource_manager.get_ingestion_queues()
queue_details = random.choice(queues)
storage_client = CloudStorageAccount(
queue_details.storage_account_name, sas_token=queue_details.sas
)
queue_service = storage_client.create_queue_service()
authorization_context = self._resource_manager.get_authorization_context()
ingestion_blob_info = _IngestionBlobInfo(
blob, ingestion_properties, delete_sources_on_success, authorization_context
)
ingestion_blob_info_json = ingestion_blob_info.to_json()
encoded = base64.b64encode(ingestion_blob_info_json.encode("utf-8")).decode("utf-8")
queue_service.put_message(queue_name=queue_details.object_name, content=encoded)
queues = self._resource_manager.get_ingestion_queues()
queue_details = random.choice(queues)
storage_client = CloudStorageAccount(queue_details.storage_account_name, sas_token=queue_details.sas)
queue_service = storage_client.create_queue_service()
authorization_context = self._resource_manager.get_authorization_context()
ingestion_blob_info = _IngestionBlobInfo(
blob, ingestion_properties=ingestion_properties, auth_context=authorization_context
)
ingestion_blob_info_json = ingestion_blob_info.to_json()
encoded = base64.b64encode(ingestion_blob_info_json.encode("utf-8")).decode("utf-8")
queue_service.put_message(queue_name=queue_details.object_name, content=encoded)

Просмотреть файл

@ -5,7 +5,7 @@ from datetime import datetime, timedelta
from ._connection_string import _ConnectionString
class _IngestClientResources:
class _IngestClientResources(object):
def __init__(
self,
secured_ready_for_aggregation_queues=None,
@ -31,7 +31,7 @@ class _IngestClientResources:
return all(resources)
class _ResourceManager:
class _ResourceManager(object):
def __init__(self, kusto_client):
self._kusto_client = kusto_client
self._refresh_period = timedelta(hours=1)
@ -45,31 +45,20 @@ class _ResourceManager:
def _refresh_ingest_client_resources(self):
if (
not self._ingest_client_resources
or (self._ingest_client_resources_last_update + self._refresh_period)
<= datetime.utcnow()
or (self._ingest_client_resources_last_update + self._refresh_period) <= datetime.utcnow()
or not self._ingest_client_resources.is_applicable()
):
self._ingest_client_resources = self._get_ingest_client_resources_from_service()
self._ingest_client_resources_last_update = datetime.utcnow()
def _get_resource_by_name(self, df, resource_name):
resource = (
df[df["ResourceTypeName"] == resource_name]
.StorageRoot.map(_ConnectionString.parse)
.tolist()
)
resource = df[df["ResourceTypeName"] == resource_name].StorageRoot.map(_ConnectionString.parse).tolist()
return resource
def _get_ingest_client_resources_from_service(self):
df = (
self._kusto_client.execute("NetDefaultDB", ".get ingestion resources")
.primary_results[0]
.to_dataframe()
)
df = self._kusto_client.execute("NetDefaultDB", ".get ingestion resources").primary_results[0].to_dataframe()
secured_ready_for_aggregation_queues = self._get_resource_by_name(
df, "SecuredReadyForAggregationQueue"
)
secured_ready_for_aggregation_queues = self._get_resource_by_name(df, "SecuredReadyForAggregationQueue")
failed_ingestions_queues = self._get_resource_by_name(df, "FailedIngestionsQueue")
successful_ingestions_queues = self._get_resource_by_name(df, "SuccessfulIngestionsQueue")
containers = self._get_resource_by_name(df, "TempStorage")
@ -93,9 +82,9 @@ class _ResourceManager:
self._authorization_context_last_update = datetime.utcnow()
def _get_authorization_context_from_service(self):
return self._kusto_client.execute(
"NetDefaultDB", ".get kusto identity token"
).primary_results[0][0]["AuthorizationContext"]
return self._kusto_client.execute("NetDefaultDB", ".get kusto identity token").primary_results[0][0][
"AuthorizationContext"
]
def get_ingestion_queues(self):
self._refresh_ingest_client_resources()

Просмотреть файл

@ -18,9 +18,7 @@ class azure_bdist_wheel(bdist_wheel):
description = "Create an Azure wheel distribution"
user_options = bdist_wheel.user_options + [
("azure-namespace-package=", None, "Name of the deepest nspkg used")
]
user_options = bdist_wheel.user_options + [("azure-namespace-package=", None, "Name of the deepest nspkg used")]
def initialize_options(self):
bdist_wheel.initialize_options(self)
@ -48,9 +46,7 @@ class azure_bdist_wheel(bdist_wheel):
logger.info("manually remove {} while building the wheel".format(init_file))
os.remove(init_file)
else:
raise ValueError(
"Unable to find {}. Are you sure of your namespace package?".format(init_file)
)
raise ValueError("Unable to find {}. Are you sure of your namespace package?".format(init_file))
bdist_wheel.write_record(self, bdist_dir, distinfo_dir)

Просмотреть файл

@ -0,0 +1,2 @@
[tool.black]
line-length = 120

Просмотреть файл

@ -47,6 +47,7 @@ setup(
"azure-storage-common>=1.1.0",
"azure-storage-queue>=1.1.0",
"six>=1.10.0",
"pandas>=0.15.0",
],
cmdclass=cmdclass,
)

Просмотреть файл

@ -1 +0,0 @@

Просмотреть файл

@ -44,88 +44,38 @@ class Helpers:
mappings.append(CsvColumnMapping(columnName="xdate", cslDataType="datetime", ordinal=12))
mappings.append(CsvColumnMapping(columnName="xsmalltext", cslDataType="string", ordinal=13))
mappings.append(CsvColumnMapping(columnName="xtext", cslDataType="string", ordinal=14))
mappings.append(
CsvColumnMapping(columnName="xnumberAsText", cslDataType="string", ordinal=15)
)
mappings.append(CsvColumnMapping(columnName="xnumberAsText", cslDataType="string", ordinal=15))
mappings.append(CsvColumnMapping(columnName="xtime", cslDataType="timespan", ordinal=16))
mappings.append(
CsvColumnMapping(columnName="xtextWithNulls", cslDataType="string", ordinal=17)
)
mappings.append(
CsvColumnMapping(columnName="xdynamicWithNulls", cslDataType="dynamic", ordinal=18)
)
mappings.append(CsvColumnMapping(columnName="xtextWithNulls", cslDataType="string", ordinal=17))
mappings.append(CsvColumnMapping(columnName="xdynamicWithNulls", cslDataType="dynamic", ordinal=18))
return mappings
@staticmethod
def create_deft_table_json_mappings():
"""A method to define json mappings to deft table."""
mappings = list()
mappings.append(JsonColumnMapping(columnName="rownumber", jsonPath="$.rownumber", cslDataType="int"))
mappings.append(JsonColumnMapping(columnName="rowguid", jsonPath="$.rowguid", cslDataType="string"))
mappings.append(JsonColumnMapping(columnName="xdouble", jsonPath="$.xdouble", cslDataType="real"))
mappings.append(JsonColumnMapping(columnName="xfloat", jsonPath="$.xfloat", cslDataType="real"))
mappings.append(JsonColumnMapping(columnName="xbool", jsonPath="$.xbool", cslDataType="bool"))
mappings.append(JsonColumnMapping(columnName="xint16", jsonPath="$.xint16", cslDataType="int"))
mappings.append(JsonColumnMapping(columnName="xint32", jsonPath="$.xint32", cslDataType="int"))
mappings.append(JsonColumnMapping(columnName="xint64", jsonPath="$.xint64", cslDataType="long"))
mappings.append(JsonColumnMapping(columnName="xuint8", jsonPath="$.xuint8", cslDataType="long"))
mappings.append(JsonColumnMapping(columnName="xuint16", jsonPath="$.xuint16", cslDataType="long"))
mappings.append(JsonColumnMapping(columnName="xuint32", jsonPath="$.xuint32", cslDataType="long"))
mappings.append(JsonColumnMapping(columnName="xuint64", jsonPath="$.xuint64", cslDataType="long"))
mappings.append(JsonColumnMapping(columnName="xdate", jsonPath="$.xdate", cslDataType="datetime"))
mappings.append(JsonColumnMapping(columnName="xsmalltext", jsonPath="$.xsmalltext", cslDataType="string"))
mappings.append(JsonColumnMapping(columnName="xtext", jsonPath="$.xtext", cslDataType="string"))
mappings.append(JsonColumnMapping(columnName="xnumberAsText", jsonPath="$.xnumberAsText", cslDataType="string"))
mappings.append(JsonColumnMapping(columnName="xtime", jsonPath="$.xtime", cslDataType="timespan"))
mappings.append(
JsonColumnMapping(columnName="rownumber", jsonPath="$.rownumber", cslDataType="int")
JsonColumnMapping(columnName="xtextWithNulls", jsonPath="$.xtextWithNulls", cslDataType="string")
)
mappings.append(
JsonColumnMapping(columnName="rowguid", jsonPath="$.rowguid", cslDataType="string")
)
mappings.append(
JsonColumnMapping(columnName="xdouble", jsonPath="$.xdouble", cslDataType="real")
)
mappings.append(
JsonColumnMapping(columnName="xfloat", jsonPath="$.xfloat", cslDataType="real")
)
mappings.append(
JsonColumnMapping(columnName="xbool", jsonPath="$.xbool", cslDataType="bool")
)
mappings.append(
JsonColumnMapping(columnName="xint16", jsonPath="$.xint16", cslDataType="int")
)
mappings.append(
JsonColumnMapping(columnName="xint32", jsonPath="$.xint32", cslDataType="int")
)
mappings.append(
JsonColumnMapping(columnName="xint64", jsonPath="$.xint64", cslDataType="long")
)
mappings.append(
JsonColumnMapping(columnName="xuint8", jsonPath="$.xuint8", cslDataType="long")
)
mappings.append(
JsonColumnMapping(columnName="xuint16", jsonPath="$.xuint16", cslDataType="long")
)
mappings.append(
JsonColumnMapping(columnName="xuint32", jsonPath="$.xuint32", cslDataType="long")
)
mappings.append(
JsonColumnMapping(columnName="xuint64", jsonPath="$.xuint64", cslDataType="long")
)
mappings.append(
JsonColumnMapping(columnName="xdate", jsonPath="$.xdate", cslDataType="datetime")
)
mappings.append(
JsonColumnMapping(
columnName="xsmalltext", jsonPath="$.xsmalltext", cslDataType="string"
)
)
mappings.append(
JsonColumnMapping(columnName="xtext", jsonPath="$.xtext", cslDataType="string")
)
mappings.append(
JsonColumnMapping(
columnName="xnumberAsText", jsonPath="$.xnumberAsText", cslDataType="string"
)
)
mappings.append(
JsonColumnMapping(columnName="xtime", jsonPath="$.xtime", cslDataType="timespan")
)
mappings.append(
JsonColumnMapping(
columnName="xtextWithNulls", jsonPath="$.xtextWithNulls", cslDataType="string"
)
)
mappings.append(
JsonColumnMapping(
columnName="xdynamicWithNulls",
jsonPath="$.xdynamicWithNulls",
cslDataType="dynamic",
)
JsonColumnMapping(columnName="xdynamicWithNulls", jsonPath="$.xdynamicWithNulls", cslDataType="dynamic")
)
return mappings
@ -137,18 +87,12 @@ KUSTO_CLIENT.execute("PythonTest", ".drop table Deft ifexists")
# Sanity test - ingest from csv to a non-existing table
CSV_INGESTION_PROPERTIES = IngestionProperties(
"PythonTest",
"Deft",
dataFormat=DataFormat.csv,
mapping=Helpers.create_deft_table_csv_mappings(),
"PythonTest", "Deft", dataFormat=DataFormat.csv, mapping=Helpers.create_deft_table_csv_mappings()
)
CSV_FILE_PATH = os.path.join(os.getcwd(), "azure-kusto-ingest", "tests", "input", "dataset.csv")
ZIPPED_CSV_FILE_PATH = os.path.join(
os.getcwd(), "azure-kusto-ingest", "tests", "input", "dataset.csv.gz"
)
KUSTO_INGEST_CLIENT.ingest_from_multiple_files(
[CSV_FILE_PATH, ZIPPED_CSV_FILE_PATH], False, CSV_INGESTION_PROPERTIES
)
ZIPPED_CSV_FILE_PATH = os.path.join(os.getcwd(), "azure-kusto-ingest", "tests", "input", "dataset.csv.gz")
for f in [CSV_FILE_PATH, ZIPPED_CSV_FILE_PATH]:
KUSTO_INGEST_CLIENT.ingest_from_file(f, CSV_INGESTION_PROPERTIES)
time.sleep(60)
RESPONSE = KUSTO_CLIENT.execute("PythonTest", "Deft | count")
@ -160,18 +104,14 @@ for row in RESPONSE.primary_results[0]:
# Sanity test - ingest from json to an existing table
JSON_INGESTION_PROPERTIES = IngestionProperties(
"PythonTest",
"Deft",
dataFormat=DataFormat.json,
mapping=Helpers.create_deft_table_json_mappings(),
"PythonTest", "Deft", dataFormat=DataFormat.json, mapping=Helpers.create_deft_table_json_mappings()
)
JSON_FILE_PATH = os.path.join(os.getcwd(), "azure-kusto-ingest", "tests", "input", "dataset.json")
ZIPPED_JSON_FILE_PATH = os.path.join(
os.getcwd(), "azure-kusto-ingest", "tests", "input", "dataset.jsonz.gz"
)
KUSTO_INGEST_CLIENT.ingest_from_multiple_files(
[JSON_FILE_PATH, ZIPPED_JSON_FILE_PATH], False, JSON_INGESTION_PROPERTIES
)
ZIPPED_JSON_FILE_PATH = os.path.join(os.getcwd(), "azure-kusto-ingest", "tests", "input", "dataset.jsonz.gz")
for f in [JSON_FILE_PATH, ZIPPED_JSON_FILE_PATH]:
KUSTO_INGEST_CLIENT.ingest_from_file(f, JSON_INGESTION_PROPERTIES)
time.sleep(60)
RESPONSE = KUSTO_CLIENT.execute("PythonTest", "Deft | count")
for row in RESPONSE.primary_results[0]:
@ -199,9 +139,9 @@ JSON_INGESTION_PROPERTIES = IngestionProperties(
reportMethod=ReportMethod.QueueAndTable,
validationPolicy=VALIDATION_POLICY,
)
KUSTO_INGEST_CLIENT.ingest_from_multiple_files(
[JSON_FILE_PATH, ZIPPED_JSON_FILE_PATH], False, JSON_INGESTION_PROPERTIES
)
for f in [JSON_FILE_PATH, ZIPPED_JSON_FILE_PATH]:
KUSTO_INGEST_CLIENT.ingest_from_file(f, JSON_INGESTION_PROPERTIES)
time.sleep(60)
RESPONSE = KUSTO_CLIENT.execute("PythonTest", "Deft | count")
for row in RESPONSE.primary_results[0]:
@ -219,9 +159,9 @@ JSON_INGESTION_PROPERTIES = IngestionProperties(
ingestIfNotExists=["ingestByTag"],
dropByTags=["drop", "drop-by"],
)
KUSTO_INGEST_CLIENT.ingest_from_multiple_files(
[JSON_FILE_PATH, ZIPPED_JSON_FILE_PATH], False, JSON_INGESTION_PROPERTIES
)
for f in [JSON_FILE_PATH, ZIPPED_JSON_FILE_PATH]:
KUSTO_INGEST_CLIENT.ingest_from_file(f, JSON_INGESTION_PROPERTIES)
time.sleep(60)
RESPONSE = KUSTO_CLIENT.execute("PythonTest", "Deft | count")
for row in RESPONSE.primary_results[0]:
@ -232,13 +172,10 @@ for row in RESPONSE.primary_results[0]:
# Test ingest with TSV format and csvMapping
TSV_INGESTION_PROPERTIES = IngestionProperties(
"PythonTest",
"Deft",
dataFormat=DataFormat.tsv,
mapping=Helpers.create_deft_table_csv_mappings(),
"PythonTest", "Deft", dataFormat=DataFormat.tsv, mapping=Helpers.create_deft_table_csv_mappings()
)
TSV_FILE_PATH = os.path.join(os.getcwd(), "azure-kusto-ingest", "tests", "input", "dataset.tsv")
KUSTO_INGEST_CLIENT.ingest_from_multiple_files([TSV_FILE_PATH], False, TSV_INGESTION_PROPERTIES)
KUSTO_INGEST_CLIENT.ingest_from_file(TSV_FILE_PATH, TSV_INGESTION_PROPERTIES)
time.sleep(60)
RESPONSE = KUSTO_CLIENT.execute("PythonTest", "Deft | count")
for row in RESPONSE.primary_results[0]:

Просмотреть файл

@ -1,10 +0,0 @@
{
"Tables": [{
"TableName": "Table_0",
"Columns": [{
"ColumnName": "AuthorizationContext",
"DataType": "String"
}],
"Rows": [["authorization_context"]]
}]
}

Просмотреть файл

@ -1,39 +0,0 @@
{
"Tables": [{
"TableName": "Table_0",
"Columns": [{
"ColumnName": "ResourceTypeName",
"DataType": "String"
},
{
"ColumnName": "StorageRoot",
"DataType": "String"
}],
"Rows": [["SecuredReadyForAggregationQueue",
"https://storageaccount.queue.core.windows.net/readyforaggregation-secured?sas"],
["SecuredReadyForAggregationQueue",
"https://storageaccount.queue.core.windows.net/readyforaggregation-secured?sas"],
["SecuredReadyForAggregationQueue",
"https://storageaccount.queue.core.windows.net/readyforaggregation-secured?sas"],
["SecuredReadyForAggregationQueue",
"https://storageaccount.queue.core.windows.net/readyforaggregation-secured?sas"],
["SecuredReadyForAggregationQueue",
"https://storageaccount.queue.core.windows.net/readyforaggregation-secured?sas"],
["FailedIngestionsQueue",
"https://storageaccount.queue.core.windows.net/failedingestions?sas"],
["SuccessfulIngestionsQueue",
"https://storageaccount.queue.core.windows.net/successfulingestions?sas"],
["TempStorage",
"https://storageaccount.blob.core.windows.net/tempstorage?sas"],
["TempStorage",
"https://storageaccount.blob.core.windows.net/tempstorage?sas"],
["TempStorage",
"https://storageaccount.blob.core.windows.net/tempstorage?sas"],
["TempStorage",
"https://storageaccount.blob.core.windows.net/tempstorage?sas"],
["TempStorage",
"https://storageaccount.blob.core.windows.net/tempstorage?sas"],
["IngestionsStatusTable",
"https://storageaccount.table.core.windows.net/ingestionsstatus?sas"]]
}]
}

Просмотреть файл

@ -1,17 +1,9 @@
"""Sample how to use Kusto Ingest client"""
from azure.kusto.data.request import KustoConnectionStringBuilder
from azure.kusto.ingest import (
KustoIngestClient,
IngestionProperties,
FileDescriptor,
BlobDescriptor,
DataFormat,
)
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat
INGESTION_PROPERTIES = IngestionProperties(
database="database name", table="table name", dataFormat=DataFormat.csv
)
INGESTION_PROPERTIES = IngestionProperties(database="database name", table="table name", dataFormat=DataFormat.csv)
INGEST_CLIENT = KustoIngestClient("https://ingest-<clustername>.kusto.windows.net")
@ -20,20 +12,10 @@ KCSB = KustoConnectionStringBuilder.with_aad_application_key_authentication(
)
INGEST_CLIENT = KustoIngestClient(KCSB)
FILE_DESCRIPTOR = FileDescriptor(
"E:\\filePath.csv", 3333
) # 3333 is the raw size of the data in bytes.
INGEST_CLIENT.ingest_from_multiple_files(
[FILE_DESCRIPTOR], delete_sources_on_success=True, ingestion_properties=INGESTION_PROPERTIES
)
FILE_DESCRIPTOR = FileDescriptor("E:\\filePath.csv", 3333) # 3333 is the raw size of the data in bytes.
INGEST_CLIENT.ingest_from_file(FILE_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
INGEST_CLIENT.ingest_from_multiple_files(
["E:\\filePath.csv"], delete_sources_on_success=True, ingestion_properties=INGESTION_PROPERTIES
)
INGEST_CLIENT.ingest_from_file("E:\\filePath.csv", ingestion_properties=INGESTION_PROPERTIES)
BLOB_DESCRIPTOR = BlobDescriptor(
"https://path-to-blob.csv.gz?sas", 10
) # 10 is the raw size of the data in bytes.
INGEST_CLIENT.ingest_from_multiple_blobs(
[BLOB_DESCRIPTOR], delete_sources_on_success=True, ingestion_properties=INGESTION_PROPERTIES
)
BLOB_DESCRIPTOR = BlobDescriptor("https://path-to-blob.csv.gz?sas", 10) # 10 is the raw size of the data in bytes.
INGEST_CLIENT.ingest_from_blob(BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

Просмотреть файл

@ -13,9 +13,7 @@ class ConnectionStringTests(unittest.TestCase):
container_name = "containername"
container_sas = "somesas"
uri = "https://{}.blob.core.windows.net/{}?{}".format(
storage_name, container_name, container_sas
)
uri = "https://{}.blob.core.windows.net/{}?{}".format(storage_name, container_name, container_sas)
connection_string = _ConnectionString.parse(uri)
self.assertEqual(connection_string.storage_account_name, storage_name)
self.assertEqual(connection_string.object_type, "blob")

Просмотреть файл

@ -1,5 +1,3 @@
"""Test class for FileDescriptor and BlobDescriptor."""
import sys
from os import path
import unittest
@ -19,7 +17,7 @@ class DescriptorsTest(unittest.TestCase):
self.assertTrue(descriptor.zipped_stream.readable(), True)
self.assertEquals(descriptor.zipped_stream.tell(), 0)
self.assertEqual(descriptor.zipped_stream.closed, False)
descriptor.delete_files(True)
descriptor.delete_files()
self.assertEqual(descriptor.zipped_stream.closed, True)
def test_unzipped_file_without_size(self):
@ -32,7 +30,7 @@ class DescriptorsTest(unittest.TestCase):
self.assertTrue(descriptor.zipped_stream.readable(), True)
self.assertEquals(descriptor.zipped_stream.tell(), 0)
self.assertEqual(descriptor.zipped_stream.closed, False)
descriptor.delete_files(True)
descriptor.delete_files()
self.assertEqual(descriptor.zipped_stream.closed, True)
def test_zipped_file_with_size(self):
@ -45,7 +43,7 @@ class DescriptorsTest(unittest.TestCase):
self.assertTrue(descriptor.zipped_stream.readable(), True)
self.assertEquals(descriptor.zipped_stream.tell(), 0)
self.assertEqual(descriptor.zipped_stream.closed, False)
descriptor.delete_files(True)
descriptor.delete_files()
self.assertEqual(descriptor.zipped_stream.closed, True)
def test_zipped_file_without_size(self):
@ -58,5 +56,5 @@ class DescriptorsTest(unittest.TestCase):
self.assertTrue(descriptor.zipped_stream.readable(), True)
self.assertEquals(descriptor.zipped_stream.tell(), 0)
self.assertEqual(descriptor.zipped_stream.closed, False)
descriptor.delete_files(True)
descriptor.delete_files()
self.assertEqual(descriptor.zipped_stream.closed, True)

Просмотреть файл

@ -1,5 +1,3 @@
"""Tests serialization of ingestion blob info. This serialization will be queued to the DM."""
import unittest
import re
import json
@ -45,9 +43,7 @@ class IngestionBlobInfoTest(unittest.TestCase):
validationPolicy=validation_policy,
)
blob = BlobDescriptor("somepath", 10)
blob_info = _IngestionBlobInfo(
blob, properties, deleteSourcesOnSuccess=True, authContext="authorizationContextText"
)
blob_info = _IngestionBlobInfo(blob, properties, auth_context="authorizationContextText")
self._verify_ingestion_blob_info_result(blob_info.to_json())
def test_blob_csv_mapping_reference(self):
@ -70,9 +66,7 @@ class IngestionBlobInfoTest(unittest.TestCase):
validationPolicy=validation_policy,
)
blob = BlobDescriptor("somepath", 10)
blob_info = _IngestionBlobInfo(
blob, properties, deleteSourcesOnSuccess=True, authContext="authorizationContextText"
)
blob_info = _IngestionBlobInfo(blob, properties, auth_context="authorizationContextText")
self._verify_ingestion_blob_info_result(blob_info.to_json())
def test_blob_info_json_mapping(self):
@ -95,9 +89,7 @@ class IngestionBlobInfoTest(unittest.TestCase):
validationPolicy=validation_policy,
)
blob = BlobDescriptor("somepath", 10)
blob_info = _IngestionBlobInfo(
blob, properties, deleteSourcesOnSuccess=True, authContext="authorizationContextText"
)
blob_info = _IngestionBlobInfo(blob, properties, auth_context="authorizationContextText")
self._verify_ingestion_blob_info_result(blob_info.to_json())
def test_blob_json_mapping_reference(self):
@ -120,19 +112,14 @@ class IngestionBlobInfoTest(unittest.TestCase):
validationPolicy=validation_policy,
)
blob = BlobDescriptor("somepath", 10)
blob_info = _IngestionBlobInfo(
blob, properties, deleteSourcesOnSuccess=True, authContext="authorizationContextText"
)
blob_info = _IngestionBlobInfo(blob, properties, auth_context="authorizationContextText")
self._verify_ingestion_blob_info_result(blob_info.to_json())
def test_blob_info_csv_exceptions(self):
"""Tests invalid ingestion properties."""
with self.assertRaises(KustoDuplicateMappingError):
IngestionProperties(
database="database",
table="table",
mapping="mapping",
mappingReference="mappingReference",
database="database", table="table", mapping="mapping", mappingReference="mappingReference"
)
def _verify_ingestion_blob_info_result(self, ingestion_blob_info):
@ -150,12 +137,8 @@ class IngestionBlobInfoTest(unittest.TestCase):
self.assertIsInstance(result["ReportLevel"], int)
self.assertIsInstance(UUID(result["Id"]), UUID)
self.assertRegexpMatches(result["SourceMessageCreationTime"], TIMESTAMP_REGEX)
self.assertEquals(
result["AdditionalProperties"]["authorizationContext"], "authorizationContextText"
)
self.assertEquals(
result["AdditionalProperties"]["ingestIfNotExists"], '["ingestIfNotExistTags"]'
)
self.assertEquals(result["AdditionalProperties"]["authorizationContext"], "authorizationContextText")
self.assertEquals(result["AdditionalProperties"]["ingestIfNotExists"], '["ingestIfNotExistTags"]')
self.assertIn(
result["AdditionalProperties"]["ValidationPolicy"],
(
@ -164,6 +147,5 @@ class IngestionBlobInfoTest(unittest.TestCase):
),
)
self.assertEquals(
result["AdditionalProperties"]["tags"],
'["tag","drop-by:dropByTags","ingest-by:ingestByTags"]',
result["AdditionalProperties"]["tags"], '["tag","drop-by:dropByTags","ingest-by:ingestByTags"]'
)

Просмотреть файл

@ -1,122 +1,209 @@
"""Test for KustoIngestClient."""
import os
import json
import unittest
import json
import base64
from mock import patch
from six import text_type
import responses
import io
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, DataFormat
UUID_REGEX = "[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}"
BLOB_NAME_REGEX = "database__table__" + UUID_REGEX + "__dataset.csv.gz"
BLOB_URL_REGEX = (
"https://storageaccount.blob.core.windows.net/tempstorage/database__table__"
+ UUID_REGEX
+ "__dataset.csv.gz[?]sas"
"https://storageaccount.blob.core.windows.net/tempstorage/database__table__" + UUID_REGEX + "__dataset.csv.gz[?]sas"
)
def mocked_aad_helper(*args, **kwargs):
"""Mock to replace _AadHelper._acquire_token"""
return None
def request_callback(request):
body = json.loads(request.body.decode()) if type(request.body) == bytes else json.loads(request.body)
response_status = 400
response_headers = []
response_body = {}
if ".get ingestion resources" in body["csl"]:
response_status = 200
response_body = {
"Tables": [
{
"TableName": "Table_0",
"Columns": [
{"ColumnName": "ResourceTypeName", "DataType": "String"},
{"ColumnName": "StorageRoot", "DataType": "String"},
],
"Rows": [
[
"SecuredReadyForAggregationQueue",
"https://storageaccount.queue.core.windows.net/readyforaggregation-secured?sas",
],
[
"SecuredReadyForAggregationQueue",
"https://storageaccount.queue.core.windows.net/readyforaggregation-secured?sas",
],
[
"SecuredReadyForAggregationQueue",
"https://storageaccount.queue.core.windows.net/readyforaggregation-secured?sas",
],
[
"SecuredReadyForAggregationQueue",
"https://storageaccount.queue.core.windows.net/readyforaggregation-secured?sas",
],
[
"SecuredReadyForAggregationQueue",
"https://storageaccount.queue.core.windows.net/readyforaggregation-secured?sas",
],
["FailedIngestionsQueue", "https://storageaccount.queue.core.windows.net/failedingestions?sas"],
[
"SuccessfulIngestionsQueue",
"https://storageaccount.queue.core.windows.net/successfulingestions?sas",
],
["TempStorage", "https://storageaccount.blob.core.windows.net/tempstorage?sas"],
["TempStorage", "https://storageaccount.blob.core.windows.net/tempstorage?sas"],
["TempStorage", "https://storageaccount.blob.core.windows.net/tempstorage?sas"],
["TempStorage", "https://storageaccount.blob.core.windows.net/tempstorage?sas"],
["TempStorage", "https://storageaccount.blob.core.windows.net/tempstorage?sas"],
["IngestionsStatusTable", "https://storageaccount.table.core.windows.net/ingestionsstatus?sas"],
],
}
]
}
def mocked_requests_post(*args, **kwargs):
"""Mock to replace requests.post"""
if ".get kusto identity token" in body["csl"]:
response_status = 200
response_body = {
"Tables": [
{
"TableName": "Table_0",
"Columns": [{"ColumnName": "AuthorizationContext", "DataType": "String"}],
"Rows": [["authorization_context"]],
}
]
}
class MockResponse:
"""Mock class for KustoResponse."""
def __init__(self, json_data, status_code):
self.json_data = json_data
self.text = text_type(json_data)
self.status_code = status_code
self.headers = None
def json(self):
"""Get json data from response."""
return self.json_data
if args[0] == "https://ingest-somecluster.kusto.windows.net/v1/rest/mgmt":
if ".get ingestion resources" in kwargs["json"]["csl"]:
file_name = "ingestionresourcesresult.json"
if ".get kusto identity token" in kwargs["json"]["csl"]:
file_name = "identitytokenresult.json"
with open(
os.path.join(os.path.dirname(__file__), "input", file_name), "r"
) as response_file:
data = response_file.read()
return MockResponse(json.loads(data), 200)
return MockResponse(None, 404)
def mocked_create_blob_from_stream(self, *args, **kwargs):
"""Mock to replace BlockBlobService.create_blob_from_stream"""
tc = unittest.TestCase("__init__")
tc.assertEqual(self.account_name, "storageaccount")
tc.assertEqual(self.sas_token, "sas")
tc.assertEqual(kwargs["container_name"], "tempstorage")
tc.assertIsNotNone(kwargs["blob_name"])
tc.assertRegexpMatches(kwargs["blob_name"], BLOB_NAME_REGEX)
tc.assertIsNotNone(kwargs["stream"])
def mocked_queue_put_message(self, *args, **kwargs):
"""Mock to replace QueueService.put_message"""
tc = unittest.TestCase("__init__")
tc.assertEqual(self.account_name, "storageaccount")
tc.assertEqual(self.sas_token, "sas")
tc.assertEqual(kwargs["queue_name"], "readyforaggregation-secured")
tc.assertIsNotNone(kwargs["content"])
encoded = kwargs["content"]
ingestion_blob_info_json = base64.b64decode(encoded.encode("utf-8")).decode("utf-8")
result = json.loads(ingestion_blob_info_json)
tc.assertIsNotNone(result)
tc.assertIsInstance(result, dict)
tc.assertRegexpMatches(result["BlobPath"], BLOB_URL_REGEX)
tc.assertEquals(result["DatabaseName"], "database")
tc.assertEquals(result["TableName"], "table")
tc.assertGreater(result["RawDataSize"], 0)
tc.assertEquals(result["AdditionalProperties"]["authorizationContext"], "authorization_context")
return (response_status, response_headers, json.dumps(response_body))
class KustoIngestClientTests(unittest.TestCase):
"""Test class for KustoIngestClient."""
MOCKED_UUID_4 = "1111-111111-111111-1111"
MOCKED_PID = 64
MOCKED_TIME = 100
@patch("requests.post", side_effect=mocked_requests_post)
@patch("azure.kusto.data.security._AadHelper.acquire_token", side_effect=mocked_aad_helper)
@patch(
"azure.storage.blob.BlockBlobService.create_blob_from_stream",
autospec=True,
side_effect=mocked_create_blob_from_stream,
)
@patch(
"azure.storage.queue.QueueService.put_message",
autospec=True,
side_effect=mocked_queue_put_message,
)
def test_sanity_ingest(self, mock_post, mock_aad, mock_block_blob, mock_queue):
"""Test simple ingest"""
@responses.activate
@patch("azure.kusto.data.security._AadHelper.acquire_token", return_value=None)
@patch("azure.storage.blob.BlockBlobService.create_blob_from_stream")
@patch("azure.storage.queue.QueueService.put_message")
@patch("uuid.uuid4", return_value=MOCKED_UUID_4)
def test_sanity_ingest_from_file(
self, mock_uuid, mock_put_message_in_queue, mock_create_blob_from_stream, mock_aad
):
responses.add_callback(
responses.POST,
"https://ingest-somecluster.kusto.windows.net/v1/rest/mgmt",
callback=request_callback,
content_type="application/json",
)
ingest_client = KustoIngestClient("https://ingest-somecluster.kusto.windows.net")
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.csv)
ingestion_properties = IngestionProperties(
database="database", table="table", dataFormat=DataFormat.csv
# ensure test can work when executed from within directories
current_dir = os.getcwd()
path_parts = ["azure-kusto-ingest", "tests", "input", "dataset.csv"]
missing_path_parts = []
for path_part in path_parts:
if path_part not in current_dir:
missing_path_parts.append(path_part)
file_path = os.path.join(current_dir, *missing_path_parts)
ingest_client.ingest_from_file(file_path, ingestion_properties=ingestion_properties)
# mock_put_message_in_queue
assert mock_put_message_in_queue.call_count == 1
put_message_in_queue_mock_kwargs = mock_put_message_in_queue.call_args_list[0][1]
assert put_message_in_queue_mock_kwargs["queue_name"] == "readyforaggregation-secured"
queued_message = base64.b64decode(put_message_in_queue_mock_kwargs["content"].encode("utf-8")).decode("utf-8")
queued_message_json = json.loads(queued_message)
# mock_create_blob_from_stream
assert (
queued_message_json["BlobPath"]
== "https://storageaccount.blob.core.windows.net/tempstorage/database__table__1111-111111-111111-1111__dataset.csv.gz?sas"
)
assert queued_message_json["DatabaseName"] == "database"
assert queued_message_json["IgnoreSizeLimit"] == False
assert queued_message_json["AdditionalProperties"]["format"] == "csv"
assert queued_message_json["FlushImmediately"] == False
assert queued_message_json["TableName"] == "table"
assert queued_message_json["RawDataSize"] > 0
assert queued_message_json["RetainBlobOnSuccess"] == True
create_blob_from_stream_mock_kwargs = mock_create_blob_from_stream.call_args_list[0][1]
assert create_blob_from_stream_mock_kwargs["container_name"] == "tempstorage"
assert type(create_blob_from_stream_mock_kwargs["stream"]) == io.BytesIO
assert (
create_blob_from_stream_mock_kwargs["blob_name"]
== "database__table__1111-111111-111111-1111__dataset.csv.gz"
)
file_path = os.path.join(os.getcwd(), "azure-kusto-ingest", "tests", "input", "dataset.csv")
ingest_client.ingest_from_multiple_files(
[file_path], delete_sources_on_success=False, ingestion_properties=ingestion_properties
@responses.activate
@patch("azure.kusto.data.security._AadHelper.acquire_token", return_value=None)
@patch("azure.storage.blob.BlockBlobService.create_blob_from_path")
@patch("azure.storage.queue.QueueService.put_message")
@patch("uuid.uuid4", return_value=MOCKED_UUID_4)
@patch("time.time", return_value=MOCKED_TIME)
@patch("os.getpid", return_value=MOCKED_PID)
def test_simple_ingest_from_dataframe(
self, mock_pid, mock_time, mock_uuid, mock_put_message_in_queue, mock_create_blob_from_path, mock_aad
):
responses.add_callback(
responses.POST,
"https://ingest-somecluster.kusto.windows.net/v1/rest/mgmt",
callback=request_callback,
content_type="application/json",
)
ingest_client = KustoIngestClient("https://ingest-somecluster.kusto.windows.net")
ingestion_properties = IngestionProperties(database="database", table="table", dataFormat=DataFormat.csv)
from pandas import DataFrame
fields = ["id", "name", "value"]
rows = [[1, "abc", 15.3], [2, "cde", 99.9]]
df = DataFrame(data=rows, columns=fields)
ingest_client.ingest_from_dataframe(df, ingestion_properties=ingestion_properties)
# mock_put_message_in_queue
assert mock_put_message_in_queue.call_count == 1
put_message_in_queue_mock_kwargs = mock_put_message_in_queue.call_args_list[0][1]
assert put_message_in_queue_mock_kwargs["queue_name"] == "readyforaggregation-secured"
queued_message = base64.b64decode(put_message_in_queue_mock_kwargs["content"].encode("utf-8")).decode("utf-8")
queued_message_json = json.loads(queued_message)
# mock_create_blob_from_stream
assert (
queued_message_json["BlobPath"]
== "https://storageaccount.blob.core.windows.net/tempstorage/database__table__1111-111111-111111-1111__df_100_64.csv.gz?sas"
)
assert queued_message_json["DatabaseName"] == "database"
assert queued_message_json["IgnoreSizeLimit"] == False
assert queued_message_json["AdditionalProperties"]["format"] == "csv"
assert queued_message_json["FlushImmediately"] == False
assert queued_message_json["TableName"] == "table"
assert queued_message_json["RawDataSize"] > 0
assert queued_message_json["RetainBlobOnSuccess"] == True
create_blob_from_path_mock_kwargs = mock_create_blob_from_path.call_args_list[0][1]
import tempfile
assert create_blob_from_path_mock_kwargs["container_name"] == "tempstorage"
assert create_blob_from_path_mock_kwargs["file_path"] == os.path.join(tempfile.gettempdir(), "df_100_64.csv.gz")
assert (
create_blob_from_path_mock_kwargs["blob_name"]
== "database__table__1111-111111-111111-1111__df_100_64.csv.gz"
)

Просмотреть файл

@ -1,118 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="4.0">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>f9929b44-7edf-4436-9e6a-2bf7ecc96a26</ProjectGuid>
<ProjectHome>.</ProjectHome>
<SearchPath>
</SearchPath>
<WorkingDirectory>.</WorkingDirectory>
<OutputPath>.</OutputPath>
<Name>azure-kusto-python</Name>
<RootNamespace>azure-kusto-python</RootNamespace>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<DebugSymbols>true</DebugSymbols>
<EnableUnmanagedDebugging>false</EnableUnmanagedDebugging>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<DebugSymbols>true</DebugSymbols>
<EnableUnmanagedDebugging>false</EnableUnmanagedDebugging>
</PropertyGroup>
<ItemGroup>
<Folder Include="azure-kusto-data\" />
<Folder Include="azure-kusto-data\azure\" />
<Folder Include="azure-kusto-data\azure\kusto\" />
<Folder Include="azure-kusto-data\azure\kusto\data\" />
<Folder Include="azure-kusto-data\tests\" />
<Folder Include="azure-kusto-data\tests\input\" />
<Folder Include="azure-kusto-ingest\" />
<Folder Include="azure-kusto-ingest\azure\" />
<Folder Include="azure-kusto-ingest\azure\kusto\" />
<Folder Include="azure-kusto-ingest\azure\kusto\ingest\" />
<Folder Include="azure-kusto-ingest\tests\" />
<Folder Include="azure-kusto-ingest\tests\input\" />
</ItemGroup>
<ItemGroup>
<Compile Include="azure-kusto-data\azure\kusto\data\aad_helper.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="azure-kusto-data\azure\kusto\data\kusto_client.py" />
<Compile Include="azure-kusto-data\azure\kusto\data\kusto_exceptions.py" />
<Compile Include="azure-kusto-data\azure\kusto\data\version.py" />
<Compile Include="azure-kusto-data\azure\kusto\data\__init__.py" />
<Compile Include="azure-kusto-data\azure\kusto\__init__.py" />
<Compile Include="azure-kusto-data\azure\__init__.py" />
<Compile Include="azure-kusto-data\azure_bdist_wheel.py" />
<Compile Include="azure-kusto-data\setup.py" />
<Compile Include="azure-kusto-data\tests\test_converter.py" />
<Compile Include="azure-kusto-data\tests\test_functional.py" />
<Compile Include="azure-kusto-data\tests\sample_driver.py" />
<Compile Include="azure-kusto-data\tests\test_kusto_client.py" />
<Compile Include="azure-kusto-data\tests\__init__.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\connection_string.py" />
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\descriptors.py" />
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\ingestion_blob_info.py" />
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\ingestion_properties.py" />
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\kusto_ingest_client.py" />
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\kusto_ingest_client_exceptions.py" />
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\resource_manager.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\version.py" />
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\__init__.py" />
<Compile Include="azure-kusto-ingest\azure\kusto\__init__.py" />
<Compile Include="azure-kusto-ingest\azure\__init__.py" />
<Compile Include="azure-kusto-ingest\azure_bdist_wheel.py" />
<Compile Include="azure-kusto-ingest\setup.py" />
<Compile Include="azure-kusto-ingest\tests\input\__init__.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="azure-kusto-ingest\tests\test_connection_string.py" />
<Compile Include="azure-kusto-ingest\tests\test_descriptors.py" />
<Compile Include="azure-kusto-ingest\tests\test_ingestion_blob_info.py" />
<Compile Include="azure-kusto-ingest\tests\sample.py" />
<Compile Include="azure-kusto-ingest\tests\__init__.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="build_packages.py" />
<Compile Include="setup.py">
<SubType>Code</SubType>
</Compile>
</ItemGroup>
<ItemGroup>
<Content Include=".travis.yml" />
<Content Include="azure-kusto-data\MANIFEST.in" />
<Content Include="azure-kusto-data\README.rst" />
<Content Include="azure-kusto-data\requirements.txt" />
<Content Include="azure-kusto-data\setup.cfg" />
<Content Include="azure-kusto-data\tests\input\deft.json" />
<Content Include="azure-kusto-data\tests\input\querypartialresults.json" />
<Content Include="azure-kusto-data\tests\input\versionshowcommandresult.json" />
<Content Include="azure-kusto-ingest\MANIFEST.in" />
<Content Include="azure-kusto-ingest\README.rst" />
<Content Include="azure-kusto-ingest\requirements.txt" />
<Content Include="azure-kusto-ingest\setup.cfg" />
<Content Include="azure-kusto-ingest\tests\input\dataset.csv" />
<Content Include="azure-kusto-ingest\tests\input\dataset.csv.gz" />
<Content Include="azure-kusto-ingest\tests\input\dataset.json" />
<Content Include="azure-kusto-ingest\tests\input\dataset.jsonz.gz" />
<Content Include="dev_requirements.txt" />
<Content Include="README.md" />
<Content Include="setup.cfg">
<SubType>Code</SubType>
</Content>
</ItemGroup>
<Import Project="$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)\Python Tools\Microsoft.PythonTools.targets" />
<!-- Uncomment the CoreCompile target to enable the Build command in
Visual Studio and specify your pre- and post-build commands in
the BeforeBuild and AfterBuild targets below. -->
<!--<Target Name="CoreCompile" />-->
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
</Project>

Просмотреть файл

@ -1,23 +0,0 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27004.2010
MinimumVisualStudioVersion = 10.0.40219.1
Project("{888888A0-9F3D-457C-B088-3A5042F75D52}") = "azure-kusto-python", "azure-kusto-python.pyproj", "{F9929B44-7EDF-4436-9E6A-2BF7ECC96A26}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{F9929B44-7EDF-4436-9E6A-2BF7ECC96A26}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F9929B44-7EDF-4436-9E6A-2BF7ECC96A26}.Release|Any CPU.ActiveCfg = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {C9CB338A-3469-457D-916C-CAB8C0A5BF95}
EndGlobalSection
EndGlobal

Просмотреть файл

@ -1,4 +1,4 @@
-e azure-kusto-data
-e azure-kusto-ingest
pytest>=3.2.0
mock>=2.0.0
responses>=0.9.0
black;python_version == '3.6'