This commit is contained in:
Hong Ooi 2020-10-22 11:31:14 +11:00
Родитель 8378a19422
Коммит 8c1417e770
5 изменённых файлов: 85 добавлений и 41 удалений

Просмотреть файл

@ -5,6 +5,7 @@
\.Rxproj$ \.Rxproj$
^\.Rproj\.user$ ^\.Rproj\.user$
CONTRIBUTING.md CONTRIBUTING.md
CODE_OF_CONDUCT.md
^LICENSE\.md$ ^LICENSE\.md$
^SECURITY\.md$ ^SECURITY\.md$
azure-pipelines.yml ^\.github$

Просмотреть файл

@ -4,15 +4,18 @@ S3method(azure_table,table_endpoint)
S3method(create_azure_table,table_endpoint) S3method(create_azure_table,table_endpoint)
S3method(delete_azure_table,azure_table) S3method(delete_azure_table,azure_table)
S3method(delete_azure_table,table_endpoint) S3method(delete_azure_table,table_endpoint)
S3method(do_batch_transaction,batch_transaction)
S3method(list_azure_tables,table_endpoint) S3method(list_azure_tables,table_endpoint)
S3method(print,azure_table) S3method(print,azure_table)
S3method(print,batch_operation) S3method(print,batch_transaction)
S3method(print,batch_operation_response) S3method(print,table_operation)
S3method(print,table_operation_response)
S3method(sign_request,table_endpoint) S3method(sign_request,table_endpoint)
export(azure_table) export(azure_table)
export(call_table_endpoint) export(call_table_endpoint)
export(create_azure_table) export(create_azure_table)
export(create_batch_operation) export(create_batch_transaction)
export(create_table_operation)
export(delete_azure_table) export(delete_azure_table)
export(delete_table_entity) export(delete_table_entity)
export(do_batch_transaction) export(do_batch_transaction)

Просмотреть файл

@ -7,13 +7,16 @@
#' @param body The request body for a PUT/POST/PATCH operation. #' @param body The request body for a PUT/POST/PATCH operation.
#' @param metadata The level of ODATA metadata to include in the response. #' @param metadata The level of ODATA metadata to include in the response.
#' @param http_verb The HTTP verb (method) for the operation. #' @param http_verb The HTTP verb (method) for the operation.
#' @param operations For `do_batch_transaction`, a list of individual operations to be batched up. #' @param operations For `create_batch_transaction`, a list of individual operations to be batched up.
#' @param transaction For `do_batch_transaction`, an object of class `batch_transaction`.
#' @param operations A list of individual table operation objects, each of class `table_operation`.
#' @param batch_status_handler For `do_batch_transaction`, what to do if one or more of the batch operations fails. The default is to signal a warning and return a list of response objects, from which the details of the failure(s) can be determined. Set this to "pass" to ignore the failure. #' @param batch_status_handler For `do_batch_transaction`, what to do if one or more of the batch operations fails. The default is to signal a warning and return a list of response objects, from which the details of the failure(s) can be determined. Set this to "pass" to ignore the failure.
#' @param ... Arguments passed to lower-level functions.
#' #'
#' @details #' @details
#' Table storage supports batch transactions on entities that are in the same table and belong to the same partition group. Batch transactions are also known as _entity group transactions_. #' Table storage supports batch transactions on entities that are in the same table and belong to the same partition group. Batch transactions are also known as _entity group transactions_.
#' #'
#' You can use `create_batch_operation` to produce an object corresponding to a single table storage operation, such as inserting, deleting or updating an entity. Multiple such objects can then be passed to `do_batch_transaction`, which will carry them out as a single atomic transaction. #' You can use `create_table_operation` to produce an object corresponding to a single table storage operation, such as inserting, deleting or updating an entity. Multiple such objects can then be passed to `create_batch_transaction`, which bundles them into a single atomic transaction. Call `do_batch_transaction` to send the transaction to the endpoint.
#' #'
#' Note that batch transactions are subject to some limitations imposed by the REST API: #' Note that batch transactions are subject to some limitations imposed by the REST API:
#' - All entities subject to operations as part of the transaction must have the same `PartitionKey` value. #' - All entities subject to operations as part of the transaction must have the same `PartitionKey` value.
@ -21,9 +24,9 @@
#' - The transaction can include at most 100 entities, and its total payload may be no more than 4 MB in size. #' - The transaction can include at most 100 entities, and its total payload may be no more than 4 MB in size.
#' #'
#' @return #' @return
#' `create_batch_operation` returns an object of class `batch_operation`. #' `create_table_operation` returns an object of class `table_operation`.
#' #'
#' `do_batch_transaction` returns a list of objects of class `batch_operation_response`, representing the results of each individual operation. Each object contains elements named `status`, `headers` and `body` containing the respective parts of the response. Note that the number of returned objects may be smaller than the number of operations in the batch, if the transaction failed. #' `do_batch_transaction` returns a list of objects of class `table_operation_response`, representing the results of each individual operation. Each object contains elements named `status`, `headers` and `body` containing the respective parts of the response. Note that the number of returned objects may be smaller than the number of operations in the batch, if the transaction failed.
#' @seealso #' @seealso
#' [import_table_entities], which uses (multiple) batch transactions under the hood #' [import_table_entities], which uses (multiple) batch transactions under the hood
#' #'
@ -46,15 +49,16 @@
#' #'
#' # generate the array of insert operations: 1 per row #' # generate the array of insert operations: 1 per row
#' ops <- lapply(seq_len(nrow(ir)), function(i) #' ops <- lapply(seq_len(nrow(ir)), function(i)
#' create_batch_operation(endp, "mytable", body=ir[i, ], http_verb="POST"))) #' create_table_operation(endp, "mytable", body=ir[i, ], http_verb="POST")))
#' #'
#' # send it to the endpoint #' # create a batch transaction and send it to the endpoint
#' do_batch_transaction(endp, ops) #' bat <- create_batch_transaction(endp, ops)
#' do_batch_transaction(bat)
#' #'
#' } #' }
#' @rdname table_batch #' @rdname table_batch
#' @export #' @export
create_batch_operation <- function(endpoint, path, options=list(), headers=list(), body=NULL, create_table_operation <- function(endpoint, path, options=list(), headers=list(), body=NULL,
metadata=c("none", "minimal", "full"), http_verb=c("GET", "PUT", "POST", "PATCH", "DELETE", "HEAD")) metadata=c("none", "minimal", "full"), http_verb=c("GET", "PUT", "POST", "PATCH", "DELETE", "HEAD"))
{ {
accept <- if(!is.null(metadata)) accept <- if(!is.null(metadata))
@ -74,17 +78,17 @@ create_batch_operation <- function(endpoint, path, options=list(), headers=list(
obj$headers <- utils::modifyList(headers, list(Accept=accept, DataServiceVersion="3.0;NetFx")) obj$headers <- utils::modifyList(headers, list(Accept=accept, DataServiceVersion="3.0;NetFx"))
obj$method <- match.arg(http_verb) obj$method <- match.arg(http_verb)
obj$body <- body obj$body <- body
structure(obj, class="batch_operation") structure(obj, class="table_operation")
} }
serialize_batch_operation <- function(object) serialize_table_operation <- function(object)
{ {
UseMethod("serialize_batch_operation") UseMethod("serialize_table_operation")
} }
serialize_batch_operation.batch_operation <- function(object) serialize_table_operation.table_operation <- function(object)
{ {
url <- httr::parse_url(object$endpoint$url) url <- httr::parse_url(object$endpoint$url)
url$path <- object$path url$path <- object$path
@ -115,7 +119,24 @@ serialize_batch_operation.batch_operation <- function(object)
#' @rdname table_batch #' @rdname table_batch
#' @export #' @export
do_batch_transaction <- function(endpoint, operations, batch_status_handler=c("warn", "stop", "message", "pass")) create_batch_transaction <- function(endpoint, operations)
{
structure(list(endpoint=endpoint, ops=operations), class="batch_transaction")
}
#' @rdname table_batch
#' @export
do_batch_transaction <- function(transaction, ...)
{
UseMethod("do_batch_transaction")
}
#' @rdname table_batch
#' @export
do_batch_transaction.batch_transaction <- function(transaction,
batch_status_handler=c("warn", "stop", "message", "pass"), ...)
{ {
# batch REST API only supports 1 changeset per batch, and is unlikely to change # batch REST API only supports 1 changeset per batch, and is unlikely to change
batch_bound <- paste0("batch_", uuid::UUIDgenerate()) batch_bound <- paste0("batch_", uuid::UUIDgenerate())
@ -132,12 +153,13 @@ do_batch_transaction <- function(endpoint, operations, batch_status_handler=c("w
paste0("--", changeset_bound, "--"), paste0("--", changeset_bound, "--"),
paste0("--", batch_bound, "--") paste0("--", batch_bound, "--")
) )
serialized <- lapply(operations, function(op) c(paste0("--", changeset_bound), serialize_batch_operation(op))) serialized <- lapply(transaction$ops,
function(op) c(paste0("--", changeset_bound), serialize_table_operation(op)))
body <- paste0(c(batch_preamble, unlist(serialized), batch_postscript), collapse="\n") body <- paste0(c(batch_preamble, unlist(serialized), batch_postscript), collapse="\n")
if(nchar(body) > 4194304) if(nchar(body) > 4194304)
stop("Batch request too large, must be 4MB or less") stop("Batch request too large, must be 4MB or less")
res <- call_table_endpoint(endpoint, "$batch", headers=headers, body=body, encode="raw", res <- call_table_endpoint(transaction$endpoint, "$batch", headers=headers, body=body, encode="raw",
http_verb="POST") http_verb="POST")
process_batch_response(res, match.arg(batch_status_handler)) process_batch_response(res, match.arg(batch_status_handler))
} }
@ -200,22 +222,28 @@ process_operation_response <- function(response, handler)
else NULL else NULL
obj <- list(status=status, headers=headers, body=body) obj <- list(status=status, headers=headers, body=body)
class(obj) <- "batch_operation_response" class(obj) <- "table_operation_response"
obj obj
} }
#' @export #' @export
print.batch_operation <- function(x, ...) print.table_operation <- function(x, ...)
{ {
cat("<Table storage batch operation>\n") cat("<Table storage batch operation>\n")
invisible(x) invisible(x)
} }
#' @export #' @export
print.batch_operation_response <- function(x, ...) print.table_operation_response <- function(x, ...)
{ {
cat("<Table storage batch operation response>\n") cat("<Table storage batch operation response>\n")
invisible(x) invisible(x)
} }
#' @export
print.batch_transaction <- function(x, ...)
{
cat("<Table storage batch transaction>\n")
invisible(x)
}

Просмотреть файл

@ -195,20 +195,21 @@ import_table_entities <- function(table, data, row_key=NULL, partition_key=NULL,
path <- table$name path <- table$name
headers <- list(Prefer="return-no-content") headers <- list(Prefer="return-no-content")
batch_status_handler <- match.arg(batch_status_handler) batch_status_handler <- match.arg(batch_status_handler)
res <- lapply(split(data, data$PartitionKey), function(dfpart) lst <- lapply(split(data, data$PartitionKey), function(dfpart)
{ {
n <- nrow(dfpart) n <- nrow(dfpart)
nchunks <- n %/% 100 + (n %% 100 > 0) nchunks <- n %/% 100 + (n %% 100 > 0)
reschunks <- lapply(seq_len(nchunks), function(chunk) lapply(seq_len(nchunks), function(chunk)
{ {
rows <- seq(from=(chunk-1)*100 + 1, to=min(chunk*100, n)) rows <- seq(from=(chunk-1)*100 + 1, to=min(chunk*100, n))
dfchunk <- dfpart[rows, ] dfchunk <- dfpart[rows, ]
ops <- lapply(seq_len(nrow(dfchunk)), function(i) ops <- lapply(seq_len(nrow(dfchunk)), function(i)
create_batch_operation(endpoint, path, body=dfchunk[i, ], headers=headers, http_verb="POST")) create_table_operation(endpoint, path, body=dfchunk[i, ], headers=headers, http_verb="POST"))
do_batch_transaction(endpoint, ops, batch_status_handler) create_batch_transaction(endpoint, ops)
}) })
unlist(reschunks, recursive=FALSE)
}) })
res <- lapply(unlist(lst, recursive=FALSE), do_batch_transaction)
invisible(res) invisible(res)
} }

Просмотреть файл

@ -1,11 +1,13 @@
% Generated by roxygen2: do not edit by hand % Generated by roxygen2: do not edit by hand
% Please edit documentation in R/table_batch_request.R % Please edit documentation in R/table_batch_request.R
\name{create_batch_operation} \name{create_table_operation}
\alias{create_batch_operation} \alias{create_table_operation}
\alias{create_batch_transaction}
\alias{do_batch_transaction} \alias{do_batch_transaction}
\alias{do_batch_transaction.batch_transaction}
\title{Batch transactions for table storage} \title{Batch transactions for table storage}
\usage{ \usage{
create_batch_operation( create_table_operation(
endpoint, endpoint,
path, path,
options = list(), options = list(),
@ -15,10 +17,14 @@ create_batch_operation(
http_verb = c("GET", "PUT", "POST", "PATCH", "DELETE", "HEAD") http_verb = c("GET", "PUT", "POST", "PATCH", "DELETE", "HEAD")
) )
do_batch_transaction( create_batch_transaction(endpoint, operations)
endpoint,
operations, do_batch_transaction(transaction, ...)
batch_status_handler = c("warn", "stop", "message", "pass")
\method{do_batch_transaction}{batch_transaction}(
transaction,
batch_status_handler = c("warn", "stop", "message", "pass"),
...
) )
} }
\arguments{ \arguments{
@ -36,14 +42,18 @@ do_batch_transaction(
\item{http_verb}{The HTTP verb (method) for the operation.} \item{http_verb}{The HTTP verb (method) for the operation.}
\item{operations}{For \code{do_batch_transaction}, a list of individual operations to be batched up.} \item{operations}{A list of individual table operation objects, each of class \code{table_operation}.}
\item{transaction}{For \code{do_batch_transaction}, an object of class \code{batch_transaction}.}
\item{...}{Arguments passed to lower-level functions.}
\item{batch_status_handler}{For \code{do_batch_transaction}, what to do if one or more of the batch operations fails. The default is to signal a warning and return a list of response objects, from which the details of the failure(s) can be determined. Set this to "pass" to ignore the failure.} \item{batch_status_handler}{For \code{do_batch_transaction}, what to do if one or more of the batch operations fails. The default is to signal a warning and return a list of response objects, from which the details of the failure(s) can be determined. Set this to "pass" to ignore the failure.}
} }
\value{ \value{
\code{create_batch_operation} returns an object of class \code{batch_operation}. \code{create_table_operation} returns an object of class \code{table_operation}.
\code{do_batch_transaction} returns a list of objects of class \code{batch_operation_response}, representing the results of each individual operation. Each object contains elements named \code{status}, \code{headers} and \code{body} containing the respective parts of the response. Note that the number of returned objects may be smaller than the number of operations in the batch, if the transaction failed. \code{do_batch_transaction} returns a list of objects of class \code{table_operation_response}, representing the results of each individual operation. Each object contains elements named \code{status}, \code{headers} and \code{body} containing the respective parts of the response. Note that the number of returned objects may be smaller than the number of operations in the batch, if the transaction failed.
} }
\description{ \description{
Batch transactions for table storage Batch transactions for table storage
@ -51,7 +61,7 @@ Batch transactions for table storage
\details{ \details{
Table storage supports batch transactions on entities that are in the same table and belong to the same partition group. Batch transactions are also known as \emph{entity group transactions}. Table storage supports batch transactions on entities that are in the same table and belong to the same partition group. Batch transactions are also known as \emph{entity group transactions}.
You can use \code{create_batch_operation} to produce an object corresponding to a single table storage operation, such as inserting, deleting or updating an entity. Multiple such objects can then be passed to \code{do_batch_transaction}, which will carry them out as a single atomic transaction. You can use \code{create_table_operation} to produce an object corresponding to a single table storage operation, such as inserting, deleting or updating an entity. Multiple such objects can then be passed to \code{create_batch_transaction}, which bundles them into a single atomic transaction. Call \code{do_batch_transaction} to send the transaction to the endpoint.
Note that batch transactions are subject to some limitations imposed by the REST API: Note that batch transactions are subject to some limitations imposed by the REST API:
\itemize{ \itemize{
@ -78,10 +88,11 @@ ir$RowKey <- sprintf("\%03d", seq_len(nrow(ir)))
# generate the array of insert operations: 1 per row # generate the array of insert operations: 1 per row
ops <- lapply(seq_len(nrow(ir)), function(i) ops <- lapply(seq_len(nrow(ir)), function(i)
create_batch_operation(endp, "mytable", body=ir[i, ], http_verb="POST"))) create_table_operation(endp, "mytable", body=ir[i, ], http_verb="POST")))
# send it to the endpoint # create a batch transaction and send it to the endpoint
do_batch_transaction(endp, ops) bat <- create_batch_transaction(endp, ops)
do_batch_transaction(bat)
} }
} }