Shutdown unregistered connections on worker pool (#483)

This will ensure we don't race manually cleaning up the unregistered connections and work being done on them by the datapath.

Also updates quicspin to loop, allowing the shutdown path to be hit more often.

Should fix #482
This commit is contained in:
Thad House 2020-06-11 10:26:31 -07:00 коммит произвёл GitHub
Родитель 14b56c871c
Коммит 63c86ac0fc
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 109 добавлений и 92 удалений

Просмотреть файл

@ -35,7 +35,7 @@ jobs:
inputs:
pwsh: true
filePath: scripts/spin.ps1
arguments: -GenerateXmlResults -Timeout 600000 -ConvertLogs -Config ${{ parameters.config }} -Arch ${{ parameters.arch }} -Tls ${{ parameters.tls }}
arguments: -GenerateXmlResults -Timeout 600000 -RepeatCount 6 -ConvertLogs -Config ${{ parameters.config }} -Arch ${{ parameters.arch }} -Tls ${{ parameters.tls }}
- template: ./upload-test-artifacts.yml
parameters:

Просмотреть файл

@ -15,6 +15,9 @@ This script runs spinquic locally for a period of time.
.PARAMETER Timeout
The run time in milliseconds.
.Parameter RepeatCount
The amount of times to repeat the full test
.PARAMETER KeepOutputOnSuccess
Don't discard console output or logs on success.
@ -48,6 +51,9 @@ param (
[Parameter(Mandatory = $false)]
[Int32]$Timeout = 60000,
[Parameter(Mandatory = $false)]
[Int32]$RepeatCount = 1,
[Parameter(Mandatory = $false)]
[switch]$KeepOutputOnSuccess = $false,
@ -97,7 +103,7 @@ if (!(Test-Path $SpinQuic)) {
}
# Build up all the arguments to pass to the Powershell script.
$Arguments = "-Path $($SpinQuic) -Arguments 'both -timeout:$($Timeout)' -ShowOutput"
$Arguments = "-Path $($SpinQuic) -Arguments 'both -timeout:$($Timeout) -repeat_count:$($RepeatCount)' -ShowOutput"
if ($KeepOutputOnSuccess) {
$Arguments += " -KeepOutputOnSuccess"
}

Просмотреть файл

@ -326,15 +326,10 @@ MsQuicSessionClose(
} else {
//
// This is the global unregistered session. All connections need to be
// immediately cleaned up.
// immediately cleaned up. Use shutdown to ensure this all gets placed
// on the worker queue.
//
QUIC_LIST_ENTRY* Entry = Session->Connections.Flink;
while (Entry != &Session->Connections) {
QUIC_CONNECTION* Connection =
QUIC_CONTAINING_RECORD(Entry, QUIC_CONNECTION, SessionLink);
Entry = Entry->Flink;
QuicConnOnShutdownComplete(Connection);
}
MsQuicSessionShutdown(Handle, QUIC_CONNECTION_SHUTDOWN_FLAG_SILENT, 0);
}
QuicRundownReleaseAndWait(&Session->Rundown);

Просмотреть файл

@ -666,14 +666,15 @@ void PrintHelpText(void)
{
printf("Usage: spinquic.exe [client/server/both] [options]\n" \
"\n" \
" -alpn:<alpn> default: 'spin'\n" \
" -dstport:<port> default: 9999\n" \
" -loss:<percent> default: 1\n" \
" -max_ops:<count> default: UINT64_MAX\n"
" -seed:<seed> default: 6\n" \
" -sessions:<count> default: 4\n" \
" -target:<ip> default: '127.0.0.1'\n" \
" -timeout:<count_ms> default: 60000\n" \
" -alpn:<alpn> default: 'spin'\n" \
" -dstport:<port> default: 9999\n" \
" -loss:<percent> default: 1\n" \
" -max_ops:<count> default: UINT64_MAX\n"
" -seed:<seed> default: 6\n" \
" -sessions:<count> default: 4\n" \
" -target:<ip> default: '127.0.0.1'\n" \
" -timeout:<count_ms> default: 60000\n" \
" -repeat_count:<count> default: 1\n" \
);
exit(1);
}
@ -705,6 +706,7 @@ main(int argc, char **argv)
QuicPlatformInitialize();
uint32_t SessionCount = 4;
uint32_t RepeatCount = 1;
Settings.RunTimeMs = 60000;
Settings.ServerName = "127.0.0.1";
@ -716,6 +718,12 @@ main(int argc, char **argv)
TryGetValue(argc, argv, "timeout", &Settings.RunTimeMs);
TryGetValue(argc, argv, "max_ops", &Settings.MaxOperationCount);
TryGetValue(argc, argv, "loss", &Settings.LossPercent);
TryGetValue(argc, argv, "repeat_count", &RepeatCount);
if (RepeatCount == 0) {
printf("Must specify a non 0 repeat count\n");
PrintHelpText();
}
if (RunClient) {
uint16_t dstPort = 0;
@ -731,103 +739,111 @@ main(int argc, char **argv)
TryGetValue(argc, argv, "seed", &RngSeed);
srand(RngSeed);
for (size_t i = 0; i < BufferCount; ++i) {
Buffers[i].Length = MaxBufferSizes[i]; // TODO - Randomize?
Buffers[i].Buffer = (uint8_t*)malloc(Buffers[i].Length);
EXIT_ON_NOT(Buffers[i].Buffer);
}
SpinQuicWatchdog Watchdog((uint32_t)Settings.RunTimeMs + WATCHDOG_WIGGLE_ROOM);
EXIT_ON_FAILURE(MsQuicOpen(&MsQuic));
Settings.RunTimeMs = Settings.RunTimeMs / RepeatCount;
if (Settings.LossPercent != 0) {
QUIC_TEST_DATAPATH_HOOKS* Value = &DataPathHooks;
EXIT_ON_FAILURE(
MsQuic->SetParam(
nullptr,
QUIC_PARAM_LEVEL_GLOBAL,
QUIC_PARAM_GLOBAL_TEST_DATAPATH_HOOKS,
sizeof(Value),
&Value));
}
for (uint32_t i = 0; i < RepeatCount; i++) {
for (size_t i = 0; i < BufferCount; ++i) {
Buffers[i].Length = MaxBufferSizes[i]; // TODO - Randomize?
Buffers[i].Buffer = (uint8_t*)malloc(Buffers[i].Length);
EXIT_ON_NOT(Buffers[i].Buffer);
}
const QUIC_REGISTRATION_CONFIG RegConfig = { "spinquic", QUIC_EXECUTION_PROFILE_LOW_LATENCY };
EXIT_ON_FAILURE(MsQuic->RegistrationOpen(&RegConfig, &Registration));
EXIT_ON_FAILURE(MsQuicOpen(&MsQuic));
QUIC_BUFFER AlpnBuffer;
AlpnBuffer.Length = (uint32_t)strlen(Settings.AlpnPrefix) + 1; // You can't have more than 2^8 SessionCount. :)
AlpnBuffer.Buffer = (uint8_t*)malloc(AlpnBuffer.Length);
EXIT_ON_NOT(AlpnBuffer.Buffer);
memcpy(AlpnBuffer.Buffer, Settings.AlpnPrefix, AlpnBuffer.Length);
if (Settings.LossPercent != 0) {
QUIC_TEST_DATAPATH_HOOKS* Value = &DataPathHooks;
EXIT_ON_FAILURE(
MsQuic->SetParam(
nullptr,
QUIC_PARAM_LEVEL_GLOBAL,
QUIC_PARAM_GLOBAL_TEST_DATAPATH_HOOKS,
sizeof(Value),
&Value));
}
for (uint32_t i = 0; i < SessionCount; i++) {
const QUIC_REGISTRATION_CONFIG RegConfig = { "spinquic", QUIC_EXECUTION_PROFILE_LOW_LATENCY };
EXIT_ON_FAILURE(MsQuic->RegistrationOpen(&RegConfig, &Registration));
AlpnBuffer.Buffer[AlpnBuffer.Length-1] = (uint8_t)i;
QUIC_BUFFER AlpnBuffer;
AlpnBuffer.Length = (uint32_t)strlen(Settings.AlpnPrefix) + 1; // You can't have more than 2^8 SessionCount. :)
AlpnBuffer.Buffer = (uint8_t*)malloc(AlpnBuffer.Length);
EXIT_ON_NOT(AlpnBuffer.Buffer);
memcpy(AlpnBuffer.Buffer, Settings.AlpnPrefix, AlpnBuffer.Length);
HQUIC Session;
EXIT_ON_FAILURE(MsQuic->SessionOpen(Registration, &AlpnBuffer, 1, nullptr, &Session));
Sessions.push_back(Session);
for (uint32_t i = 0; i < SessionCount; i++) {
// Configure Session
auto PeerStreamCount = GetRandom((uint16_t)10);
EXIT_ON_FAILURE(MsQuic->SetParam(Session, QUIC_PARAM_LEVEL_SESSION, QUIC_PARAM_SESSION_PEER_BIDI_STREAM_COUNT, sizeof(PeerStreamCount), &PeerStreamCount));
EXIT_ON_FAILURE(MsQuic->SetParam(Session, QUIC_PARAM_LEVEL_SESSION, QUIC_PARAM_SESSION_PEER_UNIDI_STREAM_COUNT, sizeof(PeerStreamCount), &PeerStreamCount));
}
AlpnBuffer.Buffer[AlpnBuffer.Length-1] = (uint8_t)i;
free(AlpnBuffer.Buffer);
HQUIC Session;
EXIT_ON_FAILURE(MsQuic->SessionOpen(Registration, &AlpnBuffer, 1, nullptr, &Session));
Sessions.push_back(Session);
QUIC_THREAD Threads[2];
QUIC_THREAD_CONFIG Config = { 0 };
// Configure Session
auto PeerStreamCount = GetRandom((uint16_t)10);
EXIT_ON_FAILURE(MsQuic->SetParam(Session, QUIC_PARAM_LEVEL_SESSION, QUIC_PARAM_SESSION_PEER_BIDI_STREAM_COUNT, sizeof(PeerStreamCount), &PeerStreamCount));
EXIT_ON_FAILURE(MsQuic->SetParam(Session, QUIC_PARAM_LEVEL_SESSION, QUIC_PARAM_SESSION_PEER_UNIDI_STREAM_COUNT, sizeof(PeerStreamCount), &PeerStreamCount));
}
StartTimeMs = QuicTimeMs64();
free(AlpnBuffer.Buffer);
//
// Start worker threads
//
QUIC_THREAD Threads[2];
QUIC_THREAD_CONFIG Config = { 0 };
if (RunServer) {
Config.Name = "spin_server";
Config.Callback = ServerSpin;
EXIT_ON_FAILURE(QuicThreadCreate(&Config, &Threads[0]));
}
StartTimeMs = QuicTimeMs64();
if (RunClient) {
Config.Name = "spin_client";
Config.Callback = ClientSpin;
EXIT_ON_FAILURE(QuicThreadCreate(&Config, &Threads[1]));
}
//
// Start worker threads
//
//
// Wait on worker threads
//
if (RunServer) {
Config.Name = "spin_server";
Config.Callback = ServerSpin;
EXIT_ON_FAILURE(QuicThreadCreate(&Config, &Threads[0]));
}
if (RunClient) {
QuicThreadWait(&Threads[1]);
QuicThreadDelete(&Threads[1]);
}
if (RunClient) {
Config.Name = "spin_client";
Config.Callback = ClientSpin;
EXIT_ON_FAILURE(QuicThreadCreate(&Config, &Threads[1]));
}
if (RunServer) {
QuicThreadWait(&Threads[0]);
QuicThreadDelete(&Threads[0]);
}
//
// Wait on worker threads
//
//
// Clean up
//
if (RunClient) {
QuicThreadWait(&Threads[1]);
QuicThreadDelete(&Threads[1]);
}
while (Sessions.size() > 0) {
auto Session = Sessions.back();
Sessions.pop_back();
MsQuic->SessionClose(Session);
}
if (RunServer) {
QuicThreadWait(&Threads[0]);
QuicThreadDelete(&Threads[0]);
}
MsQuic->RegistrationClose(Registration);
//
// Clean up
//
MsQuicClose(MsQuic);
while (Sessions.size() > 0) {
auto Session = Sessions.back();
Sessions.pop_back();
MsQuic->SessionClose(Session);
}
for (size_t i = 0; i < BufferCount; ++i) {
free(Buffers[i].Buffer);
MsQuic->RegistrationClose(Registration);
Registration = nullptr;
MsQuicClose(MsQuic);
MsQuic = nullptr;
for (size_t i = 0; i < BufferCount; ++i) {
free(Buffers[i].Buffer);
}
}
return 0;