зеркало из
1
0
Форкнуть 0
* streaming from blob

* managed streaming
Q:  why dont we use aio library for storage?

* format

* fix test

* prepering client_base from_stream methods

* maybe better

* async impl
test fallback to queue

* format

* fix usage of ExecuteRequestParams in tests

* fix usage of ExecuteRequestParams in tests

* b

* fix aio

* add span headers for devbug

* add span headers for devbug

* add span headers for devbug

* t

* f

* revert

* print

* try copy

* remove print

---------

Co-authored-by: Ohad Bitton <ohbitton@microsoft.com>
This commit is contained in:
ohad bitton 2024-01-24 10:13:14 +02:00 коммит произвёл GitHub
Родитель 40197266f2
Коммит c6150c0056
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
11 изменённых файлов: 397 добавлений и 152 удалений

Просмотреть файл

@ -5,6 +5,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
### Added
- Streaming ingestion from blob
### Fixed
- Managed streaming fallback to queued
-
### Changed
- Changed binary files data format compression to false

Просмотреть файл

@ -51,42 +51,85 @@ class KustoClient(_KustoClientBase):
return await self.execute_mgmt(database, query, properties)
return await self.execute_query(database, query, properties)
@distributed_trace_async(name_of_span="KustoClient.query_cmd", kind=SpanKind.CLIENT)
@distributed_trace_async(name_of_span="AioKustoClient.query_cmd", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_query)
async def execute_query(self, database: str, query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet:
database = self._get_database_or_default(database)
Span.set_query_attributes(self._kusto_cluster, database, properties)
request = ExecuteRequestParams._from_query(
query,
database,
properties,
self._request_headers,
self._query_default_timeout,
self._mgmt_default_timeout,
self._client_server_delta,
self.client_details,
)
return await self._execute(self._query_endpoint, request, properties)
return await self._execute(self._query_endpoint, database, query, None, KustoClient._query_default_timeout, properties)
@distributed_trace_async(name_of_span="KustoClient.control_cmd", kind=SpanKind.CLIENT)
@distributed_trace_async(name_of_span="AioKustoClient.control_cmd", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_mgmt)
async def execute_mgmt(self, database: str, query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet:
database = self._get_database_or_default(database)
Span.set_query_attributes(self._kusto_cluster, database, properties)
request = ExecuteRequestParams._from_query(
query,
database,
properties,
self._request_headers,
self._mgmt_default_timeout,
self._mgmt_default_timeout,
self._client_server_delta,
self.client_details,
)
return await self._execute(self._mgmt_endpoint, request, properties)
return await self._execute(self._mgmt_endpoint, database, query, None, KustoClient._mgmt_default_timeout, properties)
@distributed_trace_async(name_of_span="KustoClient.streaming_ingest", kind=SpanKind.CLIENT)
@distributed_trace_async(name_of_span="AioKustoClient.streaming_ingest", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_streaming_ingest)
async def execute_streaming_ingest(
self,
database: Optional[str],
table: str,
stream: io.IOBase,
stream: Optional[io.IOBase],
blob_url: Optional[str],
stream_format: Union[DataFormat, str],
properties: ClientRequestProperties = None,
mapping_name: str = None,
):
database = self._get_database_or_default(database)
Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)
stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value
endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format
if mapping_name is not None:
endpoint = endpoint + "&mappingName=" + mapping_name
await self._execute(endpoint, database, None, stream, self._streaming_ingest_default_timeout, properties)
if blob_url:
endpoint += "&sourceKind=uri"
request = ExecuteRequestParams._from_blob_url(
blob_url,
properties,
self._request_headers,
self._streaming_ingest_default_timeout,
self._mgmt_default_timeout,
self._client_server_delta,
self.client_details,
)
elif stream:
request = ExecuteRequestParams._from_stream(
stream,
properties,
self._request_headers,
self._streaming_ingest_default_timeout,
self._mgmt_default_timeout,
self._client_server_delta,
self.client_details,
)
else:
raise Exception("execute_streaming_ingest is expecting either a stream or blob url")
Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)
await self._execute(endpoint, request, properties)
@aio_documented_by(KustoClientSync._execute_streaming_query_parsed)
async def _execute_streaming_query_parsed(
@ -96,10 +139,13 @@ class KustoClient(_KustoClientBase):
timeout: timedelta = _KustoClientBase._query_default_timeout,
properties: Optional[ClientRequestProperties] = None,
) -> StreamingDataSetEnumerator:
response = await self._execute(self._query_endpoint, database, query, None, timeout, properties, stream_response=True)
request = ExecuteRequestParams._from_query(
query, database, properties, self._request_headers, timeout, self._mgmt_default_timeout, self._client_server_delta, self.client_details
)
response = await self._execute(self._query_endpoint, request, properties, stream_response=True)
return StreamingDataSetEnumerator(JsonTokenReader(response.content))
@distributed_trace_async(name_of_span="KustoClient.streaming_query", kind=SpanKind.CLIENT)
@distributed_trace_async(name_of_span="AioKustoClient.streaming_query", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_streaming_query)
async def execute_streaming_query(
self,
@ -118,41 +164,33 @@ class KustoClient(_KustoClientBase):
async def _execute(
self,
endpoint: str,
database: Optional[str],
query: Optional[str],
payload: Optional[io.IOBase],
timeout: timedelta,
properties: ClientRequestProperties = None,
request: ExecuteRequestParams,
properties: Optional[ClientRequestProperties] = None,
stream_response: bool = False,
) -> Union[KustoResponseDataSet, ClientResponse]:
"""Executes given query against this client"""
if self._is_closed:
raise KustoClosedError()
self.validate_endpoint()
request_params = ExecuteRequestParams(
database,
payload,
properties,
query,
timeout,
self._request_headers,
self._mgmt_default_timeout,
self._client_server_delta,
self.client_details,
)
json_payload = request_params.json_payload
request_headers = request_params.request_headers
timeout = request_params.timeout
request_headers = request.request_headers
timeout = request.timeout
if self._aad_helper:
request_headers["Authorization"] = await self._aad_helper.acquire_authorization_header_async()
invoker = lambda: self._session.post(
endpoint, headers=request_headers, json=json_payload, data=payload, timeout=timeout.seconds, proxy=self._proxy_url, allow_redirects=False
endpoint,
headers=request_headers,
json=request.json_payload,
data=request.payload,
timeout=timeout.seconds,
proxy=self._proxy_url,
allow_redirects=False,
)
try:
response = await MonitoredActivity.invoke_async(
invoker, name_of_span="KustoClient.http_post", tracing_attributes=Span.create_http_attributes("POST", endpoint, request_headers)
invoker, name_of_span="AioKustoClient.http_post", tracing_attributes=Span.create_http_attributes("POST", endpoint, request_headers)
)
except Exception as e:
raise KustoNetworkError(endpoint, None if properties is None else properties.client_request_id) from e
@ -172,7 +210,7 @@ class KustoClient(_KustoClientBase):
response_json = await response.json()
except Exception:
response_json = None
raise self._handle_http_error(e, endpoint, payload, response, response.status, response_json, response_text)
raise self._handle_http_error(e, endpoint, request.payload, response, response.status, response_json, response_text)
async with response:
response_json = None
@ -186,5 +224,5 @@ class KustoClient(_KustoClientBase):
response_text = await response.text()
except Exception:
response_text = None
raise self._handle_http_error(e, endpoint, payload, response, response.status, response_json, response_text)
return MonitoredActivity.invoke(lambda: self._kusto_parse_by_endpoint(endpoint, response_json), name_of_span="KustoClient.processing_response")
raise self._handle_http_error(e, endpoint, request.payload, response, response.status, response_json, response_text)
return MonitoredActivity.invoke(lambda: self._kusto_parse_by_endpoint(endpoint, response_json), name_of_span="AioKustoClient.processing_response")

Просмотреть файл

@ -175,8 +175,17 @@ class KustoClient(_KustoClientBase):
"""
database = self._get_database_or_default(database)
Span.set_query_attributes(self._kusto_cluster, database, properties)
return self._execute(self._query_endpoint, database, query, None, self._query_default_timeout, properties)
request = ExecuteRequestParams._from_query(
query,
database,
properties,
self._request_headers,
self._query_default_timeout,
self._mgmt_default_timeout,
self._client_server_delta,
self.client_details,
)
return self._execute(self._query_endpoint, request, properties)
@distributed_trace(name_of_span="KustoClient.control_cmd", kind=SpanKind.CLIENT)
def execute_mgmt(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
@ -191,15 +200,25 @@ class KustoClient(_KustoClientBase):
"""
database = self._get_database_or_default(database)
Span.set_query_attributes(self._kusto_cluster, database, properties)
return self._execute(self._mgmt_endpoint, database, query, None, self._mgmt_default_timeout, properties)
request = ExecuteRequestParams._from_query(
query,
database,
properties,
self._request_headers,
self._mgmt_default_timeout,
self._mgmt_default_timeout,
self._client_server_delta,
self.client_details,
)
return self._execute(self._mgmt_endpoint, request, properties)
@distributed_trace(name_of_span="KustoClient.streaming_ingest", kind=SpanKind.CLIENT)
def execute_streaming_ingest(
self,
database: Optional[str],
table: str,
stream: IO[AnyStr],
stream: Optional[IO[AnyStr]],
blob_url: Optional[str],
stream_format: Union[DataFormat, str],
properties: Optional[ClientRequestProperties] = None,
mapping_name: str = None,
@ -211,20 +230,44 @@ class KustoClient(_KustoClientBase):
https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming
:param Optional[str] database: Target database. If not provided, will default to the "Initial Catalog" value in the connection string
:param str table: Target table.
:param io.BaseIO stream: stream object which contains the data to ingest.
:param Optional[IO[AnyStr]] stream: a stream object or which contains the data to ingest.
:param Optional[str] blob_url: An url to a blob which contains the data to ingest. Provide either this or stream.
:param DataFormat stream_format: Format of the data in the stream.
:param ClientRequestProperties properties: additional request properties.
:param str mapping_name: Pre-defined mapping of the table. Required when stream_format is json/avro.
"""
database = self._get_database_or_default(database)
Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)
stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value
endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format
if mapping_name is not None:
endpoint = endpoint + "&mappingName=" + mapping_name
if blob_url:
endpoint += "&sourceKind=uri"
request = ExecuteRequestParams._from_blob_url(
blob_url,
properties,
self._request_headers,
self._streaming_ingest_default_timeout,
self._mgmt_default_timeout,
self._client_server_delta,
self.client_details,
)
elif stream:
request = ExecuteRequestParams._from_stream(
stream,
properties,
self._request_headers,
self._streaming_ingest_default_timeout,
self._mgmt_default_timeout,
self._client_server_delta,
self.client_details,
)
else:
raise Exception("execute_streaming_ingest is expecting either a stream or blob url")
self._execute(endpoint, database, None, stream, self._streaming_ingest_default_timeout, properties)
Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)
self._execute(endpoint, request, properties)
def _execute_streaming_query_parsed(
self,
@ -233,7 +276,10 @@ class KustoClient(_KustoClientBase):
timeout: timedelta = _KustoClientBase._query_default_timeout,
properties: Optional[ClientRequestProperties] = None,
) -> StreamingDataSetEnumerator:
response = self._execute(self._query_endpoint, database, query, None, timeout, properties, stream_response=True)
request = ExecuteRequestParams._from_query(
query, database, properties, self._request_headers, timeout, self._mgmt_default_timeout, self._client_server_delta, self.client_details
)
response = self._execute(self._query_endpoint, request, properties, stream_response=True)
response.raw.decode_content = True
return StreamingDataSetEnumerator(JsonTokenReader(response.raw))
@ -262,10 +308,7 @@ class KustoClient(_KustoClientBase):
def _execute(
self,
endpoint: str,
database: Optional[str],
query: Optional[str],
payload: Optional[IO[AnyStr]],
timeout: timedelta,
request: ExecuteRequestParams,
properties: Optional[ClientRequestProperties] = None,
stream_response: bool = False,
) -> Union[KustoResponseDataSet, Response]:
@ -273,20 +316,8 @@ class KustoClient(_KustoClientBase):
if self._is_closed:
raise KustoClosedError()
self.validate_endpoint()
request_params = ExecuteRequestParams(
database,
payload,
properties,
query,
timeout,
self._request_headers,
self._mgmt_default_timeout,
self._client_server_delta,
self.client_details,
)
json_payload = request_params.json_payload
request_headers = request_params.request_headers
timeout = request_params.timeout
request_headers = request.request_headers
if self._aad_helper:
request_headers["Authorization"] = self._aad_helper.acquire_authorization_header()
@ -294,9 +325,9 @@ class KustoClient(_KustoClientBase):
invoker = lambda: self._session.post(
endpoint,
headers=request_headers,
json=json_payload,
data=payload,
timeout=timeout.seconds,
json=request.json_payload,
data=request.payload,
timeout=request.timeout.seconds,
stream=stream_response,
allow_redirects=False,
)
@ -324,6 +355,6 @@ class KustoClient(_KustoClientBase):
response_json = response.json()
response.raise_for_status()
except Exception as e:
raise self._handle_http_error(e, endpoint, payload, response, response.status_code, response_json, response.text)
raise self._handle_http_error(e, endpoint, request.payload, response, response.status_code, response_json, response.text)
# trace response processing
return MonitoredActivity.invoke(lambda: self._kusto_parse_by_endpoint(endpoint, response_json), name_of_span="KustoClient.processing_response")

Просмотреть файл

@ -125,36 +125,82 @@ class _KustoClientBase(abc.ABC):
class ExecuteRequestParams:
def __init__(
self,
database: str,
payload: Optional[io.IOBase],
@staticmethod
def _from_stream(
stream: io.IOBase,
properties: ClientRequestProperties,
query: str,
request_headers: Any,
timeout: timedelta,
request_headers: dict,
mgmt_default_timeout: timedelta,
client_server_delta: timedelta,
client_details: ClientDetails,
):
request_headers = copy(request_headers)
request_headers["Connection"] = "Keep-Alive"
json_payload = None
if not payload:
json_payload = {"db": database, "csl": query}
if properties:
json_payload["properties"] = properties.to_json()
# Before 3.0 it was KPC.execute_streaming_ingest, but was changed to align with the other SDKs
client_request_id_prefix = "KPC.executeStreamingIngest;"
request_headers = request_headers.copy()
request_headers["Content-Encoding"] = "gzip"
if properties:
request_headers.update(json.loads(properties.to_json())["Options"])
client_request_id_prefix = "KPC.execute;"
request_headers["Content-Type"] = "application/json; charset=utf-8"
else:
if properties:
request_headers.update(json.loads(properties.to_json())["Options"])
return ExecuteRequestParams(
stream, None, request_headers, client_request_id_prefix, properties, timeout, mgmt_default_timeout, client_server_delta, client_details
)
# Before 3.0 it was KPC.execute_streaming_ingest, but was changed to align with the other SDKs
client_request_id_prefix = "KPC.executeStreamingIngest;"
request_headers["Content-Encoding"] = "gzip"
@staticmethod
def _from_query(
query: str,
database: str,
properties: ClientRequestProperties,
request_headers: Any,
timeout: timedelta,
mgmt_default_timeout: timedelta,
client_server_delta: timedelta,
client_details: ClientDetails,
):
json_payload = {"db": database, "csl": query}
if properties:
json_payload["properties"] = properties.to_json()
client_request_id_prefix = "KPC.execute;"
request_headers = request_headers.copy()
request_headers["Content-Type"] = "application/json; charset=utf-8"
return ExecuteRequestParams(
None, json_payload, request_headers, client_request_id_prefix, properties, timeout, mgmt_default_timeout, client_server_delta, client_details
)
@staticmethod
def _from_blob_url(
blob: str,
properties: ClientRequestProperties,
request_headers: Any,
timeout: timedelta,
mgmt_default_timeout: timedelta,
client_server_delta: timedelta,
client_details: ClientDetails,
):
json_payload = {"sourceUri": blob}
client_request_id_prefix = "KPC.executeStreamingIngestFromBlob;"
request_headers = request_headers.copy()
request_headers["Content-Type"] = "application/json; charset=utf-8"
if properties:
request_headers.update(json.loads(properties.to_json())["Options"])
return ExecuteRequestParams(
None, json_payload, request_headers, client_request_id_prefix, properties, timeout, mgmt_default_timeout, client_server_delta, client_details
)
def __init__(
self,
payload,
json_payload,
request_headers,
client_request_id_prefix,
properties: ClientRequestProperties,
timeout: timedelta,
mgmt_default_timeout: timedelta,
client_server_delta: timedelta,
client_details: ClientDetails,
):
special_headers = [
{
"name": "x-ms-client-request-id",
@ -201,3 +247,4 @@ class ExecuteRequestParams:
self.json_payload = json_payload
self.request_headers = request_headers
self.timeout = timeout
self.payload = payload

Просмотреть файл

@ -37,15 +37,14 @@ def test_properties():
def test_default_tracing_properties():
kcsb = KustoConnectionStringBuilder("test")
params = ExecuteRequestParams(
"somedatabase",
None,
ClientRequestProperties(),
params = ExecuteRequestParams._from_query(
"somequery",
timedelta(seconds=10),
"somedatabase",
ClientRequestProperties(),
{},
timedelta(seconds=10),
timedelta(seconds=10),
timedelta(seconds=10),
kcsb.client_details,
)
@ -60,15 +59,14 @@ def test_custom_kcsb_tracing_properties():
kcsb.application_for_tracing = "myApp"
kcsb.user_name_for_tracing = "myUser"
params = ExecuteRequestParams(
"somedatabase",
None,
ClientRequestProperties(),
params = ExecuteRequestParams._from_query(
"somequery",
timedelta(seconds=10),
"somedatabase",
ClientRequestProperties(),
{},
timedelta(seconds=10),
timedelta(seconds=10),
timedelta(seconds=10),
kcsb.client_details,
)
@ -84,15 +82,14 @@ def test_custom_crp_tracing_properties():
crp.application = "myApp2"
crp.user = "myUser2"
params = ExecuteRequestParams(
"somedatabase",
None,
crp,
params = ExecuteRequestParams._from_query(
"somequery",
timedelta(seconds=10),
"somedatabase",
crp,
{},
timedelta(seconds=10),
timedelta(seconds=10),
timedelta(seconds=10),
kcsb.client_details,
)
@ -110,15 +107,14 @@ def test_custom_crp_tracing_properties_override_kcsb():
crp.application = "myApp2"
crp.user = "myUser2"
params = ExecuteRequestParams(
"somedatabase",
None,
crp,
params = ExecuteRequestParams._from_query(
"somequery",
timedelta(seconds=10),
"somedatabase",
crp,
{},
timedelta(seconds=10),
timedelta(seconds=10),
timedelta(seconds=10),
kcsb.client_details,
)
@ -133,15 +129,14 @@ def test_set_connector_name_and_version():
kcsb._set_connector_details("myConnector", "myVersion", send_user=False)
crp = ClientRequestProperties()
params = ExecuteRequestParams(
"somedatabase",
None,
crp,
params = ExecuteRequestParams._from_query(
"somequery",
timedelta(seconds=10),
"somedatabase",
ClientRequestProperties(),
{},
timedelta(seconds=10),
timedelta(seconds=10),
timedelta(seconds=10),
kcsb.client_details,
)
@ -157,15 +152,14 @@ def test_set_connector_no_app_version():
kcsb._set_connector_details("myConnector", "myVersion", app_name="myApp", send_user=True)
crp = ClientRequestProperties()
params = ExecuteRequestParams(
"somedatabase",
None,
crp,
params = ExecuteRequestParams._from_query(
"somequery",
timedelta(seconds=10),
"somedatabase",
ClientRequestProperties(),
{},
timedelta(seconds=10),
timedelta(seconds=10),
timedelta(seconds=10),
kcsb.client_details,
)
@ -189,15 +183,14 @@ def test_set_connector_full():
)
crp = ClientRequestProperties()
params = ExecuteRequestParams(
"somedatabase",
None,
crp,
params = ExecuteRequestParams._from_query(
"somequery",
timedelta(seconds=10),
"somedatabase",
crp,
{},
timedelta(seconds=10),
timedelta(seconds=10),
timedelta(seconds=10),
kcsb.client_details,
)

Просмотреть файл

@ -12,6 +12,8 @@ from io import BytesIO, SEEK_END
from typing import Union, Optional, AnyStr, IO, List, Dict
from zipfile import ZipFile
from azure.storage.blob import BlobClient
OptionalUUID = Optional[Union[str, uuid.UUID]]
@ -150,6 +152,10 @@ class BlobDescriptor(DescriptorBase):
obfuscated_path = self.path.split("?")[0].split(";")[0]
return {self._BLOB_URI: obfuscated_path, self._SOURCE_ID: str(self.source_id)}
def fill_size(self):
if not self.size:
self.size = BlobClient.from_blob_url(self.path).get_blob_properties().size
class StreamDescriptor(DescriptorBase):
"""StreamDescriptor is used to describe a stream that will be used as ingestion source"""

Просмотреть файл

@ -1,7 +1,8 @@
import uuid
from io import SEEK_SET
from typing import AnyStr, IO, TYPE_CHECKING, Union
from typing import AnyStr, IO, TYPE_CHECKING, Union, Optional
from azure.kusto.ingest.descriptors import DescriptorBase
from tenacity import Retrying, _utils, stop_after_attempt, wait_random_exponential
from azure.core.tracing.decorator import distributed_trace
@ -11,7 +12,6 @@ from azure.kusto.data import KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoApiError, KustoClosedError
from azure.kusto.data._telemetry import MonitoredActivity
from . import BlobDescriptor, FileDescriptor, IngestionProperties, StreamDescriptor
from ._ingest_telemetry import IngestTracingAttributes
from ._stream_extensions import chain_streams, read_until_size_or_end
@ -108,30 +108,20 @@ class ManagedStreamingIngestClient(BaseIngestClient):
stream = stream_descriptor.stream
buffered_stream = read_until_size_or_end(stream, self.MAX_STREAMING_SIZE_IN_BYTES + 1)
if len(buffered_stream.getbuffer()) > self.MAX_STREAMING_SIZE_IN_BYTES:
stream_descriptor.stream = chain_streams([buffered_stream, stream])
return self.queued_client.ingest_from_stream(stream_descriptor, ingestion_properties)
length = len(buffered_stream.getbuffer())
stream_descriptor.stream = buffered_stream
try:
for attempt in Retrying(
stop=stop_after_attempt(self._num_of_attempts), wait=wait_random_exponential(max=self._max_seconds_per_retry), reraise=True
):
with attempt:
stream.seek(0, SEEK_SET)
client_request_id = ManagedStreamingIngestClient._get_request_id(stream_descriptor.source_id, attempt.retry_state.attempt_number - 1)
# trace attempt to ingest from stream
invoker = lambda: self.streaming_client._ingest_from_stream_with_client_request_id(
stream_descriptor, ingestion_properties, client_request_id
)
return MonitoredActivity.invoke(invoker, name_of_span="ManagedStreamingIngestClient.ingest_from_stream_attempt")
res = self._stream_with_retries(length, stream_descriptor, ingestion_properties)
if res:
return res
stream_descriptor.stream = chain_streams([buffered_stream, stream])
except KustoApiError as ex:
error = ex.get_api_error()
if error.permanent:
raise
buffered_stream.seek(0, SEEK_SET)
return self.queued_client.ingest_from_stream(stream_descriptor, ingestion_properties)
@ -151,9 +141,42 @@ class ManagedStreamingIngestClient(BaseIngestClient):
if self._is_closed:
raise KustoClosedError()
blob_descriptor.fill_size()
try:
res = self._stream_with_retries(blob_descriptor.size, blob_descriptor, ingestion_properties)
if res:
return res
except KustoApiError as ex:
error = ex.get_api_error()
if error.permanent:
raise
return self.queued_client.ingest_from_blob(blob_descriptor, ingestion_properties)
def _stream_with_retries(
self,
length: int,
descriptor: DescriptorBase,
props: IngestionProperties,
) -> Optional[IngestionResult]:
from_stream = isinstance(descriptor, StreamDescriptor)
if length > self.MAX_STREAMING_SIZE_IN_BYTES:
return None
for attempt in Retrying(stop=stop_after_attempt(self._num_of_attempts), wait=wait_random_exponential(max=self._max_seconds_per_retry), reraise=True):
with attempt:
client_request_id = ManagedStreamingIngestClient._get_request_id(descriptor.source_id, attempt.retry_state.attempt_number - 1)
# trace attempt to ingest from stream
if from_stream:
descriptor.stream.seek(0, SEEK_SET)
invoker = lambda: self.streaming_client._ingest_from_stream_with_client_request_id(descriptor, props, client_request_id)
else:
invoker = lambda: self.streaming_client.ingest_from_blob(descriptor, props, client_request_id)
return MonitoredActivity.invoke(
invoker,
name_of_span="ManagedStreamingIngestClient.ingest_from_stream_attempt",
tracing_attributes={"attemptNumber": attempt, "sourceIsStream": from_stream},
)
@staticmethod
def _get_request_id(source_id: uuid.UUID, attempt: int):
return f"KPC.executeManagedStreamingIngest;{source_id};{attempt}"

Просмотреть файл

@ -10,7 +10,7 @@ from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRe
from ._ingest_telemetry import IngestTracingAttributes
from .base_ingest_client import BaseIngestClient, IngestionResult, IngestionStatus
from .descriptors import FileDescriptor, StreamDescriptor
from .descriptors import FileDescriptor, StreamDescriptor, BlobDescriptor
from .ingestion_properties import IngestionProperties
@ -79,9 +79,30 @@ class KustoStreamingIngestClient(BaseIngestClient):
ingestion_properties.database,
ingestion_properties.table,
stream_descriptor.stream,
None,
ingestion_properties.format.name,
additional_properties,
mapping_name=ingestion_properties.ingestion_mapping_reference,
)
return IngestionResult(IngestionStatus.SUCCESS, ingestion_properties.database, ingestion_properties.table, stream_descriptor.source_id)
def ingest_from_blob(
self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties, client_request_id: Optional[str] = None
) -> IngestionResult:
IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)
additional_properties = None
if client_request_id:
additional_properties = ClientRequestProperties()
additional_properties.client_request_id = client_request_id
self._kusto_client.execute_streaming_ingest(
ingestion_properties.database,
ingestion_properties.table,
None,
blob_descriptor.path,
ingestion_properties.format.name,
additional_properties,
mapping_name=ingestion_properties.ingestion_mapping_reference,
)
return IngestionResult(IngestionStatus.SUCCESS, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id)

Просмотреть файл

@ -538,3 +538,32 @@ class TestE2E:
self.ingest_client.ingest_from_dataframe(df, ingestion_properties)
await self.assert_rows_added(1, timeout=120)
@pytest.mark.asyncio
async def test_streaming_ingest_from_blob(self, is_managed_streaming):
ingestion_properties = IngestionProperties(
database=self.test_db,
table=self.test_table,
data_format=DataFormat.JSON,
ingestion_mapping_reference="JsonMapping",
ingestion_mapping_kind=IngestionMappingKind.JSON,
)
containers = self.ingest_client._resource_manager.get_containers()
with FileDescriptor(self.json_file_path).open(False) as stream:
blob_descriptor = self.ingest_client.upload_blob(
containers,
FileDescriptor(self.json_file_path),
ingestion_properties.database,
ingestion_properties.table,
stream,
None,
10 * 60,
3,
)
if is_managed_streaming:
self.managed_streaming_ingest_client.ingest_from_blob(blob_descriptor, ingestion_properties)
else:
self.streaming_ingest_client.ingest_from_blob(blob_descriptor, ingestion_properties)
await self.assert_rows_added(2, timeout=120)

Просмотреть файл

@ -44,6 +44,36 @@ STORAGE_QUEUE4_URL = "https://storageaccount2.queue.core.windows.net/readyforagg
STORAGE_QUEUE5_URL = "https://storageaccount2.queue.core.windows.net/readyforaggregation-secured?5"
def request_callback_throw_transient(request):
response_status = 400
response_body = {
"error": {
"code": "BadRequest",
"message": "Request is invalid and cannot be executed.",
"@type": "Kusto.Common.Svc.Exceptions.AdminCommandWrongEndpointException",
"@message": "Cannot get ingestion resources from this service endpoint. The appropriate endpoint is most likely "
"'https://ingest-somecluster.kusto.windows.net/'.",
"@context": {
"timestamp": "2021-10-12T06:05:35.6602087Z",
"serviceAlias": "SomeCluster",
"machineName": "KEngine000000",
"processName": "Kusto.WinSvc.Svc",
"processId": 2648,
"threadId": 472,
"appDomainName": "Kusto.WinSvc.Svc.exe",
"clientRequestId": "KPC.execute;a3dfb878-9d2b-49d6-89a5-e9b3a9f1f674",
"activityId": "87eb8fc9-78b3-4580-bcc8-6c90482f9118",
"subActivityId": "bbfb038b-4467-4f96-afd4-945904fc6278",
"activityType": "DN.AdminCommand.IngestionResourcesGetCommand",
"parentActivityId": "00e678e9-4204-4143-8c94-6afd94c27430",
"activityStack": "(Activity stack: CRID=KPC.execute;a3dfb878-9d2b-49d6-89a5-e9b3a9f1f674 ARID=87eb8fc9-78b3-4580-bcc8-6c90482f9118 > DN.Admin.Client.ExecuteControlCommand/833dfb85-5d67-44b7-882d-eb2283e65780 > P.WCF.Service.ExecuteControlCommand..IInterNodeCommunicationAdminContract/3784e74f-1d89-4c15-adef-0a360c4c431e > DN.FE.ExecuteControlCommand/00e678e9-4204-4143-8c94-6afd94c27430 > DN.AdminCommand.IngestionResourcesGetCommand/bbfb038b-4467-4f96-afd4-945904fc6278)",
},
"@permanent": False,
}
}
return response_status, {}, json.dumps(response_body)
def request_callback(request):
body = json.loads(request.body.decode()) if type(request.body) == bytes else json.loads(request.body)
response_status = 400

Просмотреть файл

@ -11,7 +11,7 @@ import responses
from azure.kusto.data.data_format import DataFormat
from azure.kusto.data.exceptions import KustoApiError
from azure.kusto.ingest import ManagedStreamingIngestClient, IngestionProperties, IngestionStatus, BlobDescriptor
from test_kusto_ingest_client import request_callback as queued_request_callback, assert_queued_upload
from test_kusto_ingest_client import request_callback as queued_request_callback, assert_queued_upload, request_callback_throw_transient
from test_kusto_streaming_ingest_client import request_callback as streaming_request_callback, assert_managed_streaming_request_id
@ -21,6 +21,11 @@ class TransientResponseHelper:
self.total_calls = 0
@pytest.fixture(params=["Blob", "File"])
def is_blob(request):
return request.param == "Blob"
def transient_error_callback(helper: TransientResponseHelper, request, custom_request_id=None):
if custom_request_id:
assert request.headers["x-ms-client-request-id"] == custom_request_id
@ -84,7 +89,7 @@ class TestManagedStreamingIngestClient:
@patch("azure.storage.blob.BlobClient.upload_blob")
@patch("azure.storage.queue.QueueClient.send_message")
@patch("uuid.uuid4", return_value=MOCKED_UUID_4)
def test_fallback_big_file(self, mock_uuid, mock_put_message_in_queue, mock_upload_blob_from_stream, mock_aad):
def test_fallback_big_file(self, mock_uuid, mock_put_message_in_queue, mock_upload_blob_from_stream, mock_aad, is_blob):
responses.add_callback(
responses.POST, "https://ingest-somecluster.kusto.windows.net/v1/rest/mgmt", callback=queued_request_callback, content_type="application/json"
)
@ -108,10 +113,20 @@ class TestManagedStreamingIngestClient:
mock_upload_blob_from_stream.side_effect = check_bytes
f = NamedTemporaryFile(dir=".", mode="wb", delete=False)
blob_url = "https://storageaccount.blob.core.windows.net/tempstorage/database__table__11111111-1111-1111-1111-111111111111__{}?".format(
os.path.basename(f.name)
)
try:
f.write(initial_bytes)
f.close()
result = ingest_client.ingest_from_file(f.name, ingestion_properties=ingestion_properties)
if is_blob:
result = ingest_client.ingest_from_blob(BlobDescriptor(blob_url + "sas", 5 * 1024 * 1024), ingestion_properties=ingestion_properties)
f.close()
else:
f.write(initial_bytes)
f.close()
result = ingest_client.ingest_from_file(f.name, ingestion_properties=ingestion_properties)
except Exception as e:
print(e)
finally:
os.unlink(f.name)
@ -119,14 +134,13 @@ class TestManagedStreamingIngestClient:
assert_queued_upload(
mock_put_message_in_queue,
mock_upload_blob_from_stream,
"https://storageaccount.blob.core.windows.net/tempstorage/database__table__11111111-1111-1111-1111-111111111111__{}?".format(
os.path.basename(f.name)
),
mock_upload_blob_from_stream if not is_blob else None,
blob_url,
format=data_format.kusto_value,
)
mock_upload_blob_from_stream.assert_called()
if not is_blob:
mock_upload_blob_from_stream.assert_called()
@responses.activate
@patch("azure.kusto.data.security._AadHelper.acquire_authorization_header", return_value=None)
@ -314,6 +328,12 @@ class TestManagedStreamingIngestClient:
responses.add_callback(
responses.POST, "https://ingest-somecluster.kusto.windows.net/v1/rest/mgmt", callback=queued_request_callback, content_type="application/json"
)
responses.add_callback(
responses.POST,
"https://somecluster.kusto.windows.net/v1/rest/ingest/database/table?streamFormat=csv&sourceKind=uri",
callback=request_callback_throw_transient,
content_type="application/json",
)
ingest_client = ManagedStreamingIngestClient.from_dm_kcsb("https://ingest-somecluster.kusto.windows.net")
ingestion_properties = IngestionProperties(database="database", table="table")