From 388edf1b6d4c8b823d4b44148e5324a78bd81187 Mon Sep 17 00:00:00 2001 From: Alex Kyllo Date: Tue, 26 Mar 2019 11:45:12 -0700 Subject: [PATCH] add compute and copy_to verbs (#45) * add compute and copy_to verbs * fix integration tests for compute and copy_to * recompile documentation for compute.tbl_kusto * remove ame_src * qualify dplyr calls in test cases, and test copy_to where source is a query on the same db * Update .gitignore * review changes * rm extraneous {}'s --- .gitignore | 2 +- NAMESPACE | 2 ++ R/endpoint.R | 42 ++++++++++++++++++++++++++ R/tbl.R | 23 ++++++++++++++ man/compute.tbl_kusto.Rd | 19 ++++++++++++ man/copy_to.kusto_database_endpoint.Rd | 33 ++++++++++++++++++++ tests/testthat/test04_ingest.R | 35 +++++++++++++++++++++ 7 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 man/compute.tbl_kusto.Rd create mode 100644 man/copy_to.kusto_database_endpoint.Rd diff --git a/.gitignore b/.gitignore index aea277a..c99d63b 100644 --- a/.gitignore +++ b/.gitignore @@ -313,4 +313,4 @@ flycheck_*.el .dir-locals.el .env -test-creds.* \ No newline at end of file +test-creds.* diff --git a/NAMESPACE b/NAMESPACE index 6bca66f..8da83bb 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -3,6 +3,8 @@ S3method(anti_join,tbl_kusto_abstract) S3method(arrange,tbl_kusto_abstract) S3method(collect,tbl_kusto) +S3method(compute,tbl_kusto) +S3method(copy_to,kusto_database_endpoint) S3method(distinct,tbl_kusto_abstract) S3method(escape,"NULL") S3method(escape,Date) diff --git a/R/endpoint.R b/R/endpoint.R index b343282..d04405d 100644 --- a/R/endpoint.R +++ b/R/endpoint.R @@ -240,3 +240,45 @@ print.kusto_database_endpoint <- function(x, ...) cat("\n", sep="") invisible(x) } + +#' This function uploads a local data frame into a remote data source, creating the table definition as needed. +#' If a table named `name` already exists, the upload replaces it when `overwrite = TRUE` and throws an error otherwise. If no such table exists, a new table is created. 
+#' @export +#' @param dest remote data source +#' @param df local data frame +#' @param name Name for new remote table +#' @param overwrite If `TRUE`, will overwrite an existing table with +#' name `name`. If `FALSE`, will throw an error if `name` already +#' exists. +#' @param method For local ingestion, the method to use. "inline", "streaming", or "indirect". +#' @param ... other parameters passed to the query +#' @seealso [collect()] for the opposite action; downloading remote data into a local tbl. +copy_to.kusto_database_endpoint <- function(dest, df, name=deparse(substitute(df)), overwrite = FALSE, method = "inline", ...) +{ + if (!is.data.frame(df) && !inherits(df, "tbl_kusto")) + stop("`df` must be a local dataframe or a remote tbl_kusto", call. = FALSE) + + if (inherits(df, "tbl_kusto") && dest$server == df$src$server) + out <- compute(df, name = name, ...) + else + { + df <- collect(df) + class(df) <- "data.frame" # avoid S4 dispatch problem in dbSendPreparedQuery + + #initialize DBI connection + cnxn <- new("AzureKustoConnection", endpoint=dest) + tableExists <- DBI::dbExistsTable(cnxn, name) + if (tableExists) + { + if(overwrite) + DBI::dbRemoveTable(cnxn, name) + else stop(paste0("table ", + name, + " already exists. If you wish to overwrite it, specify overwrite=TRUE")) + } + dbWriteTable(cnxn, name, df, method=method) + + out <- tbl_kusto(dest, name) + } + invisible(out) +} diff --git a/R/tbl.R b/R/tbl.R index 1cd3f44..bcc5af0 100644 --- a/R/tbl.R +++ b/R/tbl.R @@ -326,6 +326,29 @@ collect.tbl_kusto <- function(tbl, ...) as_tibble(res) } +generate_table_name <- function() { + paste0("Rtbl_", paste0(sample(letters, 8), collapse="")) +} + +#' Execute the query, store the results in a table, and return a reference to the new table +#' @export +#' @param tbl An instance of class tbl_kusto representing a Kusto table +#' @param name The name for the Kusto table to be created. 
+#' If name is omitted, the table will be named Rtbl_ + 8 random lowercase letters +#' @param ... other parameters passed to the query +compute.tbl_kusto <- function(tbl, name=generate_table_name(), ...) +{ + q <- kql_build(tbl) + q_str <- kql_render(q) + new_tbl_name <- kql_escape_ident(name) + set_cmd <- kql(paste0(".set ", new_tbl_name, " <|\n")) + q_str <- kql(paste0(set_cmd, q_str)) + params <- c(tbl$params, list(...)) + params$database <- tbl$src + params$qry_cmd <- q_str + res <- do.call(run_query, params) + invisible(tbl_kusto(tbl$src, name)) +} #' @keywords internal #' @export diff --git a/man/compute.tbl_kusto.Rd b/man/compute.tbl_kusto.Rd new file mode 100644 index 0000000..9d41252 --- /dev/null +++ b/man/compute.tbl_kusto.Rd @@ -0,0 +1,19 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/tbl.R +\name{compute.tbl_kusto} +\alias{compute.tbl_kusto} +\title{Execute the query, store the results in a table, and return a reference to the new table} +\usage{ +\method{compute}{tbl_kusto}(tbl, name = generate_table_name(), ...) +} +\arguments{ +\item{tbl}{An instance of class tbl_kusto representing a Kusto table} + +\item{name}{The name for the Kusto table to be created. +If name is omitted, the table will be named Rtbl_ + 8 random lowercase letters} + +\item{...}{other parameters passed to the query} +} +\description{ +Execute the query, store the results in a table, and return a reference to the new table +} diff --git a/man/copy_to.kusto_database_endpoint.Rd b/man/copy_to.kusto_database_endpoint.Rd new file mode 100644 index 0000000..3829820 --- /dev/null +++ b/man/copy_to.kusto_database_endpoint.Rd @@ -0,0 +1,33 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/endpoint.R +\name{copy_to.kusto_database_endpoint} +\alias{copy_to.kusto_database_endpoint} +\title{This function uploads a local data frame into a remote data source, creating the table definition as needed. 
+If a table named \code{name} already exists, the upload replaces it when \code{overwrite = TRUE} and throws an error otherwise. If no such table exists, a new table is created.} +\usage{ +\method{copy_to}{kusto_database_endpoint}(dest, df, + name = deparse(substitute(df)), overwrite = FALSE, + method = "inline", ...) +} +\arguments{ +\item{dest}{remote data source} + +\item{df}{local data frame} + +\item{name}{Name for new remote table} + +\item{overwrite}{If \code{TRUE}, will overwrite an existing table with +name \code{name}. If \code{FALSE}, will throw an error if \code{name} already +exists.} + +\item{method}{For local ingestion, the method to use. "inline", "streaming", or "indirect".} + +\item{...}{other parameters passed to the query} +} +\description{ +This function uploads a local data frame into a remote data source, creating the table definition as needed. +If a table named \code{name} already exists, the upload replaces it when \code{overwrite = TRUE} and throws an error otherwise. If no such table exists, a new table is created. +} +\seealso{ +\code{\link[=collect]{collect()}} for the opposite action; downloading remote data into a local tbl. 
+} diff --git a/tests/testthat/test04_ingest.R b/tests/testthat/test04_ingest.R index de0d5b0..ffeef7b 100644 --- a/tests/testthat/test04_ingest.R +++ b/tests/testthat/test04_ingest.R @@ -135,6 +135,41 @@ test_that("local inline ingestion works", expect_equal(run_query(db, "irisdfinline | count")$Count, 150) }) +test_that("ingestion via compute() verb works", +{ + run_query(db, ".create table irisfileinline2 (sl:real, sw:real, pl:real, pw:real, species:string)") + ingest_local(db, "../resources/iris.csv", "irisfileinline2", method="inline") + irisfileinline2 <- tbl_kusto(db, "irisfileinline2") + + q <- irisfileinline2 %>% + dplyr::group_by(species) %>% + dplyr::summarize(max_sepal_length = max(sl)) + + new_tbl <- dplyr::compute(q, "irismaxsepallength") + expect_equal(new_tbl$src$table, "['irismaxsepallength']") +}) + +test_that("ingestion via copy_to() verb works", +{ + tbl_iris <- iris + names(tbl_iris) <- c('SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species') + iris_copy_to <- dplyr::copy_to(db, tbl_iris, "iris_copy_to") + expect_equal(iris_copy_to$src$table, "['iris_copy_to']") +}) + +test_that("copy_to uses compute() when dest and src are in same Kusto database", +{ + run_query(db, ".create table irisfileinline3 (sl:real, sw:real, pl:real, pw:real, species:string)") + ingest_local(db, "../resources/iris.csv", "irisfileinline3", method="inline") + irisfileinline3 <- tbl_kusto(db, "irisfileinline3") + + q <- irisfileinline3 %>% + dplyr::group_by(species) %>% + dplyr::summarize(max_sepal_length = max(sl)) + + irismaxsepallength2 <- dplyr::copy_to(db, q, 'irismaxsepallength2') + expect_equal(irismaxsepallength2$src$table, "['irismaxsepallength2']") +}) srv$delete_database(dbname, confirm=FALSE) delete_blob_container(blobstor$get_blob_endpoint(), dbname, confirm=FALSE)