Skip to content

Commit

Permalink
Consolidate exception translation and bubbling #1275
Browse files Browse the repository at this point in the history
Exceptions are unwrapped now in a single utility. Future synchronization properly creates exception wrappers to associate the local stack trace with the exception.
  • Loading branch information
mp911de committed Apr 27, 2020
1 parent 290f827 commit bfaafc3
Show file tree
Hide file tree
Showing 23 changed files with 202 additions and 234 deletions.
30 changes: 4 additions & 26 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import reactor.core.publisher.Mono;
import io.lettuce.core.Transports.NativeTransports;
import io.lettuce.core.internal.AsyncCloseable;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.ConnectionWatchdog;
Expand Down Expand Up @@ -291,12 +292,7 @@ protected <T> T getConnection(ConnectionFuture<T> connectionFuture) {
Thread.currentThread().interrupt();
throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e);
} catch (Exception e) {

if (e instanceof ExecutionException) {
throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e.getCause());
}

throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e);
throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), Exceptions.unwrap(e));
}
}

Expand All @@ -318,12 +314,7 @@ protected <T> T getConnection(CompletableFuture<T> connectionFuture) {
Thread.currentThread().interrupt();
throw RedisConnectionException.create(e);
} catch (Exception e) {

if (e instanceof ExecutionException) {
throw RedisConnectionException.create(e.getCause());
}

throw RedisConnectionException.create(e);
throw RedisConnectionException.create(Exceptions.unwrap(e));
}
}

Expand Down Expand Up @@ -467,21 +458,8 @@ public void shutdown(long quietPeriod, long timeout, TimeUnit timeUnit) {

try {
shutdownAsync(quietPeriod, timeout, timeUnit).get();
} catch (RuntimeException e) {
throw e;
} catch (ExecutionException e) {

if (e.getCause() instanceof RedisCommandExecutionException) {
throw ExceptionFactory.createExecutionException(e.getCause().getMessage(), e.getCause());
}

throw new RedisException(e.getCause());
} catch (InterruptedException e) {

Thread.currentThread().interrupt();
throw new RedisCommandInterruptedException(e);
} catch (Exception e) {
throw ExceptionFactory.createExecutionException(null, e);
throw Exceptions.bubble(e);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/lettuce/core/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

import io.lettuce.core.internal.ExceptionFactory;
import reactor.core.publisher.Mono;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.RedisCodec;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/lettuce/core/RedisPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.lettuce.core.internal.ExceptionFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.util.context.Context;
Expand Down
35 changes: 16 additions & 19 deletions src/main/java/io/lettuce/core/cluster/MultiNodeExecution.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import java.util.concurrent.atomic.AtomicLong;

import io.lettuce.core.RedisCommandInterruptedException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.internal.Exceptions;

/**
* Utility to perform and synchronize command executions on multiple cluster nodes.
Expand All @@ -35,11 +35,8 @@ class MultiNodeExecution {
static <T> T execute(Callable<T> function) {
try {
return function.call();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RedisCommandInterruptedException(e);
} catch (Exception e) {
throw new RedisException(e);
throw Exceptions.bubble(e);
}
}

Expand Down Expand Up @@ -75,14 +72,14 @@ protected static <T> RedisFuture<T> firstOfAsync(Map<?, ? extends CompletionStag

return new PipelinedRedisFuture<>(executions, objectPipelinedRedisFuture -> {
// make sure, that all futures are executed before returning the result.
for (CompletionStage<T> future : executions.values()) {
execute(() -> future.toCompletableFuture().get());
}
for (CompletionStage<T> future : executions.values()) {
return execute(() -> future.toCompletableFuture().get());
}
return null;
});
for (CompletionStage<T> future : executions.values()) {
execute(() -> future.toCompletableFuture().get());
}
for (CompletionStage<T> future : executions.values()) {
return execute(() -> future.toCompletableFuture().get());
}
return null;
});
}

/**
Expand All @@ -96,12 +93,12 @@ static <T> RedisFuture<T> lastOfAsync(Map<?, ? extends CompletionStage<T>> execu

return new PipelinedRedisFuture<>(executions, objectPipelinedRedisFuture -> {
// make sure, that all futures are executed before returning the result.
T result = null;
for (CompletionStage<T> future : executions.values()) {
result = execute(() -> future.toCompletableFuture().get());
}
return result;
});
T result = null;
for (CompletionStage<T> future : executions.values()) {
result = execute(() -> future.toCompletableFuture().get());
}
return result;
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@

import org.reactivestreams.Publisher;

import io.lettuce.core.*;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.api.NodeSelectionSupport;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.internal.AbstractInvocationHandler;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.TimeoutProvider;
import io.lettuce.core.internal.*;
import io.lettuce.core.protocol.RedisCommand;

/**
Expand Down Expand Up @@ -212,13 +211,8 @@ private static boolean awaitAll(long timeout, TimeUnit unit, Collection<Completi
complete = true;
} catch (TimeoutException e) {
complete = false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RedisCommandInterruptedException(e);
} catch (ExecutionException e) {
throw new RedisException(e.getCause());
} catch (Exception e) {
throw new RedisCommandInterruptedException(e);
throw Exceptions.bubble(e);
}

return complete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.AsyncConnectionProvider;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.HostAndPort;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.*;
import io.lettuce.core.models.role.RedisInstance;
import io.lettuce.core.models.role.RedisNodeDescription;
import io.netty.util.internal.logging.InternalLogger;
Expand Down Expand Up @@ -87,15 +84,8 @@ public StatefulRedisConnection<K, V> getConnection(Intent intent, int slot) {

try {
return getConnectionAsync(intent, slot).get();
} catch (RedisException e) {
throw e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RedisCommandInterruptedException(e);
} catch (ExecutionException e) {
throw new RedisException(e.getCause());
} catch (RuntimeException e) {
throw new RedisException(e);
} catch (Exception e) {
throw Exceptions.bubble(e);
}
}

Expand Down Expand Up @@ -381,7 +371,8 @@ protected ConnectionFuture<StatefulRedisConnection<K, V>> getConnectionAsync(Con

if (throwable != null) {

result.completeExceptionally(RedisConnectionException.create(connectionFuture.getRemoteAddress(), throwable));
result.completeExceptionally(
RedisConnectionException.create(connectionFuture.getRemoteAddress(), Exceptions.bubble(throwable)));
} else {
result.complete(connection);
}
Expand Down
30 changes: 11 additions & 19 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import java.net.SocketAddress;
import java.net.URI;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -41,6 +44,7 @@
import io.lettuce.core.cluster.topology.TopologyComparators;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceLists;
Expand Down Expand Up @@ -881,10 +885,9 @@ private static <T> T get(CompletableFuture<T> future, Function<RedisException, R
throw mapper.apply((RedisException) e.getCause());
}

throw new RedisException(e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RedisCommandInterruptedException(e);
throw Exceptions.bubble(e);
} catch (Exception e) {
throw Exceptions.bubble(e);
}
}

Expand Down Expand Up @@ -912,16 +915,16 @@ protected CompletableFuture<Partitions> loadPartitionsAsync() {
fetchPartitions(initialUris).whenComplete((nextNodes, nextThrowable) -> {

if (nextThrowable != null) {
Throwable exception = unwrap(nextThrowable);
exception.addSuppressed(unwrap(throwable));
Throwable exception = Exceptions.unwrap(nextThrowable);
exception.addSuppressed(Exceptions.unwrap(throwable));

future.completeExceptionally(exception);
} else {
future.complete(nextNodes);
}
});
} else {
future.completeExceptionally(unwrap(throwable));
future.completeExceptionally(Exceptions.unwrap(throwable));
}
});

Expand Down Expand Up @@ -1128,17 +1131,6 @@ private static RedisURI getViewedBy(Map<RedisURI, Partitions> map, Partitions pa
return null;
}

private static Throwable unwrap(Throwable throwable) {

Throwable ex = throwable;

while (ex instanceof CompletionException || ex instanceof ExecutionException) {
ex = ex.getCause();
}

return ex;
}

ClusterClientOptions getClusterClientOptions() {
return (ClusterClientOptions) getOptions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import io.lettuce.core.ExceptionFactory;
import io.lettuce.core.internal.ExceptionFactory;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.StringCodec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.topology.TopologyComparators.SortAction;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.internal.ExceptionFactory;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.resource.ClientResources;
import io.netty.util.Timeout;
Expand Down Expand Up @@ -315,10 +317,7 @@ private void openConnections(ConnectionTracker tracker, Iterable<RedisURI> redis

if (throwable != null) {

Throwable throwableToUse = throwable;
if (throwable instanceof CompletionException) {
throwableToUse = throwableToUse.getCause();
}
Throwable throwableToUse = Exceptions.unwrap(throwable);

String message = String.format("Unable to connect to [%s]: %s", socketAddress,
throwableToUse.getMessage() != null ? throwableToUse.getMessage() : throwableToUse.toString());
Expand Down Expand Up @@ -395,10 +394,8 @@ private static Throwable getException(Future<?> future) {

try {
future.get();
} catch (InterruptedException e) {
throw new RedisCommandInterruptedException(e);
} catch (ExecutionException e) {
return e.getCause();
} catch (Exception e) {
return Exceptions.bubble(e);
}

return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core;
package io.lettuce.core.internal;

import io.lettuce.core.*;

import java.time.Duration;
import java.time.LocalTime;
Expand Down Expand Up @@ -132,8 +134,8 @@ public static RedisCommandExecutionException createExecutionException(String mes
return cause != null ? new RedisLoadingException(message, cause) : new RedisLoadingException(message);
}

return cause != null ? new RedisCommandExecutionException(message, cause) : new RedisCommandExecutionException(
message);
return cause != null ? new RedisCommandExecutionException(message, cause)
: new RedisCommandExecutionException(message);
}

return new RedisCommandExecutionException(cause);
Expand Down
Loading

0 comments on commit bfaafc3

Please sign in to comment.