* document customSchema

* Update samples/src/main/python/pyKusto.py

Co-authored-by: Yochai Gilad <yogilad@microsoft.com>

* Update pyKusto.py

* Update pyKusto.py

Co-authored-by: Yochai Gilad <yogilad@microsoft.com>
ohad bitton 2021-08-03 15:02:14 +03:00, committed by GitHub
Parent 52b6782be8
Commit 1836405e56
1 changed file with 7 additions and 12 deletions


@@ -46,11 +46,16 @@ kustoDf = pyKusto.read. \
# Read the data from the kusto table in forced 'distributed' mode and with advanced options
# Please refer to https://github.com/Azure/azure-kusto-spark/blob/master/connector/src/main/scala/com/microsoft/kusto/spark/datasource/KustoSourceOptions.scala
# in order to get the string representation of the options - as PySpark does not support calling properties of Scala objects.
# ClientRequestProperties are used in every command executed on the service (schema inference, the export command, or the query; in older versions they were used only for the export command).
crp = sc._jvm.com.microsoft.azure.kusto.data.ClientRequestProperties()
crp.setOption("norequesttimeout", True)
crp.toString()
# Use customSchema to enforce the schema and remove the initial command to the service for schema inference.
# The provided Schema must be a subset of the query result schema.
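# Illustrative sketch (not part of the original sample): customSchema is assumed here to be a PySpark
# StructType covering a subset of the query result columns; the column names below are hypothetical.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
customSchema = StructType([
    StructField("ColA", StringType(), True),
    StructField("ColB", IntegerType(), True)
])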
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(customSchema.json()).toDDL()
kustoDf = pyKusto.read. \
format("com.microsoft.kusto.spark.datasource"). \
option("kustoCluster", kustoOptions["kustoCluster"]). \
@@ -61,17 +66,7 @@ kustoDf = pyKusto.read. \
option("kustoAadAuthorityID", kustoOptions["kustoAadAuthorityID"]). \
option("clientRequestPropertiesJson", crp.toString()). \
option("readMode", 'ForceDistributedMode'). \
load()
kustoDf = pyKusto.read. \
format("com.microsoft.kusto.spark.datasource"). \
option("kustoCluster", kustoOptions["kustoCluster"]). \
option("kustoDatabase", kustoOptions["kustoDatabase"]). \
option("kustoQuery", kustoOptions["kustoTable"]). \
option("kustoAadAppId", kustoOptions["kustoAadAppId"]). \
option("kustoAadAppSecret", kustoOptions["kustoAadAppSecret"]). \
option("kustoAadAuthorityID", kustoOptions["kustoAadAuthorityID"]). \
option("clientRequestPropertiesJson", crp.toString()). \
option("customSchema", ddl). \
load()
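# Illustrative check (not part of the original sample): with "customSchema" set, the connector skips
# the schema-inference call to the service, and the DataFrame reports the schema derived from the DDL above.
kustoDf.printSchema()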
@@ -154,7 +149,7 @@ kustoQ.start().awaitTermination(60*8)
# Device authentication for Databricks (Scala users can simply omit the authentication parameters and get the same result)
# Acquire a token with device authentication and pass the token to the connector; this token will expire in one hour but
# it should be enough for reading as the call to the service is done at the start of the flow but writing should be done
# it should be enough for reading as the call to the service is done at the start of the flow. Write commands should be done
# in an hour.
# Prints done inside the JVM are not shown in the notebooks, therefore the user has to print the device code themselves.
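# A minimal sketch (not part of the original sample) of the device-code flow described above, using the
# msal package. Assumptions: kustoAadAppId is registered as a public client, the token scope is the
# cluster URI in kustoOptions["kustoCluster"] plus "/.default", and the connector accepts the token via
# the "accessToken" option - adjust these to your environment.
import msal

app = msal.PublicClientApplication(
    kustoOptions["kustoAadAppId"],
    authority="https://login.microsoftonline.com/" + kustoOptions["kustoAadAuthorityID"])
flow = app.initiate_device_flow(scopes=[kustoOptions["kustoCluster"] + "/.default"])
print(flow["message"])  # print the device code yourself - prints done inside the JVM are not shown in notebooks
result = app.acquire_token_by_device_flow(flow)  # blocks until the user completes the device sign-in

kustoDf = pyKusto.read. \
    format("com.microsoft.kusto.spark.datasource"). \
    option("kustoCluster", kustoOptions["kustoCluster"]). \
    option("kustoDatabase", kustoOptions["kustoDatabase"]). \
    option("kustoQuery", kustoOptions["kustoTable"]). \
    option("accessToken", result["access_token"]). \
    load()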