Adding resource manager (#36)
Adding resource manager. Calling .get ingestion resources instead of previous two commands.
This commit is contained in:
Родитель
a6e416f3f0
Коммит
b207a1d706
|
@ -0,0 +1,92 @@
|
|||
"""This module is serve as a cache to all resources needed by the kusto ingest client."""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from .connection_string import _ConnectionString
|
||||
|
||||
class _IngestClientResources:
|
||||
def __init__(self,
|
||||
secured_ready_for_aggregation_queues = None,
|
||||
failed_ingestions_queues = None,
|
||||
successful_ingestions_queues = None,
|
||||
containers = None,
|
||||
status_tables = None
|
||||
):
|
||||
self.secured_ready_for_aggregation_queues = secured_ready_for_aggregation_queues
|
||||
self.failed_ingestions_queues = failed_ingestions_queues
|
||||
self.successful_ingestions_queues = successful_ingestions_queues
|
||||
self.containers = containers
|
||||
self.status_tables = status_tables
|
||||
|
||||
def is_applicable(self):
|
||||
resources = [self.secured_ready_for_aggregation_queues, self.failed_ingestions_queues, self.failed_ingestions_queues, self.containers, self.status_tables]
|
||||
return all(resources)
|
||||
|
||||
class _ResourceManager:
|
||||
def __init__(self,
|
||||
kusto_client):
|
||||
self._kusto_client = kusto_client
|
||||
self._refresh_period = timedelta(hours=1)
|
||||
|
||||
self._ingest_client_resources = None
|
||||
self._ingest_client_resources_last_update = None
|
||||
|
||||
self._authorization_context = None
|
||||
self._authorization_context_last_update = None
|
||||
|
||||
def _refresh_ingest_client_resources(self):
|
||||
if not self._ingest_client_resources \
|
||||
or (self._ingest_client_resources_last_update + self._refresh_period) <= datetime.utcnow() \
|
||||
or not self._ingest_client_resources.is_applicable():
|
||||
self._ingest_client_resources = self._get_ingest_client_resources_from_service()
|
||||
self._ingest_client_resources_last_update = datetime.utcnow()
|
||||
|
||||
def _get_resource_by_name(self, df, resource_name):
|
||||
resource = df[df['ResourceTypeName'] == resource_name].StorageRoot.map(_ConnectionString.parse).tolist()
|
||||
return resource
|
||||
|
||||
def _get_ingest_client_resources_from_service(self):
|
||||
df = self._kusto_client.execute("NetDefaultDB", ".get ingestion resources").to_dataframe()
|
||||
|
||||
secured_ready_for_aggregation_queues = self._get_resource_by_name(df, 'SecuredReadyForAggregationQueue')
|
||||
failed_ingestions_queues = self._get_resource_by_name(df, 'FailedIngestionsQueue')
|
||||
successful_ingestions_queues = self._get_resource_by_name(df, 'SuccessfulIngestionsQueue')
|
||||
containers = self._get_resource_by_name(df, 'TempStorage')
|
||||
status_tables = self._get_resource_by_name(df, 'IngestionsStatusTable')
|
||||
|
||||
return _IngestClientResources(secured_ready_for_aggregation_queues, failed_ingestions_queues, successful_ingestions_queues, containers, status_tables)
|
||||
|
||||
def _refresh_authorization_context(self):
|
||||
if not self._authorization_context \
|
||||
or self._authorization_context.isspace() \
|
||||
or (self._authorization_context_last_update + self._refresh_period) <= datetime.utcnow():
|
||||
self._authorization_context = self._get_authorization_context_from_service()
|
||||
self._authorization_context_last_update = datetime.utcnow()
|
||||
|
||||
def _get_authorization_context_from_service(self):
|
||||
df = self._kusto_client.execute("NetDefaultDB", ".get kusto identity token").to_dataframe()
|
||||
return df['AuthorizationContext'].values[0]
|
||||
|
||||
def get_ingestion_queues(self):
|
||||
self._refresh_ingest_client_resources()
|
||||
return self._ingest_client_resources.secured_ready_for_aggregation_queues
|
||||
|
||||
def get_failed_ingestions_queues(self):
|
||||
self._refresh_ingest_client_resources()
|
||||
return self._ingest_client_resources.failed_ingestions_queues
|
||||
|
||||
def get_successful_ingestions_queues(self):
|
||||
self._refresh_ingest_client_resources()
|
||||
return self._ingest_client_resources.successful_ingestions_queues
|
||||
|
||||
def get_containers(self):
|
||||
self._refresh_ingest_client_resources()
|
||||
return self._ingest_client_resources.containers
|
||||
|
||||
def get_ingestions_status_tables(self):
|
||||
self._refresh_ingest_client_resources()
|
||||
return self._ingest_client_resources.status_tables
|
||||
|
||||
def get_authorization_context(self):
|
||||
self._refresh_authorization_context()
|
||||
return self._authorization_context
|
|
@ -1,5 +1,4 @@
|
|||
""" A class to parse uris recieved from the DM.
|
||||
"""
|
||||
"""A class to parse uris recieved from the DM."""
|
||||
|
||||
import re
|
||||
|
||||
|
@ -13,5 +12,5 @@ class _ConnectionString:
|
|||
@staticmethod
|
||||
def parse(uri):
|
||||
""" Parses uri into a _ConnectionString object """
|
||||
match = re.search("https://(\\w+).(queue|blob).core.windows.net/([\\w,-]+)\\?(.*)", uri)
|
||||
match = re.search("https://(\\w+).(queue|blob|table).core.windows.net/([\\w,-]+)\\?(.*)", uri)
|
||||
return _ConnectionString(match.group(1), match.group(2), match.group(3), match.group(4))
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
""" A file contains all descriptors ingest command should work with.
|
||||
"""
|
||||
"""A file contains all descriptors ingest command should work with."""
|
||||
|
||||
import os
|
||||
from io import BytesIO
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
""" This module represents the object to write to azure queue that the DM is listening to.
|
||||
"""
|
||||
"""This module represents the object to write to azure queue that the DM is listening to."""
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
from .descriptors import BlobDescriptor
|
||||
|
||||
class _IngestionBlobInfo:
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
"""This file has all classes to define ingestion properties."""
|
||||
|
||||
from enum import Enum, IntEnum
|
||||
|
||||
from .kusto_ingest_client_exceptions import KustoDuplicateMappingError
|
||||
|
||||
class DataFormat(Enum):
|
||||
|
|
|
@ -1,18 +1,15 @@
|
|||
"""
|
||||
Kusto ingest client for Python.
|
||||
"""
|
||||
"""Kusto ingest client for Python."""
|
||||
|
||||
import base64
|
||||
import os
|
||||
import random
|
||||
import uuid
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from azure.storage.common import CloudStorageAccount
|
||||
|
||||
from azure.kusto.data import KustoClient
|
||||
from .descriptors import BlobDescriptor, FileDescriptor
|
||||
from .connection_string import _ConnectionString
|
||||
from .ingestion_blob_info import _IngestionBlobInfo
|
||||
from ._resource_manager import _ResourceManager
|
||||
|
||||
class KustoIngestClient:
|
||||
"""
|
||||
|
@ -73,18 +70,13 @@ class KustoIngestClient:
|
|||
authority : 'microsoft.com', optional
|
||||
In case your tenant is not microsoft please use this param.
|
||||
"""
|
||||
self._kusto_client = KustoClient(kusto_cluster,
|
||||
kusto_client = KustoClient(kusto_cluster,
|
||||
client_id=client_id,
|
||||
client_secret=client_secret,
|
||||
username=username,
|
||||
password=password,
|
||||
authority=authority)
|
||||
self._last_time_refreshed_containers = datetime.min
|
||||
self._temp_storage_objects = None
|
||||
self._last_time_refreshed_queues = datetime.min
|
||||
self._queues = None
|
||||
self._last_time_refreshed_token = datetime.min
|
||||
self._kusto_token = None
|
||||
self._resource_manager = _ResourceManager(kusto_client)
|
||||
|
||||
def ingest_from_multiple_files(self, files, delete_sources_on_success, ingestion_properties):
|
||||
"""
|
||||
|
@ -99,7 +91,6 @@ class KustoIngestClient:
|
|||
ingestion_properties : kusto_ingest_client.ingestion_properties.IngestionProperties
|
||||
The ingestion properties.
|
||||
"""
|
||||
self._refresh_containers_if_needed()
|
||||
blobs = list()
|
||||
file_descriptors = list()
|
||||
for file in files:
|
||||
|
@ -109,7 +100,8 @@ class KustoIngestClient:
|
|||
descriptor = FileDescriptor(file, deleteSourcesOnSuccess=delete_sources_on_success)
|
||||
file_descriptors.append(descriptor)
|
||||
blob_name = ingestion_properties.database + "__" + ingestion_properties.table + "__" + str(uuid.uuid4()) + "__" + descriptor.stream_name
|
||||
container_details = random.choice(self._temp_storage_objects)
|
||||
containers = self._resource_manager.get_containers()
|
||||
container_details = random.choice(containers)
|
||||
storage_client = CloudStorageAccount(container_details.storage_account_name,
|
||||
sas_token=container_details.sas)
|
||||
blob_service = storage_client.create_block_blob_service()
|
||||
|
@ -138,56 +130,17 @@ class KustoIngestClient:
|
|||
ingestion_properties : kusto_ingest_client.ingestion_properties.IngestionProperties
|
||||
The ingestion properties.
|
||||
"""
|
||||
self._refresh_queues_if_needed()
|
||||
self._refresh_token_if_needed()
|
||||
for blob in blobs:
|
||||
queue_details = random.choice(self._queues)
|
||||
queues = self._resource_manager.get_ingestion_queues()
|
||||
queue_details = random.choice(queues)
|
||||
storage_client = CloudStorageAccount(queue_details.storage_account_name,
|
||||
sas_token=queue_details.sas)
|
||||
queue_service = storage_client.create_queue_service()
|
||||
authorization_context = self._resource_manager.get_authorization_context()
|
||||
ingestion_blob_info = _IngestionBlobInfo(blob,
|
||||
ingestion_properties,
|
||||
delete_sources_on_success,
|
||||
self._kusto_token)
|
||||
authorization_context)
|
||||
ingestion_blob_info_json = ingestion_blob_info.to_json()
|
||||
encoded = base64.b64encode(ingestion_blob_info_json.encode('utf-8')).decode('utf-8')
|
||||
queue_service.put_message(queue_details.object_name, encoded)
|
||||
|
||||
def _refresh_containers_if_needed(self):
|
||||
if (self._last_time_refreshed_containers > datetime.utcnow() - timedelta(hours=2)
|
||||
or not self._temp_storage_objects):
|
||||
self._last_time_refreshed_containers = datetime.utcnow()
|
||||
self._temp_storage_objects = self._get_temp_storage_objects()
|
||||
|
||||
def _get_temp_storage_objects(self):
|
||||
response = self._kusto_client.execute_mgmt("NetDefaultDB", ".create tempstorage")
|
||||
storages = list()
|
||||
for row in response.iter_all():
|
||||
storages.append(_ConnectionString.parse(row["StorageRoot"]))
|
||||
return storages
|
||||
|
||||
def _refresh_queues_if_needed(self):
|
||||
if (self._last_time_refreshed_queues > datetime.utcnow() - timedelta(hours=2)
|
||||
or not self._queues):
|
||||
self._last_time_refreshed_queues = datetime.utcnow()
|
||||
self._queues = self._get_queues()
|
||||
|
||||
def _get_queues(self):
|
||||
response = self._kusto_client.execute_mgmt("NetDefaultDB",
|
||||
'.get ingestion queues "SecuredReadyForAggregationQueue" withsas')
|
||||
queues = list()
|
||||
for row in response.iter_all():
|
||||
queues.append(_ConnectionString.parse(row["Uri"]))
|
||||
return queues
|
||||
|
||||
def _refresh_token_if_needed(self):
|
||||
if (self._last_time_refreshed_token > datetime.utcnow() - timedelta(hours=2)
|
||||
or not self._kusto_token):
|
||||
self._last_time_refreshed_token = datetime.utcnow()
|
||||
self._kusto_token = self._get_kusto_token()
|
||||
|
||||
def _get_kusto_token(self):
|
||||
response = self._kusto_client.execute_mgmt("NetDefaultDB", '.get kusto identity token')
|
||||
for row in response.iter_all():
|
||||
result = row["AuthorizationContext"]
|
||||
return result
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
""" All exceptions can be raised by kusto_ingest_client """
|
||||
"""All exceptions can be raised by kusto_ingest_client"""
|
||||
|
||||
from azure.kusto.data import KustoClientError
|
||||
|
||||
|
|
|
@ -1,20 +1,20 @@
|
|||
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat
|
||||
|
||||
ingestion_properties = IngestionProperties(database="database name", table="table name", dataFormat=DataFormat.csv)
|
||||
|
||||
ingest_client = KustoIngestClient("https://ingest-<clustername>.kusto.windows.net")
|
||||
ingest_client = KustoIngestClient("https://ingest-<clustername>.kusto.windows.net", client_id="aad app id", client_secret="secret")
|
||||
|
||||
file_descriptor = FileDescriptor("E:\\filePath.csv", 3333) # 3333 is the raw size of the data in bytes.
|
||||
ingest_client.ingest_from_multiple_files([file_descriptor],
|
||||
delete_sources_on_success=True,
|
||||
ingestion_properties=ingestion_properties)
|
||||
|
||||
ingest_client.ingest_from_multiple_files(["E:\\filePath.csv"],
|
||||
delete_sources_on_success=True,
|
||||
ingestion_properties=ingestion_properties)
|
||||
|
||||
blob_descriptor = BlobDescriptor("https://path-to-blob.csv.gz?sas", 10) # 10 is the raw size of the data in bytes.
|
||||
ingest_client.ingest_from_multiple_blobs([blob_descriptor],
|
||||
delete_sources_on_success=True,
|
||||
ingestion_properties=ingestion_properties)
|
||||
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat
|
||||
|
||||
ingestion_properties = IngestionProperties(database="database name", table="table name", dataFormat=DataFormat.csv)
|
||||
|
||||
ingest_client = KustoIngestClient("https://ingest-<clustername>.kusto.windows.net")
|
||||
ingest_client = KustoIngestClient("https://ingest-<clustername>.kusto.windows.net", client_id="aad app id", client_secret="secret")
|
||||
|
||||
file_descriptor = FileDescriptor("E:\\filePath.csv", 3333) # 3333 is the raw size of the data in bytes.
|
||||
ingest_client.ingest_from_multiple_files([file_descriptor],
|
||||
delete_sources_on_success=True,
|
||||
ingestion_properties=ingestion_properties)
|
||||
|
||||
ingest_client.ingest_from_multiple_files(["E:\\filePath.csv"],
|
||||
delete_sources_on_success=True,
|
||||
ingestion_properties=ingestion_properties)
|
||||
|
||||
blob_descriptor = BlobDescriptor("https://path-to-blob.csv.gz?sas", 10) # 10 is the raw size of the data in bytes.
|
||||
ingest_client.ingest_from_multiple_blobs([blob_descriptor],
|
||||
delete_sources_on_success=True,
|
||||
ingestion_properties=ingestion_properties)
|
||||
|
|
|
@ -59,6 +59,9 @@
|
|||
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\ingestion_properties.py" />
|
||||
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\kusto_ingest_client.py" />
|
||||
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\kusto_ingest_client_exceptions.py" />
|
||||
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\resource_manager.py">
|
||||
<SubType>Code</SubType>
|
||||
</Compile>
|
||||
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\version.py" />
|
||||
<Compile Include="azure-kusto-ingest\azure\kusto\ingest\__init__.py" />
|
||||
<Compile Include="azure-kusto-ingest\azure\kusto\__init__.py" />
|
||||
|
|
Загрузка…
Ссылка в новой задаче