зеркало из https://github.com/Azure/AzureKusto.git
289 строки
11 KiB
R
289 строки
11 KiB
R
#' Endpoints for communicating with a Kusto database
|
|
#'
|
|
#' @param ... Named arguments which are the properties for the endpoint object. See 'Details' below for the properties that AzureKusto recognises.
|
|
#' @param .connection_string An alternative way of specifying the properties, as a database connection string. Properties supplied here override those in `...` if they overlap.
|
|
#' @param .query_token Optionally, an Azure Active Directory (AAD) token to authenticate with. If this is supplied, it overrides other tokens specified in `...` or in the connection string.
|
|
#' @param .use_integer64 For `kusto_database_endpoint`, whether to convert columns with Kusto `long` datatype into 64-bit integers in R, using the bit64 package. If FALSE, represent them as numeric instead.
|
|
#'
|
|
#' @details
|
|
#' This is a list of properties recognised by `kusto_database_endpoint`, and their alternate names. Property names not in this list will generate an error. Note that not all properties that are recognised are currently supported by AzureKusto.
|
|
#'
|
|
#' General properties:
|
|
#' - server: The URI of the server, usually of the form 'https://clustername.location.kusto.windows.net'.
|
|
#' * addr, address, network address, datasource, host
|
|
#' - database: The database.
|
|
#' * initialcatalog, dbname
|
|
#' - tenantid: The AAD tenant name or ID to authenticate with.
|
|
#' * authority
|
|
#' - appclientid: The AAD app/service principal ID
|
|
#' * applicationclientid
|
|
#' - traceclientversion: The client version for tracing.
|
|
#' - queryconsistency: The level of query consistency. Defaults to "weakconsistency".
|
|
#' - response_dynamic_serialization: How to serialize dynamic responses.
|
|
#' - response_dynamic_serialization_2: How to serialize dynamic responses.
|
|
#'
|
|
#' User authentication properties:
|
|
# - pwd: The user password.
|
|
#' * password
|
|
#' - user: The user name.
|
|
#' * uid, userid
|
|
#' - traceusername: The user name for tracing.
|
|
#' - usertoken: The AAD token for user authentication.
|
|
#' - * usertoken, usrtoken
|
|
#' - fed: Logical, whether federated authentication is enabled. Currently unsupported; if this is TRUE, `kusto_database_endpoint` will print a warning and ignore it.
|
|
#' * federated security, federated, aadfed, aadfederatedsecurity
|
|
#'
|
|
#' App authentication properties:
|
|
#' - appkey: The secret key for the app.
|
|
#' * applicationkey
|
|
#' - traceappname: The AAD app for tracing.
|
|
#' - apptoken: The AAD token for app authentication.
|
|
#' * apptoken, applicationtoken
|
|
#'
|
|
#' Currently, AzureKusto only supports authentication via Azure Active Directory. Authenticating with DSTS is planned for the future.
|
|
#'
|
|
#' The way `kusto_database_endpoint` obtains an AAD token is as follows.
|
|
#' 1. If the `.query_token` argument is supplied, use it.
|
|
#' 2. Otherwise, if the `usertoken` property is supplied, use it.
|
|
#' 3. Otherwise, if the `apptoken` property is supplied, use it.
|
|
#' 4. Otherwise, if the `appclientid` property is supplied, use it to obtain a token:
|
|
#' - With the `user` and `pwd` properties if available
|
|
#' - Or with the `appkey` property if available
|
|
#' - Otherwise do an interactive authentication and ask for the user credentials
|
|
#' 5. Otherwise, if no `appclientid` property is supplied, authenticate with the KustoClient app:
|
|
#' - With the `user` and `pwd` properties if available
|
|
#' - Otherwise do an interactive authentication and ask for the user credentials using a device code
|
|
#'
|
|
#' @return
|
|
#' An object of class `kusto_database_endpoint`.
|
|
#'
|
|
#' @examples
|
|
#' \dontrun{
|
|
#'
|
|
#' kusto_database_endpoint(server="myclust.australiaeast.kusto.windows.net", database="db1")
|
|
#'
|
|
#' # supplying a token obtained previously
|
|
#' token <- get_kusto_token("myclust.australiaeast.kusto.windows.net")
|
|
#' kusto_database_endpoint(server="myclust.australiaeast.kusto.windows.net", database="db1",
|
|
#' .query_token=token)
|
|
#'
|
|
#' }
|
|
#' @seealso
|
|
#' [run_query], [az_kusto_database]
|
|
#' @rdname database_endpoint
|
|
#' @export
|
|
kusto_database_endpoint <- function(..., .connection_string=NULL, .query_token=NULL, .use_integer64=FALSE)
|
|
{
|
|
props <- list(...)
|
|
names(props) <- tolower(names(props))
|
|
|
|
if(!is.null(.connection_string))
|
|
{
|
|
# simplified connection string handling: ignores quoting issues
|
|
conn_props <- strsplit(.connection_string, ";")[[1]]
|
|
names(conn_props) <- tolower(sapply(conn_props, function(x) sub("[ ]*=.+$", "", x)))
|
|
conn_props <- lapply(conn_props, function(x)
|
|
{
|
|
x <- sub("^[^=]+=[ ]*", "", x)
|
|
find_type_from_connstring(x)
|
|
})
|
|
props <- utils::modifyList(props, conn_props)
|
|
}
|
|
|
|
# fix all property names to a given (sub)set, remove quotes from quoted values
|
|
props <- normalize_connstring_properties(props)
|
|
|
|
# Make bare cluster name into FQDN for server if it's not already
|
|
if (!startsWith(props$server, "https://"))
|
|
{
|
|
props$server <- paste0("https://", props$server)
|
|
if (!endsWith(props$server, ".kusto.windows.net"))
|
|
props$server <- paste0(props$server, ".kusto.windows.net")
|
|
}
|
|
props$token <- find_endpoint_token(props, .query_token)
|
|
|
|
props$use_integer64 <- .use_integer64
|
|
props <- check_endpoint_properties(props)
|
|
|
|
class(props) <- "kusto_database_endpoint"
|
|
props
|
|
}
|
|
|
|
|
|
normalize_connstring_properties <- function(properties)
|
|
{
|
|
# valid property names for a Kusto connection string
|
|
property_list <- list(
|
|
# general properties
|
|
traceclientversion="traceclientversion",
|
|
server=c("server", "addr", "address", "network address", "datasource", "host", "cluster"),
|
|
database=c("database", "initialcatalog", "dbname"),
|
|
tenantid=c("tenantid", "authority"),
|
|
queryconsistency="queryconsistency",
|
|
response_dynamic_serialization="response_dynamic_serialization",
|
|
response_dynamic_serialization_2="response_dynamic_serialization_2",
|
|
|
|
# userauth properties -- DSTS not yet supported
|
|
fed=c("fed", "federated security", "federated", "aadfed", "aadfederatedsecurity"),
|
|
pwd=c("pwd", "password"),
|
|
user=c("user", "uid", "userid"),
|
|
traceusername="traceusername",
|
|
usertoken=c("usertoken", "usrtoken"),
|
|
|
|
# appauth properties -- cert, DSTS not yet supported
|
|
appclientid=c("appclientid", "applicationclientid"),
|
|
appkey=c("appkey", "applicationkey"),
|
|
traceappname="traceappname",
|
|
apptoken=c("apptoken", "applicationtoken")
|
|
)
|
|
|
|
normalize_name <- function(x)
|
|
{
|
|
for(i in seq_along(property_list))
|
|
{
|
|
if(x %in% property_list[[i]])
|
|
return(names(property_list)[i])
|
|
}
|
|
stop("Invalid/unsupported property name: ", x, call.=FALSE)
|
|
}
|
|
|
|
strip_quotes <- function(x)
|
|
{
|
|
if(is.character(x))
|
|
sub("^['\"](.+)['\"]$", "\\1", x)
|
|
else x
|
|
}
|
|
|
|
names(properties) <- sapply(names(properties), normalize_name)
|
|
lapply(properties, strip_quotes)
|
|
}
|
|
|
|
|
|
# for a property obtained from a connection string, convert to a type other than char if possible
|
|
find_type_from_connstring <- function(string)
|
|
{
|
|
if(identical(string, "NA"))
|
|
return(NA)
|
|
if(!is.na(x <- suppressWarnings(as.numeric(string)))) # assign inside comparison!
|
|
return(x)
|
|
if(!is.na(l <- as.logical(string)))
|
|
return(l)
|
|
string
|
|
}
|
|
|
|
|
|
find_endpoint_token <- function(properties, .query_token)
|
|
{
|
|
if(!is.null(.query_token))
|
|
return(.query_token)
|
|
|
|
# properties to check for token: usertoken, apptoken, appclientid, appkey
|
|
if(!is_empty(properties$usertoken))
|
|
return(properties$usertoken)
|
|
|
|
if(!is_empty(properties$apptoken))
|
|
return(properties$apptoken)
|
|
|
|
# if no app ID supplied, insert Kusto app ID
|
|
if(is_empty(properties$appclientid))
|
|
{
|
|
message("No app ID supplied; using KustoClient app")
|
|
properties$appclientid <- .kusto_app_id
|
|
auth_type <- "authorization_code"
|
|
}
|
|
else auth_type <- NULL # KustoClient needs devicecode, otherwise let get_azure_token choose
|
|
|
|
# possibilities for authenticating with AAD:
|
|
# - appid + username + userpwd
|
|
# - appid + appkey
|
|
# - appid only (auth_code/device_code flow)
|
|
token_pwd <- token_user <- NULL
|
|
|
|
if(!is_empty(properties$user) && !is_empty(properties$pwd))
|
|
{
|
|
token_pwd <- properties$pwd
|
|
token_user <- properties$user
|
|
auth_type <- "resource_owner"
|
|
}
|
|
else if(!is_empty(properties$appkey) && properties$appclientid != .kusto_app_id)
|
|
{
|
|
token_pwd <- properties$appkey
|
|
auth_type <- "client_credentials"
|
|
}
|
|
|
|
return(get_kusto_token(properties$server, tenant=properties$tenantid,
|
|
app=properties$appclientid, password=token_pwd, username=token_user, auth_type=auth_type))
|
|
}
|
|
|
|
|
|
# internal checks on various properties
|
|
check_endpoint_properties <- function(props)
|
|
{
|
|
if(isTRUE(props$use_integer64) && !requireNamespace("bit64", quietly=TRUE))
|
|
{
|
|
warning("bit64 package not installed, cannot use 64-bit integers")
|
|
props$use_integer64 <- FALSE
|
|
}
|
|
|
|
if(!is_empty(props[["response_dynamic_serialization"]]) &&
|
|
tolower(props[["response_dynamic_serialization"]]) != "string")
|
|
{
|
|
warning("Serialization of dynamic response to JSON is not yet supported")
|
|
props[["response_dynamic_serialization"]] <- NULL
|
|
}
|
|
|
|
props
|
|
}
|
|
|
|
|
|
#' @export
|
|
print.kusto_database_endpoint <- function(x, ...)
|
|
{
|
|
url <- httr::parse_url(x$server)
|
|
url$path <- x$database
|
|
cat("<Kusto database endpoint '", httr::build_url(url), "'>\n", sep="")
|
|
invisible(x)
|
|
}
|
|
|
|
#' This function uploads a local data frame into a remote data source, creating the table definition as needed.
|
|
#' If the table exists, it will append the data to the existing table. If not, it will create a new table.
|
|
#' @export
|
|
#' @param dest remote data source
|
|
#' @param df local data frame
|
|
#' @param name Name for new remote table
|
|
#' @param overwrite If `TRUE`, will overwrite an existing table with
|
|
#' name `name`. If `FALSE`, will throw an error if `name` already
|
|
#' exists.
|
|
#' @param method For local ingestion, the method to use. "inline", "streaming", or "indirect".
|
|
#' @param ... other parameters passed to the query
|
|
#' @seealso [collect()] for the opposite action; downloading remote data into a local tbl.
|
|
copy_to.kusto_database_endpoint <- function(dest, df, name=deparse(substitute(df)), overwrite = FALSE, method = "inline", ...)
|
|
{
|
|
if (!is.data.frame(df) && !inherits(df, "tbl_kusto"))
|
|
stop("`df` must be a local dataframe or a remote tbl_kusto", call. = FALSE)
|
|
|
|
if (inherits(df, "tbl_kusto") && dest$server == df$src$server)
|
|
out <- compute(df, name = name, ...)
|
|
else
|
|
{
|
|
df <- collect(df)
|
|
class(df) <- "data.frame" # avoid S4 dispatch problem in dbSendPreparedQuery
|
|
|
|
#initialize DBI connection
|
|
cnxn <- new("AzureKustoConnection", endpoint=dest)
|
|
tableExists <- DBI::dbExistsTable(cnxn, name)
|
|
if (tableExists)
|
|
{
|
|
if(overwrite)
|
|
DBI::dbRemoveTable(cnxn, name)
|
|
else stop(paste0("table ",
|
|
name,
|
|
" already exists. If you wish to overwrite it, specify overwrite=TRUE"))
|
|
}
|
|
dbWriteTable(cnxn, name, df, method=method)
|
|
|
|
out <- tbl_kusto(dest, name)
|
|
}
|
|
invisible(out)
|
|
}
|