diff --git a/DESCRIPTION b/DESCRIPTION index 0d2765c..6cba4ad 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -16,7 +16,8 @@ Imports: AzureRMR (>= 2.0.0), AzureStor (>= 3.0.0), openssl, - httr + httr, + uuid Suggests: AzureKeyVault, testthat, diff --git a/R/table_batch_request.R b/R/table_batch_request.R new file mode 100644 index 0000000..b77ac76 --- /dev/null +++ b/R/table_batch_request.R @@ -0,0 +1,105 @@ +BatchOperation <- R6::R6Class("BatchOperation", + +public=list( + endpoint=NULL, + path=NULL, + options=NULL, + headers=NULL, + method=NULL, + body=NULL, + + initialize=function(endpoint, path, options=list(), headers=list(), body=NULL, + metadata=c("none", "minimal", "full"), + http_verb=c("GET", "PUT", "POST", "PATCH", "DELETE", "HEAD")) + { + accept <- if(!is.null(metadata)) + { + metadata <- match.arg(metadata) + switch(match.arg(metadata), + "none"="application/json;odata=nometadata", + "minimal"="application/json;odata=minimalmetadata", + "full"="application/json;odata=fullmetadata") + } + else NULL + + self$endpoint <- endpoint + self$path <- path + self$options <- options + self$headers <- utils::modifyList(headers, list(Accept=accept, DataServiceVersion="3.0")) + self$method <- http_verb + }, + + serialize=function() + { + url <- httr::parse_url(self$endpoint$url) + url$path <- self$path + url$query <- self$options + + preamble <- c( + "Content-Type: application/http", + "Content-Transfer-Encoding: binary", + "", + paste0(names(self$headers), ": ", self$headers) + ) + + if(is.null(self$body)) + preamble + else if(!is.character(self$body)) + c(preamble, "", jsonlite::toJSON(self$body, auto_unbox=TRUE, null="null")) + else c(preamble, "", self$body) + } +)) + + +BatchRequest <- R6::R6Class("BatchRequest", + +public=list( + endpoint=NULL, + changesets=list(), + + initialize=function(endpoint, changesets) + { + self$endpoint <- endpoint + self$changesets <- changesets + }, + + send=function() + { + batch_bound <- paste0("batch_", uuid::UUIDgenerate()) + changeset_bound <- paste0("req_", uuid::UUIDgenerate()) + headers <- list(`Content-Type`=paste0("multipart/mixed; boundary=", batch_bound)) + call_table_endpoint(endpoint, "$batch", headers=headers, body=body, encode="raw", http_verb="POST") + } +)) + + + + +create_batch_operation <- function(endpoint, path, options=list(), headers=list(), body=NULL, + metadata=c("none", "minimal", "full"), http_verb=c("GET", "PUT", "POST", "PATCH", "DELETE", "HEAD")) +{ + BatchOperation$new(endpoint, path, options, headers, body, metadata, http_verb) +} + + +send_batch_request <- function(endpoint, operations, ...) +{ + # batch REST API only supports 1 changeset per batch, and is unlikely to change + batch_bound <- paste0("--batch_", uuid::UUIDgenerate()) + changeset_bound <- paste0("--changeset_", uuid::UUIDgenerate()) + headers <- list(`Content-Type`=paste0("multipart/mixed; boundary=", batch_bound)) + + batch_preamble <- c( + batch_bound, + paste0("Content-Type: multipart/mixed; boundary=", changeset_bound) + ) + batch_postscript <- c( + paste0(changeset_bound, "--"), + batch_bound + ) + reqs <- lapply(requests, function(req) c(changeset_bound, req$serialize())) + body <- c(batch_preamble, unlist(reqs), batch_postscript) + + invisible(call_table_endpoint(endpoint, "$batch", headers=headers, body=paste0(body, collapse="\n"), encode="raw", + http_verb="POST")) +} diff --git a/R/table_endpoint.R b/R/table_endpoint.R index ba775b8..2ee3c5d 100644 --- a/R/table_endpoint.R +++ b/R/table_endpoint.R @@ -14,12 +14,16 @@ table_endpoint <- function(endpoint, key=NULL, token=NULL, sas=NULL, call_table_endpoint <- function(endpoint, path, options=list(), headers=list(), body=NULL, ..., metadata=c("none", "minimal", "full")) { - metadata <- match.arg(metadata) - accept <- switch(metadata, - "none"="application/json;odata=nometadata", - "minimal"="application/json;odata=minimalmetadata", - "full"="application/json;odata=fullmetadata") - headers <- utils::modifyList(headers, list(Accept=accept)) + accept <- if(!is.null(metadata)) + { + metadata <- match.arg(metadata) + switch(metadata, + "none"="application/json;odata=nometadata", + "minimal"="application/json;odata=minimalmetadata", + "full"="application/json;odata=fullmetadata") + } + else NULL + headers <- utils::modifyList(headers, list(Accept=accept, DataServiceVersion="3.0")) if(is.list(body)) { @@ -30,5 +34,3 @@ call_table_endpoint <- function(endpoint, path, options=list(), headers=list(), call_storage_endpoint(endpoint, path=path, options=options, body=body, headers=headers, ...) } - - diff --git a/R/table_entity.R b/R/table_entity.R index cb2fb1a..15c820a 100644 --- a/R/table_entity.R +++ b/R/table_entity.R @@ -59,3 +59,26 @@ get_table_entity <- function(table, partition_key, row_key, select=NULL) opts <- list(`$select`=paste0(select, collapse=",")) call_table_endpoint(table$endpoint, path, options=opts) } + + +import_table_entities <- function(table, data, partition_key=NULL, row_key=NULL) +{ + force(data) + if(is.character(data) && jsonlite::validate(data)) + data <- jsonlite::fromJSON(data, simplifyDataFrame=TRUE) + + if(!is.null(partition_key)) + names(data)[names(data) == partition_key] <- "PartitionKey" + if(!is.null(row_key)) + names(data)[names(data) == row_key] <- "RowKey" + + if(!("PartitionKey" %in% names(data)) || !("RowKey" %in% names(data))) + stop("Data must contain columns named 'PartitionKey' and 'RowKey'", call.=FALSE) + + path <- table$name + ops <- lapply(seq_len(nrow(data)), function(i) + { + create_batch_operation(table$endpoint, path, body=data[i, ], http_verb="POST") + }) + ops +}