add compute and copy_to verbs (#45)

* add compute and copy_to verbs

* fix integration tests for compute and copy_to

* recompile documentation for compute.tbl_kusto

* remove ame_src

* qualify dplyr calls in test cases, and test copy_to where source is a query on the same db

* Update .gitignore

* review changes

* rm extraneous {}'s
This commit is contained in:
Alex Kyllo 2019-03-26 11:45:12 -07:00 коммит произвёл Hong Ooi
Родитель c5ad2ddb90
Коммит 388edf1b6d
7 изменённых файлов: 155 добавлений и 1 удалений

2
.gitignore поставляемый
Просмотреть файл

@ -313,4 +313,4 @@ flycheck_*.el
.dir-locals.el
.env
test-creds.*
test-creds.*

Просмотреть файл

@ -3,6 +3,8 @@
S3method(anti_join,tbl_kusto_abstract)
S3method(arrange,tbl_kusto_abstract)
S3method(collect,tbl_kusto)
S3method(compute,tbl_kusto)
S3method(copy_to,kusto_database_endpoint)
S3method(distinct,tbl_kusto_abstract)
S3method(escape,"NULL")
S3method(escape,Date)

Просмотреть файл

@ -240,3 +240,45 @@ print.kusto_database_endpoint <- function(x, ...)
cat("<Kusto database endpoint '", httr::build_url(url), "'>\n", sep="")
invisible(x)
}
#' This function uploads a local data frame into a remote data source, creating the table definition as needed.
#' If the table exists, it will append the data to the existing table. If not, it will create a new table.
#' @export
#' @param dest remote data source
#' @param df local data frame
#' @param name Name for new remote table
#' @param overwrite If `TRUE`, will overwrite an existing table with
#' name `name`. If `FALSE`, will throw an error if `name` already
#' exists.
#' @param method For local ingestion, the method to use. "inline", "streaming", or "indirect".
#' @param ... other parameters passed to the query
#' @seealso [collect()] for the opposite action; downloading remote data into a local tbl.
copy_to.kusto_database_endpoint <- function(dest, df, name=deparse(substitute(df)), overwrite = FALSE, method = "inline", ...)
{
if (!is.data.frame(df) && !inherits(df, "tbl_kusto"))
stop("`df` must be a local dataframe or a remote tbl_kusto", call. = FALSE)
if (inherits(df, "tbl_kusto") && dest$server == df$src$server)
out <- compute(df, name = name, ...)
else
{
df <- collect(df)
class(df) <- "data.frame" # avoid S4 dispatch problem in dbSendPreparedQuery
#initialize DBI connection
cnxn <- new("AzureKustoConnection", endpoint=dest)
tableExists <- DBI::dbExistsTable(cnxn, name)
if (tableExists)
{
if(overwrite)
DBI::dbRemoveTable(cnxn, name)
else stop(paste0("table ",
name,
" already exists. If you wish to overwrite it, specify overwrite=TRUE"))
}
dbWriteTable(cnxn, name, df, method=method)
out <- tbl_kusto(dest, name)
}
invisible(out)
}

23
R/tbl.R
Просмотреть файл

@ -326,6 +326,29 @@ collect.tbl_kusto <- function(tbl, ...)
as_tibble(res)
}
generate_table_name <- function() {
paste0("Rtbl_", paste0(sample(letters, 8), collapse=""))
}
#' Execute the query, store the results in a table, and return a reference to the new table
#' @export
#' @param tbl An instance of class tbl_kusto representing a Kusto table
#' @param name The name for the Kusto table to be created.
#' If name is omitted, the table will be named Rtbl_ + 8 random lowercase letters
#' @param ... other parameters passed to the query
compute.tbl_kusto <- function(tbl, name=generate_table_name(), ...)
{
q <- kql_build(tbl)
q_str <- kql_render(q)
new_tbl_name <- kql_escape_ident(name)
set_cmd <- kql(paste0(".set ", new_tbl_name, " <|\n"))
q_str <- kql(paste0(set_cmd, q_str))
params <- c(tbl$params, list(...))
params$database <- tbl$src
params$qry_cmd <- q_str
res <- do.call(run_query, params)
invisible(tbl_kusto(tbl$src, name))
}
#' @keywords internal
#' @export

19
man/compute.tbl_kusto.Rd Normal file
Просмотреть файл

@ -0,0 +1,19 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/tbl.R
\name{compute.tbl_kusto}
\alias{compute.tbl_kusto}
\title{Execute the query, store the results in a table, and return a reference to the new table}
\usage{
\method{compute}{tbl_kusto}(tbl, name = generate_table_name(), ...)
}
\arguments{
\item{tbl}{An instance of class tbl_kusto representing a Kusto table}
\item{name}{The name for the Kusto table to be created.
If name is omitted, the table will be named Rtbl_ + 8 random lowercase letters}
\item{...}{other parameters passed to the query}
}
\description{
Execute the query, store the results in a table, and return a reference to the new table
}

Просмотреть файл

@ -0,0 +1,33 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/endpoint.R
\name{copy_to.kusto_database_endpoint}
\alias{copy_to.kusto_database_endpoint}
\title{This function uploads a local data frame into a remote data source, creating the table definition as needed.
If the table exists, it will append the data to the existing table. If not, it will create a new table.}
\usage{
\method{copy_to}{kusto_database_endpoint}(dest, df,
name = deparse(substitute(df)), overwrite = FALSE,
method = "inline", ...)
}
\arguments{
\item{dest}{remote data source}
\item{df}{local data frame}
\item{name}{Name for new remote table}
\item{overwrite}{If \code{TRUE}, will overwrite an existing table with
name \code{name}. If \code{FALSE}, will throw an error if \code{name} already
exists.}
\item{method}{For local ingestion, the method to use. "inline", "streaming", or "indirect".}
\item{...}{other parameters passed to the query}
}
\description{
This function uploads a local data frame into a remote data source, creating the table definition as needed.
If the table exists, it will append the data to the existing table. If not, it will create a new table.
}
\seealso{
\code{\link[=collect]{collect()}} for the opposite action; downloading remote data into a local tbl.
}

Просмотреть файл

@ -135,6 +135,41 @@ test_that("local inline ingestion works",
expect_equal(run_query(db, "irisdfinline | count")$Count, 150)
})
test_that("ingestion via compute() verb works",
{
run_query(db, ".create table irisfileinline2 (sl:real, sw:real, pl:real, pw:real, species:string)")
ingest_local(db, "../resources/iris.csv", "irisfileinline2", method="inline")
irisfileinline2 <- tbl_kusto(db, "irisfileinline2")
q <- irisfileinline2 %>%
dplyr::group_by(species) %>%
dplyr::summarize(max_sepal_length = max(sl))
new_tbl <- dplyr::compute(q, "irismaxsepallength")
expect_equal(new_tbl$src$table, "['irismaxsepallength']")
})
test_that("ingestion via copy_to() verb works",
{
tbl_iris <- iris
names(tbl_iris) <- c('SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species')
iris_copy_to <- dplyr::copy_to(db, tbl_iris, "iris_copy_to")
expect_equal(iris_copy_to$src$table, "['iris_copy_to']")
})
test_that("copy_to uses compute() when dest and src are in same Kusto database",
{
run_query(db, ".create table irisfileinline3 (sl:real, sw:real, pl:real, pw:real, species:string)")
ingest_local(db, "../resources/iris.csv", "irisfileinline3", method="inline")
irisfileinline3 <- tbl_kusto(db, "irisfileinline3")
q <- irisfileinline3 %>%
dplyr::group_by(species) %>%
dplyr::summarize(max_sepal_length = max(sl))
irismaxsepallength2 <- dplyr::copy_to(db, q, 'irismaxsepallength2')
expect_equal(irismaxsepallength2$src$table, "['irismaxsepallength2']")
})
srv$delete_database(dbname, confirm=FALSE)
delete_blob_container(blobstor$get_blob_endpoint(), dbname, confirm=FALSE)