From 5c095db1dfad8d0707d2b649651fe27b400484e0 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 18 Dec 2023 08:55:54 +0530 Subject: [PATCH 1/4] Poll allocation queue only when non-empty --- .../actions/SegmentAllocationQueue.java | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index 6638c2f2578e..20e39adb5632 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -75,11 +75,15 @@ public class SegmentAllocationQueue private final long maxWaitTimeMillis; private final TaskLockbox taskLockbox; - private final ScheduledExecutorService executor; private final IndexerMetadataStorageCoordinator metadataStorage; private final AtomicBoolean isLeader = new AtomicBoolean(false); private final ServiceEmitter emitter; + /** + * Single-threaded executor to process allocation queue. + */ + private final ScheduledExecutorService executor; + private final ConcurrentHashMap keyToBatch = new ConcurrentHashMap<>(); private final BlockingDeque processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); @@ -149,6 +153,10 @@ public boolean isEnabled() return executor != null && !executor.isShutdown(); } + /** + * Schedules a poll of the allocation queue that runs on the {@link #executor}. + * It is okay to schedule multiple polls since the executor is single threaded. + */ private void scheduleQueuePoll(long delay) { executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS); @@ -196,6 +204,7 @@ public Future add(SegmentAllocateRequest request) } }); + scheduleQueuePoll(maxWaitTimeMillis); return futureReference.get(); } @@ -262,6 +271,7 @@ private void processBatchesDue() { clearQueueIfNotLeader(); + // Process all the batches that are already due int numProcessedBatches = 0; AllocateRequestKey nextKey = processingQueue.peekFirst(); while (nextKey != null && nextKey.isDue()) { @@ -289,17 +299,16 @@ private void processBatchesDue() nextKey = processingQueue.peek(); } - // Schedule the next round of processing - final long nextScheduleDelay; + // Schedule the next round of processing if the queue is not empty if (processingQueue.isEmpty()) { - nextScheduleDelay = maxWaitTimeMillis; + log.debug("Processed [%d] batches, not scheduling again since queue is empty.", numProcessedBatches); } else { nextKey = processingQueue.peek(); long timeElapsed = System.currentTimeMillis() - nextKey.getQueueTime(); - nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed); + long nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed); + scheduleQueuePoll(nextScheduleDelay); + log.debug("Processed [%d] batches, next execution in [%d ms]", numProcessedBatches, nextScheduleDelay); } - scheduleQueuePoll(nextScheduleDelay); - log.debug("Processed [%d] batches, next execution in [%d ms]", numProcessedBatches, nextScheduleDelay); } /** From db40c4943c9aa0772359ccffdb0afe2c5822a5b2 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 18 Dec 2023 09:41:01 +0530 Subject: [PATCH 2/4] Schedule poll for every new batch --- .../druid/indexing/common/actions/SegmentAllocationQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index 20e39adb5632..ce901b475be7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -204,7 +204,6 @@ public Future add(SegmentAllocateRequest request) } }); - scheduleQueuePoll(maxWaitTimeMillis); return futureReference.get(); } @@ -238,6 +237,7 @@ private boolean addBatchToQueue(AllocateRequestBatch batch) return false; } else if (processingQueue.offer(batch.key)) { log.debug("Added a new batch [%s] to queue.", batch.key); + scheduleQueuePoll(maxWaitTimeMillis); return true; } else { batch.failPendingRequests( From 16ac92a5530cb05ffb6815ed903e6e06791e1198 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 18 Dec 2023 16:11:41 +0530 Subject: [PATCH 3/4] Fix race conditions --- .../actions/SegmentAllocationQueue.java | 170 +++++++++--------- 1 file changed, 82 insertions(+), 88 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index ce901b475be7..7aa30d3a9356 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -85,7 +85,7 @@ public class SegmentAllocationQueue private final ScheduledExecutorService executor; private final ConcurrentHashMap keyToBatch = new ConcurrentHashMap<>(); - private final BlockingDeque processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); + private final BlockingDeque processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); @Inject public SegmentAllocationQueue( @@ -182,19 +182,18 @@ public Future add(SegmentAllocateRequest request) throw new ISE("Batched segment allocation is disabled."); } - final AllocateRequestKey requestKey = getKeyForAvailableBatch(request); + final AllocateRequestKey requestKey = new AllocateRequestKey(request); final AtomicReference> futureReference = new AtomicReference<>(); // Possible race condition: // t1 -> new batch is added to queue or batch already exists in queue // t2 -> executor pops batch, processes all requests in it // t1 -> new request is added to dangling batch and is never picked up - // Solution: For existing batch, call keyToBatch.remove() on the key to - // wait on keyToBatch.compute() to finish before proceeding with processBatch(). - // For new batch, keyToBatch.remove() would not wait as key is not in map yet - // but a new batch is unlikely to be due immediately, so it won't get popped right away. + // Solution: Perform the following operations only inside keyToBatch.compute(): + // 1. Add or remove from map, add batch to queue + // 2. Mark batch as started keyToBatch.compute(requestKey, (key, existingBatch) -> { - if (existingBatch == null) { + if (existingBatch == null || existingBatch.isStarted() || existingBatch.isFull()) { AllocateRequestBatch newBatch = new AllocateRequestBatch(key); futureReference.set(newBatch.add(request)); return addBatchToQueue(newBatch) ? newBatch : null; @@ -207,36 +206,18 @@ public Future add(SegmentAllocateRequest request) return futureReference.get(); } - /** - * Returns the key for a batch that is not added to the queue yet and/or has - * available space. Throws an exception if the queue is already full and no - * batch has available capacity. - */ - private AllocateRequestKey getKeyForAvailableBatch(SegmentAllocateRequest request) - { - for (int batchIncrementalId = 0; batchIncrementalId < MAX_QUEUE_SIZE; ++batchIncrementalId) { - AllocateRequestKey nextKey = new AllocateRequestKey(request, maxWaitTimeMillis, batchIncrementalId); - AllocateRequestBatch nextBatch = keyToBatch.get(nextKey); - if (nextBatch == null || nextBatch.size() < MAX_BATCH_SIZE) { - return nextKey; - } - } - - throw new ISE("Allocation queue is at capacity, all batches are full."); - } - /** * Tries to add the given batch to the processing queue. Fails all the pending * requests in the batch if we are not leader or if the queue is full. */ private boolean addBatchToQueue(AllocateRequestBatch batch) { - batch.key.resetQueueTime(); + batch.resetQueueTime(); if (!isLeader.get()) { batch.failPendingRequests("Not leader anymore"); return false; - } else if (processingQueue.offer(batch.key)) { - log.debug("Added a new batch [%s] to queue.", batch.key); + } else if (processingQueue.offer(batch)) { + log.debug("Added a new batch for key[%s] to queue.", batch.key); scheduleQueuePoll(maxWaitTimeMillis); return true; } else { @@ -257,7 +238,7 @@ private void requeueBatch(AllocateRequestBatch batch) { log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), batch.key); keyToBatch.compute(batch.key, (key, existingBatch) -> { - if (existingBatch == null) { + if (existingBatch == null || existingBatch.isFull() || existingBatch.isStarted()) { return addBatchToQueue(batch) ? batch : null; } else { // Merge requests from this batch to existing one @@ -273,38 +254,37 @@ private void processBatchesDue() // Process all the batches that are already due int numProcessedBatches = 0; - AllocateRequestKey nextKey = processingQueue.peekFirst(); - while (nextKey != null && nextKey.isDue()) { - processingQueue.pollFirst(); - + AllocateRequestBatch nextBatch = processingQueue.peekFirst(); + while (nextBatch != null && nextBatch.isDue()) { // Process the next batch in the queue + processingQueue.pollFirst(); + final AllocateRequestBatch currentBatch = nextBatch; boolean processed; - AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey); try { - processed = processBatch(nextBatch); + processed = processBatch(currentBatch); } catch (Throwable t) { - nextBatch.failPendingRequests(t); + currentBatch.failPendingRequests(t); processed = true; - log.error(t, "Error while processing batch [%s]", nextKey); + log.error(t, "Error while processing batch [%s]", currentBatch); } // Requeue if not fully processed yet if (processed) { ++numProcessedBatches; } else { - requeueBatch(nextBatch); + requeueBatch(currentBatch); } - nextKey = processingQueue.peek(); + nextBatch = processingQueue.peek(); } // Schedule the next round of processing if the queue is not empty if (processingQueue.isEmpty()) { log.debug("Processed [%d] batches, not scheduling again since queue is empty.", numProcessedBatches); } else { - nextKey = processingQueue.peek(); - long timeElapsed = System.currentTimeMillis() - nextKey.getQueueTime(); + nextBatch = processingQueue.peek(); + long timeElapsed = System.currentTimeMillis() - nextBatch.getQueueTime(); long nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed); scheduleQueuePoll(nextScheduleDelay); log.debug("Processed [%d] batches, next execution in [%d ms]", numProcessedBatches, nextScheduleDelay); @@ -317,14 +297,14 @@ private void processBatchesDue() private void clearQueueIfNotLeader() { int failedBatches = 0; - AllocateRequestKey nextKey = processingQueue.peekFirst(); - while (nextKey != null && !isLeader.get()) { + AllocateRequestBatch nextBatch = processingQueue.peekFirst(); + while (nextBatch != null && !isLeader.get()) { processingQueue.pollFirst(); - AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey); + keyToBatch.remove(nextBatch.key); nextBatch.failPendingRequests("Not leader anymore"); ++failedBatches; - nextKey = processingQueue.peekFirst(); + nextBatch = processingQueue.peekFirst(); } if (failedBatches > 0) { log.info("Not leader. Failed [%d] batches, remaining in queue [%d].", failedBatches, processingQueue.size()); @@ -332,11 +312,21 @@ private void clearQueueIfNotLeader() } /** - * Processes the given batch. Returns true if the batch was completely processed - * and should not be requeued. + * Processes the given batch. This method marks the batch as started and + * removes it from the map {@link #keyToBatch} so that no more requests can be + * added to it. + * + * @return true if the batch was completely processed and should not be requeued. */ private boolean processBatch(AllocateRequestBatch requestBatch) { + keyToBatch.compute(requestBatch.key, (batchKey, latestBatchForKey) -> { + // Mark the batch as started so that no more requests are added to it + requestBatch.markStarted(); + // Remove the corresponding key from the map if this is the latest batch for the key + return requestBatch.equals(latestBatchForKey) ? null : latestBatchForKey; + }); + final AllocateRequestKey requestKey = requestBatch.key; if (requestBatch.isEmpty()) { return true; @@ -347,13 +337,13 @@ private boolean processBatch(AllocateRequestBatch requestBatch) log.debug( "Processing [%d] requests for batch [%s], queue time [%s].", - requestBatch.size(), requestKey, requestKey.getQueueTime() + requestBatch.size(), requestKey, requestBatch.getQueueTime() ); final long startTimeMillis = System.currentTimeMillis(); final int batchSize = requestBatch.size(); emitBatchMetric("task/action/batch/size", batchSize, requestKey); - emitBatchMetric("task/action/batch/queueTime", (startTimeMillis - requestKey.getQueueTime()), requestKey); + emitBatchMetric("task/action/batch/queueTime", (startTimeMillis - requestBatch.getQueueTime()), requestKey); final Set usedSegments = retrieveUsedSegments(requestKey); final int successCount = allocateSegmentsForBatch(requestBatch, usedSegments); @@ -555,7 +545,9 @@ private void emitBatchMetric(String metric, long value, AllocateRequestKey key) */ private class AllocateRequestBatch { + private long queueTimeMillis; private final AllocateRequestKey key; + private boolean started = false; /** * Map from allocate requests (represents a single SegmentAllocateAction) @@ -573,29 +565,60 @@ private class AllocateRequestBatch this.key = key; } - synchronized Future add(SegmentAllocateRequest request) + long getQueueTime() + { + return queueTimeMillis; + } + + boolean isDue() + { + return System.currentTimeMillis() - queueTimeMillis >= maxWaitTimeMillis; + } + + synchronized void resetQueueTime() + { + queueTimeMillis = System.currentTimeMillis(); + started = false; + } + + void markStarted() + { + started = true; + } + + boolean isStarted() + { + return started; + } + + boolean isFull() + { + return size() >= MAX_BATCH_SIZE; + } + + Future add(SegmentAllocateRequest request) { log.debug("Adding request to batch [%s]: %s", key, request.getAction()); return requestToFuture.computeIfAbsent(request, req -> new CompletableFuture<>()); } - synchronized void transferRequestsFrom(AllocateRequestBatch batch) + void transferRequestsFrom(AllocateRequestBatch batch) { requestToFuture.putAll(batch.requestToFuture); batch.requestToFuture.clear(); } - synchronized Set getRequests() + Set getRequests() { return new HashSet<>(requestToFuture.keySet()); } - synchronized void failPendingRequests(String reason) + void failPendingRequests(String reason) { failPendingRequests(new ISE(reason)); } - synchronized void failPendingRequests(Throwable cause) + void failPendingRequests(Throwable cause) { if (!requestToFuture.isEmpty()) { log.warn("Failing [%d] requests in batch [%s], reason [%s].", size(), cause.getMessage(), key); @@ -607,7 +630,7 @@ synchronized void failPendingRequests(Throwable cause) } } - synchronized void completePendingRequestsWithNull() + void completePendingRequestsWithNull() { if (requestToFuture.isEmpty()) { return; @@ -620,7 +643,7 @@ synchronized void completePendingRequestsWithNull() requestToFuture.clear(); } - synchronized void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request) + void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request) { request.incrementAttempts(); @@ -643,12 +666,12 @@ synchronized void handleResult(SegmentAllocateResult result, SegmentAllocateRequ } } - synchronized boolean isEmpty() + boolean isEmpty() { return requestToFuture.isEmpty(); } - synchronized int size() + int size() { return requestToFuture.size(); } @@ -659,14 +682,6 @@ synchronized int size() */ private static class AllocateRequestKey { - /** - * ID to distinguish between two batches for the same datasource, groupId, etc. - */ - private final int batchIncrementalId; - - private long queueTimeMillis; - private final long maxWaitTimeMillis; - private final String dataSource; private final String groupId; private final Interval preferredAllocationInterval; @@ -684,12 +699,11 @@ private static class AllocateRequestKey * Creates a new key for the given request. The batch for a unique key will * always contain a single request. */ - AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis, int batchIncrementalId) + AllocateRequestKey(SegmentAllocateRequest request) { final SegmentAllocateAction action = request.getAction(); final Task task = request.getTask(); - this.batchIncrementalId = batchIncrementalId; this.dataSource = action.getDataSource(); this.groupId = task.getGroupId(); this.skipSegmentLineageCheck = action.isSkipSegmentLineageCheck(); @@ -703,30 +717,12 @@ private static class AllocateRequestKey this.hash = Objects.hash( dataSource, groupId, - batchIncrementalId, skipSegmentLineageCheck, useNonRootGenPartitionSpace, preferredAllocationInterval, lockGranularity ); this.serialized = serialize(); - - this.maxWaitTimeMillis = maxWaitTimeMillis; - } - - void resetQueueTime() - { - queueTimeMillis = System.currentTimeMillis(); - } - - long getQueueTime() - { - return queueTimeMillis; - } - - boolean isDue() - { - return System.currentTimeMillis() - queueTimeMillis >= maxWaitTimeMillis; } @Override @@ -741,7 +737,6 @@ public boolean equals(Object o) AllocateRequestKey that = (AllocateRequestKey) o; return dataSource.equals(that.dataSource) && groupId.equals(that.groupId) - && batchIncrementalId == that.batchIncrementalId && skipSegmentLineageCheck == that.skipSegmentLineageCheck && useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace && preferredAllocationInterval.equals(that.preferredAllocationInterval) @@ -765,7 +760,6 @@ private String serialize() return "{" + "datasource='" + dataSource + '\'' + ", groupId='" + groupId + '\'' + - ", batchId=" + batchIncrementalId + ", lock=" + lockGranularity + ", allocInterval=" + preferredAllocationInterval + ", skipLineageCheck=" + skipSegmentLineageCheck + From 69ddc0dc82685c18d77f4916312b98f46d80278b Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 18 Dec 2023 16:20:06 +0530 Subject: [PATCH 4/4] Clean up --- .../common/actions/SegmentAllocationQueue.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index 7aa30d3a9356..6986ec683a56 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -190,8 +190,10 @@ public Future add(SegmentAllocateRequest request) // t2 -> executor pops batch, processes all requests in it // t1 -> new request is added to dangling batch and is never picked up // Solution: Perform the following operations only inside keyToBatch.compute(): - // 1. Add or remove from map, add batch to queue - // 2. Mark batch as started + // 1. Add or remove from map + // 2. Add batch to queue + // 3. Mark batch as started + // 4. Update requests in batch keyToBatch.compute(requestKey, (key, existingBatch) -> { if (existingBatch == null || existingBatch.isStarted() || existingBatch.isFull()) { AllocateRequestBatch newBatch = new AllocateRequestBatch(key); @@ -266,7 +268,7 @@ private void processBatchesDue() catch (Throwable t) { currentBatch.failPendingRequests(t); processed = true; - log.error(t, "Error while processing batch [%s]", currentBatch); + log.error(t, "Error while processing batch [%s]", currentBatch.key); } // Requeue if not fully processed yet @@ -552,10 +554,6 @@ private class AllocateRequestBatch /** * Map from allocate requests (represents a single SegmentAllocateAction) * to the future of allocated segment id. - *

- * This must be accessed through methods synchronized on this batch. - * It is to avoid races between a new request being added just when the batch - * is being processed. */ private final Map> requestToFuture = new HashMap<>(); @@ -575,7 +573,7 @@ boolean isDue() return System.currentTimeMillis() - queueTimeMillis >= maxWaitTimeMillis; } - synchronized void resetQueueTime() + void resetQueueTime() { queueTimeMillis = System.currentTimeMillis(); started = false;