Merge branch 'dev' into Clean-up-dependencies

This commit is contained in:
michazag 2019-03-19 13:08:53 +02:00
Parents: 8548b3c324 022741c735
Commit: 729c38a1a6
8 changed files: 154 additions and 104 deletions

View file

@@ -69,7 +69,7 @@ In order to use the connector, you need to have:
- [Maven 3.x](https://maven.apache.org/download.cgi) installed
- Spark version 2.4.0 or higher
>**Note**: 2.3.x versions are also supported, but require some changes in pom.xml dependencies.
>**Note:** 2.3.x versions are also supported, but require some changes in pom.xml dependencies.
For details, refer to [CHANGELIST](docs/CHANGELIST)
## Build Commands

View file

@@ -46,7 +46,6 @@ object KustoOptions {
// it will be created, with a schema matching the DataFrame that is being written.
// Default: 'FailIfNotExist'
val KUSTO_TABLE_CREATE_OPTIONS: String = newOption("tableCreateOptions")
val KUSTO_TRUNCATE: String = newOption("truncate")
val KUSTO_ACCESS_TOKEN: String = newOption("accessToken")
// When writing to Kusto, allows the driver to complete the operation asynchronously. See KustoSink.md for
// details and limitations. Default: 'false'
@@ -63,8 +62,10 @@ object KustoOptions {
val KUSTO_READ_PARTITION_MODE: String = newOption("partitionMode")
// Note: for 'read', this allows partitioning the export from Kusto to blob (default is '1')
// It does not affect the number of partitions created when reading from blob.
// Therefore it is not recommended to use this option when reading from Kusto, left here for experimentation
// Therefore it is not recommended to use this option when reading from Kusto
// CURRENTLY NOT USED, left here for experimentation
val KUSTO_NUM_PARTITIONS: String = newOption("numPartitions")
// CURRENTLY NOT USED, left here for experimentation
val KUSTO_PARTITION_COLUMN: String = newOption("partitionColumn")
object SinkTableCreationMode extends Enumeration {
@@ -83,7 +84,6 @@ object KustoOptions {
val KUSTO_BLOB_STORAGE_SAS_URL: String = newOption("blobStorageSasUrl")
// Blob container name
val KUSTO_BLOB_CONTAINER: String = newOption("blobContainer")
// When reading in 'scale' mode, sets Spark configuration to read from Azure blob.
// The following configuration parameters are set:
// 1. Blob access secret:
@@ -97,7 +97,6 @@ object KustoOptions {
// If set to 'true', the connector will update these parameters on every 'read' operation
// Default: 'false'
val KUSTO_BLOB_SET_FS_CONFIG: String = newOption("blobSetFsConfig")
// When reading in 'scale' mode, compresses the data upon export from Kusto to Blob
// This feature is experimental, intended to measure the performance impact with/without compression
// Default: 'true'
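
For context, a minimal sketch of how options from this file are typically passed to a Spark write; the DataFrame and the cluster/database/table values are placeholders, and the 'CreateIfNotExist' value is assumed from SinkTableCreationMode rather than taken from this diff:

```scala
import com.microsoft.kusto.spark.datasource.KustoOptions
import org.apache.spark.sql.DataFrame

// Illustrative write that exercises a few of the options documented above.
// 'df' and the cluster/database/table strings are placeholders.
def writeSketch(df: DataFrame, appId: String, appKey: String, authorityId: String): Unit = {
  df.write
    .format("com.microsoft.kusto.spark.datasource")
    .option(KustoOptions.KUSTO_CLUSTER, "YourCluster")
    .option(KustoOptions.KUSTO_DATABASE, "YourDatabase")
    .option(KustoOptions.KUSTO_TABLE, "YourTable")
    .option(KustoOptions.KUSTO_AAD_CLIENT_ID, appId)
    .option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
    .option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
    // Create the target table if it does not exist (default is 'FailIfNotExist')
    .option(KustoOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
    .save()
}
```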

View file

@@ -8,15 +8,7 @@ import org.apache.spark.sql._
import scala.collection.JavaConverters._
object SparkExtension {
/*
implicit class SparkSessionExtension(spark: SparkSession) {
def kusto(kustoQuery: String): DataFrame = {
}
}
*/
implicit class DataFrameReaderExtension(dataframeReader: DataFrameReader) {
implicit class DataFrameReaderExtension(dataframeReader: DataFrameReader) {
def kusto(kustoCluster: String, database: String, query: String, properties: Map[String, String]): DataFrame = {
dataframeReader.format("com.microsoft.kusto.spark.datasource")
@@ -26,21 +18,5 @@ object SparkExtension {
.options(properties)
.load()
}
def kusto(kustoCluster: String,
database: String,
query: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame = {
dataframeReader.option(KustoOptions.KUSTO_PARTITION_COLUMN, columnName)
.option(KustoOptions.KUSTO_NUM_PARTITIONS, numPartitions.toString)
val hashMap = new scala.collection.mutable.HashMap[String, String]
hashMap ++= connectionProperties.asScala
kusto(kustoCluster, database, query, hashMap.toMap)
}
}
}
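
For reference, a minimal usage sketch of the remaining `kusto` extension method; the cluster, database, query, and credential values are placeholders:

```scala
import com.microsoft.kusto.spark.datasource.KustoOptions
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative read through the DataFrameReader extension defined above.
val spark = SparkSession.builder().appName("KustoReadSketch").master("local[2]").getOrCreate()
val conf = Map(
  KustoOptions.KUSTO_AAD_CLIENT_ID -> "Your AAD application id",
  KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> "Your AAD application key",
  KustoOptions.KUSTO_AAD_AUTHORITY_ID -> "Your AAD authority id",
  KustoOptions.KUSTO_READ_MODE -> "lean"
)
val df: DataFrame = spark.read.kusto("YourCluster", "YourDatabase", "YourTable | take 10", conf)
```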

View file

@@ -61,20 +61,20 @@ object KeyVaultUtils {
}
private def getStorageParamsFromKeyVaultImpl(client: KeyVaultClient, uri: String): KustoStorageParameters = {
val sasUrl = client.getSecret(uri, SasUrl).value()
val accountId = client.getSecret(uri, StorageAccountId)
val sasUrl = Option(client.getSecret(uri, SasUrl))
val accountKey = client.getSecret(uri, StorageAccountKey)
val container = client.getSecret(uri, Container)
val accountId = Option(client.getSecret(uri, StorageAccountId))
val accountKey = Option(client.getSecret(uri, StorageAccountKey))
val container = Option(client.getSecret(uri, Container))
if(sasUrl.isEmpty) {
KustoStorageParameters(
account = if (accountId == null) null else accountId.value(),
secret = if (accountKey == null) null else accountKey.value(),
container = if (container == null) null else container.value(),
account = if (accountId.isDefined) accountId.get.value else "",
secret = if (accountKey.isDefined) accountKey.get.value else "",
container = if (container.isDefined) container.get.value else "",
storageSecretIsAccountKey = true)
} else {
KustoDataSourceUtils.parseSas(sasUrl)
KustoDataSourceUtils.parseSas(sasUrl.get.value)
}
}
}
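
The change above wraps the possibly-null results of `getSecret` in `Option`; the same pattern can be written more compactly with `map`/`getOrElse`, as in this illustrative sketch (the `Secret` type and `getSecret` function below are stand-ins, not the actual Key Vault client API):

```scala
// Stand-in for the Key Vault client's secret type; illustrative only.
final case class Secret(value: String)

// 'getSecret' may return null when a secret is absent; Option(...) turns that into None,
// so missing secrets collapse to an empty string instead of a NullPointerException.
def secretValueOrEmpty(getSecret: String => Secret, name: String): String =
  Option(getSecret(name)).map(_.value).getOrElse("")
```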

View file

@@ -348,7 +348,7 @@ object KustoDataSourceUtils{
Some(keyVaultParameters.container)
} else storageContainer
getAndValidateTransientStorageParameters(account, secret, container, storageSecretIsAccountKey = true)
getAndValidateTransientStorageParameters(account, container, secret, storageSecretIsAccountKey = true)
}
} else {
KustoStorageParameters(storageAccount.get, storageSecret.get, storageContainer.get, storageSecretIsAccountKey)

View file

@@ -34,6 +34,9 @@ Kusto Spark connector allows authentication using Azure Key Vault. The Key Vaul
mandatory read/write authentication parameters. If a parameter is provided both in the Key Vault and directly as an option, the direct option takes precedence.
Although a combination of Key Vault and direct options is supported, for clarity it is advised to use
either Key Vault or direct options for passing authentication parameters, but not both.
>**Note:** when working with a Databricks notebook, the azure-keyvault package must be installed.
For details, refer to [Databricks documentation](https://docs.databricks.com/user-guide/libraries.html#maven-or-spark-package).
**The connector will look for the following secret names:**
@@ -50,7 +53,7 @@ either key vault or direct options for passing authentication parameters but not
### Transient Storage Parameters
>**Note**: these parameters are only required when working in "scale" reading mode. For details, refer to [Kusto Source](KustoSource.md#supported-options) documentation on
>**Note:** these parameters are only required when working in "scale" reading mode. For details, refer to [Kusto Source](KustoSource.md#supported-options) documentation on
_KUSTO_READ_MODE_.
* **blobStorageAccountName**
@@ -92,7 +95,7 @@ val conf: Map[String, String] = Map(
val dfResult = spark.read.kusto(cluster, database, table, conf)
```
## Access token
## Direct Authentication with Access Token
Users can also use ADAL directly to acquire an AAD access token to access Kusto.
The token must be valid throughout the duration of the read/write operation.
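
For illustration, a minimal sketch of passing a pre-acquired token through the `accessToken` option defined in KustoOptions above; `df` and the token value are placeholders:

```scala
import com.microsoft.kusto.spark.datasource.KustoOptions

// Assuming 'df' is an existing DataFrame and 'accessToken' holds a valid AAD token
// acquired via ADAL; the token must stay valid for the whole write operation.
df.write
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoOptions.KUSTO_CLUSTER, "YourCluster")
  .option(KustoOptions.KUSTO_DATABASE, "YourDatabase")
  .option(KustoOptions.KUSTO_TABLE, "YourTable")
  .option(KustoOptions.KUSTO_ACCESS_TOKEN, accessToken)
  .save()
```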
@@ -114,4 +117,4 @@ to the console. This token can be used to authenticate at https://login.microsof
and will allow temporary access.
The user needs appropriate privileges for the Kusto cluster as explained in [Kusto Sink authentication section](KustoSink.md#authentication).
>**Note**: This method is not recommended for production!
>**Note:** This method is not recommended for production!

View file

@@ -83,7 +83,7 @@ that is using it. Please verify the following before using Kusto connector:
This is an upper limit that may coexist with additional timeout limits as configured on the Spark or Kusto clusters.
Default: '5400' (90 minutes)
>**Note**:
>**Note:**
For both synchronous and asynchronous operation, 'write' is an atomic transaction, i.e.
either all data is written to Kusto, or no data is written.
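
As a sketch of the two modes this note refers to, using the asynchronous flag shown in the demo below; `df` and the connection values are placeholders:

```scala
import com.microsoft.kusto.spark.datasource.KustoOptions

// Synchronous write (default): save() returns only after ingestion completes or the
// timeout limit is reached, and the write is atomic (all data or none).
// Asynchronous write: with the flag below, save() returns once the work is dispatched;
// failures are then not surfaced in the driver.
df.write
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoOptions.KUSTO_CLUSTER, "YourCluster")
  .option(KustoOptions.KUSTO_DATABASE, "YourDatabase")
  .option(KustoOptions.KUSTO_TABLE, "YourTable")
  .option(KustoOptions.KUSTO_WRITE_ENABLE_ASYNC, value = true) // omit for synchronous mode
  .save()
```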

View file

@@ -4,53 +4,42 @@
/* (Apache Spark 2.4.0, Scala 2.11) */
/***************************************************************************************/
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import com.microsoft.kusto.spark.datasource.KustoOptions
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
import com.microsoft.kusto.spark.utils.{KustoDataSourceUtils => KDSU}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.DataTypes.IntegerType
import org.apache.spark.sql.types.{StringType, StructType}
object KustoConnectorDemo {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "Your HADOOP_HOME")
// COMMAND ----------
// Note! This command is not required if you run in a Databricks notebook
val spark: SparkSession = SparkSession.builder()
.appName("KustoSink")
.master(f"local[4]")
.getOrCreate()
// COMMAND ----------
// BASIC SETUP ----------
// Set logging level. Logs are available via Databricks visualization
KDSU.setLoggingLevel("all")
// COMMAND ----------
// Get the application Id and Key from the secret store. Scope must be pre-configured in Databricks. For details, refer to https://docs.azuredatabricks.net/user-guide/secrets/index.html#secrets-user-guide
// The application must be registered in Azure Active Directory (AAD)
// It must also have proper privileges on the target database and table
val KustoSparkTestAppId = "Your AAD Application Id" // Databricks example: dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
val KustoSparkTestAppKey = "Your AAD Application Key" // Databricks example: dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
// COMMAND ----------
// SETUP SINK CONNECTION PARAMETERS ----------
// Set up sink connection parameters
val appId = s"$KustoSparkTestAppId"
val appKey = s"$KustoSparkTestAppKey"
val authorityId = "Your AAD authority id"
val appId = KustoSparkTestAppId
val appKey = KustoSparkTestAppKey
val authorityId = "Your AAD authority id. A.K.A. directory/tenant id when registering an AAD client" // For Microsoft applications, typically "72f988bf-86f1-41af-91ab-2d7cd011db47"
val cluster = "Your Cluster Name"
val database = "Your Database Name"
val table = "Your Kusto Table Name"
// COMMAND ----------
/** ************************************************/
/* BATCH SINK EXAMPLES */
/** ************************************************/
// generate a Data Frame for batch sink
// GENERATE DATA TO WRITE ----------
val rowId = new AtomicInteger(1)
def newRow(): String = s"row-${rowId.getAndIncrement()}"
@@ -60,13 +49,11 @@ object KustoConnectorDemo {
import spark.implicits._
val df = rows.toDF("name", "value")
// COMMAND ----------
// BATCH SINK EXAMPLES
// BATCH SINK (WRITE)
// Write the data to a Kusto cluster, synchronously
// To see how the number of artitions effect the command performance, use 'repartition': e.g. 'df.repartition(16).write...'
// To see how the number of partitions affects the command performance, use 'repartition': e.g. 'df.repartition(16).write...'
df.write
.format("com.microsoft.kusto.spark.datasource")
.partitionBy("value")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_TABLE, table)
@@ -75,56 +62,141 @@ object KustoConnectorDemo {
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
.save()
// COMMAND ----------
// BATCH SINK EXAMPLES
// Write the data to a Kusto cluster, ASYNChronously
// The driver will return quickly, and will complete the operation asynchronously once the workers end ingestion to Kusto.
// However exceptions are not captured in the driver, and tracking command success/failure status is not straightforward as in synchronous mode
df.write
.format("com.microsoft.kusto.spark.datasource")
.partitionBy("value")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_TABLE, table)
.option(KustoOptions.KUSTO_AAD_CLIENT_ID, appId)
.option(KustoOptions.KUSTO_AAD_CLIENT_PASSWORD, appKey)
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
.option(KustoOptions.KUSTO_WRITE_ENABLE_ASYNC, value = true)
.save()
// To write the data to a Kusto cluster, ASYNChronously, add:
// " .option(KustoOptions.KUSTO_WRITE_ENABLE_ASYNC, true) "
// The driver will return quickly, and will complete the operation asynchronously once the workers end ingestion to Kusto.
// However, exceptions are not captured in the driver, and tracking command success/failure status is not as straightforward as in synchronous mode
// COMMAND ----------
/** ************************************************/
/* SOURCE EXAMPLES */
/** ************************************************/
/* USING KUSTO QUERY, LEAN MODE */
/** *****************************/
val conf: Map[String, String] = Map(
// PREPARE A DATA FRAME FOR STREAMING SINK (WRITE)
import org.apache.spark.sql.types.DataTypes.IntegerType
val customSchema = new StructType().add("colA", StringType, nullable = true).add("colB", IntegerType, nullable = true)
// Read data from file to a stream
val csvDf = spark
.readStream
.schema(customSchema)
.csv("/FileStore/tables")
// PERFORM STREAMING WRITE
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger
// Set up a checkpoint and disable codeGen
spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
spark.conf.set("spark.sql.codegen.wholeStage","false")
// Write to a Kusto table from a streaming source
val kustoQ = csvDf
.writeStream
.format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
.options(Map(
KustoOptions.KUSTO_CLUSTER -> cluster,
KustoOptions.KUSTO_TABLE -> table,
KustoOptions.KUSTO_DATABASE -> database,
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_AAD_AUTHORITY_ID -> authorityId))
.trigger(Trigger.Once)
kustoQ.start().awaitTermination(TimeUnit.MINUTES.toMillis(8))
// READING IN LEAN MODE
val conf: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_READ_MODE -> "lean",
KustoOptions.KUSTO_QUERY -> s"$table | where (ColB % 1000 == 0) | distinct ColA "
KustoOptions.KUSTO_QUERY -> s"$table | where (ColB % 1000 == 0) | distinct ColA",
KustoOptions.KUSTO_READ_MODE -> "lean"
)
// SOURCE EXAMPLES: query, simplified syntax flavor
// Simplified syntax flavour
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
val df2 = spark.read.kusto(cluster, database, "", conf)
// Databricks: display(df2)
// COMMAND ----------
// SOURCE EXAMPLES: query, elaborate (without extension) format.
// In this case we run a query. Table parameter is empty.
// ELABORATE READ SYNTAX, LEAN
// Here we read the whole table.
val conf2: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_QUERY -> "StringAndIntTable",
KustoOptions.KUSTO_READ_MODE -> "lean"
)
val df3 = spark.sqlContext.read
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.options(conf2)
.load()
// GET STORAGE PARAMETERS FOR SCALE READ
// Use either container/account-key/account name, or container SaS
val container = "Your container name" // Databricks example: dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
val storageAccountKey = "Your storage account key" // Databricks example: dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
val storageAccountName = "Your storage account name" // Databricks example: dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
// Note: alternatively, provide just the container SAS.
// val storageSas = "Your container SAS" //Databricks example: dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
// SET UP AZURE FS CONFIGURATION FOR BLOB ACCESS
// Note: alternatively, ask the connector to set up FS configuration on every read access by setting:
// KustoOptions.KUSTO_BLOB_SET_FS_CONFIG -> "true"
// When using storage account key
spark.conf.set(s"fs.azure.account.key.$storageAccountName.blob.core.windows.net", s"$storageAccountKey")
// When using SAS
// spark.conf.set(s"fs.azure.sas.$container.$storageAccountName.blob.core.windows.net", s"$storageSas")
// READING IN SCALE MODE: SET UP CONFIGURATION
val conf3: Map[String, String] = Map(
KustoOptions.KUSTO_AAD_CLIENT_ID -> appId,
KustoOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
KustoOptions.KUSTO_READ_MODE -> "lean",
KustoOptions.KUSTO_QUERY -> s"$table | where (ColB % 1000 == 0) | distinct ColA "
KustoOptions.KUSTO_BLOB_CONTAINER -> container,
//KustoOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas,
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> storageAccountName,
KustoOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> storageAccountKey
)
val df3 = spark.sqlContext.read
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.options(conf2)
.load()
// Databricks: display(df3)
// READING IN SCALE MODE (FULL TABLE, UNFILTERED)
val query = "ReallyBigTable"
val df4 = spark.read.kusto(cluster, database, query, conf3)
// READING IN SCALE MODE (WITH PRUNING AND FILTERING)
val df5 = spark.read.kusto(cluster, database, query, conf3)
val dfFiltered = df5
.where(df5.col("ColA").startsWith("row-2"))
.filter("ColB > 12")
.filter("ColB <= 21")
.select("ColA")
// READING USING KEY VAULT ACCESS
// There are two different approaches to using parameters stored in Azure Key Vault with the Kusto connector:
// 1. One can use the Databricks KV-assisted store under a Databricks secret scope (see https://docs.azuredatabricks.net/user-guide/secrets/secret-scopes.html#akv-ss). This is how keyVaultClientPassword is stored.
// 2. Some parameters, including all secrets required for Kusto connector operations, can be accessed as described in https://github.com/Azure/azure-kusto-spark/blob/dev/docs/Authentication.md
// This requires accessing the KV with the three parameters below
val keyVaultClientPassword = "Password of the AAD client used to authenticate to your key vault" // Databricks example: dbutils.secrets.get(scope = "KustoDemos", key = "keyVaultClientPassword")
val keyVaultClientID = "The client id of the AAD client used to authenticate to your key vault"
val keyVaultUri = "https://<Your key vault name>.vault.azure.net"
/************************************************************************************************************/
/* Using scale read. The following parameters are taken from Key Vault using connector Key Vault access */
/* blobContainer, blobStorageAccountKey, blobStorageAccountName, kustoAppAuthority, kustoAppId, kustoAppKey */
/************************************************************************************************************/
val conf4: Map[String, String] = Map(
KustoOptions.KEY_VAULT_URI -> keyVaultUri,
KustoOptions.KEY_VAULT_APP_ID -> keyVaultClientID,
KustoOptions.KEY_VAULT_APP_KEY -> keyVaultClientPassword,
KustoOptions.KUSTO_QUERY -> "StringAndIntExpTable"
)
val df6 =
spark.sqlContext.read
.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, "ExperimentDb")
.options(conf4)
.load()
}
}
}