* retry with call_table_endpoint

* documenting

* retry for batch
This commit is contained in:
Hong Ooi 2020-10-23 08:20:23 +11:00 коммит произвёл GitHub
Родитель 7b22f71f92
Коммит 21e7280a11
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 164 добавлений и 61 удалений

Просмотреть файл

@ -22,3 +22,5 @@ is_endpoint_url <- get("is_endpoint_url", getNamespace("AzureStor"))
delete_confirmed <- get("delete_confirmed", getNamespace("AzureStor"))
storage_error_message <- get("storage_error_message", getNamespace("AzureStor"))
process_storage_response <- get("process_storage_response", getNamespace("AzureStor"))

Просмотреть файл

@ -11,6 +11,7 @@
#' @param transaction For `do_batch_transaction`, an object of class `batch_transaction`.
#' @param operations A list of individual table operation objects, each of class `table_operation`.
#' @param batch_status_handler For `do_batch_transaction`, what to do if one or more of the batch operations fails. The default is to signal a warning and return a list of response objects, from which the details of the failure(s) can be determined. Set this to "pass" to ignore the failure.
#' @param num_retries The number of times to retry the call, if the response is an HTTP 429 error (too many requests). The Cosmos DB endpoint tends to be aggressive at rate-limiting requests, to maintain the desired level of latency. This will generally not affect calls to an endpoint provided by a storage account.
#' @param ... Arguments passed to lower-level functions.
#'
#' @details
@ -26,7 +27,7 @@
#' @return
#' `create_table_operation` returns an object of class `table_operation`.
#'
#' `do_batch_transaction` returns a list of objects of class `table_operation_response`, representing the results of each individual operation. Each object contains elements named `status`, `headers` and `body` containing the respective parts of the response. Note that the number of returned objects may be smaller than the number of operations in the batch, if the transaction failed.
#' Assuming the batch transaction did not fail due to rate-limiting, `do_batch_transaction` returns a list of objects of class `table_operation_response`, representing the results of each individual operation. Each object contains elements named `status`, `headers` and `body` containing the respective parts of the response. Note that the number of returned objects may be smaller than the number of operations in the batch, if the transaction failed.
#' @seealso
#' [import_table_entities], which uses (multiple) batch transactions under the hood
#'
@ -61,21 +62,21 @@
create_table_operation <- function(endpoint, path, options=list(), headers=list(), body=NULL,
metadata=c("none", "minimal", "full"), http_verb=c("GET", "PUT", "POST", "PATCH", "DELETE", "HEAD"))
{
accept <- if(!is.null(metadata))
headers <- utils::modifyList(headers, list(DataServiceVersion="3.0;NetFx"))
if(!is.null(metadata))
{
metadata <- match.arg(metadata)
switch(match.arg(metadata),
accept <- switch(match.arg(metadata),
"none"="application/json;odata=nometadata",
"minimal"="application/json;odata=minimalmetadata",
"full"="application/json;odata=fullmetadata")
headers$Accept <- accept
}
else NULL
obj <- list()
obj$endpoint <- endpoint
obj$path <- path
obj$options <- options
obj$headers <- utils::modifyList(headers, list(Accept=accept, DataServiceVersion="3.0;NetFx"))
obj$headers <- headers
obj$method <- match.arg(http_verb)
obj$body <- body
structure(obj, class="table_operation")
@ -90,30 +91,30 @@ serialize_table_operation <- function(object)
# Serialise one table operation into the text lines of an embedded HTTP request.
#
# Arguments:
#   object  A `table_operation` object. The fields read here are `endpoint$url`,
#           `path`, `options`, `method`, `headers` and `body`.
#
# Returns a character vector made up of, in order:
#   - the part headers ("Content-Type: application/http" and
#     "Content-Transfer-Encoding: binary") followed by a blank line;
#   - the request line "<method> <url> HTTP/1.1";
#   - one "<name>: <value>" line per element of `object$headers`;
#   - if a body is present, a JSON content-type header, a blank separator line
#     and the body itself.
#
# NOTE(review): presumably each result becomes one part of a multipart batch
# request body -- confirm against the caller.
serialize_table_operation.table_operation <- function(object)
{
    # full request URL = endpoint URL + resource path + query options
    url <- httr::parse_url(object$endpoint$url)
    url$path <- object$path
    url$query <- object$options

    preamble <- c(
        "Content-Type: application/http",
        "Content-Transfer-Encoding: binary",
        "",
        paste(object$method, httr::build_url(url), "HTTP/1.1"),
        paste0(names(object$headers), ": ", object$headers),
        # only advertise a JSON payload when there is one
        if(!is.null(object$body)) "Content-Type: application/json"
    )

    if(is.null(object$body))
        preamble
    else if(!is.character(object$body))
    {
        # non-character bodies are converted to JSON here; character bodies are
        # passed through untouched by the final branch
        body <- jsonlite::toJSON(object$body, auto_unbox=TRUE, null="null")
        # special-case treatment for 1-row dataframes: drop the first and last
        # characters, i.e. the array brackets that wrap the single row object
        if(is.data.frame(object$body) && nrow(object$body) == 1)
            body <- substr(body, 2, nchar(body) - 1)
        c(preamble, "", body)
    }
    else c(preamble, "", object$body)
}
@ -136,7 +137,7 @@ do_batch_transaction <- function(transaction, ...)
#' @rdname table_batch
#' @export
do_batch_transaction.batch_transaction <- function(transaction,
batch_status_handler=c("warn", "stop", "message", "pass"), ...)
batch_status_handler=c("warn", "stop", "message", "pass"), num_retries=10, ...)
{
# batch REST API only supports 1 changeset per batch, and is unlikely to change
batch_bound <- paste0("batch_", uuid::UUIDgenerate())
@ -159,13 +160,34 @@ do_batch_transaction.batch_transaction <- function(transaction,
if(nchar(body) > 4194304)
stop("Batch request too large, must be 4MB or less")
res <- call_table_endpoint(transaction$endpoint, "$batch", headers=headers, body=body, encode="raw",
http_verb="POST")
process_batch_response(res, match.arg(batch_status_handler))
for(i in seq_len(num_retries))
{
res <- call_table_endpoint(transaction$endpoint, "$batch", headers=headers, body=body, encode="raw",
http_verb="POST")
reslst <- process_batch_response(res)
statuses <- sapply(reslst, `[[`, "status")
complete <- all(statuses != 429)
if(complete)
break
Sys.sleep(1.5^i)
}
if(!complete)
httr::stop_for_status(429, "complete batch transaction")
batch_status_handler <- match.arg(batch_status_handler)
if(any(statuses >= 300) && batch_status_handler != "pass")
{
msg <- paste("Batch transaction failed, max status code was", max(statuses))
switch(batch_status_handler,
"stop"=stop(msg, call.=FALSE),
"warn"=warning(msg, call.=FALSE),
"message"=message(msg, call.=FALSE)
)
}
statuses
}
process_batch_response <- function(response, batch_status_handler)
process_batch_response <- function(response)
{
# assume response (including body) is always text
response <- rawToChar(response)
@ -184,16 +206,15 @@ process_batch_response <- function(response, batch_status_handler)
lines <- lines[3:(n-3)]
op_bounds <- grep(changeset_bound, lines)
op_responses <- Map(
function(start, end) process_operation_response(lines[seq(start, end)], batch_status_handler),
Map(
function(start, end) process_operation_response(lines[seq(start, end)]),
op_bounds + 1,
c(op_bounds[-1], length(lines))
)
op_responses
}
process_operation_response <- function(response, handler)
process_operation_response <- function(response)
{
blanks <- which(response == "")
if(length(blanks) < 2)
@ -207,16 +228,6 @@ process_operation_response <- function(response, handler)
headers <- sapply(headers, `[[`, 2, simplify=FALSE)
class(headers) <- c("insensitive", "list")
if(status >= 300)
{
if(handler == "stop")
stop(httr::http_condition(status, "error"))
else if(handler == "warn")
warning(httr::http_condition(status, "warning"))
else if(handler == "message")
message(httr::http_condition(status, "message"))
}
body <- if(!(status %in% c(204, 205)) && blanks[2] < length(response))
response[seq(blanks[2]+1, length(response))]
else NULL

Просмотреть файл

@ -11,11 +11,17 @@
#' @param options For `call_table_endpoint`, a named list giving the query parameters for the operation.
#' @param headers For `call_table_endpoint`, a named list giving any additional HTTP headers to send to the host. AzureCosmosR will handle authentication details, so you don't have to specify these here.
#' @param body For `call_table_endpoint`, the request body for a PUT/POST/PATCH call.
#' @param http_verb For `call_table_endpoint`, the HTTP verb (method) of the operation.
#' @param http_status_handler For `call_table_endpoint`, the R handler for the HTTP status code of the response. ``"stop"``, ``"warn"`` or ``"message"`` will call the corresponding handlers in httr, while ``"pass"`` ignores the status code. The latter is primarily useful for debugging purposes.
#' @param return_headers For `call_table_endpoint`, whether to return the (parsed) response headers instead of the body. Ignored if `http_status_handler="pass"`.
#' @param metadata For `call_table_endpoint`, the level of ODATA metadata to include in the response.
#' @param num_retries The number of times to retry the call, if the response is an HTTP 429 error (too many requests). The Cosmos DB endpoint tends to be aggressive at rate-limiting requests, to maintain the desired level of latency. This will generally not affect calls to an endpoint provided by a storage account.
#' @param ... For `call_table_endpoint`, further arguments passed to `AzureStor::call_storage_endpoint` and `httr::VERB`.
#'
#' @return
#' An object of class `table_endpoint`, inheriting from `storage_endpoint`. This is the analogue of the `blob_endpoint`, `file_endpoint` and `adls_endpoint` classes provided by the AzureStor package.
#' `table_endpoint` returns an object of class `table_endpoint`, inheriting from `storage_endpoint`. This is the analogue of the `blob_endpoint`, `file_endpoint` and `adls_endpoint` classes provided by the AzureStor package.
#'
#' `call_table_endpoint` returns the body of the response by default, or the headers if `return_headers=TRUE`. If `http_status_handler="pass"`, it returns the entire response object without modification.
#'
#' @seealso
#' [storage_table], [table_entity], [AzureStor::call_storage_endpoint]
@ -54,18 +60,21 @@ table_endpoint <- function(endpoint, key=NULL, token=NULL, sas=NULL,
#' @rdname table_endpoint
#' @export
call_table_endpoint <- function(endpoint, path, options=list(), headers=list(), body=NULL, ...,
metadata=c("none", "minimal", "full"))
http_verb=c("GET", "DELETE", "PUT", "POST", "HEAD", "PATCH"),
http_status_handler=c("stop", "warn", "message", "pass"),
return_headers=(http_verb == "HEAD"),
metadata=c("none", "minimal", "full"),
num_retries=10)
{
accept <- if(!is.null(metadata))
headers <- utils::modifyList(headers, list(DataServiceVersion="3.0;NetFx"))
if(!is.null(metadata))
{
metadata <- match.arg(metadata)
switch(metadata,
accept <- switch(match.arg(metadata),
"none"="application/json;odata=nometadata",
"minimal"="application/json;odata=minimalmetadata",
"full"="application/json;odata=fullmetadata")
headers$Accept <- accept
}
else NULL
headers <- utils::modifyList(headers, list(Accept=accept, DataServiceVersion="3.0;NetFx"))
if(is.list(body))
{
@ -73,6 +82,17 @@ call_table_endpoint <- function(endpoint, path, options=list(), headers=list(),
headers$`Content-Length` <- nchar(body)
headers$`Content-Type` <- "application/json"
}
call_storage_endpoint(endpoint, path=path, options=options, body=body, headers=headers, ...)
http_verb <- match.arg(http_verb)
# handle possible rate limiting in Cosmos DB
for(i in seq_len(num_retries))
{
res <- call_storage_endpoint(endpoint, path=path, options=options, body=body, headers=headers,
http_verb=http_verb, http_status_handler="pass", return_headers=return_headers, ...)
if(httr::status_code(res) != 429)
break
Sys.sleep(1.5^i)
}
process_storage_response(res, match.arg(http_status_handler), FALSE)
}

Просмотреть файл

@ -8,6 +8,7 @@
#' @param filter,select For `list_table_entities`, optional row filter and column select expressions to subset the result with. If omitted, `list_table_entities` will return all entities in the table.
#' @param as_data_frame For `list_table_entities`, whether to return the results as a data frame, rather than a list of table rows.
#' @param batch_status_handler For `import_table_entities`, what to do if one or more of the batch operations fails. The default is to signal a warning and return a list of response objects, from which the details of the failure(s) can be determined. Set this to "pass" to ignore the failure.
#' @param ... For `import_table_entities`, further named arguments passed to `do_batch_transaction`.
#'
#' @details
#' These functions operate on rows of a table, also known as _entities_. `insert`, `get`, `update` and `delete_table_entity` operate on an individual row. `import_table_entities` bulk-inserts multiple rows of data into the table, using batch transactions. `list_table_entities` queries the table and returns multiple rows, subsetted on the `filter` and `select` arguments.
@ -183,7 +184,7 @@ get_table_entity <- function(table, row_key, partition_key, select=NULL)
#' @rdname table_entity
#' @export
import_table_entities <- function(table, data, row_key=NULL, partition_key=NULL,
batch_status_handler=c("warn", "stop", "message", "pass"))
batch_status_handler=c("warn", "stop", "message", "pass"), ...)
{
if(is.character(data) && jsonlite::validate(data))
data <- jsonlite::fromJSON(data, simplifyDataFrame=TRUE)
@ -212,7 +213,8 @@ import_table_entities <- function(table, data, row_key=NULL, partition_key=NULL,
})
})
res <- lapply(unlist(lst, recursive=FALSE), do_batch_transaction, batch_status_handler=batch_status_handler)
res <- lapply(unlist(lst, recursive=FALSE, use.names=FALSE), do_batch_transaction,
batch_status_handler=batch_status_handler, ...)
invisible(res)
}

Просмотреть файл

@ -24,6 +24,7 @@ do_batch_transaction(transaction, ...)
\method{do_batch_transaction}{batch_transaction}(
transaction,
batch_status_handler = c("warn", "stop", "message", "pass"),
num_retries = 10,
...
)
}
@ -49,11 +50,13 @@ do_batch_transaction(transaction, ...)
\item{...}{Arguments passed to lower-level functions.}
\item{batch_status_handler}{For \code{do_batch_transaction}, what to do if one or more of the batch operations fails. The default is to signal a warning and return a list of response objects, from which the details of the failure(s) can be determined. Set this to "pass" to ignore the failure.}
\item{num_retries}{The number of times to retry the call, if the response is an HTTP 429 error (too many requests). The Cosmos DB endpoint tends to be aggressive at rate-limiting requests, to maintain the desired level of latency. This will generally not affect calls to an endpoint provided by a storage account.}
}
\value{
\code{create_table_operation} returns an object of class \code{table_operation}.
\code{do_batch_transaction} returns a list of objects of class \code{table_operation_response}, representing the results of each individual operation. Each object contains elements named \code{status}, \code{headers} and \code{body} containing the respective parts of the response. Note that the number of returned objects may be smaller than the number of operations in the batch, if the transaction failed.
Assuming the batch transaction did not fail due to rate-limiting, \code{do_batch_transaction} returns a list of objects of class \code{table_operation_response}, representing the results of each individual operation. Each object contains elements named \code{status}, \code{headers} and \code{body} containing the respective parts of the response. Note that the number of returned objects may be smaller than the number of operations in the batch, if the transaction failed.
}
\description{
Batch transactions for table storage

Просмотреть файл

@ -20,7 +20,11 @@ call_table_endpoint(
headers = list(),
body = NULL,
...,
metadata = c("none", "minimal", "full")
http_verb = c("GET", "DELETE", "PUT", "POST", "HEAD", "PATCH"),
http_status_handler = c("stop", "warn", "message", "pass"),
return_headers = (http_verb == "HEAD"),
metadata = c("none", "minimal", "full"),
num_retries = 10
)
}
\arguments{
@ -44,10 +48,20 @@ call_table_endpoint(
\item{...}{For \code{call_table_endpoint}, further arguments passed to \code{AzureStor::call_storage_endpoint} and \code{httr::VERB}.}
\item{http_verb}{For \code{call_table_endpoint}, the HTTP verb (method) of the operation.}
\item{http_status_handler}{For \code{call_table_endpoint}, the R handler for the HTTP status code of the response. \code{"stop"}, \code{"warn"} or \code{"message"} will call the corresponding handlers in httr, while \code{"pass"} ignores the status code. The latter is primarily useful for debugging purposes.}
\item{return_headers}{For \code{call_table_endpoint}, whether to return the (parsed) response headers instead of the body. Ignored if \code{http_status_handler="pass"}.}
\item{metadata}{For \code{call_table_endpoint}, the level of ODATA metadata to include in the response.}
\item{num_retries}{The number of times to retry the call, if the response is an HTTP 429 error (too many requests). The Cosmos DB endpoint tends to be aggressive at rate-limiting requests, to maintain the desired level of latency. This will generally not affect calls to an endpoint provided by a storage account.}
}
\value{
An object of class \code{table_endpoint}, inheriting from \code{storage_endpoint}. This is the analogue of the \code{blob_endpoint}, \code{file_endpoint} and \code{adls_endpoint} classes provided by the AzureStor package.
\code{table_endpoint} returns an object of class \code{table_endpoint}, inheriting from \code{storage_endpoint}. This is the analogue of the \code{blob_endpoint}, \code{file_endpoint} and \code{adls_endpoint} classes provided by the AzureStor package.
\code{call_table_endpoint} returns the body of the response by default, or the headers if \code{return_headers=TRUE}. If \code{http_status_handler="pass"}, it returns the entire response object without modification.
}
\description{
Table storage endpoint object, and method to call it.

Просмотреть файл

@ -31,7 +31,8 @@ import_table_entities(
data,
row_key = NULL,
partition_key = NULL,
batch_status_handler = c("warn", "stop", "message", "pass")
batch_status_handler = c("warn", "stop", "message", "pass"),
...
)
}
\arguments{
@ -50,6 +51,8 @@ import_table_entities(
\item{data}{For \code{import_table_entities}, a data frame. See 'Details' below.}
\item{batch_status_handler}{For \code{import_table_entities}, what to do if one or more of the batch operations fails. The default is to signal a warning and return a list of response objects, from which the details of the failure(s) can be determined. Set this to "pass" to ignore the failure.}
\item{...}{For \code{import_table_entities}, further named arguments passed to \code{do_batch_transaction}.}
}
\value{
\code{insert_table_entity} and \code{update_table_entity} return the Etag of the inserted/updated entity, invisibly.

Просмотреть файл

@ -0,0 +1,48 @@
# Live tests of the table entity functions against a Cosmos DB table endpoint.
# The account name and access key come from environment variables; if either
# one is unset, skip() is signalled before any resource is created.
context("Cosmos DB table entities")

cosmosdb <- Sys.getenv("AZ_TEST_COSMOSDB_TABLE")
key <- Sys.getenv("AZ_TEST_COSMOSDB_TABLE_KEY")

if(!nzchar(cosmosdb) || !nzchar(key))
    skip("Cosmos DB table client tests skipped: resource names not set")

endpoint_url <- sprintf("https://%s.table.cosmos.azure.com:443", cosmosdb)
endp <- table_endpoint(endpoint_url, key=key)
tab <- create_storage_table(endp, make_name())


test_that("Entity methods work",
{
    # --- single-entity operations ---
    first_etag <- insert_table_entity(tab, list(RowKey="row1", PartitionKey="part1", x=1, y=2))
    expect_type(first_etag, "character")

    # update supplied as a JSON string, conditional on the etag from the insert
    second_etag <- update_table_entity(tab, '{"RowKey":"row1", "PartitionKey":"part1", "z":3}',
        etag=first_etag)
    expect_type(second_etag, "character")

    # the first etag was already used for the update above, so this one must fail ...
    expect_error(update_table_entity(tab, list(RowKey="row1", PartitionKey="part1", w=4), etag=first_etag))
    # ... while the same update without an etag goes through
    expect_silent(update_table_entity(tab, list(RowKey="row1", PartitionKey="part1", w=4)))

    expect_is(get_table_entity(tab, "row1", "part1"), "list")
    expect_is(list_table_entities(tab), "data.frame")
    expect_is(list_table_entities(tab, as_data_frame=FALSE), "list")

    expect_silent(delete_table_entity(tab, row_key="row1", partition_key="part1"))

    # --- bulk import: ten stacked copies of iris ---
    iris_copies <- replicate(10, iris, simplify=FALSE)
    data <- do.call(rbind, iris_copies)
    # replace the first dot in each column name with an underscore
    names(data) <- sub("\\.", "_", names(data))

    expect_silent(import_table_entities(tab, data,
        row_key=row.names(data),
        partition_key=data$Species))

    all_rows <- list_table_entities(tab)
    expect_true(is.data.frame(all_rows) && nrow(all_rows) == nrow(data))

    # one species is a third of the data; two columns are selected
    setosa_rows <- list_table_entities(tab, filter="Species eq 'setosa'", select="Sepal_Length,Sepal_Width")
    expect_true(is.data.frame(setosa_rows) && nrow(setosa_rows) == nrow(data)/3 && ncol(setosa_rows) == 2)
})


# remove every table that exists on the test endpoint once the file has run
teardown({
    remaining <- list_storage_tables(endp)
    lapply(remaining, function(tbl) delete_storage_table(tbl, confirm=FALSE))
})