Skip to content

Commit

Permalink
Move to separate type
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams committed Sep 14, 2024
1 parent d24ffa3 commit f64441c
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 109 deletions.
148 changes: 148 additions & 0 deletions src/Nethermind/Nethermind.Consensus/GC/GCScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Diagnostics;
using System.Runtime;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Core.Extensions;
using Nethermind.Core.Memory;

namespace Nethermind.Consensus;

public sealed class GCScheduler
{
private const int BlocksBacklogTriggeringManualGC = 4;
private const int MaxBlocksWithoutGC = 250;
private const int MinSecondsBetweenForcedGC = 120;

private static int _isPerformingGC = 0;

private readonly Timer _gcTimer;
private Task _lastGcTask = Task.CompletedTask;
private bool _isNextGcBlocking = false;
private bool _isNextGcCompacting = false;
private bool _gcTimerSet = false;
private bool _fireGC = false;
private long _countToGC = 0L;
private Stopwatch _stopwatch = new();

public static GCScheduler Instance => new GCScheduler();

public GCScheduler()
{
_gcTimer = new Timer(_ => PerformFullGC(), null, Timeout.Infinite, Timeout.Infinite);
}

public void SwitchOnBackgroundGC(int queueCount)
{
if (_fireGC)
{
_countToGC--;
if (_countToGC <= 0 && _stopwatch.Elapsed.TotalSeconds > MinSecondsBetweenForcedGC)
{
_fireGC = false;
_stopwatch.Reset();
if (_lastGcTask.IsCompleted)
{
_lastGcTask = PerformFullGCAsync();
}
}
}

if (queueCount > 0)
{
// Don't switch on if still processing blocks
}

if (_gcTimerSet)
{
return;
}
_gcTimerSet = true;
// GC every 2 minutes if block processing idle
_gcTimer.Change(TimeSpan.FromMinutes(2), TimeSpan.FromMinutes(2));
}

public void SwitchOffBackgroundGC(int queueCount)
{
if (!_fireGC && queueCount > BlocksBacklogTriggeringManualGC)
{
// Long chains in archive sync don't force GC and don't call MallocTrim;
// so we trigger it manually
_fireGC = true;
_countToGC = MaxBlocksWithoutGC;
_stopwatch.Restart();
}
else if (queueCount == 0)
{
// Nothing remaining in the queue, so we can stop forcing GC
_fireGC = false;
}

if (!_gcTimerSet)
{
return;
}
_gcTimerSet = false;
_gcTimer.Change(Timeout.Infinite, Timeout.Infinite);
}

public async Task PerformFullGCAsync()
{
// Flip to ThreadPool to avoid blocking the main processing thread
await Task.Yield();

PerformFullGC();
}

public static bool MarkGCPaused()
{
return Interlocked.CompareExchange(ref _isPerformingGC, 1, 0) == 0;
}

public static void MarkGCResumed()
{
Volatile.Write(ref _isPerformingGC, 0);
}

private void PerformFullGC()
{
if (!MarkGCPaused())
{
// Skip if another GC is in progress
return;
}

// Compacting GC every other cycle of blocking GC
bool compacting = _isNextGcBlocking && _isNextGcCompacting;

int generation = 1;
GCCollectionMode mode = GCCollectionMode.Forced;
if (compacting)
{
// Collect all generations
generation = GC.MaxGeneration;
// Compact large object heap
GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
// Release memory back to the OS
mode = GCCollectionMode.Aggressive;
}

GC.Collect(generation, mode, blocking: _isNextGcBlocking, compacting: compacting);

if (_isNextGcBlocking)
{
// Switch compacting every other cycle of blocking GC
_isNextGcCompacting = !_isNextGcCompacting;
}
_isNextGcBlocking = !_isNextGcBlocking;

// Trim native memory
MallocHelper.Instance.MallocTrim((uint)1.MiB());

// Mark GC as finished
Volatile.Write(ref _isPerformingGC, 0);
}
}
112 changes: 4 additions & 108 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,11 @@ public sealed class BlockchainProcessor : IBlockchainProcessor, IBlockProcessing
private Task? _recoveryTask;
private Task? _processorTask;
private DateTime _lastProcessedBlock;
private Task _lastGcTask = Task.CompletedTask;

private int _currentRecoveryQueueSize;
private const int MaxBlocksDuringFastSyncTransition = 8192;
private readonly CompositeBlockTracer _compositeBlockTracer = new();
private readonly Stopwatch _stopwatch = new();
private readonly Timer _gcTimer;
private int _isPerformingGC = 0;
private bool _isNextGcBlocking = false;
private bool _isNextGcCompacting = false;

public event EventHandler<IBlockchainProcessor.InvalidBlockEventArgs>? InvalidBlock;

Expand Down Expand Up @@ -99,7 +94,6 @@ public BlockchainProcessor(
_blockTree.NewHeadBlock += OnNewHeadBlock;

_stats = new ProcessingStats(_logger);
_gcTimer = new Timer(_ => PerformFullGC(), null, Timeout.Infinite, Timeout.Infinite);
}

private void OnNewHeadBlock(object? sender, BlockEventArgs e)
Expand Down Expand Up @@ -303,40 +297,15 @@ private Task RunProcessing()

private void RunProcessingLoop()
{
const int BlocksBacklogTriggeringManualGC = 4;
const int MaxBlocksWithoutGC = 250;
const int MinSecondsBetweenForcedGC = 120;
Stopwatch stopwatch = new();

if (_logger.IsDebug) _logger.Debug($"Starting block processor - {_blockQueue.Count} blocks waiting in the queue.");

FireProcessingQueueEmpty();

bool gcTimerSet = true;
SwitchOnBackgroundGC();
bool fireGC = false;
long countToGC = 0L;
GCScheduler.Instance.SwitchOnBackgroundGC(0);
foreach (BlockRef blockRef in _blockQueue.GetConsumingEnumerable(_loopCancellationSource.Token))
{
if (gcTimerSet)
{
// Have block, switch off background GC timer
SwitchOffBackgroundGC();
}
int queueCount = _blockQueue.Count;
if (!fireGC && queueCount > BlocksBacklogTriggeringManualGC)
{
// Long chains in archive sync don't force GC and don't call MallocTrim;
// so we trigger it manually
fireGC = true;
countToGC = MaxBlocksWithoutGC;
stopwatch.Restart();
}
else if (queueCount == 0)
{
// Nothing remaining in the queue, so we can stop forcing GC
fireGC = false;
}
// Have block, switch off background GC timer
GCScheduler.Instance.SwitchOffBackgroundGC(_blockQueue.Count);

try
{
Expand Down Expand Up @@ -376,85 +345,12 @@ private void RunProcessingLoop()
if (_logger.IsTrace) _logger.Trace($"Now {_blockQueue.Count} blocks waiting in the queue.");
FireProcessingQueueEmpty();

if (_blockQueue.Count == 0)
{
// No blocks, switch on background GC timer
gcTimerSet = true;
SwitchOnBackgroundGC();
}
if (fireGC)
{
countToGC--;
if (countToGC <= 0 && stopwatch.Elapsed.TotalSeconds > MinSecondsBetweenForcedGC)
{
fireGC = false;
stopwatch.Reset();
if (_lastGcTask.IsCompleted)
{
_lastGcTask = PerformFullGCAsync();
}
}
}
GCScheduler.Instance.SwitchOnBackgroundGC(_blockQueue.Count);
}

if (_logger.IsInfo) _logger.Info("Block processor queue stopped.");
}

private void SwitchOnBackgroundGC()
// GC every 2 minutes if block processing idle
=> _gcTimer.Change(TimeSpan.FromMinutes(2), TimeSpan.FromMinutes(2));

private void SwitchOffBackgroundGC()
=> _gcTimer.Change(Timeout.Infinite, Timeout.Infinite);

private async Task PerformFullGCAsync()
{
// Flip to ThreadPool to avoid blocking the main processing thread
await Task.Yield();

PerformFullGC();
}

private void PerformFullGC()
{
if (Interlocked.CompareExchange(ref _isPerformingGC, 1, 0) == 1)
{
// Skip if another GC is in progress
return;
}

// Compacting GC every other cycle of blocking GC
bool compacting = _isNextGcBlocking && _isNextGcCompacting;
if (_logger.IsDebug) _logger.Debug($"Performing Full GC, blocking: {_isNextGcBlocking} compacting:{compacting}");

int generation = 1;
GCCollectionMode mode = GCCollectionMode.Forced;
if (compacting)
{
// Collect all generations
generation = GC.MaxGeneration;
// Compact large object heap
GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
// Release memory back to the OS
mode = GCCollectionMode.Aggressive;
}

GC.Collect(generation, mode, blocking: _isNextGcBlocking, compacting: compacting);

if (_isNextGcBlocking)
{
// Switch compacting every other cycle of blocking GC
_isNextGcCompacting = !_isNextGcCompacting;
}
_isNextGcBlocking = !_isNextGcBlocking;

// Trim native memory
MallocHelper.Instance.MallocTrim((uint)1.MiB());

// Mark GC as finished
Volatile.Write(ref _isPerformingGC, 0);
}

private void FireProcessingQueueEmpty()
{
if (((IBlockProcessingQueue)this).IsEmpty)
Expand Down
7 changes: 6 additions & 1 deletion src/Nethermind/Nethermind.Merge.Plugin/GC/GCKeeper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using FastEnumUtility;
using Nethermind.Consensus;
using Nethermind.Core.Extensions;
using Nethermind.Core.Memory;
using Nethermind.Logging;
Expand All @@ -29,7 +30,7 @@ public GCKeeper(IGCStrategy gcStrategy, ILogManager logManager)
public IDisposable TryStartNoGCRegion(long? size = null)
{
size ??= _defaultSize;
if (_gcStrategy.CanStartNoGCRegion())
if (GCScheduler.MarkGCPaused() && _gcStrategy.CanStartNoGCRegion())
{
FailCause failCause = FailCause.None;
try
Expand Down Expand Up @@ -87,6 +88,7 @@ internal NoGCRegion(GCKeeper gcKeeper, FailCause failCause, long? size, ILogger

public void Dispose()
{
GCScheduler.MarkGCResumed();
if (_failCause == FailCause.None)
{
if (GCSettings.LatencyMode == GCLatencyMode.NoGCRegion)
Expand Down Expand Up @@ -147,6 +149,8 @@ private async Task ScheduleGCInternal()

if (GCSettings.LatencyMode != GCLatencyMode.NoGCRegion)
{
if (!GCScheduler.MarkGCPaused()) return;

ulong forcedGcCount = Interlocked.Increment(ref _forcedGcCount);
int collectionsPerDecommit = _gcStrategy.CollectionsPerDecommit;

Expand All @@ -168,6 +172,7 @@ private async Task ScheduleGCInternal()
System.GC.Collect((int)generation, mode, blocking: true, compacting: compacting > 0);

MallocHelper.Instance.MallocTrim((uint)1.MiB());
GCScheduler.MarkGCResumed();
}
}
}
Expand Down

0 comments on commit f64441c

Please sign in to comment.