Skip to content

Commit

Permalink
[RCI] Add IndexShardOperationPermits.asyncBlockOperations(ActionListe…
Browse files Browse the repository at this point in the history
…ner<Releasable>)

The current implementation of asyncBlockOperations() can be used to
execute some code once all indexing operations permits have been acquired,
 then releases all permits immediately after the code execution. This
 immediate release is not suitable for treatments that need to keep all
 permits over multiple execution steps.

This commit adds a new asyncBlockOperations() that exposes a Releasable,
 making it possible to acquire all permits and only release them all
 when needed by closing the Releasable.

This method is aimed to be used in a TransportReplicationAction that
 will acquire all permits on the primary shard.

The existing blockOperations() and asyncBlockOperations() methods have
been modified to delegate permit acquisition/releasing to this new
method.

Relates to elastic#33888
  • Loading branch information
tlrx committed Nov 7, 2018
1 parent e685cfe commit 4d792af
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ <E extends Exception> void blockOperations(
final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
delayOperations();
try {
doBlockOperations(timeout, timeUnit, onBlocked);
try (Releasable ignored = acquireAll(timeout, timeUnit)) {
onBlocked.run();
} finally {
releaseDelayedOperations();
}
Expand All @@ -123,23 +123,66 @@ <E extends Exception> void blockOperations(
* @param onFailure the action to run if a failure occurs while blocking operations
* @param <E> the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread)
*/
<E extends Exception> void asyncBlockOperations(
final long timeout, final TimeUnit timeUnit, final CheckedRunnable<E> onBlocked, final Consumer<Exception> onFailure) {
<E extends Exception> void asyncBlockOperations(final long timeout, final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked, final Consumer<Exception> onFailure) {
asyncBlockOperations(new ActionListener<Releasable>() {
@Override
public void onFailure(final Exception e) {
onFailure.accept(e);
}

@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignored = releasable) {
onBlocked.run();
} catch (final Exception e) {
onFailure.accept(e);
}
}
}, timeout, timeUnit);
}

/**
* Immediately delays operations and on another thread waits for in-flight operations to finish and then acquires all permits. When all
* permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are started. Delayed
* operations are run once the {@link Releasable} is released or if a failure occurs while acquiring all permits; in this case the
* {@code onFailure} handler will be invoked before running delayed operations.
*
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param timeout the maximum time to wait for the in-flight operations block
* @param timeUnit the time unit of the {@code timeout} argument
*/
public void asyncBlockOperations(final ActionListener<Releasable> onAcquired, final long timeout, final TimeUnit timeUnit) {
delayOperations();
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {

final AtomicBoolean released = new AtomicBoolean(false);

@Override
public void onFailure(final Exception e) {
onFailure.accept(e);
try {
onAcquired.onFailure(e);
} finally {
releaseDelayedOperationsIfNeeded();
}
}

@Override
protected void doRun() throws Exception {
doBlockOperations(timeout, timeUnit, onBlocked);
final Releasable releasable = acquireAll(timeout, timeUnit);
onAcquired.onResponse(() -> {
try {
releasable.close();
} finally {
releaseDelayedOperationsIfNeeded();
}
});
}

@Override
public void onAfter() {
releaseDelayedOperations();
private void releaseDelayedOperationsIfNeeded() {
if (released.compareAndSet(false, true)) {
releaseDelayedOperations();
}
}
});
}
Expand All @@ -154,23 +197,21 @@ private void delayOperations() {
}
}

private <E extends Exception> void doBlockOperations(
final long timeout,
final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
private Releasable acquireAll(final long timeout, final TimeUnit timeUnit) throws InterruptedException, TimeoutException {
if (Assertions.ENABLED) {
// since delayed is not volatile, we have to synchronize even here for visibility
synchronized (this) {
assert queuedBlockOperations > 0;
}
}
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
assert semaphore.availablePermits() == 0;
try {
onBlocked.run();
} finally {
semaphore.release(TOTAL_PERMITS);
}
final AtomicBoolean closed = new AtomicBoolean();
return () -> {
if (closed.compareAndSet(false, true)) {
assert semaphore.availablePermits() == 0;
semaphore.release(TOTAL_PERMITS);
}
};
} else {
throw new TimeoutException("timeout while blocking operations");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
Expand Down Expand Up @@ -48,6 +49,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -220,7 +222,7 @@ public void testGetBlockWhenBlocked() throws ExecutionException, InterruptedExce
try (Releasable ignored = blockAndWait()) {
permits.acquire(future, ThreadPool.Names.GENERIC, true, "");

permits.asyncBlockOperations(
randomAsyncBlockOperations(permits,
30,
TimeUnit.MINUTES,
() -> {
Expand Down Expand Up @@ -334,7 +336,7 @@ public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedEx
final CountDownLatch blockAcquired = new CountDownLatch(1);
final CountDownLatch releaseBlock = new CountDownLatch(1);
final AtomicBoolean blocked = new AtomicBoolean();
permits.asyncBlockOperations(
randomAsyncBlockOperations(permits,
30,
TimeUnit.MINUTES,
() -> {
Expand Down Expand Up @@ -392,7 +394,7 @@ public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedE
// now we will delay operations while the first operation is still executing (because it is latched)
final CountDownLatch blockedLatch = new CountDownLatch(1);
final AtomicBoolean onBlocked = new AtomicBoolean();
permits.asyncBlockOperations(
randomAsyncBlockOperations(permits,
30,
TimeUnit.MINUTES,
() -> {
Expand Down Expand Up @@ -486,7 +488,7 @@ public void onFailure(Exception e) {
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
permits.asyncBlockOperations(
randomAsyncBlockOperations(permits,
30,
TimeUnit.MINUTES,
() -> {
Expand Down Expand Up @@ -559,7 +561,7 @@ public void testActiveOperationsCount() throws ExecutionException, InterruptedEx
public void testAsyncBlockOperationsOnFailure() throws InterruptedException {
final AtomicReference<Exception> reference = new AtomicReference<>();
final CountDownLatch onFailureLatch = new CountDownLatch(1);
permits.asyncBlockOperations(
randomAsyncBlockOperations(permits,
10,
TimeUnit.MINUTES,
() -> {
Expand Down Expand Up @@ -596,7 +598,7 @@ public void testTimeout() throws BrokenBarrierException, InterruptedException {
{
final AtomicReference<Exception> reference = new AtomicReference<>();
final CountDownLatch onFailureLatch = new CountDownLatch(1);
permits.asyncBlockOperations(
randomAsyncBlockOperations(permits,
1,
TimeUnit.MILLISECONDS,
() -> {},
Expand Down Expand Up @@ -716,4 +718,32 @@ public void testPermitTraceCapturing() throws ExecutionException, InterruptedExc
assertThat(permits.getActiveOperationsCount(), equalTo(0));
assertThat(permits.getActiveOperations(), emptyIterable());
}

/**
* Randomizes the usage of {@link IndexShardOperationPermits#asyncBlockOperations(ActionListener, long, TimeUnit)} and
* {@link IndexShardOperationPermits#asyncBlockOperations(long, TimeUnit, CheckedRunnable, Consumer)}
*/
private <E extends Exception> void randomAsyncBlockOperations(final IndexShardOperationPermits permits,
final long timeout, final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked, final Consumer<Exception> onFailure) {
if (randomBoolean()) {
permits.asyncBlockOperations(timeout, timeUnit, onBlocked, onFailure);
} else {
permits.asyncBlockOperations(new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignored = releasable) {
onBlocked.run();
} catch (final Exception e) {
onFailure.accept(e);
}
}

@Override
public void onFailure(final Exception e) {
onFailure.accept(e);
}
}, timeout, timeUnit);
}
}
}

0 comments on commit 4d792af

Please sign in to comment.