From 40d552f6ad011c6bce26de55dcb29ecc37abb5e3 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Thu, 29 Aug 2024 15:26:16 -0400 Subject: [PATCH] Prototype Prunable Pool Allocator --- src/generated/linux/platform_worker.c.clog.h | 22 ++++++ .../linux/platform_worker.c.clog.h.lttng.h | 19 +++++ src/inc/quic_platform.h | 34 ++++++++- src/manifest/clog.sidecar | 17 +++++ src/platform/datapath_winuser.c | 10 ++- src/platform/platform_internal.h | 2 +- src/platform/platform_worker.c | 71 +++++++++++++++++++ 7 files changed, 169 insertions(+), 6 deletions(-) diff --git a/src/generated/linux/platform_worker.c.clog.h b/src/generated/linux/platform_worker.c.clog.h index e2141b3aa0..533ae2a751 100644 --- a/src/generated/linux/platform_worker.c.clog.h +++ b/src/generated/linux/platform_worker.c.clog.h @@ -18,6 +18,10 @@ #define _clog_MACRO_QuicTraceLogInfo 1 #define QuicTraceLogInfo(a, ...) _clog_CAT(_clog_ARGN_SELECTOR(__VA_ARGS__), _clog_CAT(_,a(#a, __VA_ARGS__))) #endif +#ifndef _clog_MACRO_QuicTraceLogVerbose +#define _clog_MACRO_QuicTraceLogVerbose 1 +#define QuicTraceLogVerbose(a, ...) _clog_CAT(_clog_ARGN_SELECTOR(__VA_ARGS__), _clog_CAT(_,a(#a, __VA_ARGS__))) +#endif #ifndef _clog_MACRO_QuicTraceEvent #define _clog_MACRO_QuicTraceEvent 1 #define QuicTraceEvent(a, ...) _clog_CAT(_clog_ARGN_SELECTOR(__VA_ARGS__), _clog_CAT(_,a(#a, __VA_ARGS__))) @@ -61,6 +65,24 @@ tracepoint(CLOG_PLATFORM_WORKER_C, PlatformWorkerThreadStop , arg2);\ +/*---------------------------------------------------------- +// Decoder Ring for PlatformWorkerProcessPools +// [ lib][%p] Processing pools +// QuicTraceLogVerbose( + PlatformWorkerProcessPools, + "[ lib][%p] Processing pools", + Worker); +// arg2 = arg2 = Worker = arg2 +----------------------------------------------------------*/ +#ifndef _clog_3_ARGS_TRACE_PlatformWorkerProcessPools +#define _clog_3_ARGS_TRACE_PlatformWorkerProcessPools(uniqueId, encoded_arg_string, arg2)\ +tracepoint(CLOG_PLATFORM_WORKER_C, PlatformWorkerProcessPools , arg2);\ + +#endif + + + + /*---------------------------------------------------------- // Decoder Ring for AllocFailure // Allocation of '%s' failed. (%llu bytes) diff --git a/src/generated/linux/platform_worker.c.clog.h.lttng.h b/src/generated/linux/platform_worker.c.clog.h.lttng.h index 7f82dccbd4..db19ead56e 100644 --- a/src/generated/linux/platform_worker.c.clog.h.lttng.h +++ b/src/generated/linux/platform_worker.c.clog.h.lttng.h @@ -39,6 +39,25 @@ TRACEPOINT_EVENT(CLOG_PLATFORM_WORKER_C, PlatformWorkerThreadStop, +/*---------------------------------------------------------- +// Decoder Ring for PlatformWorkerProcessPools +// [ lib][%p] Processing pools +// QuicTraceLogVerbose( + PlatformWorkerProcessPools, + "[ lib][%p] Processing pools", + Worker); +// arg2 = arg2 = Worker = arg2 +----------------------------------------------------------*/ +TRACEPOINT_EVENT(CLOG_PLATFORM_WORKER_C, PlatformWorkerProcessPools, + TP_ARGS( + const void *, arg2), + TP_FIELDS( + ctf_integer_hex(uint64_t, arg2, (uint64_t)arg2) + ) +) + + + /*---------------------------------------------------------- // Decoder Ring for AllocFailure // Allocation of '%s' failed. (%llu bytes) diff --git a/src/inc/quic_platform.h b/src/inc/quic_platform.h index f7bb776fd4..ec69b62dee 100644 --- a/src/inc/quic_platform.h +++ b/src/inc/quic_platform.h @@ -431,13 +431,43 @@ typedef struct QUIC_EXECUTION_CONFIG QUIC_EXECUTION_CONFIG; typedef struct CXPLAT_EXECUTION_CONTEXT CXPLAT_EXECUTION_CONTEXT; typedef struct CXPLAT_EXECUTION_STATE { - uint64_t TimeNow; // in microseconds - uint64_t LastWorkTime; // in microseconds + uint64_t TimeNow; // in microseconds + uint64_t LastWorkTime; // in microseconds + uint64_t LastPoolProcessTime; // in microseconds uint32_t WaitTime; uint32_t NoWorkCount; CXPLAT_THREAD_ID ThreadID; } CXPLAT_EXECUTION_STATE; +// +// Supports more dynamic operations, but must be submitted to the platform worker +// to manage. +// +typedef struct CXPLAT_POOL_EX { +#ifdef __cplusplus + struct CXPLAT_POOL _; +#else + struct CXPLAT_POOL; +#endif + CXPLAT_LIST_ENTRY Link; + void* Owner; +} CXPLAT_POOL_EX; + +#ifndef _KERNEL_MODE // Not supported on kernel mode + +void +CxPlatAddDynamicPoolAllocator( + _Inout_ CXPLAT_POOL_EX* Pool, + _In_ uint16_t Index // Into the execution config processor array + ); + +void +CxPlatRemoveDynamicPoolAllocator( + _Inout_ CXPLAT_POOL_EX* Pool + ); + +#endif + // // Returns FALSE when it's time to cleanup. // diff --git a/src/manifest/clog.sidecar b/src/manifest/clog.sidecar index 226b674417..e32b9e39f5 100644 --- a/src/manifest/clog.sidecar +++ b/src/manifest/clog.sidecar @@ -8745,6 +8745,18 @@ "splitArgs": [], "macroName": "QuicTraceLogWarning" }, + "PlatformWorkerProcessPools": { + "ModuleProperites": {}, + "TraceString": "[ lib][%p] Processing pools", + "UniqueId": "PlatformWorkerProcessPools", + "splitArgs": [ + { + "DefinationEncoding": "p", + "MacroVariableName": "arg2" + } + ], + "macroName": "QuicTraceLogVerbose" + }, "PlatformWorkerThreadStart": { "ModuleProperites": {}, "TraceString": "[ lib][%p] Worker start", @@ -15936,6 +15948,11 @@ "TraceID": "PlatformThreadCreateFailed", "EncodingString": "[ lib] pthread_create failed, retrying without affinitization" }, + { + "UniquenessHash": "3f46cbc3-c609-dab4-07c9-fbe956e68f5c", + "TraceID": "PlatformWorkerProcessPools", + "EncodingString": "[ lib][%p] Processing pools" + }, { "UniquenessHash": "5c3773e2-ef60-26b9-4b3c-d433ca2656df", "TraceID": "PlatformWorkerThreadStart", diff --git a/src/platform/datapath_winuser.c b/src/platform/datapath_winuser.c index cb1d9f5619..357700af68 100644 --- a/src/platform/datapath_winuser.c +++ b/src/platform/datapath_winuser.c @@ -957,7 +957,10 @@ DataPathInitialize( FALSE, RecvDatagramLength, QUIC_POOL_DATA, - &Datapath->Partitions[i].RecvDatagramPool); + (CXPLAT_POOL*)&Datapath->Partitions[i].RecvDatagramPool); + CxPlatAddDynamicPoolAllocator( + &Datapath->Partitions[i].RecvDatagramPool, + i); CxPlatPoolInitializeEx( FALSE, @@ -1020,7 +1023,8 @@ CxPlatProcessorContextRelease( CxPlatPoolUninitialize(&DatapathProc->LargeSendBufferPool); CxPlatPoolUninitialize(&DatapathProc->RioSendBufferPool); CxPlatPoolUninitialize(&DatapathProc->RioLargeSendBufferPool); - CxPlatPoolUninitialize(&DatapathProc->RecvDatagramPool); + CxPlatRemoveDynamicPoolAllocator(&DatapathProc->RecvDatagramPool); + CxPlatPoolUninitialize((CXPLAT_POOL*)&DatapathProc->RecvDatagramPool); CxPlatPoolUninitialize(&DatapathProc->RioRecvPool); CxPlatDataPathRelease(DatapathProc->Datapath); } @@ -2465,7 +2469,7 @@ CxPlatSocketAllocRxIoBlock( if (SocketProc->Parent->UseRio) { OwningPool = &DatapathProc->RioRecvPool; } else { - OwningPool = &DatapathProc->RecvDatagramPool; + OwningPool = (CXPLAT_POOL*)&DatapathProc->RecvDatagramPool; } IoBlock = CxPlatPoolAlloc(OwningPool); diff --git a/src/platform/platform_internal.h b/src/platform/platform_internal.h index 6e4675eb14..a99ac6d776 100644 --- a/src/platform/platform_internal.h +++ b/src/platform/platform_internal.h @@ -260,7 +260,7 @@ typedef struct QUIC_CACHEALIGN CXPLAT_DATAPATH_PROC { // Pool of receive datagram contexts and buffers to be shared by all sockets // on this core. // - CXPLAT_POOL RecvDatagramPool; + CXPLAT_POOL_EX RecvDatagramPool; // // Pool of RIO receive datagram contexts and buffers to be shared by all diff --git a/src/platform/platform_worker.c b/src/platform/platform_worker.c index 9ea1ac620c..6343a8a19e 100644 --- a/src/platform/platform_worker.c +++ b/src/platform/platform_worker.c @@ -52,6 +52,11 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER { // CXPLAT_LOCK ECLock; + // + // List of dynamic pools to manage. + // + CXPLAT_LIST_ENTRY DynamicPoolList; + // // Execution contexts that are waiting to be added to CXPLAT_WORKER::ExecutionContexts. // @@ -160,6 +165,7 @@ CxPlatWorkersLazyStart( CxPlatZeroMemory(CxPlatWorkers, WorkersSize); for (uint32_t i = 0; i < CxPlatWorkerCount; ++i) { CxPlatLockInitialize(&CxPlatWorkers[i].ECLock); + CxPlatListInitializeHead(&CxPlatWorkers[i].DynamicPoolList); CxPlatWorkers[i].InitializedECLock = TRUE; CxPlatWorkers[i].IdealProcessor = ProcessorList ? ProcessorList[i] : (uint16_t)i; CXPLAT_DBG_ASSERT(CxPlatWorkers[i].IdealProcessor < CxPlatProcCount()); @@ -288,6 +294,7 @@ CxPlatWorkersUninit( CxPlatSqeCleanup(&CxPlatWorkers[i].EventQ, &CxPlatWorkers[i].ShutdownSqe); #endif // CXPLAT_SQE_INIT CxPlatEventQCleanup(&CxPlatWorkers[i].EventQ); + CXPLAT_DBG_ASSERT(CxPlatListIsEmpty(&CxPlatWorkers[i].DynamicPoolList)); CxPlatLockUninitialize(&CxPlatWorkers[i].ECLock); } @@ -300,6 +307,65 @@ CxPlatWorkersUninit( CxPlatLockUninitialize(&CxPlatWorkerLock); } +#define DYNAMIC_POOL_PROCESSING_TIME 1000000 // 1 second +#define DYNAMIC_POOL_PRUNE_COUNT 8 + +void +CxPlatAddDynamicPoolAllocator( + _Inout_ CXPLAT_POOL_EX* Pool, + _In_ uint16_t Index // Into the execution config processor array + ) +{ + CXPLAT_WORKER* Worker = &CxPlatWorkers[Index]; + Pool->Owner = Worker; + CxPlatLockAcquire(&Worker->ECLock); + CxPlatListInsertTail(&CxPlatWorkers[Index].DynamicPoolList, &Pool->Link); + CxPlatLockRelease(&Worker->ECLock); +} + +void +CxPlatRemoveDynamicPoolAllocator( + _Inout_ CXPLAT_POOL_EX* Pool + ) +{ + CXPLAT_WORKER* Worker = (CXPLAT_WORKER*)Pool->Owner; + CxPlatLockAcquire(&Worker->ECLock); + CxPlatListEntryRemove(&Pool->Link); + CxPlatLockRelease(&Worker->ECLock); +} + +void +CxPlatProcessDynamicPoolAllocator( + _Inout_ CXPLAT_POOL_EX* Pool + ) +{ + for (uint32_t i = 0; i < DYNAMIC_POOL_PRUNE_COUNT; ++i) { + void* Entry = InterlockedPopEntrySList(&Pool->ListHead); + if (!Entry) break; + Pool->Free(Entry, Pool->Tag, (CXPLAT_POOL*)Pool); + } +} + +void +CxPlatProcessDynamicPoolAllocators( + _In_ CXPLAT_WORKER* Worker + ) +{ + QuicTraceLogVerbose( + PlatformWorkerProcessPools, + "[ lib][%p] Processing pools", + Worker); + + CxPlatLockAcquire(&Worker->ECLock); + CXPLAT_LIST_ENTRY* Entry = Worker->DynamicPoolList.Flink; + while (Entry != &Worker->DynamicPoolList) { + CXPLAT_POOL_EX* Pool = CXPLAT_CONTAINING_RECORD(Entry, CXPLAT_POOL_EX, Link); + Entry = Entry->Flink; + CxPlatProcessDynamicPoolAllocator(Pool); + } + CxPlatLockRelease(&Worker->ECLock); +} + CXPLAT_EVENTQ* CxPlatWorkerGetEventQ( _In_ uint16_t Index @@ -504,6 +570,11 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context) CxPlatSchedulerYield(); State.NoWorkCount = 0; } + + if (State.TimeNow - State.LastPoolProcessTime > DYNAMIC_POOL_PROCESSING_TIME) { + CxPlatProcessDynamicPoolAllocators(Worker); + State.LastPoolProcessTime = State.TimeNow; + } } Shutdown: