Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Serialization bugs can cause node drops #1885

Merged
merged 3 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 73 additions & 38 deletions server/src/main/java/org/opensearch/transport/InboundHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.threadpool.ThreadPool;

import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -149,27 +150,13 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
streamInput = namedWriteableStream(message.openOrGetStreamInput());
assertRemoteVersion(streamInput, header.getVersion());
if (header.isError()) {
handlerResponseError(streamInput, handler);
handlerResponseError(requestId, streamInput, handler);
} else {
handleResponse(remoteAddress, streamInput, handler);
}
// Check the entire message has been read
final int nextByte = streamInput.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
throw new IllegalStateException(
"Message not fully read (response) for requestId ["
+ requestId
+ "], handler ["
+ handler
+ "], error ["
+ header.isError()
+ "]; resetting"
);
handleResponse(requestId, remoteAddress, streamInput, handler);
}
} else {
assert header.isError() == false;
handleResponse(remoteAddress, EMPTY_STREAM_INPUT, handler);
handleResponse(requestId, remoteAddress, EMPTY_STREAM_INPUT, handler);
}
}
}
Expand Down Expand Up @@ -246,32 +233,47 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
assertRemoteVersion(stream, header.getVersion());
final RequestHandlerRegistry<T> reg = requestHandlers.getHandler(action);
assert reg != null;
final T request = reg.newRequest(stream);
request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
final int nextByte = stream.read();
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
if (nextByte != -1) {

try {
final T request = reg.newRequest(stream);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The EOFException will come from this line, right? I'd prefer to keep the EOFException try/catch more tightly bounded in order to make things easier to follow with less indentation. What do you think?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The EOFException will come from this line, right?

Correct

I'd prefer to keep the EOFException try/catch more tightly bounded in order to make things easier to follow with less indentation. What do you think?

I thought about that as well, the issue is that request is used at the end of the code block as well

...

if (ThreadPool.Names.SAME.equals(executor)) {
    try {
        reg.processMessageReceived(request, transportChannel);
    } catch (Exception e) {
        sendErrorResponse(reg.getAction(), transportChannel, e);
    }
} else {
    threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel));
}

So the alternative would introduce nullable request instance, no sure it is better (but readability would suffer I think). With the IllegalStateException check wrapped into dedicated function it may look better actually

Copy link
Member

@andrross andrross Jan 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the catch block always throws you can do this:

final T request;
try {
    request = reg.newRequest(stream);
} catch (EOFException e) {
    // Another favor of (de)serialization issues is when stream contains less bytes than
    // the request handler needs to deserialize the payload.
    throw new IllegalStateException(
        "Message fully read (request) but more data is expected for requestId ["
            + requestId
            + "], action ["
            + action
            + "]; resetting",
        e
    );
}
request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));
checkStreamIsFullyConsumed(requestId, action, stream);

final String executor = reg.getExecutor();
if (ThreadPool.Names.SAME.equals(executor)) {
    try {
        reg.processMessageReceived(request, transportChannel);
    } catch (Exception e) {
        sendErrorResponse(reg.getAction(), transportChannel, e);
    }
} else {
    threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel));
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That the part I don't like to be fair:

final T request;

try {
    request = reg.newRequest(stream);
}

I would try with the function instead, just a moment

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrross done, thank you!

request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));

// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
final int nextByte = stream.read();
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
if (nextByte != -1) {
throw new IllegalStateException(
"Message not fully read (request) for requestId ["
+ requestId
+ "], action ["
+ action
+ "], available ["
+ stream.available()
+ "]; resetting"
);
}
final String executor = reg.getExecutor();
if (ThreadPool.Names.SAME.equals(executor)) {
try {
reg.processMessageReceived(request, transportChannel);
} catch (Exception e) {
sendErrorResponse(reg.getAction(), transportChannel, e);
}
} else {
threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel));
}
} catch (EOFException e) {
// Another favor of (de)serialization issues is when stream contains less bytes than
// the request handler needs to deserialize the payload.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explicitly handle EOFException as a favour of (de)serialization failure.

throw new IllegalStateException(
"Message not fully read (request) for requestId ["
"Message fully read (request) but more data is expected for requestId ["
+ requestId
+ "], action ["
+ action
+ "], available ["
+ stream.available()
+ "]; resetting"
+ "]; resetting",
e
);
}
final String executor = reg.getExecutor();
if (ThreadPool.Names.SAME.equals(executor)) {
try {
reg.processMessageReceived(request, transportChannel);
} catch (Exception e) {
sendErrorResponse(reg.getAction(), transportChannel, e);
}
} else {
threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel));
}
}
} catch (Exception e) {
sendErrorResponse(action, transportChannel, e);
Expand All @@ -289,6 +291,7 @@ private static void sendErrorResponse(String actionName, TransportChannel transp
}

private <T extends TransportResponse> void handleResponse(
final long requestId,
InetSocketAddress remoteAddress,
final StreamInput stream,
final TransportResponseHandler<T> handler
Expand All @@ -297,6 +300,23 @@ private <T extends TransportResponse> void handleResponse(
try {
response = handler.read(stream);
response.remoteAddress(new TransportAddress(remoteAddress));

if (stream != EMPTY_STREAM_INPUT) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Treat unconsumed stream as (de)serialization failure and handle it as TransportSerializationException

// Check the entire message has been read
final int nextByte = stream.read();
reta marked this conversation as resolved.
Show resolved Hide resolved
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
throw new IllegalStateException(
"Message not fully read (response) for requestId ["
+ requestId
+ "], handler ["
+ handler
+ "], error ["
+ false
+ "]; resetting"
);
}
}
} catch (Exception e) {
final Exception serializationException = new TransportSerializationException(
"Failed to deserialize response from handler [" + handler + "]",
Expand All @@ -322,10 +342,25 @@ private <T extends TransportResponse> void doHandleResponse(TransportResponseHan
}
}

private void handlerResponseError(StreamInput stream, final TransportResponseHandler<?> handler) {
private void handlerResponseError(final long requestId, StreamInput stream, final TransportResponseHandler<?> handler) {
Exception error;
try {
error = stream.readException();

// Check the entire message has been read
final int nextByte = stream.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Treat unconsumed stream as (de)serialization failure and handle it as TransportSerializationException

Copy link
Collaborator

@Bukhtawar Bukhtawar Jan 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are redundant code blocks for

  1. handlerResponseError
  2. handleResponse
    Can we simply this further

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those two are slightly different (fe logging, remote address setting), I am not sure the simplification would make sense here (unless we alter the behaviour slightly)

throw new IllegalStateException(
"Message not fully read (response) for requestId ["
+ requestId
+ "], handler ["
+ handler
+ "], error ["
+ true
+ "]; resetting"
);
}
} catch (Exception e) {
error = new TransportSerializationException(
"Failed to deserialize exception response from stream for handler [" + handler + "]",
Expand Down
Loading