ClientRequestPropertiesAndIngestionProperties (#68)

* ClientRequestPropertiesAndIngestionProperties

* ClientRequestProperties And IngestionProperties

* split KustoOptions into KustoSourceOptions and KustoSinkOptions

* fix Maven compilation failure

* move KustoOptions to the common package
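As a sketch of the API after this split (cluster, database, and table names below are hypothetical, and authentication options are omitted): sink-only options now come from KustoSinkOptions and source-only options from KustoSourceOptions, while shared options such as KUSTO_CLUSTER and KUSTO_DATABASE live on the common KustoOptions trait that both objects extend.

import com.microsoft.kusto.spark.datasink.KustoSinkOptions
import com.microsoft.kusto.spark.datasource.KustoSourceOptions

// Write path: table and other sink-specific options are taken from KustoSinkOptions
df.write
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoSinkOptions.KUSTO_CLUSTER, "MyCluster")
  .option(KustoSinkOptions.KUSTO_DATABASE, "MyDatabase")
  .option(KustoSinkOptions.KUSTO_TABLE, "MyTable")
  .save()

// Read path: query and transient-storage options are taken from KustoSourceOptions
val readDf = spark.read
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoSourceOptions.KUSTO_CLUSTER, "MyCluster")
  .option(KustoSourceOptions.KUSTO_DATABASE, "MyDatabase")
  .option(KustoSourceOptions.KUSTO_QUERY, "MyTable | take 10")
  .load()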
ohadbitt 2019-06-05 17:50:19 +03:00 committed by GitHub
Parent bd0b3ca39f
Commit 8f755a4291
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
31 changed files with 632 additions and 449 deletions

View file

@@ -95,7 +95,7 @@ These libraries include:
Spark Azure Data Explorer connector takes dependency on [Azure Data Explorer Data Client Library](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-data)
and [Azure Data Explorer Ingest Client Library](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-ingest),
available on the Maven repository.
When [Key Vault based authentication](Authentication.md) is used, there is an additional dependency
When [Key Vault based authentication](./docs/Authentication.md) is used, there is an additional dependency
on [Microsoft Azure SDK For Key Vault](https://mvnrepository.com/artifact/com.microsoft.azure/azure-keyvault).
> **Note:** When working with Databricks, the Azure Data Explorer connector requires the Azure Data Explorer Java client libraries (and the Azure Key Vault library, if used) to be installed.
@@ -118,6 +118,8 @@ Here is a list of currently available client libraries for Azure Data Explorer:
- [Python](https://github.com/azure/azure-kusto-python)
- [.NET](https://docs.microsoft.com/en-us/azure/kusto/api/netfx/about-the-sdk)
- [Java](https://github.com/azure/azure-kusto-java)
For the user's convenience, here is a [PySpark sample](./samples/src/main/python/pyKusto.py) for the connector.
# Need Support?

View file

@@ -102,7 +102,12 @@
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.1</version>
<version>2.10.2</version>
</dependency>
<dependency>
<groupId>org.joda</groupId>
<artifactId>joda-convert</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>com.univocity</groupId>

View file

@@ -1,12 +1,11 @@
package com.microsoft.kusto.spark.datasource
package com.microsoft.kusto.spark.common
import java.util.Locale
import scala.concurrent.duration.FiniteDuration
object KustoOptions {
trait KustoOptions {
private val kustoOptionNames = collection.mutable.Set[String]()
private def newOption(name: String): String = {
protected def newOption(name: String): String = {
kustoOptionNames += name.toLowerCase(Locale.ROOT)
name
}
@@ -14,8 +13,6 @@ object KustoOptions {
// KeyVault options. Relevant only if credentials need to be retrieved from Key Vault
val KEY_VAULT_URI = "keyVaultUri"
val KEY_VAULT_CREDENTIALS = "keyVaultCredentials"
val KEY_VAULT_PEM_FILE_PATH = "keyVaultPemFilePath"
val KEY_VAULT_CERTIFICATE_KEY = "keyVaultPemFileKey"
val KEY_VAULT_APP_ID = "keyVaultAppId"
val KEY_VAULT_APP_KEY = "keyVaultAppKey"
@@ -25,88 +22,40 @@ object KustoOptions {
val KUSTO_AAD_AUTHORITY_ID: String = newOption("kustoAADAuthorityID")
// AAD application key for the client
val KUSTO_AAD_CLIENT_PASSWORD: String = newOption("kustoClientAADClientPassword")
val KUSTO_ACCESS_TOKEN: String = newOption("accessToken")
// Target/source Kusto cluster for writing/reading the data.
val KUSTO_CLUSTER: String = newOption("kustoCluster")
val KUSTO_CONNECTION_TIMEOUT: String = newOption("kustoConnectionTimeout")
val KUSTO_CREATE_TABLE_COLUMN_TYPES: String = newOption("createTableColumnTypes")
val KUSTO_CUSTOM_DATAFRAME_COLUMN_TYPES: String = newOption("customSchema")
// Target/source Kusto database for writing/reading the data. See KustoSink.md/KustoSource.md for
// required permissions
val KUSTO_DATABASE: String = newOption("kustoDatabase")
val KUSTO_QUERY: String = newOption("kustoQuery")
val KUSTO_QUERY_RETRY_TIMES: String = newOption("kustoQueryRetryTimes")
// Target/source Kusto table for writing/reading the data. See KustoSink.md/KustoSource.md for
// required permissions
val KUSTO_TABLE: String = newOption("kustoTable")
// If set to 'FailIfNotExist', the operation will fail if the table is not found
// in the requested cluster and database.
// If set to 'CreateIfNotExist' and the table is not found in the requested cluster and database,
// it will be created, with a schema matching the DataFrame that is being written.
// Default: 'FailIfNotExist'
val KUSTO_TABLE_CREATE_OPTIONS: String = newOption("tableCreateOptions")
val KUSTO_ACCESS_TOKEN: String = newOption("accessToken")
// When writing to Kusto, allows the driver to complete operation asynchronously. See KustoSink.md for
// details and limitations. Default: 'false'
val KUSTO_WRITE_ENABLE_ASYNC: String = newOption("writeEnableAsync")
// When writing to Kusto, limits the number of rows read back as BaseRelation. Default: '1'.
// To read back all rows, set as 'none' (NONE_RESULT_LIMIT)
val KUSTO_WRITE_RESULT_LIMIT: String = newOption("writeResultLimit")
// An integer number corresponding to the period in seconds after which the operation will time out. Default: '5400' (90 minutes)
val KUSTO_TIMEOUT_LIMIT: String = newOption("timeoutLimit")
// Partitioning parameters, CURRENTLY NOT USED
// CURRENTLY NOT USED
val KUSTO_READ_PARTITION_MODE: String = newOption("partitionMode")
// Note: for 'read', this allows partitioning the export from Kusto to blob (default is '1')
// It does not affect the number of partitions created when reading from blob.
// Therefore it is not recommended to use this option when reading from Kusto
// CURRENTLY NOT USED, left here for experimentation
val KUSTO_NUM_PARTITIONS: String = newOption("numPartitions")
// CURRENTLY NOT USED, left here for experimentation
val KUSTO_PARTITION_COLUMN: String = newOption("partitionColumn")
object SinkTableCreationMode extends Enumeration {
type SinkTableCreationMode = Value
val CreateIfNotExist, FailIfNotExist = Value
}
// Blob Storage access parameters for source connector when working in 'scale' mode (read)
// These parameters will not be needed once we move to automatic blob provisioning
// Transient storage account when reading from Kusto
val KUSTO_BLOB_STORAGE_ACCOUNT_NAME: String = newOption("blobStorageAccountName")
// Storage account key. Use either this or SAS key to access the storage account
val KUSTO_BLOB_STORAGE_ACCOUNT_KEY: String = newOption("blobStorageAccountKey")
// SAS access key: the complete SAS query string for the container
// Use either this or storage account key to access the storage account
val KUSTO_BLOB_STORAGE_SAS_URL: String = newOption("blobStorageSasUrl")
// Blob container name
val KUSTO_BLOB_CONTAINER: String = newOption("blobContainer")
val NONE_RESULT_LIMIT = "none"
// Partitioning modes allow exporting data from Kusto to separate folders within the blob container, per partition
// Note! In current implementation this is not exploited by Kusto read connector
// Left for future experimentation
val supportedPartitioningModes: Set[String] = Set("hash")
}
case class KustoCoordinates(cluster: String, database: String, table: Option[String] = None)
case class WriteOptions(tableCreateOptions: KustoOptions.SinkTableCreationMode.SinkTableCreationMode = KustoOptions.SinkTableCreationMode.FailIfNotExist,
isAsync: Boolean = false, writeResultLimit: String = KustoOptions.NONE_RESULT_LIMIT, timeZone: String = "UTC", timeout: FiniteDuration)
/************************************************************************************/
/** **********************************************************************************/
/* NOTE!!! */
/* These options are intended for testing, experimentation and debug. */
/* They may not be used in a production environment */
/* Interface stability is not guaranteed: options may be removed or changed freely */
/************************************************************************************/
object KustoDebugOptions {
/** **********************************************************************************/
private[kusto] object KustoDebugOptions {
private val kustoOptionNames = collection.mutable.Set[String]()
private def newOption(name: String): String = {
kustoOptionNames += name.toLowerCase(Locale.ROOT)
name
}
// Reading method is determined internally by the connector
// This option allows overriding the connector's heuristics and forcing a specific mode.
// Recommended to use only for debug and testing purposes
@@ -119,5 +68,23 @@ object KustoDebugOptions {
// The size limit in MB (uncompressed) after which the export to blob command will create another file (split)
// Setting negative or zero value results in applying export command default
val KUSTO_DBG_BLOB_FILE_SIZE_LIMIT_MB: String = newOption("dbgBlobFileSizeLimitMb")
}
// Partitioning parameters, CURRENTLY NOT USED
// CURRENTLY NOT USED
val KUSTO_READ_PARTITION_MODE: String = newOption("partitionMode")
// Note: for 'read', this allows partitioning the export from Kusto to blob (default is '1')
// It does not affect the number of partitions created when reading from blob.
// Therefore it is not recommended to use this option when reading from Kusto
// CURRENTLY NOT USED, left here for experimentation
val KUSTO_NUM_PARTITIONS: String = newOption("numPartitions")
// CURRENTLY NOT USED, left here for experimentation
val KUSTO_PARTITION_COLUMN: String = newOption("partitionColumn")
// Partitioning modes allow exporting data from Kusto to separate folders within the blob container, per partition
// Note! In current implementation this is not exploited by Kusto read connector
// Left for future experimentation
val supportedPartitioningModes: Set[String] = Set("hash")
val KEY_VAULT_PEM_FILE_PATH = "keyVaultPemFilePath" // Not yet supported
val KEY_VAULT_CERTIFICATE_KEY = "keyVaultPemFileKey" // Not yet supported
}

View file

@@ -3,8 +3,8 @@ package com.microsoft.kusto.spark.datasink
import java.io._
import com.microsoft.kusto.spark.authentication.KustoAuthentication
import com.microsoft.kusto.spark.datasource._
import com.microsoft.kusto.spark.utils.{KustoDataSourceUtils => KDSU}
import com.microsoft.kusto.spark.common.KustoCoordinates
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.{DataFrame, SQLContext}

View file

@@ -0,0 +1,40 @@
package com.microsoft.kusto.spark.datasink
import com.microsoft.kusto.spark.common.KustoOptions
import scala.concurrent.duration.FiniteDuration
object KustoSinkOptions extends KustoOptions {
val KUSTO_TABLE: String = newOption("kustoTable")
val KUSTO_CUSTOM_DATAFRAME_COLUMN_TYPES: String = newOption("customSchema")
// If set to 'FailIfNotExist', the operation will fail if the table is not found
// in the requested cluster and database.
// If set to 'CreateIfNotExist' and the table is not found in the requested cluster and database,
// it will be created, with a schema matching the DataFrame that is being written.
// Default: 'FailIfNotExist'
val KUSTO_TABLE_CREATE_OPTIONS: String = newOption("tableCreateOptions")
// When writing to Kusto, allows the driver to complete operation asynchronously. See KustoSink.md for
// details and limitations. Default: 'false'
val KUSTO_WRITE_ENABLE_ASYNC: String = newOption("writeEnableAsync")
// When writing to Kusto, limits the number of rows read back as BaseRelation. Default: '1'.
// To read back all rows, set as 'none' (NONE_RESULT_LIMIT)
val KUSTO_WRITE_RESULT_LIMIT: String = newOption("writeResultLimit")
// A json representation of the SparkIngestionProperties object used for writing to Kusto
var KUSTO_SPARK_INGESTION_PROPERTIES_JSON: String = newOption("sparkIngestionPropertiesJson")
val NONE_RESULT_LIMIT = "none"
}
object SinkTableCreationMode extends Enumeration {
type SinkTableCreationMode = Value
val CreateIfNotExist, FailIfNotExist = Value
}
case class WriteOptions(tableCreateOptions: SinkTableCreationMode.SinkTableCreationMode = SinkTableCreationMode.FailIfNotExist,
isAsync: Boolean = false,
writeResultLimit: String = KustoSinkOptions.NONE_RESULT_LIMIT,
timeZone: String = "UTC", timeout: FiniteDuration,
IngestionProperties: Option[String] = None)
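For illustration, a minimal sketch of driving these sink options from a DataFrame write, assuming a hypothetical cluster, database, and table; the ingestion-properties JSON is produced by SparkIngestionProperties.toString (next file):

import com.microsoft.kusto.spark.datasink.{KustoSinkOptions, SinkTableCreationMode, SparkIngestionProperties}

val ingestion = new SparkIngestionProperties()
ingestion.flushImmediately = true

df.write
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoSinkOptions.KUSTO_CLUSTER, "MyCluster")      // hypothetical
  .option(KustoSinkOptions.KUSTO_DATABASE, "MyDatabase")    // hypothetical
  .option(KustoSinkOptions.KUSTO_TABLE, "MyTable")          // hypothetical
  .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, SinkTableCreationMode.CreateIfNotExist.toString)
  .option(KustoSinkOptions.KUSTO_SPARK_INGESTION_PROPERTIES_JSON, ingestion.toString)
  .save()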

View file

@@ -11,15 +11,15 @@ import java.util.zip.GZIPOutputStream
import java.util.{TimeZone, UUID}
import com.microsoft.azure.kusto.data.Client
import com.microsoft.azure.kusto.ingest.{IngestClient, IngestionProperties}
import com.microsoft.azure.kusto.ingest.IngestionProperties.DATA_FORMAT
import com.microsoft.azure.kusto.ingest.result.{IngestionResult, IngestionStatus, OperationStatus}
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo
import com.microsoft.azure.kusto.ingest.{IngestClient, IngestionProperties}
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature
import com.microsoft.azure.storage.blob.{CloudBlobContainer, CloudBlockBlob}
import com.microsoft.kusto.spark.authentication.KustoAuthentication
import com.microsoft.kusto.spark.datasink
import com.microsoft.kusto.spark.datasource.{KustoCoordinates, WriteOptions}
import com.microsoft.kusto.spark.common.KustoCoordinates
import com.microsoft.kusto.spark.utils.CslCommandsGenerator._
import com.microsoft.kusto.spark.utils.{KustoClient, KustoClientCache, KustoQueryUtils, KustoConstants => KCONST, KustoDataSourceUtils => KDSU}
import com.univocity.parsers.csv.{CsvWriter, CsvWriterSettings}
@@ -49,7 +49,8 @@ object KustoWriter {
private[kusto] def write(batchId: Option[Long],
data: DataFrame,
tableCoordinates: KustoCoordinates,
authentication: KustoAuthentication, writeOptions: WriteOptions): Unit = {
authentication: KustoAuthentication,
writeOptions: WriteOptions): Unit = {
val batchIdIfExists = batchId.filter(_ != 0).map(_.toString).getOrElse("")
val kustoClient = KustoClientCache.getClient(tableCoordinates.cluster, authentication)
@@ -144,7 +145,12 @@
(implicit parameters: KustoWriteResource): Seq[IngestionResult] = {
import parameters._
val ingestionProperties = new IngestionProperties(coordinates.database, tmpTableName)
val ingestionProperties = if (writeOptions.IngestionProperties.isDefined) {
SparkIngestionProperties.fromString(writeOptions.IngestionProperties.get).toIngestionProperties(coordinates.database, tmpTableName)
} else {
new IngestionProperties(coordinates.database, tmpTableName)
}
ingestionProperties.setDataFormat(DATA_FORMAT.csv.name)
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table)
ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FailuresAndSuccesses)
@@ -226,7 +232,7 @@ object KustoWriter {
ingestionResults.foreach {
ingestionResult =>
KDSU.runSequentially[IngestionStatus](
func = () => ingestionResult.getIngestionStatusCollection().get(0),
func = () => ingestionResult.getIngestionStatusCollection.get(0),
0, delayPeriodBetweenCalls, (timeout.toMillis / delayPeriodBetweenCalls + 5).toInt,
res => res.status == OperationStatus.Pending,
res => res.status match {
@@ -381,7 +387,7 @@ object KustoWriter {
result(x) = "\"" + keys(x) + "\"" + ":" + values(x)
}
"{" + result.mkString(",") +"}"
"{" + result.mkString(",") + "}"
}

View file

@@ -0,0 +1,67 @@
package com.microsoft.kusto.spark.datasink
import java.util
import com.microsoft.azure.kusto.ingest.IngestionProperties
import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility
import org.codehaus.jackson.annotate.JsonMethod
import org.codehaus.jackson.map.ObjectMapper
import org.joda.time.DateTime
class SparkIngestionProperties {
var flushImmediately: Boolean = false
var dropByTags: util.ArrayList[String] = _
var ingestByTags: util.ArrayList[String] = _
var additionalTags: util.ArrayList[String] = _
var ingestIfNotExists: util.ArrayList[String] = _
var creationTime: DateTime = _
var csvMapping: String = _
var csvMappingNameReference: String = _
override def toString: String = {
new ObjectMapper().setVisibility(JsonMethod.FIELD, Visibility.ANY)
.writerWithDefaultPrettyPrinter
.writeValueAsString(this)
}
def toIngestionProperties(database: String, table: String): IngestionProperties = {
val ingestionProperties = new IngestionProperties(database, table)
val additionalProperties = new util.HashMap[String, String]()
if (this.dropByTags != null) {
ingestionProperties.setDropByTags(this.dropByTags)
}
if (this.ingestByTags != null) {
ingestionProperties.setIngestByTags(this.ingestByTags)
}
if (this.additionalTags != null) {
ingestionProperties.setAdditionalTags(this.additionalTags)
}
if (this.ingestIfNotExists != null) {
ingestionProperties.setIngestIfNotExists(this.ingestIfNotExists)
}
if (this.creationTime != null) {
additionalProperties.put("creationTime", this.creationTime.toString())
}
if (this.csvMapping != null) {
additionalProperties.put("csvMapping", this.csvMapping)
}
if (this.csvMappingNameReference != null) {
ingestionProperties.setCsvMappingName(this.csvMappingNameReference)
}
ingestionProperties.setAdditionalProperties(additionalProperties)
ingestionProperties
}
}
object SparkIngestionProperties {
private[kusto] def fromString(json: String): SparkIngestionProperties = {
new ObjectMapper().setVisibility(JsonMethod.FIELD, Visibility.ANY).readValue(json, classOf[SparkIngestionProperties])
}
}
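A sketch of the round trip (tag and table names hypothetical): toString serializes the instance to JSON with Jackson so it can cross the DataFrame options boundary, and the connector later parses it back and maps it onto the Java client's IngestionProperties:

import java.util
import org.joda.time.DateTime

val props = new SparkIngestionProperties()
props.flushImmediately = true
props.ingestByTags = new util.ArrayList[String](util.Arrays.asList("ingest-tag")) // hypothetical tag
props.creationTime = new DateTime()

val json = props.toString // JSON carried via KUSTO_SPARK_INGESTION_PROPERTIES_JSON
// Inside the connector (fromString is private[kusto]):
// SparkIngestionProperties.fromString(json).toIngestionProperties("MyDatabase", "MyTempTable")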

View file

@@ -3,8 +3,10 @@ package com.microsoft.kusto.spark.datasource
import java.security.InvalidParameterException
import java.util.concurrent.TimeUnit
import com.microsoft.azure.kusto.data.ClientRequestProperties
import com.microsoft.kusto.spark.authentication.{KeyVaultAuthentication, KustoAuthentication}
import com.microsoft.kusto.spark.datasink.KustoWriter
import com.microsoft.kusto.spark.common.{KustoCoordinates, KustoDebugOptions}
import com.microsoft.kusto.spark.datasink.{KustoSinkOptions, KustoWriter}
import com.microsoft.kusto.spark.utils.{KeyVaultUtils, KustoQueryUtils, KustoConstants => KCONST, KustoDataSourceUtils => KDSU}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -16,6 +18,7 @@ class DefaultSource extends CreatableRelationProvider
var authenticationParameters: Option[KustoAuthentication] = None
var kustoCoordinates: KustoCoordinates = _
var keyVaultAuthentication: Option[KeyVaultAuthentication] = None
var clientRequestProperties: Option[ClientRequestProperties] = None
override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
val sinkParameters = KDSU.parseSinkParameters(parameters, mode)
@@ -23,7 +26,7 @@ class DefaultSource extends CreatableRelationProvider
kustoCoordinates = sinkParameters.sourceParametersResults.kustoCoordinates
authenticationParameters = Some(sinkParameters.sourceParametersResults.authenticationParameters)
if(keyVaultAuthentication.isDefined){
if (keyVaultAuthentication.isDefined) {
val paramsFromKeyVault = KeyVaultUtils.getAadAppParametersFromKeyVault(keyVaultAuthentication.get)
authenticationParameters = Some(KDSU.mergeKeyVaultAndOptionsAuthentication(paramsFromKeyVault, authenticationParameters))
}
@@ -35,7 +38,7 @@ class DefaultSource extends CreatableRelationProvider
authenticationParameters.get,
sinkParameters.writeOptions)
val limit = if (sinkParameters.writeOptions.writeResultLimit.equalsIgnoreCase(KustoOptions.NONE_RESULT_LIMIT)) None else {
val limit = if (sinkParameters.writeOptions.writeResultLimit.equalsIgnoreCase(KustoSinkOptions.NONE_RESULT_LIMIT)) None else {
try {
Some(sinkParameters.writeOptions.writeResultLimit.toInt)
}
@@ -49,38 +52,39 @@ class DefaultSource extends CreatableRelationProvider
def adjustParametersForBaseRelation(parameters: Map[String, String], limit: Option[Int]): Map[String, String] = {
if (limit.isDefined) {
parameters + (KustoOptions.KUSTO_QUERY -> KustoQueryUtils.limitQuery(parameters(KustoOptions.KUSTO_TABLE), limit.get))
parameters + (KustoSourceOptions.KUSTO_QUERY -> KustoQueryUtils.limitQuery(parameters(KustoSinkOptions.KUSTO_TABLE), limit.get))
} else {
parameters
}
}
private[kusto] def blobStorageAttributesProvided(parameters: Map[String, String]) = {
parameters.get(KustoOptions.KUSTO_BLOB_STORAGE_SAS_URL).isDefined || (parameters.get(KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME).isDefined &&
parameters.get(KustoOptions.KUSTO_BLOB_CONTAINER).isDefined && parameters.get(KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY).isDefined)
parameters.get(KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL).isDefined || (parameters.get(KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME).isDefined &&
parameters.get(KustoSourceOptions.KUSTO_BLOB_CONTAINER).isDefined && parameters.get(KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY).isDefined)
}
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
val requestedPartitions = parameters.get(KustoOptions.KUSTO_NUM_PARTITIONS)
val partitioningMode = parameters.get(KustoOptions.KUSTO_READ_PARTITION_MODE)
val isCompressOnExport = parameters.getOrElse(KustoDebugOptions.KUSTO_DBG_BLOB_COMPRESS_ON_EXPORT, "true").trim.toBoolean
val requestedPartitions = parameters.get(KustoDebugOptions.KUSTO_NUM_PARTITIONS)
val partitioningMode = parameters.get(KustoDebugOptions.KUSTO_READ_PARTITION_MODE)
val shouldCompressOnExport = parameters.getOrElse(KustoDebugOptions.KUSTO_DBG_BLOB_COMPRESS_ON_EXPORT, "true").trim.toBoolean
// Set the default export split limit to 1GB, the maximum allowed
val exportSplitLimitMb = parameters.getOrElse(KustoDebugOptions.KUSTO_DBG_BLOB_FILE_SIZE_LIMIT_MB, "1024").trim.toInt
val numPartitions = setNumPartitions(sqlContext, requestedPartitions, partitioningMode)
var storageSecretIsAccountKey = true
var storageSecret = parameters.get(KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY)
var storageSecret = parameters.get(KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY)
if (storageSecret.isEmpty) {
storageSecret = parameters.get(KustoOptions.KUSTO_BLOB_STORAGE_SAS_URL)
storageSecret = parameters.get(KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL)
if (storageSecret.isDefined) storageSecretIsAccountKey = false
}
if(authenticationParameters.isEmpty){
if (authenticationParameters.isEmpty) {
val sourceParameters = KDSU.parseSourceParameters(parameters)
authenticationParameters = Some(sourceParameters.authenticationParameters)
kustoCoordinates = sourceParameters.kustoCoordinates
keyVaultAuthentication = sourceParameters.keyVaultAuth
clientRequestProperties = KDSU.getClientRequestProperties(parameters)
}
val (kustoAuthentication, storageParameters): (Option[KustoAuthentication], Option[KustoStorageParameters]) =
@@ -89,33 +93,34 @@ class DefaultSource extends CreatableRelationProvider
authenticationParameters = Some(KDSU.mergeKeyVaultAndOptionsAuthentication(KeyVaultUtils.getAadAppParametersFromKeyVault(keyVaultAuthentication.get), authenticationParameters))
(authenticationParameters, KDSU.mergeKeyVaultAndOptionsStorageParams(
parameters.get(KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME),
parameters.get(KustoOptions.KUSTO_BLOB_CONTAINER),
parameters.get(KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME),
parameters.get(KustoSourceOptions.KUSTO_BLOB_CONTAINER),
storageSecret,
storageSecretIsAccountKey,
keyVaultAuthentication.get))
} else {
// Params passed from options
(authenticationParameters, KDSU.getAndValidateTransientStorageParametersIfExist(
parameters.get(KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME),
parameters.get(KustoOptions.KUSTO_BLOB_CONTAINER),
parameters.get(KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME),
parameters.get(KustoSourceOptions.KUSTO_BLOB_CONTAINER),
storageSecret,
storageSecretIsAccountKey))
}
val timeout = new FiniteDuration(parameters.getOrElse(KustoOptions.KUSTO_TIMEOUT_LIMIT, KCONST.defaultTimeoutAsString).toLong, TimeUnit.SECONDS)
val timeout = new FiniteDuration(parameters.getOrElse(KustoSourceOptions.KUSTO_TIMEOUT_LIMIT, KCONST.defaultTimeoutAsString).toLong, TimeUnit.SECONDS)
KustoRelation(
kustoCoordinates,
kustoAuthentication.get,
parameters.getOrElse(KustoOptions.KUSTO_QUERY, ""),
KustoReadOptions(parameters.getOrElse(KustoDebugOptions.KUSTO_DBG_FORCE_READ_MODE, ""), isCompressOnExport, exportSplitLimitMb),
parameters.getOrElse(KustoSourceOptions.KUSTO_QUERY, ""),
KustoReadOptions(parameters.getOrElse(KustoDebugOptions.KUSTO_DBG_FORCE_READ_MODE, ""), shouldCompressOnExport, exportSplitLimitMb),
timeout,
numPartitions,
parameters.get(KustoOptions.KUSTO_PARTITION_COLUMN),
parameters.get(KustoDebugOptions.KUSTO_PARTITION_COLUMN),
partitioningMode,
parameters.get(KustoOptions.KUSTO_CUSTOM_DATAFRAME_COLUMN_TYPES),
storageParameters
parameters.get(KustoSourceOptions.KUSTO_CUSTOM_DATAFRAME_COLUMN_TYPES),
storageParameters,
clientRequestProperties
)(sqlContext.sparkSession)
}

View file

@@ -2,8 +2,9 @@ package com.microsoft.kusto.spark.datasource
import java.security.InvalidParameterException
import java.util.UUID
import com.microsoft.azure.kusto.data.Client
import com.microsoft.kusto.spark.authentication.KustoAuthentication
import com.microsoft.azure.kusto.data.{Client, ClientRequestProperties}
import com.microsoft.kusto.spark.common.KustoCoordinates
import com.microsoft.kusto.spark.utils.{CslCommandsGenerator, KustoAzureFsSetupCache, KustoBlobStorageUtils, KustoQueryUtils, KustoDataSourceUtils => KDSU}
import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
@@ -32,11 +33,12 @@ private[kusto] case class KustoReadRequest(sparkSession: SparkSession,
kustoCoordinates: KustoCoordinates,
query: String,
authentication: KustoAuthentication,
timeout: FiniteDuration)
timeout: FiniteDuration,
clientRequestProperties: Option[ClientRequestProperties])
private[kusto] case class KustoReadOptions( forcedReadMode: String = "",
isCompressOnExport: Boolean = true,
exportSplitLimitMb: Long = 1024)
private[kusto] case class KustoReadOptions(forcedReadMode: String = "",
shouldCompressOnExport: Boolean = true,
exportSplitLimitMb: Long = 1024)
private[kusto] object KustoReader {
private val myName = this.getClass.getSimpleName
@@ -47,7 +49,10 @@ private[kusto] object KustoReader {
filtering: KustoFiltering = KustoFiltering.apply()): RDD[Row] = {
val filteredQuery = KustoFilter.pruneAndFilter(request.schema, request.query, filtering)
val kustoResult = kustoClient.execute(request.kustoCoordinates.database, filteredQuery)
val kustoResult = kustoClient.execute(request.kustoCoordinates.database,
filteredQuery,
request.clientRequestProperties.orNull)
val serializer = KustoResponseDeserializer(kustoResult)
request.sparkSession.createDataFrame(serializer.toRows, serializer.getSchema).rdd
}
@@ -72,7 +77,8 @@ private[kusto] object KustoReader {
storage,
directory,
options,
filtering)
filtering,
request.clientRequestProperties)
}
val path = s"wasbs://${storage.container}@${storage.account}.blob.core.windows.net/$directory"
@@ -157,7 +163,8 @@ private[kusto] class KustoReader(client: Client, request: KustoReadRequest, stor
storage: KustoStorageParameters,
directory: String,
options: KustoReadOptions,
filtering: KustoFiltering): Unit = {
filtering: KustoFiltering,
clientRequestProperties: Option[ClientRequestProperties]): Unit = {
val limit = if (options.exportSplitLimitMb <= 0) None else Some(options.exportSplitLimitMb)
@@ -171,10 +178,12 @@ private[kusto] class KustoReader(client: Client, request: KustoReadRequest, stor
partition.idx,
partition.predicate,
limit,
isCompressed = options.isCompressOnExport
isCompressed = options.shouldCompressOnExport
)
val commandResult = client.execute(request.kustoCoordinates.database, exportCommand)
val commandResult = client.execute(request.kustoCoordinates.database,
exportCommand,
clientRequestProperties.orNull)
KDSU.verifyAsyncCommandCompletion(client, request.kustoCoordinates.database, commandResult, timeOut = request.timeout)
}
}

View file

@@ -3,7 +3,9 @@ package com.microsoft.kusto.spark.datasource
import java.security.InvalidParameterException
import java.util.Locale
import com.microsoft.azure.kusto.data.ClientRequestProperties
import com.microsoft.kusto.spark.authentication.KustoAuthentication
import com.microsoft.kusto.spark.common.{KustoCoordinates, KustoDebugOptions}
import com.microsoft.kusto.spark.utils.{KustoClientCache, KustoQueryUtils, KustoDataSourceUtils => KDSU}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan, TableScan}
@@ -13,25 +15,27 @@ import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import scala.concurrent.duration.FiniteDuration
private[kusto] case class KustoRelation(kustoCoordinates: KustoCoordinates,
authentication: KustoAuthentication,
query: String,
readOptions: KustoReadOptions,
timeout: FiniteDuration,
numPartitions: Int,
partitioningColumn: Option[String],
partitioningMode: Option[String],
customSchema: Option[String] = None,
storageParameters: Option[KustoStorageParameters])
(@transient val sparkSession: SparkSession)
authentication: KustoAuthentication,
query: String,
readOptions: KustoReadOptions,
timeout: FiniteDuration,
numPartitions: Int,
partitioningColumn: Option[String],
partitioningMode: Option[String],
customSchema: Option[String] = None,
storageParameters: Option[KustoStorageParameters],
clientRequestProperties: Option[ClientRequestProperties])
(@transient val sparkSession: SparkSession)
extends BaseRelation with TableScan with PrunedFilteredScan with Serializable {
private val normalizedQuery = KustoQueryUtils.normalizeQuery(query)
var cachedSchema: StructType = _
override def sqlContext: SQLContext = sparkSession.sqlContext
override def schema: StructType = {
if(cachedSchema == null){
cachedSchema = if (customSchema.isDefined) {
if (cachedSchema == null) {
cachedSchema = if (customSchema.isDefined) {
StructType.fromDDL(customSchema.get)
}
else getSchema
@@ -42,7 +46,7 @@ private[kusto] case class KustoRelation(kustoCoordinates: KustoCoordinates,
override def buildScan(): RDD[Row] = {
val kustoClient = KustoClientCache.getClient(kustoCoordinates.cluster, authentication).engineClient
val count = KDSU.countRows(kustoClient, query, kustoCoordinates.database)
if (count == 0) {
if (count == 0) {
sparkSession.emptyDataFrame.rdd
}
else {
@@ -50,12 +54,12 @@ private[kusto] case class KustoRelation(kustoCoordinates: KustoCoordinates,
if (useLeanMode) {
KustoReader.leanBuildScan(
kustoClient,
KustoReadRequest(sparkSession, schema, kustoCoordinates, query, authentication, timeout)
KustoReadRequest(sparkSession, schema, kustoCoordinates, query, authentication, timeout, clientRequestProperties)
)
} else {
KustoReader.scaleBuildScan(
kustoClient,
KustoReadRequest(sparkSession, schema, kustoCoordinates, query, authentication, timeout),
KustoReadRequest(sparkSession, schema, kustoCoordinates, query, authentication, timeout, clientRequestProperties),
storageParameters.get,
KustoPartitionParameters(numPartitions, getPartitioningColumn, getPartitioningMode)
)
@@ -74,13 +78,13 @@ private[kusto] case class KustoRelation(kustoCoordinates: KustoCoordinates,
if (useLeanMode) {
KustoReader.leanBuildScan(
kustoClient,
KustoReadRequest(sparkSession, schema, kustoCoordinates, query, authentication, timeout),
KustoReadRequest(sparkSession, schema, kustoCoordinates, query, authentication, timeout, clientRequestProperties),
KustoFiltering(requiredColumns, filters)
)
} else {
KustoReader.scaleBuildScan(
kustoClient,
KustoReadRequest(sparkSession, schema, kustoCoordinates, query, authentication, timeout),
KustoReadRequest(sparkSession, schema, kustoCoordinates, query, authentication, timeout, clientRequestProperties),
storageParameters.get,
KustoPartitionParameters(numPartitions, getPartitioningColumn, getPartitioningMode),
readOptions,
@@ -117,9 +121,9 @@ private[kusto] case class KustoRelation(kustoCoordinates: KustoCoordinates,
private def getPartitioningMode: String = {
if (partitioningMode.isDefined) {
val mode = partitioningMode.get.toLowerCase(Locale.ROOT)
if (!KustoOptions.supportedPartitioningModes.contains(mode)) {
if (!KustoDebugOptions.supportedPartitioningModes.contains(mode)) {
throw new InvalidParameterException(
s"Specified partitioning mode '$mode' : ${KDSU.NewLine}${KustoOptions.supportedPartitioningModes.mkString(", ")}")
s"Specified partitioning mode '$mode' : ${KDSU.NewLine}${KustoDebugOptions.supportedPartitioningModes.mkString(", ")}")
}
mode
} else "hash"

View file

@@ -0,0 +1,26 @@
package com.microsoft.kusto.spark.datasource
import com.microsoft.kusto.spark.common.KustoOptions
object KustoSourceOptions extends KustoOptions {
val KUSTO_CUSTOM_DATAFRAME_COLUMN_TYPES: String = newOption("customSchema")
val KUSTO_QUERY: String = newOption("kustoQuery")
val KUSTO_QUERY_RETRY_TIMES: String = newOption("kustoQueryRetryTimes")
// A json representation of the ClientRequestProperties object used for reading from Kusto
var KUSTO_CLIENT_REQUEST_PROPERTIES_JSON: String = newOption("clientRequestPropertiesJson")
// Blob Storage access parameters for source connector when working in 'scale' mode (read)
// These parameters will not be needed once we move to automatic blob provisioning
// Transient storage account when reading from Kusto
val KUSTO_BLOB_STORAGE_ACCOUNT_NAME: String = newOption("blobStorageAccountName")
// Storage account key. Use either this or SAS key to access the storage account
val KUSTO_BLOB_STORAGE_ACCOUNT_KEY: String = newOption("blobStorageAccountKey")
// SAS access key: the complete SAS query string for the container
// Use either this or storage account key to access the storage account
val KUSTO_BLOB_STORAGE_SAS_URL: String = newOption("blobStorageSasUrl")
// Blob container name
val KUSTO_BLOB_CONTAINER: String = newOption("blobContainer")
}
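A sketch of the read path with an explicit client request properties payload. The JSON is a flat object of option name to value, parsed by KustoDataSourceUtils.getClientRequestProperties; "notruncation" and "servertimeout" are standard Kusto request options, used here only as examples:

import com.microsoft.kusto.spark.datasource.KustoSourceOptions

val crpJson = """{"notruncation": true, "servertimeout": "00:10:00"}"""

val readDf = spark.read
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoSourceOptions.KUSTO_CLUSTER, "MyCluster")    // hypothetical; auth options omitted
  .option(KustoSourceOptions.KUSTO_DATABASE, "MyDatabase")  // hypothetical
  .option(KustoSourceOptions.KUSTO_QUERY, "MyTable | take 10")
  .option(KustoSourceOptions.KUSTO_CLIENT_REQUEST_PROPERTIES_JSON, crpJson)
  .load()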

View file

@@ -1,22 +1,39 @@
package com.microsoft.kusto.spark.sql.extension
import java.util.Properties
import com.microsoft.kusto.spark.datasource.KustoOptions
import org.apache.spark.sql._
import scala.collection.JavaConverters._
import com.microsoft.azure.kusto.data.ClientRequestProperties
import com.microsoft.kusto.spark.datasink.{KustoSinkOptions, SparkIngestionProperties}
import com.microsoft.kusto.spark.datasource.KustoSourceOptions
import org.apache.spark.sql.{DataFrameWriter, _}
object SparkExtension {
implicit class DataFrameReaderExtension(dataframeReader: DataFrameReader) {
def kusto(kustoCluster: String, database: String, query: String, properties: Map[String, String]): DataFrame = {
dataframeReader.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, kustoCluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_QUERY, query)
.options(properties)
implicit class DataFrameReaderExtension(df: DataFrameReader) {
def kusto(kustoCluster: String, database: String, query: String, conf: Map[String, String] = Map.empty[String, String], cpr: Option[ClientRequestProperties] = None): DataFrame = {
(if (cpr.isDefined) {
df.option(KustoSourceOptions.KUSTO_CLIENT_REQUEST_PROPERTIES_JSON, cpr.get.toString)
} else {
df
}).format("com.microsoft.kusto.spark.datasource")
.option(KustoSourceOptions.KUSTO_CLUSTER, kustoCluster)
.option(KustoSourceOptions.KUSTO_DATABASE, database)
.option(KustoSourceOptions.KUSTO_QUERY, query)
.options(conf)
.load()
}
}
implicit class DataFrameWriterExtension(df: DataFrameWriter[Row]) {
def kusto(kustoCluster: String, database: String, table: String, sparkIngestionProperties: Option[SparkIngestionProperties] = None): Unit = {
(if (sparkIngestionProperties.isDefined) {
df.option(KustoSinkOptions.KUSTO_SPARK_INGESTION_PROPERTIES_JSON, sparkIngestionProperties.get.toString)
} else {
df
}).format("com.microsoft.kusto.spark.datasource")
.option(KustoSinkOptions.KUSTO_CLUSTER, kustoCluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, table)
.save()
}
}
}
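Putting the two extensions together, a minimal sketch (names hypothetical; authentication options omitted for brevity):

import com.microsoft.azure.kusto.data.ClientRequestProperties
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
import com.microsoft.kusto.spark.sql.extension.SparkExtension._

// Read with explicit client request properties, passed through as JSON via toString
val crp = new ClientRequestProperties()
crp.setOption("notruncation", true) // example option name
val readDf = spark.read.kusto("MyCluster", "MyDatabase", "MyTable | take 10", Map.empty, Some(crp))

// Write with ingestion properties, likewise serialized via toString
val ingestion = new SparkIngestionProperties()
ingestion.flushImmediately = true
readDf.write.kusto("MyCluster", "MyDatabase", "MyTargetTable", Some(ingestion))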

View file

@@ -1,13 +1,12 @@
package com.microsoft.kusto.spark.utils
import java.util.StringJoiner
import com.microsoft.azure.kusto.data.Client
import com.microsoft.azure.kusto.ingest.IngestClient
import com.microsoft.kusto.spark.datasource.KustoCoordinates
import com.microsoft.kusto.spark.datasource.KustoOptions.SinkTableCreationMode
import com.microsoft.kusto.spark.datasource.KustoOptions.SinkTableCreationMode.SinkTableCreationMode
import com.microsoft.kusto.spark.datasink.SinkTableCreationMode
import com.microsoft.kusto.spark.datasink.SinkTableCreationMode.SinkTableCreationMode
import com.microsoft.kusto.spark.common.KustoCoordinates
import com.microsoft.kusto.spark.utils.CslCommandsGenerator._
import com.microsoft.kusto.spark.utils.KustoDataSourceUtils.extractSchemaFromResultTable
import com.microsoft.kusto.spark.utils.{KustoDataSourceUtils => KDSU}
@@ -57,7 +56,7 @@ class KustoClient(val clusterAlias: String, val engineClient: Client, val dmClie
private var storageUris: Seq[String] = Seq.empty
private var lastRefresh: DateTime = new DateTime(DateTimeZone.UTC)
def getNewTempBlobReference(): String = {
def getNewTempBlobReference: String = {
// Refresh if storageExpiryMinutes have passed since last refresh for this cluster as SAS should be valid for at least 120 minutes
if (storageUris.isEmpty ||
new Period(new DateTime(DateTimeZone.UTC), lastRefresh).getMinutes > KustoConstants.storageExpiryMinutes) {

View file

@@ -5,21 +5,23 @@ import java.util
import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException}
import java.util.{NoSuchElementException, StringJoiner, Timer, TimerTask}
import com.microsoft.azure.kusto.data.{Client, Results}
import com.microsoft.azure.kusto.data.{Client, ClientRequestProperties, Results}
import com.microsoft.kusto.spark.authentication._
import com.microsoft.kusto.spark.datasource.{KustoCoordinates, WriteOptions, _}
import com.microsoft.kusto.spark.datasource.KustoOptions.SinkTableCreationMode
import com.microsoft.kusto.spark.datasource.KustoOptions.SinkTableCreationMode.SinkTableCreationMode
import com.microsoft.kusto.spark.datasink.SinkTableCreationMode.SinkTableCreationMode
import com.microsoft.kusto.spark.datasink.{KustoSinkOptions, SinkTableCreationMode, WriteOptions}
import com.microsoft.kusto.spark.common.{KustoCoordinates, KustoDebugOptions}
import com.microsoft.kusto.spark.datasource.{KustoResponseDeserializer, KustoSourceOptions, KustoStorageParameters}
import com.microsoft.kusto.spark.utils.CslCommandsGenerator._
import com.microsoft.kusto.spark.utils.{KustoConstants => KCONST}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.StructType
import org.json.JSONObject
import scala.collection.JavaConversions._
import scala.util.matching.Regex
import scala.concurrent.duration._
import com.microsoft.kusto.spark.utils.{KustoConstants => KCONST}
import scala.util.matching.Regex
object KustoDataSourceUtils {
private val klog = Logger.getLogger("KustoConnector")
@@ -59,7 +61,7 @@ object KustoDataSourceUtils {
val tableSchemaBuilder = new StringJoiner(",")
for(row <- resultRows){
for (row <- resultRows) {
// Each row contains {Name, CslType, Type}, converted to (Name:CslType) pairs
tableSchemaBuilder.add(s"['${row.get(0)}']:${row.get(1)}")
}
@@ -75,8 +77,8 @@ object KustoDataSourceUtils {
def parseSourceParameters(parameters: Map[String, String]): SourceParameters = {
// Parse KustoTableCoordinates - these are mandatory options
val database = parameters.get(KustoOptions.KUSTO_DATABASE)
val cluster = parameters.get(KustoOptions.KUSTO_CLUSTER)
val database = parameters.get(KustoSourceOptions.KUSTO_DATABASE)
val cluster = parameters.get(KustoSourceOptions.KUSTO_CLUSTER)
if (database.isEmpty) {
throw new InvalidParameterException("KUSTO_DATABASE parameter is missing. Must provide a destination database name")
@@ -86,35 +88,35 @@ object KustoDataSourceUtils {
throw new InvalidParameterException("KUSTO_CLUSTER parameter is missing. Must provide a destination cluster name")
}
val table = parameters.get(KustoOptions.KUSTO_TABLE)
val table = parameters.get(KustoSinkOptions.KUSTO_TABLE)
// Parse KustoAuthentication
val applicationId = parameters.getOrElse(KustoOptions.KUSTO_AAD_CLIENT_ID, "")
val applicationKey = parameters.getOrElse(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, "")
val applicationId = parameters.getOrElse(KustoSourceOptions.KUSTO_AAD_CLIENT_ID, "")
val applicationKey = parameters.getOrElse(KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD, "")
var authentication: KustoAuthentication = null
val keyVaultUri: String = parameters.getOrElse(KustoOptions.KEY_VAULT_URI, "")
val keyVaultUri: String = parameters.getOrElse(KustoSourceOptions.KEY_VAULT_URI, "")
var accessToken: String = ""
var keyVaultAuthentication: Option[KeyVaultAuthentication] = None
if (keyVaultUri != "") {
// KeyVault Authentication
val keyVaultAppId: String = parameters.getOrElse(KustoOptions.KEY_VAULT_APP_ID, "")
val keyVaultAppId: String = parameters.getOrElse(KustoSourceOptions.KEY_VAULT_APP_ID, "")
if (!keyVaultAppId.isEmpty) {
keyVaultAuthentication = Some(KeyVaultAppAuthentication(keyVaultUri,
keyVaultAppId,
parameters.getOrElse(KustoOptions.KEY_VAULT_APP_KEY, "")))
parameters.getOrElse(KustoSourceOptions.KEY_VAULT_APP_KEY, "")))
} else {
keyVaultAuthentication = Some(KeyVaultCertificateAuthentication(keyVaultUri,
parameters.getOrElse(KustoOptions.KEY_VAULT_PEM_FILE_PATH, ""),
parameters.getOrElse(KustoOptions.KEY_VAULT_CERTIFICATE_KEY, "")))
parameters.getOrElse(KustoDebugOptions.KEY_VAULT_PEM_FILE_PATH, ""),
parameters.getOrElse(KustoDebugOptions.KEY_VAULT_CERTIFICATE_KEY, "")))
}
}
if (!applicationId.isEmpty || !applicationKey.isEmpty) {
authentication = AadApplicationAuthentication(applicationId, applicationKey, parameters.getOrElse(KustoOptions.KUSTO_AAD_AUTHORITY_ID, "microsoft.com"))
authentication = AadApplicationAuthentication(applicationId, applicationKey, parameters.getOrElse(KustoSourceOptions.KUSTO_AAD_AUTHORITY_ID, "microsoft.com"))
}
else if ( {
accessToken = parameters.getOrElse(KustoOptions.KUSTO_ACCESS_TOKEN, "")
accessToken = parameters.getOrElse(KustoSourceOptions.KUSTO_ACCESS_TOKEN, "")
!accessToken.isEmpty
}) {
authentication = KustoAccessTokenAuthentication(accessToken)
@@ -146,23 +148,27 @@ object KustoDataSourceUtils {
var isAsyncParam: String = ""
try {
isAsyncParam = parameters.getOrElse(KustoOptions.KUSTO_WRITE_ENABLE_ASYNC, "false")
isAsync = parameters.getOrElse(KustoOptions.KUSTO_WRITE_ENABLE_ASYNC, "false").trim.toBoolean
tableCreationParam = parameters.get(KustoOptions.KUSTO_TABLE_CREATE_OPTIONS)
isAsyncParam = parameters.getOrElse(KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC, "false")
isAsync = parameters.getOrElse(KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC, "false").trim.toBoolean
tableCreationParam = parameters.get(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS)
tableCreation = if (tableCreationParam.isEmpty) SinkTableCreationMode.FailIfNotExist else SinkTableCreationMode.withName(tableCreationParam.get)
} catch {
case _: NoSuchElementException => throw new InvalidParameterException(s"No such SinkTableCreationMode option: '${tableCreationParam.get}'")
case _: java.lang.IllegalArgumentException => throw new InvalidParameterException(s"KUSTO_WRITE_ENABLE_ASYNC is expecting either 'true' or 'false', got: '$isAsyncParam'")
}
val timeout = new FiniteDuration(parameters.getOrElse(KustoOptions.KUSTO_TIMEOUT_LIMIT, KCONST.defaultTimeoutAsString).toLong, TimeUnit.SECONDS)
val timeout = new FiniteDuration(parameters.getOrElse(KustoSinkOptions.KUSTO_TIMEOUT_LIMIT, KCONST.defaultTimeoutAsString).toLong, TimeUnit.SECONDS)
val ingestionPropertiesAsJson = parameters.get(KustoSinkOptions.KUSTO_SPARK_INGESTION_PROPERTIES_JSON)
val writeOptions = WriteOptions(
tableCreation,
isAsync,
parameters.getOrElse(KustoOptions.KUSTO_WRITE_RESULT_LIMIT, "1"),
parameters.getOrElse(KustoSinkOptions.KUSTO_WRITE_RESULT_LIMIT, "1"),
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, "UTC"),
timeout)
timeout,
ingestionPropertiesAsJson
)
val sourceParameters = parseSourceParameters(parameters)
@@ -172,6 +178,24 @@ object KustoDataSourceUtils {
SinkParameters(writeOptions, sourceParameters)
}
def getClientRequestProperties(parameters: Map[String, String]): Option[ClientRequestProperties] = {
val crpOption = parameters.get(KustoSourceOptions.KUSTO_CLIENT_REQUEST_PROPERTIES_JSON)
// TODO: use the Java client implementation when published to Maven
if (crpOption.isDefined) {
val crp = new ClientRequestProperties()
val jsonObj = new JSONObject(crpOption.get).asInstanceOf[util.HashMap[String, Object]]
val it = jsonObj.keySet().iterator()
while (it.hasNext) {
val optionName: String = it.next()
crp.setOption(optionName, jsonObj.get(optionName))
}
Some(crp)
} else {
None
}
}
private[kusto] def reportExceptionAndThrow(
reporter: String,
exception: Exception,
@@ -184,7 +208,7 @@ object KustoDataSourceUtils {
val clusterDesc = if (cluster.isEmpty) "" else s", cluster: '$cluster' "
val databaseDesc = if (database.isEmpty) "" else s", database: '$database'"
val tableDesc = if (table.isEmpty) "" else s", table: '$table'"
logError(reporter,s"caught exception $whatFailed$clusterDesc$databaseDesc$tableDesc.${NewLine}EXCEPTION MESSAGE: ${exception.getMessage}")
logError(reporter, s"caught exception $whatFailed$clusterDesc$databaseDesc$tableDesc.${NewLine}EXCEPTION MESSAGE: ${exception.getMessage}")
if (!isLogDontThrow) throw exception
}
@@ -341,7 +365,7 @@ object KustoDataSourceUtils {
if (storageAccount.isEmpty || storageContainer.isEmpty || storageSecret.isEmpty) {
val keyVaultParameters = KeyVaultUtils.getStorageParamsFromKeyVault(keyVaultAuthentication)
// If KeyVault contains sas take it
if(!keyVaultParameters.secretIsAccountKey) {
if (!keyVaultParameters.secretIsAccountKey) {
Some(keyVaultParameters)
} else {
if ((storageAccount.isEmpty && keyVaultParameters.account.isEmpty) ||

View file

@@ -3,16 +3,15 @@ package com.microsoft.kusto.spark
import java.util.UUID
import com.microsoft.azure.kusto.data.{ClientFactory, ConnectionStringBuilder}
import com.microsoft.kusto.spark.datasource.KustoOptions
import com.microsoft.kusto.spark.datasource.KustoOptions.SinkTableCreationMode
import com.microsoft.kusto.spark.datasink.{KustoSinkOptions, SinkTableCreationMode}
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
import com.microsoft.kusto.spark.utils.KustoQueryUtils
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.immutable
import org.apache.spark.sql.SparkSession
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
import scala.collection.immutable
@RunWith(classOf[JUnitRunner])
class KustoAthenticationTest extends FlatSpec {
@@ -21,16 +20,16 @@ class KustoAthenticationTest extends FlatSpec {
.master(f"local[2]")
.getOrCreate()
val cluster: String = System.getProperty(KustoOptions.KUSTO_CLUSTER)
val database: String = System.getProperty(KustoOptions.KUSTO_DATABASE)
val cluster: String = System.getProperty(KustoSinkOptions.KUSTO_CLUSTER)
val database: String = System.getProperty(KustoSinkOptions.KUSTO_DATABASE)
val appId: String = System.getProperty(KustoOptions.KUSTO_AAD_CLIENT_ID)
val appKey: String = System.getProperty(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD)
val authority: String = System.getProperty(KustoOptions.KUSTO_AAD_AUTHORITY_ID, "microsoft.com")
val appId: String = System.getProperty(KustoSinkOptions.KUSTO_AAD_CLIENT_ID)
val appKey: String = System.getProperty(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD)
val authority: String = System.getProperty(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, "microsoft.com")
val keyVaultClientID: String = System.getProperty(KustoOptions.KEY_VAULT_APP_ID)
val keyVaultClientPassword: String = System.getProperty(KustoOptions.KEY_VAULT_APP_KEY)
val keyVaultUri: String = System.getProperty(KustoOptions.KEY_VAULT_URI)
val keyVaultClientID: String = System.getProperty(KustoSinkOptions.KEY_VAULT_APP_ID)
val keyVaultClientPassword: String = System.getProperty(KustoSinkOptions.KEY_VAULT_APP_KEY)
val keyVaultUri: String = System.getProperty(KustoSinkOptions.KEY_VAULT_URI)
"keyVaultAuthentication" should "use key vault for authentication and retracting kusto app auth params" taggedAs KustoE2E in {
import spark.implicits._
@@ -47,19 +46,19 @@ class KustoAthenticationTest extends FlatSpec {
df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_TABLE, table)
.option(KustoOptions.KEY_VAULT_URI, keyVaultUri)
.option(KustoOptions.KEY_VAULT_APP_ID, keyVaultClientID)
.option(KustoOptions.KEY_VAULT_APP_KEY, keyVaultClientPassword)
.option(KustoOptions.KUSTO_TABLE_CREATE_OPTIONS, SinkTableCreationMode.CreateIfNotExist.toString)
.option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, table)
.option(KustoSinkOptions.KEY_VAULT_URI, keyVaultUri)
.option(KustoSinkOptions.KEY_VAULT_APP_ID, keyVaultClientID)
.option(KustoSinkOptions.KEY_VAULT_APP_KEY, keyVaultClientPassword)
.option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, SinkTableCreationMode.CreateIfNotExist.toString)
.save()
val conf: Map[String, String] = Map(
KustoOptions.KEY_VAULT_URI -> keyVaultUri,
KustoOptions.KEY_VAULT_APP_ID -> keyVaultClientID,
KustoOptions.KEY_VAULT_APP_KEY -> keyVaultClientPassword
KustoSinkOptions.KEY_VAULT_URI -> keyVaultUri,
KustoSinkOptions.KEY_VAULT_APP_ID -> keyVaultClientID,
KustoSinkOptions.KEY_VAULT_APP_KEY -> keyVaultClientPassword
)
val dfResult = spark.read.kusto(cluster, database, table, conf)
val result = dfResult.select("name", "value").rdd.collect().sortBy(x => x.getInt(1))
@@ -83,10 +82,10 @@ class KustoAthenticationTest extends FlatSpec {
df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, s"https://ingest-$cluster.kusto.windows.net")
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_TABLE, table)
.option(KustoOptions.KUSTO_TABLE_CREATE_OPTIONS, SinkTableCreationMode.CreateIfNotExist.toString)
.option(KustoSinkOptions.KUSTO_CLUSTER, s"https://ingest-$cluster.kusto.windows.net")
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, table)
.option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, SinkTableCreationMode.CreateIfNotExist.toString)
.save()
KustoTestUtils.validateResultsAndCleanup(kustoAdminClient, table, database, expectedNumberOfRows, timeoutMs, tableCleanupPrefix = prefix)

View file

@@ -4,8 +4,9 @@ import java.security.InvalidParameterException
import java.util.UUID
import com.microsoft.azure.kusto.data.{ClientFactory, ConnectionStringBuilder}
import com.microsoft.kusto.spark.datasource.{KustoOptions, KustoResponseDeserializer}
import com.microsoft.kusto.spark.utils.{KustoBlobStorageUtils, CslCommandsGenerator, KustoQueryUtils, KustoDataSourceUtils => KDSU}
import com.microsoft.kusto.spark.datasink.KustoSinkOptions
import com.microsoft.kusto.spark.datasource.{KustoResponseDeserializer, KustoSourceOptions}
import com.microsoft.kusto.spark.utils.{CslCommandsGenerator, KustoBlobStorageUtils, KustoQueryUtils, KustoDataSourceUtils => KDSU}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.junit.runner.RunWith
@@ -40,12 +41,12 @@ class KustoBlobAccessE2E extends FlatSpec with BeforeAndAfterAll {
sc.stop()
}
val appId: String = System.getProperty(KustoOptions.KUSTO_AAD_CLIENT_ID)
val appKey: String = System.getProperty(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD)
val authority: String = System.getProperty(KustoOptions.KUSTO_AAD_AUTHORITY_ID)
val cluster: String = System.getProperty(KustoOptions.KUSTO_CLUSTER)
val database: String = System.getProperty(KustoOptions.KUSTO_DATABASE)
val table: String = System.getProperty(KustoOptions.KUSTO_TABLE, "")
val appId: String = System.getProperty(KustoSourceOptions.KUSTO_AAD_CLIENT_ID)
val appKey: String = System.getProperty(KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD)
val authority: String = System.getProperty(KustoSourceOptions.KUSTO_AAD_AUTHORITY_ID)
val cluster: String = System.getProperty(KustoSourceOptions.KUSTO_CLUSTER)
val database: String = System.getProperty(KustoSourceOptions.KUSTO_DATABASE)
val table: String = System.getProperty(KustoSinkOptions.KUSTO_TABLE, "")
val storageAccount: String = System.getProperty("storageAccount", "sparkblobforkustomichael")
val container: String = System.getProperty("container", "CONTAINER")
val blobKey: String = System.getProperty("blobKey", "KEY")
@@ -72,13 +73,13 @@ class KustoBlobAccessE2E extends FlatSpec with BeforeAndAfterAll {
df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_TABLE, updatedTable)
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.option(KustoOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
.option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, updatedTable)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
.save()
}

View file

@@ -4,7 +4,9 @@ import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import com.microsoft.azure.kusto.data.{ClientFactory, ConnectionStringBuilder}
import com.microsoft.kusto.spark.datasource.{KustoDebugOptions, KustoOptions}
import com.microsoft.kusto.spark.common.KustoDebugOptions
import com.microsoft.kusto.spark.datasink.KustoSinkOptions
import com.microsoft.kusto.spark.datasource.KustoSourceOptions
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
import com.microsoft.kusto.spark.utils.CslCommandsGenerator._
import com.microsoft.kusto.spark.utils.{KustoQueryUtils, KustoDataSourceUtils => KDSU}
@@ -42,22 +44,22 @@ class KustoPruneAndFilterE2E extends FlatSpec with BeforeAndAfterAll {
sc.stop()
}
val appId: String = System.getProperty(KustoOptions.KUSTO_AAD_CLIENT_ID)
val appKey: String = System.getProperty(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD)
val authority: String = System.getProperty(KustoOptions.KUSTO_AAD_AUTHORITY_ID, "microsoft.com")
val cluster: String = System.getProperty(KustoOptions.KUSTO_CLUSTER)
val database: String = System.getProperty(KustoOptions.KUSTO_DATABASE)
val appId: String = System.getProperty(KustoSourceOptions.KUSTO_AAD_CLIENT_ID)
val appKey: String = System.getProperty(KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD)
val authority: String = System.getProperty(KustoSourceOptions.KUSTO_AAD_AUTHORITY_ID, "microsoft.com")
val cluster: String = System.getProperty(KustoSourceOptions.KUSTO_CLUSTER)
val database: String = System.getProperty(KustoSourceOptions.KUSTO_DATABASE)
private val loggingLevel: Option[String] = Option(System.getProperty("logLevel"))
if (loggingLevel.isDefined) KDSU.setLoggingLevel(loggingLevel.get)
"KustoSource" should "apply pruning and filtering when reading in lean mode" taggedAs KustoE2E in {
val table: String = System.getProperty(KustoOptions.KUSTO_TABLE)
val query: String = System.getProperty(KustoOptions.KUSTO_QUERY, s"$table | where (toint(ColB) % 1000 == 0) ")
val table: String = System.getProperty(KustoSinkOptions.KUSTO_TABLE)
val query: String = System.getProperty(KustoSourceOptions.KUSTO_QUERY, s"$table | where (toint(ColB) % 1000 == 0) ")
val conf = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoDebugOptions.KUSTO_DBG_FORCE_READ_MODE -> "lean"
)
@@ -66,8 +68,8 @@ class KustoPruneAndFilterE2E extends FlatSpec with BeforeAndAfterAll {
}
"KustoSource" should "apply pruning and filtering when reading in scale mode" taggedAs KustoE2E in {
val table: String = System.getProperty(KustoOptions.KUSTO_TABLE)
val query: String = System.getProperty(KustoOptions.KUSTO_QUERY, s"$table | where (toint(ColB) % 1 == 0)")
val table: String = System.getProperty(KustoSinkOptions.KUSTO_TABLE)
val query: String = System.getProperty(KustoSourceOptions.KUSTO_QUERY, s"$table | where (toint(ColB) % 1 == 0)")
val storageAccount: String = System.getProperty("storageAccount")
val container: String = System.getProperty("container")
@@ -75,19 +77,19 @@ class KustoPruneAndFilterE2E extends FlatSpec with BeforeAndAfterAll {
val blobSas: String = System.getProperty("blobSas")
val conf = if (blobSas.isEmpty) {
Map(KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> storageAccount,
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> blobKey,
KustoOptions.KUSTO_BLOB_CONTAINER -> container)
}
else {
Map(KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_BLOB_STORAGE_SAS_URL -> blobSas
// , KustoDebugOptions.KUSTO_DBG_BLOB_FORCE_KEEP -> "true"
)
}
Map(KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> storageAccount,
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> blobKey,
KustoSourceOptions.KUSTO_BLOB_CONTAINER -> container)
}
else {
Map(KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> blobSas
// , KustoDebugOptions.KUSTO_DBG_BLOB_FORCE_KEEP -> "true"
)
}
val df = spark.read.kusto(cluster, database, query, conf)
@ -117,26 +119,26 @@ class KustoPruneAndFilterE2E extends FlatSpec with BeforeAndAfterAll {
dfOrig.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_TABLE, query)
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, query)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.save()
val conf = if (blobSas.isEmpty) {
Map(KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> storageAccount,
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> blobKey,
KustoOptions.KUSTO_BLOB_CONTAINER -> container,
Map(KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> storageAccount,
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> blobKey,
KustoSourceOptions.KUSTO_BLOB_CONTAINER -> container,
KustoDebugOptions.KUSTO_DBG_BLOB_COMPRESS_ON_EXPORT -> "false") // Just to test this option
}
else {
Map(KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_BLOB_STORAGE_SAS_URL -> blobSas,
Map(KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> blobSas,
KustoDebugOptions.KUSTO_DBG_BLOB_COMPRESS_ON_EXPORT -> "false") // Just to test this option
}
@ -187,25 +189,25 @@ class KustoPruneAndFilterE2E extends FlatSpec with BeforeAndAfterAll {
dfOrig.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_TABLE, query)
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, query)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.save()
val conf = if (blobSas.isEmpty) {
Map(KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> storageAccount,
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> blobKey,
KustoOptions.KUSTO_BLOB_CONTAINER -> container)
Map(KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> storageAccount,
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> blobKey,
KustoSourceOptions.KUSTO_BLOB_CONTAINER -> container)
}
else {
Map(KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_BLOB_STORAGE_SAS_URL -> blobSas)
Map(KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> blobSas)
}
val dfResult = spark.read.kusto(cluster, database, query, conf)

View file

@ -7,8 +7,7 @@ import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import com.microsoft.azure.kusto.data.{ClientFactory, ConnectionStringBuilder}
import com.microsoft.kusto.spark.datasource.KustoOptions
import com.microsoft.kusto.spark.datasource.KustoOptions.SinkTableCreationMode
import com.microsoft.kusto.spark.datasink.{KustoSinkOptions, SinkTableCreationMode}
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
import com.microsoft.kusto.spark.utils.CslCommandsGenerator._
import com.microsoft.kusto.spark.utils.{KustoQueryUtils, KustoDataSourceUtils => KDSU}
@ -57,12 +56,12 @@ class KustoSinkBatchE2E extends FlatSpec with BeforeAndAfterAll{
sc.stop()
}
val appId: String = System.getProperty(KustoOptions.KUSTO_AAD_CLIENT_ID)
val appKey: String = System.getProperty(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD)
val authority: String = System.getProperty(KustoOptions.KUSTO_AAD_AUTHORITY_ID, "microsoft.com")
val cluster: String = System.getProperty(KustoOptions.KUSTO_CLUSTER)
val database: String = System.getProperty(KustoOptions.KUSTO_DATABASE)
val expectedNumberOfRows: Int = 1 * 1000
val appId: String = System.getProperty(KustoSinkOptions.KUSTO_AAD_CLIENT_ID)
val appKey: String = System.getProperty(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD)
val authority: String = System.getProperty(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, "microsoft.com")
val cluster: String = System.getProperty(KustoSinkOptions.KUSTO_CLUSTER)
val database: String = System.getProperty(KustoSinkOptions.KUSTO_DATABASE)
val expectedNumberOfRows: Int = 1 * 1000
val rows: immutable.IndexedSeq[(String, Int)] = (1 to expectedNumberOfRows).map(v => (newRow(), v))
private val loggingLevel: Option[String] = Option(System.getProperty("logLevel"))
@ -86,19 +85,19 @@ class KustoSinkBatchE2E extends FlatSpec with BeforeAndAfterAll{
dfOrig.write
.format("com.microsoft.kusto.spark.datasource")
.partitionBy("value")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_TABLE, table)
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, table)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.option(DateTimeUtils.TIMEZONE_OPTION, "GMT+4")
.option(KustoOptions.KUSTO_TABLE_CREATE_OPTIONS, SinkTableCreationMode.CreateIfNotExist.toString)
.option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, SinkTableCreationMode.CreateIfNotExist.toString)
.save()
val conf: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey
KustoSinkOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey
)
val dfResult: DataFrame = spark.read.kusto(cluster, database, table, conf)
@ -162,13 +161,13 @@ class KustoSinkBatchE2E extends FlatSpec with BeforeAndAfterAll{
df.write
.format("com.microsoft.kusto.spark.datasource")
.partitionBy("value")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_TABLE, table)
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.option(KustoOptions.KUSTO_TIMEOUT_LIMIT, (8 * 60).toString)
.option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, table)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.option(KustoSinkOptions.KUSTO_TIMEOUT_LIMIT, (8 * 60).toString)
.save()
val timeoutMs: Int = 8 * 60 * 1000 // 8 minutes
@ -187,13 +186,13 @@ class KustoSinkBatchE2E extends FlatSpec with BeforeAndAfterAll{
df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_TABLE, table)
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.option(KustoOptions.KUSTO_WRITE_ENABLE_ASYNC, "true")
.option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, table)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.option(KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC, "true")
.save()
val timeoutMs: Int = 8 * 60 * 1000 // 8 minutes

View file

@ -3,8 +3,7 @@ package com.microsoft.kusto.spark
import java.util.UUID
import com.microsoft.azure.kusto.data.{ClientFactory, ConnectionStringBuilder}
import com.microsoft.kusto.spark.datasource.KustoOptions
import com.microsoft.kusto.spark.datasource.KustoOptions.SinkTableCreationMode
import com.microsoft.kusto.spark.datasink.{KustoSinkOptions, SinkTableCreationMode}
import com.microsoft.kusto.spark.utils.CslCommandsGenerator._
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
@ -16,7 +15,7 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpec}
@RunWith(classOf[JUnitRunner])
class KustoSinkStreamingE2E extends FlatSpec with BeforeAndAfterAll{
class KustoSinkStreamingE2E extends FlatSpec with BeforeAndAfterAll {
val expectedNumberOfRows: Int = 100
val timeoutMs: Int = 8 * 60 * 1000 // 8 minutes
val sleepTimeTillTableCreate: Int = 2 * 60 * 1000 // 2 minutes
@ -40,11 +39,11 @@ class KustoSinkStreamingE2E extends FlatSpec with BeforeAndAfterAll{
sc.stop()
}
val appId: String = System.getProperty(KustoOptions.KUSTO_AAD_CLIENT_ID)
val appKey: String = System.getProperty(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD)
val authority: String = System.getProperty(KustoOptions.KUSTO_AAD_AUTHORITY_ID, "microsoft.com")
val cluster: String = System.getProperty(KustoOptions.KUSTO_CLUSTER)
val database: String = System.getProperty(KustoOptions.KUSTO_DATABASE)
val appId: String = System.getProperty(KustoSinkOptions.KUSTO_AAD_CLIENT_ID)
val appKey: String = System.getProperty(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD)
val authority: String = System.getProperty(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, "microsoft.com")
val cluster: String = System.getProperty(KustoSinkOptions.KUSTO_CLUSTER)
val database: String = System.getProperty(KustoSinkOptions.KUSTO_DATABASE)
val csvPath: String = System.getProperty("path", "src/test/resources/TestData")
val customSchema: StructType = new StructType().add("colA", StringType, nullable = true).add("colB", IntegerType, nullable = true)
@ -78,13 +77,13 @@ class KustoSinkStreamingE2E extends FlatSpec with BeforeAndAfterAll{
.writeStream
.format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
.options(Map(
KustoOptions.KUSTO_CLUSTER -> cluster,
KustoOptions.KUSTO_TABLE -> table,
KustoOptions.KUSTO_DATABASE -> database,
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_AAD_AUTHORITY_ID -> authority,
KustoOptions.KUSTO_TABLE_CREATE_OPTIONS -> SinkTableCreationMode.CreateIfNotExist.toString))
KustoSinkOptions.KUSTO_CLUSTER -> cluster,
KustoSinkOptions.KUSTO_TABLE -> table,
KustoSinkOptions.KUSTO_DATABASE -> database,
KustoSinkOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID -> authority,
KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS -> SinkTableCreationMode.CreateIfNotExist.toString))
.trigger(Trigger.Once)
kustoQ.start().awaitTermination()
@ -125,13 +124,13 @@ class KustoSinkStreamingE2E extends FlatSpec with BeforeAndAfterAll{
.writeStream
.format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
.options(Map(
KustoOptions.KUSTO_CLUSTER -> cluster,
KustoOptions.KUSTO_TABLE -> table,
KustoOptions.KUSTO_DATABASE -> database,
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_AAD_AUTHORITY_ID -> authority,
KustoOptions.KUSTO_WRITE_ENABLE_ASYNC -> "true"))
KustoSinkOptions.KUSTO_CLUSTER -> cluster,
KustoSinkOptions.KUSTO_TABLE -> table,
KustoSinkOptions.KUSTO_DATABASE -> database,
KustoSinkOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID -> authority,
KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC -> "true"))
.trigger(Trigger.Once)
kustoQ.start().awaitTermination()

View file

@ -6,9 +6,9 @@ import java.util.concurrent.atomic.AtomicInteger
import com.microsoft.azure.kusto.ingest.IngestClient
import com.microsoft.azure.kusto.ingest.result.{IngestionResult, IngestionStatus}
import com.microsoft.kusto.spark.authentication.AadApplicationAuthentication
import com.microsoft.kusto.spark.datasink.KustoSink
import com.microsoft.kusto.spark.datasource.{KustoOptions, WriteOptions, KustoCoordinates}
import com.microsoft.kusto.spark.utils.{KustoDataSourceUtils => KDSU, KustoConstants => KCONST}
import com.microsoft.kusto.spark.datasink.{KustoSink, KustoSinkOptions, WriteOptions}
import com.microsoft.kusto.spark.common.KustoCoordinates
import com.microsoft.kusto.spark.utils.{KustoConstants => KCONST, KustoDataSourceUtils => KDSU}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.junit.runner.RunWith
@ -54,7 +54,7 @@ class KustoSinkTests extends FlatSpec with MockFactory with Matchers with Before
sqlContext,
KustoCoordinates(kustoCluster, kustoDatabase, Some(kustoTable)),
AadApplicationAuthentication(appId, appKey, appAuthorityId),
WriteOptions(writeResultLimit = KustoOptions.NONE_RESULT_LIMIT, timeout = KCONST.defaultTimeoutLongRunning))
WriteOptions(writeResultLimit = KustoSinkOptions.NONE_RESULT_LIMIT, timeout = KCONST.defaultTimeoutLongRunning))
private val rowId = new AtomicInteger(1)

View file

@ -3,7 +3,9 @@ import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import com.microsoft.azure.kusto.data.{ClientFactory, ConnectionStringBuilder}
import com.microsoft.kusto.spark.datasource.{KustoDebugOptions, KustoOptions}
import com.microsoft.kusto.spark.common.KustoDebugOptions
import com.microsoft.kusto.spark.datasink.KustoSinkOptions
import com.microsoft.kusto.spark.datasource.KustoSourceOptions
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
import com.microsoft.kusto.spark.utils.CslCommandsGenerator._
import com.microsoft.kusto.spark.utils.{KustoQueryUtils, KustoDataSourceUtils => KDSU}
@ -41,22 +43,22 @@ class KustoSourceE2E extends FlatSpec with BeforeAndAfterAll {
sc.stop()
}
val appId: String = System.getProperty(KustoOptions.KUSTO_AAD_CLIENT_ID)
val appKey: String = System.getProperty(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD)
val authority: String = System.getProperty(KustoOptions.KUSTO_AAD_AUTHORITY_ID, "microsoft.com")
val cluster: String = System.getProperty(KustoOptions.KUSTO_CLUSTER)
val database: String = System.getProperty(KustoOptions.KUSTO_DATABASE)
val appId: String = System.getProperty(KustoSinkOptions.KUSTO_AAD_CLIENT_ID)
val appKey: String = System.getProperty(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD)
val authority: String = System.getProperty(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, "microsoft.com")
val cluster: String = System.getProperty(KustoSinkOptions.KUSTO_CLUSTER)
val database: String = System.getProperty(KustoSinkOptions.KUSTO_DATABASE)
private val loggingLevel: Option[String] = Option(System.getProperty("logLevel"))
if (loggingLevel.isDefined) KDSU.setLoggingLevel(loggingLevel.get)
"KustoSource"should "execute a read query on Kusto cluster in lean mode" taggedAs KustoE2E in {
val table: String = System.getProperty(KustoOptions.KUSTO_TABLE)
val query: String = System.getProperty(KustoOptions.KUSTO_QUERY, s"$table | where (toint(ColB) % 1000 == 0) | distinct ColA ")
"KustoSource" should "execute a read query on Kusto cluster in lean mode" taggedAs KustoE2E in {
val table: String = System.getProperty(KustoSinkOptions.KUSTO_TABLE)
val query: String = System.getProperty(KustoSourceOptions.KUSTO_QUERY, s"$table | where (toint(ColB) % 1000 == 0) | distinct ColA ")
val conf: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoDebugOptions.KUSTO_DBG_FORCE_READ_MODE -> "lean"
)
@ -65,8 +67,8 @@ class KustoSourceE2E extends FlatSpec with BeforeAndAfterAll {
}
"KustoSource" should "execute a read query on Kusto cluster in scale mode" taggedAs KustoE2E in {
val table: String = System.getProperty(KustoOptions.KUSTO_TABLE)
val query: String = System.getProperty(KustoOptions.KUSTO_QUERY, s"$table | where (toint(ColB) % 1 == 0)")
val table: String = System.getProperty(KustoSinkOptions.KUSTO_TABLE)
val query: String = System.getProperty(KustoSourceOptions.KUSTO_QUERY, s"$table | where (toint(ColB) % 1 == 0)")
val storageAccount: String = System.getProperty("storageAccount")
val container: String = System.getProperty("container")
@ -75,11 +77,11 @@ class KustoSourceE2E extends FlatSpec with BeforeAndAfterAll {
val blobSasConnectionString: String = System.getProperty("blobSasQuery")
val conf: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> storageAccount,
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> blobKey,
KustoOptions.KUSTO_BLOB_CONTAINER -> container
KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> storageAccount,
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> blobKey,
KustoSourceOptions.KUSTO_BLOB_CONTAINER -> container
)
spark.read.kusto(cluster, database, query, conf).show(20)
@ -102,17 +104,17 @@ class KustoSourceE2E extends FlatSpec with BeforeAndAfterAll {
dfOrig.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_TABLE, table)
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, table)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authority)
.save()
val conf: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSinkOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoDebugOptions.KUSTO_DBG_FORCE_READ_MODE -> "lean"
)

Просмотреть файл

@ -1,6 +1,6 @@
package com.microsoft.kusto.spark
import com.microsoft.kusto.spark.datasource.KustoOptions
import com.microsoft.kusto.spark.datasource.KustoSourceOptions
import com.microsoft.kusto.spark.utils.{KustoDataSourceUtils => KDSU}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
@ -10,7 +10,7 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
@RunWith(classOf[JUnitRunner])
class KustoSourceTests extends FlatSpec with MockFactory with Matchers with BeforeAndAfterAll{
class KustoSourceTests extends FlatSpec with MockFactory with Matchers with BeforeAndAfterAll {
private val loggingLevel: Option[String] = Option(System.getProperty("logLevel"))
if (loggingLevel.isDefined) KDSU.setLoggingLevel(loggingLevel.get)
@ -53,13 +53,13 @@ class KustoSourceTests extends FlatSpec with MockFactory with Matchers with Befo
val df = spark.sqlContext
.read
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_QUERY, query)
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, appAuthorityId)
.option(KustoOptions.KUSTO_CUSTOM_DATAFRAME_COLUMN_TYPES, customSchema)
.option(KustoSourceOptions.KUSTO_CLUSTER, cluster)
.option(KustoSourceOptions.KUSTO_DATABASE, database)
.option(KustoSourceOptions.KUSTO_QUERY, query)
.option(KustoSourceOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoSourceOptions.KUSTO_AAD_AUTHORITY_ID, appAuthorityId)
.option(KustoSourceOptions.KUSTO_CUSTOM_DATAFRAME_COLUMN_TYPES, customSchema)
.load("src/test/resources/")
df.printSchema()

View file

@ -19,12 +19,12 @@ This authentication method is fairly straightforward, and it is used in most of
```
df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, "MyCluster")
.option(KustoOptions.KUSTO_DATABASE, "MyDatabase")
.option(KustoOptions.KUSTO_TABLE, "MyTable")
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, "MyPassword")
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, "AAD Authority Id") // "microsoft.com"
.option(KustoSinkOptions.KUSTO_CLUSTER, "MyCluster")
.option(KustoSinkOptions.KUSTO_DATABASE, "MyDatabase")
.option(KustoSinkOptions.KUSTO_TABLE, "MyTable")
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, "MyPassword")
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, "AAD Authority Id") // "microsoft.com"
.save()
```
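The same AAD application credentials apply on the read side through KustoSourceOptions. A minimal read-side sketch, assuming placeholder cluster, database, and query values:
```
import com.microsoft.kusto.spark.datasource.KustoSourceOptions
import com.microsoft.kusto.spark.sql.extension.SparkExtension._

val conf: Map[String, String] = Map(
  KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> "MyPassword",
  KustoSourceOptions.KUSTO_AAD_AUTHORITY_ID -> "AAD Authority Id" // e.g. "microsoft.com"
)
// Placeholder names; the third argument can be any valid Kusto query
val df = spark.read.kusto("MyCluster", "MyDatabase", "MyTable | take 100", conf)
```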
## Key Vault
@ -77,21 +77,22 @@ val keyVaultUri: String = "keyVaultUri"
df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, MyCluster)
.option(KustoOptions.KUSTO_DATABASE, MyDatabase)
.option(KustoOptions.KUSTO_TABLE, MyTable)
.option(KustoOptions.KEY_VAULT_URI, keyVaultUri)
.option(KustoOptions.KEY_VAULT_APP_ID, keyVaultClientID)
.option(KustoOptions.KEY_VAULT_APP_KEY, keyVaultClientPassword)
.option(KustoSinkOptions.KUSTO_CLUSTER, MyCluster)
.option(KustoSinkOptions.KUSTO_DATABASE, MyDatabase)
.option(KustoSinkOptions.KUSTO_TABLE, MyTable)
.option(KustoSinkOptions.KEY_VAULT_URI, keyVaultUri)
.option(KustoSinkOptions.KEY_VAULT_APP_ID, keyVaultClientID)
.option(KustoSinkOptions.KEY_VAULT_APP_KEY, keyVaultClientPassword)
.save()
val conf: Map[String, String] = Map(
KustoOptions.KEY_VAULT_URI -> keyVaultUri,
KustoOptions.KEY_VAULT_APP_ID -> keyVaultClientID,
KustoOptions.KEY_VAULT_APP_KEY -> keyVaultClientPassword
KustoSourceOptions.KEY_VAULT_URI -> keyVaultUri,
KustoSourceOptions.KEY_VAULT_APP_ID -> keyVaultClientID,
KustoSourceOptions.KEY_VAULT_APP_KEY -> keyVaultClientPassword
)
val dfResult = spark.read.kusto(cluster, database, table, conf)
val query = table
val dfResult = spark.read.kusto(cluster, database, query, conf)
```
## Direct Authentication with Access Token
Users can also use ADAL directly to acquire an AAD access token to access Kusto.
@ -102,9 +103,9 @@ The token must be valid throughout the duration of the read/write operation
```
df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, "MyCluster")
.option(KustoOptions.KUSTO_DATABASE, "MyDatabase")
.option(KustoOptions.KUSTO_TABLE, "MyTable")
.option(KustoSinkOptions.KUSTO_CLUSTER, "MyCluster")
.option(KustoSinkOptions.KUSTO_DATABASE, "MyDatabase")
.option(KustoSinkOptions.KUSTO_TABLE, "MyTable")
.option(KustoSinkOptions.KUSTO_ACCESS_TOKEN, "MyAccessToken")
.save()
```
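As a sketch of how such a token might be acquired with the adal4j library (the authority, resource URL, and credentials below are placeholder assumptions, not connector API):
```
import java.util.concurrent.Executors
import com.microsoft.aad.adal4j.{AuthenticationContext, ClientCredential}

val service = Executors.newSingleThreadExecutor()
// Placeholder AAD tenant and Kusto cluster resource
val context = new AuthenticationContext("https://login.microsoftonline.com/microsoft.com", true, service)
val token = context
  .acquireToken("https://MyCluster.kusto.windows.net", new ClientCredential("MyClientId", "MySecret"), null)
  .get().getAccessToken
service.shutdown()
// Pass the token via KustoSinkOptions.KUSTO_ACCESS_TOKEN (or KustoSourceOptions for reads)
```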

View file

@ -29,14 +29,16 @@ that is using it. Please verify the following before using Kusto connector:
<dataframe-object>
.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.<option-name-1>, <option-value-1>
.option(KustoSinkOptions.<option-name-1>, <option-value-1>
...
.option(KustoOptions.<option-name-n>, <option-value-n>
.option(KustoSinkOptions.<option-name-n>, <option-value-n>
.save()
```
### Supported Options
All the options that can be used with the Kusto sink are defined in the object KustoSinkOptions.
**Mandatory Parameters:**
* **KUSTO_CLUSTER**:
@ -103,12 +105,12 @@ Synchronous mode, table already exists:
```
df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, "MyCluster")
.option(KustoOptions.KUSTO_DATABASE, "MyDatabase")
.option(KustoOptions.KUSTO_TABLE, "MyTable")
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, "MyPassword")
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, "AAD Authority Id") // "microsoft.com"
.option(KustoSinkOptions.KUSTO_CLUSTER, "MyCluster")
.option(KustoSinkOptions.KUSTO_DATABASE, "MyDatabase")
.option(KustoSinkOptions.KUSTO_TABLE, "MyTable")
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, "MyPassword")
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, "AAD Authority Id") // "microsoft.com"
.save()
```
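Asynchronous mode differs only in the async flag; a sketch with the same placeholder values, assuming the KUSTO_WRITE_ENABLE_ASYNC option exercised by the tests in this commit:
```
df.write
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoSinkOptions.KUSTO_CLUSTER, "MyCluster")
  .option(KustoSinkOptions.KUSTO_DATABASE, "MyDatabase")
  .option(KustoSinkOptions.KUSTO_TABLE, "MyTable")
  .option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
  .option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, "MyPassword")
  // Returns once ingestion is queued instead of polling for completion
  .option(KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC, "true")
  .save()
```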

View file

@ -45,15 +45,17 @@ that is using it. Please verify the following before using Kusto connector:
<dataframe-name> =
sqlContext.read
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, <cluster-name>)
.option(KustoOptions.KUSTO_DATABASE, <database-name>)
.option(KustoOptions.KUSTO_QUERY, <kusto-query>)
.option(KustoSourceOptions.KUSTO_CLUSTER, <cluster-name>)
.option(KustoSourceOptions.KUSTO_DATABASE, <database-name>)
.option(KustoSourceOptions.KUSTO_QUERY, <kusto-query>)
.load()
```
Where **parameters map** is identical for both syntax flavors.
### Supported Options
All the options that can be used with the Kusto source are defined in the object KustoSourceOptions.
**Mandatory Parameters:**
* **KUSTO_CLUSTER**:
@ -110,8 +112,8 @@ Once the RDD is no longer required by the caller application, the container and/
Create a DataFrame based on a query accessing 'MyKustoTable' table
```
val conf: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey
KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey
)
val df = spark.read.kusto(cluster, database, "MyKustoTable | where (ColB % 1000 == 0) | distinct ColA ", conf)
@ -122,15 +124,15 @@ Once the RDD is no longer required by the caller application, the container and/
Create a DataFrame by reading all of 'MyKustoTable' table
```
val conf: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_QUERY -> "MyKustoTable"
KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_QUERY -> "MyKustoTable"
)
val df = sqlContext.read
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, "MyCluster")
.option(KustoOptions.KUSTO_DATABASE, "MyDatabase")
.option(KustoSourceOptions.KUSTO_CLUSTER, "MyCluster")
.option(KustoSourceOptions.KUSTO_DATABASE, "MyDatabase")
.options(conf)
.load()
```
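For scale-mode reads, transient blob storage must also be supplied; a minimal sketch with placeholder storage values, using the blob options of KustoSourceOptions:
```
val conf: Map[String, String] = Map(
  KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
  KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
  // Transient storage for the server-side export; values below are placeholders
  KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> "MyStorageAccount",
  KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> "MyStorageAccountKey",
  KustoSourceOptions.KUSTO_BLOB_CONTAINER -> "MyContainer"
)
val df = spark.read.kusto("MyCluster", "MyDatabase", "MyKustoTable", conf)
```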

View file

@ -18,6 +18,9 @@ When writing to or reading from a Kusto table, the connector converts types from
|DecimalType |decimal
|TimestampType |datetime
|DateType |datetime
|StructType |dynamic
|MapType |dynamic
|ArrayType |dynamic
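As an illustration of the newly added mappings, array, map, and struct columns can be written as-is and land in Kusto as dynamic; a minimal sketch with placeholder connection values:
```
import com.microsoft.kusto.spark.datasink.KustoSinkOptions

// The "values" column is an ArrayType and maps to a Kusto dynamic column
val df = spark.createDataFrame(Seq(
  ("row1", Seq(1, 2, 3)),
  ("row2", Seq(4, 5))
)).toDF("key", "values")

df.write
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoSinkOptions.KUSTO_CLUSTER, "MyCluster")
  .option(KustoSinkOptions.KUSTO_DATABASE, "MyDatabase")
  .option(KustoSinkOptions.KUSTO_TABLE, "MyTable")
  .option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
  .option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, "MyPassword")
  .save()
```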
##### Kusto DataTypes mapping to Spark DataTypes

View file

View file

@ -6,7 +6,8 @@
import java.util.concurrent.atomic.AtomicInteger
import com.microsoft.kusto.spark.datasource.KustoOptions
import com.microsoft.kusto.spark.datasink.KustoSinkOptions
import com.microsoft.kusto.spark.datasource.KustoSourceOptions
import com.microsoft.kusto.spark.utils.{KustoDataSourceUtils => KDSU}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}
@ -54,12 +55,12 @@ object KustoConnectorDemo {
// To see how the number of partitions effect the command performance, use 'repartition': e.g. 'df.repartition(16).write...'
df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_TABLE, table)
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
.option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, table)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
.save()
// To write the data to a Kusto cluster asynchronously, add:
@ -92,20 +93,20 @@ object KustoConnectorDemo {
.writeStream
.format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
.options(Map(
KustoOptions.KUSTO_CLUSTER -> cluster,
KustoOptions.KUSTO_TABLE -> table,
KustoOptions.KUSTO_DATABASE -> database,
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_AAD_AUTHORITY_ID -> authorityId))
KustoSinkOptions.KUSTO_CLUSTER -> cluster,
KustoSinkOptions.KUSTO_TABLE -> table,
KustoSinkOptions.KUSTO_DATABASE -> database,
KustoSinkOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID -> authorityId))
.trigger(Trigger.Once)
kustoQ.start().awaitTermination(TimeUnit.MINUTES.toMillis(8))
val conf: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_QUERY -> s"$table | where (ColB % 1000 == 0) | distinct ColA"
KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_QUERY -> s"$table | where (ColB % 1000 == 0) | distinct ColA"
)
// Simplified syntax flavour
@ -115,15 +116,15 @@ object KustoConnectorDemo {
// ELABORATE READ SYNTAX
// Here we read the whole table.
val conf2: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_QUERY -> "StringAndIntTable"
KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_QUERY -> "StringAndIntTable"
)
val df3 = spark.sqlContext.read
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoSourceOptions.KUSTO_CLUSTER, cluster)
.option(KustoSourceOptions.KUSTO_DATABASE, database)
.options(conf2)
.load()
@ -143,12 +144,12 @@ object KustoConnectorDemo {
// READING LARGE DATA SET: SET UP CONFIGURATION
val conf3: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_BLOB_CONTAINER -> container,
KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoSourceOptions.KUSTO_BLOB_CONTAINER -> container,
//KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas,
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> storageAccountName,
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> storageAccountKey
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> storageAccountName,
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> storageAccountKey
)
// READING LARGE DATA SET (FULL TABLE, UNFILTERED)
@ -178,17 +179,17 @@ object KustoConnectorDemo {
/* blobContainer, blobStorageAccountKey, blobStorageAccountName, kustoAppAuthority, kustoAppId, kustoAppKey */
/************************************************************************************************************/
val conf4: Map[String, String] = Map(
KustoOptions.KEY_VAULT_URI -> keyVaultUri,
KustoOptions.KEY_VAULT_APP_ID -> keyVaultClientID,
KustoOptions.KEY_VAULT_APP_KEY -> keyVaultClientPassword,
KustoOptions.KUSTO_QUERY -> "StringAndIntExpTable"
KustoSourceOptions.KEY_VAULT_URI -> keyVaultUri,
KustoSourceOptions.KEY_VAULT_APP_ID -> keyVaultClientID,
KustoSourceOptions.KEY_VAULT_APP_KEY -> keyVaultClientPassword,
KustoSourceOptions.KUSTO_QUERY -> "StringAndIntExpTable"
)
val df6 =
spark.sqlContext.read
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, "ExperimentDb")
.option(KustoSourceOptions.KUSTO_CLUSTER, cluster)
.option(KustoSourceOptions.KUSTO_DATABASE, "ExperimentDb")
.options(conf4)
.load()
}

View file

@ -1,4 +1,5 @@
import com.microsoft.kusto.spark.datasource.KustoOptions
import com.microsoft.kusto.spark.datasink.KustoSinkOptions
import com.microsoft.kusto.spark.datasource.KustoSourceOptions
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@ -12,9 +13,9 @@ object SimpleKustoDataSink {
.setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val conf: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> "Your Client ID",
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> "Your secret",
KustoOptions.KUSTO_QUERY -> "Your Kusto query"
KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> "Your Client ID",
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> "Your secret",
KustoSourceOptions.KUSTO_QUERY -> "Your Kusto query"
)
// Create a DF - read from a Kusto cluster
@ -24,11 +25,11 @@ object SimpleKustoDataSink {
// Now write to a Kusto table
df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, "Your Kusto Cluster")
.option(KustoOptions.KUSTO_DATABASE, "Your Kusto Database")
.option(KustoOptions.KUSTO_TABLE, "Your Kusto Destination Table")
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, "Your Client ID")
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, "Your secret")
.option(KustoSinkOptions.KUSTO_CLUSTER, "Your Kusto Cluster")
.option(KustoSinkOptions.KUSTO_DATABASE, "Your Kusto Database")
.option(KustoSinkOptions.KUSTO_TABLE, "Your Kusto Destination Table")
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, "Your Client ID")
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, "Your secret")
.save()
sparkSession.stop

View file

@ -1,4 +1,4 @@
import com.microsoft.kusto.spark.datasource.KustoOptions
import com.microsoft.kusto.spark.datasource.KustoSourceOptions
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@ -12,12 +12,12 @@ object SimpleKustoDataSource {
.setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val conf: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> "Your Client ID",
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> "Your secret",
KustoOptions.KUSTO_QUERY -> "Your Kusto query",
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> "Your blob storage account",
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> "Your storage account key, Alternatively, SAS key can be used",
KustoOptions.KUSTO_BLOB_CONTAINER -> "Your blob storage container name"
KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> "Your Client ID",
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> "Your secret",
KustoSourceOptions.KUSTO_QUERY -> "Your Kusto query",
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> "Your blob storage account",
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> "Your storage account key, Alternatively, SAS key can be used",
KustoSourceOptions.KUSTO_BLOB_CONTAINER -> "Your blob storage container name"
)
val df = sparkSession.read.kusto("Your Kusto Cluster", "Your Kusto Database", "Your Kusto Query in KustoOptions.Kusto_Query", conf)
df.show

View file

@ -1,5 +1,6 @@
import java.util.concurrent.TimeUnit
import com.microsoft.kusto.spark.datasource.KustoOptions
import com.microsoft.kusto.spark.datasink.KustoSinkOptions
import org.apache.spark.sql._
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.streaming.Trigger
@ -46,16 +47,15 @@ object SparkStreamingKustoSink {
val df1 = df
.writeStream
.format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
.option(KustoOptions.KUSTO_CLUSTER, "Your Kusto Cluster")
.option(KustoOptions.KUSTO_DATABASE, "Your Kusto Database")
.option(KustoOptions.KUSTO_TABLE, "Your Kusto Destination Table")
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, "Your Client ID")
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, "Your secret")
.option(KustoSinkOptions.KUSTO_CLUSTER, "Your Kusto Cluster")
.option(KustoSinkOptions.KUSTO_DATABASE, "Your Kusto Database")
.option(KustoSinkOptions.KUSTO_TABLE, "Your Kusto Destination Table")
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, "Your Client ID")
.option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, "Your secret")
.trigger(Trigger.ProcessingTime(0))
.start()
df1.awaitTermination(TimeUnit.MINUTES.toMillis(8))
}
}