-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] Serialization bugs can cause node drops #1885
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,7 @@ | |
import org.opensearch.common.util.concurrent.ThreadContext; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
import java.io.EOFException; | ||
import java.io.IOException; | ||
import java.net.InetSocketAddress; | ||
import java.nio.ByteBuffer; | ||
|
@@ -149,27 +150,13 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st | |
streamInput = namedWriteableStream(message.openOrGetStreamInput()); | ||
assertRemoteVersion(streamInput, header.getVersion()); | ||
if (header.isError()) { | ||
handlerResponseError(streamInput, handler); | ||
handlerResponseError(requestId, streamInput, handler); | ||
} else { | ||
handleResponse(remoteAddress, streamInput, handler); | ||
} | ||
// Check the entire message has been read | ||
final int nextByte = streamInput.read(); | ||
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker | ||
if (nextByte != -1) { | ||
throw new IllegalStateException( | ||
"Message not fully read (response) for requestId [" | ||
+ requestId | ||
+ "], handler [" | ||
+ handler | ||
+ "], error [" | ||
+ header.isError() | ||
+ "]; resetting" | ||
); | ||
handleResponse(requestId, remoteAddress, streamInput, handler); | ||
} | ||
} else { | ||
assert header.isError() == false; | ||
handleResponse(remoteAddress, EMPTY_STREAM_INPUT, handler); | ||
handleResponse(requestId, remoteAddress, EMPTY_STREAM_INPUT, handler); | ||
} | ||
} | ||
} | ||
|
@@ -246,32 +233,47 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head | |
assertRemoteVersion(stream, header.getVersion()); | ||
final RequestHandlerRegistry<T> reg = requestHandlers.getHandler(action); | ||
assert reg != null; | ||
final T request = reg.newRequest(stream); | ||
request.remoteAddress(new TransportAddress(channel.getRemoteAddress())); | ||
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify | ||
final int nextByte = stream.read(); | ||
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker | ||
if (nextByte != -1) { | ||
|
||
try { | ||
final T request = reg.newRequest(stream); | ||
request.remoteAddress(new TransportAddress(channel.getRemoteAddress())); | ||
|
||
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify | ||
final int nextByte = stream.read(); | ||
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker | ||
if (nextByte != -1) { | ||
throw new IllegalStateException( | ||
"Message not fully read (request) for requestId [" | ||
+ requestId | ||
+ "], action [" | ||
+ action | ||
+ "], available [" | ||
+ stream.available() | ||
+ "]; resetting" | ||
); | ||
} | ||
final String executor = reg.getExecutor(); | ||
if (ThreadPool.Names.SAME.equals(executor)) { | ||
try { | ||
reg.processMessageReceived(request, transportChannel); | ||
} catch (Exception e) { | ||
sendErrorResponse(reg.getAction(), transportChannel, e); | ||
} | ||
} else { | ||
threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel)); | ||
} | ||
} catch (EOFException e) { | ||
// Another favor of (de)serialization issues is when stream contains less bytes than | ||
// the request handler needs to deserialize the payload. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explicitly handle |
||
throw new IllegalStateException( | ||
"Message not fully read (request) for requestId [" | ||
"Message fully read (request) but more data is expected for requestId [" | ||
+ requestId | ||
+ "], action [" | ||
+ action | ||
+ "], available [" | ||
+ stream.available() | ||
+ "]; resetting" | ||
+ "]; resetting", | ||
e | ||
); | ||
} | ||
final String executor = reg.getExecutor(); | ||
if (ThreadPool.Names.SAME.equals(executor)) { | ||
try { | ||
reg.processMessageReceived(request, transportChannel); | ||
} catch (Exception e) { | ||
sendErrorResponse(reg.getAction(), transportChannel, e); | ||
} | ||
} else { | ||
threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel)); | ||
} | ||
} | ||
} catch (Exception e) { | ||
sendErrorResponse(action, transportChannel, e); | ||
|
@@ -289,6 +291,7 @@ private static void sendErrorResponse(String actionName, TransportChannel transp | |
} | ||
|
||
private <T extends TransportResponse> void handleResponse( | ||
final long requestId, | ||
InetSocketAddress remoteAddress, | ||
final StreamInput stream, | ||
final TransportResponseHandler<T> handler | ||
|
@@ -297,6 +300,23 @@ private <T extends TransportResponse> void handleResponse( | |
try { | ||
response = handler.read(stream); | ||
response.remoteAddress(new TransportAddress(remoteAddress)); | ||
|
||
if (stream != EMPTY_STREAM_INPUT) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Treat unconsumed stream as (de)serialization failure and handle it as |
||
// Check the entire message has been read | ||
final int nextByte = stream.read(); | ||
reta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker | ||
if (nextByte != -1) { | ||
throw new IllegalStateException( | ||
"Message not fully read (response) for requestId [" | ||
+ requestId | ||
+ "], handler [" | ||
+ handler | ||
+ "], error [" | ||
+ false | ||
+ "]; resetting" | ||
); | ||
} | ||
} | ||
} catch (Exception e) { | ||
final Exception serializationException = new TransportSerializationException( | ||
"Failed to deserialize response from handler [" + handler + "]", | ||
|
@@ -322,10 +342,25 @@ private <T extends TransportResponse> void doHandleResponse(TransportResponseHan | |
} | ||
} | ||
|
||
private void handlerResponseError(StreamInput stream, final TransportResponseHandler<?> handler) { | ||
private void handlerResponseError(final long requestId, StreamInput stream, final TransportResponseHandler<?> handler) { | ||
Exception error; | ||
try { | ||
error = stream.readException(); | ||
|
||
// Check the entire message has been read | ||
final int nextByte = stream.read(); | ||
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker | ||
if (nextByte != -1) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Treat unconsumed stream as (de)serialization failure and handle it as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there are redundant code blocks for
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Those two are slightly different (fe logging, remote address setting), I am not sure the simplification would make sense here (unless we alter the behaviour slightly) |
||
throw new IllegalStateException( | ||
"Message not fully read (response) for requestId [" | ||
+ requestId | ||
+ "], handler [" | ||
+ handler | ||
+ "], error [" | ||
+ true | ||
+ "]; resetting" | ||
); | ||
} | ||
} catch (Exception e) { | ||
error = new TransportSerializationException( | ||
"Failed to deserialize exception response from stream for handler [" + handler + "]", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The EOFException will come from this line, right? I'd prefer to keep the EOFException try/catch more tightly bounded in order to make things easier to follow with less indentation. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct
I thought about that as well, the issue is that
request
is used at the end of the code block as wellSo the alternative would introduce
nullable
request instance, no sure it is better (but readability would suffer I think). With theIllegalStateException
check wrapped into dedicated function it may look better actuallyThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the catch block always throws you can do this:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That the part I don't like to be fair:
I would try with the function instead, just a moment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@andrross done, thank you!