Skip to content

Commit

Permalink
Prototype Prunable Pool Allocator
Browse files Browse the repository at this point in the history
  • Loading branch information
nibanks committed Aug 29, 2024
1 parent c6e28b0 commit 40d552f
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 6 deletions.
22 changes: 22 additions & 0 deletions src/generated/linux/platform_worker.c.clog.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
#define _clog_MACRO_QuicTraceLogInfo 1
#define QuicTraceLogInfo(a, ...) _clog_CAT(_clog_ARGN_SELECTOR(__VA_ARGS__), _clog_CAT(_,a(#a, __VA_ARGS__)))
#endif
#ifndef _clog_MACRO_QuicTraceLogVerbose
#define _clog_MACRO_QuicTraceLogVerbose 1
#define QuicTraceLogVerbose(a, ...) _clog_CAT(_clog_ARGN_SELECTOR(__VA_ARGS__), _clog_CAT(_,a(#a, __VA_ARGS__)))
#endif
#ifndef _clog_MACRO_QuicTraceEvent
#define _clog_MACRO_QuicTraceEvent 1
#define QuicTraceEvent(a, ...) _clog_CAT(_clog_ARGN_SELECTOR(__VA_ARGS__), _clog_CAT(_,a(#a, __VA_ARGS__)))
Expand Down Expand Up @@ -61,6 +65,24 @@ tracepoint(CLOG_PLATFORM_WORKER_C, PlatformWorkerThreadStop , arg2);\



/*----------------------------------------------------------
// Decoder Ring for PlatformWorkerProcessPools
// [ lib][%p] Processing pools
// QuicTraceLogVerbose(
PlatformWorkerProcessPools,
"[ lib][%p] Processing pools",
Worker);
// arg2 = arg2 = Worker = arg2
----------------------------------------------------------*/
#ifndef _clog_3_ARGS_TRACE_PlatformWorkerProcessPools
#define _clog_3_ARGS_TRACE_PlatformWorkerProcessPools(uniqueId, encoded_arg_string, arg2)\
tracepoint(CLOG_PLATFORM_WORKER_C, PlatformWorkerProcessPools , arg2);\

#endif




/*----------------------------------------------------------
// Decoder Ring for AllocFailure
// Allocation of '%s' failed. (%llu bytes)
Expand Down
19 changes: 19 additions & 0 deletions src/generated/linux/platform_worker.c.clog.h.lttng.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,25 @@ TRACEPOINT_EVENT(CLOG_PLATFORM_WORKER_C, PlatformWorkerThreadStop,



/*----------------------------------------------------------
// Decoder Ring for PlatformWorkerProcessPools
// [ lib][%p] Processing pools
// QuicTraceLogVerbose(
PlatformWorkerProcessPools,
"[ lib][%p] Processing pools",
Worker);
// arg2 = arg2 = Worker = arg2
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_PLATFORM_WORKER_C, PlatformWorkerProcessPools,
TP_ARGS(
const void *, arg2),
TP_FIELDS(
ctf_integer_hex(uint64_t, arg2, (uint64_t)arg2)
)
)



/*----------------------------------------------------------
// Decoder Ring for AllocFailure
// Allocation of '%s' failed. (%llu bytes)
Expand Down
34 changes: 32 additions & 2 deletions src/inc/quic_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,43 @@ typedef struct QUIC_EXECUTION_CONFIG QUIC_EXECUTION_CONFIG;
typedef struct CXPLAT_EXECUTION_CONTEXT CXPLAT_EXECUTION_CONTEXT;

typedef struct CXPLAT_EXECUTION_STATE {
uint64_t TimeNow; // in microseconds
uint64_t LastWorkTime; // in microseconds
uint64_t TimeNow; // in microseconds
uint64_t LastWorkTime; // in microseconds
uint64_t LastPoolProcessTime; // in microseconds
uint32_t WaitTime;
uint32_t NoWorkCount;
CXPLAT_THREAD_ID ThreadID;
} CXPLAT_EXECUTION_STATE;

//
// Supports more dynamic operations, but must be submitted to the platform worker
// to manage.
//
typedef struct CXPLAT_POOL_EX {
#ifdef __cplusplus
struct CXPLAT_POOL _;
#else
struct CXPLAT_POOL;
#endif
CXPLAT_LIST_ENTRY Link;
void* Owner;
} CXPLAT_POOL_EX;

#ifndef _KERNEL_MODE // Not supported on kernel mode

void
CxPlatAddDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool,
_In_ uint16_t Index // Into the execution config processor array
);

void
CxPlatRemoveDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool
);

#endif

//
// Returns FALSE when it's time to cleanup.
//
Expand Down
17 changes: 17 additions & 0 deletions src/manifest/clog.sidecar
Original file line number Diff line number Diff line change
Expand Up @@ -8745,6 +8745,18 @@
"splitArgs": [],
"macroName": "QuicTraceLogWarning"
},
"PlatformWorkerProcessPools": {
"ModuleProperites": {},
"TraceString": "[ lib][%p] Processing pools",
"UniqueId": "PlatformWorkerProcessPools",
"splitArgs": [
{
"DefinationEncoding": "p",
"MacroVariableName": "arg2"
}
],
"macroName": "QuicTraceLogVerbose"
},
"PlatformWorkerThreadStart": {
"ModuleProperites": {},
"TraceString": "[ lib][%p] Worker start",
Expand Down Expand Up @@ -15936,6 +15948,11 @@
"TraceID": "PlatformThreadCreateFailed",
"EncodingString": "[ lib] pthread_create failed, retrying without affinitization"
},
{
"UniquenessHash": "3f46cbc3-c609-dab4-07c9-fbe956e68f5c",
"TraceID": "PlatformWorkerProcessPools",
"EncodingString": "[ lib][%p] Processing pools"
},
{
"UniquenessHash": "5c3773e2-ef60-26b9-4b3c-d433ca2656df",
"TraceID": "PlatformWorkerThreadStart",
Expand Down
10 changes: 7 additions & 3 deletions src/platform/datapath_winuser.c
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,10 @@ DataPathInitialize(
FALSE,
RecvDatagramLength,
QUIC_POOL_DATA,
&Datapath->Partitions[i].RecvDatagramPool);
(CXPLAT_POOL*)&Datapath->Partitions[i].RecvDatagramPool);
CxPlatAddDynamicPoolAllocator(
&Datapath->Partitions[i].RecvDatagramPool,
i);

CxPlatPoolInitializeEx(
FALSE,
Expand Down Expand Up @@ -1020,7 +1023,8 @@ CxPlatProcessorContextRelease(
CxPlatPoolUninitialize(&DatapathProc->LargeSendBufferPool);
CxPlatPoolUninitialize(&DatapathProc->RioSendBufferPool);
CxPlatPoolUninitialize(&DatapathProc->RioLargeSendBufferPool);
CxPlatPoolUninitialize(&DatapathProc->RecvDatagramPool);
CxPlatRemoveDynamicPoolAllocator(&DatapathProc->RecvDatagramPool);
CxPlatPoolUninitialize((CXPLAT_POOL*)&DatapathProc->RecvDatagramPool);
CxPlatPoolUninitialize(&DatapathProc->RioRecvPool);
CxPlatDataPathRelease(DatapathProc->Datapath);
}
Expand Down Expand Up @@ -2465,7 +2469,7 @@ CxPlatSocketAllocRxIoBlock(
if (SocketProc->Parent->UseRio) {
OwningPool = &DatapathProc->RioRecvPool;
} else {
OwningPool = &DatapathProc->RecvDatagramPool;
OwningPool = (CXPLAT_POOL*)&DatapathProc->RecvDatagramPool;
}

IoBlock = CxPlatPoolAlloc(OwningPool);
Expand Down
2 changes: 1 addition & 1 deletion src/platform/platform_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ typedef struct QUIC_CACHEALIGN CXPLAT_DATAPATH_PROC {
// Pool of receive datagram contexts and buffers to be shared by all sockets
// on this core.
//
CXPLAT_POOL RecvDatagramPool;
CXPLAT_POOL_EX RecvDatagramPool;

//
// Pool of RIO receive datagram contexts and buffers to be shared by all
Expand Down
71 changes: 71 additions & 0 deletions src/platform/platform_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER {
//
CXPLAT_LOCK ECLock;

//
// List of dynamic pools to manage.
//
CXPLAT_LIST_ENTRY DynamicPoolList;

//
// Execution contexts that are waiting to be added to CXPLAT_WORKER::ExecutionContexts.
//
Expand Down Expand Up @@ -160,6 +165,7 @@ CxPlatWorkersLazyStart(
CxPlatZeroMemory(CxPlatWorkers, WorkersSize);
for (uint32_t i = 0; i < CxPlatWorkerCount; ++i) {
CxPlatLockInitialize(&CxPlatWorkers[i].ECLock);
CxPlatListInitializeHead(&CxPlatWorkers[i].DynamicPoolList);
CxPlatWorkers[i].InitializedECLock = TRUE;
CxPlatWorkers[i].IdealProcessor = ProcessorList ? ProcessorList[i] : (uint16_t)i;
CXPLAT_DBG_ASSERT(CxPlatWorkers[i].IdealProcessor < CxPlatProcCount());
Expand Down Expand Up @@ -288,6 +294,7 @@ CxPlatWorkersUninit(
CxPlatSqeCleanup(&CxPlatWorkers[i].EventQ, &CxPlatWorkers[i].ShutdownSqe);
#endif // CXPLAT_SQE_INIT
CxPlatEventQCleanup(&CxPlatWorkers[i].EventQ);
CXPLAT_DBG_ASSERT(CxPlatListIsEmpty(&CxPlatWorkers[i].DynamicPoolList));
CxPlatLockUninitialize(&CxPlatWorkers[i].ECLock);
}

Expand All @@ -300,6 +307,65 @@ CxPlatWorkersUninit(
CxPlatLockUninitialize(&CxPlatWorkerLock);
}

#define DYNAMIC_POOL_PROCESSING_TIME 1000000 // 1 second
#define DYNAMIC_POOL_PRUNE_COUNT 8

void
CxPlatAddDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool,
_In_ uint16_t Index // Into the execution config processor array
)
{
CXPLAT_WORKER* Worker = &CxPlatWorkers[Index];
Pool->Owner = Worker;
CxPlatLockAcquire(&Worker->ECLock);
CxPlatListInsertTail(&CxPlatWorkers[Index].DynamicPoolList, &Pool->Link);
CxPlatLockRelease(&Worker->ECLock);
}

void
CxPlatRemoveDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool
)
{
CXPLAT_WORKER* Worker = (CXPLAT_WORKER*)Pool->Owner;
CxPlatLockAcquire(&Worker->ECLock);
CxPlatListEntryRemove(&Pool->Link);
CxPlatLockRelease(&Worker->ECLock);
}

void
CxPlatProcessDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool
)
{
for (uint32_t i = 0; i < DYNAMIC_POOL_PRUNE_COUNT; ++i) {
void* Entry = InterlockedPopEntrySList(&Pool->ListHead);
if (!Entry) break;
Pool->Free(Entry, Pool->Tag, (CXPLAT_POOL*)Pool);
}
}

void
CxPlatProcessDynamicPoolAllocators(
_In_ CXPLAT_WORKER* Worker
)
{
QuicTraceLogVerbose(
PlatformWorkerProcessPools,
"[ lib][%p] Processing pools",
Worker);

CxPlatLockAcquire(&Worker->ECLock);
CXPLAT_LIST_ENTRY* Entry = Worker->DynamicPoolList.Flink;
while (Entry != &Worker->DynamicPoolList) {
CXPLAT_POOL_EX* Pool = CXPLAT_CONTAINING_RECORD(Entry, CXPLAT_POOL_EX, Link);
Entry = Entry->Flink;
CxPlatProcessDynamicPoolAllocator(Pool);
}
CxPlatLockRelease(&Worker->ECLock);
}

CXPLAT_EVENTQ*
CxPlatWorkerGetEventQ(
_In_ uint16_t Index
Expand Down Expand Up @@ -504,6 +570,11 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context)
CxPlatSchedulerYield();
State.NoWorkCount = 0;
}

if (State.TimeNow - State.LastPoolProcessTime > DYNAMIC_POOL_PROCESSING_TIME) {
CxPlatProcessDynamicPoolAllocators(Worker);
State.LastPoolProcessTime = State.TimeNow;
}
}

Shutdown:
Expand Down

0 comments on commit 40d552f

Please sign in to comment.