This commit is contained in:
Malcolm Daigle 2024-08-05 16:51:12 -07:00 коммит произвёл GitHub
Родитель 7723b208ef
Коммит 295867bc8a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
6 изменённых файлов: 184 добавлений и 57 удалений

Просмотреть файл

@ -53,6 +53,10 @@ namespace Microsoft.Data.SqlClientX
/// </summary>
private readonly ChannelReader<SqlConnector?> _idleConnectorReader;
private readonly ChannelWriter<SqlConnector?> _idleConnectorWriter;
private ValueTask _warmupTask;
private CancellationTokenSource _warmupCTS;
private readonly SemaphoreSlim _warmupLock;
#endregion
// Counts the total number of open connectors tracked by the pool.
@ -65,11 +69,12 @@ namespace Microsoft.Data.SqlClientX
/// Initializes a new PoolingDataSource.
/// </summary>
//TODO: support auth contexts and provider info
internal PoolingDataSource(SqlConnectionStringBuilder connectionStringBuilder,
internal PoolingDataSource(
SqlConnectionString connectionString,
SqlCredential credential,
DbConnectionPoolGroupOptions options,
RateLimiterBase connectionRateLimiter)
: base(connectionStringBuilder, credential)
: base(connectionString, credential)
{
_connectionPoolGroupOptions = options;
_connectionRateLimiter = connectionRateLimiter;
@ -83,6 +88,10 @@ namespace Microsoft.Data.SqlClientX
_idleConnectorWriter = idleChannel.Writer;
//TODO: initiate idle lifetime and pruning fields
_warmupTask = ValueTask.CompletedTask;
_warmupCTS = new CancellationTokenSource();
_warmupLock = new SemaphoreSlim(1);
}
#region properties
@ -192,7 +201,7 @@ namespace Microsoft.Data.SqlClientX
return connector;
}
}
}
}
finally
{
//TODO: log error
@ -273,7 +282,7 @@ namespace Microsoft.Data.SqlClientX
}
int i;
int i;
for (i = 0; i < MaxPoolSize; i++)
{
if (Interlocked.CompareExchange(ref _connectors[i], null, connector) == connector)
@ -300,6 +309,9 @@ namespace Microsoft.Data.SqlClientX
// Only turn off the timer one time, when it was this Close that brought Open back to _min.
//TODO: pruning
// Ensure that we return to min pool size if closing this connector brought us below min pool size.
_ = WarmUp();
}
/// <summary>
@ -307,33 +319,34 @@ namespace Microsoft.Data.SqlClientX
/// </summary>
internal readonly struct OpenInternalConnectionState
{
internal readonly SqlConnectionX _owningConnection;
internal readonly TimeSpan _timeout;
internal OpenInternalConnectionState(SqlConnectionX owningConnection, TimeSpan timeout)
{
_owningConnection = owningConnection;
_timeout = timeout;
}
internal PoolingDataSource Pool { get; init; }
internal SqlConnectionX? OwningConnection { get; init; }
internal TimeSpan Timeout { get; init; }
}
/// <inheritdoc/>
internal override ValueTask<SqlConnector?> OpenNewInternalConnection(SqlConnectionX owningConnection, TimeSpan timeout, bool async, CancellationToken cancellationToken)
internal override ValueTask<SqlConnector?> OpenNewInternalConnection(SqlConnectionX? owningConnection, TimeSpan timeout, bool async, CancellationToken cancellationToken)
{
return _connectionRateLimiter.Execute(
RateLimitedOpen,
new OpenInternalConnectionState(owningConnection, timeout),
new OpenInternalConnectionState
{
Pool = this,
OwningConnection = owningConnection,
Timeout = timeout
},
async,
cancellationToken
);
async ValueTask<SqlConnector?> RateLimitedOpen(OpenInternalConnectionState state, bool async, CancellationToken cancellationToken)
static async ValueTask<SqlConnector?> RateLimitedOpen(OpenInternalConnectionState state, bool async, CancellationToken cancellationToken)
{
// As long as we're under max capacity, attempt to increase the connector count and open a new connection.
for (var numConnectors = _numConnectors; numConnectors < MaxPoolSize; numConnectors = _numConnectors)
for (var numConnectors = state.Pool._numConnectors; numConnectors < state.Pool.MaxPoolSize; numConnectors = state.Pool._numConnectors)
{
// Note that we purposefully don't use SpinWait for this: https://github.com/dotnet/coreclr/pull/21437
if (Interlocked.CompareExchange(ref _numConnectors, numConnectors + 1, numConnectors) != numConnectors)
if (Interlocked.CompareExchange(ref state.Pool._numConnectors, numConnectors + 1, numConnectors) != numConnectors)
{
continue;
}
@ -342,25 +355,25 @@ namespace Microsoft.Data.SqlClientX
{
// We've managed to increase the open counter, open a physical connection.
var startTime = Stopwatch.GetTimestamp();
SqlConnector? connector = new SqlConnector(state._owningConnection, this);
SqlConnector? connector = new SqlConnector(state.OwningConnection, state.Pool);
//TODO: set clear counter on connector
await connector.Open(timeout, async, cancellationToken).ConfigureAwait(false);
await connector.Open(state.Timeout, async, cancellationToken).ConfigureAwait(false);
int i;
for (i = 0; i < MaxPoolSize; i++)
for (i = 0; i < state.Pool.MaxPoolSize; i++)
{
if (Interlocked.CompareExchange(ref _connectors[i], connector, null) == null)
if (Interlocked.CompareExchange(ref state.Pool._connectors[i], connector, null) == null)
{
break;
}
}
Debug.Assert(i < MaxPoolSize, $"Could not find free slot in {_connectors} when opening.");
if (i == MaxPoolSize)
Debug.Assert(i < state.Pool.MaxPoolSize, $"Could not find free slot in {state.Pool._connectors} when opening.");
if (i == state.Pool.MaxPoolSize)
{
//TODO: generic exception?
throw new Exception($"Could not find free slot in {_connectors} when opening. Please report a bug.");
throw new Exception($"Could not find free slot in {state.Pool._connectors} when opening. Please report a bug.");
}
// Only start pruning if we've incremented open count past _min.
@ -375,12 +388,12 @@ namespace Microsoft.Data.SqlClientX
catch
{
// Physical open failed, decrement the open and busy counter back down.
Interlocked.Decrement(ref _numConnectors);
Interlocked.Decrement(ref state.Pool._numConnectors);
// In case there's a waiting attempt on the channel, we write a null to the idle connector channel
// to wake it up, so it will try opening (and probably throw immediately)
// Statement order is important since we have synchronous completions on the channel.
_idleConnectorWriter.TryWrite(null);
state.Pool._idleConnectorWriter.TryWrite(null);
// Just in case we always call UpdatePruningTimer for failed physical open
//TODO: UpdatePruningTimer();
@ -425,12 +438,67 @@ namespace Microsoft.Data.SqlClientX
}
/// <summary>
/// Warms up the pool to bring it up to min pool size.
/// Warms up the pool by bringing it up to min pool size.
/// </summary>
/// <exception cref="NotImplementedException"></exception>
internal void WarmUp()
/// <returns>A ValueTask containing a ValueTask that represents the warmup process.</returns>
internal async ValueTask<ValueTask> WarmUp()
{
throw new NotImplementedException();
// Avoid semaphore wait if task is still running
if (!_warmupTask.IsCompleted)
{
return _warmupTask;
}
// Prevent multiple threads from modifying the warmup task
await _warmupLock.WaitAsync();
try
{
// The task may have been started by another thread while we were
// waiting on the semaphore
if (_warmupTask.IsCompleted)
{
_warmupTask = _WarmUp(_warmupCTS.Token);
}
}
finally
{
_warmupLock.Release();
}
return _warmupTask;
async ValueTask _WarmUp(CancellationToken ct)
{
// Best effort, we may over or under create due to race conditions.
// Open new connections slowly. If many connections are needed immediately
// upon pool creation they can always be created via user-initiated requests as fast
// as a parallel, pool-initiated approach could.
while (_numConnectors < MinPoolSize)
{
ct.ThrowIfCancellationRequested();
// Obey the same rate limit as user-initiated opens.
// Ensures that pool-initiated opens are queued properly alongside user requests.
SqlConnector? connector = await OpenNewInternalConnection(
null,
TimeSpan.FromSeconds(Settings.ConnectTimeout),
true,
ct)
.ConfigureAwait(false);
// If connector is null, then we hit the max pool size and can stop
// warming up the pool.
if (connector == null)
{
return;
}
// The connector has never been used, so it's safe to immediately return it to the
// pool without resetting it.
ReturnInternalConnection(connector);
}
}
}
/// <summary>
@ -439,6 +507,8 @@ namespace Microsoft.Data.SqlClientX
internal void Shutdown()
{
SqlClientEventSource.Log.TryPoolerTraceEvent("<prov.DbConnectionPool.Shutdown|RES|INFO|CPOOL> {0}", ObjectID);
_warmupCTS.Dispose();
_warmupLock.Dispose();
_connectionRateLimiter?.Dispose();
}

Просмотреть файл

@ -20,7 +20,7 @@ namespace Microsoft.Data.SqlClientX
{
private static int SpoofedServerProcessId = 1;
internal SqlConnector(SqlConnectionX owningConnection, SqlDataSource dataSource)
internal SqlConnector(SqlConnectionX? owningConnection, SqlDataSource dataSource)
{
OwningConnection = owningConnection;
DataSource = dataSource;

Просмотреть файл

@ -7,7 +7,6 @@
using System;
using System.Data.Common;
using System.Security.AccessControl;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
@ -22,32 +21,43 @@ namespace Microsoft.Data.SqlClientX
/// </summary>
internal abstract class SqlDataSource : DbDataSource
{
#region private
private readonly SqlConnectionStringBuilder _connectionStringBuilder;
private protected volatile int _isDisposed;
#endregion
#region constructors
/// <summary>
/// Initializes a new instance of SqlDataSource.
/// </summary>
/// <param name="connectionStringBuilder">The connection string that connections produced by this data source should use.</param>
/// <param name="connectionString">The connection string that connections produced by this data source should use.</param>
/// <param name="credential">The credentials that connections produced by this data source should use.</param>
internal SqlDataSource(
SqlConnectionStringBuilder connectionStringBuilder,
SqlConnectionString connectionString,
SqlCredential credential)
{
_connectionStringBuilder = connectionStringBuilder;
Settings = connectionString;
var hidePassword = !connectionString.PersistSecurityInfo;
ConnectionString = connectionString.UsersConnectionString(hidePassword);
Credential = credential;
}
#endregion
#region properties
/// <inheritdoc />
public override string ConnectionString => _connectionStringBuilder.ConnectionString;
public override string ConnectionString { get; }
/// <summary>
/// Stores settings for the data source.
/// Do not expose publicly.
/// </summary>
internal SqlConnectionString Settings { get; }
/// <summary>
/// Credentials to use for new connections created by this data source.
/// </summary>
internal SqlCredential Credential { get; }
/// <summary>
/// Gives pool statistics for total, idle, and busy connections.
/// </summary>
internal abstract (int Total, int Idle, int Busy) Statistics { get; }
#endregion

Просмотреть файл

@ -23,7 +23,7 @@ namespace Microsoft.Data.SqlClientX
/// <summary>
/// A connection string builder that can be used to configure the connection string on the builder.
/// </summary>
public SqlConnectionStringBuilder ConnectionStringBuilder { get; }
public SqlConnectionString ConnectionString { get; }
// TODO: how does it interact with credentials specified in ConnectionStringBuilder?
public SqlCredential Credential { get; set; }
@ -44,7 +44,7 @@ namespace Microsoft.Data.SqlClientX
/// </summary>
public SqlDataSourceBuilder(SqlConnectionStringBuilder connectionStringBuilder, SqlCredential sqlCredential = null)
{
ConnectionStringBuilder = connectionStringBuilder;
ConnectionString = new SqlConnectionString(connectionStringBuilder.ConnectionString);
Credential = sqlCredential;
}
@ -53,23 +53,23 @@ namespace Microsoft.Data.SqlClientX
/// </summary>
public SqlDataSource Build()
{
if (ConnectionStringBuilder.Pooling)
if (ConnectionString.Pooling)
{
//TODO: pool group layer
DbConnectionPoolGroupOptions poolGroupOptions = new DbConnectionPoolGroupOptions(
ConnectionStringBuilder.IntegratedSecurity,
ConnectionStringBuilder.MinPoolSize,
ConnectionStringBuilder.MaxPoolSize,
ConnectionString.IntegratedSecurity,
ConnectionString.MinPoolSize,
ConnectionString.MaxPoolSize,
//TODO: carry over connect timeout conversion logic from SqlConnectionFactory? if not, don't need an extra allocation for this object, just use connection string builder
ConnectionStringBuilder.ConnectTimeout,
ConnectionStringBuilder.LoadBalanceTimeout,
ConnectionStringBuilder.Enlist);
ConnectionString.ConnectTimeout,
ConnectionString.LoadBalanceTimeout,
ConnectionString.Enlist);
//TODO: evaluate app context switch for concurrency limit
RateLimiterBase rateLimiter = IsBlockingPeriodEnabled() ? new BlockingPeriodRateLimiter() : new PassthroughRateLimiter();
return new PoolingDataSource(ConnectionStringBuilder,
return new PoolingDataSource(ConnectionString,
Credential,
poolGroupOptions,
rateLimiter);
@ -77,20 +77,20 @@ namespace Microsoft.Data.SqlClientX
else
{
return new UnpooledDataSource(
ConnectionStringBuilder,
ConnectionString,
Credential);
}
}
private bool IsBlockingPeriodEnabled()
{
var policy = ConnectionStringBuilder.PoolBlockingPeriod;
var policy = ConnectionString.PoolBlockingPeriod;
switch (policy)
{
case PoolBlockingPeriod.Auto:
{
if (ADP.IsAzureSqlServerEndpoint(ConnectionStringBuilder.DataSource))
if (ADP.IsAzureSqlServerEndpoint(ConnectionString.DataSource))
{
return false; // in Azure it will be Disabled
}

Просмотреть файл

@ -23,10 +23,10 @@ namespace Microsoft.Data.SqlClientX
/// <summary>
/// Initializes a new instance of UnpooledDataSource.
/// </summary>
/// <param name="connectionStringBuilder"></param>
/// <param name="connectionString"></param>
/// <param name="credential"></param>
internal UnpooledDataSource(SqlConnectionStringBuilder connectionStringBuilder, SqlCredential credential) :
base(connectionStringBuilder, credential)
internal UnpooledDataSource(SqlConnectionString connectionString, SqlCredential credential) :
base(connectionString, credential)
{
}

Просмотреть файл

@ -85,7 +85,7 @@ namespace Microsoft.Data.SqlClient.NetCore.UnitTests
else
conn2.Open();
}
[Theory]
[InlineData(true)]
[InlineData(false)]
@ -113,7 +113,7 @@ namespace Microsoft.Data.SqlClient.NetCore.UnitTests
// conn1 should now be back in the pool as idle
await using var conn3 = await dataSource.OpenConnectionAsync();
}
[Fact]
//[Explicit("Timing-based")]
public async Task OpenAsync_cancel()
@ -459,6 +459,53 @@ namespace Microsoft.Data.SqlClient.NetCore.UnitTests
Assert.That(conn.ProcessID, Is.Not.EqualTo(processId));
}
*/
[Theory]
[InlineData(0)]
[InlineData(1)]
[InlineData(5)]
public async Task Warmup_OpenedConnectionsEqualsMinPoolSize(int minPoolSize)
{
// Arrange
await using var dataSource = (PoolingDataSource)testBase.CreateDataSource(csb => csb.MinPoolSize = minPoolSize);
// Act
await await dataSource.WarmUp();
// Assert
Assert.Equal(minPoolSize, dataSource.Statistics.Total);
Assert.Equal(minPoolSize, dataSource.Statistics.Idle);
}
[Fact]
public async Task Warmup_ConcurrentWarmupCalls_OnlyOneExecuted()
{
// Arrange
await using var dataSource = (PoolingDataSource)testBase.CreateDataSource(csb => csb.MinPoolSize = 10);
// Act
ValueTask t1 = await dataSource.WarmUp();
ValueTask t2 = await dataSource.WarmUp();
// Assert
Assert.Equal(t1, t2);
}
[Fact]
public async Task Warmup_SequentialWarmupCalls_BothExecuted()
{
// Arrange
await using var dataSource = (PoolingDataSource)testBase.CreateDataSource(csb => csb.MinPoolSize = 10);
// Act
ValueTask t1 = await dataSource.WarmUp();
await t1;
ValueTask t2 = await dataSource.WarmUp();
// Assert
Assert.NotEqual(t1, t2);
}
#region Support
volatile int StopFlag;