Add Priority Connection Queue to Worker (#4386)

* Support Priority Work on Connections

* few nits

* Make CodeCheck builds happy (hopefully)

* Name tweak

* Add Priority Connection Queue to Worker (WIP)

* wip

* Fix openssl3

* revert spinquic

* Fix start comparison

* More fixes

* slight improvement

* slight change

* slight refactoring

* more refactoring

* Trying to debug failures

* tmp

* basic tests

* 3 test cases

* rollback QuicWorkerQueueConnection

* priority queueing to follow normal queueing

* cleanup and refactoring

* try disabling high priority GetParam

* sleep for flush Connections from worker queue

* empirical solution

* priority connection re-entry

---------

Co-authored-by: ami-GS <1991.daiki@gmail.com>
This commit is contained in:
Nick Banks 2024-08-05 00:59:40 -04:00 коммит произвёл GitHub
Родитель 9cb53ee75f
Коммит c689d51133
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
12 изменённых файлов: 416 добавлений и 28 удалений

Просмотреть файл

@ -735,7 +735,7 @@ QuicConnQueuePriorityOper(
// The connection needs to be queued on the worker because this was the
// first operation in our OperQ.
//
QuicWorkerQueueConnection(Connection->Worker, Connection); // TODO - Support priority connections on worker?
QuicWorkerQueuePriorityConnection(Connection->Worker, Connection);
}
}
@ -751,7 +751,7 @@ QuicConnQueueHighestPriorityOper(
// The connection needs to be queued on the worker because this was the
// first operation in our OperQ.
//
QuicWorkerQueueConnection(Connection->Worker, Connection);
QuicWorkerQueuePriorityConnection(Connection->Worker, Connection);
}
}

Просмотреть файл

@ -426,6 +426,7 @@ typedef struct QUIC_CONNECTION {
//
BOOLEAN WorkerProcessing : 1;
BOOLEAN HasQueuedWork : 1;
BOOLEAN HasPriorityWork : 1;
//
// Set of current reasons sending more packets is currently blocked.

Просмотреть файл

@ -351,7 +351,8 @@ QuicOperationEnqueue(
//
// Enqueues an operation into the priority part of the queue. Returns TRUE if
// the queue was previously empty and not already being processed.
// the priority queue was previously empty and not already being processed. Note
// that the regular queue might not have been empty.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
BOOLEAN

Просмотреть файл

@ -76,6 +76,7 @@ QuicWorkerInitialize(
CxPlatEventInitialize(&Worker->Done, TRUE, FALSE);
CxPlatEventInitialize(&Worker->Ready, FALSE, FALSE);
CxPlatListInitializeHead(&Worker->Connections);
Worker->PriorityConnectionsTail = &Worker->Connections.Flink;
CxPlatListInitializeHead(&Worker->Operations);
CxPlatPoolInitialize(FALSE, sizeof(QUIC_STREAM), QUIC_POOL_STREAM, &Worker->StreamPool);
CxPlatPoolInitialize(FALSE, sizeof(QUIC_RECV_CHUNK)+QUIC_DEFAULT_STREAM_RECV_BUFFER_SIZE, QUIC_POOL_SBUF, &Worker->DefaultReceiveBufferPool);
@ -169,6 +170,7 @@ QuicWorkerUninitialize(
CxPlatEventUninitialize(Worker->Ready);
CXPLAT_TEL_ASSERT(CxPlatListIsEmpty(&Worker->Connections));
Worker->PriorityConnectionsTail = NULL;
CXPLAT_TEL_ASSERT(CxPlatListIsEmpty(&Worker->Operations));
CxPlatPoolUninitialize(&Worker->StreamPool);
@ -222,10 +224,10 @@ QuicWorkerQueueConnection(
{
CXPLAT_DBG_ASSERT(Connection->Worker != NULL);
BOOLEAN ConnectionQueued = FALSE;
BOOLEAN WakeWorkerThread = FALSE;
CxPlatDispatchLockAcquire(&Worker->Lock);
BOOLEAN WakeWorkerThread;
if (!Connection->WorkerProcessing && !Connection->HasQueuedWork) {
WakeWorkerThread = QuicWorkerIsIdle(Worker);
Connection->Stats.Schedule.LastQueueTime = CxPlatTimeUs32();
@ -237,8 +239,6 @@ QuicWorkerQueueConnection(
QuicConnAddRef(Connection, QUIC_CONN_REF_WORKER);
CxPlatListInsertTail(&Worker->Connections, &Connection->WorkerLink);
ConnectionQueued = TRUE;
} else {
WakeWorkerThread = FALSE;
}
Connection->HasQueuedWork = TRUE;
@ -246,11 +246,54 @@ QuicWorkerQueueConnection(
CxPlatDispatchLockRelease(&Worker->Lock);
if (ConnectionQueued) {
if (WakeWorkerThread) {
QuicWorkerThreadWake(Worker);
}
QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_QUEUE_DEPTH);
}
}
if (WakeWorkerThread) {
QuicWorkerThreadWake(Worker);
_IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicWorkerQueuePriorityConnection(
_In_ QUIC_WORKER* Worker,
_In_ QUIC_CONNECTION* Connection
)
{
CXPLAT_DBG_ASSERT(Connection->Worker != NULL);
BOOLEAN ConnectionQueued = FALSE;
BOOLEAN WakeWorkerThread = FALSE;
CxPlatDispatchLockAcquire(&Worker->Lock);
if (!Connection->WorkerProcessing && !Connection->HasPriorityWork) {
if (!Connection->HasQueuedWork) { // Not already queued for normal priority work
WakeWorkerThread = QuicWorkerIsIdle(Worker);
Connection->Stats.Schedule.LastQueueTime = CxPlatTimeUs32();
QuicTraceEvent(
ConnScheduleState,
"[conn][%p] Scheduling: %u",
Connection,
QUIC_SCHEDULE_QUEUED);
QuicConnAddRef(Connection, QUIC_CONN_REF_WORKER);
ConnectionQueued = TRUE;
} else { // Moving from normal priority to high priority
CxPlatListEntryRemove(&Connection->WorkerLink);
}
CxPlatListInsertTail(*Worker->PriorityConnectionsTail, &Connection->WorkerLink);
Worker->PriorityConnectionsTail = &Connection->WorkerLink.Flink;
Connection->HasPriorityWork = TRUE;
}
Connection->HasQueuedWork = TRUE;
CxPlatDispatchLockRelease(&Worker->Lock);
if (ConnectionQueued) {
if (WakeWorkerThread) {
QuicWorkerThreadWake(Worker);
}
QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_QUEUE_DEPTH);
}
}
@ -365,9 +408,13 @@ QuicWorkerGetNextConnection(
Connection =
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&Worker->Connections), QUIC_CONNECTION, WorkerLink);
if (Worker->PriorityConnectionsTail == &Connection->WorkerLink.Flink) {
Worker->PriorityConnectionsTail = &Worker->Connections.Flink;
}
CXPLAT_DBG_ASSERT(!Connection->WorkerProcessing);
CXPLAT_DBG_ASSERT(Connection->HasQueuedWork);
Connection->HasQueuedWork = FALSE;
Connection->HasPriorityWork = FALSE;
Connection->WorkerProcessing = TRUE;
QuicPerfCounterDecrement(QUIC_PERF_COUNTER_CONN_QUEUE_DEPTH);
}
@ -517,7 +564,14 @@ QuicWorkerProcessConnection(
if (!Connection->State.UpdateWorker) {
if (Connection->HasQueuedWork) {
Connection->Stats.Schedule.LastQueueTime = CxPlatTimeUs32();
CxPlatListInsertTail(&Worker->Connections, &Connection->WorkerLink);
if (&Connection->OperQ.List.Flink != Connection->OperQ.PriorityTail) {
// priority operations are still pending
CxPlatListInsertTail(*Worker->PriorityConnectionsTail, &Connection->WorkerLink);
Worker->PriorityConnectionsTail = &Connection->WorkerLink.Flink;
Connection->HasPriorityWork = TRUE;
} else {
CxPlatListInsertTail(&Worker->Connections, &Connection->WorkerLink);
}
QuicTraceEvent(
ConnScheduleState,
"[conn][%p] Scheduling: %u",
@ -577,6 +631,9 @@ QuicWorkerLoopCleanup(
QUIC_CONNECTION* Connection =
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&Worker->Connections), QUIC_CONNECTION, WorkerLink);
if (Worker->PriorityConnectionsTail == &Connection->WorkerLink.Flink) {
Worker->PriorityConnectionsTail = &Worker->Connections.Flink;
}
if (!Connection->State.ExternalOwner) {
//
// If there is no external owner, shut down the connection so

Просмотреть файл

@ -70,6 +70,7 @@ typedef struct QUIC_CACHEALIGN QUIC_WORKER {
// Queue of connections with operations to be processed.
//
CXPLAT_LIST_ENTRY Connections;
CXPLAT_LIST_ENTRY** PriorityConnectionsTail;
//
// Queue of stateless operations to be processed.
@ -184,6 +185,17 @@ QuicWorkerQueueConnection(
_In_ QUIC_CONNECTION* Connection
);
//
// Queues a priority connection onto the worker, and kicks the worker thread if
// necessary.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicWorkerQueuePriorityConnection(
_In_ QUIC_WORKER* Worker,
_In_ QUIC_CONNECTION* Connection
);
//
// Queues the operation onto the worker, and kicks the worker thread if
// necessary.

Просмотреть файл

@ -110,23 +110,41 @@ EXT_COMMAND(
//
Dml("\n<u>OPERATIONS</u>\n"
"\n");
"\n"
"\tWorker Processing %s\n"
"\tHas Queued Work %s\n"
"\tHas Priority Work %s\n",
Conn.WorkerProcessing() ? "TRUE" : "FALSE",
Conn.HasQueuedWork() ? "TRUE" : "FALSE",
Conn.HasPriorityWork() ? "TRUE" : "FALSE");
bool HasAtLeastOneOperation = false;
auto Operations = Conn.GetOperQueue().GetOperations();
while (!CheckControlC()) {
auto OperLinkAddr = Operations.Next();
if (OperLinkAddr == 0) {
break;
if (Operations.IsEmpty()) {
Dml("\t\tNo Operations Queued\n");
} else {
bool IsHighPriority = true;
bool IsFirstOperation = true;
UINT64 PriorityTail;
ReadPointerAtAddr(Conn.GetOperQueue().GetPriorityTail(), &PriorityTail);
while (!CheckControlC()) {
auto OperLinkAddr = Operations.Next();
if (OperLinkAddr == 0) {
break;
}
if (PriorityTail == OperLinkAddr) {
IsHighPriority = false;
Dml("\n\tNORMAL PRIORITY:\n\n");
}
if (IsFirstOperation && IsHighPriority) {
Dml("\n\tHIGH PRIORITY:\n\n");
}
auto Operation = Operation::FromLink(OperLinkAddr);
Dml("\t\t%s\n", Operation.TypeStr());
IsFirstOperation = false;
}
auto Operation = Operation::FromLink(OperLinkAddr);
Dml("\t%s\n", Operation.TypeStr());
HasAtLeastOneOperation = true;
}
if (!HasAtLeastOneOperation) {
Dml("\tNo Operations Queued\n");
}
//

Просмотреть файл

@ -1123,6 +1123,10 @@ struct OperQueue : Struct {
LinkedList GetOperations() {
return LinkedList(AddrOf("List"));
}
ULONG64 GetPriorityTail() {
return ReadPointer("PriorityTail");
}
};
struct StreamSet : Struct {
@ -1216,6 +1220,18 @@ struct Connection : Struct {
}
}
BYTE WorkerProcessing() {
return ReadType<BYTE>("WorkerProcessing");
}
BYTE HasQueuedWork() {
return ReadType<BYTE>("HasQueuedWork");
}
BYTE HasPriorityWork() {
return ReadType<BYTE>("HasPriorityWork");
}
IpAddress GetLocalAddress() {
return IpAddress(AddrOf("LocalAddress")); // TODO - Broken
}

Просмотреть файл

@ -579,6 +579,10 @@ void
QuicTestOperationPriority(
);
void
QuicTestConnectionPriority(
);
void
QuicTestConnectionStreamStartSendPriority(
);
@ -1305,4 +1309,7 @@ typedef struct {
#define IOCTL_QUIC_RUN_OPERATION_PRIORITY \
QUIC_CTL_CODE(122, METHOD_BUFFERED, FILE_WRITE_DATA)
#define QUIC_MAX_IOCTL_FUNC_CODE 122
#define IOCTL_QUIC_RUN_CONNECTION_PRIORITY \
QUIC_CTL_CODE(123, METHOD_BUFFERED, FILE_WRITE_DATA)
#define QUIC_MAX_IOCTL_FUNC_CODE 123

Просмотреть файл

@ -2229,6 +2229,15 @@ TEST(Basic, OperationPriority) {
}
}
TEST(Basic, ConnectionPriority) {
TestLogger Logger("ConnectionPriority");
if (TestingKernelMode) {
ASSERT_TRUE(DriverClient.Run(IOCTL_QUIC_RUN_CONNECTION_PRIORITY));
} else {
QuicTestConnectionPriority();
}
}
TEST(Drill, VarIntEncoder) {
TestLogger Logger("QuicDrillTestVarIntEncoder");
if (TestingKernelMode) {

Просмотреть файл

@ -521,6 +521,7 @@ size_t QUIC_IOCTL_BUFFER_SIZES[] =
sizeof(BOOLEAN),
0,
0,
0,
};
CXPLAT_STATIC_ASSERT(
@ -1457,6 +1458,10 @@ QuicTestCtlEvtIoDeviceControl(
QuicTestCtlRun(QuicTestOperationPriority());
break;
case IOCTL_QUIC_RUN_CONNECTION_PRIORITY:
QuicTestCtlRun(QuicTestConnectionPriority());
break;
default:
Status = STATUS_NOT_IMPLEMENTED;
break;

Просмотреть файл

@ -3372,10 +3372,7 @@ struct OperationPriorityTestContext {
MsQuicStream* ExpectedStream {nullptr};
bool TestSucceeded {false};
static QUIC_STATUS ServerStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) {
UNREFERENCED_PARAMETER(Stream);
UNREFERENCED_PARAMETER(Context);
UNREFERENCED_PARAMETER(Event);
static QUIC_STATUS ServerStreamCallback(_In_ MsQuicStream*, _In_opt_ void*, _Inout_ QUIC_STREAM_EVENT*) {
return QUIC_STATUS_SUCCESS;
}
@ -3556,6 +3553,270 @@ void QuicTestOperationPriority()
}
}
struct ConnectionPriorityTestContext {
static const uint8_t NumSend;
uint8_t MaxSend;
CxPlatEvent AllReceivesComplete;
CxPlatEvent OperationQueuedComplete;
CxPlatEvent BlockAfterInitialStart;
uint32_t CurrentSendCount {0};
uint32_t CurrentStartCount {0};
MsQuicStream* ExpectedStream {nullptr};
bool TestSucceeded {false};
MsQuicStream* StartOrder[128] {0};
MsQuicStream* SendOrder[128] {0};
static QUIC_STATUS ServerStreamCallback(_In_ MsQuicStream*, _In_opt_ void*, _Inout_ QUIC_STREAM_EVENT*) {
return QUIC_STATUS_SUCCESS;
}
static QUIC_STATUS ClientStreamStartStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) {
auto TestContext = (ConnectionPriorityTestContext*)Context;
if (Event->Type == QUIC_STREAM_EVENT_START_COMPLETE) {
TestContext->StartOrder[TestContext->CurrentStartCount++] = Stream;
if (TestContext->CurrentStartCount == 1) {
// initial dummy stream start to block this thread
TestContext->TestSucceeded = TestContext->ExpectedStream == Stream;
TestContext->BlockAfterInitialStart.Set();
// Wait until all operations are queued
TestContext->OperationQueuedComplete.WaitTimeout(TestWaitTimeout);
} else if (TestContext->CurrentStartCount == 2) {
TestContext->TestSucceeded = TestContext->TestSucceeded && (TestContext->ExpectedStream == Stream);
}
} else if (Event->Type == QUIC_STREAM_EVENT_SEND_COMPLETE) {
TestContext->SendOrder[TestContext->CurrentSendCount++] = Stream;
if (TestContext->CurrentSendCount == 1) {
TestContext->TestSucceeded = TestContext->TestSucceeded && (TestContext->ExpectedStream == Stream);
} else if (TestContext->CurrentSendCount == TestContext->MaxSend) {
TestContext->AllReceivesComplete.Set();
}
}
return QUIC_STATUS_SUCCESS;
}
static QUIC_STATUS ConnCallback(_In_ MsQuicConnection*, _In_opt_ void* Context, _Inout_ QUIC_CONNECTION_EVENT* Event) {
if (Event->Type == QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED) {
new(std::nothrow) MsQuicStream(Event->PEER_STREAM_STARTED.Stream, CleanUpAutoDelete, ServerStreamCallback, Context);
}
return QUIC_STATUS_SUCCESS;
}
};
const uint8_t ConnectionPriorityTestContext::NumSend = 3;
void QuicTestConnectionPriority()
{
// QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER for serializing Connections
MsQuicRegistration Registration("MsQuicTest", QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER, true);
TEST_QUIC_SUCCEEDED(Registration.GetInitStatus());
MsQuicConfiguration ServerConfiguration(Registration, "MsQuicTest", MsQuicSettings().SetPeerUnidiStreamCount(ConnectionPriorityTestContext::NumSend), ServerSelfSignedCredConfig);
TEST_QUIC_SUCCEEDED(ServerConfiguration.GetInitStatus());
MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", MsQuicCredentialConfig());
TEST_QUIC_SUCCEEDED(ClientConfiguration.GetInitStatus());
MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, ConnectionPriorityTestContext::ConnCallback);
TEST_QUIC_SUCCEEDED(Listener.GetInitStatus());
TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest"));
QuicAddr ServerLocalAddr;
TEST_QUIC_SUCCEEDED(Listener.GetLocalAddr(ServerLocalAddr));
uint8_t RawBuffer[100];
QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer };
MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend] = {0};
const uint8_t NumConnections = 8;
{
MsQuicConnection *Connections[NumConnections] = {0};
for (uint8_t i = 0; i < NumConnections; ++i) {
Connections[i] = new(std::nothrow) MsQuicConnection(Registration);
TEST_QUIC_SUCCEEDED(Connections[i]->GetInitStatus());
TEST_QUIC_SUCCEEDED(Connections[i]->Start(ClientConfiguration, ServerLocalAddr.GetFamily(), QUIC_TEST_LOOPBACK_FOR_AF(ServerLocalAddr.GetFamily()), ServerLocalAddr.GetPort()));
TEST_TRUE(Connections[i]->HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout));
TEST_TRUE(Connections[i]->HandshakeComplete);
}
// s: stream op
// p: prioritied op
// s0/s2: stream start, stream send
// n in sn0/sn2: stream id
// processing | queued
// [1[s0]] | []
// [1[s0]] | [2[s00, s02, ..., s(n-1)0, s(n-1)2]]
// [1[s0]] | [3[s0p, s2p], 2[s00, s02, ..., s(n-1)0, s(n-1)2]]
// [3[s0p, s2p], 2[s00, s02, ..., s(n-1)0, s(n-1)2]] | []
{
ConnectionPriorityTestContext Context;
MsQuicStream Stream1(*Connections[0], QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context);
MsQuicStream Stream3(*Connections[2], QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context);
Context.ExpectedStream = &Stream1;
Stream1.Start(QUIC_STREAM_START_FLAG_IMMEDIATE);
// Wait until this StreamStart operation is drained
TEST_TRUE(Context.BlockAfterInitialStart.WaitTimeout(TestWaitTimeout));
Context.ExpectedStream = &Stream3;
for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) {
Streams[i] = new(std::nothrow) MsQuicStream(
*Connections[1], QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual,
ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context);
TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus());
TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN));
}
TEST_QUIC_SUCCEEDED(Stream3.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK));
TEST_QUIC_SUCCEEDED(Stream3.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN | QUIC_SEND_FLAG_PRIORITY_WORK));
Context.MaxSend = ConnectionPriorityTestContext::NumSend + 1;
Context.OperationQueuedComplete.Set(); // All operations are queued. Kick off processing the operations
MsQuicStream* ExpectedStartOrder[ConnectionPriorityTestContext::NumSend + 2] = {0};
ExpectedStartOrder[0] = &Stream1;
ExpectedStartOrder[1] = &Stream3;
for (uint8_t i = 2; i < ConnectionPriorityTestContext::NumSend + 2; ++i) {
ExpectedStartOrder[i] = Streams[i-2];
}
MsQuicStream* ExpectedSendOrder[ConnectionPriorityTestContext::NumSend + 1] = {0};
ExpectedSendOrder[0] = &Stream3;
for (uint8_t i = 1; i < ConnectionPriorityTestContext::NumSend + 1; ++i) {
ExpectedSendOrder[i] = Streams[i-1];
}
TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout));
TEST_TRUE(memcmp(Context.StartOrder, ExpectedStartOrder, sizeof(ExpectedStartOrder)) == 0);
TEST_TRUE(memcmp(Context.SendOrder, ExpectedSendOrder, sizeof(ExpectedSendOrder)) == 0);
TEST_TRUE(Context.TestSucceeded);
for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) {
delete Streams[i];
}
}
// processing | queued
// [1[s0]] | []
// [1[s0]] | [2[s(n-1)0p, s(n-1)2p, s00, s02, ..., s(n-2)0, s(n-2)2]]
// [1[s0]] | [2[s(n-1)0p, s(n-1)2p, s00, s02, ..., s(n-2)0, s(n-2)2], 3[s0p, s2p]]
// [2[s(n-1)0p, s(n-1)2p, s00, s02, ..., s(n-2)0, s(n-2)2], 3[s0p, s2p]] | []
{
ConnectionPriorityTestContext Context;
MsQuicStream Stream1(*Connections[0], QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context);
MsQuicStream Stream3(*Connections[2], QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context);
Context.ExpectedStream = &Stream1;
Stream1.Start(QUIC_STREAM_START_FLAG_IMMEDIATE);
// Wait until this StreamStart operation is drained
TEST_TRUE(Context.BlockAfterInitialStart.WaitTimeout(TestWaitTimeout));
for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) {
Streams[i] = new(std::nothrow) MsQuicStream(
*Connections[1], QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual,
ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context);
TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus());
// NOTE: Splitting Start and Send without using QUIC_SEND_FLAG_START doesn't change operation order,
// but it changes callback order. This is needed for using ExpectedXxxxOrder array.
if (i == ConnectionPriorityTestContext::NumSend-1) {
Context.ExpectedStream = Streams[i];
TEST_QUIC_SUCCEEDED(Streams[i]->Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK))
TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_FIN | QUIC_SEND_FLAG_PRIORITY_WORK));
} else {
TEST_QUIC_SUCCEEDED(Streams[i]->Start(QUIC_STREAM_START_FLAG_NONE))
TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_FIN));
}
}
TEST_QUIC_SUCCEEDED(Stream3.Start(QUIC_STREAM_START_FLAG_NONE));
TEST_QUIC_SUCCEEDED(Stream3.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN));
Context.MaxSend = ConnectionPriorityTestContext::NumSend + 1;
Context.OperationQueuedComplete.Set(); // All operations are queued. Kick off processing the operations
MsQuicStream* ExpectedStartOrder[ConnectionPriorityTestContext::NumSend + 2] = {0};
ExpectedStartOrder[0] = &Stream1;
ExpectedStartOrder[1] = Streams[ConnectionPriorityTestContext::NumSend-1];
ExpectedStartOrder[ConnectionPriorityTestContext::NumSend+1] = &Stream3;
for (uint8_t i = 2; i < ConnectionPriorityTestContext::NumSend + 1; ++i) {
ExpectedStartOrder[i] = Streams[i-2];
}
MsQuicStream* ExpectedSendOrder[ConnectionPriorityTestContext::NumSend + 1] = {0};
ExpectedSendOrder[0] = Streams[ConnectionPriorityTestContext::NumSend-1];
ExpectedSendOrder[ConnectionPriorityTestContext::NumSend] = &Stream3;
for (uint8_t i = 1; i < ConnectionPriorityTestContext::NumSend; ++i) {
ExpectedSendOrder[i] = Streams[i-1];
}
TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout));
TEST_TRUE(memcmp(Context.StartOrder, ExpectedStartOrder, sizeof(ExpectedStartOrder)) == 0);
TEST_TRUE(memcmp(Context.SendOrder, ExpectedSendOrder, sizeof(ExpectedSendOrder)) == 0);
TEST_TRUE(Context.TestSucceeded);
for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) {
delete Streams[i];
}
}
// processing | queued
// [1[s0]] | []
// [1[s0]] | [5[s0, s2]]
// [1[s0]] | [4[s0p, s2p], 5[s0, s2]]
// [1[s0]] | [4[s0p, s2p], 5[s0, s2]], 3[s0, s2]]
// [1[s0]] | [4[s0p, s2p], 2[s0p, s2p], 5[s0, s2], 3[s0, s2]]
// [1[s0, s2]] | [4[s0p, s2p], 2[s0p, s2p], 5[s0, s2], 3[s0, s2]]
{
ConnectionPriorityTestContext Context;
MsQuicStream Stream1(*Connections[0], QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context);
MsQuicStream Stream2(*Connections[1], QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context);
MsQuicStream Stream3(*Connections[2], QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context);
MsQuicStream Stream4(*Connections[3], QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context);
MsQuicStream Stream5(*Connections[4], QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context);
// NOTE: This is to flush all operations in the queue.
// If this is not done, the operation order is not guaranteed.
// e.g. This test case randomly swap [3] and [4] and fail without sleep.
// This happens when [3] is already in the worker queue.
// Normal enqueue doesn't re-queue the Connection
CxPlatSleep(1000);
Stream1.Start(QUIC_STREAM_START_FLAG_IMMEDIATE);
// Wait until this StreamStart operation is drained
TEST_TRUE(Context.BlockAfterInitialStart.WaitTimeout(TestWaitTimeout));
TEST_QUIC_SUCCEEDED(Stream5.Start(QUIC_STREAM_START_FLAG_NONE));
TEST_QUIC_SUCCEEDED(Stream5.Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN));
TEST_QUIC_SUCCEEDED(Stream4.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK));
TEST_QUIC_SUCCEEDED(Stream4.Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN | QUIC_SEND_FLAG_PRIORITY_WORK));
TEST_QUIC_SUCCEEDED(Stream3.Start(QUIC_STREAM_START_FLAG_NONE));
TEST_QUIC_SUCCEEDED(Stream3.Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN));
TEST_QUIC_SUCCEEDED(Stream2.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK));
TEST_QUIC_SUCCEEDED(Stream2.Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN | QUIC_SEND_FLAG_PRIORITY_WORK));
TEST_QUIC_SUCCEEDED(Stream1.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN | QUIC_SEND_FLAG_PRIORITY_WORK));
Context.OperationQueuedComplete.Set(); // All operations are queued. Kick off processing the operations
Context.MaxSend = 5;
TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout));
MsQuicStream* ExpectedStartOrder[5] = {0};
MsQuicStream* ExpectedSendOrder[5] = {0};
ExpectedStartOrder[0] = &Stream1;
ExpectedStartOrder[1] = &Stream4;
ExpectedStartOrder[2] = &Stream2;
ExpectedStartOrder[3] = &Stream5;
ExpectedStartOrder[4] = &Stream3;
ExpectedSendOrder[0] = &Stream1;
ExpectedSendOrder[1] = &Stream4;
ExpectedSendOrder[2] = &Stream2;
ExpectedSendOrder[3] = &Stream5;
ExpectedSendOrder[4] = &Stream3;
TEST_TRUE(memcmp(Context.StartOrder, ExpectedStartOrder, sizeof(ExpectedStartOrder)) == 0);
TEST_TRUE(memcmp(Context.SendOrder, ExpectedSendOrder, sizeof(ExpectedSendOrder)) == 0);
}
for (uint8_t i = 0; i < NumConnections; ++i) {
delete Connections[i];
}
}
}
struct StreamBlockUnblockConnFlowControl {
CxPlatEvent ClientStreamShutdownComplete;
CxPlatEvent ClientStreamSendComplete;

Просмотреть файл

@ -879,6 +879,7 @@ void SpinQuicGetRandomParam(HQUIC Handle, uint16_t ThreadID)
uint32_t Level = (uint32_t)GetRandom(ARRAYSIZE(ParamCounts));
uint32_t Param = (uint32_t)GetRandom(((ParamCounts[Level] & 0xFFFFFFF)) + 1);
uint32_t Combined = ((Level+1) << 28) + Param;
Combined &= ~QUIC_PARAM_HIGH_PRIORITY; // TODO: enable high priority GetParam
uint8_t OutBuffer[200];
uint32_t OutBufferLength = (uint32_t)GetRandom(sizeof(OutBuffer) + 1);