From ca5d8e6142ef8f8ffd593bf809bd1a6a9d1c037e Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Thu, 29 Aug 2024 11:55:54 -0400 Subject: [PATCH] Let's try this instead --- src/platform/datapath_raw_xdp.h | 5 ++- src/platform/datapath_raw_xdp_linux.c | 8 +++- src/platform/datapath_raw_xdp_win.c | 61 ++++++++++++--------------- 3 files changed, 39 insertions(+), 35 deletions(-) diff --git a/src/platform/datapath_raw_xdp.h b/src/platform/datapath_raw_xdp.h index f51080f465..d264ce5d4a 100644 --- a/src/platform/datapath_raw_xdp.h +++ b/src/platform/datapath_raw_xdp.h @@ -57,6 +57,7 @@ typedef struct QUIC_CACHEALIGN XDP_PARTITION { CXPLAT_EVENTQ* EventQ; XDP_QUEUE* Queues; // A linked list of queues, accessed by Next. uint16_t PartitionIndex; + uint16_t Processor; } XDP_PARTITION; void XdpWorkerAddQueue(_In_ XDP_PARTITION* Partition, _In_ XDP_QUEUE* Queue) { @@ -78,7 +79,9 @@ CxPlatDpRawAssignQueue( ) { const XDP_INTERFACE_COMMON* Interface = (const XDP_INTERFACE_COMMON*)_Interface; - Route->Queue = &((XDP_QUEUE_COMMON*)Interface->Queues)[0]; + XDP_QUEUE_COMMON* Queues = (XDP_QUEUE_COMMON*)Interface->Queues; + CXPLAT_FRE_ASSERT(Queues[0].Partition != NULL); // What if there was no partition? + Route->Queue = &Queues[0]; // TODO - Can we do better than just the first queue? } _IRQL_requires_max_(DISPATCH_LEVEL) diff --git a/src/platform/datapath_raw_xdp_linux.c b/src/platform/datapath_raw_xdp_linux.c index eaef116164..96fb14724a 100644 --- a/src/platform/datapath_raw_xdp_linux.c +++ b/src/platform/datapath_raw_xdp_linux.c @@ -332,7 +332,7 @@ static uint64_t XskUmemFrameAlloc(struct XskSocketInfo *Xsk) if (Xsk->UmemFrameFree == 0) { QuicTraceLogVerbose( XdpUmemAllocFails, - "[ xdp][umem] Out of UMEM frame, OOM"); + "[ xdp][umem] Out of UMEM frame, OOM"); return INVALID_UMEM_FRAME; } Frame = Xsk->UmemFrameAddr[--Xsk->UmemFrameFree]; @@ -684,8 +684,14 @@ CxPlatDpRawInitialize( if (Config && Config->ProcessorCount) { Xdp->PartitionCount = Config->ProcessorCount; + for (uint32_t i = 0; i < Xdp->PartitionCount; i++) { + Xdp->Partitions[i].Processor = Config->ProcessorList[i]; + } } else { Xdp->PartitionCount = CxPlatProcCount(); + for (uint32_t i = 0; i < Xdp->PartitionCount; i++) { + Xdp->Partitions[i].Processor = (uint16_t)i; + } } QuicTraceLogVerbose( diff --git a/src/platform/datapath_raw_xdp_win.c b/src/platform/datapath_raw_xdp_win.c index d2f38d3cfd..b0c25366b5 100644 --- a/src/platform/datapath_raw_xdp_win.c +++ b/src/platform/datapath_raw_xdp_win.c @@ -57,6 +57,7 @@ typedef struct XDP_INTERFACE { typedef struct XDP_QUEUE { XDP_QUEUE_COMMON; + uint16_t RssProcessor; uint8_t* RxBuffers; HANDLE RxXsk; DATAPATH_XDP_IO_SQE RxIoSqe; @@ -420,6 +421,7 @@ CxPlatDpRawInterfaceInitialize( for (uint8_t i = 0; i < Interface->QueueCount; i++) { XDP_QUEUE* Queue = &Interface->Queues[i]; + Queue->RssProcessor = (uint16_t)Processors[i]; // TODO - Should memory be aligned with this? Queue->Interface = Interface; InitializeSListHead(&Queue->RxPool); InitializeSListHead(&Queue->TxPool); @@ -431,23 +433,6 @@ CxPlatDpRawInterfaceInitialize( CxPlatDatapathSqeInitialize(&Queue->TxIoSqe.DatapathSqe, CXPLAT_CQE_TYPE_SOCKET_IO); Queue->TxIoSqe.IoType = DATAPATH_XDP_IO_SEND; - uint32_t QueueIndex = UINT32_MAX; - for (uint32_t j = 0; j < Interface->QueueCount; j++) { - if (Processors[j] == i) { - QueueIndex = j; - break; - } - } - if (QueueIndex == UINT32_MAX) { - Status = QUIC_STATUS_INVALID_STATE; - QuicTraceEvent( - LibraryErrorStatus, - "[ lib] ERROR, %u, %s.", - Status, - "QueueIndex"); - goto Error; - } - // // RX datapath. // @@ -515,7 +500,7 @@ CxPlatDpRawInterfaceInitialize( } uint32_t Flags = XSK_BIND_FLAG_RX; - Status = Xdp->XdpApi->XskBind(Queue->RxXsk, Interface->ActualIfIndex, QueueIndex, Flags); + Status = Xdp->XdpApi->XskBind(Queue->RxXsk, Interface->ActualIfIndex, i, Flags); if (QUIC_FAILED(Status)) { QuicTraceEvent( LibraryErrorStatus, @@ -637,7 +622,7 @@ CxPlatDpRawInterfaceInitialize( } Flags = XSK_BIND_FLAG_TX; // TODO: support native/generic forced flags. - Status = Xdp->XdpApi->XskBind(Queue->TxXsk, Interface->ActualIfIndex, QueueIndex, Flags); + Status = Xdp->XdpApi->XskBind(Queue->TxXsk, Interface->ActualIfIndex, i, Flags); if (QUIC_FAILED(Status)) { QuicTraceEvent( LibraryErrorStatus, @@ -694,10 +679,20 @@ CxPlatDpRawInterfaceInitialize( } // - // Add each queue to a partition (round robin). + // Add each queue to the correct partition. // - for (uint8_t i = 0; i < Interface->QueueCount; i++) { - XdpWorkerAddQueue(&Xdp->Partitions[i % Xdp->PartitionCount], &Interface->Queues[i]); + for (uint16_t i = 0; i < Interface->QueueCount; i++) { + BOOLEAN Found = FALSE; + for (uint16_t j = 0; j < Xdp->PartitionCount; j++) { + if (Xdp->Partitions[j].Processor == Interface->Queues[i].RssProcessor) { + XdpWorkerAddQueue(&Xdp->Partitions[j], &Interface->Queues[i]); + Found = TRUE; + break; + } + } + if (!Found) { + CXPLAT_FRE_ASSERT(FALSE); // TODO - What do we do if there is no partition for this processor? + } } Error: @@ -915,8 +910,14 @@ CxPlatDpRawInitialize( if (Config && Config->ProcessorCount) { Xdp->PartitionCount = Config->ProcessorCount; + for (uint32_t i = 0; i < Xdp->PartitionCount; i++) { + Xdp->Partitions[i].Processor = Config->ProcessorList[i]; + } } else { Xdp->PartitionCount = CxPlatProcCount(); + for (uint32_t i = 0; i < Xdp->PartitionCount; i++) { + Xdp->Partitions[i].Processor = (uint16_t)i; + } } QuicTraceLogVerbose( @@ -1050,15 +1051,7 @@ CxPlatDpRawInitialize( for (uint32_t i = 0; i < Xdp->PartitionCount; i++) { XDP_PARTITION* Partition = &Xdp->Partitions[i]; - if (Partition->Queues == NULL) { - // - // Because queues are assigned in a round-robin manner, subsequent - // partitions will not have a queue assigned. Stop the loop and update - // partition count. - // - Xdp->PartitionCount = i; - break; - } + if (Partition->Queues == NULL) { continue; } // No RSS queues for this partition. Partition->Xdp = Xdp; Partition->PartitionIndex = (uint16_t)i; @@ -1164,8 +1157,10 @@ CxPlatDpRawUninitialize( Xdp); Xdp->Running = FALSE; for (uint32_t i = 0; i < Xdp->PartitionCount; i++) { - Xdp->Partitions[i].Ec.Ready = TRUE; - CxPlatWakeExecutionContext(&Xdp->Partitions[i].Ec); + if (Xdp->Partitions[i].Queues != NULL) { + Xdp->Partitions[i].Ec.Ready = TRUE; + CxPlatWakeExecutionContext(&Xdp->Partitions[i].Ec); + } } CxPlatDpRawRelease(Xdp); }