Revert "New option for the auto delete policy time - apart from the timeout with default of 7 days Make sure we use the same client request properties in all the flow Small bug fix - not allowed url to contain "ingest-""

This reverts commit dcdcb66d
KustoIbizaExtension Build 2021-06-08 17:11:36 +03:00
Parent dcdcb66dfd
Commit 9bccb17acf
11 changed files: 77 additions and 117 deletions

View file

@@ -44,9 +44,8 @@ trait KustoOptions {
   // required permissions
   val KUSTO_DATABASE: String = newOption("kustoDatabase")
-  // An integer number corresponding to the period in seconds after which the operation will timeout.
-  // For write operations this limit starts ticking only after the data was processed by the connector and it starts
-  // polling on the ingestion results. Default 2 days
+  // An integer number corresponding to the period in seconds after which the operation will timeout. Default: '5400'
+  // (90 minutes)
   val KUSTO_TIMEOUT_LIMIT: String = newOption("timeoutLimit")
   // An id of the source used for tracing of the write operation
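For orientation, this is how the timeoutLimit key above reaches the connector from Spark. A minimal sketch, assuming the connector's usual format name; the cluster, database, and table values are placeholders and authentication options are omitted:

    import org.apache.spark.sql.SparkSession

    object TimeoutOptionSketch extends App {
      val spark = SparkSession.builder().appName("kusto-write").master("local[*]").getOrCreate()
      val df = spark.range(10).toDF("value")

      df.write
        .format("com.microsoft.kusto.spark.datasource")                // assumed format name for this connector
        .option("kustoCluster", "https://mycluster.kusto.windows.net") // placeholder
        .option("kustoDatabase", "mydb")                               // placeholder
        .option("kustoTable", "mytable")                               // placeholder
        .option("timeoutLimit", (2L * 24 * 3600).toString)             // KUSTO_TIMEOUT_LIMIT, in seconds
        .mode("Append")
        .save()
    }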

View file

@@ -33,9 +33,6 @@ object KustoSinkOptions extends KustoOptions{
   // partition. Kusto's ingestion also aggregates data, default suggested by Kusto is 1GB but here we suggest to cut
   // it at 100MB to adjust it to spark pulling of data.
   val KUSTO_CLIENT_BATCHING_LIMIT: String = newOption("clientBatchingLimit")
-  // Time after which staging resources are cleaned if they weren't cleaned at the end of the run
-  val KUSTO_STAGING_RESOURCES_CLEANUP: String = newOption("stagingResourcesCleanup")
 }
 object SinkTableCreationMode extends Enumeration {
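Before this revert, the stagingResourcesCleanup key above let callers size the cleanup window independently of the timeout. A hedged sketch of that pre-revert usage; the format name is assumed, and required cluster and authentication options are omitted as in the sketch above:

    import org.apache.spark.sql.DataFrame

    // Pre-revert sketch: cleanup window configured apart from the timeout.
    // The 7-day value mirrors the reverted default; both options take seconds.
    def writeWithCleanupWindow(df: DataFrame): Unit =
      df.write
        .format("com.microsoft.kusto.spark.datasource")               // assumed format name
        .option("timeoutLimit", (2L * 24 * 3600).toString)
        .option("stagingResourcesCleanup", (7L * 24 * 3600).toString) // key removed by this revert
        .mode("Append")
        .save()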
@@ -46,9 +43,7 @@ object SinkTableCreationMode extends Enumeration {
 case class WriteOptions(tableCreateOptions: SinkTableCreationMode.SinkTableCreationMode = SinkTableCreationMode.FailIfNotExist,
                         isAsync: Boolean = false,
                         writeResultLimit: String = KustoSinkOptions.NONE_RESULT_LIMIT,
-                        timeZone: String = "UTC",
-                        timeout: FiniteDuration,
+                        timeZone: String = "UTC", timeout: FiniteDuration,
                         IngestionProperties: Option[String] = None,
                         batchLimit: Int = 100,
-                        requestId: String = UUID.randomUUID().toString,
-                        autoCleanupTime: FiniteDuration)
+                        requestId: String = UUID.randomUUID().toString)
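The signature change is easier to read at a call site. A sketch assuming the connector is on the classpath; the durations mirror the 2-day timeout and 7-day cleanup defaults that appear elsewhere in this diff:

    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration.FiniteDuration
    import com.microsoft.kusto.spark.datasink.WriteOptions

    // After the revert only the timeout remains, and it also bounds staging-table cleanup.
    val opts = WriteOptions(timeout = new FiniteDuration(2L * 24 * 3600, TimeUnit.SECONDS))

    // Before the revert the cleanup window was a separate, required field:
    // WriteOptions(timeout = new FiniteDuration(2L * 24 * 3600, TimeUnit.SECONDS),
    //              autoCleanupTime = new FiniteDuration(7L * 24 * 3600, TimeUnit.SECONDS))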

View file

@@ -8,7 +8,6 @@ import java.util
 import java.util.zip.GZIPOutputStream
 import java.util.{TimeZone, UUID}
-import com.microsoft.azure.kusto.data.ClientRequestProperties
 import com.microsoft.azure.kusto.ingest.IngestionProperties.DATA_FORMAT
 import com.microsoft.azure.kusto.ingest.result.IngestionResult
 import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo
@@ -18,6 +17,7 @@ import com.microsoft.kusto.spark.authentication.KustoAuthentication
 import com.microsoft.kusto.spark.common.KustoCoordinates
 import com.microsoft.kusto.spark.utils.CslCommandsGenerator.generateTableGetSchemaAsRowsCommand
 import com.microsoft.kusto.spark.utils.{KustoClient, KustoClientCache, KustoQueryUtils, KustoConstants => KCONST, KustoDataSourceUtils => KDSU}
+import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.InternalRow
@@ -49,55 +49,49 @@ object KustoWriter {
       tableCoordinates.clusterUrl, tableCoordinates.database)
   }
-  val crp = new ClientRequestProperties
-  crp.setClientRequestId(writeOptions.requestId)
   val table = tableCoordinates.table.get
   val tmpTableName: String = KustoQueryUtils.simplifyName(TempIngestionTablePrefix +
     data.sparkSession.sparkContext.appName +
-    "_" + table + batchId.map(b => s"_${b.toString}").getOrElse("") + "_" + writeOptions.requestId)
+    "_" + table + batchId.map(b => s"_${b.toString}").getOrElse("") + "_" + writeOptions.requestId + UUID.randomUUID()
+    .toString)
-  implicit val parameters: KustoWriteResource = KustoWriteResource(authentication, tableCoordinates, data.schema,
-    writeOptions, tmpTableName)
+  implicit val parameters: KustoWriteResource = KustoWriteResource(authentication, tableCoordinates, data.schema, writeOptions, tmpTableName)
   val stagingTableIngestionProperties = getIngestionProperties(writeOptions, parameters)
   val ingestIfNotExistsTags = stagingTableIngestionProperties.getIngestIfNotExists
-  val schemaShowCommandResult = kustoClient.engineClient.execute(tableCoordinates.database,
-    generateTableGetSchemaAsRowsCommand(tableCoordinates.table.get), crp).getPrimaryResults
+  val schemaShowCommandResult = kustoClient.engineClient.execute(tableCoordinates.database, generateTableGetSchemaAsRowsCommand(tableCoordinates.table.get)).getPrimaryResults
   // Remove the ingestIfNotExists tags because several partitions can ingest into the staging table and should not interfere one another
   stagingTableIngestionProperties.setIngestIfNotExists(new util.ArrayList())
-  val shouldIngest = kustoClient.shouldIngestData(tableCoordinates, writeOptions.IngestionProperties, tableExists =
-    schemaShowCommandResult.count() > 0, crp)
+  val shouldIngest = kustoClient.shouldIngestData(tableCoordinates, writeOptions.IngestionProperties, tableExists = schemaShowCommandResult.count() > 0)
   if (!shouldIngest) {
     KDSU.logInfo(myName, s"Ingestion skipped: Provided ingest-by tags are present in the destination table '$table'")
   } else {
     // KustoWriter will create a temporary table ingesting the data to it.
     // Only if all executors succeeded the table will be appended to the original destination table.
-    kustoClient.initializeTablesBySchema(tableCoordinates, tmpTableName, data
-      .schema, schemaShowCommandResult, writeOptions, crp)
+    kustoClient.initializeTablesBySchema(tableCoordinates, tmpTableName, writeOptions.tableCreateOptions, data
+      .schema, schemaShowCommandResult, writeOptions.timeout)
     KDSU.logInfo(myName, s"Successfully created temporary table $tmpTableName, will be deleted after completing the operation")
-    kustoClient.setMappingOnStagingTableIfNeeded(stagingTableIngestionProperties, table, crp)
-    if (stagingTableIngestionProperties.getFlushImmediately) {
+    kustoClient.setMappingOnStagingTableIfNeeded(stagingTableIngestionProperties, table)
+    if (stagingTableIngestionProperties.getFlushImmediately){
       KDSU.logWarn(myName, "Its not recommended to set flushImmediately to true")
     }
     val rdd = data.queryExecution.toRdd
     val partitionsResults = rdd.sparkContext.collectionAccumulator[PartitionResult]
     if (writeOptions.isAsync) {
       val asyncWork = rdd.foreachPartitionAsync { rows => ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults) }
       KDSU.logInfo(myName, s"asynchronous write to Kusto table '$table' in progress")
       // This part runs back on the driver
       asyncWork.onSuccess {
         case _ => kustoClient.finalizeIngestionWhenWorkersSucceeded(
-          tableCoordinates, batchIdIfExists, kustoClient.engineClient, tmpTableName, partitionsResults,
-          writeOptions, ingestIfNotExistsTags, crp)
+          tableCoordinates, batchIdIfExists, kustoClient.engineClient, tmpTableName, partitionsResults, writeOptions.timeout, writeOptions.requestId, ingestIfNotExistsTags, isAsync = true)
       }
       asyncWork.onFailure {
         case exception: Exception =>
-          kustoClient.cleanupIngestionByproducts(
-            tableCoordinates.database, kustoClient.engineClient, tmpTableName, crp)
+          kustoClient.cleanupIngestionByproducts(tableCoordinates.database, kustoClient.engineClient, tmpTableName)
           KDSU.reportExceptionAndThrow(myName, exception, "writing data", tableCoordinates.clusterUrl, tableCoordinates.database, table, shouldNotThrow = true)
           KDSU.logError(myName, "The exception is not visible in the driver since we're in async mode")
       }
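The removed lines at the top of this hunk carry the "same client request properties in all the flow" half of the reverted commit: one ClientRequestProperties is created per write and handed to every control command, so the service can correlate the whole flow under a single client request id. A minimal sketch of that pattern against the Kusto Java SDK; the connection details are placeholders:

    import java.util.UUID
    import com.microsoft.azure.kusto.data.{Client, ClientFactory, ClientRequestProperties}
    import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder

    // One crp per logical operation; reuse it on every command issued for that operation.
    val crp = new ClientRequestProperties
    crp.setClientRequestId(UUID.randomUUID().toString)

    val engineClient: Client = ClientFactory.createClient(
      ConnectionStringBuilder.createWithAadApplicationCredentials(
        "https://mycluster.kusto.windows.net", "appId", "appKey", "authorityId")) // placeholders

    engineClient.execute("mydb", ".show tables", crp) // same crp on each control command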
@@ -106,13 +100,11 @@ object KustoWriter {
         rdd.foreachPartition { rows => ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults) }
       catch {
         case exception: Exception =>
-          kustoClient.cleanupIngestionByproducts(
-            tableCoordinates.database, kustoClient.engineClient, tmpTableName, crp)
+          kustoClient.cleanupIngestionByproducts(tableCoordinates.database, kustoClient.engineClient, tmpTableName)
           throw exception
       }
       kustoClient.finalizeIngestionWhenWorkersSucceeded(
-        tableCoordinates, batchIdIfExists, kustoClient.engineClient, tmpTableName, partitionsResults, writeOptions,
-        ingestIfNotExistsTags, crp)
+        tableCoordinates, batchIdIfExists, kustoClient.engineClient, tmpTableName, partitionsResults, writeOptions.timeout, writeOptions.requestId, ingestIfNotExistsTags)
     }
   }
 }
@@ -207,7 +199,7 @@ object KustoWriter {
     val partitionId = TaskContext.getPartitionId
     Future {
       var props = ingestionProperties
-      if (!ingestionProperties.getFlushImmediately && flushImmediately) {
+      if(!ingestionProperties.getFlushImmediately && flushImmediately){
         // Need to copy the ingestionProperties so that only this one will be flushed immediately
         props = SparkIngestionProperties.cloneIngestionProperties(ingestionProperties)
         props.setFlushImmediately(true)
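The comment above exists because ingestionProperties is shared across partitions, so flipping flushImmediately in place would flip it for everyone. A sketch of the copy-before-mutate idea; this hand-rolled copy is illustrative and not exhaustive, whereas the connector's own SparkIngestionProperties.cloneIngestionProperties copies the full field set:

    import com.microsoft.azure.kusto.ingest.IngestionProperties

    def withFlushImmediately(base: IngestionProperties): IngestionProperties = {
      // Fresh instance for this partition only; copy whichever fields matter downstream.
      val copy = new IngestionProperties(base.getDatabaseName, base.getTableName)
      copy.setReportLevel(base.getReportLevel)
      copy.setFlushImmediately(true) // only this local copy flushes immediately
      copy
    }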
@@ -235,7 +227,7 @@ object KustoWriter {
     RowCSVWriterUtils.writeRowAsCSV(row, schema, timeZone, blobWriter.csvWriter)
     val count = blobWriter.csvWriter.getCounter
-    val shouldNotCommitBlockBlob = count < maxBlobSize
+    val shouldNotCommitBlockBlob = count < maxBlobSize
     if (shouldNotCommitBlockBlob) {
       blobWriter
     } else {

View file

@@ -113,8 +113,7 @@ private[kusto] object KustoReader {
   // Check whether the result is empty, causing an IO exception on reading empty parquet file
   // We don't mind generating the filtered query again - it only happens upon exception
   val filteredQuery = KustoFilter.pruneAndFilter(request.schema, request.query, filtering)
-  val count = KDSU.countRows(kustoClient, filteredQuery, request.kustoCoordinates.database, request
-    .clientRequestProperties.orNull)
+  val count = KDSU.countRows(kustoClient, filteredQuery, request.kustoCoordinates.database)
   if (count == 0) {
     request.sparkSession.emptyDataFrame.rdd

View file

@@ -61,8 +61,7 @@ private[kusto] case class KustoRelation(kustoCoordinates: KustoCoordinates,
   if (readOptions.readMode.isEmpty){
     var count = 0
     try {
-      count = KDSU.
-        estimateRowsCount(kustoClient, query, kustoCoordinates.database, clientRequestProperties.orNull)
+      count = KDSU.estimateRowsCount(kustoClient, query, kustoCoordinates.database)
     }catch {
       // Assume count is high if estimation got timed out
       case e: Exception =>
@@ -162,8 +161,6 @@ private[kusto] case class KustoRelation(kustoCoordinates: KustoCoordinates,
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
     KustoWriter.write(Some(0), data, kustoCoordinates, authentication, writeOptions =
-      WriteOptions.apply(timeout = new FiniteDuration(KustoConstants.DefaultWaitingIntervalLongRunning.toInt,
-        TimeUnit.SECONDS), autoCleanupTime = new FiniteDuration(KustoConstants.DefaultCleaningInterval.toInt,
-        TimeUnit.SECONDS)))
+      WriteOptions.apply(timeout = new FiniteDuration(KustoConstants.DefaultWaitingIntervalLongRunning.toLong, TimeUnit.MINUTES)))
   }
 }

View file

@@ -6,13 +6,14 @@ import java.util.StringJoiner
 import java.util.concurrent.TimeUnit
 import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder
-import com.microsoft.azure.kusto.data.{Client, ClientFactory, ClientRequestProperties, KustoResultSetTable}
+import com.microsoft.azure.kusto.data.{Client, ClientFactory, KustoResultSetTable}
 import com.microsoft.azure.kusto.ingest.result.{IngestionStatus, OperationStatus}
 import com.microsoft.azure.kusto.ingest.{IngestClient, IngestClientFactory, IngestionProperties}
 import com.microsoft.azure.storage.StorageException
 import com.microsoft.kusto.spark.common.KustoCoordinates
-import com.microsoft.kusto.spark.datasink.KustoWriter.DelayPeriodBetweenCalls
-import com.microsoft.kusto.spark.datasink.{PartitionResult, SinkTableCreationMode, SparkIngestionProperties, WriteOptions}
+import com.microsoft.kusto.spark.datasink.KustoWriter.{DelayPeriodBetweenCalls, LegacyTempIngestionTablePrefix, TempIngestionTablePrefix}
+import com.microsoft.kusto.spark.datasink.SinkTableCreationMode.SinkTableCreationMode
+import com.microsoft.kusto.spark.datasink.{PartitionResult, SinkTableCreationMode, SparkIngestionProperties}
 import com.microsoft.kusto.spark.datasource.KustoStorageParameters
 import com.microsoft.kusto.spark.utils.CslCommandsGenerator._
 import com.microsoft.kusto.spark.utils.KustoDataSourceUtils.extractSchemaFromResultTable
@@ -26,6 +27,7 @@ import shaded.parquet.org.codehaus.jackson.map.ObjectMapper
 import scala.collection.JavaConverters._
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{Await, Future}
 class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuilder, val ingestKcsb: ConnectionStringBuilder) {
@@ -41,13 +43,12 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
   private val myName = this.getClass.getSimpleName
   private val durationFormat = "dd:HH:mm:ss"
   def initializeTablesBySchema(tableCoordinates: KustoCoordinates,
                                tmpTableName: String,
+                               tableCreation: SinkTableCreationMode = SinkTableCreationMode.FailIfNotExist,
                                schema: StructType,
                                schemaShowCommandResult: KustoResultSetTable,
-                               writeOptions: WriteOptions,
-                               crp: ClientRequestProperties): Unit = {
+                               writeTimeout: FiniteDuration): Unit = {
     var tmpTableSchema: String = ""
     val database = tableCoordinates.database
@@ -55,7 +56,7 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
     if (schemaShowCommandResult.count() == 0) {
       // Table Does not exist
-      if (writeOptions.tableCreateOptions == SinkTableCreationMode.FailIfNotExist) {
+      if (tableCreation == SinkTableCreationMode.FailIfNotExist) {
         throw new RuntimeException(s"Table '$table' doesn't exist in database '$database', cluster '${tableCoordinates.clusterAlias} and tableCreateOptions is set to FailIfNotExist.")
       } else {
         // Parse dataframe schema and create a destination table with that schema
@@ -65,7 +66,7 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
           tableSchemaBuilder.add(s"['${field.name}']:$fieldType")
         }
         tmpTableSchema = tableSchemaBuilder.toString
-        engineClient.execute(database, generateTableCreateCommand(table, tmpTableSchema), crp)
+        engineClient.execute(database, generateTableCreateCommand(table, tmpTableSchema))
       }
     } else {
       // Table exists. Parse kusto table schema and check if it matches the dataframes schema
@@ -74,12 +75,12 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
     // Create a temporary table with the kusto or dataframe parsed schema with retention and delete set to after the
     // write operation times out. Engine recommended keeping the retention although we use auto delete.
-    engineClient.execute(database, generateTempTableCreateCommand(tmpTableName, tmpTableSchema), crp)
+    engineClient.execute(database, generateTempTableCreateCommand(tmpTableName, tmpTableSchema))
     engineClient.execute(database, generateTableAlterRetentionPolicy(tmpTableName,
-      DurationFormatUtils.formatDuration(writeOptions.autoCleanupTime.toMillis, durationFormat, true),
-      recoverable = false), crp)
-    val instant = Instant.now.plusSeconds(writeOptions.autoCleanupTime.toSeconds)
-    engineClient.execute(database, generateTableAlterAutoDeletePolicy(tmpTableName, instant), crp)
+      DurationFormatUtils.formatDuration(writeTimeout.toMillis, durationFormat,true),
+      recoverable=false))
+    val instant = Instant.now.plusSeconds(writeTimeout.toSeconds)
+    engineClient.execute(database, generateTableAlterAutoDeletePolicy(tmpTableName, instant))
   }
   def getTempBlobForIngestion: ContainerAndSas = {
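Both policy commands above derive their horizon from a single duration: autoCleanupTime before the revert, the write timeout after it. A sketch of the two derived values; the dd:HH:mm:ss pattern matches durationFormat above, and DurationFormatUtils comes from commons-lang3:

    import java.time.Instant
    import java.util.concurrent.TimeUnit
    import org.apache.commons.lang3.time.DurationFormatUtils
    import scala.concurrent.duration.FiniteDuration

    val horizon = new FiniteDuration(2L * 24 * 3600, TimeUnit.SECONDS) // writeTimeout after the revert

    // The retention policy takes a dd:HH:mm:ss timespan string...
    val softDeletePeriod = DurationFormatUtils.formatDuration(horizon.toMillis, "dd:HH:mm:ss", true)
    // ...while the auto-delete policy takes an absolute expiry instant.
    val expiry = Instant.now.plusSeconds(horizon.toSeconds)

    println(s"soft delete: $softDeletePeriod, auto delete expiry: $expiry")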
@@ -95,15 +96,14 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
                 kustoAdminClient: Client,
                 tmpTableName: String,
                 partitionsResults: CollectionAccumulator[PartitionResult],
-                writeOptions: WriteOptions,
+                timeout: FiniteDuration,
+                requestId: String,
                 ingestIfNotExistsTags: util.ArrayList[String],
-                crp: ClientRequestProperties
-               ): Unit = {
+                isAsync: Boolean = false): Unit = {
     import coordinates._
     val mergeTask = Future {
-      KDSU.logInfo(myName, s"Polling on ingestion results for requestId: ${writeOptions.requestId}, will move data to " +
-        s"destination table when finished")
+      KDSU.logInfo(myName, s"Polling on ingestion results for requestId: $requestId, will move data to destination table when finished")
       try {
         partitionsResults.value.asScala.foreach {
@@ -116,18 +116,17 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
               finalRes
             } catch {
               case _: StorageException =>
-                KDSU.logWarn(myName, s"Failed to fetch operation status transiently - will keep polling. RequestId: ${writeOptions.requestId}")
+                KDSU.logWarn(myName, s"Failed to fetch operation status transiently - will keep polling. RequestId: $requestId")
                 None
-              case e: Exception => KDSU.reportExceptionAndThrow(myName, e, s"Failed to fetch operation status. RequestId: ${writeOptions.requestId}"); None
+              case e: Exception => KDSU.reportExceptionAndThrow(myName, e, s"Failed to fetch operation status. RequestId: $requestId"); None
             }
           },
           0,
           DelayPeriodBetweenCalls,
-          (writeOptions.timeout.toMillis / DelayPeriodBetweenCalls + 5).toInt,
+          (timeout.toMillis / DelayPeriodBetweenCalls + 5).toInt,
           res => res.isDefined && res.get.status == OperationStatus.Pending,
           res => finalRes = res,
-          maxWaitTimeBetweenCalls = KDSU.WriteMaxWaitTime.toMillis.toInt)
-          .await(writeOptions.timeout.toMillis, TimeUnit.MILLISECONDS)
+          maxWaitTimeBetweenCalls = KDSU.WriteMaxWaitTime.toMillis.toInt).await(timeout.toMillis, TimeUnit.MILLISECONDS)
         if (finalRes.isDefined) {
           finalRes.get.status match {
@@ -155,17 +154,15 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
         // Protect tmp table from merge/rebuild and move data to the table requested by customer. This operation is atomic.
         // We are using the ingestIfNotExists Tags here too (on top of the check at the start of the flow) so that if
         // several flows started together only one of them would ingest
-        kustoAdminClient.execute(database, generateTableAlterMergePolicyCommand(tmpTableName, allowMerge = false,
-          allowRebuild = false), crp)
-        val res = kustoAdminClient.execute(database, generateTableMoveExtentsCommand(tmpTableName, table.get,
-          ingestIfNotExistsTags), crp).getPrimaryResults
+        kustoAdminClient.execute(database, generateTableAlterMergePolicyCommand(tmpTableName, allowMerge = false, allowRebuild = false))
+        val res = kustoAdminClient.execute(database, generateTableMoveExtentsCommand(tmpTableName, table.get, ingestIfNotExistsTags)).getPrimaryResults
         if (!res.next()) {
           // Extents that were moved are returned by move extents command
           KDSU.logInfo(myName, s"Ingestion skipped: Provided ingest-by tags are present in the destination table '$table'")
         }
-        KDSU.logInfo(myName, s"write to Kusto table '${table.get}' finished successfully requestId: ${writeOptions.requestId} $batchIdIfExists")
+        KDSU.logInfo(myName, s"write to Kusto table '${table.get}' finished successfully requestId: $requestId $batchIdIfExists")
       } else {
-        KDSU.logWarn(myName, s"write to Kusto table '${table.get}' finished with no data written requestId: ${writeOptions.requestId} $batchIdIfExists")
+        KDSU.logWarn(myName, s"write to Kusto table '${table.get}' finished with no data written requestId: $requestId $batchIdIfExists")
       }
     } catch {
       case ex: Exception =>
@@ -175,29 +172,27 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
           "Trying to poll on pending ingestions", coordinates.clusterUrl, coordinates.database, coordinates.table.getOrElse("Unspecified table name")
         )
       } finally {
-        cleanupIngestionByproducts(database, kustoAdminClient, tmpTableName, crp)
+        cleanupIngestionByproducts(database, kustoAdminClient, tmpTableName)
       }
     }
-    if (!writeOptions.isAsync) {
-      Await.result(mergeTask, writeOptions.timeout)
+    if (!isAsync) {
+      Await.result(mergeTask, timeout)
     }
   }
-  private[kusto] def cleanupIngestionByproducts(database: String, kustoAdminClient: Client, tmpTableName: String,
-                                                crp: ClientRequestProperties): Unit = {
+  private[kusto] def cleanupIngestionByproducts(database: String, kustoAdminClient: Client, tmpTableName: String): Unit = {
     try {
-      kustoAdminClient.execute(database, generateTableDropCommand(tmpTableName), crp)
+      kustoAdminClient.execute(database, generateTableDropCommand(tmpTableName))
       KDSU.logInfo(myName, s"Temporary table '$tmpTableName' deleted successfully")
     }
     catch {
       case exception: Exception =>
-        KDSU.reportExceptionAndThrow(myName, exception, s"deleting temporary table $tmpTableName", database, shouldNotThrow = false)
+        KDSU.reportExceptionAndThrow(myName, exception, s"deleting temporary table $tmpTableName", database = database, shouldNotThrow = false)
     }
   }
-  private[kusto] def setMappingOnStagingTableIfNeeded(stagingTableIngestionProperties: IngestionProperties,
-                                                      originalTable: String, crp: ClientRequestProperties): Unit = {
+  private[kusto] def setMappingOnStagingTableIfNeeded(stagingTableIngestionProperties: IngestionProperties, originalTable: String): Unit = {
     val mapping = stagingTableIngestionProperties.getIngestionMapping
     val mappingReferenceName = mapping.getIngestionMappingReference
     if (StringUtils.isNotBlank(mappingReferenceName)) {
@@ -209,21 +204,19 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
       while (mappings.next && !found) {
         if (mappings.getString(0).equals(mappingReferenceName)) {
           val policyJson = mappings.getString(2).replace("\"", "'")
-          val cmd = generateCreateTableMappingCommand(stagingTableIngestionProperties.getTableName, mappingKind,
-            mappingReferenceName, policyJson)
-          engineClient.execute(stagingTableIngestionProperties.getDatabaseName, cmd, crp)
+          val c = generateCreateTableMappingCommand(stagingTableIngestionProperties.getTableName, mappingKind, mappingReferenceName, policyJson)
+          engineClient.execute(stagingTableIngestionProperties.getDatabaseName, c)
           found = true
         }
       }
     }
   }
-  def fetchTableExtentsTags(database: String, table: String, crp: ClientRequestProperties): KustoResultSetTable = {
+  def fetchTableExtentsTags(database: String, table: String): KustoResultSetTable = {
     engineClient.execute(database, CslCommandsGenerator.generateFetchTableIngestByTagsCommand(table)).getPrimaryResults
   }
-  def shouldIngestData(tableCoordinates: KustoCoordinates, ingestionProperties: Option[String],
-                       tableExists: Boolean, crp: ClientRequestProperties): Boolean = {
+  def shouldIngestData(tableCoordinates: KustoCoordinates, ingestionProperties: Option[String], tableExists: Boolean): Boolean = {
     var shouldIngest = true
@@ -231,7 +224,7 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
     if (tableExists && ingestionProperties.isDefined) {
       if (ingestIfNotExistsTags != null && !ingestIfNotExistsTags.isEmpty) {
         val ingestIfNotExistsTagsSet = ingestIfNotExistsTags.asScala.toSet
-        val res = fetchTableExtentsTags(tableCoordinates.database, tableCoordinates.table.get, crp)
+        val res = fetchTableExtentsTags(tableCoordinates.database, tableCoordinates.table.get)
         if (res.next()) {
           val tagsArray = res.getObject(0).asInstanceOf[JSONArray]
           for (i <- 0 until tagsArray.length) {
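The loop this hunk ends in is an overlap test: ingestion is skipped as soon as any requested ingestIfNotExists tag already appears on the destination table's extents. Condensed into plain collections; tag normalization, such as the ingest-by: prefix Kusto stores on extents, is elided here:

    // shouldIngest flips to false on the first requested tag already present.
    def shouldIngest(existingExtentTags: Set[String], ingestIfNotExistsTags: Seq[String]): Boolean =
      !ingestIfNotExistsTags.exists(existingExtentTags.contains)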

View file

@@ -5,10 +5,8 @@ import scala.concurrent.duration._
 object KustoConstants {
   // Setting high value to have no timeout on Await commands
   val DefaultWaitingIntervalLongRunning: String = (2 days).toSeconds.toString
-  val DefaultCleaningInterval: String = (7 days).toSeconds.toString
   val DefaultPeriodicSamplePeriod: FiniteDuration = 1 seconds
   val DefaultIngestionTaskTime: FiniteDuration = 20 seconds
-  val NoTimeout: String = (-1 seconds).toSeconds.toString
   val ClientName: String = KustoDataSourceUtils.clientName
   val DefaultBufferSize: Int = 16 * 1024
   val StorageExpiryMinutes: Int = 120

View file

@@ -211,10 +211,7 @@ object KustoDataSourceUtils {
       case _: java.lang.IllegalArgumentException => throw new InvalidParameterException(s"KUSTO_WRITE_ENABLE_ASYNC is expecting either 'true' or 'false', got: '$isAsyncParam'")
     }
-    val timeout = new FiniteDuration(parameters.getOrElse(KustoSinkOptions.KUSTO_TIMEOUT_LIMIT, KCONST
-      .DefaultWaitingIntervalLongRunning).toInt, TimeUnit.SECONDS)
-    val autoCleanupTime = new FiniteDuration(parameters.getOrElse(KustoSinkOptions.KUSTO_STAGING_RESOURCES_CLEANUP, KCONST
-      .DefaultCleaningInterval).toInt, TimeUnit.SECONDS)
+    val timeout = new FiniteDuration(parameters.getOrElse(KustoSinkOptions.KUSTO_TIMEOUT_LIMIT, KCONST.DefaultWaitingIntervalLongRunning).toLong, TimeUnit.SECONDS)
     val requestId = parameters.getOrElse(KustoSinkOptions.KUSTO_REQUEST_ID, UUID.randomUUID().toString)
     val ingestionPropertiesAsJson = parameters.get(KustoSinkOptions.KUSTO_SPARK_INGESTION_PROPERTIES_JSON)
@@ -227,8 +224,7 @@ object KustoDataSourceUtils {
       timeout,
       ingestionPropertiesAsJson,
       batchLimit,
-      requestId,
-      autoCleanupTime
+      requestId
     )
     val sourceParameters = parseSourceParameters(parameters)
@@ -298,13 +294,10 @@ object KustoDataSourceUtils {
   private[kusto] def getEngineUrlFromAliasIfNeeded(cluster: String): String = {
     if (cluster.startsWith(EnginePrefix)) {
-      val uri = new URI(cluster)
-      val host = uri.getHost
+      val host = new URI(cluster).getHost
       if (host.startsWith(IngestPrefix)) {
-        val startIdx = if (host.startsWith(IngestPrefix)) IngestPrefix.length else 0
         val uriBuilder = new URIBuilder()
-        uriBuilder.setHost(s"${host.substring(startIdx, host.indexOf(".kusto."))}.kusto.windows.net")
-        uriBuilder.setScheme("https").toString
+        uriBuilder.setHost(IngestPrefix + host).toString
       } else {
         cluster
       }
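The removed branch above is the direction the reverted commit's 'not allowed url to contain "ingest-"' fix cared about: recovering the engine endpoint from an ingest URL. A condensed standalone sketch of that pre-revert behavior; the original also normalizes the host suffix to .kusto.windows.net, which is simplified here:

    import java.net.URI

    // "https://ingest-mycluster.kusto.windows.net" -> "https://mycluster.kusto.windows.net"
    def engineUrlFromIngestUrl(url: String, ingestPrefix: String = "ingest-"): String = {
      val host = new URI(url).getHost
      if (host.startsWith(ingestPrefix)) s"https://${host.stripPrefix(ingestPrefix)}"
      else url
    }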
@@ -549,23 +542,23 @@ object KustoDataSourceUtils {
     else None
   }
-  private[kusto] def countRows(client: Client, query: String, database: String, crp: ClientRequestProperties): Int = {
-    val res = client.execute(database, generateCountQuery(query), crp).getPrimaryResults
+  private[kusto] def countRows(client: Client, query: String, database: String): Int = {
+    val res = client.execute(database, generateCountQuery(query)).getPrimaryResults
     res.next()
     res.getInt(0)
   }
-  private[kusto] def estimateRowsCount(client: Client, query: String, database: String, crp: ClientRequestProperties): Int = {
+  private[kusto] def estimateRowsCount(client: Client, query: String, database: String): Int = {
     var count = 0
     val estimationResult: util.List[AnyRef] = Await.result(Future {
-      val res = client.execute(database, generateEstimateRowsCountQuery(query), crp).getPrimaryResults
+      val res = client.execute(database, generateEstimateRowsCountQuery(query)).getPrimaryResults
       res.next()
       res.getCurrentRow
     }, KustoConstants.TimeoutForCountCheck)
     if (StringUtils.isBlank(estimationResult.get(1).toString)) {
       // Estimation can be empty for certain cases
       Await.result(Future {
-        val res = client.execute(database, generateCountQuery(query), crp).getPrimaryResults
+        val res = client.execute(database, generateCountQuery(query)).getPrimaryResults
         res.next()
         res.getInt(0)
       }, KustoConstants.TimeoutForCountCheck)
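countRows above leans on a generated count query; in KQL that is just the user query piped into count, which returns a single row with a single column. A sketch with the query text spelled out; the exact string generateCountQuery emits is internal, so treat it as an assumption:

    import com.microsoft.azure.kusto.data.Client

    // Assumed query shape: "<query> | count" yields one row, one numeric column.
    def countRows(client: Client, query: String, database: String): Int = {
      val res = client.execute(database, s"$query | count").getPrimaryResults
      res.next()
      res.getInt(0)
    }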

View file

@@ -2,7 +2,7 @@ package com.microsoft.kusto.spark
 import java.util
-import com.microsoft.azure.kusto.data.{ClientRequestProperties, KustoOperationResult, KustoResultSetTable}
+import com.microsoft.azure.kusto.data.{KustoOperationResult, KustoResultSetTable}
 import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder
 import com.microsoft.kusto.spark.common.KustoCoordinates
 import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
@@ -16,8 +16,7 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[JUnitRunner])
 class KustoClientTests extends FlatSpec with Matchers{
   class KustoClientStub (override val clusterAlias: String, override val engineKcsb: ConnectionStringBuilder, override val ingestKcsb: ConnectionStringBuilder, var tagsToReturn: util.ArrayList[String]) extends KustoClient(clusterAlias, engineKcsb, ingestKcsb){
-    override def fetchTableExtentsTags(database: String, table: String, crp: ClientRequestProperties)
-    : KustoResultSetTable = {
+    override def fetchTableExtentsTags(database: String, table: String): KustoResultSetTable = {
       val response =
         s"""{"Tables":[{"TableName":"Table_0","Columns":[{"ColumnName":"Tags","DataType":"Object","ColumnType":"dynamic"}],
           "Rows":[[${if (tagsToReturn.isEmpty) "" else tagsToReturn.asScala.map(t=>"\""+t+"\"").asJava}]]}]}"""
@@ -30,20 +29,17 @@ class KustoClientTests extends FlatSpec with Matchers{
     val stubbedClient = new KustoClientStub("", null, null, null)
     stubbedClient.tagsToReturn = emptyTags
     val props = new SparkIngestionProperties
-    val shouldIngestWhenNoTags = stubbedClient.shouldIngestData(KustoCoordinates("", "", "database", Some("table")),
-      Some(props.toString), tableExists = true, null)
+    val shouldIngestWhenNoTags = stubbedClient.shouldIngestData(KustoCoordinates("", "", "database", Some("table")), Some(props.toString), tableExists = true)
     val tags = new util.ArrayList[String]
     tags.add("tag")
     stubbedClient.tagsToReturn = tags
     props.ingestIfNotExists = new util.ArrayList[String](){{add("otherTag")}}
-    val shouldIngestWhenNoOverlap = stubbedClient.shouldIngestData(KustoCoordinates("", "","database", Some("table")),
-      Some(props.toString), tableExists = true, null)
+    val shouldIngestWhenNoOverlap = stubbedClient.shouldIngestData(KustoCoordinates("", "","database", Some("table")), Some(props.toString), tableExists = true)
     shouldIngestWhenNoOverlap shouldEqual true
     tags.add("otherTag")
-    val shouldIngestWhenOverlap = stubbedClient.shouldIngestData(KustoCoordinates("", "","database", Some("table")),
-      Some(props.toString), tableExists = true, null)
+    val shouldIngestWhenOverlap = stubbedClient.shouldIngestData(KustoCoordinates("", "","database", Some("table")), Some(props.toString), tableExists = true)
     shouldIngestWhenOverlap shouldEqual false
   }
 }

View file

@@ -90,8 +90,6 @@ class KustoSourceTests extends FlatSpec with MockFactory with Matchers with Befo
     assert(ingestUrl.equals(AliasAndAuth(alias,engineUrl,null).ingestUri))
-    val engine2 = KDSU.getEngineUrlFromAliasIfNeeded(ingestUrl)
-    assert(engine2.equals(engineUrl))
   }
   "KustoDataSource" should "match cluster custom domain url or aria old cluster" in {

View file

@@ -13,12 +13,12 @@
     <!-- Spark dependencies -->
     <scala.version.major>2.12</scala.version.major>
     <scala.version.minor>11</scala.version.minor>
-    <specs2.version>3.6.5</specs2.version>
     <spark.version.major>3.0</spark.version.major>
     <spark.version.minor>1</spark.version.minor>
     <!-- other dependencies -->
     <kusto-sdk.version>2.7.0</kusto-sdk.version>
+    <specs2.version>3.6.5</specs2.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <maven-source-plugin.version>3.0.1</maven-source-plugin.version>
     <maven-javadoc-plugin.version>3.1.0</maven-javadoc-plugin.version>