Revert "New option for the auto delete policy time - apart from the timeout with default of 7 days Make sure we use the same client request properties in all the flow Small bug fix - not allowed url to contain "ingest-""
This reverts commit dcdcb66d
This commit is contained in:
Родитель
dcdcb66dfd
Коммит
9bccb17acf
|
@ -44,9 +44,8 @@ trait KustoOptions {
|
|||
// required permissions
|
||||
val KUSTO_DATABASE: String = newOption("kustoDatabase")
|
||||
|
||||
// An integer number corresponding to the period in seconds after which the operation will timeout.
|
||||
// For write operations this limit starts ticking only after the data was processed by the connector and it starts
|
||||
// polling on the ingestion results. Default 2 days
|
||||
// An integer number corresponding to the period in seconds after which the operation will timeout. Default: '5400'
|
||||
// (90 minutes)
|
||||
val KUSTO_TIMEOUT_LIMIT: String = newOption("timeoutLimit")
|
||||
|
||||
// An id of the source used for tracing of the write operation
|
||||
|
|
|
@ -33,9 +33,6 @@ object KustoSinkOptions extends KustoOptions{
|
|||
// partition. Kusto's ingestion also aggregates data, default suggested by Kusto is 1GB but here we suggest to cut
|
||||
// it at 100MB to adjust it to spark pulling of data.
|
||||
val KUSTO_CLIENT_BATCHING_LIMIT: String = newOption("clientBatchingLimit")
|
||||
|
||||
// Time after which staging resources are cleaned if they weren't cleaned at the end of the run
|
||||
val KUSTO_STAGING_RESOURCES_CLEANUP: String = newOption("stagingResourcesCleanup")
|
||||
}
|
||||
|
||||
object SinkTableCreationMode extends Enumeration {
|
||||
|
@ -46,9 +43,7 @@ object SinkTableCreationMode extends Enumeration {
|
|||
case class WriteOptions(tableCreateOptions: SinkTableCreationMode.SinkTableCreationMode = SinkTableCreationMode.FailIfNotExist,
|
||||
isAsync: Boolean = false,
|
||||
writeResultLimit: String = KustoSinkOptions.NONE_RESULT_LIMIT,
|
||||
timeZone: String = "UTC",
|
||||
timeout: FiniteDuration,
|
||||
timeZone: String = "UTC", timeout: FiniteDuration,
|
||||
IngestionProperties: Option[String] = None,
|
||||
batchLimit: Int = 100,
|
||||
requestId: String = UUID.randomUUID().toString,
|
||||
autoCleanupTime: FiniteDuration)
|
||||
requestId: String = UUID.randomUUID().toString)
|
||||
|
|
|
@ -8,7 +8,6 @@ import java.util
|
|||
import java.util.zip.GZIPOutputStream
|
||||
import java.util.{TimeZone, UUID}
|
||||
|
||||
import com.microsoft.azure.kusto.data.ClientRequestProperties
|
||||
import com.microsoft.azure.kusto.ingest.IngestionProperties.DATA_FORMAT
|
||||
import com.microsoft.azure.kusto.ingest.result.IngestionResult
|
||||
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo
|
||||
|
@ -18,6 +17,7 @@ import com.microsoft.kusto.spark.authentication.KustoAuthentication
|
|||
import com.microsoft.kusto.spark.common.KustoCoordinates
|
||||
import com.microsoft.kusto.spark.utils.CslCommandsGenerator.generateTableGetSchemaAsRowsCommand
|
||||
import com.microsoft.kusto.spark.utils.{KustoClient, KustoClientCache, KustoQueryUtils, KustoConstants => KCONST, KustoDataSourceUtils => KDSU}
|
||||
import org.apache.commons.lang3.time.FastDateFormat
|
||||
import org.apache.spark.TaskContext
|
||||
import org.apache.spark.sql.DataFrame
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
|
@ -49,55 +49,49 @@ object KustoWriter {
|
|||
tableCoordinates.clusterUrl, tableCoordinates.database)
|
||||
}
|
||||
|
||||
val crp = new ClientRequestProperties
|
||||
crp.setClientRequestId(writeOptions.requestId)
|
||||
|
||||
val table = tableCoordinates.table.get
|
||||
val tmpTableName: String = KustoQueryUtils.simplifyName(TempIngestionTablePrefix +
|
||||
data.sparkSession.sparkContext.appName +
|
||||
"_" + table + batchId.map(b => s"_${b.toString}").getOrElse("") + "_" + writeOptions.requestId)
|
||||
"_" + table + batchId.map(b => s"_${b.toString}").getOrElse("") + "_" + writeOptions.requestId + UUID.randomUUID()
|
||||
.toString)
|
||||
|
||||
implicit val parameters: KustoWriteResource = KustoWriteResource(authentication, tableCoordinates, data.schema,
|
||||
writeOptions, tmpTableName)
|
||||
implicit val parameters: KustoWriteResource = KustoWriteResource(authentication, tableCoordinates, data.schema, writeOptions, tmpTableName)
|
||||
val stagingTableIngestionProperties = getIngestionProperties(writeOptions, parameters)
|
||||
val ingestIfNotExistsTags = stagingTableIngestionProperties.getIngestIfNotExists
|
||||
|
||||
val schemaShowCommandResult = kustoClient.engineClient.execute(tableCoordinates.database,
|
||||
generateTableGetSchemaAsRowsCommand(tableCoordinates.table.get), crp).getPrimaryResults
|
||||
val schemaShowCommandResult = kustoClient.engineClient.execute(tableCoordinates.database, generateTableGetSchemaAsRowsCommand(tableCoordinates.table.get)).getPrimaryResults
|
||||
|
||||
// Remove the ingestIfNotExists tags because several partitions can ingest into the staging table and should not interfere one another
|
||||
stagingTableIngestionProperties.setIngestIfNotExists(new util.ArrayList())
|
||||
val shouldIngest = kustoClient.shouldIngestData(tableCoordinates, writeOptions.IngestionProperties, tableExists =
|
||||
schemaShowCommandResult.count() > 0, crp)
|
||||
val shouldIngest = kustoClient.shouldIngestData(tableCoordinates, writeOptions.IngestionProperties, tableExists = schemaShowCommandResult.count() > 0)
|
||||
|
||||
if (!shouldIngest) {
|
||||
KDSU.logInfo(myName, s"Ingestion skipped: Provided ingest-by tags are present in the destination table '$table'")
|
||||
} else {
|
||||
// KustoWriter will create a temporary table ingesting the data to it.
|
||||
// Only if all executors succeeded the table will be appended to the original destination table.
|
||||
kustoClient.initializeTablesBySchema(tableCoordinates, tmpTableName, data
|
||||
.schema, schemaShowCommandResult, writeOptions, crp)
|
||||
kustoClient.initializeTablesBySchema(tableCoordinates, tmpTableName, writeOptions.tableCreateOptions, data
|
||||
.schema, schemaShowCommandResult, writeOptions.timeout)
|
||||
KDSU.logInfo(myName, s"Successfully created temporary table $tmpTableName, will be deleted after completing the operation")
|
||||
|
||||
kustoClient.setMappingOnStagingTableIfNeeded(stagingTableIngestionProperties, table, crp)
|
||||
if (stagingTableIngestionProperties.getFlushImmediately) {
|
||||
kustoClient.setMappingOnStagingTableIfNeeded(stagingTableIngestionProperties, table)
|
||||
if (stagingTableIngestionProperties.getFlushImmediately){
|
||||
KDSU.logWarn(myName, "Its not recommended to set flushImmediately to true")
|
||||
}
|
||||
val rdd = data.queryExecution.toRdd
|
||||
val partitionsResults = rdd.sparkContext.collectionAccumulator[PartitionResult]
|
||||
|
||||
if (writeOptions.isAsync) {
|
||||
val asyncWork = rdd.foreachPartitionAsync { rows => ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults) }
|
||||
KDSU.logInfo(myName, s"asynchronous write to Kusto table '$table' in progress")
|
||||
// This part runs back on the driver
|
||||
asyncWork.onSuccess {
|
||||
case _ => kustoClient.finalizeIngestionWhenWorkersSucceeded(
|
||||
tableCoordinates, batchIdIfExists, kustoClient.engineClient, tmpTableName, partitionsResults,
|
||||
writeOptions, ingestIfNotExistsTags, crp)
|
||||
tableCoordinates, batchIdIfExists, kustoClient.engineClient, tmpTableName, partitionsResults, writeOptions.timeout, writeOptions.requestId, ingestIfNotExistsTags, isAsync = true)
|
||||
}
|
||||
asyncWork.onFailure {
|
||||
case exception: Exception =>
|
||||
kustoClient.cleanupIngestionByproducts(
|
||||
tableCoordinates.database, kustoClient.engineClient, tmpTableName, crp)
|
||||
kustoClient.cleanupIngestionByproducts(tableCoordinates.database, kustoClient.engineClient, tmpTableName)
|
||||
KDSU.reportExceptionAndThrow(myName, exception, "writing data", tableCoordinates.clusterUrl, tableCoordinates.database, table, shouldNotThrow = true)
|
||||
KDSU.logError(myName, "The exception is not visible in the driver since we're in async mode")
|
||||
}
|
||||
|
@ -106,13 +100,11 @@ object KustoWriter {
|
|||
rdd.foreachPartition { rows => ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults) }
|
||||
catch {
|
||||
case exception: Exception =>
|
||||
kustoClient.cleanupIngestionByproducts(
|
||||
tableCoordinates.database, kustoClient.engineClient, tmpTableName, crp)
|
||||
kustoClient.cleanupIngestionByproducts(tableCoordinates.database, kustoClient.engineClient, tmpTableName)
|
||||
throw exception
|
||||
}
|
||||
kustoClient.finalizeIngestionWhenWorkersSucceeded(
|
||||
tableCoordinates, batchIdIfExists, kustoClient.engineClient, tmpTableName, partitionsResults, writeOptions,
|
||||
ingestIfNotExistsTags, crp)
|
||||
tableCoordinates, batchIdIfExists, kustoClient.engineClient, tmpTableName, partitionsResults, writeOptions.timeout, writeOptions.requestId, ingestIfNotExistsTags)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -207,7 +199,7 @@ object KustoWriter {
|
|||
val partitionId = TaskContext.getPartitionId
|
||||
Future {
|
||||
var props = ingestionProperties
|
||||
if (!ingestionProperties.getFlushImmediately && flushImmediately) {
|
||||
if(!ingestionProperties.getFlushImmediately && flushImmediately){
|
||||
// Need to copy the ingestionProperties so that only this one will be flushed immediately
|
||||
props = SparkIngestionProperties.cloneIngestionProperties(ingestionProperties)
|
||||
props.setFlushImmediately(true)
|
||||
|
@ -235,7 +227,7 @@ object KustoWriter {
|
|||
RowCSVWriterUtils.writeRowAsCSV(row, schema, timeZone, blobWriter.csvWriter)
|
||||
|
||||
val count = blobWriter.csvWriter.getCounter
|
||||
val shouldNotCommitBlockBlob = count < maxBlobSize
|
||||
val shouldNotCommitBlockBlob = count < maxBlobSize
|
||||
if (shouldNotCommitBlockBlob) {
|
||||
blobWriter
|
||||
} else {
|
||||
|
|
|
@ -113,8 +113,7 @@ private[kusto] object KustoReader {
|
|||
// Check whether the result is empty, causing an IO exception on reading empty parquet file
|
||||
// We don't mind generating the filtered query again - it only happens upon exception
|
||||
val filteredQuery = KustoFilter.pruneAndFilter(request.schema, request.query, filtering)
|
||||
val count = KDSU.countRows(kustoClient, filteredQuery, request.kustoCoordinates.database, request
|
||||
.clientRequestProperties.orNull)
|
||||
val count = KDSU.countRows(kustoClient, filteredQuery, request.kustoCoordinates.database)
|
||||
|
||||
if (count == 0) {
|
||||
request.sparkSession.emptyDataFrame.rdd
|
||||
|
|
|
@ -61,8 +61,7 @@ private[kusto] case class KustoRelation(kustoCoordinates: KustoCoordinates,
|
|||
if (readOptions.readMode.isEmpty){
|
||||
var count = 0
|
||||
try {
|
||||
count = KDSU.
|
||||
estimateRowsCount(kustoClient, query, kustoCoordinates.database, clientRequestProperties.orNull)
|
||||
count = KDSU.estimateRowsCount(kustoClient, query, kustoCoordinates.database)
|
||||
}catch {
|
||||
// Assume count is high if estimation got timed out
|
||||
case e: Exception =>
|
||||
|
@ -162,8 +161,6 @@ private[kusto] case class KustoRelation(kustoCoordinates: KustoCoordinates,
|
|||
|
||||
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
|
||||
KustoWriter.write(Some(0), data, kustoCoordinates, authentication, writeOptions =
|
||||
WriteOptions.apply(timeout = new FiniteDuration(KustoConstants.DefaultWaitingIntervalLongRunning.toInt,
|
||||
TimeUnit.SECONDS), autoCleanupTime = new FiniteDuration(KustoConstants.DefaultCleaningInterval.toInt,
|
||||
TimeUnit.SECONDS)))
|
||||
WriteOptions.apply(timeout = new FiniteDuration(KustoConstants.DefaultWaitingIntervalLongRunning.toLong, TimeUnit.MINUTES)))
|
||||
}
|
||||
}
|
|
@ -6,13 +6,14 @@ import java.util.StringJoiner
|
|||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder
|
||||
import com.microsoft.azure.kusto.data.{Client, ClientFactory, ClientRequestProperties, KustoResultSetTable}
|
||||
import com.microsoft.azure.kusto.data.{Client, ClientFactory, KustoResultSetTable}
|
||||
import com.microsoft.azure.kusto.ingest.result.{IngestionStatus, OperationStatus}
|
||||
import com.microsoft.azure.kusto.ingest.{IngestClient, IngestClientFactory, IngestionProperties}
|
||||
import com.microsoft.azure.storage.StorageException
|
||||
import com.microsoft.kusto.spark.common.KustoCoordinates
|
||||
import com.microsoft.kusto.spark.datasink.KustoWriter.DelayPeriodBetweenCalls
|
||||
import com.microsoft.kusto.spark.datasink.{PartitionResult, SinkTableCreationMode, SparkIngestionProperties, WriteOptions}
|
||||
import com.microsoft.kusto.spark.datasink.KustoWriter.{DelayPeriodBetweenCalls, LegacyTempIngestionTablePrefix, TempIngestionTablePrefix}
|
||||
import com.microsoft.kusto.spark.datasink.SinkTableCreationMode.SinkTableCreationMode
|
||||
import com.microsoft.kusto.spark.datasink.{PartitionResult, SinkTableCreationMode, SparkIngestionProperties}
|
||||
import com.microsoft.kusto.spark.datasource.KustoStorageParameters
|
||||
import com.microsoft.kusto.spark.utils.CslCommandsGenerator._
|
||||
import com.microsoft.kusto.spark.utils.KustoDataSourceUtils.extractSchemaFromResultTable
|
||||
|
@ -26,6 +27,7 @@ import shaded.parquet.org.codehaus.jackson.map.ObjectMapper
|
|||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.{Await, Future}
|
||||
|
||||
class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuilder, val ingestKcsb: ConnectionStringBuilder) {
|
||||
|
@ -41,13 +43,12 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
|
|||
|
||||
private val myName = this.getClass.getSimpleName
|
||||
private val durationFormat = "dd:HH:mm:ss"
|
||||
|
||||
def initializeTablesBySchema(tableCoordinates: KustoCoordinates,
|
||||
tmpTableName: String,
|
||||
tableCreation: SinkTableCreationMode = SinkTableCreationMode.FailIfNotExist,
|
||||
schema: StructType,
|
||||
schemaShowCommandResult: KustoResultSetTable,
|
||||
writeOptions: WriteOptions,
|
||||
crp: ClientRequestProperties): Unit = {
|
||||
writeTimeout: FiniteDuration): Unit = {
|
||||
|
||||
var tmpTableSchema: String = ""
|
||||
val database = tableCoordinates.database
|
||||
|
@ -55,7 +56,7 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
|
|||
|
||||
if (schemaShowCommandResult.count() == 0) {
|
||||
// Table Does not exist
|
||||
if (writeOptions.tableCreateOptions == SinkTableCreationMode.FailIfNotExist) {
|
||||
if (tableCreation == SinkTableCreationMode.FailIfNotExist) {
|
||||
throw new RuntimeException(s"Table '$table' doesn't exist in database '$database', cluster '${tableCoordinates.clusterAlias} and tableCreateOptions is set to FailIfNotExist.")
|
||||
} else {
|
||||
// Parse dataframe schema and create a destination table with that schema
|
||||
|
@ -65,7 +66,7 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
|
|||
tableSchemaBuilder.add(s"['${field.name}']:$fieldType")
|
||||
}
|
||||
tmpTableSchema = tableSchemaBuilder.toString
|
||||
engineClient.execute(database, generateTableCreateCommand(table, tmpTableSchema), crp)
|
||||
engineClient.execute(database, generateTableCreateCommand(table, tmpTableSchema))
|
||||
}
|
||||
} else {
|
||||
// Table exists. Parse kusto table schema and check if it matches the dataframes schema
|
||||
|
@ -74,12 +75,12 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
|
|||
|
||||
// Create a temporary table with the kusto or dataframe parsed schema with retention and delete set to after the
|
||||
// write operation times out. Engine recommended keeping the retention although we use auto delete.
|
||||
engineClient.execute(database, generateTempTableCreateCommand(tmpTableName, tmpTableSchema), crp)
|
||||
engineClient.execute(database, generateTempTableCreateCommand(tmpTableName, tmpTableSchema))
|
||||
engineClient.execute(database, generateTableAlterRetentionPolicy(tmpTableName,
|
||||
DurationFormatUtils.formatDuration(writeOptions.autoCleanupTime.toMillis, durationFormat, true),
|
||||
recoverable = false), crp)
|
||||
val instant = Instant.now.plusSeconds(writeOptions.autoCleanupTime.toSeconds)
|
||||
engineClient.execute(database, generateTableAlterAutoDeletePolicy(tmpTableName, instant), crp)
|
||||
DurationFormatUtils.formatDuration(writeTimeout.toMillis, durationFormat,true),
|
||||
recoverable=false))
|
||||
val instant = Instant.now.plusSeconds(writeTimeout.toSeconds)
|
||||
engineClient.execute(database, generateTableAlterAutoDeletePolicy(tmpTableName, instant))
|
||||
}
|
||||
|
||||
def getTempBlobForIngestion: ContainerAndSas = {
|
||||
|
@ -95,15 +96,14 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
|
|||
kustoAdminClient: Client,
|
||||
tmpTableName: String,
|
||||
partitionsResults: CollectionAccumulator[PartitionResult],
|
||||
writeOptions: WriteOptions,
|
||||
timeout: FiniteDuration,
|
||||
requestId: String,
|
||||
ingestIfNotExistsTags: util.ArrayList[String],
|
||||
crp: ClientRequestProperties
|
||||
): Unit = {
|
||||
isAsync: Boolean = false): Unit = {
|
||||
import coordinates._
|
||||
|
||||
val mergeTask = Future {
|
||||
KDSU.logInfo(myName, s"Polling on ingestion results for requestId: ${writeOptions.requestId}, will move data to " +
|
||||
s"destination table when finished")
|
||||
KDSU.logInfo(myName, s"Polling on ingestion results for requestId: $requestId, will move data to destination table when finished")
|
||||
|
||||
try {
|
||||
partitionsResults.value.asScala.foreach {
|
||||
|
@ -116,18 +116,17 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
|
|||
finalRes
|
||||
} catch {
|
||||
case _: StorageException =>
|
||||
KDSU.logWarn(myName, s"Failed to fetch operation status transiently - will keep polling. RequestId: ${writeOptions.requestId}")
|
||||
KDSU.logWarn(myName, s"Failed to fetch operation status transiently - will keep polling. RequestId: $requestId")
|
||||
None
|
||||
case e: Exception => KDSU.reportExceptionAndThrow(myName, e, s"Failed to fetch operation status. RequestId: ${writeOptions.requestId}"); None
|
||||
case e: Exception => KDSU.reportExceptionAndThrow(myName, e, s"Failed to fetch operation status. RequestId: $requestId"); None
|
||||
}
|
||||
},
|
||||
0,
|
||||
DelayPeriodBetweenCalls,
|
||||
(writeOptions.timeout.toMillis / DelayPeriodBetweenCalls + 5).toInt,
|
||||
(timeout.toMillis / DelayPeriodBetweenCalls + 5).toInt,
|
||||
res => res.isDefined && res.get.status == OperationStatus.Pending,
|
||||
res => finalRes = res,
|
||||
maxWaitTimeBetweenCalls = KDSU.WriteMaxWaitTime.toMillis.toInt)
|
||||
.await(writeOptions.timeout.toMillis, TimeUnit.MILLISECONDS)
|
||||
maxWaitTimeBetweenCalls = KDSU.WriteMaxWaitTime.toMillis.toInt).await(timeout.toMillis, TimeUnit.MILLISECONDS)
|
||||
|
||||
if (finalRes.isDefined) {
|
||||
finalRes.get.status match {
|
||||
|
@ -155,17 +154,15 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
|
|||
// Protect tmp table from merge/rebuild and move data to the table requested by customer. This operation is atomic.
|
||||
// We are using the ingestIfNotExists Tags here too (on top of the check at the start of the flow) so that if
|
||||
// several flows started together only one of them would ingest
|
||||
kustoAdminClient.execute(database, generateTableAlterMergePolicyCommand(tmpTableName, allowMerge = false,
|
||||
allowRebuild = false), crp)
|
||||
val res = kustoAdminClient.execute(database, generateTableMoveExtentsCommand(tmpTableName, table.get,
|
||||
ingestIfNotExistsTags), crp).getPrimaryResults
|
||||
kustoAdminClient.execute(database, generateTableAlterMergePolicyCommand(tmpTableName, allowMerge = false, allowRebuild = false))
|
||||
val res = kustoAdminClient.execute(database, generateTableMoveExtentsCommand(tmpTableName, table.get, ingestIfNotExistsTags)).getPrimaryResults
|
||||
if (!res.next()) {
|
||||
// Extents that were moved are returned by move extents command
|
||||
KDSU.logInfo(myName, s"Ingestion skipped: Provided ingest-by tags are present in the destination table '$table'")
|
||||
}
|
||||
KDSU.logInfo(myName, s"write to Kusto table '${table.get}' finished successfully requestId: ${writeOptions.requestId} $batchIdIfExists")
|
||||
KDSU.logInfo(myName, s"write to Kusto table '${table.get}' finished successfully requestId: $requestId $batchIdIfExists")
|
||||
} else {
|
||||
KDSU.logWarn(myName, s"write to Kusto table '${table.get}' finished with no data written requestId: ${writeOptions.requestId} $batchIdIfExists")
|
||||
KDSU.logWarn(myName, s"write to Kusto table '${table.get}' finished with no data written requestId: $requestId $batchIdIfExists")
|
||||
}
|
||||
} catch {
|
||||
case ex: Exception =>
|
||||
|
@ -175,29 +172,27 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
|
|||
"Trying to poll on pending ingestions", coordinates.clusterUrl, coordinates.database, coordinates.table.getOrElse("Unspecified table name")
|
||||
)
|
||||
} finally {
|
||||
cleanupIngestionByproducts(database, kustoAdminClient, tmpTableName, crp)
|
||||
cleanupIngestionByproducts(database, kustoAdminClient, tmpTableName)
|
||||
}
|
||||
}
|
||||
|
||||
if (!writeOptions.isAsync) {
|
||||
Await.result(mergeTask, writeOptions.timeout)
|
||||
if (!isAsync) {
|
||||
Await.result(mergeTask, timeout)
|
||||
}
|
||||
}
|
||||
|
||||
private[kusto] def cleanupIngestionByproducts(database: String, kustoAdminClient: Client, tmpTableName: String,
|
||||
crp: ClientRequestProperties): Unit = {
|
||||
private[kusto] def cleanupIngestionByproducts(database: String, kustoAdminClient: Client, tmpTableName: String): Unit = {
|
||||
try {
|
||||
kustoAdminClient.execute(database, generateTableDropCommand(tmpTableName), crp)
|
||||
kustoAdminClient.execute(database, generateTableDropCommand(tmpTableName))
|
||||
KDSU.logInfo(myName, s"Temporary table '$tmpTableName' deleted successfully")
|
||||
}
|
||||
catch {
|
||||
case exception: Exception =>
|
||||
KDSU.reportExceptionAndThrow(myName, exception, s"deleting temporary table $tmpTableName", database, shouldNotThrow = false)
|
||||
KDSU.reportExceptionAndThrow(myName, exception, s"deleting temporary table $tmpTableName", database = database, shouldNotThrow = false)
|
||||
}
|
||||
}
|
||||
|
||||
private[kusto] def setMappingOnStagingTableIfNeeded(stagingTableIngestionProperties: IngestionProperties,
|
||||
originalTable: String, crp: ClientRequestProperties): Unit = {
|
||||
private[kusto] def setMappingOnStagingTableIfNeeded(stagingTableIngestionProperties: IngestionProperties, originalTable: String): Unit = {
|
||||
val mapping = stagingTableIngestionProperties.getIngestionMapping
|
||||
val mappingReferenceName = mapping.getIngestionMappingReference
|
||||
if (StringUtils.isNotBlank(mappingReferenceName)) {
|
||||
|
@ -209,21 +204,19 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
|
|||
while (mappings.next && !found) {
|
||||
if (mappings.getString(0).equals(mappingReferenceName)) {
|
||||
val policyJson = mappings.getString(2).replace("\"", "'")
|
||||
val cmd = generateCreateTableMappingCommand(stagingTableIngestionProperties.getTableName, mappingKind,
|
||||
mappingReferenceName, policyJson)
|
||||
engineClient.execute(stagingTableIngestionProperties.getDatabaseName, cmd, crp)
|
||||
val c = generateCreateTableMappingCommand(stagingTableIngestionProperties.getTableName, mappingKind, mappingReferenceName, policyJson)
|
||||
engineClient.execute(stagingTableIngestionProperties.getDatabaseName, c)
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def fetchTableExtentsTags(database: String, table: String, crp: ClientRequestProperties): KustoResultSetTable = {
|
||||
def fetchTableExtentsTags(database: String, table: String): KustoResultSetTable = {
|
||||
engineClient.execute(database, CslCommandsGenerator.generateFetchTableIngestByTagsCommand(table)).getPrimaryResults
|
||||
}
|
||||
|
||||
def shouldIngestData(tableCoordinates: KustoCoordinates, ingestionProperties: Option[String],
|
||||
tableExists: Boolean, crp: ClientRequestProperties): Boolean = {
|
||||
def shouldIngestData(tableCoordinates: KustoCoordinates, ingestionProperties: Option[String], tableExists: Boolean): Boolean = {
|
||||
var shouldIngest = true
|
||||
|
||||
if (tableExists && ingestionProperties.isDefined) {
|
||||
|
@ -231,7 +224,7 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
|
|||
if (ingestIfNotExistsTags != null && !ingestIfNotExistsTags.isEmpty) {
|
||||
val ingestIfNotExistsTagsSet = ingestIfNotExistsTags.asScala.toSet
|
||||
|
||||
val res = fetchTableExtentsTags(tableCoordinates.database, tableCoordinates.table.get, crp)
|
||||
val res = fetchTableExtentsTags(tableCoordinates.database, tableCoordinates.table.get)
|
||||
if (res.next()) {
|
||||
val tagsArray = res.getObject(0).asInstanceOf[JSONArray]
|
||||
for (i <- 0 until tagsArray.length) {
|
||||
|
|
|
@ -5,10 +5,8 @@ import scala.concurrent.duration._
|
|||
object KustoConstants {
|
||||
// Setting high value to have no timeout on Await commands
|
||||
val DefaultWaitingIntervalLongRunning: String = (2 days).toSeconds.toString
|
||||
val DefaultCleaningInterval: String = (7 days).toSeconds.toString
|
||||
val DefaultPeriodicSamplePeriod: FiniteDuration = 1 seconds
|
||||
val DefaultIngestionTaskTime: FiniteDuration = 20 seconds
|
||||
val NoTimeout: String = (-1 seconds).toSeconds.toString
|
||||
val ClientName: String = KustoDataSourceUtils.clientName
|
||||
val DefaultBufferSize: Int = 16 * 1024
|
||||
val StorageExpiryMinutes: Int = 120
|
||||
|
|
|
@ -211,10 +211,7 @@ object KustoDataSourceUtils {
|
|||
case _: java.lang.IllegalArgumentException => throw new InvalidParameterException(s"KUSTO_WRITE_ENABLE_ASYNC is expecting either 'true' or 'false', got: '$isAsyncParam'")
|
||||
}
|
||||
|
||||
val timeout = new FiniteDuration(parameters.getOrElse(KustoSinkOptions.KUSTO_TIMEOUT_LIMIT, KCONST
|
||||
.DefaultWaitingIntervalLongRunning).toInt, TimeUnit.SECONDS)
|
||||
val autoCleanupTime = new FiniteDuration(parameters.getOrElse(KustoSinkOptions.KUSTO_STAGING_RESOURCES_CLEANUP, KCONST
|
||||
.DefaultCleaningInterval).toInt, TimeUnit.SECONDS)
|
||||
val timeout = new FiniteDuration(parameters.getOrElse(KustoSinkOptions.KUSTO_TIMEOUT_LIMIT, KCONST.DefaultWaitingIntervalLongRunning).toLong, TimeUnit.SECONDS)
|
||||
val requestId = parameters.getOrElse(KustoSinkOptions.KUSTO_REQUEST_ID, UUID.randomUUID().toString)
|
||||
|
||||
val ingestionPropertiesAsJson = parameters.get(KustoSinkOptions.KUSTO_SPARK_INGESTION_PROPERTIES_JSON)
|
||||
|
@ -227,8 +224,7 @@ object KustoDataSourceUtils {
|
|||
timeout,
|
||||
ingestionPropertiesAsJson,
|
||||
batchLimit,
|
||||
requestId,
|
||||
autoCleanupTime
|
||||
requestId
|
||||
)
|
||||
|
||||
val sourceParameters = parseSourceParameters(parameters)
|
||||
|
@ -298,13 +294,10 @@ object KustoDataSourceUtils {
|
|||
|
||||
private[kusto] def getEngineUrlFromAliasIfNeeded(cluster: String): String = {
|
||||
if (cluster.startsWith(EnginePrefix)) {
|
||||
val uri = new URI(cluster)
|
||||
val host = uri.getHost
|
||||
val host = new URI(cluster).getHost
|
||||
if (host.startsWith(IngestPrefix)) {
|
||||
val startIdx = if (host.startsWith(IngestPrefix)) IngestPrefix.length else 0
|
||||
val uriBuilder = new URIBuilder()
|
||||
uriBuilder.setHost(s"${host.substring(startIdx, host.indexOf(".kusto."))}.kusto.windows.net")
|
||||
uriBuilder.setScheme("https").toString
|
||||
uriBuilder.setHost(IngestPrefix + host).toString
|
||||
} else {
|
||||
cluster
|
||||
}
|
||||
|
@ -549,23 +542,23 @@ object KustoDataSourceUtils {
|
|||
else None
|
||||
}
|
||||
|
||||
private[kusto] def countRows(client: Client, query: String, database: String, crp: ClientRequestProperties): Int = {
|
||||
val res = client.execute(database, generateCountQuery(query), crp).getPrimaryResults
|
||||
private[kusto] def countRows(client: Client, query: String, database: String): Int = {
|
||||
val res = client.execute(database, generateCountQuery(query)).getPrimaryResults
|
||||
res.next()
|
||||
res.getInt(0)
|
||||
}
|
||||
|
||||
private[kusto] def estimateRowsCount(client: Client, query: String, database: String, crp: ClientRequestProperties): Int = {
|
||||
private[kusto] def estimateRowsCount(client: Client, query: String, database: String): Int = {
|
||||
var count = 0
|
||||
val estimationResult: util.List[AnyRef] = Await.result(Future {
|
||||
val res = client.execute(database, generateEstimateRowsCountQuery(query), crp).getPrimaryResults
|
||||
val res = client.execute(database, generateEstimateRowsCountQuery(query)).getPrimaryResults
|
||||
res.next()
|
||||
res.getCurrentRow
|
||||
}, KustoConstants.TimeoutForCountCheck)
|
||||
if (StringUtils.isBlank(estimationResult.get(1).toString)) {
|
||||
// Estimation can be empty for certain cases
|
||||
Await.result(Future {
|
||||
val res = client.execute(database, generateCountQuery(query), crp).getPrimaryResults
|
||||
val res = client.execute(database, generateCountQuery(query)).getPrimaryResults
|
||||
res.next()
|
||||
res.getInt(0)
|
||||
}, KustoConstants.TimeoutForCountCheck)
|
||||
|
|
|
@ -2,7 +2,7 @@ package com.microsoft.kusto.spark
|
|||
|
||||
import java.util
|
||||
|
||||
import com.microsoft.azure.kusto.data.{ClientRequestProperties, KustoOperationResult, KustoResultSetTable}
|
||||
import com.microsoft.azure.kusto.data.{KustoOperationResult, KustoResultSetTable}
|
||||
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder
|
||||
import com.microsoft.kusto.spark.common.KustoCoordinates
|
||||
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
|
||||
|
@ -16,8 +16,7 @@ import scala.collection.JavaConverters._
|
|||
@RunWith(classOf[JUnitRunner])
|
||||
class KustoClientTests extends FlatSpec with Matchers{
|
||||
class KustoClientStub (override val clusterAlias: String, override val engineKcsb: ConnectionStringBuilder, override val ingestKcsb: ConnectionStringBuilder, var tagsToReturn: util.ArrayList[String]) extends KustoClient(clusterAlias, engineKcsb, ingestKcsb){
|
||||
override def fetchTableExtentsTags(database: String, table: String, crp: ClientRequestProperties)
|
||||
: KustoResultSetTable = {
|
||||
override def fetchTableExtentsTags(database: String, table: String): KustoResultSetTable = {
|
||||
val response =
|
||||
s"""{"Tables":[{"TableName":"Table_0","Columns":[{"ColumnName":"Tags","DataType":"Object","ColumnType":"dynamic"}],
|
||||
"Rows":[[${if (tagsToReturn.isEmpty) "" else tagsToReturn.asScala.map(t=>"\""+t+"\"").asJava}]]}]}"""
|
||||
|
@ -30,20 +29,17 @@ class KustoClientTests extends FlatSpec with Matchers{
|
|||
val stubbedClient = new KustoClientStub("", null, null, null)
|
||||
stubbedClient.tagsToReturn = emptyTags
|
||||
val props = new SparkIngestionProperties
|
||||
val shouldIngestWhenNoTags = stubbedClient.shouldIngestData(KustoCoordinates("", "", "database", Some("table")),
|
||||
Some(props.toString), tableExists = true, null)
|
||||
val shouldIngestWhenNoTags = stubbedClient.shouldIngestData(KustoCoordinates("", "", "database", Some("table")), Some(props.toString), tableExists = true)
|
||||
|
||||
val tags = new util.ArrayList[String]
|
||||
tags.add("tag")
|
||||
stubbedClient.tagsToReturn = tags
|
||||
props.ingestIfNotExists = new util.ArrayList[String](){{add("otherTag")}}
|
||||
val shouldIngestWhenNoOverlap = stubbedClient.shouldIngestData(KustoCoordinates("", "","database", Some("table")),
|
||||
Some(props.toString), tableExists = true, null)
|
||||
val shouldIngestWhenNoOverlap = stubbedClient.shouldIngestData(KustoCoordinates("", "","database", Some("table")), Some(props.toString), tableExists = true)
|
||||
shouldIngestWhenNoOverlap shouldEqual true
|
||||
|
||||
tags.add("otherTag")
|
||||
val shouldIngestWhenOverlap = stubbedClient.shouldIngestData(KustoCoordinates("", "","database", Some("table")),
|
||||
Some(props.toString), tableExists = true, null)
|
||||
val shouldIngestWhenOverlap = stubbedClient.shouldIngestData(KustoCoordinates("", "","database", Some("table")), Some(props.toString), tableExists = true)
|
||||
shouldIngestWhenOverlap shouldEqual false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -90,8 +90,6 @@ class KustoSourceTests extends FlatSpec with MockFactory with Matchers with Befo
|
|||
|
||||
assert(ingestUrl.equals(AliasAndAuth(alias,engineUrl,null).ingestUri))
|
||||
|
||||
val engine2 = KDSU.getEngineUrlFromAliasIfNeeded(ingestUrl)
|
||||
assert(engine2.equals(engineUrl))
|
||||
}
|
||||
|
||||
"KustoDataSource" should "match cluster custom domain url or aria old cluster" in {
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -13,12 +13,12 @@
|
|||
<!-- Spark dependencies -->
|
||||
<scala.version.major>2.12</scala.version.major>
|
||||
<scala.version.minor>11</scala.version.minor>
|
||||
<specs2.version>3.6.5</specs2.version>
|
||||
<spark.version.major>3.0</spark.version.major>
|
||||
<spark.version.minor>1</spark.version.minor>
|
||||
|
||||
<!-- other dependencies -->
|
||||
<kusto-sdk.version>2.7.0</kusto-sdk.version>
|
||||
<specs2.version>3.6.5</specs2.version>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<maven-source-plugin.version>3.0.1</maven-source-plugin.version>
|
||||
<maven-javadoc-plugin.version>3.1.0</maven-javadoc-plugin.version>
|
||||
|
|
Загрузка…
Ссылка в новой задаче