From 5670ff364520d31328414ccaa81aa78981f2e74c Mon Sep 17 00:00:00 2001
From: ohad bitton <32278684+ohadbitt@users.noreply.github.com>
Date: Tue, 21 Jun 2022 07:04:27 +0300
Subject: [PATCH] Fix bugs and upgrade to sdk v3.1.1 (#242)

* Fix bugs and upgrade to sdk v3.1.1

* revert

* revert test

* fix build

Co-authored-by: Ohad Bitton
---
 .../authentication/DeviceAuthentication.scala | 34 ++++------
 .../kusto/spark/common/KustoOptions.scala     |  5 +-
 .../kusto/spark/datasink/KustoWriter.scala    |  8 +--
 .../datasink/SparkIngestionProperties.scala   |  7 +-
 .../spark/datasource/DefaultSource.scala      |  2 +-
 .../spark/utils/CslCommandsGenerator.scala    |  4 ++
 .../kusto/spark/utils/KustoClient.scala       | 33 +++++++---
 .../spark/utils/KustoDataSourceUtils.scala    | 65 ++++++++++++-------
 .../spark/KustoAuthenticationTestE2E.scala    |  9 ++-
 pom.xml                                       |  2 +-
 samples/src/main/python/pyKusto.py            |  2 +-
 11 files changed, 106 insertions(+), 65 deletions(-)

diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/authentication/DeviceAuthentication.scala b/connector/src/main/scala/com/microsoft/kusto/spark/authentication/DeviceAuthentication.scala
index 8ff742c..0734b0b 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/authentication/DeviceAuthentication.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/authentication/DeviceAuthentication.scala
@@ -1,28 +1,25 @@
 package com.microsoft.kusto.spark.authentication
 
-import java.util.concurrent.CompletableFuture
+import java.util.concurrent.{CompletableFuture, TimeUnit}
 import java.util.function.Consumer
-
 import com.microsoft.aad.msal4j.{DeviceCode, DeviceCodeFlowParameters, IAuthenticationResult}
 import com.microsoft.azure.kusto.data.auth
 
 import scala.concurrent.TimeoutException
 
 class DeviceAuthentication (val cluster: String, val authority:String) extends auth.DeviceAuthTokenProvider(cluster, authority) {
-  var deviceCode: Option[DeviceCode] = None
+  var currentDeviceCode: Option[DeviceCode] = None
   var expiresAt: Option[Long] = None
-  var awaitAuthentication: Option[CompletableFuture[IAuthenticationResult]] = None
-  val NewDeviceCodeFetchTimeout = 5000
-  val Interval = 500
+  val NewDeviceCodeFetchTimeout = 60L * 1000L
+  var currentToken: Option[String] = None
 
   override def acquireNewAccessToken(): IAuthenticationResult = {
-    awaitAuthentication = Some(acquireNewAccessTokenAsync())
-    awaitAuthentication.get.join()
+    acquireNewAccessTokenAsync().get(NewDeviceCodeFetchTimeout, TimeUnit.MILLISECONDS)
   }
 
   def acquireNewAccessTokenAsync(): CompletableFuture[IAuthenticationResult] = {
     val deviceCodeConsumer: Consumer[DeviceCode] = (deviceCode: DeviceCode) => {
-      this.deviceCode = Some(deviceCode)
+      this.currentDeviceCode = Some(deviceCode)
       expiresAt = Some(System.currentTimeMillis + (deviceCode.expiresIn() * 1000))
       println(deviceCode.message())
     }
@@ -32,33 +29,24 @@ class DeviceAuthentication (val cluster: String, val authority:String) extends a
   }
 
   def refreshIfNeeded(): Unit = {
-    if (deviceCode.isEmpty || expiresAt.get <= System.currentTimeMillis) {
-      val oldDeviceCode = this.deviceCode
-      awaitAuthentication = Some(acquireNewAccessTokenAsync())
-      var awaitTime = NewDeviceCodeFetchTimeout
-      while (this.deviceCode == oldDeviceCode){
-        if (awaitTime <= 0) {
-          throw new TimeoutException("Timed out waiting for a new device code")
-        }
-        Thread.sleep(Interval)
-        awaitTime = awaitTime - Interval
-      }
+    if (currentDeviceCode.isEmpty || expiresAt.get <= System.currentTimeMillis) {
+      currentToken = Some(acquireAccessToken())
    }
   }
 
   def getDeviceCodeMessage: String = {
     refreshIfNeeded()
-    deviceCode.get.message()
+    currentDeviceCode.get.message()
   }
 
   def getDeviceCode: DeviceCode = {
     refreshIfNeeded()
-    deviceCode.get
+    currentDeviceCode.get
  }
 
   def acquireToken(): String = {
     refreshIfNeeded()
-    awaitAuthentication.get.join().accessToken()
+    currentToken.get
   }
 }
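Note: after this change the device-code flow no longer re-joins a stored CompletableFuture; refreshIfNeeded() drives the blocking acquisition and caches the result in currentToken. A minimal usage sketch, mirroring the E2E test and the pyKusto sample further down in this patch (the cluster URL and authority value are placeholders):

    import com.microsoft.azure.kusto.data.{ClientFactory, ConnectionStringBuilder}
    import com.microsoft.kusto.spark.authentication.DeviceAuthentication

    val deviceAuth = new DeviceAuthentication("https://mycluster.kusto.windows.net", "common")
    // The device-code prompt is printed by the consumer in acquireNewAccessTokenAsync();
    // acquireToken() blocks until sign-in completes or NewDeviceCodeFetchTimeout (60s) elapses.
    val token = deviceAuth.acquireToken()
    val kcsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(
      "https://mycluster.kusto.windows.net", token)
    val client = ClientFactory.createClient(kcsb)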
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/common/KustoOptions.scala b/connector/src/main/scala/com/microsoft/kusto/spark/common/KustoOptions.scala
index 877bd6c..e05de55 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/common/KustoOptions.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/common/KustoOptions.scala
@@ -58,7 +58,10 @@ trait KustoOptions {
   val KUSTO_REQUEST_ID: String = newOption("requestId")
 }
 
-case class KustoCoordinates(clusterUrl: String, clusterAlias: String, database: String, table: Option[String] = None)
+case class KustoCoordinates(clusterUrl: String,
+                            clusterAlias: String,
+                            database: String,
+                            table: Option[String] = None)
 
 /** ******************************************************************************* */
 /*                                     NOTE!!!                                      */
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala
index b7a07a0..70e2da0 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala
@@ -8,7 +8,7 @@ import java.util
 import java.util.zip.GZIPOutputStream
 import java.util.{TimeZone, UUID}
 import com.microsoft.azure.kusto.data.ClientRequestProperties
-import com.microsoft.azure.kusto.ingest.IngestionProperties.DATA_FORMAT
+import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat
 import com.microsoft.azure.kusto.ingest.result.IngestionResult
 import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo
 import com.microsoft.azure.kusto.ingest.{IngestClient, IngestionProperties}
@@ -146,9 +146,9 @@ object KustoWriter {
     import parameters._
 
     val ingestionProperties = getIngestionProperties(writeOptions, parameters.coordinates.database, parameters.tmpTableName)
-    ingestionProperties.setDataFormat(DATA_FORMAT.csv.name)
-    ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table)
-    ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FailuresAndSuccesses)
+    ingestionProperties.setDataFormat(DataFormat.CSV.name)
+    ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE)
+    ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES)
     val tasks = ingestRows(rows, parameters, ingestClient, ingestionProperties, partitionsResults)
 
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/SparkIngestionProperties.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/SparkIngestionProperties.scala
index 3fc487f..a5fcdd6 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/SparkIngestionProperties.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/SparkIngestionProperties.scala
@@ -1,5 +1,7 @@
 package com.microsoft.kusto.spark.datasink
 
+import com.microsoft.azure.kusto.ingest.IngestionMapping.IngestionMappingKind
+
 import java.util
 import com.microsoft.azure.kusto.ingest.{IngestionMapping, IngestionProperties}
 import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility
@@ -55,11 +57,12 @@ class SparkIngestionProperties(var flushImmediately: Boolean = false,
     }
 
     if (this.csvMapping != null) {
-      additionalProperties.put("csvMapping", this.csvMapping)
+      additionalProperties.put("ingestionMapping", this.csvMapping)
+      additionalProperties.put("ingestionMappingType", IngestionMappingKind.CSV.getKustoValue)
     }
 
     if (this.csvMappingNameReference != null) {
-      ingestionProperties.setIngestionMapping(new IngestionMapping(this.csvMappingNameReference, IngestionMapping.IngestionMappingKind.Csv))
+      ingestionProperties.setIngestionMapping(new IngestionMapping(this.csvMappingNameReference, IngestionMapping.IngestionMappingKind.CSV))
     }
 
     ingestionProperties.setAdditionalProperties(additionalProperties)
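Note: with SDK v3 an inline CSV mapping travels through the generic "ingestionMapping"/"ingestionMappingType" additional properties rather than the removed "csvMapping" key; callers keep populating csvMapping as before. A sketch, assuming this class's toIngestionProperties(database, table) helper (the method the hunk above edits) and an illustrative mapping JSON:

    val sp = new SparkIngestionProperties()
    sp.csvMapping = """[{"Name":"name","DataType":"string","Ordinal":"0"},{"Name":"value","DataType":"int","Ordinal":"1"}]"""
    // toIngestionProperties now emits additionalProperties:
    //   ingestionMapping     -> the JSON above
    //   ingestionMappingType -> IngestionMappingKind.CSV.getKustoValue ("Csv")
    val ingestionProperties = sp.toIngestionProperties("MyDatabase", "MyTable")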
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasource/DefaultSource.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasource/DefaultSource.scala
index 9ee392f..444b7b5 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/datasource/DefaultSource.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasource/DefaultSource.scala
@@ -72,7 +72,7 @@ class DefaultSource extends CreatableRelationProvider
     val readOptions = KDSU.getReadParameters(parameters, sqlContext)
 
     if (authenticationParameters.isEmpty) {
       // Parse parameters if haven't got parsed before
-      val sourceParameters = KDSU.parseSourceParameters(parameters)
+      val sourceParameters = KDSU.parseSourceParameters(parameters, true)
       initCommonParams(sourceParameters)
     }
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/utils/CslCommandsGenerator.scala b/connector/src/main/scala/com/microsoft/kusto/spark/utils/CslCommandsGenerator.scala
index 2b30bd7..f53ad67 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/utils/CslCommandsGenerator.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/utils/CslCommandsGenerator.scala
@@ -60,6 +60,10 @@ private[kusto] object CslCommandsGenerator {
     s""".show materialized-views | where SourceTable == '$destinationTableName' | count"""
   }
 
+  def generateIsTableEngineV3(tableName: String): String = {
+    s""".show table ${tableName} details | project todynamic(ShardingPolicy).UseShardEngine"""
+  }
+
   def generateTableMoveExtentsCommand(sourceTableName: String, destinationTableName: String, batchSize: Int, isDestinationTableMaterializedViewSource: Boolean = false): String = {
     val setNewIngestionTime: String = if (isDestinationTableMaterializedViewSource) "with(SetNewIngestionTime=true)" else ""
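Note: for reference, the new helper just interpolates the table name into a control command (illustrative call; the object is private[kusto], so this is only reachable from connector code):

    CslCommandsGenerator.generateIsTableEngineV3("MyTable")
    // => .show table MyTable details | project todynamic(ShardingPolicy).UseShardEngine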
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoClient.scala b/connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoClient.scala
index 9ab9d60..ce0fade 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoClient.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoClient.scala
@@ -5,7 +5,7 @@ import java.time.Instant
 import java.util.StringJoiner
 import java.util.concurrent.TimeUnit
 import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder
-import com.microsoft.azure.kusto.data.exceptions.KustoDataException
+import com.microsoft.azure.kusto.data.exceptions.KustoDataExceptionBase
 import com.microsoft.azure.kusto.data.{Client, ClientFactory, ClientRequestProperties, KustoResultSetTable}
 import com.microsoft.azure.kusto.ingest.result.{IngestionStatus, OperationStatus}
 import com.microsoft.azure.kusto.ingest.{IngestClient, IngestClientFactory}
@@ -164,6 +164,20 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
     (failed, error)
   }
 
+  def shouldUseMaterializedViewFlag(database: String, targetTable: String, crp: ClientRequestProperties): Boolean ={
+    val isDestinationTableMaterializedViewSourceResult =
+      engineClient.execute(database, generateIsTableMaterializedViewSourceCommand(targetTable), crp).getPrimaryResults
+    isDestinationTableMaterializedViewSourceResult.next()
+    val isDestinationTableMaterializedViewSource: Boolean = isDestinationTableMaterializedViewSourceResult.getLong(0) > 0
+    if (isDestinationTableMaterializedViewSource){
+      val res = engineClient.execute(database, generateIsTableEngineV3(targetTable), crp)
+      res.next()
+      res.getPrimaryResults.getBoolean(0)
+    } else {
+      false
+    }
+  }
+
   def moveExtentsWithRetries(batchSize: Int, totalAmount: Int, database: String, tmpTableName: String,
                              targetTable: String, crp: ClientRequestProperties, writeOptions: WriteOptions): Unit = {
     var extentsProcessed = 0
@@ -171,18 +185,15 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
     var curBatchSize = batchSize
     var delayPeriodBetweenCalls = DelayPeriodBetweenCalls
     var consecutiveSuccesses = 0
+    val useMaterializedViewFlag = shouldUseMaterializedViewFlag(database, targetTable, crp)
     while (extentsProcessed < totalAmount) {
       var error: Object = null
       var res: Option[KustoResultSetTable] = None
       var failed = false
       // Execute move batch and keep any transient error for handling
       try {
-        val isDestinationTableMaterializedViewSourceResult = engineClient.execute(database, generateIsTableMaterializedViewSourceCommand(targetTable), crp).getPrimaryResults
-        isDestinationTableMaterializedViewSourceResult.next()
-        val isDestinationTableMaterializedViewSource: Boolean = isDestinationTableMaterializedViewSourceResult.getLong(0) > 0
-
         val operation = engineClient.execute(database, generateTableMoveExtentsAsyncCommand(tmpTableName,
-          targetTable, curBatchSize, isDestinationTableMaterializedViewSource), crp).getPrimaryResults
+          targetTable, curBatchSize, useMaterializedViewFlag), crp).getPrimaryResults
         val operationResult = KDSU.verifyAsyncCommandCompletion(engineClient, database, operation, samplePeriod = KustoConstants
           .DefaultPeriodicSamplePeriod, writeOptions.timeout, s"move extents to destination table '$targetTable' ")
@@ -208,7 +219,7 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
           } else {
             throw ex
           }
-        case ex:KustoDataException =>
+        case ex:KustoDataExceptionBase =>
           if (ex.getCause.isInstanceOf[SocketTimeoutException] || !ex.isPermanent) {
             error = ExceptionUtils.getStackTrace(ex)
             failed = true
@@ -256,7 +267,7 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
 
   def moveExtents(database: String, tmpTableName: String, targetTable: String, crp: ClientRequestProperties,
                   writeOptions: WriteOptions): Unit = {
-    val extentsCountQuery = engineClient.execute(database, generateExtentsCountCommand(tmpTableName), crp).getPrimaryResults
+    val extentsCountQuery = engineClient.execute(database, generateExtentsCountCommand(tmpTableName), crp).getPrimaryResults
     extentsCountQuery.next()
     val extentsCount = extentsCountQuery.getInt(0)
     if (extentsCount > writeOptions.minimalExtentsCountForSplitMerge) {
@@ -354,7 +365,11 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
       KDSU.reportExceptionAndThrow(
         myName,
         ex,
-        "Trying to poll on pending ingestions", coordinates.clusterUrl, coordinates.database, coordinates.table.getOrElse("Unspecified table name")
+        "Trying to poll on pending ingestions",
+        coordinates.clusterUrl,
+        coordinates.database,
+        coordinates.table.getOrElse("Unspecified table name"),
+        writeOptions.requestId
       )
     } finally {
       cleanupIngestionByProducts(coordinates.database, kustoAdminClient, tmpTableName, crp)
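Note: the materialized-view probe previously ran on every batch iteration inside the retry loop; it is now computed once per move operation and additionally gated on the destination running the V3 (shard) engine. Roughly, per moveExtentsWithRetries call (MyTable as a placeholder):

    // .show materialized-views | where SourceTable == 'MyTable' | count            -> must be > 0
    // .show table MyTable details | project todynamic(ShardingPolicy).UseShardEngine -> must be true
    val useMaterializedViewFlag = shouldUseMaterializedViewFlag(database, targetTable, crp)
    // When true, the generated move command carries with(SetNewIngestionTime=true).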
diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala b/connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala
index cbf4f5e..6af431b 100644
--- a/connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala
+++ b/connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala
@@ -142,23 +142,7 @@ object KustoDataSourceUtils {
     KustoResponseDeserializer(client.execute(database, query, clientRequestProperties.orNull).getPrimaryResults).getSchema
   }
 
-  def parseSourceParameters(parameters: Map[String, String]): SourceParameters = {
-    // Parse KustoTableCoordinates - these are mandatory options
-    val database = parameters.get(KustoSourceOptions.KUSTO_DATABASE)
-    val cluster = parameters.get(KustoSourceOptions.KUSTO_CLUSTER)
-
-    if (database.isEmpty) {
-      throw new InvalidParameterException("KUSTO_DATABASE parameter is missing. Must provide a destination database name")
-    }
-
-    if (cluster.isEmpty) {
-      throw new InvalidParameterException("KUSTO_CLUSTER parameter is missing. Must provide a destination cluster name")
-    }
-    val alias = getClusterNameFromUrlIfNeeded(cluster.get.toLowerCase())
-    val clusterUrl = getEngineUrlFromAliasIfNeeded(cluster.get.toLowerCase())
-    val table = parameters.get(KustoSinkOptions.KUSTO_TABLE)
-    val requestId: String = parameters.getOrElse(KustoSinkOptions.KUSTO_REQUEST_ID, UUID.randomUUID().toString)
-    val clientRequestProperties = getClientRequestProperties(parameters, requestId)
+  private def parseAuthentication(parameters: Map[String, String], clusterUrl:String) = {
     // Parse KustoAuthentication
     val applicationId = parameters.getOrElse(KustoSourceOptions.KUSTO_AAD_APP_ID, "")
     val applicationKey = parameters.getOrElse(KustoSourceOptions.KUSTO_AAD_APP_SECRET, "")
@@ -231,8 +215,40 @@ object KustoDataSourceUtils {
         authentication = KustoAccessTokenAuthentication(accessToken)
       }
     }
+    (authentication, keyVaultAuthentication)
+  }
 
-    SourceParameters(authentication, KustoCoordinates(clusterUrl, alias, database.get, table),
+  def parseSourceParameters(parameters: Map[String, String], allowProxy: Boolean): SourceParameters = {
+    // Parse KustoTableCoordinates - these are mandatory options
+    val database = parameters.get(KustoSourceOptions.KUSTO_DATABASE)
+    val cluster = parameters.get(KustoSourceOptions.KUSTO_CLUSTER)
+
+    if (database.isEmpty) {
+      throw new InvalidParameterException("KUSTO_DATABASE parameter is missing. Must provide a destination database name")
+    }
+
+    if (cluster.isEmpty) {
+      throw new InvalidParameterException("KUSTO_CLUSTER parameter is missing. Must provide a destination cluster name")
+    }
+
+    var alias = cluster
+    var clusterUrl = cluster
+    try {
+      alias = Some(getClusterNameFromUrlIfNeeded(cluster.get.toLowerCase()))
+      clusterUrl = Some(getEngineUrlFromAliasIfNeeded(cluster.get.toLowerCase()))
+    } catch {
+      case e: Exception =>
+        if (!allowProxy) {
+          throw e
+        }
+    }
+    val table = parameters.get(KustoSinkOptions.KUSTO_TABLE)
+    val requestId: String = parameters.getOrElse(KustoSinkOptions.KUSTO_REQUEST_ID, UUID.randomUUID().toString)
+    val clientRequestProperties = getClientRequestProperties(parameters, requestId)
+
+    val (authentication,keyVaultAuthentication) = parseAuthentication(parameters, clusterUrl.get)
+
+    SourceParameters(authentication, KustoCoordinates(clusterUrl.get, alias.get, database.get, table),
       keyVaultAuthentication, requestId, clientRequestProperties)
   }
 
@@ -281,7 +297,7 @@ object KustoDataSourceUtils {
 
     val ingestionPropertiesAsJson = parameters.get(KustoSinkOptions.KUSTO_SPARK_INGESTION_PROPERTIES_JSON)
 
-    val sourceParameters = parseSourceParameters(parameters)
+    val sourceParameters = parseSourceParameters(parameters, allowProxy = false)
 
     val writeOptions = WriteOptions(
       tableCreation,
@@ -293,8 +309,8 @@
       batchLimit,
       sourceParameters.requestId,
       autoCleanupTime,
-      minimalExtentsCountForSplitMergePerNode,
       maxRetriesOnMoveExtents,
+      minimalExtentsCountForSplitMergePerNode,
       adjustSchema
     )
 
@@ -330,18 +346,20 @@ object KustoDataSourceUtils {
                                         cluster: String = "",
                                         database: String = "",
                                         table: String = "",
+                                        requestId: String = "",
                                         shouldNotThrow: Boolean = false): Unit = {
     val whatFailed = if (doingWhat.isEmpty) "" else s"when $doingWhat"
     val clusterDesc = if (cluster.isEmpty) "" else s", cluster: '$cluster' "
     val databaseDesc = if (database.isEmpty) "" else s", database: '$database'"
     val tableDesc = if (table.isEmpty) "" else s", table: '$table'"
+    val requestIdDesc = if (requestId.isEmpty) "" else s", requestId: '$requestId'"
 
     if (!shouldNotThrow) {
-      logError(reporter, s"caught exception $whatFailed$clusterDesc$databaseDesc$tableDesc.${NewLine}EXCEPTION: ${ExceptionUtils.getStackTrace(exception)}")
+      logError(reporter, s"caught exception $whatFailed$clusterDesc$databaseDesc$tableDesc$requestIdDesc.${NewLine}EXCEPTION: ${ExceptionUtils.getStackTrace(exception)}")
       throw exception
     }
 
-    logWarn(reporter, s"caught exception $whatFailed$clusterDesc$databaseDesc$tableDesc, exception ignored.${NewLine}EXCEPTION: ${ExceptionUtils.getStackTrace(exception)}")
+    logWarn(reporter, s"caught exception $whatFailed$clusterDesc$databaseDesc$tableDesc$requestIdDesc, exception ignored.${NewLine}EXCEPTION: ${ExceptionUtils.getStackTrace(exception)}")
   }
 
   private[kusto] def getClusterNameFromUrlIfNeeded(cluster: String): String = {
@@ -419,6 +437,7 @@
           if (!stopCondition.apply(res)) {
             finalWork.apply(res)
             while (latch.getCount > 0) latch.countDown()
+            t.cancel()
           } else {
             currentWaitTime = if (currentWaitTime + currentWaitTime > maxWaitTimeBetweenCalls) maxWaitTimeBetweenCalls else currentWaitTime + currentWaitTime
             t.schedule(new ExponentialBackoffTask(), currentWaitTime)
@@ -426,6 +445,7 @@
       } catch {
         case exception: Exception =>
           while (latch.getCount > 0) latch.countDown()
+          t.cancel()
           throw exception
       }
     }
@@ -433,6 +453,7 @@
     val task: TimerTask = new ExponentialBackoffTask()
 
     t.schedule(task, delayBeforeStart)
+    latch
   }
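Note: parseSourceParameters now takes an allowProxy flag. When true (the read path in DefaultSource above), a cluster value that the alias/engine-URL helpers cannot normalize, such as a proxy endpoint, no longer fails fast; the raw value is kept as both alias and clusterUrl. The sink path still parses with allowProxy = false. A sketch with a hypothetical proxy endpoint:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import com.microsoft.kusto.spark.utils.{KustoDataSourceUtils => KDSU}

    val params = Map(
      KustoSourceOptions.KUSTO_CLUSTER -> "https://my-kusto-proxy.example.com", // hypothetical endpoint
      KustoSourceOptions.KUSTO_DATABASE -> "MyDatabase")
    // With allowProxy = false, any normalization failure would be rethrown here.
    val sourceParams = KDSU.parseSourceParameters(params, allowProxy = true)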
diff --git a/connector/src/test/scala/com/microsoft/kusto/spark/KustoAuthenticationTestE2E.scala b/connector/src/test/scala/com/microsoft/kusto/spark/KustoAuthenticationTestE2E.scala
index e51818d..54d76cb 100644
--- a/connector/src/test/scala/com/microsoft/kusto/spark/KustoAuthenticationTestE2E.scala
+++ b/connector/src/test/scala/com/microsoft/kusto/spark/KustoAuthenticationTestE2E.scala
@@ -68,7 +68,14 @@ class KustoAuthenticationTestE2E extends FlatSpec {
     val rows: immutable.IndexedSeq[(String, Int)] = (1 to expectedNumberOfRows).map(v => (s"row-$v", v))
     val prefix = "deviceAuthentication"
     val table = KustoQueryUtils.simplifyName(s"${prefix}_${UUID.randomUUID()}")
-    val engineKcsb = ConnectionStringBuilder.createWithAadApplicationCredentials(s"https://$cluster.kusto.windows.net", appId, appKey, authority)
+
+    val deviceAuth = new com.microsoft.kusto.spark.authentication.DeviceAuthentication(
+      s"https://${cluster}.kusto.windows.net",
+      authority)
+
+    val token = deviceAuth.acquireToken()
+
+    val engineKcsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(s"https://$cluster.kusto.windows.net", token)
     val kustoAdminClient = ClientFactory.createClient(engineKcsb)
 
     val df = rows.toDF("name", "value")
diff --git a/pom.xml b/pom.xml
index 947ea41..bc80b7f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,7 +17,7 @@
     1
-    2.8.2
+    3.1.1
     3.6.5
     UTF-8
     3.0.1
diff --git a/samples/src/main/python/pyKusto.py b/samples/src/main/python/pyKusto.py
index b78d098..9a459e1 100644
--- a/samples/src/main/python/pyKusto.py
+++ b/samples/src/main/python/pyKusto.py
@@ -155,7 +155,7 @@ kustoQ.start().awaitTermination(60*8)
 
 deviceAuth = sc._jvm.com.microsoft.kusto.spark.authentication.DeviceAuthentication(
     "https://{clusterAlias}.kusto.windows.net".format(clusterAlias=kustoOptions["kustoCluster"]),
-    "common")
+    kustoOptions["kustoAadAuthorityID"])
 deviceCodeMessage = deviceAuth.getDeviceCodeMessage()
 print(deviceCodeMessage)
 token = deviceAuth.acquireToken()