Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BESU-128: Fix WebSocket frames handling #210

Merged
merged 3 commits into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.ConcurrentLinkedDeque;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.http.WebSocket;
Expand Down Expand Up @@ -70,7 +69,7 @@ public JsonRpcSuccessEvent unsubscribe(final Subscription subscription) {

private JsonRpcSuccessEvent send(final String json) {

connection.writeBinaryMessage(Buffer.buffer(json));
connection.writeTextMessage(json);

WaitUtils.waitFor(() -> assertThat(receivedResponse).isEqualTo(true));

Expand Down
12 changes: 5 additions & 7 deletions besu/src/test/java/org/hyperledger/besu/RunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -320,13 +319,12 @@ private void syncFromGenesis(final SyncMode mode, final GenesisConfigFile genesi
WebSocketConfiguration.DEFAULT_WEBSOCKET_HOST,
"/",
ws -> {
ws.write(
Buffer.buffer(
"{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"syncing\"]}"));
ws.handler(
buffer -> {
ws.writeTextMessage(
"{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"syncing\"]}");
ws.textMessageHandler(
payload -> {
final boolean matches =
buffer.toString().equals("{\"jsonrpc\":\"2.0\",\"id\":2,\"result\":\"0x0\"}");
payload.equals("{\"jsonrpc\":\"2.0\",\"id\":2,\"result\":\"0x0\"}");
if (matches) {
future.complete();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@ public WebSocketRequestHandler(final Vertx vertx, final Map<String, JsonRpcMetho
this.methods = methods;
}

public void handle(final String id, final Buffer buffer) {
handle(Optional.empty(), id, buffer, Optional.empty());
public void handle(final String id, final String payload) {
handle(Optional.empty(), id, payload, Optional.empty());
}

public void handle(
final Optional<AuthenticationService> authenticationService,
final String id,
final Buffer buffer,
final String payload,
final Optional<User> user) {
vertx.executeBlocking(
future -> {
final WebSocketRpcRequest request;
try {
request = buffer.toJsonObject().mapTo(WebSocketRpcRequest.class);
request = Json.decodeValue(payload, WebSocketRpcRequest.class);
} catch (final IllegalArgumentException | DecodeException e) {
LOG.debug("Error mapping json to WebSocketRpcRequest", e);
future.complete(new JsonRpcErrorResponse(null, JsonRpcError.INVALID_REQUEST));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,19 @@ private Handler<ServerWebSocket> websocketHandler() {

LOG.debug("Websocket Connected ({})", socketAddressAsString(socketAddress));

websocket.handler(
buffer -> {
websocket.textMessageHandler(
payload -> {
LOG.debug(
"Received Websocket request {} ({})",
buffer.toString(),
payload,
socketAddressAsString(socketAddress));

AuthenticationUtils.getUser(
authenticationService,
token,
user ->
websocketRequestHandler.handle(
authenticationService, connectionId, buffer, user));
authenticationService, connectionId, payload, user));
});

websocket.closeHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.UUID;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
Expand Down Expand Up @@ -93,7 +92,7 @@ public void handlerDeliversResponseSuccessfully(final TestContext context) {
context.assertEquals(Json.encode(expectedResponse), msg.body());
async.complete();
})
.completionHandler(v -> handler.handle(websocketId, Buffer.buffer(requestJson.toString())));
.completionHandler(v -> handler.handle(websocketId, requestJson.toString()));

async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
}
Expand All @@ -116,7 +115,7 @@ public void jsonDecodeFailureShouldRespondInvalidRequest(final TestContext conte
verifyZeroInteractions(jsonRpcMethodMock);
async.complete();
})
.completionHandler(v -> handler.handle(websocketId, Buffer.buffer("")));
.completionHandler(v -> handler.handle(websocketId, ""));

async.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
}
Expand All @@ -139,7 +138,7 @@ public void objectMapperFailureShouldRespondInvalidRequest(final TestContext con
verifyZeroInteractions(jsonRpcMethodMock);
async.complete();
})
.completionHandler(v -> handler.handle(websocketId, Buffer.buffer("{}")));
.completionHandler(v -> handler.handle(websocketId, "{}"));

async.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
}
Expand All @@ -163,7 +162,7 @@ public void absentMethodShouldRespondMethodNotFound(final TestContext context) {
context.assertEquals(Json.encode(expectedResponse), msg.body());
async.complete();
})
.completionHandler(v -> handler.handle(websocketId, Buffer.buffer(requestJson.toString())));
.completionHandler(v -> handler.handle(websocketId, requestJson.toString()));

async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
}
Expand All @@ -189,7 +188,7 @@ public void onExceptionProcessingRequestShouldRespondInternalError(final TestCon
context.assertEquals(Json.encode(expectedResponse), msg.body());
async.complete();
})
.completionHandler(v -> handler.handle(websocketId, Buffer.buffer(requestJson.toString())));
.completionHandler(v -> handler.handle(websocketId, requestJson.toString()));

async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.google.common.collect.Lists;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
Expand Down Expand Up @@ -209,7 +208,7 @@ public void websocketServiceWithBadHeaderAuthenticationToken(final TestContext c
options,
headers,
webSocket -> {
webSocket.write(Buffer.buffer(request));
webSocket.writeTextMessage(request);

webSocket.handler(
buffer -> {
Expand Down Expand Up @@ -246,7 +245,7 @@ public void websocketServiceWithGoodHeaderAuthenticationToken(final TestContext
options,
headers,
webSocket -> {
webSocket.write(Buffer.buffer(requestSub));
webSocket.writeTextMessage(requestSub);

webSocket.handler(
buffer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verifyNoInteractions;

import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketMethodsFactory;
Expand All @@ -34,7 +35,9 @@
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketBase;
import io.vertx.core.http.WebSocketFrame;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
Expand Down Expand Up @@ -98,13 +101,13 @@ public void websocketServiceExecutesHandlerOnMessage(final TestContext context)
httpClient.websocket(
"/",
webSocket -> {
webSocket.write(Buffer.buffer(request));

webSocket.handler(
buffer -> {
context.assertEquals(expectedResponse, buffer.toString());
async.complete();
});

webSocket.writeTextMessage(request);
});

async.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
Expand Down Expand Up @@ -182,4 +185,34 @@ public void handleLoginRequestWithAuthDisabled() {
request.putHeader("Content-Type", "application/json; charset=utf-8");
request.end("{\"username\":\"user\",\"password\":\"pass\"}");
}

@Test
public void webSocketDoesNotToHandlePingPayloadAsJsonRpcRequest(final TestContext context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, it would have taken me way longer to figure out how to test this. 👍

final Async async = context.async();

httpClient.webSocket(
"/",
result -> {
WebSocket websocket = result.result();

websocket.handler(
buffer -> {
final String payload = buffer.toString();
if (!payload.equals("foo")) {
context.fail("Only expected PONG response with same payload as PING request");
}
});

websocket.closeHandler(
h -> {
verifyNoInteractions(webSocketRequestHandlerSpy);
async.complete();
});

websocket.writeFrame(WebSocketFrame.pingFrame(Buffer.buffer("foo")));
websocket.close();
});

async.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.stream.Collectors;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
Expand Down Expand Up @@ -77,8 +76,7 @@ public void shouldAddConnectionToMap(final TestContext context) {
})
.completionHandler(
v ->
webSocketRequestHandler.handle(
CONNECTION_ID_1, Buffer.buffer(Json.encode(subscribeRequestBody))));
webSocketRequestHandler.handle(CONNECTION_ID_1, Json.encode(subscribeRequestBody)));

async.awaitSuccess(ASYNC_TIMEOUT);
}
Expand Down Expand Up @@ -119,12 +117,12 @@ public void shouldAddMultipleConnectionsToMap(final TestContext context) {
.completionHandler(
v ->
webSocketRequestHandler.handle(
CONNECTION_ID_2, Buffer.buffer(Json.encode(subscribeRequestBody2))));
CONNECTION_ID_2, Json.encode(subscribeRequestBody2)));
})
.completionHandler(
v ->
webSocketRequestHandler.handle(
CONNECTION_ID_1, Buffer.buffer(Json.encode(subscribeRequestBody1))));
CONNECTION_ID_1, Json.encode(subscribeRequestBody1)));

async.awaitSuccess(ASYNC_TIMEOUT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.HashMap;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
Expand Down Expand Up @@ -76,8 +75,7 @@ public void shouldRemoveConnectionWithSingleSubscriptionFromMap(final TestContex
})
.completionHandler(
v ->
webSocketRequestHandler.handle(
CONNECTION_ID, Buffer.buffer(Json.encode(unsubscribeRequestBody))));
webSocketRequestHandler.handle(CONNECTION_ID, Json.encode(unsubscribeRequestBody)));

async.awaitSuccess(ASYNC_TIMEOUT);
}
Expand Down Expand Up @@ -109,8 +107,7 @@ public void shouldRemoveSubscriptionAndKeepConnection(final TestContext context)
})
.completionHandler(
v ->
webSocketRequestHandler.handle(
CONNECTION_ID, Buffer.buffer(Json.encode(unsubscribeRequestBody))));
webSocketRequestHandler.handle(CONNECTION_ID, Json.encode(unsubscribeRequestBody)));

async.awaitSuccess(ASYNC_TIMEOUT);
}
Expand Down