зеркало из https://github.com/Azure/AzureKusto.git
312 строки
7.8 KiB
R
312 строки
7.8 KiB
R
#' The "base case" operation representing the tbl itself and its column variables
|
|
#' @export
|
|
#' @param x A tbl object
|
|
#' @param vars A vector of column variables in the tbl
|
|
#' @param class The class that op_base should inherit from, default is character()
|
|
op_base <- function(x, vars, class = character())
|
|
{
|
|
stopifnot(is.character(vars))
|
|
|
|
structure(
|
|
list(
|
|
x = x,
|
|
vars = vars
|
|
),
|
|
class = c(paste0("op_base_", class), "op_base", "op")
|
|
)
|
|
}
|
|
|
|
op_base_local <- function(df)
|
|
{
|
|
op_base(df, names(df), class = "local")
|
|
}
|
|
|
|
op_base_remote <- function(x, vars)
|
|
{
|
|
op_base(x, vars, class = "remote")
|
|
}
|
|
|
|
#' A class representing a single-table verb
|
|
#' @export
|
|
#' @param name the name of the operation verb, e.g. "select", "filter"
|
|
#' @param x the tbl object
|
|
#' @param dots expressions passed to the operation verb function
|
|
#' @param args other arguments passed to the operation verb function
|
|
op_single <- function(name, x, dots = list(), args = list())
|
|
{
|
|
structure(
|
|
list(
|
|
name = name,
|
|
x = x,
|
|
dots = dots,
|
|
args = args
|
|
),
|
|
class = c(paste0("op_", name), "op_single", "op")
|
|
)
|
|
}
|
|
|
|
#' Append an operation representing a single-table verb to the tbl_kusto object's ops list
|
|
#' @export
|
|
#' @param name The name of the operation, e.g. 'select', 'filter'
|
|
#' @param .data The tbl_kusto object to append the operation to
|
|
#' @param dots The expressions passed as arguments to the operation verb
|
|
#' @param args Other non-expression arguments passed to the operation verb
|
|
add_op_single <- function(name, .data, dots = list(), args = list())
|
|
{
|
|
.data$ops <- op_single(name, x = .data$ops, dots = dots, args = args)
|
|
.data
|
|
}
|
|
|
|
#' A double-table verb, e.g. joins, setops
|
|
#' @export
|
|
#' @param name The name of the operation, e.g. 'left_join', 'union_all'
|
|
#' @param x The "left" tbl
|
|
#' @param y The "right" tbl
|
|
#' @param args Other arguments passed to the operation verb
|
|
op_double <- function(name, x, y, args = list())
|
|
{
|
|
structure(
|
|
list(
|
|
name = name,
|
|
x = x,
|
|
y = y,
|
|
args = args
|
|
),
|
|
class = c(paste0("op_", name), "op_double", "op")
|
|
)
|
|
}
|
|
|
|
#' Append a join operation to the tbl_kusto object's ops list
|
|
#' @export
|
|
#' @param type The name of the join type,
|
|
#' one of: inner_join, left_join, right_join, full_join, semi_join, anti_join
|
|
#' @param x The "left" tbl
|
|
#' @param y The "right" tbl
|
|
#' @param by A vector of column names; keys by which tbl x and tbl y will be joined
|
|
#' @param suffix A vector of strings that will be appended to the names of non-join key columns that exist in both tbl x and tbl y to distinguish them by source tbl.
|
|
#' @param .strategy A strategy hint to provide to Kusto.
|
|
#' @param .shufflekeys A character vector of column names to shuffle on, if `.strategy = "shuffle"`.
|
|
#' @param .remote A strategy hint to provide to Kusto for cross-cluster joins.
|
|
#' @param .num_partitions The number of partitions for a shuffle query.
|
|
add_op_join <- function(type, x, y, by = NULL, suffix = NULL,
|
|
.strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL, .remote = NULL)
|
|
{
|
|
by <- common_by(by, x, y)
|
|
vars <- join_vars(op_vars(x), op_vars(y), type = type, by = by, suffix = suffix)
|
|
x$ops <- op_double("join", x, y,
|
|
args = list(
|
|
vars = vars,
|
|
type = type,
|
|
by = by,
|
|
suffix = suffix,
|
|
.strategy = .strategy,
|
|
.shufflekeys = .shufflekeys,
|
|
.num_partitions = .num_partitions,
|
|
.remote = .remote
|
|
))
|
|
x
|
|
}
|
|
|
|
#' Append a set operation to the tbl_kusto object's ops list
|
|
#' @export
|
|
#' @param x The "left" tbl
|
|
#' @param y The "right" tbl
|
|
#' @param type The type of set operation to perform, currently only supports union_all
|
|
add_op_set_op <- function(x, y, type)
|
|
{
|
|
x$ops <- op_double("set_op", x, y, args = list(type = type))
|
|
x
|
|
}
|
|
|
|
join_vars <- function(x_names, y_names, type, by, suffix = c(".x", ".y"))
|
|
{
|
|
# Remove join keys from y's names
|
|
y_names <- setdiff(y_names, by$y)
|
|
|
|
if(!is.character(suffix) || length(suffix) != 2)
|
|
stop("`suffix` must be a character vector of length 2.", call. = FALSE)
|
|
|
|
suffix <- list(x = suffix[1], y = suffix[2])
|
|
x_new <- add_suffixes(x_names, y_names, suffix$x)
|
|
y_new <- add_suffixes(y_names, x_names, suffix$y)
|
|
|
|
# In left and inner joins, return key values only from x
|
|
# In right joins, return key values only from y
|
|
# In full joins, return key values by coalescing values from x and y
|
|
x_x <- x_names
|
|
x_y <- by$y[match(x_names, by$x)]
|
|
x_y[type == "left_join" | type == "inner_join"] <- NA
|
|
x_x[type == "right_join" & !is.na(x_y)] <- NA
|
|
y_x <- rep_len(NA, length(y_names))
|
|
y_y <- y_names
|
|
|
|
# Return a list with 3 parallel vectors
|
|
# At each position, values in the 3 vectors represent
|
|
# alias - name of column in join result
|
|
# x - name of column from left table or NA if only from right table
|
|
# y - name of column from right table or NA if only from left table
|
|
list(alias = c(x_new, y_new), x = c(x_x, y_x), y = c(x_y, y_y))
|
|
}
|
|
|
|
add_suffixes <- function(x, y, suffix)
|
|
{
|
|
if (identical(suffix, "")) return(x)
|
|
|
|
out <- rep_len(na_chr, length(x))
|
|
for (i in seq_along(x))
|
|
{
|
|
nm <- x[[i]]
|
|
while (nm %in% y || nm %in% out)
|
|
nm <- paste0(nm, suffix)
|
|
|
|
out[[i]] <- nm
|
|
}
|
|
out
|
|
}
|
|
|
|
#' Look up the applicable grouping variables for an operation
|
|
#' based on the data source and preceding sequence of operations
|
|
#' @param op An operation instance
|
|
#' @export
|
|
op_grps <- function(op) UseMethod("op_grps")
|
|
|
|
#' @export
|
|
op_grps.op_base <- function(op) character()
|
|
|
|
#' @export
|
|
op_grps.op_group_by <- function(op)
|
|
{
|
|
if (isTRUE(op$args$add))
|
|
union(op_grps(op$x), names(op$dots))
|
|
else
|
|
names(op$dots)
|
|
}
|
|
|
|
#' @export
|
|
op_grps.op_ungroup <- function(op)
|
|
{
|
|
character()
|
|
}
|
|
|
|
#' @export
|
|
op_grps.op_summarise <- function(op)
|
|
{
|
|
grps <- op_grps(op$x)
|
|
}
|
|
|
|
#' @export
|
|
op_grps.op_rename <- function(op)
|
|
{
|
|
names(tidyselect::vars_rename(op_grps(op$x), !!! op$dots, .strict = FALSE))
|
|
}
|
|
|
|
#' @export
|
|
op_grps.op_single <- function(op)
|
|
{
|
|
op_grps(op$x)
|
|
}
|
|
#' @export
|
|
op_grps.op_double <- function(op)
|
|
{
|
|
op_grps(op$x)
|
|
}
|
|
|
|
#' @export
|
|
op_grps.tbl_kusto_abstract <- function(op)
|
|
{
|
|
op_grps(op$ops)
|
|
}
|
|
|
|
#' @export
|
|
op_grps.tbl_df <- function(op)
|
|
{
|
|
character()
|
|
}
|
|
|
|
#' Look up the applicable variables in scope for a given operation
|
|
#' based on the data source and preceding sequence of operations
|
|
#' @param op An operation instance
|
|
#' @export
|
|
op_vars <- function(op) UseMethod("op_vars")
|
|
|
|
#' @export
|
|
op_vars.op_base <- function(op)
|
|
{
|
|
op$vars
|
|
}
|
|
|
|
#' @export
|
|
op_vars.op_select <- function(op)
|
|
{
|
|
names(tidyselect::vars_select(op_vars(op$x), !!! op$dots, .include = op_grps(op$x)))
|
|
}
|
|
|
|
#' @export
|
|
op_vars.op_rename <- function(op)
|
|
{
|
|
names(tidyselect::vars_rename(op_vars(op$x), !!! op$dots))
|
|
}
|
|
|
|
#' @export
|
|
op_vars.op_summarise <- function(op)
|
|
{
|
|
c(op_grps(op$x), names(op$dots))
|
|
}
|
|
|
|
#' @export
|
|
op_vars.op_distinct <- function(op)
|
|
{
|
|
if (is_empty(op$dots))
|
|
op_vars(op$x)
|
|
else
|
|
unique(c(op_vars(op$x), names(op$dots)))
|
|
}
|
|
|
|
#' @export
|
|
op_vars.op_mutate <- function(op)
|
|
{
|
|
unique(c(op_vars(op$x), names(op$dots)))
|
|
}
|
|
|
|
#' @export
|
|
op_vars.op_single <- function(op)
|
|
{
|
|
op_vars(op$x)
|
|
}
|
|
|
|
#' @export
|
|
op_vars.op_join <- function(op)
|
|
{
|
|
op$args$vars$alias
|
|
}
|
|
|
|
#' @export
|
|
op_vars.op_join <- function(op)
|
|
{
|
|
op$args$vars$alias
|
|
}
|
|
|
|
#' @export
|
|
op_vars.op_semi_join <- function(op)
|
|
{
|
|
op_vars(op$x)
|
|
}
|
|
|
|
#' @export
|
|
op_vars.op_set_op <- function(op)
|
|
{
|
|
union(op_vars(op$x), op_vars(op$y))
|
|
}
|
|
|
|
#' @export
|
|
op_vars.tbl_kusto_abstract <- function(op)
|
|
{
|
|
op_vars(op$ops)
|
|
}
|
|
|
|
#' @export
|
|
op_vars.tbl_df <- function(op)
|
|
{
|
|
names(op)
|
|
}
|