* ingestion endpoint start

* combine query and ingest endpoints

* implementing

* adls2

* rm stray return

* fold run_query, run_command into 1

* add docs

* satisfy check()

* rename back to run_query

* ingestion working

* add adls2 abfss check

* rm stray ...

* fleshing out local ingest

* streaming ingest

* document

* fixup

* shorten function names

* all ingest methods

* hide import from check

* make sure AzureStor is present before loading it

* partial matching wart

* throw error if given blob token

* add ingest_adls1

* fixing streaming ingest

* rm refs to ingestion uri

* local ingest working

* ingestion tests seem ok

* rm print

* update readme

* don't hide ingest result

* fix collect()

* write.csv -> write.table

* escaping strings

* fix ingest_inline
This commit is contained in:
Hong Ooi 2019-02-07 03:37:08 +11:00 коммит произвёл Alex Kyllo
Родитель ce8979cd94
Коммит dc2554f494
24 изменённых файлов: 1808 добавлений и 196 удалений

Просмотреть файл

@ -28,8 +28,10 @@ Imports:
Suggests:
bit64,
knitr,
testthat
testthat,
AzureStor (>= 1.0.0.9000)
Roxygen: list(markdown=TRUE)
RoxygenNote: 6.1.0.9000
Remotes:
cloudyr/AzureRMR
cloudyr/AzureRMR,
cloudyr/AzureStor

Просмотреть файл

@ -68,8 +68,6 @@ S3method(op_vars,tbl_kusto_abstract)
S3method(print,kql)
S3method(rename,tbl_kusto_abstract)
S3method(right_join,tbl_kusto_abstract)
S3method(run_command,kusto_database_endpoint)
S3method(run_query,kusto_database_endpoint)
S3method(select,tbl_kusto_abstract)
S3method(semi_join,tbl_kusto_abstract)
S3method(setdiff,tbl_kusto_abstract)
@ -94,6 +92,11 @@ export(escape)
export(filter)
export(flatten_query)
export(get_kusto_token)
export(ingest_adls1)
export(ingest_adls2)
export(ingest_blob)
export(ingest_local)
export(ingest_url)
export(intersect)
export(is_kusto_cluster)
export(is_kusto_database)
@ -111,14 +114,13 @@ export(kql_translate_env)
export(kql_translator)
export(kql_vector)
export(kql_window)
export(kusto_query_endpoint)
export(kusto_database_endpoint)
export(list_kusto_tokens)
export(op_base)
export(op_double)
export(op_grps)
export(op_single)
export(op_vars)
export(run_command)
export(run_query)
export(setdiff)
export(setequal)

Просмотреть файл

@ -3,7 +3,7 @@
#' @import dplyr
NULL
utils::globalVariables(c("self", "asc"))
utils::globalVariables(c("self", "asc", "con"))
.onLoad <- function(libname, pkgname)
{

Просмотреть файл

@ -10,6 +10,7 @@
#' - `remove_principals(...)`: Remove database principals.
#' - `list_principals()`: Retrieve all database principals, as a data frame.
#' - `get_query_endpoint()`: Get a query endpoint object for interacting with the database.
#' - `get_ingestion_endpoint()`: Get an ingestion endpoint object for interacting with the database.
#'
#' @section Initialization:
#' Initializing a new object of this class can either retrieve an existing Kusto database, or create a new database on the server. Generally, the best way to initialize an object is via the `get_database`, `list_databases()` and `create_database` methods of the [az_kusto] class, which handle the details automatically.
@ -21,14 +22,14 @@
#' - `name`: The name of the principal to create.
#' - `role`: The roleo of the principal, for example "Admin" or "User".
#' - `type`: The type of principal, either "User" or "App".
#' - `fqn`: The fully qualified name of the principal, for example "aaduser=username@mydomain". If supplied, the other details will be obtained from this.
#' - `fqn`: The fully qualified name of the principal, for example "aaduser=username@mydomain" for an Azure Active Directory account. If supplied, the other details will be obtained from this.
#' - `email`: For a user principal, the email address.
#' - `app_id`: For an application principal, the ID.
#'
#' `remove_principal` removes a principal. It takes the same arguments as `add_principal`; if the supplied details do not match the actual details for the principal, it is not removed.
#'
#' @seealso
#' [az_kusto], [kusto_query_endpoint],
#' [az_kusto], [kusto_database_endpoint],
#' [create_database], [get_database], [delete_database]
#'
#' [Kusto/Azure Data Explorer documentation](https://docs.microsoft.com/en-us/azure/data-explorer/),
@ -47,12 +48,10 @@
#' # add a new principal
#' db$add_principal("New User", role="User", fqn="aaduser=username@mydomain")
#'
#' # get a query endpoint
#' db$get_query_endpoint()
#' # get the endpoint
#' db$get_database_endpoint(use_integer64=FALSE)
#'
#' }
#' @seealso
#' [az_kusto], [create_database], [get_database], [delete_database], [list_databases], [kusto_query_endpoint]
#' @export
az_kusto_database <- R6::R6Class("az_kusto_database", inherit=AzureRMR::az_resource,
public=list(
@ -117,15 +116,17 @@ public=list(
do.call(rbind, lapply(val, as.data.frame, stringsAsFactors=FALSE))
},
get_query_endpoint=function(tenant=NULL, user=NULL, pwd=NULL)
get_database_endpoint=function(tenant=NULL, user=NULL, pwd=NULL, ...)
{
if(is.null(tenant))
tenant <- self$cluster$get_default_tenant()
token <- self$cluster$get_aad_token(tenant)
token <- self$cluster$get_query_token(tenant)
server <- self$cluster$properties$queryUri
database <- basename(self$name)
kusto_query_endpoint(server=server, database=database, tenantid=tenant, .azure_token=token)
kusto_database_endpoint(server=server, database=database, tenantid=tenant,
.query_token=token,
...)
}
))

Просмотреть файл

@ -13,7 +13,8 @@
#' - `delete_database(database, confirm=TRUE)`: Delete a database, by default asking for confirmation first.
#' - `list_databases()`: List all databases in this cluster.
#' - `get_default_tenant()`: Retrieve the default tenant to authenticate with this cluster.
#' - `get_aad_token(tenant, ...)`: Obtain an authentication token from Azure Active Directory. Accepts further arguments that will be passed to [get_kusto_token].
#' - `get_query_token(tenant, ...)`: Obtain an authentication token from Azure Active Directory for this cluster's query enpdoint. Accepts further arguments that will be passed to [get_kusto_token].
#' - `get_ingestion_token(tenant, ...)`: Obtain an authentication token for this cluster's ingestion endpoint. Accepts further arguments that will be passed to [get_kusto_token].
#'
#' @section Initialization:
#' Initializing a new object of this class can either retrieve an existing Kusto cluster, or create a new cluster on the host. Generally, the best way to initialize an object is via the `get_kusto_cluster` and `create_kusto_cluster` methods of the [az_resource_group] class, which handle the details automatically.
@ -29,7 +30,7 @@
#' `get_database` takes a single argument `database`, the name of the database to retrieve, and returns an object of class `az_kusto_database`. `delete_database` takes the name of the database to delete and returns NULL on a successful deletion. `list_databases` takes no arguments and returns a list of `az_kusto_database` objects, one for each database in the cluster.
#'
#' @seealso
#' [az_kusto_database], [kusto_query_endpoint],
#' [az_kusto_database], [kusto_database_endpoint],
#' [create_kusto_cluster], [get_kusto_cluster], [delete_kusto_cluster],
#' [get_kusto_token]
#'
@ -121,9 +122,14 @@ public=list(
tenant
},
get_aad_token=function(tenant=self$get_default_tenant(), ...)
get_query_token=function(tenant=self$get_default_tenant(), ...)
{
get_kusto_token(server=self$properties$queryUri, tenant=tenant, ...)
},
get_ingestion_token=function(tenant=self$get_default_tenant(), ...)
{
get_kusto_token(server=self$properties$dataIngestionUri, tenant=tenant, ...)
}
))

Просмотреть файл

@ -1,12 +1,12 @@
#' Endpoint for communicating with a Kusto database
#' Endpoints for communicating with a Kusto database
#'
#' @param ... Named arguments which are the properties for the endpoint object. See 'Details' below for the properties that AzureKusto recognises.
#' @param .connection_string An alternative way of specifying the properties, as a database connection string. Properties supplied here override those in `...` if they overlap.
#' @param .azure_token Optionally, an Azure Active Directory token to authenticate with. If this is supplied, it overrides other tokens specified in `...` or in the connection string.
#' @param .use_integer64 Whether to convert columns with Kusto `long` datatype into 64-bit integers in R, using the bit64 package. If FALSE, represent them as numeric instead.
#' @param .query_token Optionally, an Azure Active Directory (AAD) token to authenticate with. If this is supplied, it overrides other tokens specified in `...` or in the connection string.
#' @param .use_integer64 For `kusto_database_endpoint`, whether to convert columns with Kusto `long` datatype into 64-bit integers in R, using the bit64 package. If FALSE, represent them as numeric instead.
#'
#' @details
#' This is a list of properties recognised by `kusto_query_endpoint`, and their alternate names. Property names not in this list will generate an error.
#' This is a list of properties recognised by `kusto_database_endpoint`, and their alternate names. Property names not in this list will generate an error. Note that not all properties that are recognised are currently supported by AzureKusto.
#'
#' General properties:
#' - server: The URI of the server, usually of the form 'https://{clustername}.{location}.kusto.windows.net'.
@ -30,7 +30,7 @@
#' - traceusername: The user name for tracing.
#' - usertoken: The AAD token for user authentication.
#' - * usertoken, usrtoken
#' - fed: Logical, whether federated authentication is enabled. Currently unsupported; if this is TRUE, `kusto_query_endpoint` will print a warning and ignore it.
#' - fed: Logical, whether federated authentication is enabled. Currently unsupported; if this is TRUE, `kusto_database_endpoint` will print a warning and ignore it.
#' * federated security, federated, aadfed
#'
#' App authentication properties:
@ -42,8 +42,8 @@
#'
#' Currently, AzureKusto only supports authentication via Azure Active Directory, and only via app or user credentials. Authenticating with federated logins, an AAD certificate, or with DSTS is planned for the future.
#'
#' The way `kusto_query_endpoint` obtains an AAD token is as follows.
#' 1. If the `.azure_token` argument is supplied, use it.
#' The way `kusto_database_endpoint` obtains an AAD token is as follows.
#' 1. If the `.query_token` argument is supplied, use it.
#' 2. Otherwise, if the `usertoken` property is supplied, use it.
#' 3. Otherwise, if the `apptoken` property is supplied, use it.
#' 4. Otherwise, if the `appclientid` property is supplied, use it to obtain a token:
@ -57,9 +57,10 @@
#' @return
#' An object of class `kusto_database_endpoint`.
#' @seealso
#' [run_query], [run_command]
#' [run_query], [az_kusto_database]
#' @rdname database_endpoint
#' @export
kusto_query_endpoint <- function(..., .connection_string=NULL, .azure_token=NULL, .use_integer64=FALSE)
kusto_database_endpoint <- function(..., .connection_string=NULL, .query_token=NULL, .use_integer64=FALSE)
{
props <- list(...)
names(props) <- tolower(names(props))
@ -78,30 +79,22 @@ kusto_query_endpoint <- function(..., .connection_string=NULL, .azure_token=NULL
}
# fix all property names to a given (sub)set, remove quotes from quoted values
props <- normalize_properties(props)
props <- normalize_connstring_properties(props)
if(!is.null(.azure_token))
props$token <- .azure_token
# if .azure_token arg not supplied, get it from other properties
if(is.null(props$token))
props$token <- find_token(props)
if(is.null(props$token))
stop("Only logins with Azure Active Directory are currently supported, unable to acquire token",
call.=FALSE)
if(props$token$credentials$resource != props$server)
props$token <- find_endpoint_token(props, .query_token)
if(AzureRMR::is_azure_token(props$token) && props$token$credentials$resource != props$server)
warning(sprintf("Mismatch between server (%s) and token resource (%s)",
props$token$credentials$resource, props$server))
props$use_integer64 <- .use_integer64
props <- check_endpoint_properties(props)
props <- handle_unsupported(props)
class(props) <- "kusto_database_endpoint"
props
}
normalize_properties <- function(properties)
normalize_connstring_properties <- function(properties)
{
# valid property names for a Kusto connection string
property_list <- list(
@ -163,8 +156,11 @@ find_type_from_connstring <- function(string)
}
find_token <- function(properties)
find_endpoint_token <- function(properties, .query_token)
{
if(!is.null(.query_token))
return(.query_token)
# properties to check for token: usertoken, apptoken, appclientid, appkey
if(!is_empty(properties$usertoken))
return(properties$usertoken)
@ -204,7 +200,7 @@ find_token <- function(properties)
}
handle_unsupported <- function(props)
check_endpoint_properties <- function(props)
{
if(isTRUE(props$fed))
{

248
R/ingest.R Normal file
Просмотреть файл

@ -0,0 +1,248 @@
#' Ingestion functions for Kusto
#'
#' @param database A Kusto database endpoint object, created with [kusto_database_endpoint].
#' @param src The source data. This can be either a data frame, local filename, or URL.
#' @param dest_table The name of the destination table.
#' @param method For local ingestion, the method to use. See 'Details' below.
#' @param staging_container For local ingestion, an Azure storage container to use for staging the dataset. This can be an object of class either [AzureStor::blob_container] or [AzureStor::adls_filesystem]. Only used if `method="indirect"`.
#' @param ingestion_token For local ingestion, an Azure Active Directory authentication token for the cluster ingestion endpoint. Only used if `method="streaming"`.
#' @param http_status_handler For local ingestion, how to handle HTTP conditions >= 300. Defaults to "stop"; alternatives are "warn", "message" and "pass". The last option will pass through the raw response object from the server unchanged, regardless of the status code. This is mostly useful for debugging purposes, or if you want to see what the Kusto REST API does. Only used if `method="streaming"`.
#' @param key,token,sas Authentication arguments for the Azure storage ingestion methods. If multiple arguments are supplied, a key takes priority over a token, which takes priority over a SAS. Note that these arguments are for authenticating with the Azure _storage account_, as opposed to Kusto itself.
#' @param async For the URL ingestion functions, whether to do the ingestion asychronously. If TRUE, the function will return immediately while the server handles the operation in the background.
#' @param ... Named arguments to be treated as ingestion parameters.
#'
#' @details
#' There are up to 3 possible ways to ingest a local dataset, specified by the `method` argument.
#' - `method="indirect"`: The data is uploaded to blob storage, and then ingested from there. This is the default if the AzureStor package is present.
#' - `method="streaming"`: The data is uploaded to the cluster ingestion endpoint. This is the default if the AzureStor package is not present, however be aware that currently (as of February 2019) streaming ingestion is in beta and has to be enabled for a cluster by filing a support ticket.
#' - `method="inline"`: The data is embedded into the command text itself. This is only recommended for testing purposes, or small datasets.
#'
#' @rdname ingest
#' @export
ingest_local <- function(database, src, dest_table, method=NULL, staging_container=NULL,
ingestion_token=database$token, http_status_handler="stop", ...)
{
AzureStor <- requireNamespace("AzureStor")
if(is.null(method))
method <- if(AzureStor) "indirect" else "streaming"
switch(as.character(method),
indirect=
ingest_indirect(database, src, dest_table, staging_container, ...),
streaming=
ingest_streaming(database, src, dest_table, ingestion_token, http_status_handler, ...),
inline=
ingest_inline(database, src, dest_table, ...),
stop("Bad ingestion method argument", call.=FALSE)
)
}
#' @rdname ingest
#' @export
ingest_url <- function(database, src, dest_table, async=FALSE, ...)
{
prop_list <- get_ingestion_properties(...)
cmd <- paste(".ingest",
if(async) "async" else NULL,
"into table",
escape(ident(dest_table)),
"(", obfuscate_string(src), ")",
prop_list)
run_query(database, cmd)
}
#' @rdname ingest
#' @export
ingest_blob <- function(database, src, dest_table, async=FALSE, key=NULL, token=NULL, sas=NULL, ...)
{
if(!is.null(key))
src <- paste0(src, ";", key)
else if(!is.null(token))
stop("Kusto does not support authenticating to blob storage with a token", call.=FALSE)
else if(!is.null(sas))
src <- paste0(src, "?", sas)
ingest_url(database, src, dest_table, async, ...)
}
#' @rdname ingest
#' @export
ingest_adls2 <- function(database, src, dest_table, async=FALSE, key=NULL, token=NULL, sas=NULL, ...)
{
# convert https URI into abfss for Kusto
src_uri <- httr::parse_url(src)
if(grepl("^http", src_uri$scheme))
{
message("ADLSgen2 URIs should be specified as 'abfss://filesystem@host/path/file'")
src_uri$scheme <- "abfss"
src_uri$username <- sub("/.+$", "", src_uri$path)
src_uri$path <- sub("^[^/]+/", "", src_uri$path)
src <- httr::build_url(src_uri)
}
else if(src_uri$scheme == "adl")
stop("ADLSgen2 URIs do not use the adl: scheme; did you mean to call ingest_adls1()?", call.=FALSE)
if(!is.null(key))
src <- paste0(src, ";", key)
else if(!is.null(token))
src <- paste0(src, ";token=", validate_token(token))
else if(!is.null(sas))
stop("ADLSgen2 does not support use of shared access signatures", call.=FALSE)
else src <- paste0(src, ";impersonate")
ingest_url(database, src, dest_table, async, ...)
}
#' @rdname ingest
#' @export
ingest_adls1 <- function(database, src, dest_table, async=FALSE, key=NULL, token=NULL, sas=NULL, ...)
{
# convert https URI into adl for Kusto
src_uri <- httr::parse_url(src)
if(grepl("^http", src_uri$scheme))
{
warning("ADLSgen1 URIs should use the adl: scheme; did you mean to call ingest_adls2()?")
src_uri$scheme <- "adl"
src <- httr::build_url(src_uri)
}
else if(src_uri$scheme == "abfss")
stop("ADLSgen1 URIs do not use the abfss: scheme; did you mean to call ingest_adls2()?", call.=FALSE)
if(!is.null(key))
stop("ADLSgen1 does not use shared keys; did you mean to call ingest_adls2()?", call.=FALSE)
else if(!is.null(token))
src <- paste0(src, ";token=", validate_token(token))
else if(!is.null(sas))
stop("ADLSgen1 does not support use of shared access signatures", call.=FALSE)
ingest_url(database, src, dest_table, async, ...)
}
ingest_streaming <- function(database, src, dest_table, ingestion_token=database$token,
http_status_handler=c("stop", "warn", "message", "pass"), ...)
{
opts <- list(...)
if(is.data.frame(src))
{
con <- textConnection(NULL, "w")
on.exit(close(con))
utils::write.table(src, con, row.names=FALSE, col.names=FALSE, sep=",")
body <- textConnectionValue(con)
opts <- utils::modifyList(opts, list(streamFormat="Csv"))
}
else body <- readLines(src)
ingest_uri <- httr::parse_url(database$server)
ingest_uri$path <- file.path("v1/rest/ingest", database$database, dest_table)
ingest_uri$query <- opts
headers <- httr::add_headers(Authorization=paste("Bearer", validate_token(ingestion_token)))
res <- httr::POST(ingest_uri, headers, body=body, encode="raw")
http_status_handler <- match.arg(http_status_handler)
if(http_status_handler == "pass")
return(res)
cont <- httr::content(res, simplifyVector=TRUE)
handler <- get(paste0(http_status_handler, "_for_status"), getNamespace("httr"))
handler(res, make_error_message(cont))
cont
}
ingest_indirect <- function(database, src, dest_table, staging_container=NULL, ...)
{
if(!requireNamespace("AzureStor"))
stop("AzureStor package must be installed to do indirect ingestion", call.=FALSE)
opts <- utils::modifyList(list(...), list(
key=staging_container$endpoint$key,
token=staging_container$endpoint$token,
sas=staging_container$endpoint$sas))
if(is.data.frame(src))
{
utils::write.table(src, textConnection("con", "w"), row.names=FALSE, col.names=FALSE, sep=",")
src <- textConnection(con, "r")
opts <- utils::modifyList(opts, list(streamFormat="Csv"))
}
if(inherits(staging_container, "blob_container"))
{
uploadfunc <- get("upload_blob", getNamespace("AzureStor"))
uploadfunc(staging_container, src, dest_table)
url <- httr::parse_url(staging_container$endpoint$url)
url$path <- file.path(staging_container$name, dest_table)
do.call(ingest_blob, c(list(database, httr::build_url(url), dest_table), opts))
}
else if(inherits(staging_container, "adls_filesystem"))
{
uploadfunc <- get("upload_adls_file", getNamespace("AzureStor"))
uploadfunc(staging_container, src, dest_table)
url <- httr::parse_url(staging_container$endpoint$url)
url$scheme <- "abfss"
url$username <- staging_container$name
url$path <- dest_table
do.call(ingest_adls2, c(list(database, httr::build_url(url), dest_table), opts))
}
else stop("Unsupported staging container type", call.=FALSE)
}
ingest_inline <- function(database, src, dest_table, ...)
{
prop_list <- get_ingestion_properties(...)
if(is.data.frame(src))
{
con <- textConnection(NULL, "w")
on.exit(close(con))
utils::write.table(src, con, row.names=FALSE, col.names=FALSE, sep=",")
records <- textConnectionValue(con)
}
else records <- readLines(src)
cmd <- paste(".ingest inline into table",
escape(ident(dest_table)),
prop_list,
"<|\n",
paste0(records, collapse="\n"))
invisible(run_query(database, cmd))
}
obfuscate_string <- function(string)
{
paste0("h", escape(string))
}
get_ingestion_properties <- function(...)
{
props <- list(...)
if(is_empty(props))
return(NULL)
prop_list <- mapply(function(name, value)
{
paste(name, escape(value), sep="=")
}, names(props), props)
paste("with (", paste(prop_list, collapse=", "), ")")
}

Просмотреть файл

@ -23,7 +23,7 @@
#' `get_kusto_token` returns an object of class AzureRMR::AzureToken representing the authentication token, while `list_kusto_tokens` returns a list of such objects. `delete_azure_token` returns NULL on a successful delete.
#'
#' @seealso
#' [kusto_query_endpoint], [AzureRMR::get_azure_token]
#' [kusto_database_endpoint], [AzureRMR::get_azure_token]
#' @export
get_kusto_token <- function(server=NULL, clustername, location=NULL, tenant, app=.kusto_app_id, auth_type=NULL, ...)
{

110
R/query.R
Просмотреть файл

@ -7,55 +7,18 @@
#' Run a query or command against a Kusto database
#'
#' @param database A Kusto database endpoint object, as returned by `kusto_query_endpoint`.
#' @param query,command A string containing the query or command. Note that database management commands in KQL are distinct from queries.
#' @param ... For `run_query`, named arguments to be used as parameters for a parameterized query.
#' @param database A Kusto database endpoint object, as returned by `kusto_database_endpoint`.
#' @param qry_cmd A string containing the query or command. In KQL, a database management command is a statement that starts with a "."
#' @param ... Named arguments to be used as parameters for a parameterized query. These are ignored for database management commands.
#' @param .http_status_handler The function to use to handle HTTP status codes. The default "stop" will throw an R error via `httr::stop_for_status` if the status code is not less than 300; other possibilities are "warn", "message" and "pass". The last option will pass through the raw response object from the server unchanged, regardless of the status code. This is mostly useful for debugging purposes, or if you want to see what the Kusto REST API does.
#'
#' @details
#' These functions are the workhorses of the AzureKusto package. They communicate with the Kusto server and return the query or command results, as data frames.
#' This function is the workhorse of the AzureKusto package. It communicates with the Kusto server and returns the query or command results, as data frames.
#'
#' @seealso
#' [kusto_query_endpoint]
#' @rdname query
#' [kusto_database_endpoint], [ingest_local], [ingest_url], [ingest_blob], [ingest_adls2]
#' @export
run_query <- function(database, ...)
{
UseMethod("run_query")
}
#' @rdname query
#' @export
run_query.kusto_database_endpoint <- function(database, query, ...)
{
query_params <- list(...)
server <- database$server
db <- database$database
token <- database$token
user <- database$user
password <- database$pwd
qry_opts <- database[names(database) %in% .qry_opt_names]
uri <- paste0(server, "/v1/rest/query")
body <- build_request_body(db, query, query_options=qry_opts, query_parameters=query_params)
auth_str <- build_auth_str(token, user, password)
result <- call_kusto(uri, body, auth_str)
parse_query_result(result, database$use_integer64)
}
#' @rdname query
#' @export
run_command <- function(database, ...)
{
UseMethod("run_command")
}
#' @rdname query
#' @export
run_command.kusto_database_endpoint <- function(database, command, ...)
run_query <- function(database, qry_cmd, ..., .http_status_handler="stop")
{
server <- database$server
db <- database$database
@ -65,11 +28,18 @@ run_command.kusto_database_endpoint <- function(database, command, ...)
qry_opts <- database[names(database) %in% .qry_opt_names]
uri <- paste0(server, "/v1/rest/mgmt")
body <- build_request_body(db, command, query_options=qry_opts)
is_cmd <- substr(qry_cmd, 1, 1) == "."
uri <- paste0(server,
if(is_cmd) "/v1/rest/mgmt" else "/v1/rest/query")
query_params <- if(is_cmd) list() else list(...)
body <- build_request_body(db, qry_cmd, query_options=qry_opts, query_parameters=query_params)
auth_str <- build_auth_str(token, user, password)
result <- call_kusto(uri, body, auth_str)
parse_command_result(result, database$use_integer64)
result <- call_kusto(uri, body, auth_str, http_status_handler=.http_status_handler)
if(is_cmd)
parse_command_result(result, database$use_integer64)
else parse_query_result(result, database$use_integer64)
}
@ -121,7 +91,7 @@ build_request_body <- function(db, qry_cmd, query_options=list(), query_paramete
build_auth_str <- function(token=NULL, user=NULL, password=NULL)
{
token <- validate_kusto_token(token)
token <- validate_token(token)
auth_str <- if(!is.null(token))
paste("Bearer", token)
@ -134,7 +104,7 @@ build_auth_str <- function(token=NULL, user=NULL, password=NULL)
call_kusto <- function(uri, body, auth_str,
http_status_handler=c("stop", "warn", "message", "pass"))
http_status_handler=c("stop", "warn", "message", "pass"))
{
res <- httr::POST(uri, httr::add_headers(Authorization=auth_str), body=body, encode="json")
@ -198,28 +168,28 @@ parse_command_result <- function(tables, .use_integer64)
}
convert_kusto_datatype <- function(column, kusto_type, .use_integer64)
{
switch(kusto_type,
long=, Int64=
if(.use_integer64) bit64::as.integer64(column) else as.numeric(column),
int=, integer=, Int32=
as.integer(column),
datetime=, DateTime=
as.POSIXct(strptime(column, format='%Y-%m-%dT%H:%M:%OSZ', tz='UTC')),
real=, Double=, Float=
as.numeric(column),
bool=, Boolean=
as.logical(column),
as.character(column)
)
}
convert_result_types <- function(df, coltypes_df, .use_integer64)
{
if(is_empty(df))
return(list())
convert_kusto_datatype <- function(column, kusto_type, .use_integer64)
{
switch(kusto_type,
long=, Int64=
if(.use_integer64) bit64::as.integer64(column) else as.numeric(column),
int=, integer=, Int32=
as.integer(column),
datetime=, DateTime=
as.POSIXct(strptime(column, format='%Y-%m-%dT%H:%M:%OSZ', tz='UTC')),
real=, Double=, Float=
as.numeric(column),
bool=, Boolean=
as.logical(column),
as.character(column)
)
}
df <- as.data.frame(df, stringsAsFactors=FALSE)
names(df) <- coltypes_df$ColumnName
df[] <- Map(convert_kusto_datatype, df, coltypes_df$DataType, MoreArgs=list(.use_integer64=.use_integer64))
@ -227,7 +197,7 @@ convert_result_types <- function(df, coltypes_df, .use_integer64)
}
validate_kusto_token <- function(token)
validate_token <- function(token)
{
# token can be a string or an object of class AzureRMR::AzureToken
if(AzureRMR::is_azure_token(token))
@ -240,6 +210,6 @@ validate_kusto_token <- function(token)
token <- token$credentials$access_token
}
else if(!is.character(token))
stop("Invalid authentication token in database endpoint", call.=FALSE)
stop("Invalid authentication token", call.=FALSE)
token
}

Просмотреть файл

@ -218,7 +218,7 @@ collect.tbl_kusto <- function(tbl, ...)
q_str <- kql_render(q)
params <- c(tbl$params, list(...))
params$database <- tbl$src
params$query <- q_str
params$qry_cmd <- q_str
res <- do.call(run_query, params)
as_tibble(res)
}

Просмотреть файл

@ -10,15 +10,15 @@ You can install the development version from GitHub, via `devtools::install_gith
### Kusto Endpoint Interface
Connect to a Kusto cluster by instantiating a `kusto_query_endpoint` object with the cluster URI, database name, and an `AzureRMR::AzureToken` object, which you can obtain via the `get_kusto_token` helper function.
Connect to a Kusto cluster by instantiating a `kusto_database_endpoint` object with the cluster URI, database name, and an `AzureRMR::AzureToken` object, which you can obtain via the `get_kusto_token` helper function.
```r
library(AzureKusto)
Samples <- kusto_query_endpoint(server="https://help.kusto.windows.net",
Samples <- kusto_database_endpoint(server="https://help.kusto.windows.net",
database="Samples",
.azure_token=get_kusto_token(clustername="help", tenant="microsoft"))
.query_token=get_kusto_token(clustername="help", tenant="microsoft"))
# To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code FPD8GZPY9 to authenticate.
# Waiting for device code in browser...
@ -52,12 +52,11 @@ res <- run_query(Samples, "MyFunction(lim)", lim=10L)
```
Command statements can be run with `run_command()`.
Note that commands do not accept parameters.
Command statements work much the same way, except that they do not accept parameters.
```r
res <- run_command(Samples, ".show tables | count")
res <- run_query(Samples, ".show tables | count")
```

Просмотреть файл

@ -27,7 +27,8 @@ The following methods are available, in addition to those provided by the \link[
\item \code{delete_database(database, confirm=TRUE)}: Delete a database, by default asking for confirmation first.
\item \code{list_databases()}: List all databases in this cluster.
\item \code{get_default_tenant()}: Retrieve the default tenant to authenticate with this cluster.
\item \code{get_aad_token(tenant, ...)}: Obtain an authentication token from Azure Active Directory. Accepts further arguments that will be passed to \link{get_kusto_token}.
\item \code{get_query_token(tenant, ...)}: Obtain an authentication token from Azure Active Directory for this cluster's query enpdoint. Accepts further arguments that will be passed to \link{get_kusto_token}.
\item \code{get_ingestion_token(tenant, ...)}: Obtain an authentication token for this cluster's ingestion endpoint. Accepts further arguments that will be passed to \link{get_kusto_token}.
}
}
@ -71,7 +72,7 @@ kust$get_aad_token()
}
}
\seealso{
\link{az_kusto_database}, \link{kusto_query_endpoint},
\link{az_kusto_database}, \link{kusto_database_endpoint},
\link{create_kusto_cluster}, \link{get_kusto_cluster}, \link{delete_kusto_cluster},
\link{get_kusto_token}

Просмотреть файл

@ -20,6 +20,7 @@ The following methods are available, in addition to those provided by the \link[
\item \code{remove_principals(...)}: Remove database principals.
\item \code{list_principals()}: Retrieve all database principals, as a data frame.
\item \code{get_query_endpoint()}: Get a query endpoint object for interacting with the database.
\item \code{get_ingestion_endpoint()}: Get an ingestion endpoint object for interacting with the database.
}
}
@ -37,7 +38,7 @@ This class provides methods for managing the principals of a database.
\item \code{name}: The name of the principal to create.
\item \code{role}: The roleo of the principal, for example "Admin" or "User".
\item \code{type}: The type of principal, either "User" or "App".
\item \code{fqn}: The fully qualified name of the principal, for example "aaduser=username@mydomain". If supplied, the other details will be obtained from this.
\item \code{fqn}: The fully qualified name of the principal, for example "aaduser=username@mydomain" for an Azure Active Directory account. If supplied, the other details will be obtained from this.
\item \code{email}: For a user principal, the email address.
\item \code{app_id}: For an application principal, the ID.
}
@ -59,17 +60,15 @@ db$list_principals()
# add a new principal
db$add_principal("New User", role="User", fqn="aaduser=username@mydomain")
# get a query endpoint
db$get_query_endpoint()
# get the endpoint
db$get_database_endpoint(use_integer64=FALSE)
}
}
\seealso{
\link{az_kusto}, \link{kusto_query_endpoint},
\link{az_kusto}, \link{kusto_database_endpoint},
\link{create_database}, \link{get_database}, \link{delete_database}
\href{https://docs.microsoft.com/en-us/azure/data-explorer/}{Kusto/Azure Data Explorer documentation},
\link{az_kusto}, \link{create_database}, \link{get_database}, \link{delete_database}, \link{list_databases}, \link{kusto_query_endpoint}
}
\keyword{datasets}

Просмотреть файл

@ -1,29 +1,29 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/endpoint.R
\name{kusto_query_endpoint}
\alias{kusto_query_endpoint}
\title{Endpoint for communicating with a Kusto database}
\name{kusto_database_endpoint}
\alias{kusto_database_endpoint}
\title{Endpoints for communicating with a Kusto database}
\usage{
kusto_query_endpoint(..., .connection_string = NULL,
.azure_token = NULL, .use_integer64 = FALSE)
kusto_database_endpoint(..., .connection_string = NULL,
.query_token = NULL, .use_integer64 = FALSE)
}
\arguments{
\item{...}{Named arguments which are the properties for the endpoint object. See 'Details' below for the properties that AzureKusto recognises.}
\item{.connection_string}{An alternative way of specifying the properties, as a database connection string. Properties supplied here override those in \code{...} if they overlap.}
\item{.azure_token}{Optionally, an Azure Active Directory token to authenticate with. If this is supplied, it overrides other tokens specified in \code{...} or in the connection string.}
\item{.query_token}{Optionally, an Azure Active Directory (AAD) token to authenticate with. If this is supplied, it overrides other tokens specified in \code{...} or in the connection string.}
\item{.use_integer64}{Whether to convert columns with Kusto \code{long} datatype into 64-bit integers in R, using the bit64 package. If FALSE, represent them as numeric instead.}
\item{.use_integer64}{For \code{kusto_database_endpoint}, whether to convert columns with Kusto \code{long} datatype into 64-bit integers in R, using the bit64 package. If FALSE, represent them as numeric instead.}
}
\value{
An object of class \code{kusto_database_endpoint}.
}
\description{
Endpoint for communicating with a Kusto database
Endpoints for communicating with a Kusto database
}
\details{
This is a list of properties recognised by \code{kusto_query_endpoint}, and their alternate names. Property names not in this list will generate an error.
This is a list of properties recognised by \code{kusto_database_endpoint}, and their alternate names. Property names not in this list will generate an error. Note that not all properties that are recognised are currently supported by AzureKusto.
General properties:
\itemize{
@ -64,7 +64,7 @@ User authentication properties:
\itemize{
\item usertoken, usrtoken
}
\item fed: Logical, whether federated authentication is enabled. Currently unsupported; if this is TRUE, \code{kusto_query_endpoint} will print a warning and ignore it.
\item fed: Logical, whether federated authentication is enabled. Currently unsupported; if this is TRUE, \code{kusto_database_endpoint} will print a warning and ignore it.
\itemize{
\item federated security, federated, aadfed
}
@ -85,9 +85,9 @@ App authentication properties:
Currently, AzureKusto only supports authentication via Azure Active Directory, and only via app or user credentials. Authenticating with federated logins, an AAD certificate, or with DSTS is planned for the future.
The way \code{kusto_query_endpoint} obtains an AAD token is as follows.
The way \code{kusto_database_endpoint} obtains an AAD token is as follows.
\enumerate{
\item If the \code{.azure_token} argument is supplied, use it.
\item If the \code{.query_token} argument is supplied, use it.
\item Otherwise, if the \code{usertoken} property is supplied, use it.
\item Otherwise, if the \code{apptoken} property is supplied, use it.
\item Otherwise, if the \code{appclientid} property is supplied, use it to obtain a token:
@ -104,5 +104,5 @@ The way \code{kusto_query_endpoint} obtains an AAD token is as follows.
}
}
\seealso{
\link{run_query}, \link{run_command}
\link{run_query}, \link{az_kusto_database}
}

Просмотреть файл

@ -46,5 +46,5 @@ Manage AAD authentication tokens for Kusto clusters
By default, authentication tokens will be obtained using the main KustoClient Active Directory app. This app can be used to authenticate with any Kusto cluster (assuming, of course, you have the proper credentials).
}
\seealso{
\link{kusto_query_endpoint}, \link[AzureRMR:get_azure_token]{AzureRMR::get_azure_token}
\link{kusto_database_endpoint}, \link[AzureRMR:get_azure_token]{AzureRMR::get_azure_token}
}

57
man/ingest.Rd Normal file
Просмотреть файл

@ -0,0 +1,57 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/ingest.R
\name{ingest_local}
\alias{ingest_local}
\alias{ingest_url}
\alias{ingest_blob}
\alias{ingest_adls2}
\alias{ingest_adls1}
\title{Ingestion functions for Kusto}
\usage{
ingest_local(database, src, dest_table, method = NULL,
staging_container = NULL, ingestion_token = database$token,
http_status_handler = "stop", ...)
ingest_url(database, src, dest_table, async = FALSE, ...)
ingest_blob(database, src, dest_table, async = FALSE, key = NULL,
token = NULL, sas = NULL, ...)
ingest_adls2(database, src, dest_table, async = FALSE, key = NULL,
token = NULL, sas = NULL, ...)
ingest_adls1(database, src, dest_table, async = FALSE, key = NULL,
token = NULL, sas = NULL, ...)
}
\arguments{
\item{database}{A Kusto database endpoint object, created with \link{kusto_database_endpoint}.}
\item{src}{The source data. This can be either a data frame, local filename, or URL.}
\item{dest_table}{The name of the destination table.}
\item{method}{For local ingestion, the method to use. See 'Details' below.}
\item{staging_container}{For local ingestion, an Azure storage container to use for staging the dataset. This can be an object of class either \link[AzureStor:blob_container]{AzureStor::blob_container} or \link[AzureStor:adls_filesystem]{AzureStor::adls_filesystem}. Only used if \code{method="indirect"}.}
\item{ingestion_token}{For local ingestion, an Azure Active Directory authentication token for the cluster ingestion endpoint. Only used if \code{method="streaming"}.}
\item{http_status_handler}{For local ingestion, how to handle HTTP conditions >= 300. Defaults to "stop"; alternatives are "warn", "message" and "pass". The last option will pass through the raw response object from the server unchanged, regardless of the status code. This is mostly useful for debugging purposes, or if you want to see what the Kusto REST API does. Only used if \code{method="streaming"}.}
\item{...}{Named arguments to be treated as ingestion parameters.}
\item{async}{For the URL ingestion functions, whether to do the ingestion asychronously. If TRUE, the function will return immediately while the server handles the operation in the background.}
\item{key, token, sas}{Authentication arguments for the Azure storage ingestion methods. If multiple arguments are supplied, a key takes priority over a token, which takes priority over a SAS. Note that these arguments are for authenticating with the Azure \emph{storage account}, as opposed to Kusto itself.}
}
\description{
Ingestion functions for Kusto
}
\details{
There are up to 3 possible ways to ingest a local dataset, specified by the \code{method} argument.
\itemize{
\item \code{method="indirect"}: The data is uploaded to blob storage, and then ingested from there. This is the default if the AzureStor package is present.
\item \code{method="streaming"}: The data is uploaded to the cluster ingestion endpoint. This is the default if the AzureStor package is not present, however be aware that currently (as of February 2019) streaming ingestion is in beta and has to be enabled for a cluster by filing a support ticket.
\item \code{method="inline"}: The data is embedded into the command text itself. This is only recommended for testing purposes, or small datasets.
}
}

Просмотреть файл

@ -1,33 +0,0 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/query.R
\name{run_query}
\alias{run_query}
\alias{run_query.kusto_database_endpoint}
\alias{run_command}
\alias{run_command.kusto_database_endpoint}
\title{Run a query or command against a Kusto database}
\usage{
run_query(database, ...)
\method{run_query}{kusto_database_endpoint}(database, query, ...)
run_command(database, ...)
\method{run_command}{kusto_database_endpoint}(database, command, ...)
}
\arguments{
\item{database}{A Kusto database endpoint object, as returned by \code{kusto_query_endpoint}.}
\item{...}{For \code{run_query}, named arguments to be used as parameters for a parameterized query.}
\item{query, command}{A string containing the query or command. Note that database management commands in KQL are distinct from queries.}
}
\description{
Run a query or command against a Kusto database
}
\details{
These functions are the workhorses of the AzureKusto package. They communicate with the Kusto server and return the query or command results, as data frames.
}
\seealso{
\link{kusto_query_endpoint}
}

26
man/run_query.Rd Normal file
Просмотреть файл

@ -0,0 +1,26 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/query.R
\name{run_query}
\alias{run_query}
\title{Run a query or command against a Kusto database}
\usage{
run_query(database, qry_cmd, ..., .http_status_handler = "stop")
}
\arguments{
\item{database}{A Kusto database endpoint object, as returned by \code{kusto_database_endpoint}.}
\item{qry_cmd}{A string containing the query or command. In KQL, a database management command is a statement that starts with a "."}
\item{...}{Named arguments to be used as parameters for a parameterized query. These are ignored for database management commands.}
\item{.http_status_handler}{The function to use to handle HTTP status codes. The default "stop" will throw an R error via \code{httr::stop_for_status} if the status code is not less than 300; other possibilities are "warn", "message" and "pass". The last option will pass through the raw response object from the server unchanged, regardless of the status code. This is mostly useful for debugging purposes, or if you want to see what the Kusto REST API does.}
}
\description{
Run a query or command against a Kusto database
}
\details{
This function is the workhorse of the AzureKusto package. It communicates with the Kusto server and returns the query or command results, as data frames.
}
\seealso{
\link{kusto_database_endpoint}, \link{ingest_local}, \link{ingest_url}, \link{ingest_blob}, \link{ingest_adls2}
}

150
tests/resources/iris.csv Normal file
Просмотреть файл

@ -0,0 +1,150 @@
5.1,3.5,1.4,0.2,"setosa"
4.9,3,1.4,0.2,"setosa"
4.7,3.2,1.3,0.2,"setosa"
4.6,3.1,1.5,0.2,"setosa"
5,3.6,1.4,0.2,"setosa"
5.4,3.9,1.7,0.4,"setosa"
4.6,3.4,1.4,0.3,"setosa"
5,3.4,1.5,0.2,"setosa"
4.4,2.9,1.4,0.2,"setosa"
4.9,3.1,1.5,0.1,"setosa"
5.4,3.7,1.5,0.2,"setosa"
4.8,3.4,1.6,0.2,"setosa"
4.8,3,1.4,0.1,"setosa"
4.3,3,1.1,0.1,"setosa"
5.8,4,1.2,0.2,"setosa"
5.7,4.4,1.5,0.4,"setosa"
5.4,3.9,1.3,0.4,"setosa"
5.1,3.5,1.4,0.3,"setosa"
5.7,3.8,1.7,0.3,"setosa"
5.1,3.8,1.5,0.3,"setosa"
5.4,3.4,1.7,0.2,"setosa"
5.1,3.7,1.5,0.4,"setosa"
4.6,3.6,1,0.2,"setosa"
5.1,3.3,1.7,0.5,"setosa"
4.8,3.4,1.9,0.2,"setosa"
5,3,1.6,0.2,"setosa"
5,3.4,1.6,0.4,"setosa"
5.2,3.5,1.5,0.2,"setosa"
5.2,3.4,1.4,0.2,"setosa"
4.7,3.2,1.6,0.2,"setosa"
4.8,3.1,1.6,0.2,"setosa"
5.4,3.4,1.5,0.4,"setosa"
5.2,4.1,1.5,0.1,"setosa"
5.5,4.2,1.4,0.2,"setosa"
4.9,3.1,1.5,0.2,"setosa"
5,3.2,1.2,0.2,"setosa"
5.5,3.5,1.3,0.2,"setosa"
4.9,3.6,1.4,0.1,"setosa"
4.4,3,1.3,0.2,"setosa"
5.1,3.4,1.5,0.2,"setosa"
5,3.5,1.3,0.3,"setosa"
4.5,2.3,1.3,0.3,"setosa"
4.4,3.2,1.3,0.2,"setosa"
5,3.5,1.6,0.6,"setosa"
5.1,3.8,1.9,0.4,"setosa"
4.8,3,1.4,0.3,"setosa"
5.1,3.8,1.6,0.2,"setosa"
4.6,3.2,1.4,0.2,"setosa"
5.3,3.7,1.5,0.2,"setosa"
5,3.3,1.4,0.2,"setosa"
7,3.2,4.7,1.4,"versicolor"
6.4,3.2,4.5,1.5,"versicolor"
6.9,3.1,4.9,1.5,"versicolor"
5.5,2.3,4,1.3,"versicolor"
6.5,2.8,4.6,1.5,"versicolor"
5.7,2.8,4.5,1.3,"versicolor"
6.3,3.3,4.7,1.6,"versicolor"
4.9,2.4,3.3,1,"versicolor"
6.6,2.9,4.6,1.3,"versicolor"
5.2,2.7,3.9,1.4,"versicolor"
5,2,3.5,1,"versicolor"
5.9,3,4.2,1.5,"versicolor"
6,2.2,4,1,"versicolor"
6.1,2.9,4.7,1.4,"versicolor"
5.6,2.9,3.6,1.3,"versicolor"
6.7,3.1,4.4,1.4,"versicolor"
5.6,3,4.5,1.5,"versicolor"
5.8,2.7,4.1,1,"versicolor"
6.2,2.2,4.5,1.5,"versicolor"
5.6,2.5,3.9,1.1,"versicolor"
5.9,3.2,4.8,1.8,"versicolor"
6.1,2.8,4,1.3,"versicolor"
6.3,2.5,4.9,1.5,"versicolor"
6.1,2.8,4.7,1.2,"versicolor"
6.4,2.9,4.3,1.3,"versicolor"
6.6,3,4.4,1.4,"versicolor"
6.8,2.8,4.8,1.4,"versicolor"
6.7,3,5,1.7,"versicolor"
6,2.9,4.5,1.5,"versicolor"
5.7,2.6,3.5,1,"versicolor"
5.5,2.4,3.8,1.1,"versicolor"
5.5,2.4,3.7,1,"versicolor"
5.8,2.7,3.9,1.2,"versicolor"
6,2.7,5.1,1.6,"versicolor"
5.4,3,4.5,1.5,"versicolor"
6,3.4,4.5,1.6,"versicolor"
6.7,3.1,4.7,1.5,"versicolor"
6.3,2.3,4.4,1.3,"versicolor"
5.6,3,4.1,1.3,"versicolor"
5.5,2.5,4,1.3,"versicolor"
5.5,2.6,4.4,1.2,"versicolor"
6.1,3,4.6,1.4,"versicolor"
5.8,2.6,4,1.2,"versicolor"
5,2.3,3.3,1,"versicolor"
5.6,2.7,4.2,1.3,"versicolor"
5.7,3,4.2,1.2,"versicolor"
5.7,2.9,4.2,1.3,"versicolor"
6.2,2.9,4.3,1.3,"versicolor"
5.1,2.5,3,1.1,"versicolor"
5.7,2.8,4.1,1.3,"versicolor"
6.3,3.3,6,2.5,"virginica"
5.8,2.7,5.1,1.9,"virginica"
7.1,3,5.9,2.1,"virginica"
6.3,2.9,5.6,1.8,"virginica"
6.5,3,5.8,2.2,"virginica"
7.6,3,6.6,2.1,"virginica"
4.9,2.5,4.5,1.7,"virginica"
7.3,2.9,6.3,1.8,"virginica"
6.7,2.5,5.8,1.8,"virginica"
7.2,3.6,6.1,2.5,"virginica"
6.5,3.2,5.1,2,"virginica"
6.4,2.7,5.3,1.9,"virginica"
6.8,3,5.5,2.1,"virginica"
5.7,2.5,5,2,"virginica"
5.8,2.8,5.1,2.4,"virginica"
6.4,3.2,5.3,2.3,"virginica"
6.5,3,5.5,1.8,"virginica"
7.7,3.8,6.7,2.2,"virginica"
7.7,2.6,6.9,2.3,"virginica"
6,2.2,5,1.5,"virginica"
6.9,3.2,5.7,2.3,"virginica"
5.6,2.8,4.9,2,"virginica"
7.7,2.8,6.7,2,"virginica"
6.3,2.7,4.9,1.8,"virginica"
6.7,3.3,5.7,2.1,"virginica"
7.2,3.2,6,1.8,"virginica"
6.2,2.8,4.8,1.8,"virginica"
6.1,3,4.9,1.8,"virginica"
6.4,2.8,5.6,2.1,"virginica"
7.2,3,5.8,1.6,"virginica"
7.4,2.8,6.1,1.9,"virginica"
7.9,3.8,6.4,2,"virginica"
6.4,2.8,5.6,2.2,"virginica"
6.3,2.8,5.1,1.5,"virginica"
6.1,2.6,5.6,1.4,"virginica"
7.7,3,6.1,2.3,"virginica"
6.3,3.4,5.6,2.4,"virginica"
6.4,3.1,5.5,1.8,"virginica"
6,3,4.8,1.8,"virginica"
6.9,3.1,5.4,2.1,"virginica"
6.7,3.1,5.6,2.4,"virginica"
6.9,3.1,5.1,2.3,"virginica"
5.8,2.7,5.1,1.9,"virginica"
6.8,3.2,5.9,2.3,"virginica"
6.7,3.3,5.7,2.5,"virginica"
6.7,3,5.2,2.3,"virginica"
6.3,2.5,5,1.9,"virginica"
6.5,3,5.2,2,"virginica"
6.2,3.4,5.4,2.3,"virginica"
5.9,3,5.1,1.8,"virginica"
1 5.1 3.5 1.4 0.2 setosa
2 4.9 3 1.4 0.2 setosa
3 4.7 3.2 1.3 0.2 setosa
4 4.6 3.1 1.5 0.2 setosa
5 5 3.6 1.4 0.2 setosa
6 5.4 3.9 1.7 0.4 setosa
7 4.6 3.4 1.4 0.3 setosa
8 5 3.4 1.5 0.2 setosa
9 4.4 2.9 1.4 0.2 setosa
10 4.9 3.1 1.5 0.1 setosa
11 5.4 3.7 1.5 0.2 setosa
12 4.8 3.4 1.6 0.2 setosa
13 4.8 3 1.4 0.1 setosa
14 4.3 3 1.1 0.1 setosa
15 5.8 4 1.2 0.2 setosa
16 5.7 4.4 1.5 0.4 setosa
17 5.4 3.9 1.3 0.4 setosa
18 5.1 3.5 1.4 0.3 setosa
19 5.7 3.8 1.7 0.3 setosa
20 5.1 3.8 1.5 0.3 setosa
21 5.4 3.4 1.7 0.2 setosa
22 5.1 3.7 1.5 0.4 setosa
23 4.6 3.6 1 0.2 setosa
24 5.1 3.3 1.7 0.5 setosa
25 4.8 3.4 1.9 0.2 setosa
26 5 3 1.6 0.2 setosa
27 5 3.4 1.6 0.4 setosa
28 5.2 3.5 1.5 0.2 setosa
29 5.2 3.4 1.4 0.2 setosa
30 4.7 3.2 1.6 0.2 setosa
31 4.8 3.1 1.6 0.2 setosa
32 5.4 3.4 1.5 0.4 setosa
33 5.2 4.1 1.5 0.1 setosa
34 5.5 4.2 1.4 0.2 setosa
35 4.9 3.1 1.5 0.2 setosa
36 5 3.2 1.2 0.2 setosa
37 5.5 3.5 1.3 0.2 setosa
38 4.9 3.6 1.4 0.1 setosa
39 4.4 3 1.3 0.2 setosa
40 5.1 3.4 1.5 0.2 setosa
41 5 3.5 1.3 0.3 setosa
42 4.5 2.3 1.3 0.3 setosa
43 4.4 3.2 1.3 0.2 setosa
44 5 3.5 1.6 0.6 setosa
45 5.1 3.8 1.9 0.4 setosa
46 4.8 3 1.4 0.3 setosa
47 5.1 3.8 1.6 0.2 setosa
48 4.6 3.2 1.4 0.2 setosa
49 5.3 3.7 1.5 0.2 setosa
50 5 3.3 1.4 0.2 setosa
51 7 3.2 4.7 1.4 versicolor
52 6.4 3.2 4.5 1.5 versicolor
53 6.9 3.1 4.9 1.5 versicolor
54 5.5 2.3 4 1.3 versicolor
55 6.5 2.8 4.6 1.5 versicolor
56 5.7 2.8 4.5 1.3 versicolor
57 6.3 3.3 4.7 1.6 versicolor
58 4.9 2.4 3.3 1 versicolor
59 6.6 2.9 4.6 1.3 versicolor
60 5.2 2.7 3.9 1.4 versicolor
61 5 2 3.5 1 versicolor
62 5.9 3 4.2 1.5 versicolor
63 6 2.2 4 1 versicolor
64 6.1 2.9 4.7 1.4 versicolor
65 5.6 2.9 3.6 1.3 versicolor
66 6.7 3.1 4.4 1.4 versicolor
67 5.6 3 4.5 1.5 versicolor
68 5.8 2.7 4.1 1 versicolor
69 6.2 2.2 4.5 1.5 versicolor
70 5.6 2.5 3.9 1.1 versicolor
71 5.9 3.2 4.8 1.8 versicolor
72 6.1 2.8 4 1.3 versicolor
73 6.3 2.5 4.9 1.5 versicolor
74 6.1 2.8 4.7 1.2 versicolor
75 6.4 2.9 4.3 1.3 versicolor
76 6.6 3 4.4 1.4 versicolor
77 6.8 2.8 4.8 1.4 versicolor
78 6.7 3 5 1.7 versicolor
79 6 2.9 4.5 1.5 versicolor
80 5.7 2.6 3.5 1 versicolor
81 5.5 2.4 3.8 1.1 versicolor
82 5.5 2.4 3.7 1 versicolor
83 5.8 2.7 3.9 1.2 versicolor
84 6 2.7 5.1 1.6 versicolor
85 5.4 3 4.5 1.5 versicolor
86 6 3.4 4.5 1.6 versicolor
87 6.7 3.1 4.7 1.5 versicolor
88 6.3 2.3 4.4 1.3 versicolor
89 5.6 3 4.1 1.3 versicolor
90 5.5 2.5 4 1.3 versicolor
91 5.5 2.6 4.4 1.2 versicolor
92 6.1 3 4.6 1.4 versicolor
93 5.8 2.6 4 1.2 versicolor
94 5 2.3 3.3 1 versicolor
95 5.6 2.7 4.2 1.3 versicolor
96 5.7 3 4.2 1.2 versicolor
97 5.7 2.9 4.2 1.3 versicolor
98 6.2 2.9 4.3 1.3 versicolor
99 5.1 2.5 3 1.1 versicolor
100 5.7 2.8 4.1 1.3 versicolor
101 6.3 3.3 6 2.5 virginica
102 5.8 2.7 5.1 1.9 virginica
103 7.1 3 5.9 2.1 virginica
104 6.3 2.9 5.6 1.8 virginica
105 6.5 3 5.8 2.2 virginica
106 7.6 3 6.6 2.1 virginica
107 4.9 2.5 4.5 1.7 virginica
108 7.3 2.9 6.3 1.8 virginica
109 6.7 2.5 5.8 1.8 virginica
110 7.2 3.6 6.1 2.5 virginica
111 6.5 3.2 5.1 2 virginica
112 6.4 2.7 5.3 1.9 virginica
113 6.8 3 5.5 2.1 virginica
114 5.7 2.5 5 2 virginica
115 5.8 2.8 5.1 2.4 virginica
116 6.4 3.2 5.3 2.3 virginica
117 6.5 3 5.5 1.8 virginica
118 7.7 3.8 6.7 2.2 virginica
119 7.7 2.6 6.9 2.3 virginica
120 6 2.2 5 1.5 virginica
121 6.9 3.2 5.7 2.3 virginica
122 5.6 2.8 4.9 2 virginica
123 7.7 2.8 6.7 2 virginica
124 6.3 2.7 4.9 1.8 virginica
125 6.7 3.3 5.7 2.1 virginica
126 7.2 3.2 6 1.8 virginica
127 6.2 2.8 4.8 1.8 virginica
128 6.1 3 4.9 1.8 virginica
129 6.4 2.8 5.6 2.1 virginica
130 7.2 3 5.8 1.6 virginica
131 7.4 2.8 6.1 1.9 virginica
132 7.9 3.8 6.4 2 virginica
133 6.4 2.8 5.6 2.2 virginica
134 6.3 2.8 5.1 1.5 virginica
135 6.1 2.6 5.6 1.4 virginica
136 7.7 3 6.1 2.3 virginica
137 6.3 3.4 5.6 2.4 virginica
138 6.4 3.1 5.5 1.8 virginica
139 6 3 4.8 1.8 virginica
140 6.9 3.1 5.4 2.1 virginica
141 6.7 3.1 5.6 2.4 virginica
142 6.9 3.1 5.1 2.3 virginica
143 5.8 2.7 5.1 1.9 virginica
144 6.8 3.2 5.9 2.3 virginica
145 6.7 3.3 5.7 2.5 virginica
146 6.7 3 5.2 2.3 virginica
147 6.3 2.5 5 1.9 virginica
148 6.5 3 5.2 2 virginica
149 6.2 3.4 5.4 2.3 virginica
150 5.9 3 5.1 1.8 virginica

1052
tests/resources/iris.json Normal file

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -33,6 +33,8 @@ srvloc <- Sys.getenv("AZ_TEST_KUSTO_SERVER_LOCATION")
if(srvname == "" || srvloc == "")
skip("Token acquisition tests skipped: server info not set")
if(!interactive())
skip("Token acquisition tests skipped: must be an interactive session")
# remove all cached Kusto tokens before testing
lapply(AzureRMR::list_azure_tokens(), function(token)

Просмотреть файл

@ -15,37 +15,43 @@ dbname <- Sys.getenv("AZ_TEST_KUSTO_DATABASE")
if(rgname == "" || srvname == "" || dbname == "")
skip("Database endpoint tests skipped: server info not set")
if(!interactive())
skip("Database endpoint tests skipped: must be an interactive session")
rg <- AzureRMR::az_rm$
new(tenant=tenant, app=app, password=password)$
get_subscription(subscription)$
get_resource_group(rgname)
# should only get one devicecode prompt here at most
# should only get 2 devicecode prompts here at most
test_that("Resource access functions work",
{
srv <- rg$get_kusto_cluster(srvname)
expect_true(is_kusto_cluster(srv))
expect_true(AzureRMR::is_azure_token(srv$get_aad_token()))
expect_true(AzureRMR::is_azure_token(srv$get_aad_token(app=app, password=password)))
expect_true(AzureRMR::is_azure_token(srv$get_query_token()))
expect_true(AzureRMR::is_azure_token(srv$get_query_token(app=app, password=password)))
expect_true(AzureRMR::is_azure_token(srv$get_ingestion_token()))
expect_true(AzureRMR::is_azure_token(srv$get_ingestion_token(app=app, password=password)))
db <- srv$get_database(dbname)
expect_true(is_kusto_database(db))
endp1 <- db$get_query_endpoint()
endp1 <- db$get_database_endpoint()
expect_is(endp1, "kusto_database_endpoint")
server <- srv$properties$queryUri
endp2 <- kusto_query_endpoint(server=server, database=dbname, tenantid=tenant)
endp2 <- kusto_database_endpoint(server=server, database=dbname, tenantid=tenant)
expect_is(endp2, "kusto_database_endpoint")
endp3 <- kusto_query_endpoint(server=server, database=dbname,
.azure_token=get_kusto_token(cluster=srvname, location=srv$location, tenant=tenant))
endp3 <- kusto_database_endpoint(server=server, database=dbname,
.query_token=get_kusto_token(cluster=srvname, location=srv$location, tenant=tenant))
expect_is(endp3, "kusto_database_endpoint")
conn_str <- sprintf("server=%s;database=%s;tenantid=%s", server, dbname, tenant)
endp4 <- kusto_query_endpoint(.connection_string=conn_str)
endp4 <- kusto_database_endpoint(.connection_string=conn_str)
expect_is(endp4, "kusto_database_endpoint")
expect_identical(endp1$token$hash(), endp2$token$hash())
@ -53,29 +59,29 @@ test_that("Resource access functions work",
expect_identical(endp1$token$hash(), endp4$token$hash())
# using our own app
endp5 <- kusto_query_endpoint(server=server, database=dbname, tenantid=tenant,
endp5 <- kusto_database_endpoint(server=server, database=dbname, tenantid=tenant,
appclientid=app, appkey=password)
expect_is(endp5, "kusto_database_endpoint")
# no trailing / on server should trigger warning
expect_warning(kusto_query_endpoint(
expect_warning(kusto_database_endpoint(
server=sprintf("https://%s.%s.kusto.windows.net", srvname, srv$location),
database=dbname,
.azure_token=endp4$token))
.query_token=endp4$token))
# invalid property
expect_error(kusto_property_endpoint(badproperty="foo"))
# unsupported property (change this if/when federated auth is supported)
expect_warning(kusto_query_endpoint(server=server, database=dbname, tenantid=tenant, fed=TRUE))
expect_warning(kusto_database_endpoint(server=server, database=dbname, tenantid=tenant, fed=TRUE))
# quote stripping
endp6 <- kusto_query_endpoint(server=sprintf("'%s'", server), database=dbname, tenantid=tenant)
endp6 <- kusto_database_endpoint(server=sprintf("'%s'", server), database=dbname, tenantid=tenant)
expect_identical(endp6$server, endp1$server)
# connection string type handling (change this if/when federated auth is supported)
conn_str2 <- sprintf("server=%s;database=%s;tenantid=%s;fed=true", server, dbname, tenant)
expect_warning(kusto_query_endpoint(.connection_string=conn_str2))
expect_warning(kusto_database_endpoint(.connection_string=conn_str2))
})

Просмотреть файл

@ -17,8 +17,8 @@ if(srvname == "" || srvloc == "" || dbname == "")
server <- sprintf("https://%s.%s.kusto.windows.net", srvname, srvloc)
db <- kusto_query_endpoint(server=server, database=dbname, tenantid=tenant)
db2 <- kusto_query_endpoint(server=server, database=dbname, tenantid=tenant,
db <- kusto_database_endpoint(server=server, database=dbname, tenantid=tenant)
db2 <- kusto_database_endpoint(server=server, database=dbname, tenantid=tenant,
appclientid=app, appkey=password)
test_that("Queries work",
@ -33,12 +33,12 @@ test_that("Queries work",
test_that("Commands work",
{
out <- run_command(db2, ".show cluster")
out <- run_query(db, ".show cluster")
expect_is(out, "data.frame")
dberr <- db
dberr$token <- NULL
expect_error(run_command(dberr, ".show cluster"))
expect_error(run_query(dberr, ".show cluster"))
})
test_that("Queries work with own app",
@ -53,10 +53,10 @@ test_that("Queries work with own app",
test_that("Commands work with own app",
{
out <- run_command(db2, ".show cluster")
out <- run_query(db2, ".show cluster")
expect_is(out, "data.frame")
dberr <- db2
dberr$token <- NULL
expect_error(run_command(dberr, ".show cluster"))
expect_error(run_query(dberr, ".show cluster"))
})

Просмотреть файл

@ -0,0 +1,128 @@
context("Ingesting")
tenant <- Sys.getenv("AZ_TEST_TENANT_ID")
app <- Sys.getenv("AZ_TEST_APP_ID")
password <- Sys.getenv("AZ_TEST_PASSWORD")
subscription <- Sys.getenv("AZ_TEST_SUBSCRIPTION")
if(tenant == "" || app == "" || password == "" || subscription == "")
skip("Tests skipped: ARM credentials not set")
if(!requireNamespace("AzureStor"))
skip("Database ingestion tests skipped: AzureStor package not found")
# use persistent testing resources
rgname <- Sys.getenv("AZ_TEST_KUSTO_SERVER_RG")
username <- Sys.getenv("AZ_TEST_KUSTO_USERNAME")
srvname <- Sys.getenv("AZ_TEST_KUSTO_SERVER")
srvloc <- Sys.getenv("AZ_TEST_KUSTO_SERVER_LOCATION")
blobacct <- Sys.getenv("AZ_TEST_KUSTO_BLOBACCT")
blobcont <- Sys.getenv("AZ_TEST_KUSTO_BLOBCONT")
adlsacct <- Sys.getenv("AZ_TEST_KUSTO_ADLSACCT")
adlscont <- Sys.getenv("AZ_TEST_KUSTO_ADLSCONT")
if(rgname == "" || username == "" ||
srvname == "" || srvloc == "" ||
blobacct == "" || blobcont == "" ||
adlsacct == "" || adlscont == "")
skip("Database ingestion tests skipped: server info not set")
rg <- AzureRMR::az_rm$
new(tenant=tenant, app=app, password=password)$
get_subscription(subscription)$
get_resource_group(rgname)
srv <- rg$get_kusto_cluster(srvname)
# generate random ingestion database
dbname <- paste0("ingest", paste0(sample(letters, 5), collapse=""))
dbres <- srv$create_database(dbname)
dbres$add_principals(name="user1", role="Admin", fqn=paste0("aaduser=", username))
db <- dbres$get_database_endpoint()
blobstor <- rg$get_storage_account(blobacct)
adlsstor <- rg$get_storage_account(adlsacct)
test_that("blob ingestion works",
{
bloburl <- sprintf("https://%s.blob.core.windows.net/%s/iris.csv", blobacct, blobcont)
# blob with key
blobkey <- blobstor$list_keys()[1]
run_query(db, ".create table irisblobkey (sl:real, sw:real, pl:real, pw:real, species:string)")
ingest_blob(db, bloburl, "irisblobkey", key=blobkey, ignoreFirstRecord=TRUE)
expect_equal(run_query(db, "irisblobkey | count")$Count, 150)
# blob with sas
blobsas <- blobstor$get_account_sas()
run_query(db, ".create table irisblobsas (sl:real, sw:real, pl:real, pw:real, species:string)")
ingest_blob(db, bloburl, "irisblobsas", sas=blobsas, ignoreFirstRecord=TRUE)
expect_equal(run_query(db, "irisblobsas | count")$Count, 150)
})
test_that("ADLSgen2 ingestion works",
{
adlsurl <- sprintf("https://%s.dfs.core.windows.net/%s/iris.csv", adlsacct, adlscont)
# adls2 with key
adlskey <- adlsstor$list_keys()[1]
run_query(db, ".create table irisadlskey (sl:real, sw:real, pl:real, pw:real, species:string)")
ingest_adls2(db, adlsurl, "irisadlskey", key=adlskey, ignoreFirstRecord=TRUE)
expect_equal(run_query(db, "irisadlskey | count")$Count, 150)
# skip adls2 token test for now
# adlstok <- AzureRMR::get_azure_token(sprintf("https://%s.dfs.core.windows.net", adlsacct),
# tenant=tenant, app=app, password=password)
# run_query(db, ".create table irisadlskey (sl:real, sw:real, pl:real, pw:real, species:string)")
# ingest_adls2(db, adlsurl, "irisadlskey", tok=adlstok, ignoreFirstRecord=TRUE)
# expect_equal(run_query(db, "irisadlskey | count")$Count, 150)
})
test_that("local indirect ingestion works",
{
bl <- blobstor$get_blob_endpoint()
stagecont <- AzureStor::create_blob_container(bl, db$database)
run_query(db, ".create table irisfileindirect (sl:real, sw:real, pl:real, pw:real, species:string)")
ingest_local(db, "../resources/iris.csv", "irisfileindirect", method="indirect",
staging_container=stagecont)
expect_equal(run_query(db, "irisfileindirect | count")$Count, 150)
run_query(db, ".create table irisdfindirect (sl:real, sw:real, pl:real, pw:real, species:string)")
ingest_local(db, iris, "irisdfindirect", method="indirect",
staging_container=stagecont)
expect_equal(run_query(db, "irisdfindirect | count")$Count, 150)
})
test_that("local streaming ingestion works",
{
run_query(db,
sprintf(".alter database %s policy streamingingestion '{\"NumberOfRowStores\": 10}'", db$database))
run_query(db, ".create table irisfilestream (sl:real, sw:real, pl:real, pw:real, species:string)")
ingest_local(db, "../resources/iris.csv", "irisfilestream", method="streaming",
streamFormat="Csv")
expect_equal(run_query(db, "irisfilestream | count")$Count, 150)
run_query(db, ".create table irisdfstream (sl:real, sw:real, pl:real, pw:real, species:string)")
ingest_local(db, iris, "irisdfstream", method="streaming")
expect_equal(run_query(db, "irisdfstream | count")$Count, 150)
})
test_that("local inline ingestion works",
{
run_query(db, ".create table irisfileinline (sl:real, sw:real, pl:real, pw:real, species:string)")
ingest_local(db, "../resources/iris.csv", "irisfileinline", method="inline")
expect_equal(run_query(db, "irisfileinline | count")$Count, 150)
run_query(db, ".create table irisdfinline (sl:real, sw:real, pl:real, pw:real, species:string)")
ingest_local(db, iris, "irisdfinline", method="inline")
expect_equal(run_query(db, "irisdfinline | count")$Count, 150)
})
srv$delete_database(dbname, confirm=FALSE)
delete_blob_container(blobstor$get_blob_endpoint(), dbname, confirm=FALSE)