Skip to content

Commit

Permalink
Adapt low-level REST client to java 8 (elastic#41537)
Browse files Browse the repository at this point in the history
As a follow-up to elastic#38540 we can use lambda functions and method
references where convenient in the low-level REST client.

Also, we need to update the docs to state that the minimum java version
required is 1.8.
  • Loading branch information
javanna committed May 22, 2019
1 parent eae58c4 commit 15ad400
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* Holds the state of a dead connection to a host. Keeps track of how many failed attempts were performed and
Expand All @@ -30,35 +31,36 @@ final class DeadHostState implements Comparable<DeadHostState> {

private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);
static final Supplier<Long> DEFAULT_TIME_SUPPLIER = System::nanoTime;

private final int failedAttempts;
private final long deadUntilNanos;
private final TimeSupplier timeSupplier;
private final Supplier<Long> timeSupplier;

/**
* Build the initial dead state of a host. Useful when a working host stops functioning
* and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so.
*
* @param timeSupplier a way to supply the current time and allow for unit testing
*/
DeadHostState(TimeSupplier timeSupplier) {
DeadHostState(Supplier<Long> timeSupplier) {
this.failedAttempts = 1;
this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
this.deadUntilNanos = timeSupplier.get() + MIN_CONNECTION_TIMEOUT_NANOS;
this.timeSupplier = timeSupplier;
}

/**
* Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
* it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
* to retry that same host again. Minimum is 1 minute (for a node the only failed once created
* through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
* through {@link #DeadHostState(Supplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
*
* @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
*/
DeadHostState(DeadHostState previousDeadHostState) {
long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
MAX_CONNECTION_TIMEOUT_NANOS);
this.deadUntilNanos = previousDeadHostState.timeSupplier.nanoTime() + timeoutNanos;
this.deadUntilNanos = previousDeadHostState.timeSupplier.get() + timeoutNanos;
this.failedAttempts = previousDeadHostState.failedAttempts + 1;
this.timeSupplier = previousDeadHostState.timeSupplier;
}
Expand All @@ -69,7 +71,7 @@ final class DeadHostState implements Comparable<DeadHostState> {
* @return true if the host should be retried, false otherwise
*/
boolean shallBeRetried() {
return timeSupplier.nanoTime() - deadUntilNanos > 0;
return timeSupplier.get() - deadUntilNanos > 0;
}

/**
Expand All @@ -87,8 +89,8 @@ int getFailedAttempts() {
@Override
public int compareTo(DeadHostState other) {
if (timeSupplier != other.timeSupplier) {
throw new IllegalArgumentException("can't compare DeadHostStates with different clocks ["
+ timeSupplier + " != " + other.timeSupplier + "]");
throw new IllegalArgumentException("can't compare DeadHostStates holding different time suppliers as they may " +
"be based on different clocks");
}
return Long.compare(deadUntilNanos, other.deadUntilNanos);
}
Expand All @@ -101,23 +103,4 @@ public String toString() {
", timeSupplier=" + timeSupplier +
'}';
}

/**
* Time supplier that makes timing aspects pluggable to ease testing
*/
interface TimeSupplier {
TimeSupplier DEFAULT = new TimeSupplier() {
@Override
public long nanoTime() {
return System.nanoTime();
}

@Override
public String toString() {
return "nanoTime";
}
};

long nanoTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public String getEndpoint() {
*/
public void addParameter(String name, String value) {
Objects.requireNonNull(name, "url parameter name cannot be null");
// .putIfAbsent(name, value) except we are in Java 7 which doesn't have that.
if (parameters.containsKey(name)) {
throw new IllegalArgumentException("url parameter [" + name + "] has already been set to [" + parameters.get(name) + "]");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.elasticsearch.client;

import org.apache.http.message.BasicHeader;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory;

Expand All @@ -38,7 +38,7 @@ public final class RequestOptions {
* Default request options.
*/
public static final RequestOptions DEFAULT = new Builder(
Collections.<Header>emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build();
Collections.emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build();

private final List<Header> headers;
private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
Expand Down
38 changes: 11 additions & 27 deletions client/rest/src/main/java/org/elasticsearch/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.http.nio.client.methods.HttpAsyncMethods;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.DeadHostState.TimeSupplier;

import javax.net.ssl.SSLHandshakeException;
import java.io.Closeable;
Expand All @@ -72,6 +71,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;

Expand Down Expand Up @@ -139,7 +139,11 @@ public static RestClientBuilder builder(Node... nodes) {
* @see Node#Node(HttpHost)
*/
public static RestClientBuilder builder(HttpHost... hosts) {
return new RestClientBuilder(hostsToNodes(hosts));
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("hosts must not be null nor empty");
}
List<Node> nodes = Arrays.stream(hosts).map(Node::new).collect(Collectors.toList());
return new RestClientBuilder(nodes);
}

/**
Expand All @@ -163,17 +167,6 @@ public synchronized void setNodes(Collection<Node> nodes) {
this.blacklist.clear();
}

private static List<Node> hostsToNodes(HttpHost[] hosts) {
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("hosts must not be null nor empty");
}
List<Node> nodes = new ArrayList<>(hosts.length);
for (HttpHost host : hosts) {
nodes.add(new Node(host));
}
return nodes;
}

/**
* Get the list of nodes that the client knows about. The list is
* unmodifiable.
Expand Down Expand Up @@ -369,15 +362,11 @@ static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost,
List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
for (Node node : nodeTuple.nodes) {
DeadHostState deadness = blacklist.get(node.getHost());
if (deadness == null) {
livingNodes.add(node);
continue;
}
if (deadness.shallBeRetried()) {
if (deadness == null || deadness.shallBeRetried()) {
livingNodes.add(node);
continue;
} else {
deadNodes.add(new DeadNode(node, deadness));
}
deadNodes.add(new DeadNode(node, deadness));
}

if (false == livingNodes.isEmpty()) {
Expand Down Expand Up @@ -415,12 +404,7 @@ static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost,
* to compare many things. This saves us a sort on the unfiltered
* list.
*/
nodeSelector.select(new Iterable<Node>() {
@Override
public Iterator<Node> iterator() {
return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
}
});
nodeSelector.select(() -> new DeadNodeIteratorAdapter(selectedDeadNodes.iterator()));
if (false == selectedDeadNodes.isEmpty()) {
return singletonList(Collections.min(selectedDeadNodes).node);
}
Expand All @@ -447,7 +431,7 @@ private void onResponse(Node node) {
private void onFailure(Node node) {
while(true) {
DeadHostState previousDeadHostState =
blacklist.putIfAbsent(node.getHost(), new DeadHostState(TimeSupplier.DEFAULT));
blacklist.putIfAbsent(node.getHost(), new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER));
if (previousDeadHostState == null) {
if (logger.isDebugEnabled()) {
logger.debug("added [" + node + "] to blacklist");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,8 @@ public RestClient build() {
if (failureListener == null) {
failureListener = new RestClient.FailureListener();
}
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
@Override
public CloseableHttpAsyncClient run() {
return createHttpClient();
}
});
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
(PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient);
RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
httpClient.start();
Expand All @@ -218,12 +214,7 @@ private CloseableHttpAsyncClient createHttpClient() {
}

final HttpAsyncClientBuilder finalBuilder = httpClientBuilder;
return AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
@Override
public CloseableHttpAsyncClient run() {
return finalBuilder.build();
}
});
return AccessController.doPrivileged((PrivilegedAction<CloseableHttpAsyncClient>) finalBuilder::build);
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("could not create the default ssl context", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.elasticsearch.client.DeadHostState.TimeSupplier;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand All @@ -38,14 +36,14 @@ public class DeadHostStateTests extends RestClientTestCase {
private static long[] EXPECTED_TIMEOUTS_SECONDS = new long[]{60, 84, 120, 169, 240, 339, 480, 678, 960, 1357, 1800};

public void testInitialDeadHostStateDefaultTimeSupplier() {
DeadHostState deadHostState = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
DeadHostState deadHostState = new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER);
long currentTime = System.nanoTime();
assertThat(deadHostState.getDeadUntilNanos(), greaterThanOrEqualTo(currentTime));
assertThat(deadHostState.getFailedAttempts(), equalTo(1));
}

public void testDeadHostStateFromPreviousDefaultTimeSupplier() {
DeadHostState previous = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
DeadHostState previous = new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER);
int iters = randomIntBetween(5, 30);
for (int i = 0; i < iters; i++) {
DeadHostState deadHostState = new DeadHostState(previous);
Expand All @@ -58,10 +56,13 @@ public void testDeadHostStateFromPreviousDefaultTimeSupplier() {
public void testCompareToTimeSupplier() {
int numObjects = randomIntBetween(EXPECTED_TIMEOUTS_SECONDS.length, 30);
DeadHostState[] deadHostStates = new DeadHostState[numObjects];
final AtomicLong time = new AtomicLong(0);
for (int i = 0; i < numObjects; i++) {
if (i == 0) {
// this test requires a strictly increasing timer
deadHostStates[i] = new DeadHostState(new StrictMonotonicTimeSupplier());
// this test requires a strictly increasing timer. This ensures that even if we call this time supplier in a very tight
// loop we always notice time moving forward. This does not happen for real timer implementations
// (e.g. on Linux <code>clock_gettime</code> provides microsecond resolution).
deadHostStates[i] = new DeadHostState(time::incrementAndGet);
} else {
deadHostStates[i] = new DeadHostState(deadHostStates[i - 1]);
}
Expand All @@ -74,42 +75,39 @@ public void testCompareToTimeSupplier() {

public void testCompareToDifferingTimeSupplier() {
try {
new DeadHostState(TimeSupplier.DEFAULT).compareTo(
new DeadHostState(new ConfigurableTimeSupplier()));
new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER).compareTo(
new DeadHostState(() -> 0L));
fail("expected failure");
} catch (IllegalArgumentException e) {
assertEquals("can't compare DeadHostStates with different clocks [nanoTime != configured[0]]",
e.getMessage());
assertEquals("can't compare DeadHostStates holding different time suppliers as they may " +
"be based on different clocks", e.getMessage());
}
}

public void testShallBeRetried() {
ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier();
final AtomicLong time = new AtomicLong(0);
DeadHostState deadHostState = null;
for (int i = 0; i < EXPECTED_TIMEOUTS_SECONDS.length; i++) {
long expectedTimeoutSecond = EXPECTED_TIMEOUTS_SECONDS[i];
timeSupplier.nanoTime = 0;
if (i == 0) {
deadHostState = new DeadHostState(timeSupplier);
deadHostState = new DeadHostState(time::get);
} else {
deadHostState = new DeadHostState(deadHostState);
}
for (int j = 0; j < expectedTimeoutSecond; j++) {
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
time.addAndGet(TimeUnit.SECONDS.toNanos(1));
assertThat(deadHostState.shallBeRetried(), is(false));
}
int iters = randomIntBetween(5, 30);
for (int j = 0; j < iters; j++) {
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
time.addAndGet(TimeUnit.SECONDS.toNanos(1));
assertThat(deadHostState.shallBeRetried(), is(true));
}
}
}

public void testDeadHostStateTimeouts() {
ConfigurableTimeSupplier zeroTimeSupplier = new ConfigurableTimeSupplier();
zeroTimeSupplier.nanoTime = 0L;
DeadHostState previous = new DeadHostState(zeroTimeSupplier);
DeadHostState previous = new DeadHostState(() -> 0L);
for (long expectedTimeoutsSecond : EXPECTED_TIMEOUTS_SECONDS) {
assertThat(TimeUnit.NANOSECONDS.toSeconds(previous.getDeadUntilNanos()), equalTo(expectedTimeoutsSecond));
previous = new DeadHostState(previous);
Expand All @@ -123,37 +121,4 @@ public void testDeadHostStateTimeouts() {
previous = deadHostState;
}
}

static class ConfigurableTimeSupplier implements DeadHostState.TimeSupplier {
long nanoTime;

@Override
public long nanoTime() {
return nanoTime;
}

@Override
public String toString() {
return "configured[" + nanoTime + "]";
}
}

/**
* Simulates a monotonically strict increasing time (i.e. the value increases on every call to <code>#nanoTime()</code>). This ensures
* that even if we call this time supplier in a very tight loop we always notice time moving forward. This does not happen for real
* timer implementations (e.g. on Linux <code>clock_gettime</code> provides microsecond resolution).
*/
static class StrictMonotonicTimeSupplier implements DeadHostState.TimeSupplier {
private final AtomicLong time = new AtomicLong(0);

@Override
public long nanoTime() {
return time.incrementAndGet();
}

@Override
public String toString() {
return "strict monotonic[" + time.get() + "]";
}
}
}
Loading

0 comments on commit 15ad400

Please sign in to comment.