* WIP

* Fixes

* Simplify

* fix

* Fix

* Fix more bugs

* Fix clog and .net

* Fixes

* Improvements

* Undo a merge issue

* Another merge issue

* Simple test

* remove fprintf

* fix signature to upto 255. add Large send case

* 1G Multi receive

* fix drain bug

* kernel test

* fix

* fix

* unused variable

* compare data

* tmp

* test cases

* 95% works

* remove fprintf in core

* Fix Range copy

* fix type mimatch

* cleanup

* Fix stall issue

* rollback

* retry if length is 0

* remove continue to reset Readpending in QuicStreamReceiveComplete

* stop enablling send with pending data (race condition exists)

* fix QuicRecvBufferHasUnreadData

* rollback for recv_buff lock bug then return earlier not to indicate FIN bit

* fix

* fix

* reduce test buffer size for CI speed

* update document

* add back buffer count caution

* Update docs/api/StreamReceiveComplete.md

Co-authored-by: Nick Banks <nibanks@microsoft.com>

* Update docs/api/StreamReceiveComplete.md

Co-authored-by: Nick Banks <nibanks@microsoft.com>

* logical conflicts

---------

Co-authored-by: ami-GS <1991.daiki@gmail.com>
This commit is contained in:
Nick Banks 2024-08-05 13:11:50 -04:00 коммит произвёл GitHub
Родитель 7267442e5b
Коммит f960155603
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
23 изменённых файлов: 537 добавлений и 38 удалений

Просмотреть файл

@ -64,6 +64,7 @@ The following settings are available via registry as well as via [QUIC_SETTINGS]
| Stateless Operation Expiration | uint16_t | StatelessOperationExpirationMs | 100 | The time limit between operations for the same endpoint, in milliseconds. |
| Congestion Control Algorithm | uint16_t | CongestionControlAlgorithm | 0 (Cubic) | The congestion control algorithm used for the connection. |
| ECN | uint8_t | EcnEnabled | 0 (FALSE) | Enable sender-side ECN support. |
| Stream Multi Receive | uint8_t | StreamMultiReceiveEnabled | 0 (FALSE) | Enable multi receive support |
The types map to registry types as follows:
- `uint64_t` is a `REG_QWORD`.

Просмотреть файл

@ -78,7 +78,7 @@ Typically, the buffer count is one, which means that most events will include a
When the buffer count is 0, it signifies the reception of a QUIC frame with empty data, which also indicates the end of stream data.
Currently, the maximum buffer count is 2 in the case of partial receive, where only a portion of the buffer data is consumed (as explained below). However, it is strongly advised not to assume in application code that the upper limit is always 2. This caution is important because future releases may incorporate multiple circular buffers to enhance performance, leading to potential changes in the buffer count limit.
Currently, the maximum buffer count is 3 in the case of partial receive, where only a portion of the buffer data is consumed (as explained below). However, it is strongly advised not to assume in application code that the upper limit is always 3. This caution is important because future releases may change internal algorithm, leading to potential changes in the buffer count limit.
The app then may respond to the event in a number of ways:
@ -100,4 +100,12 @@ Any value less than or equal to the initial **TotalBufferLength** value is allow
Whenever a receive isn't fully accepted by the app, additional receive events are immediately disabled. The app is assumed to be at capacity and not able to consume more until further indication. To re-enable receive callbacks, the app must call [StreamReceiveSetEnabled](api/StreamReceiveSetEnabled.md).
There are cases where an app may want to partially accept the current data, but still immediately get a callback with the rest of the data. To do this (only works in the synchronous flow) the app must return `QUIC_STATUS_CONTINUE`.
There are cases where an app may want to partially accept the current data, but still immediately get a callback with the rest of the data. To do this (only works in the synchronous flow) the app must return `QUIC_STATUS_CONTINUE`.
## Multi Receive mode
Setting [`StreamMultiReceiveEnabled`](./Settings.md) an app can continue getting indicated by `QUIC_STREAM_EVENT_RECEIVE` without returning `QUIC_STATUS_SUCCESS` nor calling [StreamReceiveComplete](api/StreamReceiveComplete.md).
This changes internal receive buffer more efficient for continuous receiving.
The app need to keep track of total `TotalBufferLength` to later call [StreamReceiveComplete](api/StreamReceiveComplete.md) appropriately.

Просмотреть файл

@ -54,7 +54,8 @@ typedef struct QUIC_SETTINGS {
uint64_t ReliableResetEnabled : 1;
uint64_t OneWayDelayEnabled : 1;
uint64_t NetStatsEventEnabled : 1;
uint64_t RESERVED : 22;
uint64_t StreamMultiReceiveEnabled : 1;
uint64_t RESERVED : 21;
#else
uint64_t RESERVED : 26;
#endif
@ -104,7 +105,8 @@ typedef struct QUIC_SETTINGS {
uint64_t ReliableResetEnabled : 1;
uint64_t OneWayDelayEnabled : 1;
uint64_t NetStatsEventEnabled : 1;
uint64_t ReservedFlags : 59;
uint64_t StreamMultiReceiveEnabled : 1;
uint64_t ReservedFlags : 58;
#else
uint64_t ReservedFlags : 63;
#endif
@ -351,6 +353,12 @@ Initial stream receive flow control window size for remotely initiated unidirect
**Default value:** 0 (no overwrite)
`StreamMultiReceiveEnabled`
Enable multi receive mode. An app can continue receiving stream data without calling `StreamReceiveComplete` for each `QUIC_STREAM_EVENT_RECEIVE` indication.
**Default value:** 0 (`FALSE`)
# Remarks
When setting new values for the settings, the app must set the corresponding `.IsSet.*` parameter for each actual parameter that is being set or updated. For example:

Просмотреть файл

@ -22,8 +22,9 @@ void
# Remarks
This is an asynchronous API but can run inline if called in a callback.
The application must ensure that one `StreamReceiveComplete` call corresponds to one `QUIC_STREAM_EVENT_RECEIVE` event.
The application, without setting `StreamMultiReceiveEnabled`, must ensure that one `StreamReceiveComplete` call corresponds to one `QUIC_STREAM_EVENT_RECEIVE` event.
Duplicate `StreamReceiveComplete` calls after one `QUIC_STREAM_EVENT_RECEIVE` event are ignored silently even with different `BufferLength`.
The `StreamMultiReceiveEnabled` mode doesn't follow this rule. Multiple `QUIC_STREAM_EVENT_RECEIVE` events can be indicated at once by `StreamReceiveComplete`. The application needs to keep track of accumulated `TotalBufferLength` with this mode.
# See Also

Просмотреть файл

@ -527,6 +527,11 @@ CXPLAT_STATIC_ASSERT(
//
#define QUIC_DEFAULT_NET_STATS_EVENT_ENABLED FALSE
//
// The default settings for using multiple parallel receives for streams.
//
#define QUIC_DEFAULT_STREAM_MULTI_RECEIVE_ENABLED FALSE
//
// The number of rounds in Cubic Slow Start to sample RTT.
//
@ -633,6 +638,7 @@ CXPLAT_STATIC_ASSERT(
#define QUIC_SETTING_RELIABLE_RESET_ENABLED "ReliableResetEnabled"
#define QUIC_SETTING_ONE_WAY_DELAY_ENABLED "OneWayDelayEnabled"
#define QUIC_SETTING_NET_STATS_EVENT_ENABLED "NetStatsEventEnabled"
#define QUIC_SETTING_STREAM_MULTI_RECEIVE_ENABLED "StreamMultiReceiveEnabled"
#define QUIC_SETTING_INITIAL_WINDOW_PACKETS "InitialWindowPackets"
#define QUIC_SETTING_SEND_IDLE_TIMEOUT_MS "SendIdleTimeoutMs"

Просмотреть файл

@ -165,6 +165,9 @@ QuicSettingsSetDefault(
if (!Settings->IsSet.NetStatsEventEnabled) {
Settings->NetStatsEventEnabled = QUIC_DEFAULT_NET_STATS_EVENT_ENABLED;
}
if (!Settings->IsSet.StreamMultiReceiveEnabled) {
Settings->StreamMultiReceiveEnabled = QUIC_DEFAULT_STREAM_MULTI_RECEIVE_ENABLED;
}
}
_IRQL_requires_max_(PASSIVE_LEVEL)
@ -330,6 +333,9 @@ QuicSettingsCopy(
if (!Destination->IsSet.NetStatsEventEnabled) {
Destination->NetStatsEventEnabled = Source->NetStatsEventEnabled;
}
if (!Destination->IsSet.StreamMultiReceiveEnabled) {
Destination->StreamMultiReceiveEnabled = Source->StreamMultiReceiveEnabled;
}
}
_IRQL_requires_max_(PASSIVE_LEVEL)
@ -700,6 +706,11 @@ QuicSettingApply(
Destination->NetStatsEventEnabled = Source->NetStatsEventEnabled;
Destination->IsSet.NetStatsEventEnabled = TRUE;
}
if (Source->IsSet.StreamMultiReceiveEnabled && (!Destination->IsSet.StreamMultiReceiveEnabled || OverWrite)) {
Destination->StreamMultiReceiveEnabled = Source->StreamMultiReceiveEnabled;
Destination->IsSet.StreamMultiReceiveEnabled = TRUE;
}
return TRUE;
}
@ -1358,6 +1369,16 @@ VersionSettingsFail:
&ValueLen);
Settings->NetStatsEventEnabled = !!Value;
}
if (!Settings->IsSet.StreamMultiReceiveEnabled) {
Value = QUIC_DEFAULT_STREAM_MULTI_RECEIVE_ENABLED;
ValueLen = sizeof(Value);
CxPlatStorageReadValue(
Storage,
QUIC_SETTING_STREAM_MULTI_RECEIVE_ENABLED,
(uint8_t*)&Value,
&ValueLen);
Settings->StreamMultiReceiveEnabled = !!Value;
}
}
_IRQL_requires_max_(PASSIVE_LEVEL)
@ -1426,6 +1447,7 @@ QuicSettingsDump(
QuicTraceLogVerbose(SettingReliableResetEnabled, "[sett] ReliableResetEnabled = %hhu", Settings->ReliableResetEnabled);
QuicTraceLogVerbose(SettingOneWayDelayEnabled, "[sett] OneWayDelayEnabled = %hhu", Settings->OneWayDelayEnabled);
QuicTraceLogVerbose(SettingNetStatsEventEnabled, "[sett] NetStatsEventEnabled = %hhu", Settings->NetStatsEventEnabled);
QuicTraceLogVerbose(SettingsStreamMultiReceiveEnabled, "[sett] StreamMultiReceiveEnabled= %hhu", Settings->StreamMultiReceiveEnabled);
}
_IRQL_requires_max_(PASSIVE_LEVEL)
@ -1587,6 +1609,9 @@ QuicSettingsDumpNew(
if (Settings->IsSet.NetStatsEventEnabled) {
QuicTraceLogVerbose(SettingNetStatsEventEnabled, "[sett] NetStatsEventEnabled = %hhu", Settings->NetStatsEventEnabled);
}
if (Settings->IsSet.StreamMultiReceiveEnabled) {
QuicTraceLogVerbose(SettingStreamMultiReceiveEnabled, "[sett] StreamMultiReceiveEnabled = %hhu", Settings->StreamMultiReceiveEnabled);
}
}
#define SETTINGS_SIZE_THRU_FIELD(SettingsType, Field) \
@ -1843,6 +1868,14 @@ QuicSettingsSettingsToInternal(
SettingsSize,
InternalSettings);
SETTING_COPY_FLAG_TO_INTERNAL_SIZED(
Flags,
StreamMultiReceiveEnabled,
QUIC_SETTINGS,
Settings,
SettingsSize,
InternalSettings);
return QUIC_STATUS_SUCCESS;
}
@ -2004,6 +2037,14 @@ QuicSettingsGetSettings(
*SettingsLength,
InternalSettings);
SETTING_COPY_FLAG_FROM_INTERNAL_SIZED(
Flags,
StreamMultiReceiveEnabled,
QUIC_SETTINGS,
Settings,
*SettingsLength,
InternalSettings);
*SettingsLength = CXPLAT_MIN(*SettingsLength, sizeof(QUIC_SETTINGS));
return QUIC_STATUS_SUCCESS;

Просмотреть файл

@ -61,7 +61,8 @@ typedef struct QUIC_SETTINGS_INTERNAL {
uint64_t ReliableResetEnabled : 1;
uint64_t OneWayDelayEnabled : 1;
uint64_t NetStatsEventEnabled : 1;
uint64_t RESERVED : 17;
uint64_t StreamMultiReceiveEnabled : 1;
uint64_t RESERVED : 16;
} IsSet;
};
@ -111,6 +112,7 @@ typedef struct QUIC_SETTINGS_INTERNAL {
uint8_t ReliableResetEnabled : 1;
uint8_t OneWayDelayEnabled : 1;
uint8_t NetStatsEventEnabled : 1;
uint8_t StreamMultiReceiveEnabled : 1;
uint8_t MtuDiscoveryMissingProbeCount;
} QUIC_SETTINGS_INTERNAL;

Просмотреть файл

@ -66,6 +66,7 @@ QuicStreamInitialize(
Stream->Flags.Allocated = TRUE;
Stream->Flags.SendEnabled = TRUE;
Stream->Flags.ReceiveEnabled = TRUE;
Stream->Flags.ReceiveMultiple = Connection->Settings.StreamMultiReceiveEnabled;
Stream->RecvMaxLength = UINT64_MAX;
Stream->RefCount = 1;
Stream->SendRequestsTail = &Stream->SendRequests;
@ -131,7 +132,8 @@ QuicStreamInitialize(
&Stream->RecvBuffer,
InitialRecvBufferLength,
FlowControlWindowSize,
QUIC_RECV_BUF_MODE_CIRCULAR,
Stream->Flags.ReceiveMultiple ?
QUIC_RECV_BUF_MODE_MULTIPLE : QUIC_RECV_BUF_MODE_CIRCULAR,
PreallocatedRecvChunk);
if (QUIC_FAILED(Status)) {
goto Exit;

Просмотреть файл

@ -139,6 +139,7 @@ typedef union QUIC_STREAM_FLAGS {
BOOLEAN SendEnabled : 1; // Application is allowed to send data.
BOOLEAN ReceiveEnabled : 1; // Application is ready for receive callbacks.
BOOLEAN ReceiveMultiple : 1; // The app supports multiple parallel receive indications.
BOOLEAN ReceiveFlushQueued : 1; // The receive flush operation is queued.
BOOLEAN ReceiveDataPending : 1; // Data (or FIN) is queued and ready for delivery.
BOOLEAN ReceiveCallActive : 1; // There is an active receive to the app.

Просмотреть файл

@ -129,10 +129,7 @@ QuicStreamRecvQueueFlush(
// The caller has indicated data is ready to be indicated to the
// application. Queue a FLUSH_RECV if one isn't already queued.
//
if (Stream->Flags.ReceiveEnabled &&
Stream->Flags.ReceiveDataPending &&
Stream->RecvPendingLength == 0) {
if (Stream->Flags.ReceiveEnabled && Stream->Flags.ReceiveDataPending) {
if (AllowInlineFlush) {
QuicStreamRecvFlush(Stream);
@ -541,7 +538,9 @@ QuicStreamProcessStreamFrame(
}
}
if (ReadyToDeliver) {
if (ReadyToDeliver &&
(Stream->RecvBuffer.RecvMode == QUIC_RECV_BUF_MODE_MULTIPLE ||
Stream->RecvBuffer.ReadPendingLength == 0)) {
Stream->Flags.ReceiveDataPending = TRUE;
QuicStreamRecvQueueFlush(
Stream,
@ -870,8 +869,6 @@ QuicStreamRecvFlush(
return;
}
CXPLAT_TEL_ASSERT(!Stream->RecvPendingLength); // N.B. - Will be an invalid assert once we support multiple receives
BOOLEAN FlushRecv = TRUE;
while (FlushRecv) {
CXPLAT_DBG_ASSERT(!Stream->Flags.SentStopSending);
@ -924,9 +921,10 @@ QuicStreamRecvFlush(
Event.RECEIVE.Flags |= QUIC_RECEIVE_FLAG_FIN; // TODO - 0-RTT flag?
}
Stream->Flags.ReceiveEnabled = FALSE;
Stream->Flags.ReceiveEnabled = Stream->Flags.ReceiveMultiple;
Stream->Flags.ReceiveCallActive = TRUE;
Stream->RecvPendingLength += Event.RECEIVE.TotalBufferLength;
CXPLAT_DBG_ASSERT(Stream->RecvPendingLength <= Stream->RecvBuffer.ReadPendingLength);
QuicTraceEvent(
StreamAppReceive,
@ -1056,7 +1054,7 @@ QuicStreamReceiveComplete(
//
Stream->Flags.ReceiveEnabled = TRUE;
} else {
} else if (!Stream->Flags.ReceiveMultiple) {
//
// The app didn't drain all the data, so we will need to wait for them
// to request a new receive.
@ -1080,9 +1078,10 @@ QuicStreamReceiveComplete(
if (Stream->Flags.ReceiveDataPending) {
//
// There is still more data for the app to process and it still has
// receive callbacks enabled, so do another recv flush.
// receive callbacks enabled, so do another recv flush (if not already
// doing multi-receive mode).
//
return TRUE;
return !Stream->Flags.ReceiveMultiple;
}
if (Stream->RecvBuffer.BaseOffset == Stream->RecvMaxLength) {
@ -1157,7 +1156,9 @@ QuicStreamRecvSetEnabledState(
CXPLAT_DBG_ASSERT(!Stream->Flags.SentStopSending);
Stream->Flags.ReceiveEnabled = NewRecvEnabled;
if (Stream->Flags.Started && NewRecvEnabled) {
if (Stream->Flags.Started && NewRecvEnabled &&
(Stream->RecvBuffer.RecvMode == QUIC_RECV_BUF_MODE_MULTIPLE ||
Stream->RecvBuffer.ReadPendingLength == 0)) {
//
// The application just resumed receive callbacks. Queue a
// flush receive operation to start draining the receive buffer.

Просмотреть файл

@ -124,6 +124,7 @@ TEST(SettingsTest, TestAllSettingsFieldsSet)
SETTINGS_FEATURE_SET_TEST(ReliableResetEnabled, QuicSettingsSettingsToInternal);
SETTINGS_FEATURE_SET_TEST(OneWayDelayEnabled, QuicSettingsSettingsToInternal);
SETTINGS_FEATURE_SET_TEST(NetStatsEventEnabled, QuicSettingsSettingsToInternal);
SETTINGS_FEATURE_SET_TEST(StreamMultiReceiveEnabled, QuicSettingsSettingsToInternal);
Settings.IsSetFlags = 0;
Settings.IsSet.RESERVED = ~Settings.IsSet.RESERVED;
@ -209,6 +210,7 @@ TEST(SettingsTest, TestAllSettingsFieldsGet)
SETTINGS_FEATURE_GET_TEST(ReliableResetEnabled, QuicSettingsGetSettings);
SETTINGS_FEATURE_GET_TEST(OneWayDelayEnabled, QuicSettingsGetSettings);
SETTINGS_FEATURE_GET_TEST(NetStatsEventEnabled, QuicSettingsGetSettings);
SETTINGS_FEATURE_GET_TEST(StreamMultiReceiveEnabled, QuicSettingsGetSettings);
Settings.IsSetFlags = 0;
Settings.IsSet.RESERVED = ~Settings.IsSet.RESERVED;

Просмотреть файл

@ -1349,6 +1349,19 @@ namespace Microsoft.Quic
}
}
internal ulong StreamMultiReceiveEnabled
{
get
{
return Anonymous2.Anonymous.StreamMultiReceiveEnabled;
}
set
{
Anonymous2.Anonymous.StreamMultiReceiveEnabled = value;
}
}
internal ulong ReservedFlags
{
get
@ -1965,17 +1978,31 @@ namespace Microsoft.Quic
}
}
[NativeTypeName("uint64_t : 22")]
internal ulong RESERVED
[NativeTypeName("uint64_t : 1")]
internal ulong StreamMultiReceiveEnabled
{
get
{
return (_bitfield >> 42) & 0x3FFFFFUL;
return (_bitfield >> 42) & 0x1UL;
}
set
{
_bitfield = (_bitfield & ~(0x3FFFFFUL << 42)) | ((value & 0x3FFFFFUL) << 42);
_bitfield = (_bitfield & ~(0x1UL << 42)) | ((value & 0x1UL) << 42);
}
}
[NativeTypeName("uint64_t : 21")]
internal ulong RESERVED
{
get
{
return (_bitfield >> 43) & 0x1FFFFFUL;
}
set
{
_bitfield = (_bitfield & ~(0x1FFFFFUL << 43)) | ((value & 0x1FFFFFUL) << 43);
}
}
}
@ -2066,17 +2093,31 @@ namespace Microsoft.Quic
}
}
[NativeTypeName("uint64_t : 59")]
internal ulong ReservedFlags
[NativeTypeName("uint64_t : 1")]
internal ulong StreamMultiReceiveEnabled
{
get
{
return (_bitfield >> 5) & 0x7FFFFFFUL;
return (_bitfield >> 5) & 0x1UL;
}
set
{
_bitfield = (_bitfield & ~(0x7FFFFFFUL << 5)) | ((value & 0x7FFFFFFUL) << 5);
_bitfield = (_bitfield & ~(0x1UL << 5)) | ((value & 0x1UL) << 5);
}
}
[NativeTypeName("uint64_t : 58")]
internal ulong ReservedFlags
{
get
{
return (_bitfield >> 6) & 0x3FFFFFFUL;
}
set
{
_bitfield = (_bitfield & ~(0x3FFFFFFUL << 6)) | ((value & 0x3FFFFFFUL) << 6);
}
}
}

Просмотреть файл

@ -782,6 +782,21 @@ tracepoint(CLOG_SETTINGS_C, SettingNetStatsEventEnabled , arg2);\
/*----------------------------------------------------------
// Decoder Ring for SettingsStreamMultiReceiveEnabled
// [sett] StreamMultiReceiveEnabled= %hhu
// QuicTraceLogVerbose(SettingsStreamMultiReceiveEnabled, "[sett] StreamMultiReceiveEnabled= %hhu", Settings->StreamMultiReceiveEnabled);
// arg2 = arg2 = Settings->StreamMultiReceiveEnabled = arg2
----------------------------------------------------------*/
#ifndef _clog_3_ARGS_TRACE_SettingsStreamMultiReceiveEnabled
#define _clog_3_ARGS_TRACE_SettingsStreamMultiReceiveEnabled(uniqueId, encoded_arg_string, arg2)\
tracepoint(CLOG_SETTINGS_C, SettingsStreamMultiReceiveEnabled , arg2);\
#endif
/*----------------------------------------------------------
// Decoder Ring for SettingDumpLFixedServerID
// [sett] FixedServerID = %u
@ -812,6 +827,21 @@ tracepoint(CLOG_SETTINGS_C, SettingDumpStreamRecvBufferDefault , arg2);\
/*----------------------------------------------------------
// Decoder Ring for SettingStreamMultiReceiveEnabled
// [sett] StreamMultiReceiveEnabled = %hhu
// QuicTraceLogVerbose(SettingStreamMultiReceiveEnabled, "[sett] StreamMultiReceiveEnabled = %hhu", Settings->StreamMultiReceiveEnabled);
// arg2 = arg2 = Settings->StreamMultiReceiveEnabled = arg2
----------------------------------------------------------*/
#ifndef _clog_3_ARGS_TRACE_SettingStreamMultiReceiveEnabled
#define _clog_3_ARGS_TRACE_SettingStreamMultiReceiveEnabled(uniqueId, encoded_arg_string, arg2)\
tracepoint(CLOG_SETTINGS_C, SettingStreamMultiReceiveEnabled , arg2);\
#endif
/*----------------------------------------------------------
// Decoder Ring for SettingsLoadInvalidAcceptableVersion
// Invalid AcceptableVersion loaded from storage! 0x%x at position %d

Просмотреть файл

@ -810,6 +810,22 @@ TRACEPOINT_EVENT(CLOG_SETTINGS_C, SettingNetStatsEventEnabled,
/*----------------------------------------------------------
// Decoder Ring for SettingsStreamMultiReceiveEnabled
// [sett] StreamMultiReceiveEnabled= %hhu
// QuicTraceLogVerbose(SettingsStreamMultiReceiveEnabled, "[sett] StreamMultiReceiveEnabled= %hhu", Settings->StreamMultiReceiveEnabled);
// arg2 = arg2 = Settings->StreamMultiReceiveEnabled = arg2
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_SETTINGS_C, SettingsStreamMultiReceiveEnabled,
TP_ARGS(
unsigned char, arg2),
TP_FIELDS(
ctf_integer(unsigned char, arg2, arg2)
)
)
/*----------------------------------------------------------
// Decoder Ring for SettingDumpLFixedServerID
// [sett] FixedServerID = %u
@ -842,6 +858,22 @@ TRACEPOINT_EVENT(CLOG_SETTINGS_C, SettingDumpStreamRecvBufferDefault,
/*----------------------------------------------------------
// Decoder Ring for SettingStreamMultiReceiveEnabled
// [sett] StreamMultiReceiveEnabled = %hhu
// QuicTraceLogVerbose(SettingStreamMultiReceiveEnabled, "[sett] StreamMultiReceiveEnabled = %hhu", Settings->StreamMultiReceiveEnabled);
// arg2 = arg2 = Settings->StreamMultiReceiveEnabled = arg2
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_SETTINGS_C, SettingStreamMultiReceiveEnabled,
TP_ARGS(
unsigned char, arg2),
TP_FIELDS(
ctf_integer(unsigned char, arg2, arg2)
)
)
/*----------------------------------------------------------
// Decoder Ring for SettingsLoadInvalidAcceptableVersion
// Invalid AcceptableVersion loaded from storage! 0x%x at position %d

Просмотреть файл

@ -685,7 +685,8 @@ typedef struct QUIC_SETTINGS {
uint64_t ReliableResetEnabled : 1;
uint64_t OneWayDelayEnabled : 1;
uint64_t NetStatsEventEnabled : 1;
uint64_t RESERVED : 22;
uint64_t StreamMultiReceiveEnabled : 1;
uint64_t RESERVED : 21;
#else
uint64_t RESERVED : 26;
#endif
@ -735,7 +736,8 @@ typedef struct QUIC_SETTINGS {
uint64_t ReliableResetEnabled : 1;
uint64_t OneWayDelayEnabled : 1;
uint64_t NetStatsEventEnabled : 1;
uint64_t ReservedFlags : 59;
uint64_t StreamMultiReceiveEnabled : 1;
uint64_t ReservedFlags : 58;
#else
uint64_t ReservedFlags : 63;
#endif

Просмотреть файл

@ -587,6 +587,7 @@ public:
MsQuicSettings& SetDisconnectTimeoutMs(uint32_t Value) { DisconnectTimeoutMs = Value; IsSet.DisconnectTimeoutMs = TRUE; return *this; }
MsQuicSettings& SetPeerBidiStreamCount(uint16_t Value) { PeerBidiStreamCount = Value; IsSet.PeerBidiStreamCount = TRUE; return *this; }
MsQuicSettings& SetPeerUnidiStreamCount(uint16_t Value) { PeerUnidiStreamCount = Value; IsSet.PeerUnidiStreamCount = TRUE; return *this; }
MsQuicSettings& SetStreamRecvWindowDefault(uint32_t Value) { StreamRecvWindowDefault = Value; IsSet.StreamRecvWindowDefault = TRUE; return *this; }
MsQuicSettings& SetMaxBytesPerKey(uint64_t Value) { MaxBytesPerKey = Value; IsSet.MaxBytesPerKey = TRUE; return *this; }
MsQuicSettings& SetMaxAckDelayMs(uint32_t Value) { MaxAckDelayMs = Value; IsSet.MaxAckDelayMs = TRUE; return *this; }
MsQuicSettings& SetMaximumMtu(uint16_t Mtu) { MaximumMtu = Mtu; IsSet.MaximumMtu = TRUE; return *this; }
@ -604,6 +605,7 @@ public:
MsQuicSettings& SetReliableResetEnabled(bool value) { ReliableResetEnabled = value; IsSet.ReliableResetEnabled = TRUE; return *this; }
MsQuicSettings& SetOneWayDelayEnabled(bool value) { OneWayDelayEnabled = value; IsSet.OneWayDelayEnabled = TRUE; return *this; }
MsQuicSettings& SetNetStatsEventEnabled(bool value) { NetStatsEventEnabled = value; IsSet.NetStatsEventEnabled = TRUE; return *this; }
MsQuicSettings& SetStreamMultiReceiveEnabled(bool value) { StreamMultiReceiveEnabled = value; IsSet.StreamMultiReceiveEnabled = TRUE; return *this; }
#endif
QUIC_STATUS

Просмотреть файл

@ -5644,6 +5644,22 @@
],
"macroName": "QuicTraceLogStreamVerbose"
},
"IgnoreRecvFlushByReadPending": {
"ModuleProperites": {},
"TraceString": "[strm][%p] Ignoring recv flush (ReadPendingLenght=%lu)",
"UniqueId": "IgnoreRecvFlushByReadPending",
"splitArgs": [
{
"DefinationEncoding": "p",
"MacroVariableName": "arg1"
},
{
"DefinationEncoding": "lu",
"MacroVariableName": "arg3"
}
],
"macroName": "QuicTraceLogStreamVerbose"
},
"IgnoreUnreachable": {
"ModuleProperites": {},
"TraceString": "[conn][%p] Ignoring received unreachable event (inline)",
@ -10994,6 +11010,30 @@
],
"macroName": "QuicTraceLogError"
},
"SettingsStreamMultiReceiveEnabled": {
"ModuleProperites": {},
"TraceString": "[sett] StreamMultiReceiveEnabled= %hhu",
"UniqueId": "SettingsStreamMultiReceiveEnabled",
"splitArgs": [
{
"DefinationEncoding": "hhu",
"MacroVariableName": "arg2"
}
],
"macroName": "QuicTraceLogVerbose"
},
"SettingStreamMultiReceiveEnabled": {
"ModuleProperites": {},
"TraceString": "[sett] StreamMultiReceiveEnabled = %hhu",
"UniqueId": "SettingStreamMultiReceiveEnabled",
"splitArgs": [
{
"DefinationEncoding": "hhu",
"MacroVariableName": "arg2"
}
],
"macroName": "QuicTraceLogVerbose"
},
"ShutdownImmediatePendingReliableReset": {
"ModuleProperites": {},
"TraceString": "[strm][%p] Invalid immediate shutdown request (pending reliable reset).",
@ -14896,6 +14936,11 @@
"TraceID": "IgnoreRecvFlush",
"EncodingString": "[strm][%p] Ignoring recv flush (recv disabled)"
},
{
"UniquenessHash": "1537935d-ab2d-521e-a290-e59e29d2626d",
"TraceID": "IgnoreRecvFlushByReadPending",
"EncodingString": "[strm][%p] Ignoring recv flush (ReadPendingLenght=%llu)"
},
{
"UniquenessHash": "1e752977-c5b3-6034-4b89-414f9c6ff50f",
"TraceID": "IgnoreUnreachable",
@ -16651,6 +16696,16 @@
"TraceID": "SettingsLoadInvalidOfferedVersion",
"EncodingString": "Invalid OfferedVersion loaded from storage! 0x%x at position %d"
},
{
"UniquenessHash": "7db0f817-cc89-749d-01ec-356d35f7189a",
"TraceID": "SettingsStreamMultiReceiveEnabled",
"EncodingString": "[sett] StreamMultiReceiveEnabled= %hhu"
},
{
"UniquenessHash": "45ba4873-08cc-5dee-18d4-fe33c464ee1f",
"TraceID": "SettingStreamMultiReceiveEnabled",
"EncodingString": "[sett] StreamMultiReceiveEnabled = %hhu"
},
{
"UniquenessHash": "f34a9d8e-7798-1d30-2104-37dac8a3c0e0",
"TraceID": "ShutdownImmediatePendingReliableReset",

Просмотреть файл

@ -945,8 +945,7 @@ PerfClientStream::OnShutdown() {
}
if (Client.PrintThroughput && SendSuccess) {
//const auto ElapsedMicroseconds = SendEndTime - StartTime;
const auto ElapsedMicroseconds = RecvEndTime - StartTime;
const auto ElapsedMicroseconds = CXPLAT_MAX(SendEndTime - StartTime, RecvEndTime - StartTime);
const auto Rate = (uint32_t)((TotalBytes * 1000 * 1000 * 8) / (1000 * ElapsedMicroseconds));
WriteOutput(
"Result: Upload %llu bytes @ %u kbps (%u.%03u ms).\n",

Просмотреть файл

@ -570,6 +570,10 @@ void
QuicTestStreamReliableResetMultipleSends(
);
void
QuicTestStreamMultiReceive(
);
void
QuicTestStreamBlockUnblockConnFlowControl(
_In_ BOOLEAN Bidirectional
@ -1312,4 +1316,7 @@ typedef struct {
#define IOCTL_QUIC_RUN_CONNECTION_PRIORITY \
QUIC_CTL_CODE(123, METHOD_BUFFERED, FILE_WRITE_DATA)
#define QUIC_MAX_IOCTL_FUNC_CODE 123
#define IOCTL_QUIC_RUN_STREAM_MULTI_RECEIVE \
QUIC_CTL_CODE(124, METHOD_BUFFERED, FILE_WRITE_DATA)
#define QUIC_MAX_IOCTL_FUNC_CODE 124

Просмотреть файл

@ -2202,6 +2202,16 @@ TEST(Misc, StreamReliableResetMultipleSends) {
}
#endif // QUIC_PARAM_STREAM_RELIABLE_OFFSET
TEST(Misc, StreamMultiReceive) {
TestLogger Logger("StreamMultiReceive");
if (TestingKernelMode) {
GTEST_SKIP();
ASSERT_TRUE(DriverClient.Run(IOCTL_QUIC_RUN_STREAM_MULTI_RECEIVE));
} else {
QuicTestStreamMultiReceive();
}
}
TEST(Misc, StreamBlockUnblockUnidiConnFlowControl) {
TestLogger Logger("StreamBlockUnblockUnidiConnFlowControl");
if (TestingKernelMode) {

Просмотреть файл

@ -522,6 +522,7 @@ size_t QUIC_IOCTL_BUFFER_SIZES[] =
0,
0,
0,
0,
};
CXPLAT_STATIC_ASSERT(
@ -1456,6 +1457,10 @@ QuicTestCtlEvtIoDeviceControl(
case IOCTL_QUIC_RUN_OPERATION_PRIORITY:
QuicTestCtlRun(QuicTestOperationPriority());
case IOCTL_QUIC_RUN_STREAM_MULTI_RECEIVE:
QuicTestCtlRun(QuicTestStreamMultiReceive());
break;
case IOCTL_QUIC_RUN_CONNECTION_PRIORITY:

Просмотреть файл

@ -4250,3 +4250,220 @@ QuicTestStreamReliableResetMultipleSends(
TEST_TRUE(Context.ShutdownErrorCode == AbortShutdownErrorCode);
}
#endif // QUIC_PARAM_STREAM_RELIABLE_OFFSET
#define MultiRecvNumSend 10
// 1G seems to be too big for CI environment to finish in a reasonable time.
uint8_t Buffer10M[10000000] = {};
struct MultiReceiveTestContext {
CxPlatEvent PktRecvd[MultiRecvNumSend];
MsQuicStream* ServerStream {nullptr};
int Recvd {0};
uint8_t RecvdSignatures[MultiRecvNumSend] {0};
uint64_t PseudoProcessingLength {0};
CXPLAT_LOCK Lock;
uint64_t TotalReceivedBytes {0};
uint64_t TotalSendBytes {0};
uint8_t* RecvBuffer {nullptr};
MultiReceiveTestContext() {
CxPlatLockInitialize(&Lock);
}
~MultiReceiveTestContext() {
CxPlatLockUninitialize(&Lock);
}
static QUIC_STATUS ServerStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) {
UNREFERENCED_PARAMETER(Stream);
QUIC_STATUS Status = QUIC_STATUS_SUCCESS;
auto TestContext = (MultiReceiveTestContext*)Context;
if (Event->Type == QUIC_STREAM_EVENT_RECEIVE) {
const QUIC_BUFFER* Buffers = Event->RECEIVE.Buffers;
uint32_t BufferCount = Event->RECEIVE.BufferCount;
TestContext->RecvdSignatures[TestContext->Recvd] = Buffers[BufferCount-1].Buffer[Buffers[BufferCount-1].Length-1];
CxPlatLockAcquire(&TestContext->Lock);
TestContext->PseudoProcessingLength += Event->RECEIVE.TotalBufferLength;
CxPlatLockRelease(&TestContext->Lock);
TestContext->TotalReceivedBytes += Event->RECEIVE.TotalBufferLength;
if (TestContext->RecvBuffer) {
uint64_t Offset = Event->RECEIVE.AbsoluteOffset;
for (uint32_t i = 0; i < BufferCount; i++) {
memcpy(TestContext->RecvBuffer + Offset, Buffers[i].Buffer, Buffers[i].Length);
Offset += Buffers[i].Length;
}
if (TestContext->TotalReceivedBytes == TestContext->TotalSendBytes) {
TestContext->PktRecvd[0].Set();
}
} else {
if (TestContext->RecvdSignatures[TestContext->Recvd] != 0) {
TestContext->PktRecvd[TestContext->Recvd++].Set();
}
}
Status = QUIC_STATUS_PENDING;
}
return Status;
}
static QUIC_STATUS ClientStreamCallback(_In_ MsQuicStream* , _In_opt_ void* , _Inout_ QUIC_STREAM_EVENT* ) {
return QUIC_STATUS_SUCCESS;
}
static QUIC_STATUS ConnCallback(_In_ MsQuicConnection*, _In_opt_ void* Context, _Inout_ QUIC_CONNECTION_EVENT* Event) {
if (Event->Type == QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED) {
auto TestContext = (MultiReceiveTestContext*)Context;
TestContext->ServerStream = new(std::nothrow) MsQuicStream(Event->PEER_STREAM_STARTED.Stream, CleanUpAutoDelete, ServerStreamCallback, Context);
}
return QUIC_STATUS_SUCCESS;
}
};
void
QuicTestStreamMultiReceive(
)
{
MsQuicRegistration Registration(true);
TEST_QUIC_SUCCEEDED(Registration.GetInitStatus());
MsQuicConfiguration ServerConfiguration(Registration, "MsQuicTest", MsQuicSettings().SetPeerUnidiStreamCount(5).SetStreamMultiReceiveEnabled(true), ServerSelfSignedCredConfig);
TEST_QUIC_SUCCEEDED(ServerConfiguration.GetInitStatus());
MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", MsQuicCredentialConfig());
TEST_QUIC_SUCCEEDED(ClientConfiguration.GetInitStatus());
// Server side multi receive simple. 3 Sends and Complete at once
{
uint32_t BufferSize = 128;
QUIC_BUFFER Buffer { BufferSize, Buffer10M };
int NumSend = MultiRecvNumSend;
MultiReceiveTestContext Context;
MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, MultiReceiveTestContext::ConnCallback, &Context);
TEST_QUIC_SUCCEEDED(Listener.GetInitStatus());
TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest"));
QuicAddr ServerLocalAddr;
TEST_QUIC_SUCCEEDED(Listener.GetLocalAddr(ServerLocalAddr));
MsQuicConnection Connection(Registration);
TEST_QUIC_SUCCEEDED(Connection.GetInitStatus());
TEST_QUIC_SUCCEEDED(Connection.Start(ClientConfiguration, ServerLocalAddr.GetFamily(), QUIC_TEST_LOOPBACK_FOR_AF(ServerLocalAddr.GetFamily()), ServerLocalAddr.GetPort()));
TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout));
TEST_TRUE(Connection.HandshakeComplete);
MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, MultiReceiveTestContext::ClientStreamCallback, &Context);
TEST_QUIC_SUCCEEDED(Stream.GetInitStatus());
TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_IMMEDIATE));
for (int i = 0; i < NumSend; i++) {
Buffer.Buffer[BufferSize-1] = ((uint8_t)i % 255) + 1;
TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, i == NumSend - 1 ? QUIC_SEND_FLAG_FIN : QUIC_SEND_FLAG_NONE));
TEST_TRUE(Context.PktRecvd[i].WaitTimeout(TestWaitTimeout));
}
Context.ServerStream->ReceiveComplete(BufferSize * NumSend);
for (int i = 0; i < NumSend; i++) {
TEST_TRUE(Context.RecvdSignatures[i] == (uint8_t)(i % 255) + 1)
}
}
// Server side multi receive. MultiRecvNumSend Sends and Complete every 8 sends
// Possible packet split
{
uint32_t BufferSize = 2048;
QUIC_BUFFER Buffer { BufferSize, Buffer10M };
int NumSend = MultiRecvNumSend;
MultiReceiveTestContext Context;
MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, MultiReceiveTestContext::ConnCallback, &Context);
TEST_QUIC_SUCCEEDED(Listener.GetInitStatus());
TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest"));
QuicAddr ServerLocalAddr;
TEST_QUIC_SUCCEEDED(Listener.GetLocalAddr(ServerLocalAddr));
MsQuicConnection Connection(Registration);
TEST_QUIC_SUCCEEDED(Connection.GetInitStatus());
TEST_QUIC_SUCCEEDED(Connection.Start(ClientConfiguration, ServerLocalAddr.GetFamily(), QUIC_TEST_LOOPBACK_FOR_AF(ServerLocalAddr.GetFamily()), ServerLocalAddr.GetPort()));
TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout));
TEST_TRUE(Connection.HandshakeComplete);
MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, MultiReceiveTestContext::ClientStreamCallback, &Context);
TEST_QUIC_SUCCEEDED(Stream.GetInitStatus());
TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_IMMEDIATE));
int lastCompleted = -1;
for (int i = 0; i < NumSend; i++) {
Buffer.Buffer[BufferSize-1] = ((uint8_t)i % 255) + 1;
TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, i == NumSend - 1 ? QUIC_SEND_FLAG_FIN : QUIC_SEND_FLAG_NONE));
TEST_TRUE(Context.PktRecvd[i].WaitTimeout(TestWaitTimeout));
if ((i + 1) % 8 == 0) { // ReceiveComplete every 8 sends
Context.ServerStream->ReceiveComplete(BufferSize * (i - lastCompleted));
lastCompleted = i;
}
}
if (lastCompleted != NumSend - 1) {
Context.ServerStream->ReceiveComplete(BufferSize * (NumSend - lastCompleted - 1));
}
for (int i = 0; i < NumSend; i++) {
TEST_TRUE(Context.RecvdSignatures[i] == (uint8_t)(i % 255) + 1)
}
}
// Server side multi receive. Send 1G bytes
// handle MAX_STREAM_DATA and STREAM_DATA_BLOCKED,
// potential multi chunk and multi range
{
uint32_t BufferSize = sizeof(Buffer10M);
QUIC_BUFFER Buffer { BufferSize, Buffer10M };
int NumSend = 1;
MultiReceiveTestContext Context;
for (uint32_t i = 0; i < BufferSize; i++) {
Buffer10M[i] = (uint8_t)(i % 255) + 1;
}
// alloc 1G
Context.RecvBuffer = new(std::nothrow) uint8_t[BufferSize];
memset(Context.RecvBuffer, 0, BufferSize);
Context.TotalSendBytes = BufferSize;
MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, MultiReceiveTestContext::ConnCallback, &Context);
TEST_QUIC_SUCCEEDED(Listener.GetInitStatus());
TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest"));
QuicAddr ServerLocalAddr;
TEST_QUIC_SUCCEEDED(Listener.GetLocalAddr(ServerLocalAddr));
MsQuicConnection Connection(Registration);
TEST_QUIC_SUCCEEDED(Connection.GetInitStatus());
TEST_QUIC_SUCCEEDED(Connection.Start(ClientConfiguration, ServerLocalAddr.GetFamily(), QUIC_TEST_LOOPBACK_FOR_AF(ServerLocalAddr.GetFamily()), ServerLocalAddr.GetPort()));
TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout));
TEST_TRUE(Connection.HandshakeComplete);
MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, MultiReceiveTestContext::ClientStreamCallback, &Context);
TEST_QUIC_SUCCEEDED(Stream.GetInitStatus());
TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_IMMEDIATE));
for (int i = 0; i < NumSend; i++) {
TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, i == NumSend - 1 ? QUIC_SEND_FLAG_FIN : QUIC_SEND_FLAG_NONE));
uint64_t CompletingLength = 0;
while (!(Context.PktRecvd[0].WaitTimeout(1))) {
CxPlatLockAcquire(&Context.Lock);
CompletingLength = Context.PseudoProcessingLength;
Context.PseudoProcessingLength = 0;
CxPlatLockRelease(&Context.Lock);
if (CompletingLength > 0) {
Context.ServerStream->ReceiveComplete(CompletingLength);
}
}
if (Context.PseudoProcessingLength > 0) {
Context.ServerStream->ReceiveComplete(Context.PseudoProcessingLength);
Context.PseudoProcessingLength = 0;
}
}
TEST_TRUE(Context.TotalReceivedBytes == BufferSize * NumSend);
TEST_EQUAL(0, memcmp(Buffer10M, Context.RecvBuffer, BufferSize));
delete[] Context.RecvBuffer;
}
}

Просмотреть файл

@ -8,8 +8,12 @@ Abstract:
This tool creates a terminating QUIC proxy to forward all incoming traffic
to a specified target.
N.B. Better synchronization between peer objects is needed around teardown.
--*/
#define QUIC_API_ENABLE_PREVIEW_FEATURES 1 // for multiple receive
#include "msquichelper.h"
#include "msquic.hpp"
@ -19,6 +23,7 @@ const char* BackEndTarget;
uint16_t BackEndPort;
QUIC_CERTIFICATE_HASH Cert;
bool BufferedMode = true;
uint32_t FlowControlWindow = 0;
const MsQuicApi* MsQuic;
MsQuicRegistration* Registration;
@ -26,7 +31,7 @@ MsQuicConfiguration* FrontEndConfiguration;
MsQuicConfiguration* BackEndConfiguration;
#define USAGE \
"Usage: quicforward <alpn> <local-port> <target-name/ip>:<target-port> <thumbprint> [0/1-buffered-mode]\n"
"Usage: quicforward <alpn> <local-port> <target-name/ip>:<target-port> <thumbprint> [0/1-buffered-mode] [fc-window]\n"
bool ParseArgs(int argc, char **argv) {
if (argc < 5) {
@ -49,12 +54,15 @@ bool ParseArgs(int argc, char **argv) {
if (argc > 5) {
BufferedMode = atoi(argv[5]) != 0;
}
if (argc > 6) {
FlowControlWindow = (uint32_t)atoi(argv[6]);
}
return true;
}
struct ForwardedSend {
uint64_t TotalLength;
QUIC_BUFFER Buffers[2];
QUIC_BUFFER Buffers[3];
static ForwardedSend* New(QUIC_STREAM_EVENT* Event) {
if (BufferedMode) {
auto SendContext = (ForwardedSend*)malloc(sizeof(ForwardedSend) + (size_t)Event->RECEIVE.TotalBufferLength);
@ -73,7 +81,8 @@ struct ForwardedSend {
auto SendContext = new(std::nothrow) ForwardedSend;
SendContext->TotalLength = Event->RECEIVE.TotalBufferLength;
for (uint32_t i = 0; i < Event->RECEIVE.BufferCount; ++i) {
SendContext->Buffers[i] = Event->RECEIVE.Buffers[i];
SendContext->Buffers[i].Length = Event->RECEIVE.Buffers[i].Length;
SendContext->Buffers[i].Buffer = Event->RECEIVE.Buffers[i].Buffer;
}
return SendContext;
}
@ -90,15 +99,27 @@ QUIC_STATUS StreamCallback(
)
{
auto PeerStream = (MsQuicStream*)Context;
if (!PeerStream || !PeerStream->Handle) { return QUIC_STATUS_SUCCESS; }
switch (Event->Type) {
case QUIC_STREAM_EVENT_RECEIVE: {
//printf("s[%p] Received %llu bytes\n", Stream, Event->RECEIVE.TotalBufferLength);
if (Event->RECEIVE.TotalBufferLength == 0) {
if (Event->RECEIVE.Flags & QUIC_RECEIVE_FLAG_FIN) {
PeerStream->Shutdown(0, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL);
}
return QUIC_STATUS_SUCCESS;
}
auto SendContext = ForwardedSend::New(Event);
QUIC_SEND_FLAGS Flags = QUIC_SEND_FLAG_START;
if (Event->RECEIVE.Flags & QUIC_RECEIVE_FLAG_FIN) { Flags |= QUIC_SEND_FLAG_FIN; }
if (Event->RECEIVE.Flags & QUIC_RECEIVE_FLAG_0_RTT) { Flags |= QUIC_SEND_FLAG_ALLOW_0_RTT; }
CXPLAT_FRE_ASSERT(QUIC_SUCCEEDED(
PeerStream->Send(SendContext->Buffers, Event->RECEIVE.BufferCount, Flags, SendContext)));
auto Status =
PeerStream->Send(SendContext->Buffers, Event->RECEIVE.BufferCount, Flags, SendContext);
if (Status == QUIC_STATUS_ABORTED || Status == QUIC_STATUS_INVALID_STATE) {
ForwardedSend::Delete(SendContext);
return QUIC_STATUS_SUCCESS;
}
CXPLAT_FRE_ASSERT(QUIC_SUCCEEDED(Status));
return BufferedMode ? QUIC_STATUS_SUCCESS : QUIC_STATUS_PENDING;
}
case QUIC_STREAM_EVENT_SEND_COMPLETE: {
@ -191,12 +212,17 @@ int QUIC_MAIN_EXPORT main(int argc, char **argv) {
MsQuicApi _MsQuic;
CXPLAT_FRE_ASSERT(_MsQuic.IsValid());
MsQuic = &_MsQuic;
MsQuicRegistration Reg(true);
MsQuicRegistration Reg("forwarder", QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT, true); // TODO - make a knob for low lat vs max tput
Registration = &Reg;
MsQuicSettings Settings;
Settings.SetSendBufferingEnabled(false);
Settings.SetStreamMultiReceiveEnabled(true);
Settings.SetPeerBidiStreamCount(1000);
Settings.SetPeerUnidiStreamCount(1000);
if (FlowControlWindow) {
Settings.SetStreamRecvWindowDefault(FlowControlWindow);
Settings.SetConnFlowControlWindow(FlowControlWindow);
}
MsQuicConfiguration FrontEndConfig(Reg, Alpn, Settings, MsQuicCredentialConfig(QUIC_CREDENTIAL_FLAG_NONE, &Cert));
CXPLAT_FRE_ASSERT(FrontEndConfig.IsValid());
FrontEndConfiguration = &FrontEndConfig;