Skip to content

Commit

Permalink
Remove Discovery.AckListener.onTimeout() (#30514)
Browse files Browse the repository at this point in the history
The MasterService takes responsibility for timeouts of the AckListeners that it
creates, and the rest of the Discovery subsystem is unaware of these timeouts,
so there's no need for this to appear in the Discovery.AckListener interface.

Also fix a typo in the name of DelegatingAckListener.
  • Loading branch information
DaveCTurner committed May 10, 2018
1 parent 37bb8f8 commit df17f85
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ public Discovery.AckListener createAckListener(ThreadPool threadPool, ClusterSta
}
});

return new DelegetingAckListener(ackListeners);
return new DelegatingAckListener(ackListeners);
}

public boolean clusterStateUnchanged() {
Expand Down Expand Up @@ -541,11 +541,11 @@ protected void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source)
}
}

private static class DelegetingAckListener implements Discovery.AckListener {
private static class DelegatingAckListener implements Discovery.AckListener {

private final List<Discovery.AckListener> listeners;

private DelegetingAckListener(List<Discovery.AckListener> listeners) {
private DelegatingAckListener(List<Discovery.AckListener> listeners) {
this.listeners = listeners;
}

Expand All @@ -555,11 +555,6 @@ public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
listener.onNodeAck(node, e);
}
}

@Override
public void onTimeout() {
throw new UnsupportedOperationException("no timeout delegation");
}
}

private static class AckCountDownListener implements Discovery.AckListener {
Expand Down Expand Up @@ -614,7 +609,6 @@ public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
}
}

@Override
public void onTimeout() {
if (countDown.fastForward()) {
logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", clusterStateVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public interface Discovery extends LifecycleComponent {

interface AckListener {
void onNodeAck(DiscoveryNode node, @Nullable Exception e);
void onTimeout();
}

class FailedToCommitClusterStateException extends ElasticsearchException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,6 @@ public AssertingAckListener publishState(PublishClusterStateAction action, Clust

public static class AssertingAckListener implements Discovery.AckListener {
private final List<Tuple<DiscoveryNode, Throwable>> errors = new CopyOnWriteArrayList<>();
private final AtomicBoolean timeoutOccurred = new AtomicBoolean();
private final CountDownLatch countDown;

public AssertingAckListener(int nodeCount) {
Expand All @@ -829,23 +828,12 @@ public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
countDown.countDown();
}

@Override
public void onTimeout() {
timeoutOccurred.set(true);
// Fast forward the counter - no reason to wait here
long currentCount = countDown.getCount();
for (long i = 0; i < currentCount; i++) {
countDown.countDown();
}
}

public void await(long timeout, TimeUnit unit) throws InterruptedException {
assertThat(awaitErrors(timeout, unit), emptyIterable());
}

public List<Tuple<DiscoveryNode, Throwable>> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException {
countDown.await(timeout, unit);
assertFalse(timeoutOccurred.get());
return errors;
}

Expand Down

0 comments on commit df17f85

Please sign in to comment.