зеркало из
1
0
Форкнуть 0

User/t eitanmoed/distributed tracing (#419)

* added distributed_trace decorator to data sdk

* added distributed tracing example to sample app

* cleaner syntax

* added telemetry class

* formatted

* reverted kcsb

* reverted kcsb2

* ingestfrom* query,cntrlcmd, tracing added no test

* ingestfrom* query,cntrlcmd, tracing added no test post format

* ingestfrom* tracing added

* ingestion_properties added

* pretest and prenotes

* added notes

* kusto telemetry class created

* formatted

* removed file

* tests

* async implementation

* fixed uuid value attribute telemetry issue

* revert in kcsb

* fixed async trace

* fixed async get token

* added context dict of get_token

* added tracing attribute abstraction for descriptors and improved encapsulation of ClientRequestionProperties, IngestionProperties to include tracing attributes

* edited tests to to consider context call before get_token call during trace

* increased testing

* removed irrelevant constants

* removed unneccesary nesting

* test specific responses

* merged conflicts

* removed unused dependencies

* reformat

---------

Co-authored-by: AsafMah <asafmahlev@microsoft.com>
This commit is contained in:
Eitan Moed 2023-02-08 11:27:33 +02:00 коммит произвёл GitHub
Родитель 800d945e45
Коммит 921249ccd8
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
24 изменённых файлов: 633 добавлений и 52 удалений

1
.gitignore поставляемый
Просмотреть файл

@ -266,6 +266,7 @@ paket-files/
# JetBrains Rider
.idea/
.run/
*.sln.iml
# CodeRush

Просмотреть файл

@ -5,7 +5,11 @@ from urllib.parse import urljoin
import requests
from azure.kusto.data.exceptions import KustoServiceError
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing import SpanKind
from ._telemetry import KustoTracingAttributes, KustoTracing
from .exceptions import KustoServiceError
METADATA_ENDPOINT = "v1/rest/auth/metadata"
@ -69,15 +73,24 @@ class CloudSettings:
)
@classmethod
@distributed_trace(name_of_span="CloudSettings.get_cloud_info", kind=SpanKind.CLIENT)
def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str, str]] = None) -> CloudInfo:
# tracing attributes for cloud info
KustoTracingAttributes.set_cloud_info_attributes(kusto_uri)
if kusto_uri in cls._cloud_cache: # Double-checked locking to avoid unnecessary lock access
return cls._cloud_cache[kusto_uri]
with cls._cloud_cache_lock:
if kusto_uri in cls._cloud_cache:
return cls._cloud_cache[kusto_uri]
url = urljoin(kusto_uri, METADATA_ENDPOINT)
result = requests.get(urljoin(kusto_uri, METADATA_ENDPOINT), proxies=proxies)
# trace http get call for result
http_trace_attributes = KustoTracingAttributes.create_http_attributes(url=url, method="GET")
result = KustoTracing.call_func_tracing(
requests.get, url, proxies=proxies, name_of_span="CloudSettings.http_get", tracing_attributes=http_trace_attributes
)
if result.status_code == 200:
content = result.json()

Просмотреть файл

@ -0,0 +1,159 @@
from typing import Callable, Optional
from azure.core.settings import settings
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.tracing import SpanKind
from .client_request_properties import ClientRequestProperties
class KustoTracingAttributes:
"""
Additional ADX attributes for telemetry spans
"""
_KUSTO_CLUSTER = "kusto_cluster"
_DATABASE = "database"
_TABLE = "table"
_AUTH_METHOD = "authentication_method"
_CLIENT_ACTIVITY_ID = "client_activity_id"
_SPAN_COMPONENT = "component"
_HTTP = "http"
_HTTP_USER_AGENT = "http.user_agent"
_HTTP_METHOD = "http.method"
_HTTP_URL = "http.url"
@classmethod
def add_attributes(cls, **kwargs) -> None:
"""
Add ADX attributes to the current span
:key dict tracing_attributes: key, val ADX attributes to include in span of trace
"""
tracing_attributes: dict = kwargs.pop("tracing_attributes", {})
span_impl_type = settings.tracing_implementation()
if span_impl_type is None:
return
current_span = span_impl_type.get_current_span()
span = span_impl_type(span=current_span)
for key, val in tracing_attributes.items():
span.add_attribute(key, val)
@classmethod
def set_query_attributes(cls, cluster: str, database: str, properties: Optional[ClientRequestProperties] = None) -> None:
query_attributes: dict = cls.create_query_attributes(cluster, database, properties)
cls.add_attributes(tracing_attributes=query_attributes)
@classmethod
def set_streaming_ingest_attributes(cls, cluster: str, database: str, table: str, properties: Optional[ClientRequestProperties] = None) -> None:
ingest_attributes: dict = cls.create_streaming_ingest_attributes(cluster, database, table, properties)
cls.add_attributes(tracing_attributes=ingest_attributes)
@classmethod
def set_http_attributes(cls, url: str, method: str, headers: dict) -> None:
http_tracing_attributes: dict = cls.create_http_attributes(method, url, headers)
cls.add_attributes(tracing_attributes=http_tracing_attributes)
@classmethod
def set_cloud_info_attributes(cls, url: str) -> None:
cloud_info_attributes: dict = cls.create_cloud_info_attributes(url)
cls.add_attributes(tracing_attributes=cloud_info_attributes)
@classmethod
def create_query_attributes(cls, cluster: str, database: str, properties: Optional[ClientRequestProperties] = None) -> dict:
query_attributes: dict = {cls._KUSTO_CLUSTER: cluster, cls._DATABASE: database}
if properties:
query_attributes.update(properties.get_tracing_attributes())
return query_attributes
@classmethod
def create_streaming_ingest_attributes(cls, cluster: str, database: str, table: str, properties: Optional[ClientRequestProperties] = None) -> dict:
ingest_attributes: dict = {cls._KUSTO_CLUSTER: cluster, cls._DATABASE: database, cls._TABLE: table}
if properties:
ingest_attributes.update(properties.get_tracing_attributes())
return ingest_attributes
@classmethod
def create_http_attributes(cls, method: str, url: str, headers: dict = None) -> dict:
if headers is None:
headers = {}
http_tracing_attributes: dict = {
cls._SPAN_COMPONENT: cls._HTTP,
cls._HTTP_METHOD: method,
cls._HTTP_URL: url,
}
user_agent = headers.get("User-Agent")
if user_agent:
http_tracing_attributes[cls._HTTP_USER_AGENT] = user_agent
return http_tracing_attributes
@classmethod
def create_cloud_info_attributes(cls, url: str) -> dict:
ingest_attributes: dict = {cls._HTTP_URL: url}
return ingest_attributes
@classmethod
def create_cluster_attributes(cls, cluster_uri: str) -> dict:
cluster_attributes = {cls._KUSTO_CLUSTER: cluster_uri}
return cluster_attributes
class KustoTracing:
@staticmethod
def call_func_tracing(func: Callable, *args, **kwargs):
"""
Prepares function for tracing and calls it
:param func: function to trace
:type func: Callable
:key str name_of_span: name of the trace span
:key dict tracing_attributes: key/value dictionary of attributes to include in span of trace
:key str kind: the type of span
:param kwargs: function arguments
"""
name_of_span: str = kwargs.pop("name_of_span", None)
tracing_attributes: dict = kwargs.pop("tracing_attributes", {})
kind: str = kwargs.pop("kind", SpanKind.CLIENT)
kusto_trace: Callable = distributed_trace(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind)
kusto_func: Callable = kusto_trace(func)
return kusto_func(*args, **kwargs)
@staticmethod
async def call_func_tracing_async(func: Callable, *args, **kwargs):
"""
Prepares function for tracing and calls it
:param func: function to trace
:type func: Callable
:key str name_of_span: name of the trace span
:key dict tracing_attributes: key/value dictionary of attributes to include in span of trace
:key str kind: the type of span
:param kwargs: function arguments
"""
name_of_span: str = kwargs.pop("name_of_span", None)
tracing_attributes: dict = kwargs.pop("tracing_attributes", {})
kind: str = kwargs.pop("kind", SpanKind.CLIENT)
kusto_trace: Callable = distributed_trace_async(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind)
kusto_func: Callable = kusto_trace(func)
return await kusto_func(*args, **kwargs)
@staticmethod
def prepare_func_tracing(func: Callable, **kwargs):
"""
Prepares function for tracing
:param func: function to trace
:type func: Callable
:key str name_of_span: name of the trace span
:key dict tracing_attributes: key/value dictionary of attributes to include in span of trace
:key str kind: the type of span
"""
name_of_span: str = kwargs.pop("name_of_span", None)
tracing_attributes: dict = kwargs.pop("tracing_attributes", {})
kind: str = kwargs.pop("kind", SpanKind.CLIENT)
kusto_trace: Callable = distributed_trace(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind)
return kusto_trace(func)

Просмотреть файл

@ -8,10 +8,14 @@ from threading import Lock
from typing import Callable, Coroutine, List, Optional, Any
from azure.core.exceptions import ClientAuthenticationError
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.tracing import SpanKind
from azure.identity import AzureCliCredential, ManagedIdentityCredential
from msal import ConfidentialClientApplication, PublicClientApplication
from ._cloud_settings import CloudInfo, CloudSettings
from ._telemetry import KustoTracing
from .exceptions import KustoAioSyntaxError, KustoAsyncUsageError, KustoClientError
try:
@ -129,16 +133,23 @@ class TokenProviderBase(abc.ABC):
def get_token(self):
"""Get a token silently from cache or authenticate if cached token is not found"""
if self.is_async:
raise KustoAsyncUsageError("get_token", self.is_async)
self._init_once()
token = self._get_token_from_cache_impl()
if token is None:
with self._lock:
token = self._get_token_impl()
@distributed_trace(name_of_span=f"{self.name()}.get_token", tracing_attributes=self.context(), kind=SpanKind.CLIENT)
def _get_token():
if self.is_async:
raise KustoAsyncUsageError("get_token", self.is_async)
self._init_once()
return self._valid_token_or_throw(token)
token = self._get_token_from_cache_impl()
if token is None:
with self._lock:
token = KustoTracing.call_func_tracing(
self._get_token_impl, name_of_span=f"{self.name()}.get_token_impl", tracing_attributes=self.context()
)
return self._valid_token_or_throw(token)
return _get_token()
def context(self) -> dict:
if self.is_async:
@ -156,18 +167,24 @@ class TokenProviderBase(abc.ABC):
async def get_token_async(self):
"""Get a token asynchronously silently from cache or authenticate if cached token is not found"""
if not self.is_async:
raise KustoAsyncUsageError("get_token_async", self.is_async)
@distributed_trace_async(name_of_span=f"{self.name()}.get_token_async", tracing_attributes=self.context_async(), kind=SpanKind.CLIENT)
async def _get_token_async():
if not self.is_async:
raise KustoAsyncUsageError("get_token_async", self.is_async)
await self._init_once_async()
await self._init_once_async()
token = self._get_token_from_cache_impl()
token = self._get_token_from_cache_impl()
if token is None:
async with self._async_lock:
token = await self._get_token_impl_async()
if token is None:
async with self._async_lock:
token = await KustoTracing.call_func_tracing_async(
self._get_token_impl_async, name_of_span=f"{self.name()}.get_token_impl_async", tracing_attributes=self.context_async()
)
return self._valid_token_or_throw(token)
return self._valid_token_or_throw(token)
return await _get_token_async()
@staticmethod
@abc.abstractmethod

Просмотреть файл

@ -2,7 +2,12 @@ import io
from datetime import timedelta
from typing import Optional, Union
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.tracing import SpanKind
from .response import KustoStreamingResponseDataSet
from .._telemetry import KustoTracing, KustoTracingAttributes
from .._decorators import aio_documented_by, documented_by
from ..aio.streaming_response import JsonTokenReader, StreamingDataSetEnumerator
from ..client import KustoClient as KustoClientSync
@ -45,14 +50,21 @@ class KustoClient(_KustoClientBase):
return await self.execute_mgmt(database, query, properties)
return await self.execute_query(database, query, properties)
@distributed_trace_async(name_of_span="KustoClient.query_cmd", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_query)
async def execute_query(self, database: str, query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet:
KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties)
return await self._execute(self._query_endpoint, database, query, None, KustoClient._query_default_timeout, properties)
@distributed_trace_async(name_of_span="KustoClient.control_cmd", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_mgmt)
async def execute_mgmt(self, database: str, query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet:
KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties)
return await self._execute(self._mgmt_endpoint, database, query, None, KustoClient._mgmt_default_timeout, properties)
@distributed_trace_async(name_of_span="KustoClient.streaming_ingest", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_streaming_ingest)
async def execute_streaming_ingest(
self,
@ -63,6 +75,8 @@ class KustoClient(_KustoClientBase):
properties: ClientRequestProperties = None,
mapping_name: str = None,
):
KustoTracingAttributes.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)
stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value
endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format
if mapping_name is not None:
@ -77,10 +91,13 @@ class KustoClient(_KustoClientBase):
response = await self._execute(self._query_endpoint, database, query, None, timeout, properties, stream_response=True)
return StreamingDataSetEnumerator(JsonTokenReader(response.content))
@distributed_trace_async(name_of_span="KustoClient.streaming_query", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_streaming_query)
async def execute_streaming_query(
self, database: str, query: str, timeout: timedelta = _KustoClientBase._query_default_timeout, properties: Optional[ClientRequestProperties] = None
) -> KustoStreamingResponseDataSet:
KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties)
response = await self._execute_streaming_query_parsed(database, query, timeout, properties)
return KustoStreamingResponseDataSet(response)
@ -116,7 +133,18 @@ class KustoClient(_KustoClientBase):
if self._aad_helper:
request_headers["Authorization"] = await self._aad_helper.acquire_authorization_header_async()
response = await self._session.post(endpoint, headers=request_headers, data=payload, json=json_payload, timeout=timeout.seconds, proxy=self._proxy_url)
http_trace_attributes = KustoTracingAttributes.create_http_attributes(url=endpoint, method="POST", headers=request_headers)
response = await KustoTracing.call_func_tracing_async(
self._session.post,
endpoint,
headers=request_headers,
json=json_payload,
data=payload,
timeout=timeout.seconds,
proxy=self._proxy_url,
name_of_span="KustoClient.http_post",
tracing_attributes=http_trace_attributes,
)
if stream_response:
try:
@ -145,4 +173,4 @@ class KustoClient(_KustoClientBase):
response_text = None
raise self._handle_http_error(e, endpoint, payload, response, response.status, response_json, response_text)
return self._kusto_parse_by_endpoint(endpoint, response_json)
return KustoTracing.call_func_tracing(self._kusto_parse_by_endpoint, endpoint, response_json, name_of_span="KustoClient.processing_response")

Просмотреть файл

@ -10,6 +10,11 @@ import requests.adapters
from requests import Response
from urllib3.connection import HTTPConnection
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing import SpanKind
from azure.kusto.data._telemetry import KustoTracingAttributes, KustoTracing
from .client_base import ExecuteRequestParams, _KustoClientBase
from .client_request_properties import ClientRequestProperties
from .data_format import DataFormat
@ -154,6 +159,7 @@ class KustoClient(_KustoClientBase):
return self.execute_mgmt(database, query, properties)
return self.execute_query(database, query, properties)
@distributed_trace(name_of_span="KustoClient.query_cmd", kind=SpanKind.CLIENT)
def execute_query(self, database: str, query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
"""
Execute a KQL query.
@ -164,8 +170,11 @@ class KustoClient(_KustoClientBase):
:return: Kusto response data set.
:rtype: azure.kusto.data.response.KustoResponseDataSet
"""
KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties)
return self._execute(self._query_endpoint, database, query, None, self._query_default_timeout, properties)
@distributed_trace(name_of_span="KustoClient.control_cmd", kind=SpanKind.CLIENT)
def execute_mgmt(self, database: str, query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
"""
Execute a KQL control command.
@ -176,8 +185,11 @@ class KustoClient(_KustoClientBase):
:return: Kusto response data set.
:rtype: azure.kusto.data.response.KustoResponseDataSet
"""
KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties)
return self._execute(self._mgmt_endpoint, database, query, None, self._mgmt_default_timeout, properties)
@distributed_trace(name_of_span="KustoClient.streaming_ingest", kind=SpanKind.CLIENT)
def execute_streaming_ingest(
self,
database: str,
@ -199,6 +211,8 @@ class KustoClient(_KustoClientBase):
:param ClientRequestProperties properties: additional request properties.
:param str mapping_name: Pre-defined mapping of the table. Required when stream_format is json/avro.
"""
KustoTracingAttributes.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)
stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value
endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format
if mapping_name is not None:
@ -213,6 +227,7 @@ class KustoClient(_KustoClientBase):
response.raw.decode_content = True
return StreamingDataSetEnumerator(JsonTokenReader(response.raw))
@distributed_trace(name_of_span="KustoClient.streaming_query", kind=SpanKind.CLIENT)
def execute_streaming_query(
self, database: str, query: str, timeout: timedelta = _KustoClientBase._query_default_timeout, properties: Optional[ClientRequestProperties] = None
) -> KustoStreamingResponseDataSet:
@ -226,6 +241,8 @@ class KustoClient(_KustoClientBase):
:param azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
:return KustoStreamingResponseDataSet:
"""
KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties)
return KustoStreamingResponseDataSet(self._execute_streaming_query_parsed(database, query, timeout, properties))
def _execute(
@ -258,7 +275,20 @@ class KustoClient(_KustoClientBase):
timeout = request_params.timeout
if self._aad_helper:
request_headers["Authorization"] = self._aad_helper.acquire_authorization_header()
response = self._session.post(endpoint, headers=request_headers, json=json_payload, data=payload, timeout=timeout.seconds, stream=stream_response)
# trace http post call for response
http_trace_attributes = KustoTracingAttributes.create_http_attributes(url=endpoint, method="POST", headers=request_headers)
response = KustoTracing.call_func_tracing(
self._session.post,
endpoint,
headers=request_headers,
json=json_payload,
data=payload,
timeout=timeout.seconds,
stream=stream_response,
name_of_span="KustoClient.http_post",
tracing_attributes=http_trace_attributes,
)
if stream_response:
try:
@ -273,5 +303,5 @@ class KustoClient(_KustoClientBase):
response.raise_for_status()
except Exception as e:
raise self._handle_http_error(e, endpoint, payload, response, response.status_code, response_json, response.text)
return self._kusto_parse_by_endpoint(endpoint, response_json)
# trace response processing
return KustoTracing.call_func_tracing(self._kusto_parse_by_endpoint, endpoint, response_json, name_of_span="KustoClient.processing_response")

Просмотреть файл

@ -9,6 +9,8 @@ class ClientRequestProperties:
For more information please look at: https://docs.microsoft.com/en-us/azure/kusto/api/netfx/request-properties
"""
_CLIENT_REQUEST_ID = "client_request_id"
results_defer_partial_query_failures_option_name = "deferpartialqueryfailures"
request_timeout_option_name = "servertimeout"
no_request_timeout_option_name = "norequesttimeout"
@ -49,3 +51,7 @@ class ClientRequestProperties:
def to_json(self) -> str:
"""Safe serialization to a JSON string."""
return json.dumps({"Options": self._options, "Parameters": self._parameters}, default=str)
def get_tracing_attributes(self) -> dict:
"""Gets dictionary of attributes to be documented during tracing"""
return {self._CLIENT_REQUEST_ID: str(self.client_request_id)}

Просмотреть файл

@ -12,6 +12,8 @@ class KustoConnectionStringBuilder:
https://github.com/Azure/azure-kusto-python/blob/master/azure-kusto-data/tests/sample.py
"""
kcsb_invalid_item_error = "%s is not supported as an item in KustoConnectionStringBuilder"
@unique
class ValidKeywords(Enum):
"""
@ -134,7 +136,10 @@ class KustoConnectionStringBuilder:
for kvp_string in connection_string.split(";"):
key, _, value = kvp_string.partition("=")
keyword = self.ValidKeywords.parse(key)
try:
keyword = self.ValidKeywords.parse(key)
except KeyError:
raise KeyError(self.kcsb_invalid_item_error % key)
value_stripped = value.strip()
if keyword.is_str_type():
self[keyword] = value_stripped.rstrip("/")
@ -150,7 +155,7 @@ class KustoConnectionStringBuilder:
try:
keyword = key if isinstance(key, self.ValidKeywords) else self.ValidKeywords.parse(key)
except KeyError:
raise KeyError("%s is not supported as an item in KustoConnectionStringBuilder" % key)
raise KeyError(self.kcsb_invalid_item_error % key)
if value is None:
raise TypeError("Value cannot be None.")
@ -182,6 +187,7 @@ class KustoConnectionStringBuilder:
"""
assert_string_is_not_empty(user_id)
assert_string_is_not_empty(password)
kcsb = cls(connection_string)
kcsb[kcsb.ValidKeywords.aad_federated_security] = True
kcsb[kcsb.ValidKeywords.aad_user_id] = user_id
@ -200,6 +206,7 @@ class KustoConnectionStringBuilder:
:param str user_token: AAD user token.
"""
assert_string_is_not_empty(user_token)
kcsb = cls(connection_string)
kcsb[kcsb.ValidKeywords.aad_federated_security] = True
kcsb[kcsb.ValidKeywords.user_token] = user_token
@ -220,6 +227,7 @@ class KustoConnectionStringBuilder:
assert_string_is_not_empty(aad_app_id)
assert_string_is_not_empty(app_key)
assert_string_is_not_empty(authority_id)
kcsb = cls(connection_string)
kcsb[kcsb.ValidKeywords.aad_federated_security] = True
kcsb[kcsb.ValidKeywords.application_client_id] = aad_app_id

Просмотреть файл

@ -45,6 +45,6 @@ setup(
packages=find_packages(exclude=["azure", "tests"]),
package_data={"": ["wellKnownKustoEndpoints.json"]},
include_package_data=True,
install_requires=["python-dateutil>=2.8.0", "requests>=2.13.0", "azure-identity>=1.5.0,<2", "msal>=1.9.0,<2", "ijson~=3.1"],
install_requires=["python-dateutil>=2.8.0", "requests>=2.13.0", "azure-identity>=1.5.0,<2", "msal>=1.9.0,<2", "ijson~=3.1", "azure-core>=1.11.0<2"],
extras_require={"pandas": ["pandas"], "aio": ["aiohttp>=3.4.4,<4", "asgiref>=3.2.3,<4"]},
)

Просмотреть файл

@ -83,7 +83,12 @@ class TestTokenProvider:
provider.get_token()
assert False, "Expected KustoAsyncUsageError to occur"
except KustoAsyncUsageError as e:
assert str(e) == "Method get_token can't be called from an asynchronous client"
assert (
str(e) == "Method get_token can't be called from an asynchronous client"
or str(e) == "Method context can't be called from an asynchronous client"
)
# context is called for tracing purposes
try:
provider.context()
assert False, "Expected KustoAsyncUsageError to occur"

Просмотреть файл

@ -0,0 +1,89 @@
import pytest
from azure.kusto.data._telemetry import KustoTracing, KustoTracingAttributes
from azure.kusto.data.client_request_properties import ClientRequestProperties
class TestTelemetry:
"""
Tests for telemetry class to make sure adding tracing doesn't impact functionality of original code
"""
@staticmethod
def plus_one(num):
KustoTracingAttributes.add_attributes(tracing_attributes={"foo": "bar"})
return num + 1
@staticmethod
async def plus_one_async(num):
KustoTracingAttributes.add_attributes(tracing_attributes={"foo": "bar"})
return num + 1
@staticmethod
def test_call_func_tracing():
res = KustoTracing.call_func_tracing(TestTelemetry.plus_one, 1, name_of_span="plus_one")
assert res == 2
@staticmethod
def test_prepare_func_tracing():
res = KustoTracing.prepare_func_tracing(TestTelemetry.plus_one, name_of_span="plus_one")
assert res(1) == 2
@staticmethod
@pytest.mark.asyncio
async def test_call_func_tracing_async():
res = KustoTracing.call_func_tracing_async(TestTelemetry.plus_one_async, 1, name_of_span="plus_one")
assert await res == 2
@staticmethod
def test_get_client_request_properties_attributes():
attributes = ClientRequestProperties().get_tracing_attributes()
keynames = {"client_request_id"}
assert isinstance(attributes, dict)
for key, val in attributes.items():
assert key in keynames
assert isinstance(val, str)
for key in keynames:
assert key in attributes.keys()
@staticmethod
def test_create_query_attributes():
attributes = KustoTracingAttributes.create_query_attributes("cluster_test", "database_test", ClientRequestProperties())
keynames = {"kusto_cluster", "database", "client_request_id"}
assert isinstance(attributes, dict)
for key, val in attributes.items():
assert isinstance(val, str)
for key in keynames:
assert key in attributes.keys()
attributes = KustoTracingAttributes.create_query_attributes("cluster_test", "database_test")
keynames = {"kusto_cluster", "database"}
assert isinstance(attributes, dict)
for key, val in attributes.items():
assert isinstance(val, str)
for key in keynames:
assert key in attributes.keys()
@staticmethod
def test_create_ingest_attributes():
attributes = KustoTracingAttributes.create_streaming_ingest_attributes("cluster_test", "database_test", "table", ClientRequestProperties())
keynames = {"kusto_cluster", "database", "table", "client_request_id"}
assert isinstance(attributes, dict)
for key, val in attributes.items():
assert isinstance(val, str)
for key in keynames:
assert key in attributes.keys()
attributes = KustoTracingAttributes.create_streaming_ingest_attributes("cluster_test", "database_test", "table")
keynames = {"kusto_cluster", "database", "table"}
assert isinstance(attributes, dict)
for key, val in attributes.items():
assert isinstance(val, str)
for key in keynames:
assert key in attributes.keys()
@staticmethod
def test_create_http_attributes():
attributes = KustoTracingAttributes.create_http_attributes("method_test", "url_test")
assert attributes == {"component": "http", "http.method": "method_test", "http.url": "url_test"}
headers = {"User-Agent": "user_agent_test"}
attributes = KustoTracingAttributes.create_http_attributes("method_test", "url_test", headers)
assert attributes == {"component": "http", "http.method": "method_test", "http.url": "url_test", "http.user_agent": "user_agent_test"}

Просмотреть файл

@ -114,7 +114,12 @@ class TokenProviderTests(unittest.TestCase):
async_to_sync(provider.get_token_async)()
assert False, "Expected KustoAsyncUsageError to occur"
except KustoAsyncUsageError as e:
assert str(e) == "Method get_token_async can't be called from a synchronous client"
assert (
str(e) == "Method get_token_async can't be called from a synchronous client"
or str(e) == "Method context_async can't be called from a synchronous client"
)
# context_async is called for tracing purposes
try:
async_to_sync(provider.context_async)()
assert False, "Expected KustoAsyncUsageError to occur"

Просмотреть файл

@ -0,0 +1,24 @@
import uuid
from azure.kusto.data._telemetry import KustoTracingAttributes
from .descriptors import DescriptorBase
from .ingestion_properties import IngestionProperties
class IngestTracingAttributes:
"""
Additional ADX attributes for telemetry spans
"""
_BLOB_QUEUE_NAME = "blob_queue_name"
_SOURCE_ID = "source_id"
@classmethod
def set_ingest_descriptor_attributes(cls, descriptor: DescriptorBase, ingestion_properties: IngestionProperties) -> None:
KustoTracingAttributes.add_attributes(tracing_attributes={**ingestion_properties.get_tracing_attributes(), **descriptor.get_tracing_attributes()})
@classmethod
def create_enqueue_request_attributes(cls, queue_name: str, source_id: uuid.UUID) -> dict:
enqueue_request_attributes = {cls._BLOB_QUEUE_NAME: queue_name, cls._SOURCE_ID: str(source_id)}
return enqueue_request_attributes

Просмотреть файл

@ -7,6 +7,7 @@ from tenacity import retry_if_exception_type, stop_after_attempt, Retrying, wait
from azure.kusto.data import KustoClient
from azure.kusto.data._models import KustoResultTable
from azure.kusto.data._telemetry import KustoTracing, KustoTracingAttributes
from azure.kusto.data.exceptions import KustoThrottlingError
_URI_FORMAT = re.compile("https://(\\w+).(queue|blob|table).(core.\\w+.\\w+)/([\\w,-]+)\\?(.*)")
@ -102,8 +103,14 @@ class _ResourceManager:
def _get_resource_by_name(self, table: KustoResultTable, resource_name: str):
return [_ResourceUri.parse(row["StorageRoot"]) for row in table if row["ResourceTypeName"] == resource_name]
def _get_ingest_client_resources_from_service(self) -> _IngestClientResources:
result = self._retryer(self._kusto_client.execute, "NetDefaultDB", ".get ingestion resources")
def _get_ingest_client_resources_from_service(self):
# trace all calls to get ingestion resources
trace_get_ingestion_resources = KustoTracing.prepare_func_tracing(
self._kusto_client.execute,
name_of_span="_ResourceManager.get_ingestion_resources",
tracing_attributes=KustoTracingAttributes.create_cluster_attributes(self._kusto_client._kusto_cluster),
)
result = self._retryer(trace_get_ingestion_resources, "NetDefaultDB", ".get ingestion resources")
table = result.primary_results[0]
secured_ready_for_aggregation_queues = self._get_resource_by_name(table, "SecuredReadyForAggregationQueue")
@ -124,7 +131,13 @@ class _ResourceManager:
self._authorization_context_last_update = datetime.utcnow()
def _get_authorization_context_from_service(self):
result = self._retryer(self._kusto_client.execute, "NetDefaultDB", ".get kusto identity token")
# trace all calls to get identity token
trace_get_identity_token = KustoTracing.prepare_func_tracing(
self._kusto_client.execute,
name_of_span="_ResourceManager.get_identity_token",
tracing_attributes=KustoTracingAttributes.create_cluster_attributes(self._kusto_client._kusto_cluster),
)
result = self._retryer(trace_get_identity_token, "NetDefaultDB", ".get kusto identity token")
return result.primary_results[0][0]["AuthorizationContext"]
def get_ingestion_queues(self) -> List[_ResourceUri]:

Просмотреть файл

@ -127,10 +127,7 @@ class BaseIngestClient(metaclass=ABCMeta):
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
:return prepared stream descriptor
"""
if not isinstance(stream_descriptor, StreamDescriptor):
new_descriptor = StreamDescriptor(stream_descriptor)
else:
new_descriptor = copy(stream_descriptor)
new_descriptor = StreamDescriptor.get_instance(stream_descriptor)
if isinstance(new_descriptor.stream, TextIOWrapper):
new_descriptor.stream = new_descriptor.stream.buffer
@ -149,10 +146,7 @@ class BaseIngestClient(metaclass=ABCMeta):
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
:return prepared file descriptor
"""
if not isinstance(file_descriptor, FileDescriptor):
descriptor = FileDescriptor(file_descriptor)
else:
descriptor = file_descriptor
descriptor = FileDescriptor.get_instance(file_descriptor)
should_compress = BaseIngestClient._should_compress(descriptor, ingestion_properties)
return descriptor, should_compress

Просмотреть файл

@ -1,10 +1,12 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
import abc
import os
import random
import shutil
import struct
import uuid
from copy import copy
from gzip import GzipFile
from io import BytesIO, SEEK_END
from typing import Union, Optional, AnyStr, IO, List, Dict
@ -28,7 +30,18 @@ def ensure_uuid(maybe_uuid: OptionalUUID) -> uuid.UUID:
return uuid.UUID(f"{maybe_uuid}", version=4)
class FileDescriptor:
class DescriptorBase(abc.ABC):
"""This base class abstracts tracing attributes for all implementations."""
_SOURCE_ID = "source_id"
@abc.abstractmethod
def get_tracing_attributes(self) -> dict:
"""Gets dictionary of attributes to be documented during tracing"""
pass
class FileDescriptor(DescriptorBase):
"""FileDescriptor is used to describe a file that will be used as an ingestion source."""
# Gzip keeps the decompressed stream size as a UINT32 in the last 4 bytes of the stream, however this poses a limit to the expressed size which is 4GB
@ -36,6 +49,7 @@ class FileDescriptor:
# The below constant expresses the maximal size of a compressed stream that will not cause the UINT32 to rollover given a maximal compression ratio of 1:40
GZIP_MAX_DISK_SIZE_FOR_DETECTION = int(4 * 1024 * 1024 * 1024 / 40)
DEFAULT_COMPRESSION_RATIO = 11
_FILE_PATH = "file_path"
def __init__(self, path: str, size: Optional[int] = None, source_id: OptionalUUID = None):
"""
@ -107,10 +121,21 @@ class FileDescriptor:
file_stream.seek(0)
return file_stream
def get_tracing_attributes(self) -> dict:
return {self._FILE_PATH: self.stream_name, self._SOURCE_ID: str(self.source_id)}
class BlobDescriptor:
@classmethod
def get_instance(cls, file_descriptor: Union["FileDescriptor", str]) -> "FileDescriptor":
if not isinstance(file_descriptor, cls):
return cls(file_descriptor)
return file_descriptor
class BlobDescriptor(DescriptorBase):
"""BlobDescriptor is used to describe a blob that will be used as an ingestion source"""
_BLOB_URI = "blob_uri"
def __init__(self, path: str, size: Optional[int] = None, source_id: OptionalUUID = None):
"""
:param path: blob uri.
@ -155,10 +180,15 @@ class BlobDescriptor:
raise KustoBlobError(e)
return BlobDescriptor(blob_client.url, descriptor.size, descriptor.source_id)
def get_tracing_attributes(self) -> dict:
return {self._BLOB_URI: self.path, self._SOURCE_ID: str(self.source_id)}
class StreamDescriptor:
class StreamDescriptor(DescriptorBase):
"""StreamDescriptor is used to describe a stream that will be used as ingestion source"""
_STREAM_NAME = "stream_name"
# TODO: currently we always assume that streams are gz compressed (will get compressed before sending), should we expand that?
def __init__(
self, stream: IO[AnyStr], source_id: OptionalUUID = None, is_compressed: bool = False, stream_name: Optional[str] = None, size: Optional[int] = None
@ -203,11 +233,19 @@ class StreamDescriptor:
:param Union[FileDescriptor, str] file_descriptor: File Descriptor instance
:return new StreamDescriptor instance
"""
if isinstance(file_descriptor, FileDescriptor):
descriptor = file_descriptor
else:
descriptor = FileDescriptor(file_descriptor)
descriptor = FileDescriptor.get_instance(file_descriptor)
stream = open(descriptor.path, "rb")
is_compressed = descriptor.path.endswith(".gz") or descriptor.path.endswith(".zip")
stream_descriptor = StreamDescriptor(stream, descriptor.source_id, is_compressed, descriptor.stream_name, descriptor.size)
return stream_descriptor
@classmethod
def get_instance(cls, stream_descriptor: Union["StreamDescriptor", IO[AnyStr]]) -> "StreamDescriptor":
if not isinstance(stream_descriptor, cls):
descriptor = cls(stream_descriptor)
else:
descriptor = copy(stream_descriptor)
return descriptor
def get_tracing_attributes(self) -> dict:
return {self._STREAM_NAME: self.stream_name, self._SOURCE_ID: str(self.source_id)}

Просмотреть файл

@ -4,15 +4,20 @@ import random
from typing import Union, AnyStr, IO, List, Optional, Dict
from urllib.parse import urlparse
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing import SpanKind
from azure.storage.queue import QueueServiceClient, TextBase64EncodePolicy
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data._telemetry import KustoTracing
from azure.kusto.data.exceptions import KustoClosedError, KustoServiceError
from .ingestion_blob_info import IngestionBlobInfo
from ._ingest_telemetry import IngestTracingAttributes
from ._resource_manager import _ResourceManager, _ResourceUri
from .base_ingest_client import BaseIngestClient, IngestionResult, IngestionStatus
from .descriptors import BlobDescriptor, FileDescriptor, StreamDescriptor
from .exceptions import KustoInvalidEndpointError
from .ingestion_blob_info import IngestionBlobInfo
from .ingestion_properties import IngestionProperties
@ -48,6 +53,7 @@ class QueuedIngestClient(BaseIngestClient):
self._resource_manager.set_proxy(proxy_url)
self._proxy_dict = {"http": proxy_url, "https": proxy_url}
@distributed_trace(name_of_span="QueuedIngestClient.ingest_from_file", kind=SpanKind.CLIENT)
def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
"""Enqueue an ingest command from local files.
To learn more about ingestion methods go to:
@ -55,6 +61,9 @@ class QueuedIngestClient(BaseIngestClient):
:param file_descriptor: a FileDescriptor to be ingested.
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
"""
file_descriptor = FileDescriptor.get_instance(file_descriptor)
IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)
super().ingest_from_file(file_descriptor, ingestion_properties)
containers = self._get_containers()
@ -72,11 +81,15 @@ class QueuedIngestClient(BaseIngestClient):
)
return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)
@distributed_trace(name_of_span="QueuedIngestClient.ingest_from_stream", kind=SpanKind.CLIENT)
def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
"""Ingest from io streams.
:param stream_descriptor: An object that contains a description of the stream to be ingested.
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
"""
stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)
super().ingest_from_stream(stream_descriptor, ingestion_properties)
containers = self._get_containers()
@ -93,6 +106,7 @@ class QueuedIngestClient(BaseIngestClient):
)
return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)
@distributed_trace(name_of_span="QueuedIngestClient.ingest_from_blob", kind=SpanKind.CLIENT)
def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties) -> IngestionResult:
"""Enqueue an ingest command from azure blobs.
To learn more about ingestion methods go to:
@ -100,6 +114,8 @@ class QueuedIngestClient(BaseIngestClient):
:param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
"""
IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)
if self._is_closed:
raise KustoClosedError()
@ -115,7 +131,15 @@ class QueuedIngestClient(BaseIngestClient):
ingestion_blob_info = IngestionBlobInfo(blob_descriptor, ingestion_properties=ingestion_properties, auth_context=authorization_context)
ingestion_blob_info_json = ingestion_blob_info.to_json()
with queue_service.get_queue_client(queue=random_queue.object_name, message_encode_policy=TextBase64EncodePolicy()) as queue_client:
queue_client.send_message(content=ingestion_blob_info_json, timeout=self._SERVICE_CLIENT_TIMEOUT_SECONDS)
# trace enqueuing of blob for ingestion
enqueue_trace_attributes = IngestTracingAttributes.create_enqueue_request_attributes(queue_client.queue_name, blob_descriptor.source_id)
KustoTracing.call_func_tracing(
queue_client.send_message,
content=ingestion_blob_info_json,
timeout=self._SERVICE_CLIENT_TIMEOUT_SECONDS,
name_of_span="QueuedIngestClient.enqueue_request",
tracing_attributes=enqueue_trace_attributes,
)
return IngestionResult(
IngestionStatus.QUEUED, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id, blob_descriptor.path

Просмотреть файл

@ -152,6 +152,9 @@ class IngestionProperties:
For more information check out https://docs.microsoft.com/en-us/azure/data-explorer/ingestion-properties
"""
_DATABASE = "database"
_TABLE = "table"
def __init__(
self,
database: str,
@ -215,3 +218,7 @@ class IngestionProperties:
self.report_method = report_method
self.validation_policy = validation_policy
self.additional_properties = additional_properties
def get_tracing_attributes(self) -> dict:
"""Gets dictionary of attributes to be documented during tracing"""
return {self._DATABASE: self.database, self._TABLE: self.table}

Просмотреть файл

@ -4,9 +4,16 @@ from typing import AnyStr, IO, TYPE_CHECKING, Union
from tenacity import Retrying, _utils, stop_after_attempt, wait_random_exponential
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing import SpanKind
from azure.kusto.data import KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoApiError, KustoClosedError
from azure.kusto.data._telemetry import KustoTracing
from . import BlobDescriptor, FileDescriptor, IngestionProperties, StreamDescriptor
from ._ingest_telemetry import IngestTracingAttributes
from ._stream_extensions import chain_streams, read_until_size_or_end
from .base_ingest_client import BaseIngestClient, IngestionResult
from .ingest_client import QueuedIngestClient
@ -25,7 +32,7 @@ class ManagedStreamingIngestClient(BaseIngestClient):
Managed streaming ingest client will fall back to queued if:
- Multiple transient errors were encountered when trying to do streaming ingestion
- The ingestion is too large for streaming ingestion (over 4MB)
- The ingestion is directly for a blob
- The ingestion is directly from a blob
"""
MAX_STREAMING_SIZE_IN_BYTES = 4 * 1024 * 1024
@ -78,7 +85,11 @@ class ManagedStreamingIngestClient(BaseIngestClient):
self.queued_client.set_proxy(proxy_url)
self.streaming_client.set_proxy(proxy_url)
@distributed_trace(kind=SpanKind.CLIENT)
def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
file_descriptor = FileDescriptor.get_instance(file_descriptor)
IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)
super().ingest_from_file(file_descriptor, ingestion_properties)
stream_descriptor = StreamDescriptor.from_file_descriptor(file_descriptor)
@ -86,7 +97,11 @@ class ManagedStreamingIngestClient(BaseIngestClient):
with stream_descriptor.stream:
return self.ingest_from_stream(stream_descriptor, ingestion_properties)
@distributed_trace(kind=SpanKind.CLIENT)
def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)
super().ingest_from_stream(stream_descriptor, ingestion_properties)
stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties)
@ -107,7 +122,14 @@ class ManagedStreamingIngestClient(BaseIngestClient):
with attempt:
stream.seek(0, SEEK_SET)
client_request_id = ManagedStreamingIngestClient._get_request_id(stream_descriptor.source_id, attempt.retry_state.attempt_number - 1)
return self.streaming_client._ingest_from_stream_with_client_request_id(stream_descriptor, ingestion_properties, client_request_id)
# trace attempt to ingest from stream
return KustoTracing.call_func_tracing(
self.streaming_client._ingest_from_stream_with_client_request_id,
stream_descriptor,
ingestion_properties,
client_request_id,
name_of_span="ManagedStreamingIngestClient.ingest_from_stream_attempt",
)
except KustoApiError as ex:
error = ex.get_api_error()
if error.permanent:
@ -115,6 +137,7 @@ class ManagedStreamingIngestClient(BaseIngestClient):
return self.queued_client.ingest_from_stream(stream_descriptor, ingestion_properties)
@distributed_trace(kind=SpanKind.CLIENT)
def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties):
"""
Enqueue an ingest command from azure blobs.
@ -126,6 +149,8 @@ class ManagedStreamingIngestClient(BaseIngestClient):
:param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
"""
IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)
if self._is_closed:
raise KustoClosedError()

Просмотреть файл

@ -1,10 +1,14 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
from typing import Union, AnyStr, Optional
from typing import IO
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing import SpanKind
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRequestProperties
from ._ingest_telemetry import IngestTracingAttributes
from .base_ingest_client import BaseIngestClient, IngestionResult, IngestionStatus
from .descriptors import FileDescriptor, StreamDescriptor
from .ingestion_properties import IngestionProperties
@ -32,11 +36,15 @@ class KustoStreamingIngestClient(BaseIngestClient):
def set_proxy(self, proxy_url: str):
self._kusto_client.set_proxy(proxy_url)
@distributed_trace(kind=SpanKind.CLIENT)
def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
"""Ingest from local files.
:param file_descriptor: a FileDescriptor to be ingested.
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
"""
file_descriptor = FileDescriptor.get_instance(file_descriptor)
IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)
super().ingest_from_file(file_descriptor, ingestion_properties)
stream_descriptor = StreamDescriptor.from_file_descriptor(file_descriptor)
@ -44,12 +52,16 @@ class KustoStreamingIngestClient(BaseIngestClient):
with stream_descriptor.stream:
return self.ingest_from_stream(stream_descriptor, ingestion_properties)
@distributed_trace(kind=SpanKind.CLIENT)
def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
"""Ingest from io streams.
:param azure.kusto.ingest.StreamDescriptor stream_descriptor: An object that contains a description of the stream to
be ingested.
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
"""
stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)
super().ingest_from_stream(stream_descriptor, ingestion_properties)
return self._ingest_from_stream_with_client_request_id(stream_descriptor, ingestion_properties, None)

Просмотреть файл

@ -0,0 +1,26 @@
from io import BytesIO
from azure.kusto.ingest import FileDescriptor, BlobDescriptor, StreamDescriptor
from azure.kusto.ingest.ingestion_properties import IngestionProperties
def test_get_tracing_attributes():
ingestion_properties = IngestionProperties("database_test", "table_test")
assert {"database": "database_test", "table": "table_test"} == ingestion_properties.get_tracing_attributes()
dummy_stream = BytesIO(b"dummy")
stream = StreamDescriptor(dummy_stream)
dummy_path = "dummy"
blob = BlobDescriptor(dummy_path)
file = FileDescriptor(dummy_path)
descriptors = [stream, blob, file]
keynames = [{"stream_name", "source_id"}, {"blob_uri", "source_id"}, {"file_path", "source_id"}]
for i in range(len(descriptors)):
attributes = descriptors[i].get_tracing_attributes()
assert isinstance(attributes, dict)
for key, val in attributes.items():
assert key in keynames[i]
assert isinstance(val, str)
for key in keynames[i]:
assert key in attributes.keys()

Просмотреть файл

@ -5,5 +5,4 @@ pandas>=0.24.0
black;python_version >= '3.6'
aioresponses>=0.6.2
pytest-asyncio>=0.12.0
azure-core>=1.11.0
asgiref>=3.2.3

Просмотреть файл

@ -1,5 +1,8 @@
azure-core-tracing-opentelemetry~=1.0.0b9
azure-kusto-data>=4.0.0,<5.0.0
azure-kusto-ingest>=4.0.0,<5.0.0
azure-monitor-opentelemetry-exporter~=1.0.0b7
inflection~=0.5.1
opentelemetry-sdk~=1.12.0
tqdm~=4.64.0

Просмотреть файл

@ -2,6 +2,7 @@ import dataclasses
import enum
import json
import uuid
import os
from dataclasses import dataclass
from typing import List
@ -11,6 +12,48 @@ from azure.kusto.data import DataFormat, KustoClient
from azure.kusto.ingest import QueuedIngestClient
from utils import AuthenticationModeOptions, Utils
# Declare OpenTelemetry as enabled tracing plugin for Azure SDKs
from azure.core.settings import settings
from azure.core.tracing import SpanKind
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan
# See https://github.com/open-telemetry/opentelemetry-python for details on regular open telemetry usage
from opentelemetry import trace
from opentelemetry.trace import Tracer
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
# alternative azure monitor exporter for distributed tracing
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
def enable_distributed_tracing() -> "Tracer":
"""
Sets up distributed tracing environment for sample app
Returns a 'Tracer'
"""
settings.tracing_implementation = OpenTelemetrySpan
# In the below example, we use a simple console exporter, but you can use anything OpenTelemetry supports. To use
# Azure Monitor Exporter uncomment the lines below and add the 'uuid of the instrumentation key (see your Azure
# Monitor account)'.
exporter = ConsoleSpanExporter()
# exporter = AzureMonitorTraceExporter.from_connection_string(conn_str=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"])
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
span_processor = SimpleSpanProcessor(exporter)
tr: TracerProvider = trace.get_tracer_provider()
tr.add_span_processor(span_processor)
return tracer
class SourceType(enum.Enum):
"""
@ -111,6 +154,7 @@ class KustoSampleApp:
Utils.error_handler(f"Couldn't read load config file from file '{config_file_name}'", ex)
@classmethod
@distributed_trace(kind=SpanKind.CLIENT)
def pre_ingestion_querying(cls, config: ConfigJson, kusto_client: KustoClient) -> None:
"""
First phase, pre ingestion - will reach the provided DB with several control commands and a query based on the configuration File.
@ -159,6 +203,7 @@ class KustoSampleApp:
Utils.Queries.execute_command(kusto_client, database_name, command)
@classmethod
@distributed_trace(name_of_span="KustoSampleApp.query", kind=SpanKind.CLIENT) # We can give similar spans the same name
def query_existing_number_of_rows(cls, kusto_client: KustoClient, database_name: str, table_name: str) -> None:
"""
Queries the data on the existing number of rows.
@ -170,6 +215,7 @@ class KustoSampleApp:
Utils.Queries.execute_command(kusto_client, database_name, command)
@classmethod
@distributed_trace(name_of_span="KustoSampleApp.query", kind=SpanKind.CLIENT) # We can give similar spans the same name
def query_first_two_rows(cls, kusto_client: KustoClient, database_name: str, table_name: str) -> None:
"""
Queries the first two rows of the table.
@ -270,6 +316,7 @@ class KustoSampleApp:
Utils.Queries.execute_command(kusto_client, database_name, mapping_command)
@classmethod
@distributed_trace(kind=SpanKind.CLIENT)
def ingest_data(
cls,
data_file: ConfigData,
@ -309,6 +356,7 @@ class KustoSampleApp:
Utils.error_handler(f"Unknown source '{source_type}' for file '{source_uri}'")
@classmethod
@distributed_trace(kind=SpanKind.CLIENT)
def post_ingestion_querying(cls, kusto_client: KustoClient, database_name: str, table_name: str, config_ingest_data: bool) -> None:
"""
Third and final phase - simple queries to validate the script ran successfully.
@ -342,6 +390,7 @@ class KustoSampleApp:
input("Press ENTER to proceed with this operation...")
@distributed_trace(name_of_span="KustoSampleApp.main", kind=SpanKind.CLIENT)
def main():
print("Kusto sample app is starting...")
@ -374,4 +423,10 @@ def main():
if __name__ == "__main__":
# Uncomment the lines below to enable distributed tracing
# tracer = enable_distributed_tracing()
# with tracer.start_as_current_span(name="KustoSampleApp"):
# main()
main()