This commit is contained in:
Hong Ooi 2020-04-06 10:41:47 +10:00
Родитель dd988199db
Коммит d61e0ad944
8 изменённых файлов: 498 добавлений и 8 удалений

Просмотреть файл

@ -14,7 +14,7 @@ Depends:
Imports:
utils,
AzureRMR (>= 2.0.0),
AzureStor (>= 3.1.1),
AzureStor (>= 3.0.0),
openssl,
xml2,
httr

Просмотреть файл

@ -11,6 +11,8 @@ S3method(list_storage_queues,character)
S3method(list_storage_queues,queue_endpoint)
S3method(storage_queue,character)
S3method(storage_queue,queue_endpoint)
export(QueueMessage)
export(StorageQueue)
export(create_storage_queue)
export(delete_storage_queue)
export(list_storage_queues)

Просмотреть файл

@ -1,22 +1,37 @@
#' R6 class representing an Azure storage queue
#' @export
StorageQueue <- R6::R6Class("StorageQueue",
public=list(
#' @field endpoint A queue endpoint object. This contains the account and authentication information for the queue.
#' @field name The name of the queue.
endpoint=NULL,
name=NULL,
initialize=function(endpoint, name, ...)
#' @description
#' Initialize the queue. Rather than calling this directly, you should use one of the [`storage_queue`], [`list_storage_queues`] or [`create_storage_queue`] functions.
#' @param endpoint An endpoint object.
#' @param name The name of the queue.
#' @details
#' Initializing this R6 object does not touch the server. If a queue of the given name does not already exist, it has to be created by calling the `create` method.
initialize=function(endpoint, name)
{
self$endpoint <- endpoint
self$name <- name
},
#' @description
#' Creates a storage queue in Azure, using the storage endpoint and name stored in this R6 object.
create=function()
{
do_container_op(self, http_verb="PUT")
invisible(self)
},
#' @description
#' Deletes this storage queue in Azure.
#' @param confirm Whether to ask for confirmation before deleting.
delete=function(confirm=TRUE)
{
if(!delete_confirmed(confirm, paste0(self$endpoint$url, name), "queue"))
@ -26,43 +41,77 @@ public=list(
invisible(self)
},
#' @description
#' Clears (deletes) all messages in this storage queue.
clear=function()
{
do_container_op(self, "messages", http_verb="DELETE")
},
read_message=function() {
#' @description
#' Reads a message from the front of the storage queue. The message is then marked as read, but must still be deleted manually.
#' @return
#' A new object of class [`QueueMessage`].
get_message=function() {
new_message(do_container_op(self, "messages")$QueueMessage, self)
},
read_messages=function(n=1) {
#' @description
#' Reads several messages at once from the front of the storage queue. The messages are marked as read, but must still be deleted manually.
#' @param n How many messages to read. The maximum is 32.
#' @return
#' A list of objects of class [`QueueMessage`].
get_messages=function(n=1) {
opts <- list(numofmessages=n)
lapply(do_container_op(self, "messages", options=opts), new_message, queue=self)
},
#' @description
#' Reads a message from the storage queue, but does not mark it as read.
#' @return
#' A new object of class [`QueueMessage`].
peek_message=function() {
opts <- list(peekonly=TRUE)
new_message(do_container_op(self, "messages", options=opts)$QueueMessage, self)
},
#' @description
#' Reads several messages at once from the storage queue, without marking them as read.
#' @param n How many messages to read. The maximum is 32.
#' @return
#' A list of objects of class [`QueueMessage`].
peek_messages=function(n=1) {
opts <- list(peekonly=TRUE, numofmessages=n)
lapply(do_container_op(self, "messages", options=opts), new_message, queue=self)
},
#' @description
#' Reads a message from the storage queue, and then deletes it.
#' @return
#' A new object of class [`QueueMessage`].
pop_message=function() {
msg <- self$read_message()
msg <- self$get_message()
msg$delete()
msg
},
#' @description
#' Reads several messages at once from the storage queue, and then deletes them.
#' @param n How many messages to read. The maximum is 32.
#' @return
#' A list of objects of class [`QueueMessage`].
pop_messages=function(n=1) {
msgs <- self$read_messages(n)
msgs <- self$get_messages(n)
lapply(msgs, function(msg) msg$delete())
msgs
},
write_message=function(text, visibility_timeout=NULL, time_to_live=NULL)
#' @description
#' Writes a message to the back of the message queue.
#' @param text The message text, either a raw or character vector. If a raw vector, it is base64-encoded, and if a character vector, it is pasted into a single string before being sent to the queue.
#' @param visibility_timeout Optional visibility timeout after being read, in seconds.
#' @param time_to_live Optional message time-to-live, in seconds. The default is 7 days.
put_message=function(text, visibility_timeout=NULL, time_to_live=NULL)
{
text <- if(is.raw(text))
openssl::base64_encode(text)
@ -70,7 +119,11 @@ public=list(
paste0(text, collapse="\n")
else stop("Message text must be raw or character", call.=FALSE)
opts <- list(visibilitytimeout=visibility_timeout, messagettl=time_to_live)
opts <- list()
if(!is.null(visibility_timeout))
opts <- c(opts, visibilitytimeout=visibility_timeout)
if(!is.null(time_to_live))
opts <- c(opts, messagettl=time_to_live)
body <- paste0("<QueueMessage><MessageText>", text, "</MessageText></QueueMessage>")
hdrs <- list(`content-length`=sprintf("%.0f", nchar(body)))

Просмотреть файл

@ -1,3 +1,29 @@
#' Create a queue endpoint object
#'
#' @param endpoint The URL (hostname) for the endpoint, of the form `http[s]://{account-name}.queue.{core-host-name}`. On the public Azure cloud, endpoints will be of the form `https://{account-name}.queue.core.windows.net`.
#' @param key The access key for the storage account.
#' @param token An Azure Active Directory (AAD) authentication token. This can be either a string, or an object of class AzureToken created by [AzureRMR::get_azure_token]. The latter is the recommended way of doing it, as it allows for automatic refreshing of expired tokens.
#' @param sas A shared access signature (SAS) for the account.
#' @param api_version The storage API version to use when interacting with the host. Defaults to `"2019-07-07"`.
#'
#' @details
#' This is the queue storage counterpart to the endpoint functions defined in the AzureStor package.
#' @seealso
#' [`AzureStor::storage_endpoint`], [`AzureStor::blob_endpoint`]
#' @examples
#' \dontrun{
#'
#' # obtaining an endpoint from the storage account resource object
#' stor <- AzureRMR::get_azure_login()$
#' get_subscription("sub_id")$
#' get_resource_group("rgname")$
#' get_storage_account("mystorage")
#' stor$get_queue_endpoint()
#'
#' # creating an endpoint standalone
#' queue_endpoint("https://mystorage.queue.core.windows.net/", key="access_key")
#'
#' }
#' @export
queue_endpoint <- function(endpoint, key=NULL, token=NULL, sas=NULL,
api_version=getOption("azure_storage_api_version"))

Просмотреть файл

@ -5,10 +5,20 @@ new_message <- function(message, queue)
}
#' R6 class representing a message from an Azure storage queue
#' @export
QueueMessage <- R6::R6Class("QueueMessage",
public=list(
#' @field queue The queue this message is from, an object of class [`StorageQueue`]
#' @field id The message ID.
#' @field insertion_time The message insertion (creation) time.
#' @field expiry_time The message expiration time.
#' @field text The message text.
#' @field receipt A pop receipt. This is populated if the message was retrieved via a `get_message` or `update` call, and is necessary for deleting or further updating the message.
#' @field next_visible_time The time when this message will be next visible.
#' @field dequeue_count The number of times this message has been read.
queue=NULL,
id=NULL,
insertion_time=NULL,
@ -18,6 +28,10 @@ public=list(
next_visible_time=NULL,
dequeue_count=NULL,
#' @description
#' Creates a new message object. Rather than calling the `initialize` method manually, objects of this class should be created via the methods exposed by the [`StorageQueue`] object.
#' @param message Details about the message.
#' @param queue Object of class `StorageQueue.
initialize=function(message, queue)
{
self$queue <- queue
@ -30,6 +44,8 @@ public=list(
self$dequeue_count <- message$DequeueCount
},
#' @description
#' Deletes this message from the queue.
delete=function()
{
private$check_receipt()
@ -38,6 +54,10 @@ public=list(
invisible(NULL)
},
#' @description
#' Updates this message in the queue.
#' @param visibility_timeout The new visibility timeout (time to when the message will again be visible).
#' @param text Optionally, new message text.
update=function(visibility_timeout, text=self$text)
{
private$check_receipt()

106
man/QueueMessage.Rd Normal file
Просмотреть файл

@ -0,0 +1,106 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/message.R
\name{QueueMessage}
\alias{QueueMessage}
\title{R6 class representing a message from an Azure storage queue}
\description{
R6 class representing a message from an Azure storage queue
R6 class representing a message from an Azure storage queue
}
\section{Public fields}{
\if{html}{\out{<div class="r6-fields">}}
\describe{
\item{\code{queue}}{The queue this message is from, an object of class \code{\link{StorageQueue}}}
\item{\code{id}}{The message ID.}
\item{\code{insertion_time}}{The message insertion (creation) time.}
\item{\code{expiry_time}}{The message expiration time.}
\item{\code{text}}{The message text.}
\item{\code{receipt}}{A pop receipt. This is populated if the message was retrieved via a \code{get_message} or \code{update} call, and is necessary for deleting or further updating the message.}
\item{\code{next_visible_time}}{The time when this message will be next visible.}
\item{\code{dequeue_count}}{The number of times this message has been read.}
}
\if{html}{\out{</div>}}
}
\section{Methods}{
\subsection{Public methods}{
\itemize{
\item \href{#method-new}{\code{QueueMessage$new()}}
\item \href{#method-delete}{\code{QueueMessage$delete()}}
\item \href{#method-update}{\code{QueueMessage$update()}}
\item \href{#method-clone}{\code{QueueMessage$clone()}}
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-new"></a>}}
\if{latex}{\out{\hypertarget{method-new}{}}}
\subsection{Method \code{new()}}{
Creates a new message object. Rather than calling the \code{initialize} method manually, objects of this class should be created via the methods exposed by the \code{\link{StorageQueue}} object.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{QueueMessage$new(message, queue)}\if{html}{\out{</div>}}
}
\subsection{Arguments}{
\if{html}{\out{<div class="arguments">}}
\describe{
\item{\code{message}}{Details about the message.}
\item{\code{queue}}{Object of class `StorageQueue.}
}
\if{html}{\out{</div>}}
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-delete"></a>}}
\if{latex}{\out{\hypertarget{method-delete}{}}}
\subsection{Method \code{delete()}}{
Deletes this message from the queue.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{QueueMessage$delete()}\if{html}{\out{</div>}}
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-update"></a>}}
\if{latex}{\out{\hypertarget{method-update}{}}}
\subsection{Method \code{update()}}{
Updates this message in the queue.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{QueueMessage$update(visibility_timeout, text = self$text)}\if{html}{\out{</div>}}
}
\subsection{Arguments}{
\if{html}{\out{<div class="arguments">}}
\describe{
\item{\code{visibility_timeout}}{The new visibility timeout (time to when the message will again be visible).}
\item{\code{text}}{Optionally, new message text.}
}
\if{html}{\out{</div>}}
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-clone"></a>}}
\if{latex}{\out{\hypertarget{method-clone}{}}}
\subsection{Method \code{clone()}}{
The objects of this class are cloneable with this method.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{QueueMessage$clone(deep = FALSE)}\if{html}{\out{</div>}}
}
\subsection{Arguments}{
\if{html}{\out{<div class="arguments">}}
\describe{
\item{\code{deep}}{Whether to make a deep clone.}
}
\if{html}{\out{</div>}}
}
}
}

234
man/StorageQueue.Rd Normal file
Просмотреть файл

@ -0,0 +1,234 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/StorageQueue.R
\name{StorageQueue}
\alias{StorageQueue}
\title{R6 class representing an Azure storage queue}
\description{
R6 class representing an Azure storage queue
R6 class representing an Azure storage queue
}
\section{Public fields}{
\if{html}{\out{<div class="r6-fields">}}
\describe{
\item{\code{endpoint}}{A queue endpoint object. This contains the account and authentication information for the queue.}
\item{\code{name}}{The name of the queue.}
}
\if{html}{\out{</div>}}
}
\section{Methods}{
\subsection{Public methods}{
\itemize{
\item \href{#method-new}{\code{StorageQueue$new()}}
\item \href{#method-create}{\code{StorageQueue$create()}}
\item \href{#method-delete}{\code{StorageQueue$delete()}}
\item \href{#method-clear}{\code{StorageQueue$clear()}}
\item \href{#method-get_message}{\code{StorageQueue$get_message()}}
\item \href{#method-get_messages}{\code{StorageQueue$get_messages()}}
\item \href{#method-peek_message}{\code{StorageQueue$peek_message()}}
\item \href{#method-peek_messages}{\code{StorageQueue$peek_messages()}}
\item \href{#method-pop_message}{\code{StorageQueue$pop_message()}}
\item \href{#method-pop_messages}{\code{StorageQueue$pop_messages()}}
\item \href{#method-put_message}{\code{StorageQueue$put_message()}}
\item \href{#method-clone}{\code{StorageQueue$clone()}}
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-new"></a>}}
\if{latex}{\out{\hypertarget{method-new}{}}}
\subsection{Method \code{new()}}{
Initialize the queue. Rather than calling this directly, you should use one of the \code{\link{storage_queue}}, \code{\link{list_storage_queues}} or \code{\link{create_storage_queue}} functions.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$new(endpoint, name)}\if{html}{\out{</div>}}
}
\subsection{Arguments}{
\if{html}{\out{<div class="arguments">}}
\describe{
\item{\code{endpoint}}{An endpoint object.}
\item{\code{name}}{The name of the queue.}
}
\if{html}{\out{</div>}}
}
\subsection{Details}{
Initializing this R6 object does not touch the server. If a queue of the given name does not already exist, it has to be created by calling the \code{create} method.
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-create"></a>}}
\if{latex}{\out{\hypertarget{method-create}{}}}
\subsection{Method \code{create()}}{
Creates a storage queue in Azure, using the storage endpoint and name stored in this R6 object.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$create()}\if{html}{\out{</div>}}
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-delete"></a>}}
\if{latex}{\out{\hypertarget{method-delete}{}}}
\subsection{Method \code{delete()}}{
Deletes this storage queue in Azure.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$delete(confirm = TRUE)}\if{html}{\out{</div>}}
}
\subsection{Arguments}{
\if{html}{\out{<div class="arguments">}}
\describe{
\item{\code{confirm}}{Whether to ask for confirmation before deleting.}
}
\if{html}{\out{</div>}}
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-clear"></a>}}
\if{latex}{\out{\hypertarget{method-clear}{}}}
\subsection{Method \code{clear()}}{
Clears (deletes) all messages in this storage queue.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$clear()}\if{html}{\out{</div>}}
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-get_message"></a>}}
\if{latex}{\out{\hypertarget{method-get_message}{}}}
\subsection{Method \code{get_message()}}{
Reads a message from the front of the storage queue. The message is then marked as read, but must still be deleted manually.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$get_message()}\if{html}{\out{</div>}}
}
\subsection{Returns}{
A new object of class \code{\link{QueueMessage}}.
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-get_messages"></a>}}
\if{latex}{\out{\hypertarget{method-get_messages}{}}}
\subsection{Method \code{get_messages()}}{
Reads several messages at once from the front of the storage queue. The messages are marked as read, but must still be deleted manually.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$get_messages(n = 1)}\if{html}{\out{</div>}}
}
\subsection{Arguments}{
\if{html}{\out{<div class="arguments">}}
\describe{
\item{\code{n}}{How many messages to read. The maximum is 32.}
}
\if{html}{\out{</div>}}
}
\subsection{Returns}{
A list of objects of class \code{\link{QueueMessage}}.
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-peek_message"></a>}}
\if{latex}{\out{\hypertarget{method-peek_message}{}}}
\subsection{Method \code{peek_message()}}{
Reads a message from the storage queue, but does not mark it as read.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$peek_message()}\if{html}{\out{</div>}}
}
\subsection{Returns}{
A new object of class \code{\link{QueueMessage}}.
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-peek_messages"></a>}}
\if{latex}{\out{\hypertarget{method-peek_messages}{}}}
\subsection{Method \code{peek_messages()}}{
Reads several messages at once from the storage queue, without marking them as read.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$peek_messages(n = 1)}\if{html}{\out{</div>}}
}
\subsection{Arguments}{
\if{html}{\out{<div class="arguments">}}
\describe{
\item{\code{n}}{How many messages to read. The maximum is 32.}
}
\if{html}{\out{</div>}}
}
\subsection{Returns}{
A list of objects of class \code{\link{QueueMessage}}.
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-pop_message"></a>}}
\if{latex}{\out{\hypertarget{method-pop_message}{}}}
\subsection{Method \code{pop_message()}}{
Reads a message from the storage queue, and then deletes it.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$pop_message()}\if{html}{\out{</div>}}
}
\subsection{Returns}{
A new object of class \code{\link{QueueMessage}}.
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-pop_messages"></a>}}
\if{latex}{\out{\hypertarget{method-pop_messages}{}}}
\subsection{Method \code{pop_messages()}}{
Reads several messages at once from the storage queue, and then deletes them.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$pop_messages(n = 1)}\if{html}{\out{</div>}}
}
\subsection{Arguments}{
\if{html}{\out{<div class="arguments">}}
\describe{
\item{\code{n}}{How many messages to read. The maximum is 32.}
}
\if{html}{\out{</div>}}
}
\subsection{Returns}{
A list of objects of class \code{\link{QueueMessage}}.
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-put_message"></a>}}
\if{latex}{\out{\hypertarget{method-put_message}{}}}
\subsection{Method \code{put_message()}}{
Writes a message to the back of the message queue.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$put_message(text, visibility_timeout = NULL, time_to_live = NULL)}\if{html}{\out{</div>}}
}
\subsection{Arguments}{
\if{html}{\out{<div class="arguments">}}
\describe{
\item{\code{text}}{The message text, either a raw or character vector. If a raw vector, it is base64-encoded, and if a character vector, it is pasted into a single string before being sent to the queue.}
\item{\code{visibility_timeout}}{Optional visibility timeout after being read, in seconds.}
\item{\code{time_to_live}}{Optional message time-to-live, in seconds. The default is 7 days.}
}
\if{html}{\out{</div>}}
}
}
\if{html}{\out{<hr>}}
\if{html}{\out{<a id="method-clone"></a>}}
\if{latex}{\out{\hypertarget{method-clone}{}}}
\subsection{Method \code{clone()}}{
The objects of this class are cloneable with this method.
\subsection{Usage}{
\if{html}{\out{<div class="r">}}\preformatted{StorageQueue$clone(deep = FALSE)}\if{html}{\out{</div>}}
}
\subsection{Arguments}{
\if{html}{\out{<div class="arguments">}}
\describe{
\item{\code{deep}}{Whether to make a deep clone.}
}
\if{html}{\out{</div>}}
}
}
}

49
man/queue_endpoint.Rd Normal file
Просмотреть файл

@ -0,0 +1,49 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/endpoint.R
\name{queue_endpoint}
\alias{queue_endpoint}
\title{Create a queue endpoint object}
\usage{
queue_endpoint(
endpoint,
key = NULL,
token = NULL,
sas = NULL,
api_version = getOption("azure_storage_api_version")
)
}
\arguments{
\item{endpoint}{The URL (hostname) for the endpoint, of the form \verb{http[s]://\{account-name\}.queue.\{core-host-name\}}. On the public Azure cloud, endpoints will be of the form \verb{https://\{account-name\}.queue.core.windows.net}.}
\item{key}{The access key for the storage account.}
\item{token}{An Azure Active Directory (AAD) authentication token. This can be either a string, or an object of class AzureToken created by \link[AzureRMR:get_azure_token]{AzureRMR::get_azure_token}. The latter is the recommended way of doing it, as it allows for automatic refreshing of expired tokens.}
\item{sas}{A shared access signature (SAS) for the account.}
\item{api_version}{The storage API version to use when interacting with the host. Defaults to \code{"2019-07-07"}.}
}
\description{
Create a queue endpoint object
}
\details{
This is the queue storage counterpart to the endpoint functions defined in the AzureStor package.
}
\examples{
\dontrun{
# obtaining an endpoint from the storage account resource object
stor <- AzureRMR::get_azure_login()$
get_subscription("sub_id")$
get_resource_group("rgname")$
get_storage_account("mystorage")
stor$get_queue_endpoint()
# creating an endpoint standalone
queue_endpoint("https://mystorage.queue.core.windows.net/", key="access_key")
}
}
\seealso{
\code{\link[AzureStor:storage_endpoint]{AzureStor::storage_endpoint}}, \code{\link[AzureStor:blob_endpoint]{AzureStor::blob_endpoint}}
}