Multi-Core XDP Polling Support (#2578)

This commit is contained in:
Nick Banks 2022-03-29 16:19:39 -04:00 коммит произвёл GitHub
Родитель 69184d1616
Коммит e59a8818cb
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
20 изменённых файлов: 205 добавлений и 166 удалений

Просмотреть файл

@ -27,6 +27,7 @@ pr:
include:
- .azure/azure-pipelines.perf.yml
- .azure/templates/run-performance.yml
- src/bin/*
- src/core/*
- src/platform/*
- src/perf/*
@ -43,6 +44,10 @@ parameters:
type: boolean
displayName: Windows (Schannel)
default: true
- name: winuser_xdp
type: boolean
displayName: Windows (XDP, Schannel)
default: true
- name: winuser_openssl
type: boolean
displayName: Windows (OpenSSL)
@ -143,6 +148,27 @@ stages:
extraBuildArgs: -DisableTest -DisableTools
${{ if eq(parameters.pgo_mode, true) }}:
extraBuildArgs: -DisableTest -DisableTools -PGO
- ${{ if eq(parameters.winuser_xdp, true) }}:
- stage: build_winuser_xdp
displayName: Build Windows (XDP)
dependsOn: []
variables:
runCodesignValidationInjection: false
jobs:
- template: ./templates/build-config-user.yml
parameters:
image: windows-latest
platform: windows
arch: ${{ parameters.arch }}
tls: schannel
config: Release
extraName: 'xdp'
extraPrepareArgs: -DisableTest -InstallXdpSdk
${{ if eq(parameters.pgo_mode, false) }}:
extraBuildArgs: -DisableTest -DisableTools -UseXdp -ExtraArtifactDir Xdp
${{ if eq(parameters.pgo_mode, true) }}:
extraBuildArgs: -DisableTest -DisableTools -UseXdp -ExtraArtifactDir Xdp -PGO
- ${{ if eq(parameters.winuser_openssl, true) }}:
- stage: build_winuser_openssl

Просмотреть файл

@ -88,7 +88,7 @@ These parameters are accessed by calling [GetParam](./api/GetParam.md) or [SetPa
| `QUIC_PARAM_GLOBAL_GLOBAL_SETTINGS`<br> 6 | QUIC_GLOBAL_SETTINGS | Both | Globally change global only settings. |
| `QUIC_PARAM_GLOBAL_VERSION_SETTINGS`<br> 7 | QUIC_VERSIONS_SETTINGS | Both | Globally change version settings for all subsequent connections. |
| `QUIC_PARAM_GLOBAL_LIBRARY_GIT_HASH`<br> 8 | char[64] | Get-only | Git hash used to build MsQuic (null terminated string) |
| `QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS`<br> 9 | uint16_t[] | Both | Globally change the list of CPUs that raw datapath can use. Must be set before opening registration. |
| `QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS`<br> 9 | uint16_t[] | Both | Globally change the list of CPUs that datapath can use. Must be set before opening registration. |
### Registration Parameters

Просмотреть файл

@ -476,10 +476,10 @@ MsQuicLibraryUninitialize(
if (MsQuicLib.Datapath != NULL) {
CxPlatDataPathUninitialize(MsQuicLib.Datapath);
MsQuicLib.Datapath = NULL;
if (MsQuicLib.RawDataPathProcList != NULL) {
CXPLAT_FREE(MsQuicLib.RawDataPathProcList, QUIC_POOL_RAW_DATAPATH_PROCS);
MsQuicLib.RawDataPathProcList = NULL;
MsQuicLib.RawDataPathProcListLength = 0;
if (MsQuicLib.DataPathProcList != NULL) {
CXPLAT_FREE(MsQuicLib.DataPathProcList, QUIC_POOL_RAW_DATAPATH_PROCS);
MsQuicLib.DataPathProcList = NULL;
MsQuicLib.DataPathProcListLength = 0;
}
}
@ -893,12 +893,12 @@ QuicLibrarySetGlobalParam(
break;
case QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS: {
case QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS: {
if (BufferLength == 0) {
if (MsQuicLib.RawDataPathProcList != NULL) {
CXPLAT_FREE(MsQuicLib.RawDataPathProcList, QUIC_POOL_RAW_DATAPATH_PROCS);
MsQuicLib.RawDataPathProcList = NULL;
MsQuicLib.RawDataPathProcListLength = 0;
if (MsQuicLib.DataPathProcList != NULL) {
CXPLAT_FREE(MsQuicLib.DataPathProcList, QUIC_POOL_RAW_DATAPATH_PROCS);
MsQuicLib.DataPathProcList = NULL;
MsQuicLib.DataPathProcListLength = 0;
}
Status = QUIC_STATUS_SUCCESS;
break;
@ -918,9 +918,9 @@ QuicLibrarySetGlobalParam(
break;
}
uint32_t RawDataPathProcListLength = BufferLength / sizeof(uint16_t);
uint32_t DataPathProcListLength = BufferLength / sizeof(uint16_t);
uint16_t* Cpus = (uint16_t*)Buffer;
for (uint32_t i = 0; i < RawDataPathProcListLength; ++i) {
for (uint32_t i = 0; i < DataPathProcListLength; ++i) {
if (*(Cpus + i) >= CxPlatProcActiveCount()) {
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
@ -935,8 +935,8 @@ QuicLibrarySetGlobalParam(
break;
}
uint16_t* RawDataPathProcList = CXPLAT_ALLOC_NONPAGED(BufferLength, QUIC_POOL_RAW_DATAPATH_PROCS);
if (RawDataPathProcList == NULL) {
uint16_t* DataPathProcList = CXPLAT_ALLOC_NONPAGED(BufferLength, QUIC_POOL_RAW_DATAPATH_PROCS);
if (DataPathProcList == NULL) {
QuicTraceEvent(
AllocFailure,
"Allocation of '%s' failed. (%llu bytes)",
@ -946,19 +946,19 @@ QuicLibrarySetGlobalParam(
break;
}
if (MsQuicLib.RawDataPathProcList != NULL) {
CXPLAT_FREE(MsQuicLib.RawDataPathProcList, QUIC_POOL_RAW_DATAPATH_PROCS);
MsQuicLib.RawDataPathProcList = NULL;
MsQuicLib.RawDataPathProcListLength = 0;
if (MsQuicLib.DataPathProcList != NULL) {
CXPLAT_FREE(MsQuicLib.DataPathProcList, QUIC_POOL_RAW_DATAPATH_PROCS);
MsQuicLib.DataPathProcList = NULL;
MsQuicLib.DataPathProcListLength = 0;
}
CxPlatCopyMemory(RawDataPathProcList, Buffer, BufferLength);
MsQuicLib.RawDataPathProcList = RawDataPathProcList;
MsQuicLib.RawDataPathProcListLength = RawDataPathProcListLength;
CxPlatCopyMemory(DataPathProcList, Buffer, BufferLength);
MsQuicLib.DataPathProcList = DataPathProcList;
MsQuicLib.DataPathProcListLength = DataPathProcListLength;
QuicTraceLogInfo(
LibraryRawDataPathProcsSet,
"[ lib] Setting raw datapath procs");
LibraryDataPathProcsSet,
"[ lib] Setting datapath procs");
Status = QUIC_STATUS_SUCCESS;
break;
@ -1192,14 +1192,14 @@ QuicLibraryGetGlobalParam(
Status = QUIC_STATUS_SUCCESS;
break;
case QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS:
if (*BufferLength == 0 && MsQuicLib.RawDataPathProcListLength == 0) {
case QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS:
if (*BufferLength == 0 && MsQuicLib.DataPathProcListLength == 0) {
Status = QUIC_STATUS_SUCCESS;
break;
}
if (*BufferLength < sizeof(uint16_t) * MsQuicLib.RawDataPathProcListLength) {
*BufferLength = sizeof(uint16_t) * MsQuicLib.RawDataPathProcListLength;
if (*BufferLength < sizeof(uint16_t) * MsQuicLib.DataPathProcListLength) {
*BufferLength = sizeof(uint16_t) * MsQuicLib.DataPathProcListLength;
Status = QUIC_STATUS_BUFFER_TOO_SMALL;
break;
}
@ -1209,9 +1209,9 @@ QuicLibraryGetGlobalParam(
break;
}
*BufferLength = sizeof(uint16_t) * MsQuicLib.RawDataPathProcListLength;
if (MsQuicLib.RawDataPathProcList != NULL) {
CxPlatCopyMemory(Buffer, MsQuicLib.RawDataPathProcList, *BufferLength);
*BufferLength = sizeof(uint16_t) * MsQuicLib.DataPathProcListLength;
if (MsQuicLib.DataPathProcList != NULL) {
CxPlatCopyMemory(Buffer, MsQuicLib.DataPathProcList, *BufferLength);
}
Status = QUIC_STATUS_SUCCESS;
break;

Просмотреть файл

@ -208,8 +208,8 @@ typedef struct QUIC_LIBRARY {
//
// Processor candidates for raw datapath threads.
//
uint16_t* RawDataPathProcList;
uint32_t RawDataPathProcListLength;
uint16_t* DataPathProcList;
uint32_t DataPathProcListLength;
//
// Datapath instance for the library.

Просмотреть файл

@ -64,8 +64,8 @@ MsQuicRegistrationOpen(
CxPlatLockAcquire(&MsQuicLib.Lock);
if (MsQuicLib.Datapath == NULL) {
CXPLAT_DATAPATH_CONFIG DataPathConfig = {
MsQuicLib.RawDataPathProcList,
MsQuicLib.RawDataPathProcListLength
MsQuicLib.DataPathProcList,
MsQuicLib.DataPathProcListLength
};
Status =
CxPlatDataPathInitialize(

Просмотреть файл

@ -429,14 +429,14 @@ TEST(SettingsTest, GlobalRawDataPathProcsSetAndGet)
ASSERT_EQ(
QUIC_STATUS_SUCCESS,
QuicLibrarySetGlobalParam(
QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS,
QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS,
BufferLength,
SetCpus));
BufferLength = 0;
ASSERT_EQ(
QUIC_STATUS_BUFFER_TOO_SMALL,
QuicLibraryGetGlobalParam(
QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS,
QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS,
&BufferLength,
NULL));
ASSERT_EQ(SetCpusSize, BufferLength / sizeof(uint16_t));
@ -444,7 +444,7 @@ TEST(SettingsTest, GlobalRawDataPathProcsSetAndGet)
ASSERT_EQ(
QUIC_STATUS_SUCCESS,
QuicLibraryGetGlobalParam(
QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS,
QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS,
&BufferLength,
GetCpus));
ASSERT_EQ(0, memcmp(GetCpus, SetCpus, SetCpusSize * sizeof(uint16_t)));
@ -454,14 +454,14 @@ TEST(SettingsTest, GlobalRawDataPathProcsSetAndGet)
ASSERT_EQ(
QUIC_STATUS_SUCCESS,
QuicLibrarySetGlobalParam(
QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS,
QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS,
0,
NULL));
BufferLength = 0;
ASSERT_EQ(
QUIC_STATUS_SUCCESS,
QuicLibraryGetGlobalParam(
QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS,
QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS,
&BufferLength,
NULL));
ASSERT_EQ((uint32_t)0, BufferLength);
@ -472,7 +472,7 @@ TEST(SettingsTest, GlobalRawDataPathProcsSetAndGet)
ASSERT_EQ(
QUIC_STATUS_INVALID_PARAMETER,
QuicLibrarySetGlobalParam(
QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS,
QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS,
sizeof(SetCpus),
SetCpus));
}
@ -484,7 +484,7 @@ TEST(SettingsTest, GlobalRawDataPathProcsSetAfterDataPathInit)
ASSERT_EQ(
QUIC_STATUS_INVALID_STATE,
QuicLibrarySetGlobalParam(
QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS,
QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS,
sizeof(SetCpus),
SetCpus));
}

Просмотреть файл

@ -2552,8 +2552,8 @@ namespace Microsoft.Quic
[NativeTypeName("#define QUIC_PARAM_GLOBAL_LIBRARY_GIT_HASH 0x01000008")]
public const int QUIC_PARAM_GLOBAL_LIBRARY_GIT_HASH = 0x01000008;
[NativeTypeName("#define QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS 0x01000009")]
public const int QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS = 0x01000009;
[NativeTypeName("#define QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS 0x01000009")]
public const int QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS = 0x01000009;
[NativeTypeName("#define QUIC_PARAM_CONFIGURATION_SETTINGS 0x03000000")]
public const int QUIC_PARAM_CONFIGURATION_SETTINGS = 0x03000000;

Просмотреть файл

@ -192,15 +192,15 @@ tracepoint(CLOG_LIBRARY_C, LibrarySetSettings );\
/*----------------------------------------------------------
// Decoder Ring for LibraryRawDataPathProcsSet
// [ lib] Setting raw datapath procs
// Decoder Ring for LibraryDataPathProcsSet
// [ lib] Setting datapath procs
// QuicTraceLogInfo(
LibraryRawDataPathProcsSet,
"[ lib] Setting raw datapath procs");
LibraryDataPathProcsSet,
"[ lib] Setting datapath procs");
----------------------------------------------------------*/
#ifndef _clog_2_ARGS_TRACE_LibraryRawDataPathProcsSet
#define _clog_2_ARGS_TRACE_LibraryRawDataPathProcsSet(uniqueId, encoded_arg_string)\
tracepoint(CLOG_LIBRARY_C, LibraryRawDataPathProcsSet );\
#ifndef _clog_2_ARGS_TRACE_LibraryDataPathProcsSet
#define _clog_2_ARGS_TRACE_LibraryDataPathProcsSet(uniqueId, encoded_arg_string)\
tracepoint(CLOG_LIBRARY_C, LibraryDataPathProcsSet );\
#endif

Просмотреть файл

@ -161,13 +161,13 @@ TRACEPOINT_EVENT(CLOG_LIBRARY_C, LibrarySetSettings,
/*----------------------------------------------------------
// Decoder Ring for LibraryRawDataPathProcsSet
// [ lib] Setting raw datapath procs
// Decoder Ring for LibraryDataPathProcsSet
// [ lib] Setting datapath procs
// QuicTraceLogInfo(
LibraryRawDataPathProcsSet,
"[ lib] Setting raw datapath procs");
LibraryDataPathProcsSet,
"[ lib] Setting datapath procs");
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_LIBRARY_C, LibraryRawDataPathProcsSet,
TRACEPOINT_EVENT(CLOG_LIBRARY_C, LibraryDataPathProcsSet,
TP_ARGS(
),
TP_FIELDS(

Просмотреть файл

@ -704,7 +704,7 @@ void
#ifdef QUIC_API_ENABLE_PREVIEW_FEATURES
#define QUIC_PARAM_GLOBAL_VERSION_SETTINGS 0x01000007 // QUIC_VERSION_SETTINGS
#define QUIC_PARAM_GLOBAL_LIBRARY_GIT_HASH 0x01000008 // char[64]
#define QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS 0x01000009 // uint32_t[]
#define QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS 0x01000009 // uint16_t[]
#endif
//

Просмотреть файл

@ -344,8 +344,8 @@ void
typedef CXPLAT_DATAPATH_SEND_COMPLETE *CXPLAT_DATAPATH_SEND_COMPLETE_HANDLER;
typedef struct CXPLAT_DATAPATH_CONFIG {
const uint16_t* RawDataPathProcList; // Processor index candidates
uint32_t RawDataPathProcListLength;
const uint16_t* DataPathProcList; // Processor index candidates
uint32_t DataPathProcListLength;
} CXPLAT_DATAPATH_CONFIG;
//

Просмотреть файл

@ -5574,6 +5574,13 @@
],
"macroName": "QuicTraceLogInfo"
},
"LibraryDataPathProcsSet": {
"ModuleProperites": {},
"TraceString": "[ lib] Setting datapath procs",
"UniqueId": "LibraryDataPathProcsSet",
"splitArgs": [],
"macroName": "QuicTraceLogInfo"
},
"LibraryError": {
"ModuleProperites": {},
"TraceString": "[ lib] ERROR, %s.",
@ -13030,6 +13037,11 @@
"TraceID": "LibraryCidLengthSet",
"EncodingString": "[ lib] CID Length = %hhu"
},
{
"UniquenessHash": "1324b8f1-5f38-c7ca-d7e9-51314f15e172",
"TraceID": "LibraryDataPathProcsSet",
"EncodingString": "[ lib] Setting datapath procs"
},
{
"UniquenessHash": "2cbc406b-c1c0-ba4d-eee8-669f30330f4a",
"TraceID": "LibraryError",

Просмотреть файл

@ -26,6 +26,7 @@ PrintHelp(
" -target:<####> The target server to connect to.\n"
" -runtime:<####> The total runtime (in ms). (def:%u)\n"
" -encrypt:<0/1> Enables/disables encryption. (def:1)\n"
" -inline:<0/1> Configured sending requests inline. (def:0)\n"
" -port:<####> The UDP port of the server. (def:%u)\n"
" -ip:<0/4/6> A hint for the resolving the hostname to an IP address. (def:0)\n"
" -cibir:<hex_bytes> A CIBIR well-known idenfitier.\n"
@ -76,6 +77,7 @@ RpsClient::Init(
TryGetValue(argc, argv, "runtime", &RunTime);
TryGetValue(argc, argv, "encrypt", &UseEncryption);
TryGetValue(argc, argv, "inline", &SendInline);
TryGetValue(argc, argv, "port", &Port);
TryGetValue(argc, argv, "conns", &ConnectionCount);
RequestCount = 2 * ConnectionCount;
@ -501,7 +503,7 @@ RpsConnectionContext::SendRequest(bool DelaySend) {
void
RpsWorkerContext::QueueSendRequest() {
if (Client->Running) {
if (ThreadStarted) {
if (ThreadStarted && !Client->SendInline) {
InterlockedIncrement((long*)&RequestCount);
CxPlatEventSet(WakeEvent);
} else {

Просмотреть файл

@ -175,6 +175,7 @@ public:
QUIC_ADDRESS_FAMILY RemoteFamily {QUIC_ADDRESS_FAMILY_UNSPEC};
UniquePtr<char[]> Target;
uint8_t UseEncryption {TRUE};
uint8_t SendInline {FALSE};
uint32_t RunTime {RPS_DEFAULT_RUN_TIME};
uint32_t ConnectionCount {RPS_DEFAULT_CONNECTION_COUNT};
uint32_t RequestCount {RPS_DEFAULT_CONNECTION_COUNT * 2};

Просмотреть файл

@ -87,8 +87,10 @@ PrintHelp(
" -cibir:<hex_bytes> A CIBIR well-known idenfitier.\n"
"\n"
"Client: secnetperf -TestName:<Throughput|RPS|HPS> [options]\n"
#ifndef _KERNEL_MODE
"Both:\n"
" -cpu:<cpu_index> Specify a processor for raw datapath thread(s) to run on.\n"
" -cpu:<cpu_index> Specify the processor(s) for the datapath to use.\n"
#endif // _KERNEL_MODE
"\n"
);
}
@ -177,18 +179,27 @@ QuicMainStart(
return Status;
}
uint32_t RawDatapathCpu;
if (TryGetValue(argc, argv, "cpu", &RawDatapathCpu)) {
#ifndef _KERNEL_MODE
const char* CpuStr;
if ((CpuStr = GetValue(argc, argv, "cpu")) != nullptr) {
uint16_t ProcList[64];
uint32_t ProcCount = 0;
do {
if (*CpuStr == ',') CpuStr++;
ProcList[ProcCount++] = (uint16_t)strtoul(CpuStr, (char**)&CpuStr, 10);
} while (*CpuStr && ProcCount < ARRAYSIZE(ProcList));
if (QUIC_FAILED(
Status =
MsQuic->SetParam(
nullptr,
QUIC_PARAM_GLOBAL_RAW_DATAPATH_PROCS,
sizeof(RawDatapathCpu),
&RawDatapathCpu))) {
WriteOutput("MsQuic Failed To Set Raw DataPath Procs %d\n", Status);
QUIC_PARAM_GLOBAL_DATAPATH_PROCESSORS,
ProcCount * sizeof(uint16_t),
ProcList))) {
WriteOutput("MsQuic Failed To Set DataPath Procs %d\n", Status);
return Status;
}
}
#endif // _KERNEL_MODE
if (ServerMode) {
TestToRun = new(std::nothrow) PerfServer(SelfSignedCredConfig);

Просмотреть файл

@ -113,7 +113,7 @@ CxPlatDataPathInitialize(
)
{
QUIC_STATUS Status = QUIC_STATUS_SUCCESS;
const size_t DatapathSize = CxPlatDpRawGetDapathSize();
const size_t DatapathSize = CxPlatDpRawGetDatapathSize(Config);
CXPLAT_FRE_ASSERT(DatapathSize > sizeof(CXPLAT_DATAPATH));
UNREFERENCED_PARAMETER(TcpCallbacks);
@ -190,26 +190,6 @@ CxPlatDataPathUninitialize(
CXPLAT_FREE(Datapath, QUIC_POOL_DATAPATH);
}
_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatDpRawGenerateCpuTable(
_Inout_ CXPLAT_DATAPATH* Datapath
)
{
Datapath->NumaNode = (uint8_t)CxPlatProcessorInfo[Datapath->Cpu].NumaNode;
//
// Build up the set of CPUs that are on the same NUMA node as this one.
//
Datapath->CpuTableSize = 0;
for (uint16_t i = 0; i < CxPlatProcMaxCount(); i++) {
if (i != Datapath->Cpu && // Skip raw layer's CPU
CxPlatProcessorInfo[i].NumaNode == Datapath->NumaNode) {
Datapath->CpuTable[Datapath->CpuTableSize++] = i;
}
}
}
_IRQL_requires_max_(DISPATCH_LEVEL)
uint32_t
CxPlatDataPathGetSupportedFeatures(

Просмотреть файл

@ -61,12 +61,6 @@ typedef struct CXPLAT_DATAPATH {
CXPLAT_ROUTE_RESOLUTION_WORKER* RouteResolutionWorker;
// RSS stuff
uint16_t Cpu;
uint8_t NumaNode;
uint8_t CpuTableSize;
uint16_t CpuTable[64];
CXPLAT_LIST_ENTRY Interfaces;
} CXPLAT_DATAPATH;
@ -106,8 +100,8 @@ typedef struct CXPLAT_SEND_DATA {
//
_IRQL_requires_max_(PASSIVE_LEVEL)
size_t
CxPlatDpRawGetDapathSize(
void
CxPlatDpRawGetDatapathSize(
_In_opt_ const CXPLAT_DATAPATH_CONFIG* Config
);
//
@ -118,7 +112,7 @@ QUIC_STATUS
CxPlatDpRawInitialize(
_Inout_ CXPLAT_DATAPATH* Datapath,
_In_ uint32_t ClientRecvContextLength,
_In_opt_ CXPLAT_DATAPATH_CONFIG* Config
_In_opt_ const CXPLAT_DATAPATH_CONFIG* Config
);
//
@ -130,15 +124,6 @@ CxPlatDpRawUninitialize(
_In_ CXPLAT_DATAPATH* Datapath
);
//
// Upcall from raw datapath to generate the CPU table used for RSS.
//
_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatDpRawGenerateCpuTable(
_Inout_ CXPLAT_DATAPATH* Datapath
);
//
// Called on creation and deletion of a socket. It indicates to the raw datapath
// that it should update any filtering rules as necessary.

Просмотреть файл

@ -113,8 +113,8 @@ CxPlatDpdkReadConfig(
//
// Read user-specified global config.
//
if (Config != NULL && Config->RawDataPathProcList != NULL) {
Dpdk->Cpu = Config->RawDataPathProcList[0];
if (Config != NULL && Config->DataPathProcList != NULL) {
Dpdk->Cpu = Config->DataPathProcList[0];
}
FILE *File = fopen("dpdk.ini", "r");
@ -143,10 +143,11 @@ CxPlatDpdkReadConfig(
_IRQL_requires_max_(PASSIVE_LEVEL)
size_t
CxPlatDpRawGetDapathSize(
void
CxPlatDpRawGetDatapathSize(
_In_opt_ const CXPLAT_DATAPATH_CONFIG* Config
)
{
UNREFERENCED_PARAMETER(Config);
return sizeof(DPDK_DATAPATH);
}
@ -155,7 +156,7 @@ QUIC_STATUS
CxPlatDpRawInitialize(
_Inout_ CXPLAT_DATAPATH* Datapath,
_In_ uint32_t ClientRecvContextLength,
_In_opt_ CXPLAT_DATAPATH_CONFIG* Config
_In_opt_ const CXPLAT_DATAPATH_CONFIG* Config
)
{
DPDK_DATAPATH* Dpdk = (DPDK_DATAPATH*)Datapath;
@ -166,7 +167,6 @@ CxPlatDpRawInitialize(
sizeof(DPDK_RX_PACKET) + ClientRecvContextLength;
CxPlatDpdkReadConfig(Dpdk, Config);
CxPlatDpRawGenerateCpuTable(Datapath);
BOOLEAN CleanUpThread = FALSE;
CxPlatEventInitialize(&Dpdk->StartComplete, TRUE, FALSE);

Просмотреть файл

@ -580,10 +580,6 @@ CxPlatDpRawParseUdp(
Packet->Buffer = (uint8_t*)Udp->Data;
Packet->BufferLength = Length;
//const uint32_t Hash = CxPlatHashSimple(sizeof(*Packet->Route), (uint8_t*)Packet->Route);
const uint32_t Hash = Udp->SourcePort + Udp->DestinationPort;
Packet->PartitionIndex = Datapath->CpuTable[Hash % Datapath->CpuTableSize];
}
_IRQL_requires_max_(DISPATCH_LEVEL)

Просмотреть файл

@ -33,8 +33,9 @@ Abstract:
typedef struct XDP_INTERFACE XDP_INTERFACE;
typedef struct _XDP_QUEUE {
typedef struct XDP_QUEUE {
const XDP_INTERFACE* Interface;
struct XDP_QUEUE* Next;
uint8_t* RxBuffers;
HANDLE RxXsk;
XSK_RING RxFillRing;
@ -66,26 +67,41 @@ typedef struct XDP_INTERFACE {
uint8_t RuleCount;
CXPLAT_LOCK RuleLock;
XDP_RULE* Rules;
XDP_QUEUE* Queues;
XDP_QUEUE* Queues; // An array of queues.
} XDP_INTERFACE;
typedef struct QUIC_CACHEALIGN XDP_WORKER {
const struct XDP_DATAPATH* Xdp;
HANDLE CompletionEvent;
XDP_QUEUE* Queues; // A linked list of queues, accessed by Next.
uint16_t ProcIndex;
} XDP_WORKER;
void XdpWorkerAddQueue(_In_ XDP_WORKER* Worker, _In_ XDP_QUEUE* Queue) {
XDP_QUEUE** Tail = &Worker->Queues;
while (*Tail != NULL) {
Tail = &(*Tail)->Next;
}
*Tail = Queue;
Queue->Next = NULL;
}
typedef struct XDP_DATAPATH {
CXPLAT_DATAPATH;
BOOLEAN Running;
HANDLE CompletionEvent;
// Constants
DECLSPEC_CACHEALIGN
//
// Currently, all XDP interfaces share the same config.
//
uint32_t WorkerCount;
uint32_t RxBufferCount;
uint32_t RxRingSize;
uint32_t TxBufferCount;
uint32_t TxRingSize;
BOOLEAN TxAlwaysPoke;
BOOLEAN SkipXsum;
BOOLEAN Running; // Signal to stop workers.
XDP_WORKER Workers[0];
} XDP_DATAPATH;
typedef struct DECLSPEC_ALIGN(MEMORY_ALLOCATION_ALIGNMENT) XDP_RX_PACKET {
@ -365,8 +381,7 @@ Cleanup:
_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatXdpReadConfig(
_Inout_ XDP_DATAPATH* Xdp,
_In_opt_ CXPLAT_DATAPATH_CONFIG* Config
_Inout_ XDP_DATAPATH* Xdp
)
{
//
@ -377,14 +392,6 @@ CxPlatXdpReadConfig(
Xdp->TxBufferCount = 4096;
Xdp->TxRingSize = 128;
Xdp->TxAlwaysPoke = FALSE;
Xdp->Cpu = (uint16_t)(CxPlatProcMaxCount() - 1);
//
// Read user-specified global config.
//
if (Config != NULL && Config->RawDataPathProcList != NULL) {
Xdp->Cpu = Config->RawDataPathProcList[0];
}
//
// Read config from config file.
@ -425,15 +432,6 @@ CxPlatXdpReadConfig(
fclose(File);
}
_IRQL_requires_max_(PASSIVE_LEVEL)
size_t
CxPlatDpRawGetDapathSize(
void
)
{
return sizeof(XDP_DATAPATH);
}
_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatDpRawInterfaceUninitialize(
@ -537,7 +535,7 @@ CxPlatDpRawInterfaceInitialize(
CxPlatZeroMemory(Interface->Queues, Interface->QueueCount * sizeof(*Interface->Queues));
for (uint32_t i = 0; i < Interface->QueueCount; i++) {
for (uint8_t i = 0; i < Interface->QueueCount; i++) {
XDP_QUEUE* Queue = &Interface->Queues[i];
Queue->Interface = Interface;
@ -742,6 +740,15 @@ CxPlatDpRawInterfaceInitialize(
}
}
//
// Add each queue to a worker (round robin).
//
for (uint8_t i = 0; i < Interface->QueueCount; i++) {
XdpWorkerAddQueue(
&Xdp->Workers[i % Xdp->WorkerCount],
&Interface->Queues[i]);
}
Error:
if (QUIC_FAILED(Status)) {
CxPlatDpRawInterfaceUninitialize(Interface);
@ -923,20 +930,36 @@ CxPlatDpRawInterfaceRemoveRules(
CxPlatLockRelease(&Interface->RuleLock);
}
_IRQL_requires_max_(PASSIVE_LEVEL)
size_t
CxPlatDpRawGetDatapathSize(
_In_opt_ const CXPLAT_DATAPATH_CONFIG* Config
)
{
const uint32_t WorkerCount =
(Config && Config->DataPathProcList) ? Config->DataPathProcListLength : 1;
return sizeof(XDP_DATAPATH) + (WorkerCount * sizeof(XDP_WORKER));
}
_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_STATUS
CxPlatDpRawInitialize(
_Inout_ CXPLAT_DATAPATH* Datapath,
_In_ uint32_t ClientRecvContextLength,
_In_opt_ CXPLAT_DATAPATH_CONFIG* Config
_In_opt_ const CXPLAT_DATAPATH_CONFIG* Config
)
{
XDP_DATAPATH* Xdp = (XDP_DATAPATH*)Datapath;
QUIC_STATUS Status;
CxPlatXdpReadConfig(Xdp, Config);
CxPlatDpRawGenerateCpuTable(Datapath);
uint16_t DefaultProc = (uint16_t)(CxPlatProcMaxCount() - 1);
const uint16_t* ProcList =
(Config && Config->DataPathProcList) ? Config->DataPathProcList : &DefaultProc;
CxPlatXdpReadConfig(Xdp);
CxPlatListInitializeHead(&Xdp->Interfaces);
Xdp->WorkerCount =
(Config && Config->DataPathProcList) ? Config->DataPathProcListLength : 1;
PIP_ADAPTER_ADDRESSES Adapters = NULL;
ULONG Error;
@ -1034,8 +1057,12 @@ CxPlatDpRawInitialize(
}
Xdp->Running = TRUE;
CxPlatEventInitialize(&Xdp->CompletionEvent, TRUE, FALSE);
CxPlatWorkerRegisterDataPath(Xdp->Cpu, Xdp);
for (uint32_t i = 0; i < Xdp->WorkerCount; i++) {
Xdp->Workers[i].Xdp = Xdp;
Xdp->Workers[i].ProcIndex = ProcList[i];
CxPlatEventInitialize(&Xdp->Workers[i].CompletionEvent, TRUE, FALSE);
CxPlatWorkerRegisterDataPath(ProcList[i], &Xdp->Workers[i]);
}
Status = QUIC_STATUS_SUCCESS;
Error:
@ -1057,8 +1084,10 @@ CxPlatDpRawUninitialize(
if (Xdp->Running) {
Xdp->Running = FALSE;
CxPlatEventWaitForever(Xdp->CompletionEvent);
CxPlatEventUninitialize(Xdp->CompletionEvent);
for (uint32_t i = 0; i < Xdp->WorkerCount; i++) {
CxPlatEventWaitForever(Xdp->Workers[i].CompletionEvent);
CxPlatEventUninitialize(Xdp->Workers[i].CompletionEvent);
}
}
while (!CxPlatListIsEmpty(&Xdp->Interfaces)) {
@ -1192,9 +1221,9 @@ CxPlatDpRawGetInterfaceFromQueue(
static
void
CxPlatXdpRx(
_In_ XDP_DATAPATH* Xdp,
_In_ uint8_t QueueId,
_In_ XDP_INTERFACE* Interface
_In_ const XDP_DATAPATH* Xdp,
_In_ XDP_QUEUE* Queue,
_In_ uint16_t ProcIndex
)
{
CXPLAT_RECV_DATA* Buffers[RX_BATCH_SIZE];
@ -1202,7 +1231,6 @@ CxPlatXdpRx(
uint32_t FillIndex;
uint32_t ProdCount = 0;
uint32_t PacketCount = 0;
XDP_QUEUE* Queue = &Interface->Queues[QueueId];
const uint32_t BuffersCount = XskRingConsumerReserve(&Queue->RxRing, RX_BATCH_SIZE, &RxIndex);
for (uint32_t i = 0; i < BuffersCount; i++) {
@ -1214,6 +1242,7 @@ CxPlatXdpRx(
CxPlatZeroMemory(Packet, sizeof(XDP_RX_PACKET));
Packet->Route = &Packet->RouteStorage;
Packet->RouteStorage.Queue = Queue;
Packet->PartitionIndex = ProcIndex;
CxPlatDpRawParseEthernet(
(CXPLAT_DATAPATH*)Xdp,
@ -1366,16 +1395,14 @@ CxPlatDpRawTxEnqueue(
static
void
CxPlatXdpTx(
_In_ XDP_DATAPATH* Xdp,
_In_ uint8_t QueueId,
_In_ XDP_INTERFACE* Interface
_In_ const XDP_DATAPATH* Xdp,
_In_ XDP_QUEUE* Queue
)
{
uint32_t ProdCount = 0;
uint32_t CompCount = 0;
SLIST_ENTRY* TxCompleteHead = NULL;
SLIST_ENTRY** TxCompleteTail = &TxCompleteHead;
XDP_QUEUE* Queue = &Interface->Queues[QueueId];
if (CxPlatListIsEmpty(&Queue->WorkerTxQueue) &&
ReadPointerNoFence(&Queue->TxQueue.Flink) != &Queue->TxQueue) {
@ -1451,23 +1478,22 @@ CxPlatDataPathRunEC(
_In_ uint32_t WaitTime
)
{
XDP_DATAPATH* Xdp = *(XDP_DATAPATH**)Context;
XDP_WORKER* Worker = *(XDP_WORKER**)Context;
const XDP_DATAPATH* Xdp = Worker->Xdp;
UNREFERENCED_PARAMETER(CurThreadId);
UNREFERENCED_PARAMETER(WaitTime);
if (!Xdp->Running) {
*Context = NULL;
CxPlatEventSet(Xdp->CompletionEvent);
CxPlatEventSet(Worker->CompletionEvent);
return;
}
CXPLAT_LIST_ENTRY* Entry;
for (Entry = Xdp->Interfaces.Flink; Entry != &Xdp->Interfaces; Entry = Entry->Flink) {
XDP_INTERFACE* Interface = CONTAINING_RECORD(Entry, XDP_INTERFACE, Link);
for (uint8_t QueueId = 0; QueueId < Interface->QueueCount; QueueId++) {
CxPlatXdpRx(Xdp, QueueId, Interface);
CxPlatXdpTx(Xdp, QueueId, Interface);
}
XDP_QUEUE* Queue = Worker->Queues;
while (Queue) {
CxPlatXdpRx(Xdp, Queue, Worker->ProcIndex);
CxPlatXdpTx(Xdp, Queue);
Queue = Queue->Next;
}
}