Skip to content

Commit

Permalink
Let's try this instead
Browse files Browse the repository at this point in the history
  • Loading branch information
nibanks committed Aug 29, 2024
1 parent 1fc88e5 commit ca5d8e6
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 35 deletions.
5 changes: 4 additions & 1 deletion src/platform/datapath_raw_xdp.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ typedef struct QUIC_CACHEALIGN XDP_PARTITION {
CXPLAT_EVENTQ* EventQ;
XDP_QUEUE* Queues; // A linked list of queues, accessed by Next.
uint16_t PartitionIndex;
uint16_t Processor;
} XDP_PARTITION;

void XdpWorkerAddQueue(_In_ XDP_PARTITION* Partition, _In_ XDP_QUEUE* Queue) {
Expand All @@ -78,7 +79,9 @@ CxPlatDpRawAssignQueue(
)
{
const XDP_INTERFACE_COMMON* Interface = (const XDP_INTERFACE_COMMON*)_Interface;
Route->Queue = &((XDP_QUEUE_COMMON*)Interface->Queues)[0];
XDP_QUEUE_COMMON* Queues = (XDP_QUEUE_COMMON*)Interface->Queues;
CXPLAT_FRE_ASSERT(Queues[0].Partition != NULL); // What if there was no partition?
Route->Queue = &Queues[0]; // TODO - Can we do better than just the first queue?
}

_IRQL_requires_max_(DISPATCH_LEVEL)
Expand Down
8 changes: 7 additions & 1 deletion src/platform/datapath_raw_xdp_linux.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ static uint64_t XskUmemFrameAlloc(struct XskSocketInfo *Xsk)
if (Xsk->UmemFrameFree == 0) {
QuicTraceLogVerbose(
XdpUmemAllocFails,
"[ xdp][umem] Out of UMEM frame, OOM");
"[ xdp][umem] Out of UMEM frame, OOM");
return INVALID_UMEM_FRAME;
}
Frame = Xsk->UmemFrameAddr[--Xsk->UmemFrameFree];
Expand Down Expand Up @@ -684,8 +684,14 @@ CxPlatDpRawInitialize(

if (Config && Config->ProcessorCount) {
Xdp->PartitionCount = Config->ProcessorCount;
for (uint32_t i = 0; i < Xdp->PartitionCount; i++) {
Xdp->Partitions[i].Processor = Config->ProcessorList[i];
}
} else {
Xdp->PartitionCount = CxPlatProcCount();
for (uint32_t i = 0; i < Xdp->PartitionCount; i++) {
Xdp->Partitions[i].Processor = (uint16_t)i;
}
}

QuicTraceLogVerbose(
Expand Down
61 changes: 28 additions & 33 deletions src/platform/datapath_raw_xdp_win.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ typedef struct XDP_INTERFACE {

typedef struct XDP_QUEUE {
XDP_QUEUE_COMMON;
uint16_t RssProcessor;
uint8_t* RxBuffers;
HANDLE RxXsk;
DATAPATH_XDP_IO_SQE RxIoSqe;
Expand Down Expand Up @@ -420,6 +421,7 @@ CxPlatDpRawInterfaceInitialize(
for (uint8_t i = 0; i < Interface->QueueCount; i++) {
XDP_QUEUE* Queue = &Interface->Queues[i];

Queue->RssProcessor = (uint16_t)Processors[i]; // TODO - Should memory be aligned with this?
Queue->Interface = Interface;
InitializeSListHead(&Queue->RxPool);
InitializeSListHead(&Queue->TxPool);
Expand All @@ -431,23 +433,6 @@ CxPlatDpRawInterfaceInitialize(
CxPlatDatapathSqeInitialize(&Queue->TxIoSqe.DatapathSqe, CXPLAT_CQE_TYPE_SOCKET_IO);
Queue->TxIoSqe.IoType = DATAPATH_XDP_IO_SEND;

uint32_t QueueIndex = UINT32_MAX;
for (uint32_t j = 0; j < Interface->QueueCount; j++) {
if (Processors[j] == i) {
QueueIndex = j;
break;
}
}
if (QueueIndex == UINT32_MAX) {
Status = QUIC_STATUS_INVALID_STATE;
QuicTraceEvent(
LibraryErrorStatus,
"[ lib] ERROR, %u, %s.",
Status,
"QueueIndex");
goto Error;
}

//
// RX datapath.
//
Expand Down Expand Up @@ -515,7 +500,7 @@ CxPlatDpRawInterfaceInitialize(
}

uint32_t Flags = XSK_BIND_FLAG_RX;
Status = Xdp->XdpApi->XskBind(Queue->RxXsk, Interface->ActualIfIndex, QueueIndex, Flags);
Status = Xdp->XdpApi->XskBind(Queue->RxXsk, Interface->ActualIfIndex, i, Flags);
if (QUIC_FAILED(Status)) {
QuicTraceEvent(
LibraryErrorStatus,
Expand Down Expand Up @@ -637,7 +622,7 @@ CxPlatDpRawInterfaceInitialize(
}

Flags = XSK_BIND_FLAG_TX; // TODO: support native/generic forced flags.
Status = Xdp->XdpApi->XskBind(Queue->TxXsk, Interface->ActualIfIndex, QueueIndex, Flags);
Status = Xdp->XdpApi->XskBind(Queue->TxXsk, Interface->ActualIfIndex, i, Flags);
if (QUIC_FAILED(Status)) {
QuicTraceEvent(
LibraryErrorStatus,
Expand Down Expand Up @@ -694,10 +679,20 @@ CxPlatDpRawInterfaceInitialize(
}

//
// Add each queue to a partition (round robin).
// Add each queue to the correct partition.
//
for (uint8_t i = 0; i < Interface->QueueCount; i++) {
XdpWorkerAddQueue(&Xdp->Partitions[i % Xdp->PartitionCount], &Interface->Queues[i]);
for (uint16_t i = 0; i < Interface->QueueCount; i++) {
BOOLEAN Found = FALSE;
for (uint16_t j = 0; j < Xdp->PartitionCount; j++) {
if (Xdp->Partitions[j].Processor == Interface->Queues[i].RssProcessor) {
XdpWorkerAddQueue(&Xdp->Partitions[j], &Interface->Queues[i]);
Found = TRUE;
break;
}
}
if (!Found) {
CXPLAT_FRE_ASSERT(FALSE); // TODO - What do we do if there is no partition for this processor?
}
}

Error:
Expand Down Expand Up @@ -915,8 +910,14 @@ CxPlatDpRawInitialize(

if (Config && Config->ProcessorCount) {
Xdp->PartitionCount = Config->ProcessorCount;
for (uint32_t i = 0; i < Xdp->PartitionCount; i++) {
Xdp->Partitions[i].Processor = Config->ProcessorList[i];
}
} else {
Xdp->PartitionCount = CxPlatProcCount();
for (uint32_t i = 0; i < Xdp->PartitionCount; i++) {
Xdp->Partitions[i].Processor = (uint16_t)i;
}
}

QuicTraceLogVerbose(
Expand Down Expand Up @@ -1050,15 +1051,7 @@ CxPlatDpRawInitialize(
for (uint32_t i = 0; i < Xdp->PartitionCount; i++) {

XDP_PARTITION* Partition = &Xdp->Partitions[i];
if (Partition->Queues == NULL) {
//
// Because queues are assigned in a round-robin manner, subsequent
// partitions will not have a queue assigned. Stop the loop and update
// partition count.
//
Xdp->PartitionCount = i;
break;
}
if (Partition->Queues == NULL) { continue; } // No RSS queues for this partition.

Partition->Xdp = Xdp;
Partition->PartitionIndex = (uint16_t)i;
Expand Down Expand Up @@ -1164,8 +1157,10 @@ CxPlatDpRawUninitialize(
Xdp);
Xdp->Running = FALSE;
for (uint32_t i = 0; i < Xdp->PartitionCount; i++) {
Xdp->Partitions[i].Ec.Ready = TRUE;
CxPlatWakeExecutionContext(&Xdp->Partitions[i].Ec);
if (Xdp->Partitions[i].Queues != NULL) {
Xdp->Partitions[i].Ec.Ready = TRUE;
CxPlatWakeExecutionContext(&Xdp->Partitions[i].Ec);
}
}
CxPlatDpRawRelease(Xdp);
}
Expand Down

0 comments on commit ca5d8e6

Please sign in to comment.