This commit is contained in:
Hong Ooi 2020-04-07 04:34:41 +10:00
Родитель 65235b283f
Коммит 15e62859a8
9 изменённых файлов: 162 добавлений и 13 удалений

Просмотреть файл

@ -6,9 +6,11 @@ S3method(create_storage_queue,queue_endpoint)
S3method(delete_storage_queue,StorageQueue)
S3method(delete_storage_queue,character)
S3method(delete_storage_queue,queue_endpoint)
S3method(get_storage_metadata,StorageQueue)
S3method(list_storage_containers,queue_endpoint)
S3method(list_storage_queues,character)
S3method(list_storage_queues,queue_endpoint)
S3method(set_storage_metadata,StorageQueue)
S3method(storage_queue,character)
S3method(storage_queue,queue_endpoint)
export(QueueMessage)

Просмотреть файл

@ -14,13 +14,17 @@ utils::globalVariables(c("self", "private"))
}
# assorted imports of unexported functions
# assorted imports of friend functions
delete_confirmed <- getNamespace("AzureStor")$delete_confirmed
is_endpoint_url <- getNamespace("AzureStor")$is_endpoint_url
generate_endpoint_container <- getNamespace("AzureStor")$generate_endpoint_container
get_classic_metadata_headers <- getNamespace("AzureStor")$get_classic_metadata_headers
set_classic_metadata_headers <- getNamespace("AzureStor")$set_classic_metadata_headers
# tag xml2 to satisfy R CMD check
xml2::as_xml_document

Просмотреть файл

@ -32,7 +32,7 @@ public=list(
#' @field insertion_time The message insertion (creation) time.
#' @field expiry_time The message expiration time.
#' @field text The message text.
#' @field receipt A pop receipt. This is present if the message was obtained by means other than [peeking][StorageQueue#method-peek_message], and is required for updating or deleting the message.
#' @field receipt A pop receipt. This is present if the message was obtained by means other than [peeking][StorageQueue], and is required for updating or deleting the message.
#' @field next_visible_time The time when this message will be next visible.
#' @field dequeue_count The number of times this message has been read.
queue=NULL,
@ -73,7 +73,7 @@ public=list(
#' @description
#' Updates this message in the queue.
#'
#' This operation can be used to continually extend the invisibility of a queue message. This functionality can be useful if you want a worker role to "lease" a message. For example, if a worker role calls [`get_messages`][StorageQueue#method-get_messages] and recognizes that it needs more time to process a message, it can continually extend the message's invisibility until it is processed. If the worker role were to fail during processing, eventually the message would become visible again and another worker role could process it.
#' This operation can be used to continually extend the invisibility of a queue message. This functionality can be useful if you want a worker role to "lease" a message. For example, if a worker role calls [`get_messages`][StorageQueue] and recognizes that it needs more time to process a message, it can continually extend the message's invisibility until it is processed. If the worker role were to fail during processing, eventually the message would become visible again and another worker role could process it.
#' @param visibility_timeout The new visibility timeout (time to when the message will again be visible).
#' @param text Optionally, new message text, either a raw or character vector. If a raw vector, it is base64-encoded, and if a character vector, it is collapsed into a single string before being sent to the queue.
update=function(visibility_timeout, text=self$text)

Просмотреть файл

@ -98,13 +98,41 @@ public=list(
do_container_op(self, "messages", http_verb="DELETE")
},
#' @description
#' Retrieves user-defined metadata for the queue.
#' @return
#' A named list of metadata properties.
get_metadata=function()
{
res <- do_container_op(self, "", options=list(comp="metadata"), http_verb="HEAD")
get_classic_metadata_headers(res)
},
#' @description
#' Sets user-defined metadata for the queue.
#' @param ... Name-value pairs to set as metadata.
#' @param keep_existing Whether to retain existing metadata information.
#' @return
#' A named list of metadata properties, invisibly.
set_metadata=function(..., keep_existing=TRUE)
{
meta <- if(keep_existing)
utils::modifyList(self$get_metadata(), list(...))
else list(...)
do_container_op(self, options=list(comp="metadata"), headers=set_classic_metadata_headers(meta),
http_verb = "PUT")
invisible(meta)
},
#' @description
#' Reads a message from the front of the storage queue.
#'
#' When a message is read, the consumer is expected to process the message and then delete it. After the message is read, it is made invisible to other consumers for a specified interval. If the message has not yet been deleted at the time the interval expires, its visibility is restored, so that another consumer may process it.
#' @return
#' A new object of class [`QueueMessage`].
get_message=function() {
get_message=function()
{
new_message(do_container_op(self, "messages")$QueueMessage, self)
},
@ -115,7 +143,8 @@ public=list(
#' @param n How many messages to read. The maximum is 32.
#' @return
#' A list of objects of class [`QueueMessage`].
get_messages=function(n=1) {
get_messages=function(n=1)
{
opts <- list(numofmessages=n)
lapply(do_container_op(self, "messages", options=opts), new_message, queue=self)
},
@ -126,7 +155,8 @@ public=list(
#' Note that a message obtained via the `peek_message` or `peek_messages` method will not include a pop receipt, which is required to delete or update it.
#' @return
#' A new object of class [`QueueMessage`].
peek_message=function() {
peek_message=function()
{
opts <- list(peekonly=TRUE)
new_message(do_container_op(self, "messages", options=opts)$QueueMessage, self)
},
@ -138,7 +168,8 @@ public=list(
#' @param n How many messages to read. The maximum is 32.
#' @return
#' A list of objects of class [`QueueMessage`].
peek_messages=function(n=1) {
peek_messages=function(n=1)
{
opts <- list(peekonly=TRUE, numofmessages=n)
lapply(do_container_op(self, "messages", options=opts), new_message, queue=self)
},
@ -147,7 +178,8 @@ public=list(
#' Reads a message from the storage queue, removing it at the same time. This is equivalent to calling [`get_message`](#method-get_message) and [`delete_message`](#method-delete_message) successively.
#' @return
#' A new object of class [`QueueMessage`].
pop_message=function() {
pop_message=function()
{
msg <- self$get_message()
msg$delete()
msg
@ -158,7 +190,8 @@ public=list(
#' @param n How many messages to read. The maximum is 32.
#' @return
#' A list of objects of class [`QueueMessage`].
pop_messages=function(n=1) {
pop_messages=function(n=1)
{
msgs <- self$get_messages(n)
lapply(msgs, function(msg) msg$delete())
msgs

11
R/metadata.R Normal file
Просмотреть файл

@ -0,0 +1,11 @@
#' @export
get_storage_metadata.StorageQueue <- function(object, ...)
{
object$get_metadata()
}
#' @export
set_storage_metadata.StorageQueue <- function(object, ..., keep_existing=TRUE)
{
object$set_metadata(..., keep_existing=keep_existing)
}

Просмотреть файл

@ -19,7 +19,7 @@ qu2 <- create_storage_queue(endp, "myqueue2")
delete_storage_queue(qu2)
```
The queue object exposes methods for getting (reading), peeking, popping (reading and deleting) and putting (writing) messages.
The queue object exposes methods for getting (reading), peeking, deleting, updating, popping (reading and deleting) and putting (writing) messages:
```r
qu$put_message("Hello queue")
@ -27,11 +27,22 @@ msg <- qu$get_message()
msg$text
## [1] "Hello queue"
# get several messages at once
qu$get_messages(n=30)
```
The message object exposes methods for deleting and updating the message.
The message object exposes methods for deleting and updating the message:
```r
msg$update(visibility_timeout=30, text="Updated message")
msg$delete()
```
You can get and set metadata for a queue with the AzureStor `get/set_storage_metadata` generics:
```r
get_storage_metadata(qu)
set_storage_metadata(qu, name1="value1", name2="value2")
```

Просмотреть файл

@ -34,7 +34,7 @@ msg$delete()
\item{\code{text}}{The message text.}
\item{\code{receipt}}{A pop receipt. This is present if the message was obtained by means other than \link[=StorageQueue#method-peek_message]{peeking}, and is required for updating or deleting the message.}
\item{\code{receipt}}{A pop receipt. This is present if the message was obtained by means other than \link[=StorageQueue]{peeking}, and is required for updating or deleting the message.}
\item{\code{next_visible_time}}{The time when this message will be next visible.}
@ -86,7 +86,7 @@ Deletes this message from the queue.
\subsection{Method \code{update()}}{
Updates this message in the queue.
This operation can be used to continually extend the invisibility of a queue message. This functionality can be useful if you want a worker role to "lease" a message. For example, if a worker role calls \code{\link[=StorageQueue#method-get_messages]{get_messages}} and recognizes that it needs more time to process a message, it can continually extend the message's invisibility until it is processed. If the worker role were to fail during processing, eventually the message would become visible again and another worker role could process it.
This operation can be used to continually extend the invisibility of a queue message. This functionality can be useful if you want a worker role to "lease" a message. For example, if a worker role calls \code{\link[=StorageQueue]{get_messages}} and recognizes that it needs more time to process a message, it can continually extend the message's invisibility until it is processed. If the worker role were to fail during processing, eventually the message would become visible again and another worker role could process it.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{QueueMessage$update(visibility_timeout, text = self$text)}\if{html}{\out{</div>}}
}

Просмотреть файл

@ -69,6 +69,8 @@ AzureRMR::delete_pool()
\item \href{#method-create}{\code{StorageQueue$create()}}
\item \href{#method-delete}{\code{StorageQueue$delete()}}
\item \href{#method-clear}{\code{StorageQueue$clear()}}
\item \href{#method-get_metadata}{\code{StorageQueue$get_metadata()}}
\item \href{#method-set_metadata}{\code{StorageQueue$set_metadata()}}
\item \href{#method-get_message}{\code{StorageQueue$get_message()}}
\item \href{#method-get_messages}{\code{StorageQueue$get_messages()}}
\item \href{#method-peek_message}{\code{StorageQueue$peek_message()}}
@ -138,6 +140,41 @@ Clears (deletes) all messages in this storage queue.
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$clear()}\if{html}{\out{</div>}}
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-get_metadata"></a>}}
\if{latex}{\out{\hypertarget{method-get_metadata}{}}}
\subsection{Method \code{get_metadata()}}{
Retrieves user-defined metadata for the queue.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$get_metadata()}\if{html}{\out{</div>}}
}
\subsection{Returns}{
A named list of metadata properties.
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-set_metadata"></a>}}
\if{latex}{\out{\hypertarget{method-set_metadata}{}}}
\subsection{Method \code{set_metadata()}}{
Sets user-defined metadata for the queue.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$set_metadata(..., keep_existing = TRUE)}\if{html}{\out{</div>}}
}
\subsection{Arguments}{
\if{html}{\out{<div class="arguments">}}
\describe{
\item{\code{...}}{Name-value pairs to set as metadata.}
\item{\code{keep_existing}}{Whether to retain existing metadata information.}
}
\if{html}{\out{</div>}}
}
\subsection{Returns}{
A named list of metadata properties, invisibly.
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-get_message"></a>}}

Просмотреть файл

@ -0,0 +1,51 @@
context("Metadata")
tenant <- Sys.getenv("AZ_TEST_TENANT_ID")
app <- Sys.getenv("AZ_TEST_APP_ID")
password <- Sys.getenv("AZ_TEST_PASSWORD")
subscription <- Sys.getenv("AZ_TEST_SUBSCRIPTION")
if(tenant == "" || app == "" || password == "" || subscription == "")
skip("Authentication tests skipped: ARM credentials not set")
rgname <- Sys.getenv("AZ_TEST_STORAGE_RG")
storname <- Sys.getenv("AZ_TEST_STORAGE_HNS")
if(rgname == "" || storname == "")
skip("Queue client tests skipped: resource names not set")
sub <- AzureRMR::az_rm$new(tenant=tenant, app=app, password=password)$get_subscription(subscription)
stor <- sub$get_resource_group(rgname)$get_storage_account(storname)
options(azure_storage_progress_bar=FALSE)
qu <- stor$get_queue_endpoint()
test_that("Metadata set/get works",
{
sq <- create_storage_queue(qu, make_name())
expect_is(sq, "StorageQueue")
meta <- sq$get_metadata()
expect_true(length(meta) == 0)
meta2 <- get_storage_metadata(sq)
expect_true(length(meta2) == 0)
set_storage_metadata(sq, name1="value1")
meta3 <- get_storage_metadata(sq)
expect_identical(meta3, list(name1="value1"))
sq$set_metadata(name2="value2")
meta4 <- sq$get_metadata()
expect_identical(meta4, list(name1="value1", name2="value2"))
set_storage_metadata(sq, name3="value3", keep_existing=FALSE)
meta5 <- get_storage_metadata(sq)
expect_identical(meta5, list(name3="value3"))
})
teardown({
lst <- list_storage_queues(qu)
lapply(lst, delete_storage_queue, confirm=FALSE)
})