Fix bugs and upgrade to sdk v3.1.1 (#242)

* Fix bugs and upgrade to sdk v3.1.1

* revert

* revert test

* fix build

Co-authored-by: Ohad Bitton <ohbitton@microsoft.com>
This commit is contained in:
ohad bitton 2022-06-21 07:04:27 +03:00 коммит произвёл GitHub
Родитель 6801ebe51b
Коммит 5670ff3645
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
11 изменённых файлов: 106 добавлений и 65 удалений

Просмотреть файл

@ -1,28 +1,25 @@
package com.microsoft.kusto.spark.authentication
import java.util.concurrent.CompletableFuture
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.function.Consumer
import com.microsoft.aad.msal4j.{DeviceCode, DeviceCodeFlowParameters, IAuthenticationResult}
import com.microsoft.azure.kusto.data.auth
import scala.concurrent.TimeoutException
class DeviceAuthentication (val cluster: String, val authority:String) extends auth.DeviceAuthTokenProvider(cluster, authority) {
var deviceCode: Option[DeviceCode] = None
var currentDeviceCode: Option[DeviceCode] = None
var expiresAt: Option[Long] = None
var awaitAuthentication: Option[CompletableFuture[IAuthenticationResult]] = None
val NewDeviceCodeFetchTimeout = 5000
val Interval = 500
val NewDeviceCodeFetchTimeout = 60L * 1000L
var currentToken: Option[String] = None
override def acquireNewAccessToken(): IAuthenticationResult = {
awaitAuthentication = Some(acquireNewAccessTokenAsync())
awaitAuthentication.get.join()
acquireNewAccessTokenAsync().get(NewDeviceCodeFetchTimeout, TimeUnit.MILLISECONDS)
}
def acquireNewAccessTokenAsync(): CompletableFuture[IAuthenticationResult] = {
val deviceCodeConsumer: Consumer[DeviceCode] = (deviceCode: DeviceCode) => {
this.deviceCode = Some(deviceCode)
this.currentDeviceCode = Some(deviceCode)
expiresAt = Some(System.currentTimeMillis + (deviceCode.expiresIn() * 1000))
println(deviceCode.message())
}
@ -32,33 +29,24 @@ class DeviceAuthentication (val cluster: String, val authority:String) extends a
}
def refreshIfNeeded(): Unit = {
if (deviceCode.isEmpty || expiresAt.get <= System.currentTimeMillis) {
val oldDeviceCode = this.deviceCode
awaitAuthentication = Some(acquireNewAccessTokenAsync())
var awaitTime = NewDeviceCodeFetchTimeout
while (this.deviceCode == oldDeviceCode){
if (awaitTime <= 0) {
throw new TimeoutException("Timed out waiting for a new device code")
}
Thread.sleep(Interval)
awaitTime = awaitTime - Interval
}
if (currentDeviceCode.isEmpty || expiresAt.get <= System.currentTimeMillis) {
currentToken = Some(acquireAccessToken())
}
}
def getDeviceCodeMessage: String = {
refreshIfNeeded()
deviceCode.get.message()
currentDeviceCode.get.message()
}
def getDeviceCode: DeviceCode = {
refreshIfNeeded()
deviceCode.get
currentDeviceCode.get
}
def acquireToken(): String = {
refreshIfNeeded()
awaitAuthentication.get.join().accessToken()
currentToken.get
}
}

Просмотреть файл

@ -58,7 +58,10 @@ trait KustoOptions {
val KUSTO_REQUEST_ID: String = newOption("requestId")
}
case class KustoCoordinates(clusterUrl: String, clusterAlias: String, database: String, table: Option[String] = None)
/**
 * Coordinates of a Kusto target: a cluster (given both as a URL and as an alias),
 * a database and, optionally, a table.
 *
 * @param clusterUrl   URL of the cluster
 * @param clusterAlias alias of the cluster
 * @param database     name of the database
 * @param table        name of the table, when one is specified; defaults to `None`
 */
case class KustoCoordinates(clusterUrl: String,
clusterAlias: String,
database: String,
table: Option[String] = None)
/** ******************************************************************************* */
/* NOTE!!! */

Просмотреть файл

@ -8,7 +8,7 @@ import java.util
import java.util.zip.GZIPOutputStream
import java.util.{TimeZone, UUID}
import com.microsoft.azure.kusto.data.ClientRequestProperties
import com.microsoft.azure.kusto.ingest.IngestionProperties.DATA_FORMAT
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat
import com.microsoft.azure.kusto.ingest.result.IngestionResult
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo
import com.microsoft.azure.kusto.ingest.{IngestClient, IngestionProperties}
@ -146,9 +146,9 @@ object KustoWriter {
import parameters._
val ingestionProperties = getIngestionProperties(writeOptions, parameters.coordinates.database, parameters.tmpTableName)
ingestionProperties.setDataFormat(DATA_FORMAT.csv.name)
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table)
ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FailuresAndSuccesses)
ingestionProperties.setDataFormat(DataFormat.CSV.name)
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE)
ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES)
val tasks = ingestRows(rows, parameters, ingestClient, ingestionProperties, partitionsResults)

Просмотреть файл

@ -1,5 +1,7 @@
package com.microsoft.kusto.spark.datasink
import com.microsoft.azure.kusto.ingest.IngestionMapping.IngestionMappingKind
import java.util
import com.microsoft.azure.kusto.ingest.{IngestionMapping, IngestionProperties}
import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility
@ -55,11 +57,12 @@ class SparkIngestionProperties(var flushImmediately: Boolean = false,
}
if (this.csvMapping != null) {
additionalProperties.put("csvMapping", this.csvMapping)
additionalProperties.put("ingestionMapping", this.csvMapping)
additionalProperties.put("ingestionMappingType", IngestionMappingKind.CSV.getKustoValue)
}
if (this.csvMappingNameReference != null) {
ingestionProperties.setIngestionMapping(new IngestionMapping(this.csvMappingNameReference, IngestionMapping.IngestionMappingKind.Csv))
ingestionProperties.setIngestionMapping(new IngestionMapping(this.csvMappingNameReference, IngestionMapping.IngestionMappingKind.CSV))
}
ingestionProperties.setAdditionalProperties(additionalProperties)

Просмотреть файл

@ -72,7 +72,7 @@ class DefaultSource extends CreatableRelationProvider
val readOptions = KDSU.getReadParameters(parameters, sqlContext)
if (authenticationParameters.isEmpty) {
// Parse the parameters if they have not been parsed yet
val sourceParameters = KDSU.parseSourceParameters(parameters)
val sourceParameters = KDSU.parseSourceParameters(parameters, true)
initCommonParams(sourceParameters)
}

Просмотреть файл

@ -60,6 +60,10 @@ private[kusto] object CslCommandsGenerator {
s""".show materialized-views | where SourceTable == '$destinationTableName' | count"""
}
/**
 * Builds the control command that projects the `UseShardEngine` value out of the
 * sharding policy reported by `.show table ... details`.
 *
 * @param tableName name of the table to inspect; it is inserted into the command text as-is
 * @return the command text
 */
def generateIsTableEngineV3(tableName: String): String =
  s""".show table $tableName details | project todynamic(ShardingPolicy).UseShardEngine"""
def generateTableMoveExtentsCommand(sourceTableName: String, destinationTableName: String, batchSize: Int,
isDestinationTableMaterializedViewSource: Boolean = false): String = {
val setNewIngestionTime: String = if (isDestinationTableMaterializedViewSource) "with(SetNewIngestionTime=true)" else ""

Просмотреть файл

@ -5,7 +5,7 @@ import java.time.Instant
import java.util.StringJoiner
import java.util.concurrent.TimeUnit
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder
import com.microsoft.azure.kusto.data.exceptions.KustoDataException
import com.microsoft.azure.kusto.data.exceptions.KustoDataExceptionBase
import com.microsoft.azure.kusto.data.{Client, ClientFactory, ClientRequestProperties, KustoResultSetTable}
import com.microsoft.azure.kusto.ingest.result.{IngestionStatus, OperationStatus}
import com.microsoft.azure.kusto.ingest.{IngestClient, IngestClientFactory}
@ -164,6 +164,20 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
(failed, error)
}
/**
 * Decides whether the move-extents commands issued for `targetTable` should carry the
 * materialized-view flag.
 *
 * Two commands are executed against the engine:
 *  1. a count of the materialized views whose source table is `targetTable`;
 *  2. only when that count is positive, a read of the `UseShardEngine` value from the
 *     table's sharding policy.
 *
 * @param database    database that holds the target table
 * @param targetTable table whose materialized-view / sharding state is checked
 * @param crp         client request properties passed to both commands
 * @return `true` only when the table is a materialized-view source and `UseShardEngine`
 *         is reported as true; `false` otherwise (including when the details command
 *         returns no rows)
 */
def shouldUseMaterializedViewFlag(database: String, targetTable: String, crp: ClientRequestProperties): Boolean = {
  val materializedViewCount =
    engineClient.execute(database, generateIsTableMaterializedViewSourceCommand(targetTable), crp).getPrimaryResults
  materializedViewCount.next()
  val isMaterializedViewSource = materializedViewCount.getLong(0) > 0

  if (isMaterializedViewSource) {
    // Work on the primary results table itself: calling next() on the operation result
    // would step over result tables and leave the table's row cursor before the first
    // row, so the subsequent getBoolean(0) would have no current row to read from.
    val tableDetails =
      engineClient.execute(database, generateIsTableEngineV3(targetTable), crp).getPrimaryResults
    tableDetails.next() && tableDetails.getBoolean(0)
  } else {
    false
  }
}
def moveExtentsWithRetries(batchSize: Int, totalAmount: Int, database: String, tmpTableName: String, targetTable: String,
crp: ClientRequestProperties, writeOptions: WriteOptions): Unit = {
var extentsProcessed = 0
@ -171,18 +185,15 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
var curBatchSize = batchSize
var delayPeriodBetweenCalls = DelayPeriodBetweenCalls
var consecutiveSuccesses = 0
val useMaterializedViewFlag = shouldUseMaterializedViewFlag(database, targetTable, crp)
while (extentsProcessed < totalAmount) {
var error: Object = null
var res: Option[KustoResultSetTable] = None
var failed = false
// Execute move batch and keep any transient error for handling
try {
val isDestinationTableMaterializedViewSourceResult = engineClient.execute(database, generateIsTableMaterializedViewSourceCommand(targetTable), crp).getPrimaryResults
isDestinationTableMaterializedViewSourceResult.next()
val isDestinationTableMaterializedViewSource: Boolean = isDestinationTableMaterializedViewSourceResult.getLong(0) > 0
val operation = engineClient.execute(database, generateTableMoveExtentsAsyncCommand(tmpTableName,
targetTable, curBatchSize, isDestinationTableMaterializedViewSource), crp).getPrimaryResults
targetTable, curBatchSize, useMaterializedViewFlag), crp).getPrimaryResults
val operationResult = KDSU.verifyAsyncCommandCompletion(engineClient, database, operation, samplePeriod =
KustoConstants
.DefaultPeriodicSamplePeriod, writeOptions.timeout, s"move extents to destination table '$targetTable' ")
@ -208,7 +219,7 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
} else {
throw ex
}
case ex:KustoDataException =>
case ex:KustoDataExceptionBase =>
if (ex.getCause.isInstanceOf[SocketTimeoutException] || !ex.isPermanent) {
error = ExceptionUtils.getStackTrace(ex)
failed = true
@ -256,7 +267,7 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
def moveExtents(database: String, tmpTableName: String, targetTable: String, crp: ClientRequestProperties,
writeOptions: WriteOptions): Unit = {
val extentsCountQuery = engineClient.execute(database, generateExtentsCountCommand(tmpTableName), crp).getPrimaryResults
val extentsCountQuery = engineClient.execute(database, generateExtentsCountCommand(tmpTableName), crp).getPrimaryResults
extentsCountQuery.next()
val extentsCount = extentsCountQuery.getInt(0)
if (extentsCount > writeOptions.minimalExtentsCountForSplitMerge) {
@ -354,7 +365,11 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
KDSU.reportExceptionAndThrow(
myName,
ex,
"Trying to poll on pending ingestions", coordinates.clusterUrl, coordinates.database, coordinates.table.getOrElse("Unspecified table name")
"Trying to poll on pending ingestions",
coordinates.clusterUrl,
coordinates.database,
coordinates.table.getOrElse("Unspecified table name"),
writeOptions.requestId
)
} finally {
cleanupIngestionByProducts(coordinates.database, kustoAdminClient, tmpTableName, crp)

Просмотреть файл

@ -142,23 +142,7 @@ object KustoDataSourceUtils {
KustoResponseDeserializer(client.execute(database, query, clientRequestProperties.orNull).getPrimaryResults).getSchema
}
def parseSourceParameters(parameters: Map[String, String]): SourceParameters = {
// Parse KustoTableCoordinates - these are mandatory options
val database = parameters.get(KustoSourceOptions.KUSTO_DATABASE)
val cluster = parameters.get(KustoSourceOptions.KUSTO_CLUSTER)
if (database.isEmpty) {
throw new InvalidParameterException("KUSTO_DATABASE parameter is missing. Must provide a destination database name")
}
if (cluster.isEmpty) {
throw new InvalidParameterException("KUSTO_CLUSTER parameter is missing. Must provide a destination cluster name")
}
val alias = getClusterNameFromUrlIfNeeded(cluster.get.toLowerCase())
val clusterUrl = getEngineUrlFromAliasIfNeeded(cluster.get.toLowerCase())
val table = parameters.get(KustoSinkOptions.KUSTO_TABLE)
val requestId: String = parameters.getOrElse(KustoSinkOptions.KUSTO_REQUEST_ID, UUID.randomUUID().toString)
val clientRequestProperties = getClientRequestProperties(parameters, requestId)
private def parseAuthentication(parameters: Map[String, String], clusterUrl:String) = {
// Parse KustoAuthentication
val applicationId = parameters.getOrElse(KustoSourceOptions.KUSTO_AAD_APP_ID, "")
val applicationKey = parameters.getOrElse(KustoSourceOptions.KUSTO_AAD_APP_SECRET, "")
@ -231,8 +215,40 @@ object KustoDataSourceUtils {
authentication = KustoAccessTokenAuthentication(accessToken)
}
}
(authentication, keyVaultAuthentication)
}
SourceParameters(authentication, KustoCoordinates(clusterUrl, alias, database.get, table),
/**
 * Parses the connector options into a [[SourceParameters]] instance.
 *
 * The database and cluster options are mandatory. The cluster value is lower-cased and
 * resolved into an alias and an engine URL; the table and request id options are optional
 * (a random UUID is used when no request id is supplied).
 *
 * @param parameters raw option map
 * @param allowProxy when `true`, an exception raised while resolving the cluster alias or
 *                   URL is ignored and the cluster value is used as given for whichever
 *                   of the two was not resolved; when `false` the exception propagates
 * @return the parsed authentication, coordinates, key vault authentication, request id and
 *         client request properties
 * @throws InvalidParameterException when the database or the cluster option is missing
 */
def parseSourceParameters(parameters: Map[String, String], allowProxy: Boolean): SourceParameters = {
  // Mandatory options: the database is validated before the cluster.
  val databaseName = parameters.getOrElse(KustoSourceOptions.KUSTO_DATABASE,
    throw new InvalidParameterException("KUSTO_DATABASE parameter is missing. Must provide a destination database name"))
  val rawCluster = parameters.getOrElse(KustoSourceOptions.KUSTO_CLUSTER,
    throw new InvalidParameterException("KUSTO_CLUSTER parameter is missing. Must provide a destination cluster name"))

  // Start from the value as given; each one is replaced only if its resolution succeeds.
  var clusterAlias = rawCluster
  var engineUrl = rawCluster
  try {
    clusterAlias = getClusterNameFromUrlIfNeeded(rawCluster.toLowerCase())
    engineUrl = getEngineUrlFromAliasIfNeeded(rawCluster.toLowerCase())
  } catch {
    // Only swallowed when the caller opted in; otherwise the exception propagates untouched.
    case _: Exception if allowProxy => ()
  }

  val table = parameters.get(KustoSinkOptions.KUSTO_TABLE)
  val requestId: String = parameters.getOrElse(KustoSinkOptions.KUSTO_REQUEST_ID, UUID.randomUUID().toString)
  val clientRequestProperties = getClientRequestProperties(parameters, requestId)
  val (authentication, keyVaultAuthentication) = parseAuthentication(parameters, engineUrl)

  val coordinates = KustoCoordinates(engineUrl, clusterAlias, databaseName, table)
  SourceParameters(authentication, coordinates, keyVaultAuthentication, requestId, clientRequestProperties)
}
@ -281,7 +297,7 @@ object KustoDataSourceUtils {
val ingestionPropertiesAsJson = parameters.get(KustoSinkOptions.KUSTO_SPARK_INGESTION_PROPERTIES_JSON)
val sourceParameters = parseSourceParameters(parameters)
val sourceParameters = parseSourceParameters(parameters, allowProxy = false)
val writeOptions = WriteOptions(
tableCreation,
@ -293,8 +309,8 @@ object KustoDataSourceUtils {
batchLimit,
sourceParameters.requestId,
autoCleanupTime,
minimalExtentsCountForSplitMergePerNode,
maxRetriesOnMoveExtents,
minimalExtentsCountForSplitMergePerNode,
adjustSchema
)
@ -330,18 +346,20 @@ object KustoDataSourceUtils {
cluster: String = "",
database: String = "",
table: String = "",
requestId: String = "",
shouldNotThrow: Boolean = false): Unit = {
val whatFailed = if (doingWhat.isEmpty) "" else s"when $doingWhat"
val clusterDesc = if (cluster.isEmpty) "" else s", cluster: '$cluster' "
val databaseDesc = if (database.isEmpty) "" else s", database: '$database'"
val tableDesc = if (table.isEmpty) "" else s", table: '$table'"
val requestIdDesc = if (requestId.isEmpty) "" else s", requestId: '$requestId'"
if (!shouldNotThrow) {
logError(reporter, s"caught exception $whatFailed$clusterDesc$databaseDesc$tableDesc.${NewLine}EXCEPTION: ${ExceptionUtils.getStackTrace(exception)}")
logError(reporter, s"caught exception $whatFailed$clusterDesc$databaseDesc$tableDesc$requestIdDesc.${NewLine}EXCEPTION: ${ExceptionUtils.getStackTrace(exception)}")
throw exception
}
logWarn(reporter, s"caught exception $whatFailed$clusterDesc$databaseDesc$tableDesc, exception ignored.${NewLine}EXCEPTION: ${ExceptionUtils.getStackTrace(exception)}")
logWarn(reporter, s"caught exception $whatFailed$clusterDesc$databaseDesc$tableDesc$requestIdDesc, exception ignored.${NewLine}EXCEPTION: ${ExceptionUtils.getStackTrace(exception)}")
}
private[kusto] def getClusterNameFromUrlIfNeeded(cluster: String): String = {
@ -419,6 +437,7 @@ object KustoDataSourceUtils {
if (!stopCondition.apply(res)) {
finalWork.apply(res)
while (latch.getCount > 0) latch.countDown()
t.cancel()
} else {
currentWaitTime = if (currentWaitTime + currentWaitTime > maxWaitTimeBetweenCalls) maxWaitTimeBetweenCalls else currentWaitTime + currentWaitTime
t.schedule(new ExponentialBackoffTask(), currentWaitTime)
@ -426,6 +445,7 @@ object KustoDataSourceUtils {
} catch {
case exception: Exception =>
while (latch.getCount > 0) latch.countDown()
t.cancel()
throw exception
}
}
@ -433,6 +453,7 @@ object KustoDataSourceUtils {
val task: TimerTask = new ExponentialBackoffTask()
t.schedule(task, delayBeforeStart)
latch
}

Просмотреть файл

@ -68,7 +68,14 @@ class KustoAuthenticationTestE2E extends FlatSpec {
val rows: immutable.IndexedSeq[(String, Int)] = (1 to expectedNumberOfRows).map(v => (s"row-$v", v))
val prefix = "deviceAuthentication"
val table = KustoQueryUtils.simplifyName(s"${prefix}_${UUID.randomUUID()}")
val engineKcsb = ConnectionStringBuilder.createWithAadApplicationCredentials(s"https://$cluster.kusto.windows.net", appId, appKey, authority)
val deviceAuth = new com.microsoft.kusto.spark.authentication.DeviceAuthentication(
s"https://${cluster}.kusto.windows.net",
authority)
val token = deviceAuth.acquireToken()
val engineKcsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(s"https://$cluster.kusto.windows.net", token)
val kustoAdminClient = ClientFactory.createClient(engineKcsb)
val df = rows.toDF("name", "value")

Просмотреть файл

@ -17,7 +17,7 @@
<spark.version.minor>1</spark.version.minor>
<!-- other dependencies -->
<kusto-sdk.version>2.8.2</kusto-sdk.version>
<kusto-sdk.version>3.1.1</kusto-sdk.version>
<specs2.version>3.6.5</specs2.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven-source-plugin.version>3.0.1</maven-source-plugin.version>

Просмотреть файл

@ -155,7 +155,7 @@ kustoQ.start().awaitTermination(60*8)
deviceAuth = sc._jvm.com.microsoft.kusto.spark.authentication.DeviceAuthentication(
"https://{clusterAlias}.kusto.windows.net".format(clusterAlias=kustoOptions["kustoCluster"]),
"common")
kustoOptions["kustoAadAuthorityID"])
deviceCodeMessage = deviceAuth.getDeviceCodeMessage()
print(deviceCodeMessage)
token = deviceAuth.acquireToken()