зеркало из
1
0
Форкнуть 0

Enable connection pooling in Peregrine ADO

This commit is contained in:
Andrew Peters 2018-01-31 13:27:13 -08:00
Родитель 4bb31a18c2
Коммит e0d6e40598
4 изменённых файлов: 142 добавлений и 44 удалений

Просмотреть файл

@ -34,7 +34,7 @@ namespace Peregrine.Tests
{
await connection.OpenAsync();
using (var command = connection.CreateCommand())
using (var command = (PeregrineCommand)connection.CreateCommand())
{
command.CommandText = "select id, message from fortune";

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Data.Common;
using System.Linq;
@ -12,8 +13,50 @@ namespace Peregrine.Ado
{
public class PeregrineConnection : DbConnection
{
private sealed class Pool
{
private readonly PGSession[] _sessions;
public Pool(int maximumRetained)
{
_sessions = new PGSession[maximumRetained];
}
public PGSession Rent()
{
for (var i = 0; i < _sessions.Length; i++)
{
var item = _sessions[i];
if (item != null
&& Interlocked.CompareExchange(ref _sessions[i], null, item) == item)
{
return item;
}
}
return null;
}
public void Return(PGSession session)
{
for (var i = 0; i < _sessions.Length; i++)
{
if (Interlocked.CompareExchange(ref _sessions[i], session, null) == null)
{
return;
}
}
}
}
private static readonly ConcurrentDictionary<string, Pool> _pools
= new ConcurrentDictionary<string, Pool>();
private PGSession _session;
private bool _disposed;
public override string ConnectionString { get; set; }
public override ConnectionState State
@ -23,44 +66,57 @@ namespace Peregrine.Ado
public override Task OpenAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
if (_session != null)
{
return Task.CompletedTask;
}
var pool = _pools.GetOrAdd(ConnectionString, _ => new Pool(256));
_session = pool.Rent();
if (_session == null)
{
var connectionInfo
var parts
= (from s in ConnectionString.Split(';')
let kv = s.Split('=')
select kv)
.ToDictionary(p => p[0], p => p[1]);
_session
= new PGSession(
connectionInfo["host"],
5432,
connectionInfo["database"],
connectionInfo["username"],
connectionInfo["password"]);
return _session.StartAsync();
_session = new PGSession(
parts["host"],
port: 5432,
database: parts["database"],
user: parts["username"],
password: parts["password"]);
}
return Task.CompletedTask;
return _session.IsConnected ? Task.CompletedTask : _session.StartAsync();
}
public new PeregrineCommand CreateCommand()
private void ThrowIfDisposed()
{
return new PeregrineCommand(_session);
if (_disposed)
{
throw new ObjectDisposedException(nameof(PeregrineConnection));
}
}
protected override DbCommand CreateDbCommand()
protected override void Dispose(bool disposing)
{
return new PeregrineCommand(_session);
if (_session != null)
{
_pools[ConnectionString].Return(_session);
}
_disposed = true;
}
public override void Close()
{
_session?.Dispose();
_session = null;
}
protected override DbCommand CreateDbCommand() => new PeregrineCommand(_session);
public override void Close() => throw new NotImplementedException();
public override string Database => throw new NotImplementedException();
public override string DataSource => throw new NotImplementedException();
public override string ServerVersion => throw new NotImplementedException();

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
@ -20,12 +21,13 @@ namespace Peregrine
private readonly string _password;
private WriteBuffer _writeBuffer;
private ReadBuffer _readBuffer;
private AwaitableSocket _awaitableSocket;
private bool _disposed;
private readonly HashSet<string> _prepared = new HashSet<string>();
public PGSession(
string host,
int port,
@ -50,11 +52,18 @@ namespace Peregrine
}
}
public async Task PrepareAsync(string statementName, string query)
public Task PrepareAsync(string statementName, string query)
{
ThrowIfDisposed();
ThrowIfNotConnected();
return _prepared.Contains(statementName)
? Task.CompletedTask
: PrepareSlow(statementName, query);
}
private async Task PrepareSlow(string statementName, string query)
{
await _writeBuffer
.StartMessage('P')
.WriteString(statementName)
@ -65,17 +74,18 @@ namespace Peregrine
.EndMessage()
.FlushAsync();
await _readBuffer.ReceiveAsync();
await ReadBuffer.ReceiveAsync();
var message = _readBuffer.ReadMessage();
var message = ReadBuffer.ReadMessage();
switch (message)
{
case MessageType.ParseComplete:
_prepared.Add(statementName);
break;
case MessageType.ErrorResponse:
throw new InvalidOperationException(_readBuffer.ReadErrorMessage());
throw new InvalidOperationException(ReadBuffer.ReadErrorMessage());
default:
throw new NotImplementedException(message.ToString());
@ -92,7 +102,7 @@ namespace Peregrine
return WriteExecFinish();
}
internal ReadBuffer ReadBuffer => _readBuffer;
internal ReadBuffer ReadBuffer { get; private set; }
public Task ExecuteAsync<TResult>(
string statementName,
@ -154,11 +164,11 @@ namespace Peregrine
Action<TResult, ReadBuffer, int, int> columnBinder)
{
await WriteExecFinish();
await _readBuffer.ReceiveAsync();
await ReadBuffer.ReceiveAsync();
read:
var message = _readBuffer.ReadMessage();
var message = ReadBuffer.ReadMessage();
switch (message)
{
@ -172,13 +182,13 @@ namespace Peregrine
? resultFactory(initialState)
: default;
var columns = _readBuffer.ReadShort();
var columns = ReadBuffer.ReadShort();
for (var i = 0; i < columns; i++)
{
var length = _readBuffer.ReadInt();
var length = ReadBuffer.ReadInt();
columnBinder(result, _readBuffer, i, length);
columnBinder(result, ReadBuffer, i, length);
}
goto read;
@ -188,7 +198,7 @@ namespace Peregrine
return;
case MessageType.ErrorResponse:
throw new InvalidOperationException(_readBuffer.ReadErrorMessage());
throw new InvalidOperationException(ReadBuffer.ReadErrorMessage());
default:
throw new NotImplementedException(message.ToString());
@ -209,22 +219,22 @@ namespace Peregrine
await OpenSocketAsync(millisecondsTimeout);
_writeBuffer = new WriteBuffer(_awaitableSocket);
_readBuffer = new ReadBuffer(_awaitableSocket);
ReadBuffer = new ReadBuffer(_awaitableSocket);
await WriteStartupAsync();
await _readBuffer.ReceiveAsync();
await ReadBuffer.ReceiveAsync();
read:
var message = _readBuffer.ReadMessage();
var message = ReadBuffer.ReadMessage();
switch (message)
{
case MessageType.AuthenticationRequest:
{
var authenticationRequestType
= (AuthenticationRequestType)_readBuffer.ReadInt();
= (AuthenticationRequestType)ReadBuffer.ReadInt();
switch (authenticationRequestType)
{
@ -235,7 +245,7 @@ namespace Peregrine
case AuthenticationRequestType.AuthenticationMD5Password:
{
var salt = _readBuffer.ReadBytes(4);
var salt = ReadBuffer.ReadBytes(4);
var hash = Hashing.CreateMD5(_password, _user, salt);
await _writeBuffer
@ -244,7 +254,7 @@ namespace Peregrine
.EndMessage()
.FlushAsync();
await _readBuffer.ReceiveAsync();
await ReadBuffer.ReceiveAsync();
goto read;
}
@ -255,7 +265,7 @@ namespace Peregrine
}
case MessageType.ErrorResponse:
throw new InvalidOperationException(_readBuffer.ReadErrorMessage());
throw new InvalidOperationException(ReadBuffer.ReadErrorMessage());
case MessageType.BackendKeyData:
case MessageType.EmptyQueryResponse:

Просмотреть файл

@ -1,7 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Threading.Tasks;
@ -16,10 +15,43 @@ namespace BenchmarkDb
{
}
public override Task DoWorkAsync()
public override async Task DoWorkAsync()
{
// No pooling in Peregrine ADO
throw new NotImplementedException();
while (Program.IsRunning)
{
var results = new List<Fortune>();
using (var connection = _providerFactory.CreateConnection())
{
connection.ConnectionString = _connectionString;
await connection.OpenAsync();
using (var command = (PeregrineCommand)connection.CreateCommand())
{
command.CommandText = Program.TestQuery;
await command.PrepareAsync();
using (var reader = await command.ExecuteReaderAsync())
{
while (await reader.ReadAsync())
{
results.Add(
new Fortune
{
Id = reader.GetInt32(0),
Message = reader.GetString(1)
});
}
}
}
}
CheckResults(results);
Program.IncrementCounter();
}
}
public override async Task DoWorkAsyncCaching()