|
|
|
@ -9,66 +9,62 @@
|
|
|
|
|
#' df <- data.frame(x = 1, y = 2)
|
|
|
|
|
#'
|
|
|
|
|
#' df <- tbl_kusto_abstract(df, "table1")
|
|
|
|
|
#' df %>% summarise(x = sd(x)) %>% show_query()
|
|
|
|
|
#' df %>%
|
|
|
|
|
#' summarise(x = sd(x)) %>%
|
|
|
|
|
#' show_query()
|
|
|
|
|
tbl_kusto_abstract <- function(df, table_name, ...) {
|
|
|
|
|
params <- list(...)
|
|
|
|
|
src <- structure(
|
|
|
|
|
list(
|
|
|
|
|
database = "local_df",
|
|
|
|
|
server = "local_df",
|
|
|
|
|
table = escape(ident(table_name))
|
|
|
|
|
),
|
|
|
|
|
class = "kusto_database_endpoint"
|
|
|
|
|
)
|
|
|
|
|
make_tbl("kusto_abstract", ops = op_base_local(df), src = src, params = params)
|
|
|
|
|
params <- list(...)
|
|
|
|
|
src <- structure(
|
|
|
|
|
list(
|
|
|
|
|
database = "local_df",
|
|
|
|
|
server = "local_df",
|
|
|
|
|
table = escape(ident(table_name))
|
|
|
|
|
),
|
|
|
|
|
class = "kusto_database_endpoint"
|
|
|
|
|
)
|
|
|
|
|
make_tbl("kusto_abstract", ops = op_base_local(df), src = src, params = params)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setOldClass(c("tbl_kusto_abstract", "tbl"))
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
select.tbl_kusto_abstract <- function(.data, ...)
|
|
|
|
|
{
|
|
|
|
|
select.tbl_kusto_abstract <- function(.data, ...) {
|
|
|
|
|
dots <- quos(...)
|
|
|
|
|
add_op_single("select", .data, dots = dots)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
distinct.tbl_kusto_abstract <- function(.data, ...)
|
|
|
|
|
{
|
|
|
|
|
distinct.tbl_kusto_abstract <- function(.data, ...) {
|
|
|
|
|
dots <- quos(...)
|
|
|
|
|
dots <- partial_eval(dots, vars = op_vars(.data))
|
|
|
|
|
add_op_single("distinct", .data, dots = dots)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
rename.tbl_kusto_abstract <- function(.data, ...)
|
|
|
|
|
{
|
|
|
|
|
rename.tbl_kusto_abstract <- function(.data, ...) {
|
|
|
|
|
dots <- quos(...)
|
|
|
|
|
add_op_single("rename", .data, dots = dots)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
filter.tbl_kusto_abstract <- function(.data, ...)
|
|
|
|
|
{
|
|
|
|
|
filter.tbl_kusto_abstract <- function(.data, ...) {
|
|
|
|
|
dots <- quos(...)
|
|
|
|
|
# add the tbl params into the environment of the expression's quosure
|
|
|
|
|
dots <- lapply(dots, add_params_to_quosure, params=.data$params)
|
|
|
|
|
dots <- lapply(dots, add_params_to_quosure, params = .data$params)
|
|
|
|
|
dots <- partial_eval(dots, vars = op_vars(.data))
|
|
|
|
|
add_op_single("filter", .data, dots = dots)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
mutate.tbl_kusto_abstract <- function(.data, ...)
|
|
|
|
|
{
|
|
|
|
|
dots <- quos(..., .named=TRUE)
|
|
|
|
|
dots <- lapply(dots, add_params_to_quosure, params=.data$params)
|
|
|
|
|
mutate.tbl_kusto_abstract <- function(.data, ...) {
|
|
|
|
|
dots <- quos(..., .named = TRUE)
|
|
|
|
|
dots <- lapply(dots, add_params_to_quosure, params = .data$params)
|
|
|
|
|
dots <- partial_eval(dots, vars = op_vars(.data))
|
|
|
|
|
add_op_single("mutate", .data, dots = dots)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
arrange.tbl_kusto_abstract <- function(.data, ...)
|
|
|
|
|
{
|
|
|
|
|
arrange.tbl_kusto_abstract <- function(.data, ...) {
|
|
|
|
|
dots <- quos(...)
|
|
|
|
|
dots <- partial_eval(dots, vars = op_vars(.data))
|
|
|
|
|
names(dots) <- NULL
|
|
|
|
@ -76,26 +72,24 @@ arrange.tbl_kusto_abstract <- function(.data, ...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
group_by.tbl_kusto_abstract <- function(.data, ..., add = FALSE)
|
|
|
|
|
{
|
|
|
|
|
group_by.tbl_kusto_abstract <- function(.data, ..., add = FALSE) {
|
|
|
|
|
dots <- quos(...)
|
|
|
|
|
dots <- partial_eval(dots, vars = op_vars(.data))
|
|
|
|
|
if (is_empty(dots))
|
|
|
|
|
{
|
|
|
|
|
if (is_empty(dots)) {
|
|
|
|
|
return(.data)
|
|
|
|
|
}
|
|
|
|
|
# Updated for dplyr deprecation of .dots and add params
|
|
|
|
|
groups <- group_by_prepare(.data, !!!dots, .add = add)
|
|
|
|
|
names <- vapply(groups$groups, as_string, character(1))
|
|
|
|
|
add_op_single("group_by",
|
|
|
|
|
groups$data,
|
|
|
|
|
dots = set_names(groups$groups, names),
|
|
|
|
|
args = list(add = FALSE))
|
|
|
|
|
groups$data,
|
|
|
|
|
dots = set_names(groups$groups, names),
|
|
|
|
|
args = list(add = FALSE)
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
ungroup.tbl_kusto_abstract <- function(x, ...)
|
|
|
|
|
{
|
|
|
|
|
ungroup.tbl_kusto_abstract <- function(x, ...) {
|
|
|
|
|
add_op_single("ungroup", x)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -117,24 +111,25 @@ ungroup.tbl_kusto_abstract <- function(x, ...)
|
|
|
|
|
#' tbl1 <- tbl_kusto(db, "table1")
|
|
|
|
|
#'
|
|
|
|
|
#' ## standard dplyr syntax:
|
|
|
|
|
#' summarise(tbl1, mx=mean(x))
|
|
|
|
|
#' summarise(tbl1, mx = mean(x))
|
|
|
|
|
#'
|
|
|
|
|
#' ## Kusto extensions:
|
|
|
|
|
#' summarise(tbl1, mx=mean(x), .strategy="broadcast") # a broadcast summarise
|
|
|
|
|
#' summarise(tbl1, mx = mean(x), .strategy = "broadcast") # a broadcast summarise
|
|
|
|
|
#'
|
|
|
|
|
#' summarise(tbl1, mx=mean(x), .shufflekeys=c("var1", "var2")) # shuffle summarise with shuffle keys
|
|
|
|
|
#' summarise(tbl1, mx = mean(x), .shufflekeys = c("var1", "var2")) # shuffle summarise with shuffle keys
|
|
|
|
|
#'
|
|
|
|
|
#' summarise(tbl1, mx=mean(x), .num_partitions=5) # no. of partitions for a shuffle summarise
|
|
|
|
|
#' summarise(tbl1, mx = mean(x), .num_partitions = 5) # no. of partitions for a shuffle summarise
|
|
|
|
|
#' }
|
|
|
|
|
#'
|
|
|
|
|
#' @rdname summarise
|
|
|
|
|
#' @export
|
|
|
|
|
summarise.tbl_kusto_abstract <- function(.data, ..., .strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL)
|
|
|
|
|
{
|
|
|
|
|
summarise.tbl_kusto_abstract <- function(.data, ..., .strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL) {
|
|
|
|
|
dots <- quos(..., .named = TRUE)
|
|
|
|
|
dots <- partial_eval(dots, vars = op_vars(.data))
|
|
|
|
|
add_op_single("summarise", .data, dots = dots,
|
|
|
|
|
args = list(.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions))
|
|
|
|
|
add_op_single("summarise", .data,
|
|
|
|
|
dots = dots,
|
|
|
|
|
args = list(.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions)
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' Unnest method for Kusto tables
|
|
|
|
@ -142,14 +137,28 @@ summarise.tbl_kusto_abstract <- function(.data, ..., .strategy = NULL, .shufflek
|
|
|
|
|
#' This method takes a list column and expands it so that each element of the list gets its own row.
|
|
|
|
|
#' unnest() translates to Kusto's mv-expand operator.
|
|
|
|
|
#'
|
|
|
|
|
#' @param .data A Kusto tbl.
|
|
|
|
|
#' @param ... Specification of columns to unnest.
|
|
|
|
|
#' @param data A Kusto tbl.
|
|
|
|
|
#' @param cols Specification of columns to unnest.
|
|
|
|
|
#' @param ... `r lifecycle::badge("deprecated")`:
|
|
|
|
|
#' previously you could write `df %>% unnest(x, y, z)`.
|
|
|
|
|
#' Convert to `df %>% unnest(c(x, y, z))`. If you previously created a new
|
|
|
|
|
#' variable in `unnest()` you'll now need to do it explicitly with `mutate()`.
|
|
|
|
|
#' Convert `df %>% unnest(y = fun(x, y, z))`
|
|
|
|
|
#' to `df %>% mutate(y = fun(x, y, z)) %>% unnest(y)`.
|
|
|
|
|
#' @param keep_empty Needed for agreement with generic. Not otherwise used. Kusto does not keep empty rows.
|
|
|
|
|
#' @param ptype Needed for agreement with generic. Not otherwise used.
|
|
|
|
|
#' @param names_sep Needed for agreement with generic. Not otherwise used.
|
|
|
|
|
#' @param names_repair Needed for agreement with generic. Not otherwise used.
|
|
|
|
|
#' @param .drop Needed for agreement with generic. Not otherwise used.
|
|
|
|
|
#' @param .id Data frame identifier - if supplied, will create a new column with name .id, giving a unique identifier. This is most useful if the list column is named.
|
|
|
|
|
#' @param .sep Needed for agreement with generic. Not otherwise used.
|
|
|
|
|
#' @param .preserve Needed for agreement with generic. Not otherwise used.
|
|
|
|
|
#' @export
|
|
|
|
|
unnest.tbl_kusto_abstract <- function(.data, ..., .id = NULL)
|
|
|
|
|
{
|
|
|
|
|
dots <- quos(...)
|
|
|
|
|
add_op_single("unnest", .data, dots = dots, args = list(.id = .id))
|
|
|
|
|
unnest.tbl_kusto_abstract <- function(data, cols, ..., keep_empty = FALSE, ptype = NULL,
|
|
|
|
|
names_sep = NULL, names_repair = NULL, .id = NULL) {
|
|
|
|
|
# dots <- quos(...)
|
|
|
|
|
dots <- enquo(cols)
|
|
|
|
|
add_op_single("unnest", data, dots = dots, args = list(.id = .id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' Nest method for Kusto tables
|
|
|
|
@ -159,32 +168,31 @@ unnest.tbl_kusto_abstract <- function(.data, ..., .id = NULL)
|
|
|
|
|
#' @param .data A kusto tbl.
|
|
|
|
|
#' @param ... Specification of columns to nest. Translates to summarize make_list() in Kusto.
|
|
|
|
|
#' @export
|
|
|
|
|
nest.tbl_kusto_abstract <- function(.data, ...)
|
|
|
|
|
{
|
|
|
|
|
nest.tbl_kusto_abstract <- function(.data, ...) {
|
|
|
|
|
nest_vars <- unname(tidyselect::vars_select(op_vars(.data), ...))
|
|
|
|
|
|
|
|
|
|
if (is_empty(nest_vars))
|
|
|
|
|
if (is_empty(nest_vars)) {
|
|
|
|
|
nest_vars <- op_vars(.data)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
group_vars <- union(op_grps(.data), setdiff(op_vars(.data), nest_vars))
|
|
|
|
|
nest_vars <- setdiff(nest_vars, group_vars)
|
|
|
|
|
dot_calls <- mapply(function(x) expr(make_list(!!as.name(x))), nest_vars)
|
|
|
|
|
|
|
|
|
|
if (is_empty(group_vars))
|
|
|
|
|
summarise(.data, !!! dot_calls)
|
|
|
|
|
else
|
|
|
|
|
summarise(group_by(.data, !! as.name(group_vars)), !!! dot_calls)
|
|
|
|
|
if (is_empty(group_vars)) {
|
|
|
|
|
summarise(.data, !!!dot_calls)
|
|
|
|
|
} else {
|
|
|
|
|
summarise(group_by(.data, !!as.name(group_vars)), !!!dot_calls)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
head.tbl_kusto_abstract <- function(x, n = 6L, ...)
|
|
|
|
|
{
|
|
|
|
|
head.tbl_kusto_abstract <- function(x, n = 6L, ...) {
|
|
|
|
|
add_op_single("head", x, args = list(n = n))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
slice_sample.tbl_kusto_abstract <- function(.data, ..., n = 6L, prop, by, weight_by, replace)
|
|
|
|
|
{
|
|
|
|
|
slice_sample.tbl_kusto_abstract <- function(.data, ..., n = 6L, prop, by, weight_by, replace) {
|
|
|
|
|
add_op_single("slice_sample", .data, args = list(n = n))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -194,12 +202,14 @@ slice_sample.tbl_kusto_abstract <- function(.data, ..., n = 6L, prop, by, weight
|
|
|
|
|
#'
|
|
|
|
|
#' @param x,y Kusto tbls.
|
|
|
|
|
#' @param by The columns to join on.
|
|
|
|
|
#' @param copy Needed for agreement with generic. Not otherwise used.
|
|
|
|
|
#' @param suffix The suffixes to use for deduplicating column names.
|
|
|
|
|
#' @param ... Other arguments passed to lower-level functions.
|
|
|
|
|
#' @param keep Needed for agreement with generic. Not otherwise used. Kusto retains keys from both sides of joins.
|
|
|
|
|
#' @param .strategy A join strategy hint to pass to Kusto. Currently the values supported are "shuffle" and "broadcast".
|
|
|
|
|
#' @param .shufflekeys A character vector of column names to use as shuffle keys.
|
|
|
|
|
#' @param .num_partitions The number of partitions for a shuffle query.
|
|
|
|
|
#' @param .remote A join strategy hint to use for cross-cluster joins. Can be "left", "right", "local" or "auto" (the default).
|
|
|
|
|
#' @param ... Other arguments passed to lower-level functions.
|
|
|
|
|
#' @seealso
|
|
|
|
|
#' [dplyr::join]
|
|
|
|
|
#'
|
|
|
|
@ -213,122 +223,119 @@ slice_sample.tbl_kusto_abstract <- function(.data, ..., n = 6L, prop, by, weight
|
|
|
|
|
#' left_join(tbl1, tbl2)
|
|
|
|
|
#'
|
|
|
|
|
#' # Kusto extensions:
|
|
|
|
|
#' left_join(tbl1, tbl2, .strategy="broadcast") # a broadcast join
|
|
|
|
|
#' left_join(tbl1, tbl2, .strategy = "broadcast") # a broadcast join
|
|
|
|
|
#'
|
|
|
|
|
#' left_join(tbl1, tbl2, .shufflekeys=c("var1", "var2")) # shuffle join with shuffle keys
|
|
|
|
|
#' left_join(tbl1, tbl2, .shufflekeys = c("var1", "var2")) # shuffle join with shuffle keys
|
|
|
|
|
#'
|
|
|
|
|
#' left_join(tbl1, tbl2, .num_partitions=5) # no. of partitions for a shuffle join
|
|
|
|
|
#' left_join(tbl1, tbl2, .num_partitions = 5) # no. of partitions for a shuffle join
|
|
|
|
|
#' }
|
|
|
|
|
#'
|
|
|
|
|
#' @aliases inner_join left_join right_join full_join semi_join anti_join
|
|
|
|
|
#'
|
|
|
|
|
#' @rdname join
|
|
|
|
|
#' @export
|
|
|
|
|
inner_join.tbl_kusto_abstract <- function(x, y, by = NULL, suffix = c(".x", ".y"),
|
|
|
|
|
.strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL,
|
|
|
|
|
.remote = NULL, ...)
|
|
|
|
|
{
|
|
|
|
|
add_op_join("inner_join", x, y, by = by, suffix = suffix,
|
|
|
|
|
.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions,
|
|
|
|
|
.remote = .remote, ...)
|
|
|
|
|
inner_join.tbl_kusto_abstract <- function(x, y, by = NULL, copy = NULL, suffix = c(".x", ".y"), ...,
|
|
|
|
|
keep = NULL, .strategy = NULL, .shufflekeys = NULL,
|
|
|
|
|
.num_partitions = NULL, .remote = NULL) {
|
|
|
|
|
add_op_join("inner_join", x, y,
|
|
|
|
|
by = by, suffix = suffix,
|
|
|
|
|
.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions,
|
|
|
|
|
.remote = .remote, ...
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @rdname join
|
|
|
|
|
#' @export
|
|
|
|
|
left_join.tbl_kusto_abstract <- function(x, y, by = NULL, suffix = c(".x", ".y"),
|
|
|
|
|
left_join.tbl_kusto_abstract <- function(x, y, by = NULL, copy = NULL, suffix = c(".x", ".y"), ...,
|
|
|
|
|
keep = NULL, .strategy = NULL, .shufflekeys = NULL,
|
|
|
|
|
.num_partitions = NULL, .remote = NULL) {
|
|
|
|
|
add_op_join("left_join", x, y,
|
|
|
|
|
by = by, suffix = suffix,
|
|
|
|
|
.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions,
|
|
|
|
|
.remote = .remote, ...
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @rdname join
|
|
|
|
|
#' @export
|
|
|
|
|
right_join.tbl_kusto_abstract <- function(x, y, by = NULL, copy = NULL, suffix = c(".x", ".y"), ...,
|
|
|
|
|
keep = NULL, .strategy = NULL, .shufflekeys = NULL,
|
|
|
|
|
.num_partitions = NULL, .remote = NULL) {
|
|
|
|
|
add_op_join("right_join", x, y,
|
|
|
|
|
by = by, suffix = suffix,
|
|
|
|
|
.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions,
|
|
|
|
|
.remote = .remote, ...
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @rdname join
|
|
|
|
|
#' @export
|
|
|
|
|
full_join.tbl_kusto_abstract <- function(x, y, by = NULL, copy = NULL, suffix = c(".x", ".y"), ...,
|
|
|
|
|
keep = NULL, .strategy = NULL, .shufflekeys = NULL,
|
|
|
|
|
.num_partitions = NULL, .remote = NULL) {
|
|
|
|
|
add_op_join("full_join", x, y,
|
|
|
|
|
by = by, suffix = suffix,
|
|
|
|
|
.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions,
|
|
|
|
|
.remote = .remote, ...
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @rdname join
|
|
|
|
|
#' @export
|
|
|
|
|
semi_join.tbl_kusto_abstract <- function(x, y, by = NULL, copy = NULL, ..., suffix = c(".x", ".y"),
|
|
|
|
|
.strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL,
|
|
|
|
|
.remote = NULL, ...)
|
|
|
|
|
{
|
|
|
|
|
add_op_join("left_join", x, y, by = by, suffix = suffix,
|
|
|
|
|
.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions,
|
|
|
|
|
.remote = .remote, ...)
|
|
|
|
|
.remote = NULL) {
|
|
|
|
|
add_op_join("semi_join", x, y,
|
|
|
|
|
by = by, suffix = suffix,
|
|
|
|
|
.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions,
|
|
|
|
|
.remote = .remote, ...
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @rdname join
|
|
|
|
|
#' @export
|
|
|
|
|
right_join.tbl_kusto_abstract <- function(x, y, by = NULL, suffix = c(".x", ".y"),
|
|
|
|
|
.strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL,
|
|
|
|
|
.remote = NULL, ...)
|
|
|
|
|
{
|
|
|
|
|
add_op_join("right_join", x, y, by = by, suffix = suffix,
|
|
|
|
|
.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions,
|
|
|
|
|
.remote = .remote, ...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @rdname join
|
|
|
|
|
#' @export
|
|
|
|
|
full_join.tbl_kusto_abstract <- function(x, y, by = NULL, suffix = c(".x", ".y"),
|
|
|
|
|
anti_join.tbl_kusto_abstract <- function(x, y, by = NULL, copy = NULL, suffix = c(".x", ".y"),
|
|
|
|
|
.strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL,
|
|
|
|
|
.remote = NULL, ...)
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
add_op_join("full_join", x, y, by = by, suffix = suffix,
|
|
|
|
|
.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions,
|
|
|
|
|
.remote = .remote, ...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @rdname join
|
|
|
|
|
#' @export
|
|
|
|
|
semi_join.tbl_kusto_abstract <- function(x, y, by = NULL, suffix = c(".x", ".y"),
|
|
|
|
|
.strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL,
|
|
|
|
|
.remote = NULL, ...)
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
add_op_join("semi_join", x, y, by = by, suffix = suffix,
|
|
|
|
|
.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions,
|
|
|
|
|
.remote = .remote, ...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @rdname join
|
|
|
|
|
#' @export
|
|
|
|
|
anti_join.tbl_kusto_abstract <- function(x, y, by = NULL, suffix = c(".x", ".y"),
|
|
|
|
|
.strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL,
|
|
|
|
|
.remote = NULL, ...)
|
|
|
|
|
{
|
|
|
|
|
add_op_join("anti_join", x, y, by = by, suffix = suffix,
|
|
|
|
|
.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions,
|
|
|
|
|
.remote = .remote, ...)
|
|
|
|
|
.remote = NULL, ...) {
|
|
|
|
|
add_op_join("anti_join", x, y,
|
|
|
|
|
by = by, suffix = suffix,
|
|
|
|
|
.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions,
|
|
|
|
|
.remote = .remote, ...
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
union_all.tbl_kusto_abstract <- function(x, y, ...)
|
|
|
|
|
{
|
|
|
|
|
union_all.tbl_kusto_abstract <- function(x, y, ...) {
|
|
|
|
|
add_op_set_op(x, y, "union_all")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
union.tbl_kusto_abstract <- function(x, y, ...)
|
|
|
|
|
{
|
|
|
|
|
union.tbl_kusto_abstract <- function(x, y, ...) {
|
|
|
|
|
stop("Kusto does not support union(). Please use union_all() instead.")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
setdiff.tbl_kusto_abstract <- function(x, y, ...)
|
|
|
|
|
{
|
|
|
|
|
setdiff.tbl_kusto_abstract <- function(x, y, ...) {
|
|
|
|
|
stop("Kusto does not support setdiff() at this time.")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
setequal.tbl_kusto_abstract <- function(x, y, ...)
|
|
|
|
|
{
|
|
|
|
|
setequal.tbl_kusto_abstract <- function(x, y, ...) {
|
|
|
|
|
stop("Kusto does not support setequal() at this time.")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
intersect.tbl_kusto_abstract <- function(x, y, ...)
|
|
|
|
|
{
|
|
|
|
|
intersect.tbl_kusto_abstract <- function(x, y, ...) {
|
|
|
|
|
stop("Kusto does not support intersect() at this time.")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
tbl_vars.tbl_kusto_abstract <- function(x)
|
|
|
|
|
{
|
|
|
|
|
tbl_vars.tbl_kusto_abstract <- function(x) {
|
|
|
|
|
op_vars(x$ops)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @export
|
|
|
|
|
group_vars.tbl_kusto_abstract <- function(x)
|
|
|
|
|
{
|
|
|
|
|
group_vars.tbl_kusto_abstract <- function(x) {
|
|
|
|
|
op_grps(x$ops)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -336,8 +343,7 @@ group_vars.tbl_kusto_abstract <- function(x)
|
|
|
|
|
#' @export
|
|
|
|
|
#' @param x A tbl_kusto or tbl_kusto_abstract instance
|
|
|
|
|
#' @param ... needed for agreement with generic. Not otherwise used.
|
|
|
|
|
show_query.tbl_kusto_abstract <- function(x, ...)
|
|
|
|
|
{
|
|
|
|
|
show_query.tbl_kusto_abstract <- function(x, ...) {
|
|
|
|
|
qry <- kql_build(x)
|
|
|
|
|
kql_render(qry)
|
|
|
|
|
}
|
|
|
|
@ -347,14 +353,13 @@ show_query.tbl_kusto_abstract <- function(x, ...)
|
|
|
|
|
#' @param kusto_database An instance of kusto_database_endpoint that this table should be queried from
|
|
|
|
|
#' @param table_name The name of the table in the Kusto database
|
|
|
|
|
#' @param ... parameters to pass in case the Kusto source table is a parameterized function.
|
|
|
|
|
tbl_kusto <- function(kusto_database, table_name, ...)
|
|
|
|
|
{
|
|
|
|
|
tbl_kusto <- function(kusto_database, table_name, ...) {
|
|
|
|
|
stopifnot(inherits(kusto_database, "kusto_database_endpoint"))
|
|
|
|
|
params <- list(...)
|
|
|
|
|
#in case the table name is a function like MyFunction(arg1, arg2) we need to split it
|
|
|
|
|
table_ident <- strsplit(table_name, split="\\(")[[1]]
|
|
|
|
|
# in case the table name is a function like MyFunction(arg1, arg2) we need to split it
|
|
|
|
|
table_ident <- strsplit(table_name, split = "\\(")[[1]]
|
|
|
|
|
table_ident[1] <- escape(ident(table_ident[1]))
|
|
|
|
|
escaped_table_name <- paste(table_ident, collapse="(")
|
|
|
|
|
escaped_table_name <- paste(table_ident, collapse = "(")
|
|
|
|
|
kusto_database$table <- escaped_table_name
|
|
|
|
|
query_str <- sprintf("%s | take 1", escaped_table_name)
|
|
|
|
|
vars <- names(run_query(kusto_database, query_str, ...))
|
|
|
|
@ -367,8 +372,7 @@ tbl_kusto <- function(kusto_database, table_name, ...)
|
|
|
|
|
#' @export
|
|
|
|
|
#' @param x An instance of class tbl_kusto representing a Kusto table
|
|
|
|
|
#' @param ... needed for agreement with generic. Not otherwise used.
|
|
|
|
|
collect.tbl_kusto <- function(x, ...)
|
|
|
|
|
{
|
|
|
|
|
collect.tbl_kusto <- function(x, ...) {
|
|
|
|
|
q <- kql_build(x)
|
|
|
|
|
q_str <- kql_render(q)
|
|
|
|
|
params <- c(x$params, list(...))
|
|
|
|
@ -379,27 +383,26 @@ collect.tbl_kusto <- function(x, ...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
generate_table_name <- function() {
|
|
|
|
|
paste0("Rtbl_", paste0(sample(letters, 8), collapse=""))
|
|
|
|
|
paste0("Rtbl_", paste0(sample(letters, 8), collapse = ""))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' Execute the query, store the results in a table, and return a reference to the new table
|
|
|
|
|
#' @export
|
|
|
|
|
#' @param tbl An instance of class tbl_kusto representing a Kusto table
|
|
|
|
|
#' @param x An instance of class tbl_kusto representing a Kusto table
|
|
|
|
|
#' @param ... other parameters passed to the query
|
|
|
|
|
#' @param name The name for the Kusto table to be created.
|
|
|
|
|
#' If name is omitted, the table will be named Rtbl_ + 8 random lowercase letters
|
|
|
|
|
compute.tbl_kusto <- function(tbl, ..., name = generate_table_name())
|
|
|
|
|
{
|
|
|
|
|
q <- kql_build(tbl)
|
|
|
|
|
compute.tbl_kusto <- function(x, ..., name = generate_table_name()) {
|
|
|
|
|
q <- kql_build(x)
|
|
|
|
|
q_str <- kql_render(q)
|
|
|
|
|
new_tbl_name <- kql_escape_ident(name)
|
|
|
|
|
set_cmd <- kql(paste0(".set ", new_tbl_name, " <|\n"))
|
|
|
|
|
q_str <- kql(paste0(set_cmd, q_str))
|
|
|
|
|
params <- c(tbl$params, list(...))
|
|
|
|
|
params$database <- tbl$src
|
|
|
|
|
params <- c(x$params, list(...))
|
|
|
|
|
params$database <- x$src
|
|
|
|
|
params$qry_cmd <- q_str
|
|
|
|
|
res <- do.call(run_query, params)
|
|
|
|
|
invisible(tbl_kusto(tbl$src, name))
|
|
|
|
|
invisible(tbl_kusto(x$src, name))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' Execute the query, store the results in a table, and return a reference to the new table
|
|
|
|
@ -415,13 +418,16 @@ compute.tbl_kusto <- function(tbl, ..., name = generate_table_name())
|
|
|
|
|
#' @param distributed logical, indicates whether Kusto should distributed the
|
|
|
|
|
#' export job to multiple nodes, in which case multiple files will be written
|
|
|
|
|
#' to storage concurrently.
|
|
|
|
|
kusto_export_cmd <- function(query, storage_uri, name_prefix, key, format,
|
|
|
|
|
kusto_export_cmd <- function(
|
|
|
|
|
query, storage_uri, name_prefix, key, format,
|
|
|
|
|
distributed) {
|
|
|
|
|
# Make sure the storage uri ends with a slash
|
|
|
|
|
if (!(format %in% c("parquet", "csv", "tsv", "json")))
|
|
|
|
|
if (!(format %in% c("parquet", "csv", "tsv", "json"))) {
|
|
|
|
|
stop("Format must be one of parquet, csv, tsv, or json.")
|
|
|
|
|
if (!endsWith(storage_uri, "/"))
|
|
|
|
|
}
|
|
|
|
|
if (!endsWith(storage_uri, "/")) {
|
|
|
|
|
storage_uri <- paste0(storage_uri, "/")
|
|
|
|
|
}
|
|
|
|
|
distr <- ifelse(distributed, "true", "false")
|
|
|
|
|
compr <- ifelse(format == "parquet", "snappy", "gzip")
|
|
|
|
|
sprintf(".export
|
|
|
|
@ -455,7 +461,8 @@ distributed=%s
|
|
|
|
|
#' @param ... needed for agreement with generic. Not otherwise used.
|
|
|
|
|
#' @rdname export
|
|
|
|
|
#' @export
|
|
|
|
|
export <- function(tbl, storage_uri, query = NULL, name_prefix = "export",
|
|
|
|
|
export <- function(
|
|
|
|
|
tbl, storage_uri, query = NULL, name_prefix = "export",
|
|
|
|
|
key = "impersonate", format = "parquet", distributed = FALSE, ...) {
|
|
|
|
|
UseMethod("export")
|
|
|
|
|
}
|
|
|
|
@ -474,59 +481,58 @@ export <- function(tbl, storage_uri, query = NULL, name_prefix = "export",
|
|
|
|
|
#' @param ... needed for agreement with generic. Not otherwise used.
|
|
|
|
|
#' @rdname export
|
|
|
|
|
#' @export
|
|
|
|
|
export.kusto_database_endpoint <- function(tbl, storage_uri, query = NULL, name_prefix = "export",
|
|
|
|
|
export.kusto_database_endpoint <- function(
|
|
|
|
|
tbl, storage_uri, query = NULL, name_prefix = "export",
|
|
|
|
|
key = "impersonate", format = "parquet", distributed = FALSE, ...) {
|
|
|
|
|
if (missing(query)) stop("query parameter is required.")
|
|
|
|
|
is_cmd <- substr(query, 1, 1) == "."
|
|
|
|
|
if (is_cmd) stop("Management commands cannot be used with export()")
|
|
|
|
|
q_str <- kusto_export_cmd(query = query, storage_uri = storage_uri,
|
|
|
|
|
q_str <- kusto_export_cmd(
|
|
|
|
|
query = query, storage_uri = storage_uri,
|
|
|
|
|
name_prefix = name_prefix, key = key, format = format,
|
|
|
|
|
distributed = distributed)
|
|
|
|
|
distributed = distributed
|
|
|
|
|
)
|
|
|
|
|
run_query(tbl, q_str, ...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @rdname export
|
|
|
|
|
#' @export
|
|
|
|
|
export.tbl_kusto <- function(tbl, storage_uri, query = NULL, name_prefix = "export",
|
|
|
|
|
export.tbl_kusto <- function(
|
|
|
|
|
tbl, storage_uri, query = NULL, name_prefix = "export",
|
|
|
|
|
key = "impersonate", format = "parquet", distributed = FALSE, ...) {
|
|
|
|
|
database <- tbl$src
|
|
|
|
|
q <- kql_render(kql_build(tbl))
|
|
|
|
|
q_str <- kusto_export_cmd(query = q, storage_uri = storage_uri,
|
|
|
|
|
q_str <- kusto_export_cmd(
|
|
|
|
|
query = q, storage_uri = storage_uri,
|
|
|
|
|
name_prefix = name_prefix, key = key, format = format,
|
|
|
|
|
distributed = distributed)
|
|
|
|
|
distributed = distributed
|
|
|
|
|
)
|
|
|
|
|
res <- run_query(database, q_str, ...)
|
|
|
|
|
tibble::as_tibble(res)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#' @keywords internal
|
|
|
|
|
#' @export
|
|
|
|
|
print.tbl_kusto_abstract <- function(x, ...)
|
|
|
|
|
{
|
|
|
|
|
print.tbl_kusto_abstract <- function(x, ...) {
|
|
|
|
|
# different paths if this is a query, simulated table, or real table
|
|
|
|
|
if(!inherits(x$ops, "op_base"))
|
|
|
|
|
{
|
|
|
|
|
if (!inherits(x$ops, "op_base")) {
|
|
|
|
|
cat("<Kusto query>\n")
|
|
|
|
|
print(show_query(x))
|
|
|
|
|
}
|
|
|
|
|
else if(!inherits(x, "tbl_kusto"))
|
|
|
|
|
{
|
|
|
|
|
} else if (!inherits(x, "tbl_kusto")) {
|
|
|
|
|
cat("<Simulated Kusto table '")
|
|
|
|
|
name <- paste0("local_df/", x$src$table)
|
|
|
|
|
cat(name, "'>\n", sep="")
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
cat(name, "'>\n", sep = "")
|
|
|
|
|
} else {
|
|
|
|
|
cat("<Kusto table '")
|
|
|
|
|
url <- httr::parse_url(x$src$server)
|
|
|
|
|
url$path <- file.path(x$src$database, x$src$table)
|
|
|
|
|
cat(httr::build_url(url), "'>\n", sep="")
|
|
|
|
|
cat(httr::build_url(url), "'>\n", sep = "")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
invisible(x)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
add_params_to_quosure <- function(quosure, params)
|
|
|
|
|
{
|
|
|
|
|
new_env <- list2env(params, envir = get_env(quosure))
|
|
|
|
|
quo_set_env(quosure, new_env)
|
|
|
|
|
add_params_to_quosure <- function(quosure, params) {
|
|
|
|
|
new_env <- list2env(params, envir = get_env(quosure))
|
|
|
|
|
quo_set_env(quosure, new_env)
|
|
|
|
|
}
|
|
|
|
|