Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve polling in segment allocation queue #15590

Merged
merged 4 commits into from
Jan 4, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,15 @@ public class SegmentAllocationQueue
private final long maxWaitTimeMillis;

private final TaskLockbox taskLockbox;
private final ScheduledExecutorService executor;
private final IndexerMetadataStorageCoordinator metadataStorage;
private final AtomicBoolean isLeader = new AtomicBoolean(false);
private final ServiceEmitter emitter;

/**
* Single-threaded executor to process allocation queue.
*/
private final ScheduledExecutorService executor;

private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>();
private final BlockingDeque<AllocateRequestKey> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);

Expand Down Expand Up @@ -149,6 +153,10 @@ public boolean isEnabled()
return executor != null && !executor.isShutdown();
}

/**
* Schedules a poll of the allocation queue that runs on the {@link #executor}.
* It is okay to schedule multiple polls since the executor is single threaded.
*/
private void scheduleQueuePoll(long delay)
{
executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -196,6 +204,7 @@ public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
}
});

scheduleQueuePoll(maxWaitTimeMillis);
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
return futureReference.get();
}

Expand Down Expand Up @@ -262,6 +271,7 @@ private void processBatchesDue()
{
clearQueueIfNotLeader();

// Process all the batches that are already due
int numProcessedBatches = 0;
AllocateRequestKey nextKey = processingQueue.peekFirst();
while (nextKey != null && nextKey.isDue()) {
Expand Down Expand Up @@ -289,17 +299,16 @@ private void processBatchesDue()
nextKey = processingQueue.peek();
}

// Schedule the next round of processing
final long nextScheduleDelay;
// Schedule the next round of processing if the queue is not empty
if (processingQueue.isEmpty()) {
nextScheduleDelay = maxWaitTimeMillis;
log.debug("Processed [%d] batches, not scheduling again since queue is empty.", numProcessedBatches);
} else {
nextKey = processingQueue.peek();
long timeElapsed = System.currentTimeMillis() - nextKey.getQueueTime();
nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed);
long nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed);
scheduleQueuePoll(nextScheduleDelay);
log.debug("Processed [%d] batches, next execution in [%d ms]", numProcessedBatches, nextScheduleDelay);
}
scheduleQueuePoll(nextScheduleDelay);
log.debug("Processed [%d] batches, next execution in [%d ms]", numProcessedBatches, nextScheduleDelay);
}

/**
Expand Down
Loading