Making VisibilityTimeout configurable (#844)

This commit is contained in:
Mathew Charles 2016-12-16 11:45:16 -08:00
Родитель 46acc4a315
Коммит f185c0afa1
10 изменённых файлов: 116 добавлений и 16 удалений

Просмотреть файл

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using Microsoft.Azure.WebJobs;
namespace SampleHost
@ -10,6 +11,8 @@ namespace SampleHost
static void Main(string[] args)
{
var config = new JobHostConfiguration();
config.Queues.VisibilityTimeout = TimeSpan.FromSeconds(15);
if (config.IsDevelopment)
{
config.UseDevelopmentSettings();

Просмотреть файл

@ -21,6 +21,7 @@ namespace Microsoft.Azure.WebJobs.Host
private int _batchSize = DefaultBatchSize;
private int _newBatchThreshold;
private TimeSpan _maxPollingInterval = QueuePollingIntervals.DefaultMaximum;
private TimeSpan _visibilityTimeout = TimeSpan.Zero;
private int _maxDequeueCount = DefaultMaxDequeueCount;
/// <summary>
@ -126,6 +127,28 @@ namespace Microsoft.Azure.WebJobs.Host
}
}
/// <summary>
/// Gets or sets the default message visibility timeout that will be used
/// for messages that fail processing. The default is TimeSpan.Zero. To increase
/// the time delay between retries, increase this value.
/// </summary>
/// <remarks>
/// When message processing fails, the message will remain in the queue and
/// its visibility will be updated with this value. The message will then be
/// available for reprocessing after this timeout expires.
/// </remarks>
public TimeSpan VisibilityTimeout
{
get
{
return _visibilityTimeout;
}
set
{
_visibilityTimeout = value;
}
}
/// <summary>
/// Gets or sets the <see cref="IQueueProcessorFactory"/> that will be used to create
/// <see cref="QueueProcessor"/> instances that will be used to process messages.

Просмотреть файл

@ -15,6 +15,8 @@ namespace Microsoft.Azure.WebJobs.Host.Queues
int MaxDequeueCount { get; }
TimeSpan VisibilityTimeout { get; }
IQueueProcessorFactory QueueProcessorFactory { get; }
}
}

Просмотреть файл

@ -30,6 +30,7 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
private readonly object _stopWaitingTaskSourceLock = new object();
private readonly IQueueConfiguration _queueConfiguration;
private readonly QueueProcessor _queueProcessor;
private readonly TimeSpan _visibilityTimeout;
private bool _foundMessageSinceLastDelay;
private bool _disposed;
@ -73,6 +74,10 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
_trace = trace;
_queueConfiguration = queueConfiguration;
// if the function runs longer than this, the invisibility will be updated
// on a timer periodically for the duration of the function execution
_visibilityTimeout = TimeSpan.FromMinutes(10);
if (sharedWatcher != null)
{
// Call Notify whenever a function adds a message to this queue.
@ -134,14 +139,11 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
return CreateBackoffResult();
}
// What if job takes longer. Call CloudQueue.UpdateMessage
TimeSpan visibilityTimeout = TimeSpan.FromMinutes(10); // long enough to process the job
IEnumerable<IStorageQueueMessage> batch;
try
{
batch = await _queue.GetMessagesAsync(_queueProcessor.BatchSize,
visibilityTimeout,
_visibilityTimeout,
options: null,
operationContext: null,
cancellationToken: cancellationToken);
@ -181,7 +183,7 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
// of the cancellation token contract. However, the timer implementation would not dispose of the
// cancellation token source until it has stopped and perhaps also disposed, and we wait for all
// outstanding tasks to complete before stopping the timer.
Task task = ProcessMessageAsync(message, visibilityTimeout, cancellationToken);
Task task = ProcessMessageAsync(message, _visibilityTimeout, cancellationToken);
// Having both WaitForNewBatchThreshold and this method mutate _processing is safe because the timer
// contract is serial: it only calls ExecuteAsync once the wait expires (and the wait won't expire until

Просмотреть файл

@ -26,7 +26,6 @@ namespace Microsoft.Azure.WebJobs.Host.Queues
private readonly CloudQueue _queue;
private readonly CloudQueue _poisonQueue;
private readonly TraceWriter _trace;
private readonly int _maxDequeueCount;
/// <summary>
/// Constructs a new instance.
@ -42,10 +41,11 @@ namespace Microsoft.Azure.WebJobs.Host.Queues
_queue = context.Queue;
_poisonQueue = context.PoisonQueue;
_trace = context.Trace;
_maxDequeueCount = context.MaxDequeueCount;
MaxDequeueCount = context.MaxDequeueCount;
BatchSize = context.BatchSize;
NewBatchThreshold = context.NewBatchThreshold;
VisibilityTimeout = context.VisibilityTimeout;
}
/// <summary>
@ -58,11 +58,22 @@ namespace Microsoft.Azure.WebJobs.Host.Queues
/// </summary>
public int BatchSize { get; protected set; }
/// <summary>
/// Gets or sets the number of times to try processing a message before moving it to the poison queue.
/// </summary>
public int MaxDequeueCount { get; protected set; }
/// <summary>
/// Gets or sets the threshold at which a new batch of messages will be fetched.
/// </summary>
public int NewBatchThreshold { get; protected set; }
/// <summary>
/// Gets or sets the default message visibility timeout that will be used
/// for messages that fail processing.
/// </summary>
public TimeSpan VisibilityTimeout { get; protected set; }
/// <summary>
/// This method is called when there is a new message to process, before the job function is invoked.
/// This allows any preprocessing to take place on the message before processing begins.
@ -95,14 +106,14 @@ namespace Microsoft.Azure.WebJobs.Host.Queues
}
else if (_poisonQueue != null)
{
if (message.DequeueCount >= _maxDequeueCount)
if (message.DequeueCount >= MaxDequeueCount)
{
await CopyMessageToPoisonQueueAsync(message, cancellationToken);
await DeleteMessageAsync(message, cancellationToken);
}
else
{
await ReleaseMessageAsync(message, result, TimeSpan.Zero, cancellationToken);
await ReleaseMessageAsync(message, result, VisibilityTimeout, cancellationToken);
}
}
else
@ -121,7 +132,7 @@ namespace Microsoft.Azure.WebJobs.Host.Queues
/// <returns></returns>
protected virtual async Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CancellationToken cancellationToken)
{
_trace.Warning(string.Format(CultureInfo.InvariantCulture, "Message has reached MaxDequeueCount of {0}. Moving message to queue '{1}'.", _maxDequeueCount, _poisonQueue.Name), TraceSource.Execution);
_trace.Warning(string.Format(CultureInfo.InvariantCulture, "Message has reached MaxDequeueCount of {0}. Moving message to queue '{1}'.", MaxDequeueCount, _poisonQueue.Name), TraceSource.Execution);
await AddMessageAndCreateIfNotExistsAsync(_poisonQueue, message, cancellationToken);

Просмотреть файл

@ -47,6 +47,7 @@ namespace Microsoft.Azure.WebJobs.Host.Queues
BatchSize = queueConfiguration.BatchSize;
MaxDequeueCount = queueConfiguration.MaxDequeueCount;
NewBatchThreshold = queueConfiguration.NewBatchThreshold;
VisibilityTimeout = queueConfiguration.VisibilityTimeout;
}
/// <summary>
@ -80,5 +81,11 @@ namespace Microsoft.Azure.WebJobs.Host.Queues
/// Gets or sets the threshold at which a new batch of messages will be fetched.
/// </summary>
public int NewBatchThreshold { get; set; }
/// <summary>
/// Gets or sets the message visibility that will be used for messages that
/// fail processing.
/// </summary>
public TimeSpan VisibilityTimeout { get; set; }
}
}

Просмотреть файл

@ -37,10 +37,20 @@ namespace Microsoft.Azure.WebJobs.Host.FunctionalTests
[Fact]
public void Constructor_DefaultsValues()
{
QueueProcessorFactoryContext context = new QueueProcessorFactoryContext(_queue, _trace, _queuesConfig);
var config = new JobHostQueuesConfiguration
{
BatchSize = 32,
MaxDequeueCount = 2,
NewBatchThreshold = 100,
VisibilityTimeout = TimeSpan.FromSeconds(30)
};
QueueProcessorFactoryContext context = new QueueProcessorFactoryContext(_queue, _trace, config);
QueueProcessor localProcessor = new QueueProcessor(context);
Assert.Equal(_queuesConfig.BatchSize, localProcessor.BatchSize);
Assert.Equal(_queuesConfig.NewBatchThreshold, localProcessor.NewBatchThreshold);
Assert.Equal(config.BatchSize, localProcessor.BatchSize);
Assert.Equal(config.MaxDequeueCount, localProcessor.MaxDequeueCount);
Assert.Equal(config.NewBatchThreshold, localProcessor.NewBatchThreshold);
Assert.Equal(config.VisibilityTimeout, localProcessor.VisibilityTimeout);
}
[Fact]
@ -111,6 +121,29 @@ namespace Microsoft.Azure.WebJobs.Host.FunctionalTests
Assert.True(poisonMessageHandlerCalled);
}
[Fact]
public async Task CompleteProcessingMessageAsync_Failure_AppliesVisibilityTimeout()
{
var queuesConfig = new JobHostQueuesConfiguration
{
// configure a non-zero visibility timeout
VisibilityTimeout = TimeSpan.FromMinutes(5)
};
QueueProcessorFactoryContext context = new QueueProcessorFactoryContext(_queue, _trace, queuesConfig, _poisonQueue);
QueueProcessor localProcessor = new QueueProcessor(context);
string messageContent = Guid.NewGuid().ToString();
CloudQueueMessage message = new CloudQueueMessage(messageContent);
await _queue.AddMessageAsync(message, CancellationToken.None);
var functionResult = new FunctionResult(false);
message = await _queue.GetMessageAsync();
await localProcessor.CompleteProcessingMessageAsync(message, functionResult, CancellationToken.None);
var delta = message.NextVisibleTime - DateTime.UtcNow;
Assert.True(delta.Value.TotalMinutes > 4);
}
public class TestFixture : IDisposable
{
private const string TestQueuePrefix = "queueprocessortests";

Просмотреть файл

@ -44,6 +44,11 @@ namespace Microsoft.Azure.WebJobs.Host.FunctionalTests.TestDoubles
get { return 3; }
}
public TimeSpan VisibilityTimeout
{
get { return TimeSpan.Zero; }
}
public IQueueProcessorFactory QueueProcessorFactory
{
get

Просмотреть файл

@ -38,6 +38,11 @@ namespace Microsoft.Azure.WebJobs.Host.TestCommon
get { return _maxDequeueCount; }
}
public TimeSpan VisibilityTimeout
{
get { return TimeSpan.Zero; }
}
public IQueueProcessorFactory QueueProcessorFactory
{
get

Просмотреть файл

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using Microsoft.Azure.WebJobs.Host.Queues;
using Microsoft.Azure.WebJobs.Host.Queues.Listeners;
using Xunit;
@ -12,10 +13,8 @@ namespace Microsoft.Azure.WebJobs.Host.UnitTests
[Fact]
public void Constructor_Defaults()
{
// Arrange
JobHostQueuesConfiguration config = new JobHostQueuesConfiguration();
// Act & Assert
Assert.Equal(16, config.BatchSize);
Assert.Equal(8, config.NewBatchThreshold);
Assert.Equal(typeof(DefaultQueueProcessorFactory), config.QueueProcessorFactory.GetType());
@ -25,7 +24,6 @@ namespace Microsoft.Azure.WebJobs.Host.UnitTests
[Fact]
public void NewBatchThreshold_CanSetAndGetValue()
{
// Arrange
JobHostQueuesConfiguration config = new JobHostQueuesConfiguration();
// Unless explicitly set, NewBatchThreshold will be computed based
@ -41,5 +39,16 @@ namespace Microsoft.Azure.WebJobs.Host.UnitTests
config.BatchSize = 8;
Assert.Equal(1000, config.NewBatchThreshold);
}
[Fact]
public void VisibilityTimeout_CanGetAndSetValue()
{
JobHostQueuesConfiguration config = new JobHostQueuesConfiguration();
Assert.Equal(TimeSpan.Zero, config.VisibilityTimeout);
config.VisibilityTimeout = TimeSpan.FromSeconds(30);
Assert.Equal(TimeSpan.FromSeconds(30), config.VisibilityTimeout);
}
}
}