diff --git a/azure-kusto-ingest/azure/kusto/ingest/_resource_manager.py b/azure-kusto-ingest/azure/kusto/ingest/_resource_manager.py new file mode 100644 index 0000000..dc117af --- /dev/null +++ b/azure-kusto-ingest/azure/kusto/ingest/_resource_manager.py @@ -0,0 +1,92 @@ +"""This module is serve as a cache to all resources needed by the kusto ingest client.""" + +from datetime import datetime, timedelta + +from .connection_string import _ConnectionString + +class _IngestClientResources: + def __init__(self, + secured_ready_for_aggregation_queues = None, + failed_ingestions_queues = None, + successful_ingestions_queues = None, + containers = None, + status_tables = None + ): + self.secured_ready_for_aggregation_queues = secured_ready_for_aggregation_queues + self.failed_ingestions_queues = failed_ingestions_queues + self.successful_ingestions_queues = successful_ingestions_queues + self.containers = containers + self.status_tables = status_tables + + def is_applicable(self): + resources = [self.secured_ready_for_aggregation_queues, self.failed_ingestions_queues, self.failed_ingestions_queues, self.containers, self.status_tables] + return all(resources) + +class _ResourceManager: + def __init__(self, + kusto_client): + self._kusto_client = kusto_client + self._refresh_period = timedelta(hours=1) + + self._ingest_client_resources = None + self._ingest_client_resources_last_update = None + + self._authorization_context = None + self._authorization_context_last_update = None + + def _refresh_ingest_client_resources(self): + if not self._ingest_client_resources \ + or (self._ingest_client_resources_last_update + self._refresh_period) <= datetime.utcnow() \ + or not self._ingest_client_resources.is_applicable(): + self._ingest_client_resources = self._get_ingest_client_resources_from_service() + self._ingest_client_resources_last_update = datetime.utcnow() + + def _get_resource_by_name(self, df, resource_name): + resource = df[df['ResourceTypeName'] == resource_name].StorageRoot.map(_ConnectionString.parse).tolist() + return resource + + def _get_ingest_client_resources_from_service(self): + df = self._kusto_client.execute("NetDefaultDB", ".get ingestion resources").to_dataframe() + + secured_ready_for_aggregation_queues = self._get_resource_by_name(df, 'SecuredReadyForAggregationQueue') + failed_ingestions_queues = self._get_resource_by_name(df, 'FailedIngestionsQueue') + successful_ingestions_queues = self._get_resource_by_name(df, 'SuccessfulIngestionsQueue') + containers = self._get_resource_by_name(df, 'TempStorage') + status_tables = self._get_resource_by_name(df, 'IngestionsStatusTable') + + return _IngestClientResources(secured_ready_for_aggregation_queues, failed_ingestions_queues, successful_ingestions_queues, containers, status_tables) + + def _refresh_authorization_context(self): + if not self._authorization_context \ + or self._authorization_context.isspace() \ + or (self._authorization_context_last_update + self._refresh_period) <= datetime.utcnow(): + self._authorization_context = self._get_authorization_context_from_service() + self._authorization_context_last_update = datetime.utcnow() + + def _get_authorization_context_from_service(self): + df = self._kusto_client.execute("NetDefaultDB", ".get kusto identity token").to_dataframe() + return df['AuthorizationContext'].values[0] + + def get_ingestion_queues(self): + self._refresh_ingest_client_resources() + return self._ingest_client_resources.secured_ready_for_aggregation_queues + + def get_failed_ingestions_queues(self): + self._refresh_ingest_client_resources() + return self._ingest_client_resources.failed_ingestions_queues + + def get_successful_ingestions_queues(self): + self._refresh_ingest_client_resources() + return self._ingest_client_resources.successful_ingestions_queues + + def get_containers(self): + self._refresh_ingest_client_resources() + return self._ingest_client_resources.containers + + def get_ingestions_status_tables(self): + self._refresh_ingest_client_resources() + return self._ingest_client_resources.status_tables + + def get_authorization_context(self): + self._refresh_authorization_context() + return self._authorization_context diff --git a/azure-kusto-ingest/azure/kusto/ingest/connection_string.py b/azure-kusto-ingest/azure/kusto/ingest/connection_string.py index f35ef5c..cc44348 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/connection_string.py +++ b/azure-kusto-ingest/azure/kusto/ingest/connection_string.py @@ -1,5 +1,4 @@ -""" A class to parse uris recieved from the DM. -""" +"""A class to parse uris recieved from the DM.""" import re @@ -13,5 +12,5 @@ class _ConnectionString: @staticmethod def parse(uri): """ Parses uri into a _ConnectionString object """ - match = re.search("https://(\\w+).(queue|blob).core.windows.net/([\\w,-]+)\\?(.*)", uri) + match = re.search("https://(\\w+).(queue|blob|table).core.windows.net/([\\w,-]+)\\?(.*)", uri) return _ConnectionString(match.group(1), match.group(2), match.group(3), match.group(4)) diff --git a/azure-kusto-ingest/azure/kusto/ingest/descriptors.py b/azure-kusto-ingest/azure/kusto/ingest/descriptors.py index ee94361..7b10fca 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/descriptors.py +++ b/azure-kusto-ingest/azure/kusto/ingest/descriptors.py @@ -1,5 +1,4 @@ -""" A file contains all descriptors ingest command should work with. -""" +"""A file contains all descriptors ingest command should work with.""" import os from io import BytesIO diff --git a/azure-kusto-ingest/azure/kusto/ingest/ingestion_blob_info.py b/azure-kusto-ingest/azure/kusto/ingest/ingestion_blob_info.py index a5bd28c..fc178b7 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/ingestion_blob_info.py +++ b/azure-kusto-ingest/azure/kusto/ingest/ingestion_blob_info.py @@ -1,9 +1,9 @@ -""" This module represents the object to write to azure queue that the DM is listening to. -""" +"""This module represents the object to write to azure queue that the DM is listening to.""" import json import uuid from datetime import datetime + from .descriptors import BlobDescriptor class _IngestionBlobInfo: diff --git a/azure-kusto-ingest/azure/kusto/ingest/ingestion_properties.py b/azure-kusto-ingest/azure/kusto/ingest/ingestion_properties.py index 79d9c04..560982a 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/ingestion_properties.py +++ b/azure-kusto-ingest/azure/kusto/ingest/ingestion_properties.py @@ -1,6 +1,7 @@ """This file has all classes to define ingestion properties.""" from enum import Enum, IntEnum + from .kusto_ingest_client_exceptions import KustoDuplicateMappingError class DataFormat(Enum): diff --git a/azure-kusto-ingest/azure/kusto/ingest/kusto_ingest_client.py b/azure-kusto-ingest/azure/kusto/ingest/kusto_ingest_client.py index 3c2d9e5..78eb28d 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/kusto_ingest_client.py +++ b/azure-kusto-ingest/azure/kusto/ingest/kusto_ingest_client.py @@ -1,18 +1,15 @@ -""" - Kusto ingest client for Python. -""" +"""Kusto ingest client for Python.""" import base64 -import os import random import uuid -from datetime import datetime, timedelta from azure.storage.common import CloudStorageAccount + from azure.kusto.data import KustoClient from .descriptors import BlobDescriptor, FileDescriptor -from .connection_string import _ConnectionString from .ingestion_blob_info import _IngestionBlobInfo +from ._resource_manager import _ResourceManager class KustoIngestClient: """ @@ -73,18 +70,13 @@ class KustoIngestClient: authority : 'microsoft.com', optional In case your tenant is not microsoft please use this param. """ - self._kusto_client = KustoClient(kusto_cluster, + kusto_client = KustoClient(kusto_cluster, client_id=client_id, client_secret=client_secret, username=username, password=password, authority=authority) - self._last_time_refreshed_containers = datetime.min - self._temp_storage_objects = None - self._last_time_refreshed_queues = datetime.min - self._queues = None - self._last_time_refreshed_token = datetime.min - self._kusto_token = None + self._resource_manager = _ResourceManager(kusto_client) def ingest_from_multiple_files(self, files, delete_sources_on_success, ingestion_properties): """ @@ -99,7 +91,6 @@ class KustoIngestClient: ingestion_properties : kusto_ingest_client.ingestion_properties.IngestionProperties The ingestion properties. """ - self._refresh_containers_if_needed() blobs = list() file_descriptors = list() for file in files: @@ -109,7 +100,8 @@ class KustoIngestClient: descriptor = FileDescriptor(file, deleteSourcesOnSuccess=delete_sources_on_success) file_descriptors.append(descriptor) blob_name = ingestion_properties.database + "__" + ingestion_properties.table + "__" + str(uuid.uuid4()) + "__" + descriptor.stream_name - container_details = random.choice(self._temp_storage_objects) + containers = self._resource_manager.get_containers() + container_details = random.choice(containers) storage_client = CloudStorageAccount(container_details.storage_account_name, sas_token=container_details.sas) blob_service = storage_client.create_block_blob_service() @@ -138,56 +130,17 @@ class KustoIngestClient: ingestion_properties : kusto_ingest_client.ingestion_properties.IngestionProperties The ingestion properties. """ - self._refresh_queues_if_needed() - self._refresh_token_if_needed() for blob in blobs: - queue_details = random.choice(self._queues) + queues = self._resource_manager.get_ingestion_queues() + queue_details = random.choice(queues) storage_client = CloudStorageAccount(queue_details.storage_account_name, sas_token=queue_details.sas) queue_service = storage_client.create_queue_service() + authorization_context = self._resource_manager.get_authorization_context() ingestion_blob_info = _IngestionBlobInfo(blob, ingestion_properties, delete_sources_on_success, - self._kusto_token) + authorization_context) ingestion_blob_info_json = ingestion_blob_info.to_json() encoded = base64.b64encode(ingestion_blob_info_json.encode('utf-8')).decode('utf-8') queue_service.put_message(queue_details.object_name, encoded) - - def _refresh_containers_if_needed(self): - if (self._last_time_refreshed_containers > datetime.utcnow() - timedelta(hours=2) - or not self._temp_storage_objects): - self._last_time_refreshed_containers = datetime.utcnow() - self._temp_storage_objects = self._get_temp_storage_objects() - - def _get_temp_storage_objects(self): - response = self._kusto_client.execute_mgmt("NetDefaultDB", ".create tempstorage") - storages = list() - for row in response.iter_all(): - storages.append(_ConnectionString.parse(row["StorageRoot"])) - return storages - - def _refresh_queues_if_needed(self): - if (self._last_time_refreshed_queues > datetime.utcnow() - timedelta(hours=2) - or not self._queues): - self._last_time_refreshed_queues = datetime.utcnow() - self._queues = self._get_queues() - - def _get_queues(self): - response = self._kusto_client.execute_mgmt("NetDefaultDB", - '.get ingestion queues "SecuredReadyForAggregationQueue" withsas') - queues = list() - for row in response.iter_all(): - queues.append(_ConnectionString.parse(row["Uri"])) - return queues - - def _refresh_token_if_needed(self): - if (self._last_time_refreshed_token > datetime.utcnow() - timedelta(hours=2) - or not self._kusto_token): - self._last_time_refreshed_token = datetime.utcnow() - self._kusto_token = self._get_kusto_token() - - def _get_kusto_token(self): - response = self._kusto_client.execute_mgmt("NetDefaultDB", '.get kusto identity token') - for row in response.iter_all(): - result = row["AuthorizationContext"] - return result diff --git a/azure-kusto-ingest/azure/kusto/ingest/kusto_ingest_client_exceptions.py b/azure-kusto-ingest/azure/kusto/ingest/kusto_ingest_client_exceptions.py index 9d441ad..283be49 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/kusto_ingest_client_exceptions.py +++ b/azure-kusto-ingest/azure/kusto/ingest/kusto_ingest_client_exceptions.py @@ -1,4 +1,4 @@ -""" All exceptions can be raised by kusto_ingest_client """ +"""All exceptions can be raised by kusto_ingest_client""" from azure.kusto.data import KustoClientError diff --git a/azure-kusto-ingest/tests/sample.py b/azure-kusto-ingest/tests/sample.py index 43e856b..f47b754 100644 --- a/azure-kusto-ingest/tests/sample.py +++ b/azure-kusto-ingest/tests/sample.py @@ -1,20 +1,20 @@ -from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat - -ingestion_properties = IngestionProperties(database="database name", table="table name", dataFormat=DataFormat.csv) - -ingest_client = KustoIngestClient("https://ingest-.kusto.windows.net") -ingest_client = KustoIngestClient("https://ingest-.kusto.windows.net", client_id="aad app id", client_secret="secret") - -file_descriptor = FileDescriptor("E:\\filePath.csv", 3333) # 3333 is the raw size of the data in bytes. -ingest_client.ingest_from_multiple_files([file_descriptor], - delete_sources_on_success=True, - ingestion_properties=ingestion_properties) - -ingest_client.ingest_from_multiple_files(["E:\\filePath.csv"], - delete_sources_on_success=True, - ingestion_properties=ingestion_properties) - -blob_descriptor = BlobDescriptor("https://path-to-blob.csv.gz?sas", 10) # 10 is the raw size of the data in bytes. -ingest_client.ingest_from_multiple_blobs([blob_descriptor], - delete_sources_on_success=True, - ingestion_properties=ingestion_properties) +from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat + +ingestion_properties = IngestionProperties(database="database name", table="table name", dataFormat=DataFormat.csv) + +ingest_client = KustoIngestClient("https://ingest-.kusto.windows.net") +ingest_client = KustoIngestClient("https://ingest-.kusto.windows.net", client_id="aad app id", client_secret="secret") + +file_descriptor = FileDescriptor("E:\\filePath.csv", 3333) # 3333 is the raw size of the data in bytes. +ingest_client.ingest_from_multiple_files([file_descriptor], + delete_sources_on_success=True, + ingestion_properties=ingestion_properties) + +ingest_client.ingest_from_multiple_files(["E:\\filePath.csv"], + delete_sources_on_success=True, + ingestion_properties=ingestion_properties) + +blob_descriptor = BlobDescriptor("https://path-to-blob.csv.gz?sas", 10) # 10 is the raw size of the data in bytes. +ingest_client.ingest_from_multiple_blobs([blob_descriptor], + delete_sources_on_success=True, + ingestion_properties=ingestion_properties) diff --git a/azure-kusto-python.pyproj b/azure-kusto-python.pyproj index dd1d605..2e134a7 100644 --- a/azure-kusto-python.pyproj +++ b/azure-kusto-python.pyproj @@ -59,6 +59,9 @@ + + Code +