AAD: new AuthenticationTransmissionPolicy for retry scenarios. (#2312)
This commit is contained in:
Родитель
b655ef46af
Коммит
649264d67d
|
@ -0,0 +1,121 @@
|
|||
namespace Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel.Implementation.TransmissionPolicy
|
||||
{
|
||||
using System;
|
||||
using System.Diagnostics.Tracing;
|
||||
using System.Globalization;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
|
||||
using Microsoft.ApplicationInsights.Channel.Implementation;
|
||||
using Microsoft.ApplicationInsights.Extensibility.Implementation;
|
||||
using Microsoft.ApplicationInsights.TestFramework;
|
||||
using Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel.Helpers;
|
||||
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||||
|
||||
[TestClass]
|
||||
[TestCategory("TransmissionPolicy")]
|
||||
public class AuthenticationTransmissionPolicyTests
|
||||
{
|
||||
[TestMethod]
|
||||
public void Verify401TriggersThrottling() => this.EvaluateIfStatusCodeTriggersThrottling(ResponseStatusCodes.Unauthorized);
|
||||
|
||||
[TestMethod]
|
||||
public void Verify403TriggersThrottling() => this.EvaluateIfStatusCodeTriggersThrottling(ResponseStatusCodes.Forbidden);
|
||||
|
||||
[TestMethod]
|
||||
public void Verify400DoesNotTriggerThrottling() => this.EvaluateIfStatusCodeIgnored(ResponseStatusCodes.BadRequest);
|
||||
|
||||
[TestMethod]
|
||||
public void Verify200DoesNotTriggerThrottling() => this.EvaluateIfStatusCodeIgnored(ResponseStatusCodes.Success);
|
||||
|
||||
[TestMethod]
|
||||
public void VerifyOtherDoesNotTriggerThrottling() => this.EvaluateIfStatusCodeIgnored(000);
|
||||
|
||||
private void EvaluateIfStatusCodeTriggersThrottling(int statusCode)
|
||||
{
|
||||
var retryDelay = TimeSpan.FromSeconds(5);
|
||||
var waitForTheFirstApplyAsync = TimeSpan.FromMilliseconds(100);
|
||||
var waitForTheSecondApplyAsync = retryDelay.Add(TimeSpan.FromMilliseconds(500)); // adding a few ms to give the unit test a buffer.
|
||||
|
||||
// SETUP
|
||||
var transmitter = new StubTransmitterEvalOnApply();
|
||||
|
||||
var policy = new AuthenticationTransmissionPolicy()
|
||||
{
|
||||
Enabled = true,
|
||||
};
|
||||
|
||||
// we override the default timer here to speed up unit tests.
|
||||
policy.PauseTimer = new TaskTimerInternal { Delay = retryDelay };
|
||||
policy.Initialize(transmitter);
|
||||
|
||||
// ACT
|
||||
transmitter.InvokeTransmissionSentEvent(statusCode);
|
||||
|
||||
// ASSERT: First Handle will trigger Throttle and delay.
|
||||
Assert.IsTrue(transmitter.IsApplyInvoked(waitForTheFirstApplyAsync));
|
||||
|
||||
Assert.AreEqual(0, policy.MaxSenderCapacity);
|
||||
Assert.AreEqual(0, policy.MaxBufferCapacity);
|
||||
Assert.IsNull(policy.MaxStorageCapacity);
|
||||
|
||||
// ASSERT: Throttle expires and policy will be reset.
|
||||
Assert.IsTrue(transmitter.IsApplyInvoked(waitForTheSecondApplyAsync));
|
||||
|
||||
Assert.IsNull(policy.MaxSenderCapacity);
|
||||
Assert.IsNull(policy.MaxBufferCapacity);
|
||||
Assert.IsNull(policy.MaxStorageCapacity);
|
||||
}
|
||||
|
||||
private void EvaluateIfStatusCodeIgnored(int statusCode)
|
||||
{
|
||||
var waitForTheFirstApplyAsync = TimeSpan.FromMilliseconds(100);
|
||||
|
||||
// SETUP
|
||||
var transmitter = new StubTransmitterEvalOnApply();
|
||||
|
||||
var policy = new AuthenticationTransmissionPolicy()
|
||||
{
|
||||
Enabled = true,
|
||||
};
|
||||
policy.Initialize(transmitter);
|
||||
|
||||
// ACT
|
||||
transmitter.InvokeTransmissionSentEvent(statusCode);
|
||||
|
||||
// ASSERT: The Apply event handler should not be called.
|
||||
Assert.IsFalse(transmitter.IsApplyInvoked(waitForTheFirstApplyAsync));
|
||||
|
||||
// ASSERT: Capacities should have default values.
|
||||
Assert.IsNull(policy.MaxSenderCapacity);
|
||||
Assert.IsNull(policy.MaxBufferCapacity);
|
||||
Assert.IsNull(policy.MaxStorageCapacity);
|
||||
}
|
||||
|
||||
private class StubTransmitterEvalOnApply : StubTransmitter
|
||||
{
|
||||
private AutoResetEvent autoResetEvent;
|
||||
|
||||
public StubTransmitterEvalOnApply()
|
||||
{
|
||||
this.autoResetEvent = new AutoResetEvent(false);
|
||||
this.OnApplyPolicies = () => this.autoResetEvent.Set();
|
||||
}
|
||||
|
||||
public void InvokeTransmissionSentEvent(int responseStatusCode)
|
||||
{
|
||||
this.OnTransmissionSent(new TransmissionProcessedEventArgs(
|
||||
transmission: new StubTransmission(),
|
||||
exception: null,
|
||||
response: new HttpWebResponseWrapper()
|
||||
{
|
||||
StatusCode = responseStatusCode,
|
||||
StatusDescription = null,
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
public bool IsApplyInvoked(TimeSpan timeout) => this.autoResetEvent.WaitOne(timeout);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -22,67 +22,31 @@
|
|||
[TestMethod]
|
||||
public void AssertTooManyRequestsStopsSending()
|
||||
{
|
||||
this.PositiveTest(ResponseStatusCodes.ResponseCodeTooManyRequests, 0, null, null, false);
|
||||
this.EvaluateIfStatusCodeTriggersThrottling(ResponseStatusCodes.ResponseCodeTooManyRequests, 0, null, null, false);
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void AssertTooManyRequestsStopsSendingWithFlushAsyncTask()
|
||||
{
|
||||
this.PositiveTest(ResponseStatusCodes.ResponseCodeTooManyRequests, 0, 0, null, true);
|
||||
this.EvaluateIfStatusCodeTriggersThrottling(ResponseStatusCodes.ResponseCodeTooManyRequests, 0, 0, null, true);
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void AssertTooManyRequestsOverExtendedTimeStopsSendingAndCleansCache()
|
||||
{
|
||||
this.PositiveTest(ResponseStatusCodes.ResponseCodeTooManyRequestsOverExtendedTime, 0, 0, 0, false);
|
||||
this.EvaluateIfStatusCodeTriggersThrottling(ResponseStatusCodes.ResponseCodeTooManyRequestsOverExtendedTime, 0, 0, 0, false);
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void AssertPaymentRequiredDoesntChangeCapacity()
|
||||
{
|
||||
var transmitter = new StubTransmitter();
|
||||
transmitter.OnApplyPolicies = () =>
|
||||
{
|
||||
throw new Exception("Apply shouldn't be called because unsupported response code was passed");
|
||||
};
|
||||
|
||||
var policy = new ThrottlingTransmissionPolicy();
|
||||
policy.Initialize(transmitter);
|
||||
|
||||
transmitter.OnTransmissionSent(
|
||||
new TransmissionProcessedEventArgs(
|
||||
new StubTransmission(), null, new HttpWebResponseWrapper()
|
||||
{
|
||||
StatusCode = ResponseCodePaymentRequired
|
||||
}));
|
||||
|
||||
Assert.IsNull(policy.MaxSenderCapacity);
|
||||
Assert.IsNull(policy.MaxBufferCapacity);
|
||||
Assert.IsNull(policy.MaxStorageCapacity);
|
||||
this.EvaluateIfStatusCodeIgnored(ResponseCodePaymentRequired);
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void AssertUnsupportedResponseCodeDoesnotChangeCapacity()
|
||||
{
|
||||
var transmitter = new StubTransmitter();
|
||||
transmitter.OnApplyPolicies = () =>
|
||||
{
|
||||
throw new Exception("Apply shouldn't be called because unsupported response code was passed");
|
||||
};
|
||||
|
||||
var policy = new ThrottlingTransmissionPolicy();
|
||||
policy.Initialize(transmitter);
|
||||
|
||||
transmitter.OnTransmissionSent(
|
||||
new TransmissionProcessedEventArgs(
|
||||
new StubTransmission(), null, new HttpWebResponseWrapper()
|
||||
{
|
||||
StatusCode = ResponseCodeUnsupported
|
||||
}));
|
||||
|
||||
Assert.IsNull(policy.MaxSenderCapacity);
|
||||
Assert.IsNull(policy.MaxBufferCapacity);
|
||||
Assert.IsNull(policy.MaxStorageCapacity);
|
||||
this.EvaluateIfStatusCodeIgnored(ResponseCodeUnsupported);
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
|
@ -112,49 +76,86 @@
|
|||
}
|
||||
}
|
||||
|
||||
private void PositiveTest(int responseCode, int? expectedSenderCapacity, int? expectedBufferCapacity, int? expectedStorageCapacity, bool hasFlushTask)
|
||||
private void EvaluateIfStatusCodeTriggersThrottling(int responseCode, int? expectedSenderCapacity, int? expectedBufferCapacity, int? expectedStorageCapacity, bool hasFlushTask)
|
||||
{
|
||||
const int RetryAfterSeconds = 2;
|
||||
string retryAfter = DateTime.Now.ToUniversalTime().AddSeconds(RetryAfterSeconds).ToString("R", CultureInfo.InvariantCulture);
|
||||
const int WaitForTheFirstApplyAsync = 100;
|
||||
int waitForTheSecondApplyAsync = (RetryAfterSeconds * 1000) /*to milliseconds*/ +
|
||||
500 /**magic number to wait for other code before/after
|
||||
* timer which calls 2nd ApplyAsync
|
||||
**/;
|
||||
var waitForTheFirstApplyAsync = TimeSpan.FromMilliseconds(100);
|
||||
var waitForTheSecondApplyAsync = TimeSpan.FromMilliseconds(RetryAfterSeconds * 1000 + 500);
|
||||
|
||||
var policyApplied = new AutoResetEvent(false);
|
||||
var transmitter = new StubTransmitter
|
||||
{
|
||||
OnApplyPolicies = () => policyApplied.Set(),
|
||||
};
|
||||
// SETUP
|
||||
var transmitter = new StubTransmitterEvalOnApply();
|
||||
|
||||
var policy = new ThrottlingTransmissionPolicy();
|
||||
policy.Initialize(transmitter);
|
||||
|
||||
transmitter.OnTransmissionSent(
|
||||
new TransmissionProcessedEventArgs(
|
||||
transmission: new StubTransmission() { IsFlushAsyncInProgress = hasFlushTask },
|
||||
exception: null,
|
||||
response: new HttpWebResponseWrapper()
|
||||
{
|
||||
StatusCode = responseCode,
|
||||
StatusDescription = null,
|
||||
RetryAfterHeader = retryAfter
|
||||
}));
|
||||
|
||||
Assert.IsTrue(policyApplied.WaitOne(WaitForTheFirstApplyAsync));
|
||||
transmitter.InvokeTransmissionSentEvent(responseCode, TimeSpan.FromSeconds(RetryAfterSeconds), hasFlushTask);
|
||||
|
||||
// ASSERT: First Handle will trigger Throttle and delay.
|
||||
Assert.IsTrue(transmitter.IsApplyInvoked(waitForTheFirstApplyAsync));
|
||||
|
||||
Assert.AreEqual(expectedSenderCapacity, policy.MaxSenderCapacity);
|
||||
Assert.AreEqual(expectedBufferCapacity, policy.MaxBufferCapacity);
|
||||
Assert.AreEqual(expectedStorageCapacity, policy.MaxStorageCapacity);
|
||||
|
||||
Assert.IsTrue(policyApplied.WaitOne(waitForTheSecondApplyAsync));
|
||||
// ASSERT: Throttle expires and policy will be reset.
|
||||
Assert.IsTrue(transmitter.IsApplyInvoked(waitForTheSecondApplyAsync));
|
||||
|
||||
// Check that it resets after retry-after interval
|
||||
Assert.IsNull(policy.MaxSenderCapacity);
|
||||
Assert.IsNull(policy.MaxBufferCapacity);
|
||||
Assert.IsNull(policy.MaxStorageCapacity);
|
||||
}
|
||||
|
||||
private void EvaluateIfStatusCodeIgnored(int statusCode)
|
||||
{
|
||||
var waitForTheFirstApplyAsync = TimeSpan.FromMilliseconds(100);
|
||||
|
||||
// SETUP
|
||||
var transmitter = new StubTransmitterEvalOnApply();
|
||||
|
||||
var policy = new AuthenticationTransmissionPolicy()
|
||||
{
|
||||
Enabled = true,
|
||||
};
|
||||
policy.Initialize(transmitter);
|
||||
|
||||
// ACT
|
||||
transmitter.InvokeTransmissionSentEvent(statusCode, default, false);
|
||||
|
||||
// ASSERT: The Apply event handler should not be called.
|
||||
Assert.IsFalse(transmitter.IsApplyInvoked(waitForTheFirstApplyAsync));
|
||||
|
||||
// ASSERT: Capacities should have default values.
|
||||
Assert.IsNull(policy.MaxSenderCapacity);
|
||||
Assert.IsNull(policy.MaxBufferCapacity);
|
||||
Assert.IsNull(policy.MaxStorageCapacity);
|
||||
}
|
||||
|
||||
private class StubTransmitterEvalOnApply : StubTransmitter
|
||||
{
|
||||
private AutoResetEvent autoResetEvent;
|
||||
|
||||
public StubTransmitterEvalOnApply()
|
||||
{
|
||||
this.autoResetEvent = new AutoResetEvent(false);
|
||||
this.OnApplyPolicies = () => this.autoResetEvent.Set();
|
||||
}
|
||||
|
||||
public void InvokeTransmissionSentEvent(int responseStatusCode, TimeSpan retryAfter, bool isFlushAsyncInProgress)
|
||||
{
|
||||
this.OnTransmissionSent(new TransmissionProcessedEventArgs(
|
||||
transmission: new StubTransmission() { IsFlushAsyncInProgress = isFlushAsyncInProgress },
|
||||
exception: null,
|
||||
response: new HttpWebResponseWrapper()
|
||||
{
|
||||
StatusCode = responseStatusCode,
|
||||
StatusDescription = null,
|
||||
RetryAfterHeader = DateTime.Now.ToUniversalTime().Add(retryAfter).ToString("R", CultureInfo.InvariantCulture),
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
public bool IsApplyInvoked(TimeSpan timeout) => this.autoResetEvent.WaitOne(timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,8 +11,9 @@
|
|||
public const int InternalServerError = 500;
|
||||
public const int BadGateway = 502;
|
||||
public const int ServiceUnavailable = 503;
|
||||
public const int GatewayTimeout = 504;
|
||||
public const int Unauthorized = 401; // AAD
|
||||
public const int Forbidden = 403; // AAD
|
||||
public const int GatewayTimeout = 504;
|
||||
public const int BadRequest = 400; // AAD: AI resource was configured for AAD, but SDK is using older api. (v2 and v2.1).
|
||||
public const int Unauthorized = 401; // AAD: token is either absent, invalid, or expired.
|
||||
public const int Forbidden = 403; // AAD: Provided credentials do not grant access to ingest telemetry.
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
namespace Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel.Implementation.TransmissionPolicy
|
||||
{
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
using Microsoft.ApplicationInsights.Channel.Implementation;
|
||||
using Microsoft.ApplicationInsights.Extensibility;
|
||||
using Microsoft.ApplicationInsights.Extensibility.Implementation;
|
||||
|
||||
/// <summary>
|
||||
/// This class defines how the ServerTelemetryChannel will behave when it receives Response Codes
|
||||
/// from the Ingestion Service related to Authentication (AAD) scenarios.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// This class is disabled by default and expected to be enabled only when AAD has been configured in <see cref="TelemetryConfiguration.CredentialEnvelope"/>.
|
||||
/// </remarks>
|
||||
internal class AuthenticationTransmissionPolicy : TransmissionPolicy, IDisposable
|
||||
{
|
||||
internal TaskTimerInternal PauseTimer = new TaskTimerInternal { Delay = TimeSpan.FromMinutes(1) };
|
||||
|
||||
private BackoffLogicManager backoffLogicManager;
|
||||
|
||||
public bool Enabled { get; set; } = false;
|
||||
|
||||
public override void Initialize(Transmitter transmitter)
|
||||
{
|
||||
if (transmitter == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(transmitter));
|
||||
}
|
||||
|
||||
this.backoffLogicManager = transmitter.BackoffLogicManager;
|
||||
|
||||
base.Initialize(transmitter);
|
||||
transmitter.TransmissionSent += this.HandleTransmissionSentEvent;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
this.Dispose(true);
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// This method subscribes to the <see cref="Transmitter.TransmissionSent"/> event.
|
||||
/// This encapsulates all <see cref="HttpWebResponseWrapper.StatusCode"/> related to Authentication (AAD) scenarios.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// AN EXPLANATION OF THE STATUS CODES:
|
||||
/// - <see cref="ResponseStatusCodes.BadRequest"/>
|
||||
/// "HTTP/1.1 400 Incorrect API was used - v2 API does not support authentication".
|
||||
/// This indicates that the AI resource was configured for AAD, but SDK was not.
|
||||
/// This is a configuration issue and is not recoverable from the client side.
|
||||
/// IMPORTANT: We ignore this error and allow telemetry to be dropped.
|
||||
/// Ingestion also uses HTTP 400 for invalid JSON and is indistinguishable from AAD errors.
|
||||
/// - <see cref="ResponseStatusCodes.Unauthorized"/>
|
||||
/// "HTTP/1.1 401 Unauthorized - please provide the valid authorization token".
|
||||
/// This indicates that the authorization token was either absent, invalid, or expired.
|
||||
/// The root cause is not known and we should throttle retries.
|
||||
/// - <see cref="ResponseStatusCodes.Forbidden"/>
|
||||
/// "HTTP/1.1 403 Forbidden - provided credentials do not grant the access to ingest the telemetry into the component".
|
||||
/// This indicates the configured identity does not have permissions to publish to this resource.
|
||||
/// This is a configuration issue and is not recoverable from the client side.
|
||||
/// This can be recovered if the user changes the AI Resource's configured Access Control.
|
||||
/// </remarks>
|
||||
private void HandleTransmissionSentEvent(object sender, TransmissionProcessedEventArgs e)
|
||||
{
|
||||
if (this.Enabled && e.Response != null)
|
||||
{
|
||||
switch (e.Response.StatusCode)
|
||||
{
|
||||
case ResponseStatusCodes.Unauthorized:
|
||||
case ResponseStatusCodes.Forbidden:
|
||||
this.ApplyThrottlePolicy(e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void ApplyThrottlePolicy(TransmissionProcessedEventArgs e)
|
||||
{
|
||||
this.MaxSenderCapacity = 0;
|
||||
this.MaxBufferCapacity = 0;
|
||||
this.MaxStorageCapacity = null;
|
||||
|
||||
this.LogCapacityChanged();
|
||||
this.Apply();
|
||||
|
||||
this.backoffLogicManager.ReportBackoffEnabled(e.Response.StatusCode);
|
||||
this.Transmitter.Enqueue(e.Transmission);
|
||||
|
||||
// Check this.pauseTimer above for the configured wait time.
|
||||
this.PauseTimer.Start(() =>
|
||||
{
|
||||
this.ResetPolicy();
|
||||
return Task.FromResult<object>(null);
|
||||
});
|
||||
}
|
||||
|
||||
private void ResetPolicy()
|
||||
{
|
||||
this.MaxSenderCapacity = null;
|
||||
this.MaxBufferCapacity = null;
|
||||
this.MaxStorageCapacity = null;
|
||||
this.LogCapacityChanged();
|
||||
this.Apply();
|
||||
}
|
||||
|
||||
private void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing)
|
||||
{
|
||||
if (this.PauseTimer != null)
|
||||
{
|
||||
this.PauseTimer.Dispose();
|
||||
this.PauseTimer = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -8,6 +8,7 @@
|
|||
{
|
||||
private readonly IEnumerable<TransmissionPolicy> policies;
|
||||
private bool isDisposed;
|
||||
private AuthenticationTransmissionPolicy authenticationTransmissionPolicy;
|
||||
|
||||
public TransmissionPolicyCollection(INetwork network, IApplicationLifecycle applicationLifecycle)
|
||||
{
|
||||
|
@ -21,6 +22,7 @@
|
|||
new ErrorHandlingTransmissionPolicy(),
|
||||
new PartialSuccessTransmissionPolicy(),
|
||||
new NetworkAvailabilityTransmissionPolicy(network),
|
||||
this.authenticationTransmissionPolicy = new AuthenticationTransmissionPolicy(),
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -44,6 +46,8 @@
|
|||
}
|
||||
}
|
||||
|
||||
public void EnableAuthenticationPolicy() => this.authenticationTransmissionPolicy.Enabled = true;
|
||||
|
||||
public int? CalculateMinimumMaxSenderCapacity() => this.CalculateMinimumCapacity(p => p.MaxSenderCapacity);
|
||||
|
||||
public int? CalculateMinimumMaxBufferCapacity() => this.CalculateMinimumCapacity(p => p.MaxBufferCapacity);
|
||||
|
|
|
@ -247,7 +247,11 @@
|
|||
CredentialEnvelope ISupportCredentialEnvelope.CredentialEnvelope
|
||||
{
|
||||
get => this.Transmitter.CredentialEnvelope;
|
||||
set => this.Transmitter.CredentialEnvelope = value;
|
||||
set
|
||||
{
|
||||
this.Transmitter.CredentialEnvelope = value;
|
||||
this.policies.EnableAuthenticationPolicy();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
Загрузка…
Ссылка в новой задаче