Refactor and generalize state machine-based Queue Request Processor

- Split concepts of Agent, Scheduler and Queuing interface
- Rename RequestAction to ExecutionInstruction
- Add in-memory based implementation of Queuing API
- Add Azure Queue-based implementation of Queuing API
- Add Fibonnaci in-memory test for QueueServiceHandler
William Blum 2019-07-25 00:32:04 -07:00
Родитель c9dcf11eec
Коммит c7ed052541
9 изменённых файлов: 836 добавлений и 348 удалений

@ -2,5 +2,6 @@
<Project ToolsVersion="15.0" DefaultTargets="Build" xmlns="">

@ -137,8 +137,8 @@ type AzureStorageTests () =
[<Fact(Skip="Needs Azure Storage key configuration")>]
member x.``Azure Table implementation of Agent Storage interface is atomic``() =
async {
let storageAccountName, storageAccountKey =
failwithf "Please set azure storage account name and key"
let storageAccountConnectionString =
failwithf "Please set azure storage account connectiong string before running this test"
let tableName = "AgentJoinStorageTable"
@ -148,13 +148,11 @@ type AzureStorageTests () =
timestamp = System.DateTimeOffset.UtcNow
let creds = Table.StorageCredentials(storageAccountName, storageAccountKey)
let uri = Table.StorageUri(System.Uri(sprintf "" storageAccountName))
let storageAccount = Table.CloudStorageAccount.Parse(storageAccountConnectionString)
let! storage =

@ -15,6 +15,7 @@
<Compile Include="CommunicationTests.fs" />
<Compile Include="Generators.fs" />
<Compile Include="Azure.Test.Storage.fs" />
<Compile Include="QueueServiceHandlerTests.fs" />
<Compile Include="Azure.Test.Queue.fs" />

@ -0,0 +1,297 @@
module Microsoft.FSharpLu.QueueServiceHandler.Test
open Microsoft.FSharpLu.Logging
open Microsoft.FSharpLu.StateMachineAgent
open Microsoft.FSharpLu.StateMachineAgent.QueueScheduler
open Microsoft.FSharpLu.StateMachineAgent.Agent
open Xunit
/// A request with input of type 'i and state 's
type RequestOf<'i, 's> =
header :
correlationId : string
input : 'i
state : 's
metadata : RequestMetadata
/// Context parameters passed to every request handler
type HandlerContext<'QueueMessage, 'Request> =
contextName : string
joinStore : Join.StorageInterface<'Request>
queue : QueueingAPI<'QueueMessage, 'Request>
queuedMessage : 'QueueMessage
let createRequestContext<'QueueMessage, 'Request>
(queue:QueueingAPI<'QueueMessage, 'Request>)
HandlerContext.contextName = "bla"
joinStore = joinStore
queue = queue
queuedMessage = message
let toScheduler (c:HandlerContext<'QueueMessage, 'Request>) embed : Agent.Scheduler<_, _> =
spawn =
spawnIn = c.queue.postIn
persist = c.queue.update c.queuedMessage
embed = embed
joinStore = c.joinStore
/// Wrapper for Agent.execute to be used in request handlers
let inline run
(context: HandlerContext<'QueueMessage, ^Request>)
(request: RequestOf<'Input,'State>)
(embed: RequestMetadata -> 'State -> ^Request)
(transition: HandlerContext<'QueueMessage, ^Request> -> 'State -> Async<Transition<'State, 'Result, ^Request>>)
: Async<Agent.ExecutionInstruction< ^Request, _>>
Agent.execute request.state request.metadata {
Agent.title = title
logger =
tags = ["foo", "bar"]
transition = transition context
maximumExpectedStateTransitionTime = System.TimeSpan.FromMinutes(15.0)
maximumInprocessSleep = System.TimeSpan.FromMinutes(5.0)
scheduler = toScheduler context embed
module Example =
type FiboStates =
| Start
| Calculate of int * int * int
| End of int
type FlipCoinStates =
| Start
| Flip
| End
/// All supported requests
type ServiceRequests =
| Fibo of RequestOf<{| i : int; requestQuota : int |}, FiboStates>
| FlipCoin of RequestOf<int, FlipCoinStates>
| Request3
| Shutdown
type ServiceQueues =
| ImportantQueue
| NotImportantQueue
let request1TestOutcome = System.Collections.Concurrent.ConcurrentDictionary<int, int>()
let request1Handler context request =
run "request1 handler" context request
(fun metadata newState -> ServiceRequests.Fibo({ request with state = newState; metadata = metadata }))
(fun context ->
| FiboStates.Start -> async {
printfn "Received request"
return Goto <| FiboStates.Calculate (request.input.i, 0, 1)
| FiboStates.Calculate (i, fibminus_1, fib) -> async {
if i > 1 then
return SleepAndGoto (System.TimeSpan.FromMilliseconds(10.0),
FiboStates.Calculate(i-1, fib, fibminus_1 + fib))
return Goto <| FiboStates.End (fib)
| FiboStates.End (result) -> async {
if not <| request1TestOutcome.TryAdd(request.input.i, result) then
failwith "Result overwrite!"
if request1TestOutcome.Count = request.input.requestQuota then
do! ServiceRequests.Shutdown
return Transition.Return (result)
let request2Handler context request =
run "request2 handler" context request
(fun metadata newState -> ServiceRequests.FlipCoin({ request with state = newState; metadata = metadata }))
(fun context ->
| FlipCoinStates.Start -> async.Return <| Goto FlipCoinStates.Flip
| FlipCoinStates.Flip -> async {
let r = System.Random(System.DateTime.UtcNow.Ticks |> int)
if r.Next() % 2 = 0 then
return Sleep <| System.TimeSpan.FromMilliseconds(10.0 * (float <| r.Next()))
return Goto <| FlipCoinStates.End
| FlipCoinStates.End ->
async.Return <| Transition.Return ()
let QueuesProcessingOptions =
SleepDurationWhenAllQueuesAreEmpty = System.TimeSpan.FromMilliseconds(10.0)
HeartBeatIntervals = System.TimeSpan.FromSeconds(1.0)
ConcurrentRequestWorkers = 10
WorkerReplacementTimeout = System.TimeSpan.FromHours(1.0)
let queuesHandlersOrderedByPriority<'QueueMessage> (cts:System.Threading.CancellationTokenSource)
:QueueProcessor<ServiceQueues,ServiceRequests,HandlerContext<'QueueMessage, ServiceRequests>> list =
let dispatch (c:HandlerContext<'QueueMessage, ServiceRequests>) (k:Continuation<ServiceRequests>) =
| ServiceRequests.Fibo r -> k.k (request1Handler c r)
| ServiceRequests.FlipCoin r -> k.k (request2Handler c r)
| ServiceRequests.Shutdown -> k.k (async { "Shutdown request received"
cts.Cancel() "Token cancelled"
return ExecutionInstruction.Completed None
| _ -> raise RejectedMessage
queueId = ServiceQueues.ImportantQueue
handler = dispatch
maxProcessTime = System.TimeSpan.FromMinutes(1.0)
messageBatchSize = 10
queueId = ServiceQueues.NotImportantQueue
handler = dispatch
maxProcessTime = System.TimeSpan.FromMinutes(1.0)
messageBatchSize = 32
module InMemorySchedulerTest =
open Example
let newInMemoryJoinStorage () : Agent.Join.StorageInterface<'m> =
let storage = System.Collections.Concurrent.ConcurrentDictionary<_, _>()
add = fun joinId joinEntry -> async {
if not <| storage.TryAdd(joinId, joinEntry) then
failwithf "Add: Failed to add %A" (joinId, joinEntry)
update = fun joinId performEntryUpdate ->
lock(storage) (fun () ->
async {
match storage.TryGetValue(joinId) with
| true, entry ->
let newEntry = performEntryUpdate entry
storage.[joinId] <- newEntry
return newEntry
| false, _ -> return failwithf "Update: Failed to retrieve data with id: %A" joinId
get = fun joinId ->
async {
match storage.TryGetValue(joinId) with
| false, _ -> return failwithf "Get: There is no request with id : %A" joinId
| true, entry -> return entry
let joinStore = newInMemoryJoinStorage ()
let queueProcessLoop
async {
let context request = createRequestContext<System.Guid, Example.ServiceRequests> joinStore request
do! QueueScheduler.processingLoopMultipleQueues<Example.ServiceQueues, Example.ServiceRequests, _, System.Guid>
(Example.queuesHandlersOrderedByPriority cts)
(fun queueId -> Map.find queueId queues)
ignore // no heartbeat
(fun _ -> []) // no tags
{new OutcomeLogger with member __.log _ = ()} // no logger
| QueueScheduler.ProcessingLoopCancelled ->
Trace.warning "Processing loop terminated by Azure."
let ``InMemoryScheduler Fibonnaci Test`` () =
async {
let importantQueue = InMemoryQueue.newQueue "inmemory" ImportantQueue
let nonImportantQueue = InMemoryQueue.newQueue "inmemory" NotImportantQueue
let queues =
ImportantQueue, importantQueue
NotImportantQueue, nonImportantQueue
] |> Map.ofSeq
let shutdownSource = new System.Threading.CancellationTokenSource()
let fibonnaci = [|0;1;1;2;3;5;8;13;21;34;55;89;144|]
for i = 1 to fibonnaci.Length-1 do
let! req = Agent.createRequest joinStore
<| ServiceRequests.Fibo(
header = {| correlationId = "Fibonnaci" |}
input = {| i = i; requestQuota = fibonnaci.Length-1|}
state = FiboStates.Start
metadata = req
do! queueProcessLoop shutdownSource queues
for kvp in request1TestOutcome do
let i = kvp.Key
Assert.Equal(request1TestOutcome.[i], fibonnaci.[i])
module AzureQueueSchedulerTest =
open Microsoft.Azure.Cosmos
open Microsoft.Azure.Storage.Queue
open Microsoft.FSharpLu
let JoinTableName = "QueueServiceTest-AgentJoinStorageTable"
let QueueNamePrefix = "QueueServiceTest"
let queueProcessLoop
(storageAccountConnectionString:string) =
async {
let! joinStore =
let azureStorage = Azure.Context.getStorageAccountFromConnectionString storageAccountConnectionString
let context request = createRequestContext<CloudQueueMessage, Example.ServiceRequests> joinStore request
let shutdownSource = new System.Threading.CancellationTokenSource()
do! QueueScheduler.processingLoopMultipleQueues<Example.ServiceQueues, Example.ServiceRequests, _, CloudQueueMessage>
(Example.queuesHandlersOrderedByPriority shutdownSource)
(AzureQueue.newQueue azureStorage QueueNamePrefix)
ignore // no heartbeat
(fun _ -> []) // no tags
{new OutcomeLogger with member __.log _ = ()} // no logger
| QueueScheduler.ProcessingLoopCancelled ->
Trace.warning "Processing loop terminated by Azure."

@ -1,91 +1,131 @@
// Copyright (c) Microsoft Corporation.
namespace Microsoft.FSharpLu.StateMachineAgent
/// A Request Service Processor where persistence is implemented using Azure Storage Queues
module AzureQueue =
/// A Request Processor with scheduling and persistence implemented using some queueing API
module QueueScheduler =
open Microsoft.Azure.Storage.Queue
open Microsoft.FSharpLu
open Microsoft.FSharpLu.Async
open Microsoft.FSharpLu.Azure.AppInsights
open Microsoft.FSharpLu.StateMachineAgent
/// Update content and increase visibility of an Azure Queue message
let update<'Q> (queue:CloudQueue) (queuedRequest:CloudQueueMessage) (m:'Q) visibility =
Microsoft.FSharpLu.Azure.Queue.updateMessage queue queuedRequest m visibility
/// Update visibility of an Azure Queue message and ensure it will be persisted at the end of the invisibility period.
/// Note: this may require to delete and re-post the same message to get around the Azure Queue message lifetime of 7 days.
let updateVisibility (queue:CloudQueue) (queuedRequest:CloudQueueMessage) visibilityTimeout =
async {
do! Microsoft.FSharpLu.Azure.Queue.increaseVisibilityTimeout queue queuedRequest visibilityTimeout
| e ->
TraceTags.error "Could not increase message visibility in Azure backend queue. Update errors are usually due to a request taking too long to process, causing the message visibility timeout to be reached, subsequently leading to two concurrent instances of the service to process the same request."
[ "queuedMessage", queuedRequest.AsString
"exception", e.ToString()]
/// Delete a request from the queue
let delete (queue:CloudQueue) (queuedRequest:CloudQueueMessage) =
async {
do! queue.DeleteMessageAsync(queuedRequest) |> Async.AwaitTask
| e ->
TraceTags.error "Could not delete message from Azure backend queue. This is usually due to a request taking too long to process, causing the message visibility timeout to be reached, subsequently leading to two concurrent instances of the service to process the same request."
[ "queuedMessage", queuedRequest.AsString
"exception", e.ToString()]
/// Execute the specified action on the specified queued request
let executeAction (queue:CloudQueue) (queuedRequest:CloudQueueMessage) = function
| Agent.RequestAction.Delete ->
delete queue queuedRequest
| Agent.RequestAction.PostponeFor visibilityTimeout ->
updateVisibility queue queuedRequest visibilityTimeout
| Agent.RequestAction.PostponeAndReplace (newRequest, visibilityTimeout) ->
update queue queuedRequest newRequest visibilityTimeout
/// Implementation of agent operations using an Azure Queue
let inline toAgentOperations< ^T> queue =
/// Queue interface implemented by the underlying queueing infrastructure
/// (e.g. Azure Queue, mailbox processor, ConcurrentQueue, ...)
type QueueingAPI<'QueueMessage, 'QueueContent> =
Agent.Operations.spawn = Microsoft.FSharpLu.Azure.Queue.postMessage< ^T> queue
Agent.Operations.spawnIn = Microsoft.FSharpLu.Azure.Queue.schedulePostMessage< ^T> queue
/// queue name
queueName : string
/// update content and visibility of a queue message
update : 'QueueMessage -> 'QueueContent -> System.TimeSpan -> Async<unit>
/// update visibility of a queue message
updateVisibility : 'QueueMessage -> System.TimeSpan -> Async<unit>
/// delete a queue message
@ -106,116 +146,38 @@ module AzureQueue =
/// serialize a queue message to string
serialize : 'QueueMessage -> string
/// deserialize a string to a queue message content
tryDeserialize : string -> Result<'QueueContent, string>
/// get queue message insertion time
insertionTime : 'QueueMessage -> System.DateTimeOffset
/// try to pop specified number of messages from the queue
tryGetMessageBatch : int -> System.TimeSpan -> Async<'QueueMessage list>
/// post a new message onto the queue
post : 'QueueContent -> Async<unit>
/// post a new message onto the queue with the specified visibility
postIn : 'QueueContent -> System.TimeSpan -> Async<unit>
/// Handler for requests queued on an Azure Storage queue
/// 'R is the type of requests handled
/// 'C is a custom context parameter that will get passed to every request handler
type Handler<'C, 'R> = 'C -> 'R -> Async<Agent.RequestAction<'R>>
/// Info returned for a request successfully processed
/// 'Request is the type of request
/// 'Result is the request result type
/// 'QueueMessage type representing a queue message by the underlying queueing API
type ProcessedRequestInfo<'Request, 'Result, 'QueueMessage> =
/// status of the request execution
executionStatus : Agent.ExecutionInstruction<'Request,'Result>
/// the request, if it was properly parsed
request : 'Request
/// date/time when the request was posted on the queu
requestInsertionTime : System.DateTime
/// Time taken to process the request
processingTime : System.TimeSpan
/// queue message object from the underlying queue API
queuedMessage : 'QueueMessage
/// Defines a processor for request persisted on an Azure Storage Queue.
/// 'Q defines an enumeration of possible queue Ids
/// Info returned for a request that could not be processed
/// 'Request is the type of request
/// 'QueueMessage type representing a queue message by the underlying queueing API
type FailedRequestInfo<'Request, 'QueueMessage> =
/// A description of the error that occurred
errorMessage : string
/// More details about the error
details : string
/// processed request
request : 'Request option
/// date/time when the request was posted on the queu
requestInsertionTime : System.DateTime
/// Time taken to process the request
processingTime : System.TimeSpan
/// queue message object from the underlying queue API
queuedMessage : 'QueueMessage
/// Outcome of the processing of a request
/// 'Request is the type of request
/// 'Result is the request result type
/// 'QueueMessage type representing a queue message by the underlying queueing API
type RequestOutcome<'Request, 'Result, 'QueueMessage> =
| Processed of ProcessedRequestInfo<'Request, 'Result, 'QueueMessage>
| Error of FailedRequestInfo<'Request, 'QueueMessage>
| Rejected of FailedRequestInfo<'Request, 'QueueMessage>
| ExceptionThrown of System.Exception * FailedRequestInfo<'Request, 'QueueMessage>
/// A continuation that handles processing of an agent execution status.
/// Defining this as a class type rather than let-binding is a "trick"
/// to prevent the F# typecheker to unify the ghost generic type type parameter 'T
/// against the first occurrence of k.
/// This means that Continuation.k can be used in the same function
/// for different instances of the type parameter 'T.
type Continuation<'Request> =
abstract k<'Request, 'Result> : Async<Agent.ExecutionInstruction<'Request, 'Result>> -> Async<unit>
/// Loggger interface for request outcome
type OutcomeLogger =
abstract log : RequestOutcome<'Request, 'Result, 'QueueMessage> -> unit
/// Handler for queued requests
/// 'Request is the type of queued request
/// 'Context is a custom context parameter that will get passed to every request handler
type Handler<'Context, 'Request> = 'Context -> Continuation<'Request> -> 'Request -> Async<unit>
/// Defines a processor consuming queued request.
/// 'QId defines an enumeration of possible queue Ids
/// 'R is the type of requests handled by the processor
/// 'C is a custom context parameter that will get passed to every request handler
type QueueProcessor<'Q, 'R, 'C> =
type QueueProcessor<'QId, 'Request, 'Context> =
/// Name of the queue
queueId : 'Q
queueId : 'QId
/// Function handling a request received on the Azure Queue
handler : Handler<'C, 'R>
/// Function handling a request received on a Queue
handler : Handler<'Context, 'Request>
/// Initial value of the maximum expected processing time.
/// After this timeout expires Azure automatically reposts the message onto the queue.
/// Note for Azure queues: After this timeout expires Azure automatically reposts the message onto the queue.
/// We want this value to be high enough to cover requests with long processing time (e.g. creating a VM in Azure)
/// but not too high to avoid delays in case a real backend failure occurs.
/// Request handlers can dynamically override this value if more time is needed to process a request.
/// A request handler can dynamically override this value if more time is needed to process a request.
/// State machine agents (QueueRequestHandling module) facilitate this by automatically updating the expected
/// processing time on each transition of the state machine.
maxProcessTime : System.TimeSpan
/// Number of messages to pull at once from the Azure queue
/// Keep this number low for maxium utilization of concurrency between worker role instances
/// Number of messages to pull at once from the a queue
/// For Azure Queues, keep this number low for maxium utilization of concurrency between worker role instances
/// Note: maximum allowed is 32 (if greater you'll get a warning followed by an obscure exception
/// from Azure, See
messageBatchSize : int
@ -106,116 +146,38 @@ module AzureQueue =
WorkerReplacementTimeout : System.TimeSpan
/// Attempt to retrieve one batch of message from the Azure queue
let tryGetMessageBatch (queue:CloudQueue) (processor:QueueProcessor<'Q, 'R, 'C>) =
async {
let visibilityTimeout = System.TimeSpan.FromSeconds(float processor.messageBatchSize * processor.maxProcessTime.TotalSeconds)
|> Microsoft.FSharpLu.Azure.Queue.softValidateVisibilityTimeout
let! messages = queue.GetMessagesAsync(processor.messageBatchSize, System.Nullable visibilityTimeout, null, null) |> Async.AwaitTask
return messages |> Seq.toList
/// Initialize the queue-based communication channel using the specified Azure Queue reference
let initChannel (queue:CloudQueue) =
async { "Monitoring queue %s" queue.Name
let! r = queue.CreateIfNotExistsAsync().AsAsync
if not r then "Queue %s was already created." queue.Name
return queue
/// Find pending request from the queue with highest priority
let rec private findHighestPriorityRequests queues =
async {
match queues with
| [] -> return None
| (queue, handling)::rest ->
let! m = tryGetMessageBatch queue handling
match m with
| [] -> return! findHighestPriorityRequests rest
| messages -> return Some (queue, handling, messages)
/// Exception raised by message handlers when a message is rejected
exception RejectedMessage
/// Info returned for a request successfully processed
type ProcessedRequestInfo<'R> =
/// agent action performed after processing the request
action : Agent.RequestAction<'R>
/// the request, if it was properly parsed
request : 'R
/// date/time when the request was posted on the queu
requestInsertionTime : System.DateTime
/// Time taken to process the request
processingTime : System.TimeSpan
/// queue message object from the Azure API
queuedMessage : CloudQueueMessage
/// Info returned for a request that could not be processed
type FailedRequestInfo<'R> =
/// A description of the error that occurred
errorMessage : string
/// More details about the error
details : string
/// agent action performed after processing the request
action : Agent.RequestAction<'R>
/// processed request
request : 'R option
/// date/time when the request was posted on the queu
requestInsertionTime : System.DateTime
/// Time taken to process the request
processingTime : System.TimeSpan
/// queue message object from the Azure API
queuedMessage : CloudQueueMessage
/// Outcome of the processing of a request
type RequestOutcome<'R> =
| Processed of ProcessedRequestInfo<'R>
| Error of FailedRequestInfo<'R>
| Rejected of FailedRequestInfo<'R>
| ExceptionThrown of System.Exception * FailedRequestInfo<'R>
/// Process a single request from an Azure Queue
let processMessage<'C,'R>
(tryDeserialize:string -> Async<Result<'R,string>>)
(handler:Handler<'C, 'R>)
(getTags:'R -> (string*string) list)
(logger: RequestOutcome<'R> -> unit) =
/// Process a single request from a Queue
let processRequest<'Context, 'Request, 'QueueMessage>
(queueSystem:QueueingAPI<'QueueMessage, 'Request>)
(handler:Handler<'Context, 'Request>)
(getTags:'Request -> (string*string) list)
(logger: OutcomeLogger) =
async {
let requestInsertionTime =
// This gymnastic is needed to workAsync around spurious warning #52 from F# compiler in release mode.
// See
let insertionTime = queuedMessage.InsertionTime |> (fun x -> x.GetValueOrDefault())
(queueSystem.insertionTime queuedMessage).UtcDateTime
let! parseResult = tryDeserialize queuedMessage.AsString
let queueMessageAsString = queueSystem.serialize queuedMessage
let parseResult = queueSystem.tryDeserialize queueMessageAsString
match parseResult with
| Result.Error deserializationError ->
let outcome =
logger.log <|
errorMessage = "Could not parse queued message"
details = deserializationError
action = Agent.RequestAction.Delete
request = None
requestInsertionTime = requestInsertionTime
processingTime = System.TimeSpan.Zero
queuedMessage = queuedMessage
logger outcome
do! delete queue queuedMessage
return outcome
do! queueSystem.delete queuedMessage
| Result.Ok parsedMessage ->
let tags = getTags parsedMessage
@ -225,75 +187,82 @@ module AzureQueue =
let processingTime = System.Diagnostics.Stopwatch()
let! action = handler context parsedMessage
do! executeAction queue queuedMessage action
let outcome =
action = action
request = parsedMessage
requestInsertionTime = requestInsertionTime
processingTime = processingTime.Elapsed
queuedMessage = queuedMessage
do! handler context
{ new Continuation<'Request> with
member __.k executionStatusAsync =
async {
// Schedule the remaining execution of the specified queued request
let! executionStatus = executionStatusAsync
match executionStatus with
| Agent.ExecutionInstruction.Completed _ ->
do! queueSystem.delete queuedMessage
| Agent.ExecutionInstruction.SleepAndResume visibilityTimeout ->
do! queueSystem.updateVisibility queuedMessage visibilityTimeout
| Agent.ExecutionInstruction.SleepAndResumeAt (visibilityTimeout, newRequest) ->
do! queueSystem.update queuedMessage newRequest visibilityTimeout
logger.log <|
executionStatus = executionStatus
request = parsedMessage
requestInsertionTime = requestInsertionTime
processingTime = processingTime.Elapsed
queuedMessage = queuedMessage
logger outcome
return outcome
| RejectedMessage ->
let outcome =
logger.log <|
errorMessage = "This request type was rejected from queue handler"
details = sprintf "Queue name: %s" queue.Name
action = Agent.RequestAction.Delete
details = sprintf "Queue name: %s" queueSystem.queueName
request = Some parsedMessage
requestInsertionTime = requestInsertionTime
processingTime = processingTime.Elapsed
queuedMessage = queuedMessage
logger outcome
do! delete queue queuedMessage
return outcome
do! queueSystem.delete queuedMessage
| e ->
let elapsed = processingTime.Elapsed
TraceTags.trackException e (tags @[
"postedTime", requestInsertionTime.ToString()
"processingTime", elapsed.ToString()
"request", queuedMessage.AsString
"request", queueMessageAsString
let outcome =
logger.log <|
errorMessage = "Exception raised while processing request"
details = sprintf "Exception: %O" e
action = Agent.RequestAction.Delete
request = Some parsedMessage
requestInsertionTime = requestInsertionTime
processingTime = processingTime.Elapsed
queuedMessage = queuedMessage
do! delete queue queuedMessage
return outcome
do! queueSystem.delete queuedMessage
exception ProcessingLoopCancelled
/// A processing loop handling requests posted on multiple Azure Queues with
/// different assigned priorities
let processingLoopMultipleQueues<'Q, 'R, 'C>
/// A processing loop handling requests posted on multiple
/// queues with different assigned priorities
let processingLoopMultipleQueues<'QueueId, 'Request, 'Context, 'QueueMessage>
(queuesOrderedByPriority:(CloudQueue * QueueProcessor<'Q, 'R, 'C>) list)
(createRequestContext : CloudQueue -> CloudQueueMessage -> 'C)
(queueProcessorsOrderedByPriority: (QueueProcessor<'QueueId, 'Request, 'Context>) list)
(getQueue:'QueueId -> QueueingAPI<_,_>)
(createRequestContext : QueueingAPI<_,_> -> 'QueueMessage -> 'Context)
(signalHeartBeat : unit -> unit)
(tryDeserialize:'C -> string -> Async<Result<'R, string>>)
(terminationRequested: System.Threading.CancellationToken)
@ -301,8 +270,20 @@ module AzureQueue =
use processingPool = new Microsoft.FSharpLu.Async.Synchronization.Pool(options.ConcurrentRequestWorkers)
let pool = processingPool :> Microsoft.FSharpLu.Async.Synchronization.IPool
/// Find pending request from the queue with highest priority
let rec findHighestPriorityRequests queues =
async {
match queues with
| [] -> return None
| (queue, handling : QueueProcessor<'QueueId, 'Request, 'Context>)::rest ->
let! m = queue.tryGetMessageBatch handling.messageBatchSize handling.maxProcessTime
match m with
| [] -> return! findHighestPriorityRequests rest
| messages -> return Some (queue, handling, messages)
/// Process a batch of messages
let processMessageBatch handler (queue:CloudQueue) (queueMessageBatch:CloudQueueMessage list) =
let processMessageBatch handler queue (queueMessageBatch:'QueueMessage list) =
for queuedMessage in queueMessageBatch ->
@ -313,22 +294,27 @@ module AzureQueue =
if terminationRequested.IsCancellationRequested then
raise ProcessingLoopCancelled
let context = createRequestContext queue queuedMessage
let! outcome =
do! processRequest
(tryDeserialize context)
return ()
|> Async.Catch // Individual request should be able to fail independently without taking down the entire batch
|> Async.Ignore
let queuesOrderedByPriority =
|> (fun handling ->
let queue = getQueue handling.queueId "Monitoring queue %s" queue.queueName
queue, handling)
|> Seq.toList
/// Outermost loop: process requests from each channel by priority
/// Note: One drawback is that higher-priority queues may starve lower-priority ones
let rec processChannels (heartBeatWatch:System.Diagnostics.Stopwatch) =
@ -340,24 +326,204 @@ module AzureQueue =
if terminationRequested.IsCancellationRequested then
raise ProcessingLoopCancelled
let sleepDurationWhenAllQueuesAreEmptyInMs =
options.SleepDurationWhenAllQueuesAreEmpty.TotalMilliseconds |> int
let sleepDurationWhenAllQueuesAreEmptyInMs =
options.SleepDurationWhenAllQueuesAreEmpty.TotalMilliseconds |> int
let! highestRequest = findHighestPriorityRequests queuesOrderedByPriority
match highestRequest with
| None ->
// all channels are empty: we rest for a bit
do! Async.Sleep(sleepDurationWhenAllQueuesAreEmptyInMs)
let! highestRequest = findHighestPriorityRequests queuesOrderedByPriority
match highestRequest with
| None ->
// all channels are empty: we rest for a bit
do! Async.Sleep(sleepDurationWhenAllQueuesAreEmptyInMs)
| Some (queue, handling, firstMessageBatch) ->
do! processMessageBatch handling.handler queue firstMessageBatch
| Some (queue, handling, firstMessageBatch) ->
do! processMessageBatch handling.handler queue firstMessageBatch
return! processChannels heartBeatWatch
return! processChannels heartBeatWatch
let heartBeatWatch = System.Diagnostics.Stopwatch()
do! processChannels heartBeatWatch
/// Deserialize a request
let inline tryDeserializeJson< ^Request> queuedMessageAsString =
match Microsoft.FSharpLu.Json.Compact.tryDeserialize< ^Request> queuedMessageAsString with
| Choice1Of2 message -> Result.Ok message
| Choice2Of2 error -> Result.Error error
/// In-memory queue system implemented with System.Collections.ConcurrentQueue
/// with not fault-tolerance (if the process crashes the queue content is lost)
/// and not visiblity timeout when consuming messages
module InMemoryQueue =
type QueueEntry< ^QueueContent> =
insertionTime : System.DateTimeOffset
content : ^QueueContent
visible : bool
/// Create an instance of an queue processor based on mailbox processor
let inline newQueue<'QueueId, ^QueueContent> queueNamePrefix (queueId:'QueueId)
: QueueScheduler.QueueingAPI<System.Guid, ^QueueContent> =
let queueName = sprintf "%s-%A" queueNamePrefix queueId
let queue = new System.Collections.Concurrent.ConcurrentDictionary<System.Guid, QueueEntry< ^QueueContent>>()
queueName = queueName
update = fun queuedRequest queueContent visibility -> async {
let newContent =
(fun _ -> { insertionTime = System.DateTimeOffset.UtcNow
content = queueContent
visible = true }),
(fun _ c -> { c with content = queueContent }))
return ()
// no-op: visiblity timeout is infinite in this implementation
updateVisibility = fun queuedRequest visibilityTimeout -> async.Return ()
delete = fun queuedRequest -> async {
let success, _ = queue.TryRemove(queuedRequest)
if not success then
failwith "could not remove entry from queue"
insertionTime = fun queueMessage ->
let success, message = queue.TryGetValue(queueMessage)
if not success then
failwith "could not find queue entry"
serialize = fun queueMessage ->
let success, message = queue.TryGetValue(queueMessage)
if not success then
failwith "could not find queue entry"
Microsoft.FSharpLu.Json.Compact.serialize message.content
tryDeserialize = QueueScheduler.tryDeserializeJson< ^QueueContent>
tryGetMessageBatch = fun batchSize _ -> async {
let nextBatch =
lock queue (fun () ->
queue :> seq<_>
|> Seq.where (fun m -> m.Value.visible)
|> Seq.sortBy (fun m -> m.Value.insertionTime)
|> (fun c ->
if not <| queue.TryUpdate(c.Key, { c.Value with visible = false }, c.Value) then
failwith "impossible: queue entry disappeard!"
|> Seq.truncate batchSize
|> Seq.toList
) "Batch: %A" nextBatch
return nextBatch
post = fun (content:^QueueContent) -> async {
let id = System.Guid.NewGuid()
let queueEntry =
{ insertionTime = System.DateTimeOffset.UtcNow
content = content
visible = true
if not <| queue.TryAdd(id, queueEntry) then
failwith "impossible: guid collision"
postIn = fun (content:^QueueContent) delay -> async {
let id = System.Guid.NewGuid()
let queueEntry =
{ insertionTime = System.DateTimeOffset.UtcNow
content = content
visible = true
let child = async {
do! Async.Sleep (int <| delay.TotalMilliseconds)
if not <| queue.TryAdd(id, queueEntry) then
failwith "impossible: guid collision"
let r = Async.StartChild child
return ()
/// Queue system implemented with Azure Queues
module AzureQueue =
open Microsoft.Azure.Storage.Queue
open QueueScheduler
open Microsoft.FSharpLu.Azure.Queue
open Microsoft.FSharpLu.Logging
open Microsoft.Azure.Storage
type ProcessedRequestInfo<'Request, 'Result> = ProcessedRequestInfo<'Request, 'Result, CloudQueueMessage>
type FailedRequestInfo<'Request> = FailedRequestInfo<'Request, CloudQueueMessage>
/// Create an instance of an queue processor based on Azure Queue
let inline newQueue<'QueueId, ^QueueContent> (azureStorage:CloudStorageAccount) queueNamePrefix (queueId:'QueueId)
: QueueScheduler.QueueingAPI<CloudQueueMessage, ^QueueContent> =
// Create the reference to the Azure queue
let queueClient = azureStorage.CreateCloudQueueClient()
let queueName = sprintf "%s-%A" queueNamePrefix queueId
let queue = queueClient.GetQueueReference(queueName)
if not <| queue.CreateIfNotExists() then "Queue %s was already created." queueName
queueName = queueName
// Update content and increase visibility of an Azure Queue message
update = fun queuedRequest content visibility -> updateMessage queue queuedRequest content visibility
// Update visibility of an Azure Queue message and ensure it will be persisted at the end of the invisibility period.
// Note: this may require to delete and re-post the same message to get around the Azure Queue message lifetime of 7 days.
updateVisibility = fun queuedRequest visibilityTimeout ->
async {
do! increaseVisibilityTimeout queue queuedRequest visibilityTimeout
| e ->
TraceTags.error "Could not increase message visibility in Azure backend queue. Update errors are usually due to a request taking too long to process, causing the message visibility timeout to be reached, subsequently leading to two concurrent instances of the service to process the same request."
[ "queuedMessage", queuedRequest.AsString
"exception", e.ToString()]
// Delete a request from the queue
delete = fun queuedRequest ->
async {
do! queue.DeleteMessageAsync(queuedRequest) |> Async.AwaitTask
| e ->
TraceTags.error "Could not delete message from Azure backend queue. This is usually due to a request taking too long to process, causing the message visibility timeout to be reached, subsequently leading to two concurrent instances of the service to process the same request."
[ "queuedMessage", queuedRequest.AsString
"exception", e.ToString()]
insertionTime = fun queueMessage -> queueMessage.InsertionTime.GetValueOrDefault()
serialize = fun queueMessage -> queueMessage.AsString
tryDeserialize = QueueScheduler.tryDeserializeJson
// Attempt to retrieve one batch of message from the Azure queue
tryGetMessageBatch = fun messageBatchSize maxProcessTime ->
async {
let visibilityTimeout =
System.TimeSpan.FromSeconds(float messageBatchSize * maxProcessTime.TotalSeconds)
|> Microsoft.FSharpLu.Azure.Queue.softValidateVisibilityTimeout
let! messages = queue.GetMessagesAsync(messageBatchSize, System.Nullable visibilityTimeout, null, null) |> Async.AwaitTask
return messages |> Seq.toList
post = fun request -> Microsoft.FSharpLu.Azure.Queue.postMessage queue request
postIn = fun request -> Microsoft.FSharpLu.Azure.Queue.schedulePostMessage queue request

@ -15,15 +15,14 @@ module AzureTableJoinStorage =
/// Instantiate an implementation of Agent.Join.StorageInterface
/// backed up by an Azure Table
let newStorage
(credentials: Table.StorageCredentials)
(uri: Table.StorageUri)
(storage: Table.CloudStorageAccount)
(tableName: string)
(retryInterval: System.TimeSpan)
(totalTimeout: System.TimeSpan)
(agentId: string) /// Customer identifier that gets recorded in every entry in Azure Table
: Async<Agent.Join.StorageInterface<'m>> =
async {
let tableClient = Table.CloudTableClient(uri, credentials)
let tableClient = Table.CloudTableClient(storage.TableStorageUri, storage.Credentials)
let table = tableClient.GetTableReference(tableName)
let! _ = table.CreateIfNotExistsAsync() |> Async.AwaitTask

Просмотреть файл

@ -64,6 +64,13 @@ Storage entry contents: %A. New entry contents: %A" joinId existingEntry joinEnt
let rec newAgent(storage) : Agent.Agent<_, _, _> =
let spawn (metadata: Agent.RequestMetadata, state: TestStates) =
async {
printfn " %A" (metadata, state)
let agent = newAgent(storage)
let! _ = Agent.execute state metadata agent
return ()
title = "Agent based persisting state in-memory"
@ -111,29 +118,22 @@ Storage entry contents: %A. New entry contents: %A" joinId existingEntry joinEnt
maximumInprocessSleep = TimeSpan.FromSeconds 1.0
embedState = fun metadata state -> (metadata, state)
scheduler = {
embed = fun metadata state -> (metadata, state)
persist = fun m ts -> async {return ()}
persist = fun m ts -> async {return ()}
storage = storage
joinStore = storage
operations =
let spawn (metadata: Agent.RequestMetadata, state: TestStates) =
async {
printfn " %A" (metadata, state)
let agent = newAgent(storage)
let! _ = Agent.execute state metadata agent
return ()
spawn = spawn
spawnIn = fun request ts ->
async {
printfn "Operations.postIn [%A] %A" ts request
do! Async.Sleep (int ts.TotalMilliseconds)
return! spawn request
spawn = spawn
spawnIn = fun request ts ->
async {
printfn "Operations.postIn [%A] %A" ts request
do! Async.Sleep (int ts.TotalMilliseconds)
return! spawn request
@ -142,7 +142,7 @@ Storage entry contents: %A. New entry contents: %A" joinId existingEntry joinEnt
let storage = Collections.Concurrent.ConcurrentDictionary()
Assert.Empty storage
let agent = newAgent(newInMemoryForkProgressStorage storage)
let! request = Agent.createRequest agent
let! request = Agent.createRequest agent.scheduler.joinStore
let! result = Agent.executeWithResult (State1 23) request agent
Assert.True(storage |> Seq.forall (fun x -> x.Value.status = Agent.Join.Status.Completed))
Assert.True(storage.Count = 4)
@ -158,7 +158,7 @@ Storage entry contents: %A. New entry contents: %A" joinId existingEntry joinEnt
fun () ->
async {
let agent = newAgent(newInMemoryForkProgressStorage storage)
let! request = Agent.createRequest agent
let! request = Agent.createRequest agent.scheduler.joinStore
let! _ = Agent.executeWithResult (State2 "blah") request agent
return ()
} |> Async.StartAsTask :> System.Threading.Tasks.Task
@ -174,15 +174,15 @@ Storage entry contents: %A. New entry contents: %A" joinId existingEntry joinEnt
let storage = Collections.Concurrent.ConcurrentDictionary()
Assert.Empty storage
let agent = newAgent(newInMemoryForkProgressStorage storage)
let! request = Agent.createRequest agent
let! request = Agent.createRequest agent.scheduler.joinStore
let! _ = Agent.executeWithResult (State3 (1, "blah")) request agent
let completedChild = storage |> Seq.tryFind (fun x -> x.Value.status = Agent.Join.Status.Completed && x.Value.parent.IsSome)
Assert.True (completedChild.IsSome)
|> Seq.exists (fun x ->
x.Value.status = Agent.Join.Status.Completed &&
not x.Value.childrenStatuses.IsEmpty &&
|> Seq.exists (fun x ->
x.Value.status = Agent.Join.Status.Completed &&
not x.Value.childrenStatuses.IsEmpty &&
x.Value.childrenStatuses.[completedChild.Value.Key.guid] = Agent.Join.Status.Completed
@ -198,9 +198,9 @@ Storage entry contents: %A. New entry contents: %A" joinId existingEntry joinEnt
let storage = Collections.Concurrent.ConcurrentDictionary()
Assert.Empty storage
let agent = newAgent(newInMemoryForkProgressStorage storage)
let! request = Agent.createRequest agent
let! request = Agent.createRequest agent.scheduler.joinStore
let! _ = Agent.executeWithResult (Fork1) request agent
Assert.True(storage |> Seq.forall (fun x -> x.Value.status = Agent.Join.Status.Completed))
Assert.True(storage |> Seq.filter(fun x -> x.Value.parent.IsSome) |> Seq.length = 4)
Assert.True(storage |> Seq.filter(fun x -> not x.Value.childrenStatuses.IsEmpty) |> Seq.length = 2)
@ -213,7 +213,7 @@ Storage entry contents: %A. New entry contents: %A" joinId existingEntry joinEnt
let storage = Collections.Concurrent.ConcurrentDictionary()
Assert.Empty storage
let agent = newAgent(newInMemoryForkProgressStorage storage)
let! request = Agent.createRequest agent
let! request = Agent.createRequest agent.scheduler.joinStore
let! _ =
fun () ->

@ -13,6 +13,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
.gitignore = .gitignore
appveyor.yml = appveyor.yml
CI-vsts.yml = CI-vsts.yml
Directory.Build.props = Directory.Build.props = = =

Просмотреть файл

@ -12,9 +12,8 @@ type JoinId =
timestamp : System.DateTimeOffset
/// A state machine transition with
/// state type 's and return type 't,
/// and a set of callable state machines represented by type 'm
/// A state machine transition with return type 't.
/// state type 's, and a set of callable state machines represented by type 'm
type Transition<'s, 't, 'm> =
/// Sleep for the specified amount of type and come back to the same state
| Sleep of System.TimeSpan
@ -100,43 +99,38 @@ module StateMachine =
/// This gives flexibility to the API user in scheduling execution of the agent state machine.
module Agent =
/// Interface defining operations supported by an agent.
/// The consumer of the API is responsible for implementing the spawning logic
/// to execute a new agent in the specified state.
/// Interface defining operations required to "execute" an agent.
/// This is the "assembly code" of the state machine. These are basic instructions
/// that need to be implemented by any agent scheduler (responsibility of the API user).
/// This gets returned by the `Agent.execute` function when the state machine
/// returns (`Return` transition) or goes to sleep (out-of-process `Sleep` transition).
/// The user of the API is responsible for providing, as part of its scheduling system,
/// an implemention that conveys the semantic of the values defined below.
/// It is up to the API user to decide when to execute the spawn agent
/// (e.g. immediately in a background thread, queued up for later on some
/// queuing device...)
/// The type 'm represents a spawning request. It embeds the desired state of the agent to spawn, and the associated request metadata.
/// The API consumer provides the embedding function to create a request of type 'm
/// from a state of type 's and associated metadata (see `Agent.embedState` below).
type Operations<'m> =
/// Request spawning of a new state machine with the specified state and request metadata
spawn : 'm -> Async<unit>
/// Request spawning of a new state machine with the specified state and request metadata
/// in the specified amount of time
spawnIn : 'm -> System.TimeSpan -> Async<unit>
/// Type parameters:
/// 's is the type of states of the state machine
/// 't is the returned type of the agent
type ExecutionInstruction<'s, 't> =
/// The request has been processed and can be removed from the scheduling system
| Completed of 't option
/// Possible actions (to be implemented by the API user) that can be performed on a request
/// each time a state machine transition is executed.
/// This gets returned by the `Agent.execute` function when the state machine
/// returns (`Return` transition) or goes to sleep (out-of-process `Sleep` transition).
/// The user of the API is responsible for implementing those
/// action as part of its request scheduling system (each time the `Agent.execute` function is called).
type RequestAction<'m> =
/// The request has been processed and can be deleted from the request scheduling system
| Delete
/// The request processing is being paused for the specified amount of time,
/// The processing of the request must be paused for the specified amount of time,
/// once it the time out expires, the scheduler is responsible for calling `Agent.execute` to resume processing
/// of the request at the same state where it left off.
| PostponeFor of System.TimeSpan
/// Postpone processing of the request for the specified amount of time.
/// once the timeout expires, the scheduler is responsible for calling `Agent.execute` to resume processing
/// at the specified (embedded) state 'm
| PostponeAndReplace of 'm * System.TimeSpan
| SleepAndResume of System.TimeSpan
/// Processing must be paused for the specified amount of time.
/// Once the timeout expires, the scheduler is responsible for calling `Agent.execute` to resume processing
/// at the specified state 's
| SleepAndResumeAt of System.TimeSpan * 's
/// Metadata associated with an agent request
type RequestMetadata = JoinId
@ -188,24 +182,61 @@ module Agent =
get : JoinId -> Async<Entry<'m>>
/// An agent state machine with return type 't and underlying state type 's
/// embeddable into request type 'm.
/// An agent instance must implement `Agent.embedState`: a function to embed the agent state and associated `RequestMetadata` into a type 'm.
/// A scheduler responsible for scheduling execution of agents via calls to Agent.execute.
/// In order to support:
/// __pausing/resuming__ (and out-of-process asynchronous sleep). The agent must implement state persistence on an external device
/// (e.g. Azure Queue, Azure Service Bus...).
/// This is provided by function `Agent.persist` which gets called to persist the machine state and associated metadata in some external storage.
/// __out-of-process asynchronous sleep__ the function calling .execute must handle the actions `RequestAction.PostponeFor` and `RequestAction.PostponeAndReplace`
/// For instance by posting the serialize state 'm onto some queue for later processing.
/// - __pausing/resuming__ (and out-of-process asynchronous sleep). The scheduler must implement state persistence on an external device
/// (e.g. Azure Queue, Azure Service Bus...).
/// This is provided by function `Agent.persist` which gets called to persist the machine state and associated metadata in some external storage.
/// __corountines__ and the `CoReturn` transition, the agent must handle spawning new agents
/// by implementing `Agent.operations.spawn` and `Agent.operations.spawnIn`.
/// - __out-of-process asynchronous sleep__ the function calling .execute must handle the actions `RequestAction.PostponeFor` and `RequestAction.PostponeAndReplace`
/// For instance by posting the serialize state 'm onto some queue for later processing.
/// __forking__ (Fork, WhenAny, WhenAll), the agent must provides an external storage interface ``
/// of type `JoinStorage` implementing atomic add/update storage functions
/// - __corountines__ and the `CoReturn` transition, the scheduler must handle spawning new agents
/// by implementing `Agent.operations.spawn` and `Agent.operations.spawnIn`.
/// - __forking__ (Fork, WhenAny, WhenAll), the scheduler must provides an external storage interface ``
/// of type `JoinStorage` implementing atomic add/update storage functions
/// Type parameters:
/// 's is the type of states of the state machine
/// 'm represents a agent scheduling request. It embeds the desired state of the agent and the associated request metadata.
/// The scheduler provides the function to create a scheduling request of type 'm
/// from a state of type 's and associated metadata (see `Agent.embedState` below).
type Scheduler<'s, 'm> =
/// Create a scheduling request for creating/spawning a
/// an agent in specified state 's and associated metadata 'm
embed : RequestMetadata -> 's -> 'm
/// Persist a request 'm on some external storage device
/// and schedule the processing of the request in the specified amount of time
persist : 'm -> System.TimeSpan -> Async<unit>
/// Request spawning of a new state machine with the specified state and request metadata
spawn : 'm -> Async<unit>
/// Request spawning of a new state machine with the specified state and request metadata
/// in the specified amount of time
spawnIn : 'm -> System.TimeSpan -> Async<unit>
/// Provides an implementation of join storage (e.g. using Azure Table)
joinStore : Join.StorageInterface<'m>
/// An agent state machine with return type 't and underlying state type 's
/// embeddable into an agent request of type 'm.
/// An agent instance must implement `Agent.embedState`: a function to embed the agent state and associated `RequestMetadata`
/// into an agent processing request of type 'm.
/// Type parameters:
/// 's is the type of states of the state machine
/// 't is the returned type of the agent
/// 'm represents a agent scheduling request. It embeds the desired state of the agent and the associated request metadata.
type Agent<'s, 't, 'm> =
@ -228,23 +259,12 @@ module Agent =
/// Amount of time that is short enough to implement asynchronous sleep 'in-process' rather than out-of-process
maximumInprocessSleep : System.TimeSpan
/// A function embedding a given state machine internal state 's
/// and associated metadata into a custom type 'm
embedState : RequestMetadata -> 's -> 'm
/// Persist a state-request 'm on some external storage device
/// and schedule processing of the request in the specified amount of time
persist : 'm -> System.TimeSpan -> Async<unit>
/// External operations available to the agent
operations : Operations<'m>
/// Provides an implementation of join storage (e.g. using Azure Table)
storage : Join.StorageInterface<'m>
/// The scheduler responsible for execution the agent
scheduler : Scheduler<'s, 'm>
/// Create a new request attached to the specified parent JoinId
let private createRequestWithParent (m:Agent<'s, 't, 'm>) parent =
let private createRequestWithParent (joinStore:Join.StorageInterface<'m>) parent =
async {
let requestId =
@ -253,7 +273,7 @@ module Agent =
// Create a join entry for the request
do! joinStore.add
{ status = Join.Status.Requested
whenAllSubscribers = []
@ -267,8 +287,8 @@ module Agent =
/// Create a new request. Should be called by the API consumer to initialize a new request
/// to be executed with an agent and passed to `Agent.execute`
let public createRequest (m:Agent<'s, 't, 'm>) =
createRequestWithParent m None
let public createRequest (joinStore:Join.StorageInterface<'m>) =
createRequestWithParent joinStore None
/// Return true if all children are marked as completed
let private allCompleted childrenStatus =
@ -285,7 +305,7 @@ module Agent =
let private markRequestAsCompleted (m:Agent<'s, 't, 'm>) (metadata:RequestMetadata) =
async {
let! updatedEntry =
(fun u -> { u with status = Join.Status.Completed
modified = System.DateTimeOffset.UtcNow } )
@ -297,7 +317,7 @@ module Agent =
@ -297,7 +317,7 @@ module Agent =
let! joinEntry = joinId
m.scheduler.joinStore.update joinId
(fun driftedJoinEntry ->
// We need to check status from the drifeted entry in case
// other siblings have completed sine the update started
@ -323,7 +343,7 @@ module Agent =
// Honor the "WhenAny" subscriptions
for subscriber in whenAnySubscriptions do
do! m.operations.spawn subscriber
do! m.scheduler.spawn subscriber
// Honor the "WhenAll" subscriptions
if joinEntry.status = Join.Status.Completed then
@ -332,13 +352,13 @@ module Agent =
// Signal the fork's subscribers
for subscriber in joinEntry.whenAllSubscribers do
do! m.operations.spawn subscriber
do! m.scheduler.spawn subscriber
/// Advance execution of an agent state machine.
/// The state gets persisted on every transition to allow
/// resuming the execution in case of failure.
let public executeWithResult (initialState:'s) requestMetadata (m:Agent<'s, 't, 'm>) : Async<RequestAction<'s> * 't option> =
/// The state gets persisted on every transition
/// which allows resuming the execution in case of failure.
let public executeWithResult (initialState:'s) requestMetadata (m:Agent<'s, 't, 'm>) : Async<ExecutionInstruction<'s, 't>> =
/// Keep-alive function called to notify external device (e.g. Azure queue)
/// that more time is needed to finish executing the state machine
@ -347,24 +367,31 @@ module Agent =
// so that we can resume where we left off if the process crashes, is killed, upgraded...
// Specify a timeout after which the persisted state can be deserialized if not already deleted.
let timeoutPeriod = delay + m.maximumExpectedStateTransitionTime
m.persist (m.embedState requestMetadata newState) timeoutPeriod
m.scheduler.persist (m.scheduler.embed requestMetadata newState) timeoutPeriod
let tags = m.tags@["requestGuid", requestMetadata.guid.ToString()]
let rec sleepAndGoto delay (currentState:'s) (nextState:'s) =
async {
if delay < m.maximumInprocessSleep then
m.logger (sprintf "Agent '%s' sleeping for %O in proc" m.title delay) m.tags
m.logger (sprintf "Agent '%s' sleeping for %O in proc" m.title delay) tags
do! notifyMoreTimeIsNeeded currentState delay
// Short-enough to wait asynchronously in-process
do! Async.Sleep(delay.TotalMilliseconds |> int)
return! goto nextState
m.logger (sprintf "Agent '%s' sleeping for %O out of proc" m.title delay) m.tags
m.logger (sprintf "Agent '%s' sleeping for %O out of proc" m.title delay) tags
// For longer waits we wait asynchronously out-of-process using the external storage device
// and postpone the request until later
return (PostponeAndReplace (nextState, delay)), None
return ExecutionInstruction.SleepAndResumeAt(delay, nextState)
and goto (currentState:'s) =
and goto (state:'s) =
async {
do! notifyMoreTimeIsNeeded state System.TimeSpan.Zero
return! runAt state
and runAt (currentState:'s) =
async {
let! operation = m.transition currentState
match operation with
@ -375,18 +402,17 @@ module Agent =
return! sleepAndGoto delay currentState nextState
| Transition.Return result ->
m.logger (sprintf "Agent '%s' reached final state and returned %A" m.title result) m.tags
m.logger (sprintf "Agent '%s' reached final state and returned %A" m.title result) tags
do! markRequestAsCompleted m requestMetadata
return RequestAction.Delete, Some result
return ExecutionInstruction.Completed (Some result)
| Transition.Coreturn requestCallee ->
m.logger (sprintf "Agent '%s' coreturning to another state machine" m.title) m.tags
do! m.operations.spawn requestCallee
return RequestAction.Delete, None
m.logger (sprintf "Agent '%s' coreturning to another state machine" m.title) tags
do! m.scheduler.spawn requestCallee
return ExecutionInstruction.Completed None
| Transition.Goto newState ->
m.logger (sprintf "Agent '%s' at state %O" m.title newState) m.tags
do! notifyMoreTimeIsNeeded newState System.TimeSpan.Zero
m.logger (sprintf "Agent '%s' at state %O" m.title newState) tags
return! goto newState
@ -394,7 +420,7 @@ module Agent =
@ -394,7 +420,7 @@ module Agent =
return raise (System.NotSupportedException("ForkAndGoto does not accept empty spawned states list"))
// spawn children state machines
m.logger (sprintf "Agent '%s' forking into %d state machines" m.title spawnedStates.Length) m.tags
m.logger (sprintf "Agent '%s' forking into %d state machines" m.title spawnedStates.Length) tags
let joinId =
@ -409,7 +435,7 @@ module Agent =
|> AsyncSeq.foldAsync
(fun spawnChildrenSoFar spawnState ->
async {
let! metadata = createRequestWithParent m (Some joinId)
let! metadata = createRequestWithParent m.scheduler.joinStore (Some joinId)
return Map.add metadata.guid (metadata, spawnState) spawnChildrenSoFar
@ -417,7 +443,7 @@ module Agent =
// create the join entry
let now = System.DateTimeOffset.UtcNow
do! joinId
do! m.scheduler.joinStore.add joinId
status = Join.Status.Waiting
childrenStatuses = childrenMetdata |> (fun id metadata -> Join.Requested)
@ -425,7 +451,7 @@ module Agent =
whenAnySubscribers = []
parent = None
created = now
modified = now
modified = now
// spawn the child processes (Note: this cannot be done before the above join entry is created)
@ -436,8 +462,8 @@ module Agent =
|> AsyncSeq.iterAsync
(fun (childId, (childMetadata, spawnState)) ->
async {
let spawningRequest = m.embedState childMetadata spawnState
do! m.operations.spawn spawningRequest
let spawningRequest = m.scheduler.embed childMetadata spawnState
do! m.scheduler.spawn spawningRequest
@ -446,13 +472,13 @@ module Agent =
@ -446,13 +472,13 @@ module Agent =
| Transition.WhenAll (joinId, newState) ->
let! updatedJoinEntry = joinId
m.scheduler.joinStore.update joinId
(fun driftedJoin ->
if allCompleted driftedJoin.childrenStatuses then
// Subscribe to the 'WhenAll' event
let subscriber = m.embedState requestMetadata newState
let subscriber = m.scheduler.embed requestMetadata newState
{ driftedJoin with whenAllSubscribers = subscriber::driftedJoin.whenAllSubscribers
modified = System.DateTimeOffset.UtcNow } )
@ -460,18 +486,18 @@ module Agent =
// The 'WhenAll' condition is already met
return! goto newState
return RequestAction.Delete, None
return ExecutionInstruction.Completed None
/// 'WhenAny' joining
| Transition.WhenAny (joinId, newState) ->
let! updatedJoinEntry = joinId
m.scheduler.joinStore.update joinId
(fun driftedJoin ->
if atleastOneCompleted driftedJoin.childrenStatuses then
// Subscribe to the 'WhenAny' event
let subscriber = m.embedState requestMetadata newState
let subscriber = m.scheduler.embed requestMetadata newState
{ driftedJoin with whenAnySubscribers = subscriber::driftedJoin.whenAnySubscribers
modified = System.DateTimeOffset.UtcNow } )
@ -479,24 +505,23 @@ module Agent =
// The 'WhenAny' condition is already met
return! goto newState
return RequestAction.Delete, None
return ExecutionInstruction.Completed None
goto initialState
runAt initialState
/// Advance execution of an agent state machine.
/// Same as executeWithResult but return only the action in serialized format
/// and throw away the state machine result.
let public execute initialState requestMetadata (m:Agent<'s, 't, 'm>) =
/// Same as `executeWithResult` but return only the execution result
/// and throw away the state machine result itself.
let public execute (initialState:'s) requestMetadata (m:Agent<'s, 't, 'm>) :Async<ExecutionInstruction<'m, 't>> =
async {
let! action, finalResult = executeWithResult initialState requestMetadata m
let serializedAction =
match action with
| RequestAction.Delete ->
| RequestAction.PostponeFor t ->
RequestAction.PostponeFor t
| RequestAction.PostponeAndReplace (s, d) ->
RequestAction.PostponeAndReplace (m.embedState requestMetadata s, d)
return serializedAction
let! instruction = executeWithResult initialState requestMetadata m
match instruction with
| ExecutionInstruction.Completed result ->
ExecutionInstruction.Completed result
| ExecutionInstruction.SleepAndResume t ->
ExecutionInstruction.SleepAndResume t
| ExecutionInstruction.SleepAndResumeAt (t, s) ->
ExecutionInstruction.SleepAndResumeAt (t, m.scheduler.embed requestMetadata s)