2019-01-24 04:11:59 +03:00
#' The "base case" operation representing the tbl itself and its column variables
2019-01-02 06:17:00 +03:00
#' @export
2019-01-24 04:11:59 +03:00
#' @param x A tbl object
#' @param vars A vector of column variables in the tbl
2019-01-29 06:03:34 +03:00
#' @param class The class that op_base should inherit from, default is character()
2019-01-14 19:50:15 +03:00
op_base <- function ( x , vars , class = character ( ) )
2019-01-02 06:17:00 +03:00
{
2019-01-14 19:50:15 +03:00
stopifnot ( is.character ( vars ) )
structure (
list (
x = x ,
vars = vars
) ,
class = c ( paste0 ( " op_base_" , class ) , " op_base" , " op" )
)
2019-01-02 06:17:00 +03:00
}
2019-01-14 19:50:15 +03:00
op_base_local <- function ( df )
2019-01-02 06:17:00 +03:00
{
2019-01-14 19:50:15 +03:00
op_base ( df , names ( df ) , class = " local" )
2019-01-02 06:17:00 +03:00
}
2019-01-14 19:50:15 +03:00
op_base_remote <- function ( x , vars )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
op_base ( x , vars , class = " remote" )
2019-01-03 01:58:28 +03:00
}
2019-01-24 04:11:59 +03:00
#' A class representing a single-table verb
2019-01-14 19:50:15 +03:00
#' @export
2019-01-24 04:11:59 +03:00
#' @param name the name of the operation verb, e.g. "select", "filter"
#' @param x the tbl object
#' @param dots expressions passed to the operation verb function
#' @param args other arguments passed to the operation verb function
2019-01-14 19:50:15 +03:00
op_single <- function ( name , x , dots = list ( ) , args = list ( ) )
2019-01-03 07:43:08 +03:00
{
2019-01-14 19:50:15 +03:00
structure (
list (
name = name ,
x = x ,
dots = dots ,
args = args
) ,
class = c ( paste0 ( " op_" , name ) , " op_single" , " op" )
)
2019-01-03 07:43:08 +03:00
}
2019-01-24 04:11:59 +03:00
#' Append an operation representing a single-table verb to the tbl_kusto object's ops list
2019-01-02 06:17:00 +03:00
#' @export
2019-01-24 04:11:59 +03:00
#' @param name The name of the operation, e.g. 'select', 'filter'
#' @param .data The tbl_kusto object to append the operation to
#' @param dots The expressions passed as arguments to the operation verb
#' @param args Other non-expression arguments passed to the operation verb
2019-01-14 19:50:15 +03:00
add_op_single <- function ( name , .data , dots = list ( ) , args = list ( ) )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
.data $ ops <- op_single ( name , x = .data $ ops , dots = dots , args = args )
.data
2019-01-02 06:17:00 +03:00
}
2019-01-24 04:11:59 +03:00
#' A double-table verb, e.g. joins, setops
2019-01-02 06:17:00 +03:00
#' @export
2019-01-24 04:11:59 +03:00
#' @param name The name of the operation, e.g. 'left_join', 'union_all'
#' @param x The "left" tbl
#' @param y The "right" tbl
#' @param args Other arguments passed to the operation verb
2019-01-14 19:50:15 +03:00
op_double <- function ( name , x , y , args = list ( ) )
2019-01-02 10:10:17 +03:00
{
2019-01-14 19:50:15 +03:00
structure (
list (
name = name ,
x = x ,
y = y ,
args = args
) ,
class = c ( paste0 ( " op_" , name ) , " op_double" , " op" )
)
2019-01-02 06:17:00 +03:00
}
2019-01-24 04:11:59 +03:00
#' Append a join operation to the tbl_kusto object's ops list
2019-01-03 01:58:28 +03:00
#' @export
2019-01-24 04:11:59 +03:00
#' @param type The name of the join type,
#' one of: inner_join, left_join, right_join, full_join, semi_join, anti_join
#' @param x The "left" tbl
#' @param y The "right" tbl
#' @param by A vector of column names; keys by which tbl x and tbl y will be joined
#' @param suffix A vector of strings that will be appended to the names of non-join key columns that exist in both tbl x and tbl y to distinguish them by source tbl.
2019-03-05 21:34:29 +03:00
#' @param .strategy A strategy hint to provide to Kusto.
#' @param .shufflekeys A character vector of column names to shuffle on, if `.strategy = "shuffle"`.
2019-03-21 18:02:03 +03:00
#' @param .remote A strategy hint to provide to Kusto for cross-cluster joins.
2019-03-05 21:34:29 +03:00
#' @param .num_partitions The number of partitions for a shuffle query.
add_op_join <- function ( type , x , y , by = NULL , suffix = NULL ,
2019-03-21 18:02:03 +03:00
.strategy = NULL , .shufflekeys = NULL , .num_partitions = NULL , .remote = NULL )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
by <- common_by ( by , x , y )
vars <- join_vars ( op_vars ( x ) , op_vars ( y ) , type = type , by = by , suffix = suffix )
x $ ops <- op_double ( " join" , x , y ,
args = list (
vars = vars ,
type = type ,
by = by ,
2019-03-05 21:34:29 +03:00
suffix = suffix ,
.strategy = .strategy ,
.shufflekeys = .shufflekeys ,
2019-03-21 18:02:03 +03:00
.num_partitions = .num_partitions ,
.remote = .remote
2019-01-14 19:50:15 +03:00
) )
x
2019-01-03 01:58:28 +03:00
}
2019-01-29 06:03:34 +03:00
#' Append a set operation to the tbl_kusto object's ops list
#' @export
#' @param x The "left" tbl
#' @param y The "right" tbl
#' @param type The type of set operation to perform, currently only supports union_all
2019-01-24 04:11:59 +03:00
add_op_set_op <- function ( x , y , type )
2019-01-14 19:50:15 +03:00
{
x $ ops <- op_double ( " set_op" , x , y , args = list ( type = type ) )
x
}
join_vars <- function ( x_names , y_names , type , by , suffix = c ( " .x" , " .y" ) )
{
# Remove join keys from y's names
y_names <- setdiff ( y_names , by $ y )
if ( ! is.character ( suffix ) || length ( suffix ) != 2 )
stop ( " `suffix` must be a character vector of length 2." , call. = FALSE )
suffix <- list ( x = suffix [1 ] , y = suffix [2 ] )
x_new <- add_suffixes ( x_names , y_names , suffix $ x )
y_new <- add_suffixes ( y_names , x_names , suffix $ y )
# In left and inner joins, return key values only from x
# In right joins, return key values only from y
# In full joins, return key values by coalescing values from x and y
x_x <- x_names
x_y <- by $ y [match ( x_names , by $ x ) ]
x_y [type == " left_join" | type == " inner_join" ] <- NA
x_x [type == " right_join" & ! is.na ( x_y ) ] <- NA
y_x <- rep_len ( NA , length ( y_names ) )
y_y <- y_names
# Return a list with 3 parallel vectors
# At each position, values in the 3 vectors represent
# alias - name of column in join result
# x - name of column from left table or NA if only from right table
# y - name of column from right table or NA if only from left table
list ( alias = c ( x_new , y_new ) , x = c ( x_x , y_x ) , y = c ( x_y , y_y ) )
}
add_suffixes <- function ( x , y , suffix )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
if ( identical ( suffix , " " ) ) return ( x )
2019-08-31 03:25:32 +03:00
out <- rep_len ( na_chr , length ( x ) )
2019-01-14 19:50:15 +03:00
for ( i in seq_along ( x ) )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
nm <- x [ [i ] ]
while ( nm %in% y || nm %in% out )
nm <- paste0 ( nm , suffix )
out [ [i ] ] <- nm
2019-01-03 01:58:28 +03:00
}
2019-01-14 19:50:15 +03:00
out
2019-01-03 01:58:28 +03:00
}
2019-01-24 04:11:59 +03:00
#' Look up the applicable grouping variables for an operation
#' based on the data source and preceding sequence of operations
#' @param op An operation instance
2019-01-03 01:58:28 +03:00
#' @export
2019-01-14 19:50:15 +03:00
op_grps <- function ( op ) UseMethod ( " op_grps" )
2019-01-03 01:58:28 +03:00
2019-01-14 19:50:15 +03:00
#' @export
op_grps.op_base <- function ( op ) character ( )
2019-01-03 01:58:28 +03:00
2019-01-14 19:50:15 +03:00
#' @export
op_grps.op_group_by <- function ( op )
{
if ( isTRUE ( op $ args $ add ) )
union ( op_grps ( op $ x ) , names ( op $ dots ) )
else
names ( op $ dots )
2019-01-03 01:58:28 +03:00
}
2019-01-14 19:50:15 +03:00
#' @export
op_grps.op_ungroup <- function ( op )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
character ( )
2019-01-03 01:58:28 +03:00
}
2019-01-14 19:50:15 +03:00
#' @export
op_grps.op_summarise <- function ( op )
2019-01-03 02:29:51 +03:00
{
2019-01-14 19:50:15 +03:00
grps <- op_grps ( op $ x )
}
2019-01-03 01:58:28 +03:00
2019-01-14 19:50:15 +03:00
#' @export
op_grps.op_rename <- function ( op )
{
names ( tidyselect :: vars_rename ( op_grps ( op $ x ) , ! ! ! op $ dots , .strict = FALSE ) )
}
2019-01-03 01:58:28 +03:00
2019-01-14 19:50:15 +03:00
#' @export
op_grps.op_single <- function ( op )
{
op_grps ( op $ x )
}
#' @export
op_grps.op_double <- function ( op )
{
op_grps ( op $ x )
2019-01-03 01:58:28 +03:00
}
2019-01-14 19:50:15 +03:00
#' @export
op_grps.tbl_kusto_abstract <- function ( op )
{
op_grps ( op $ ops )
}
2019-01-03 01:58:28 +03:00
2019-01-14 19:50:15 +03:00
#' @export
op_grps.tbl_df <- function ( op )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
character ( )
2019-01-03 01:58:28 +03:00
}
2019-01-24 04:11:59 +03:00
#' Look up the applicable variables in scope for a given operation
#' based on the data source and preceding sequence of operations
#' @param op An operation instance
2019-01-14 19:50:15 +03:00
#' @export
op_vars <- function ( op ) UseMethod ( " op_vars" )
#' @export
op_vars.op_base <- function ( op )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
op $ vars
2019-01-03 01:58:28 +03:00
}
2019-01-14 19:50:15 +03:00
#' @export
op_vars.op_select <- function ( op )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
names ( tidyselect :: vars_select ( op_vars ( op $ x ) , ! ! ! op $ dots , .include = op_grps ( op $ x ) ) )
2019-01-03 01:58:28 +03:00
}
2019-01-14 19:50:15 +03:00
#' @export
op_vars.op_rename <- function ( op )
2019-01-03 01:58:28 +03:00
{
2019-08-31 03:25:32 +03:00
names ( tidyselect :: vars_rename ( op_vars ( op $ x ) , ! ! ! op $ dots ) )
2019-01-03 01:58:28 +03:00
}
2019-01-14 19:50:15 +03:00
#' @export
op_vars.op_summarise <- function ( op )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
c ( op_grps ( op $ x ) , names ( op $ dots ) )
2019-01-03 01:58:28 +03:00
}
2019-01-14 19:50:15 +03:00
#' @export
op_vars.op_distinct <- function ( op )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
if ( is_empty ( op $ dots ) )
op_vars ( op $ x )
2019-01-03 01:58:28 +03:00
else
2019-01-14 19:50:15 +03:00
unique ( c ( op_vars ( op $ x ) , names ( op $ dots ) ) )
2019-01-03 01:58:28 +03:00
}
2019-01-14 19:50:15 +03:00
#' @export
op_vars.op_mutate <- function ( op )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
unique ( c ( op_vars ( op $ x ) , names ( op $ dots ) ) )
2019-01-03 01:58:28 +03:00
}
2019-01-14 19:50:15 +03:00
#' @export
op_vars.op_single <- function ( op )
2019-01-03 02:29:51 +03:00
{
2019-01-14 19:50:15 +03:00
op_vars ( op $ x )
2019-01-03 01:58:28 +03:00
}
2019-01-02 06:17:00 +03:00
#' @export
2019-01-14 19:50:15 +03:00
op_vars.op_join <- function ( op )
2019-01-02 06:17:00 +03:00
{
2019-01-14 19:50:15 +03:00
op $ args $ vars $ alias
2019-01-02 06:17:00 +03:00
}
#' @export
2019-01-14 19:50:15 +03:00
op_vars.op_join <- function ( op )
2019-01-02 06:17:00 +03:00
{
2019-01-14 19:50:15 +03:00
op $ args $ vars $ alias
2019-01-02 06:17:00 +03:00
}
#' @export
2019-01-14 19:50:15 +03:00
op_vars.op_semi_join <- function ( op )
2019-01-02 06:17:00 +03:00
{
2019-01-14 19:50:15 +03:00
op_vars ( op $ x )
2019-01-02 06:17:00 +03:00
}
#' @export
2019-01-14 19:50:15 +03:00
op_vars.op_set_op <- function ( op )
2019-01-02 06:17:00 +03:00
{
2019-01-14 19:50:15 +03:00
union ( op_vars ( op $ x ) , op_vars ( op $ y ) )
2019-01-02 06:17:00 +03:00
}
2019-01-03 01:58:28 +03:00
#' @export
2019-01-14 19:50:15 +03:00
op_vars.tbl_kusto_abstract <- function ( op )
2019-01-03 01:58:28 +03:00
{
2019-01-14 19:50:15 +03:00
op_vars ( op $ ops )
2019-01-03 01:58:28 +03:00
}
2019-01-03 07:43:08 +03:00
2019-01-14 19:50:15 +03:00
#' @export
op_vars.tbl_df <- function ( op )
2019-01-03 07:43:08 +03:00
{
2019-01-14 19:50:15 +03:00
names ( op )
2019-01-03 07:43:08 +03:00
}