Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQL: Implement binary format support for SQL clear cursor #84230

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
9ab021a
Implement binary format support for SQL clear cursor
luigidellaquila Feb 22, 2022
eedaa4b
Update docs/changelog/84230.yaml
luigidellaquila Feb 22, 2022
e0e9e22
Merge branch 'master' into binary_format_on_cursor_close
elasticmachine Feb 22, 2022
a968837
Fix test case
luigidellaquila Feb 23, 2022
1f8d2b5
Enhance test cases on binary option (Clear Cursor response)
luigidellaquila Mar 7, 2022
b26c949
Merge branch 'master' into binary_format_on_cursor_close
elasticmachine Mar 7, 2022
553bf13
Allow only binary mode on SqlClearCursorResponseListener
luigidellaquila Mar 10, 2022
eb584b4
Merge remote-tracking branch 'luigidellaquila/binary_format_on_cursor…
luigidellaquila Mar 10, 2022
cfc0eff
Fix check on binary communication
luigidellaquila Mar 10, 2022
c53c7b8
Merge branch 'master' into binary_format_on_cursor_close
elasticmachine Mar 10, 2022
ca1eefd
SQL: Support JSON and CBOR in clear cursor response
luigidellaquila Mar 14, 2022
3fdcde0
Merge remote-tracking branch 'luigidellaquila/binary_format_on_cursor…
luigidellaquila Mar 14, 2022
c9481a5
Better handle the choice of CBOR vs JSON based on driver mode
luigidellaquila Mar 15, 2022
c8e9ce3
Refactor clear cursor action to be an anonymous inner class
luigidellaquila Mar 15, 2022
5ad8c0b
Make SqlClearCursorRequest action and proto more consistent
luigidellaquila Mar 17, 2022
c03406c
Merge branch 'master' into binary_format_on_cursor_close
elasticmachine Mar 17, 2022
9eebe9c
Merge branch 'master' into binary_format_on_cursor_close
elasticmachine Mar 18, 2022
ceab361
Improve test randomization
luigidellaquila Mar 23, 2022
6fd1163
Merge remote-tracking branch 'luigidellaquila/binary_format_on_cursor…
luigidellaquila Mar 23, 2022
ce6922f
Merge branch 'master' into binary_format_on_cursor_close
elasticmachine Mar 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/84230.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 84230
summary: Implement binary format support for SQL clear cursor
area: SQL
type: bug
issues:
- 53359
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ private void assertBinaryRequest(boolean isBinary, XContentType xContentType) th
logger.info("Ignored SQLException", e);
}
assertValues(isBinary, xContentType);

prepareMockResponse();
try {
httpClient.queryClose("");
} catch (SQLException e) {
logger.info("Ignored SQLException", e);
}
assertValues(isBinary, xContentType);
}

private void assertValues(boolean isBinary, XContentType xContentType) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.sql.qa.jdbc.single_node;

import org.elasticsearch.xpack.sql.qa.jdbc.CloseCursorTestCase;

public class JdbcCloseCursorIT extends CloseCursorTestCase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.sql.qa.jdbc;

import org.elasticsearch.core.CheckedConsumer;
import org.junit.Before;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public abstract class CloseCursorTestCase extends JdbcIntegrationTestCase {

@Before
public void initIndex() throws IOException {
index("library", "1", builder -> { builder.field("name", "foo"); });
index("library", "2", builder -> { builder.field("name", "bar"); });
index("library", "3", builder -> { builder.field("name", "baz"); });
}

public void testCloseCursor() throws SQLException {
luigidellaquila marked this conversation as resolved.
Show resolved Hide resolved
doWithQuery("SELECT name FROM library", results -> {
assertTrue(results.next());
results.close(); // force sending a cursor close since more pages are available
assertTrue(results.isClosed());
});
}

public void testCloseConsumedCursor() throws SQLException {
doWithQuery("SELECT name FROM library", results -> {
for (int i = 0; i < 3; i++) {
assertTrue(results.next());
}
luigidellaquila marked this conversation as resolved.
Show resolved Hide resolved
assertFalse(results.next());
results.close();
assertTrue(results.isClosed());
});
}

public void testCloseNoCursor() throws SQLException {
doWithQuery("SELECT name FROM library WHERE name = 'zzz'", results -> {
results.close();
assertTrue(results.isClosed());
});
}

private void doWithQuery(String query, CheckedConsumer<ResultSet, SQLException> consumer) throws SQLException {
try (Connection connection = createConnection(connectionProperties()); Statement statement = connection.createStatement()) {
statement.setFetchSize(1);
ResultSet results = statement.executeQuery(query);
consumer.accept(results);
}
}
luigidellaquila marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
Expand All @@ -24,12 +25,16 @@
import static org.elasticsearch.xpack.sql.action.AbstractSqlQueryRequest.CURSOR;
import static org.elasticsearch.xpack.sql.action.AbstractSqlQueryRequest.MODE;
import static org.elasticsearch.xpack.sql.action.AbstractSqlQueryRequest.VERSION;
import static org.elasticsearch.xpack.sql.proto.CoreProtocol.BINARY_COMMUNICATION;
import static org.elasticsearch.xpack.sql.proto.CoreProtocol.BINARY_FORMAT_NAME;

/**
* Request to clean all SQL resources associated with the cursor
*/
public class SqlClearCursorRequest extends AbstractSqlRequest {

static final ParseField BINARY_FORMAT = new ParseField(BINARY_FORMAT_NAME);

private static final ConstructingObjectParser<SqlClearCursorRequest, Void> PARSER =
// here the position in "objects" is the same as the fields parser declarations below
new ConstructingObjectParser<>(SqlClearCursorAction.NAME, objects -> {
Expand All @@ -43,9 +48,11 @@ public class SqlClearCursorRequest extends AbstractSqlRequest {
PARSER.declareString(optionalConstructorArg(), MODE);
PARSER.declareString(optionalConstructorArg(), CLIENT_ID);
PARSER.declareString(optionalConstructorArg(), VERSION);
PARSER.declareBoolean(SqlClearCursorRequest::binaryCommunication, BINARY_FORMAT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Declaring this field mandatory (instead of optional), breaks backwards compatibility for clients that do not set this parameter.

Copy link
Contributor Author

@luigidellaquila luigidellaquila Mar 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I got it, maybe I'm misinterpreting some logic here.
Isn't it optional already? (see also test https://github.com/elastic/elasticsearch/pull/84230/files#diff-7d14bc9a0a5fb5167a3d7eea6b457ea14324136f334a33df4417ec5f78fefb84R102 )

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read the above as having constructorArg() in a similar style to the previous declaration.

}

private String cursor;
private Boolean binaryCommunication = BINARY_COMMUNICATION;
luigidellaquila marked this conversation as resolved.
Show resolved Hide resolved
luigidellaquila marked this conversation as resolved.
Show resolved Hide resolved

public SqlClearCursorRequest() {}

Expand Down Expand Up @@ -80,12 +87,23 @@ public String getDescription() {
public SqlClearCursorRequest(StreamInput in) throws IOException {
super(in);
cursor = in.readString();
binaryCommunication = in.readOptionalBoolean();
}

public SqlClearCursorRequest binaryCommunication(Boolean binaryCommunication) {
this.binaryCommunication = binaryCommunication;
return this;
}

public Boolean binaryCommunication() {
return this.binaryCommunication;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(cursor);
out.writeOptionalBoolean(binaryCommunication);
}

@Override
Expand All @@ -94,12 +112,12 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
if (super.equals(o) == false) return false;
SqlClearCursorRequest that = (SqlClearCursorRequest) o;
return Objects.equals(cursor, that.cursor);
return Objects.equals(cursor, that.cursor) && Objects.equals(binaryCommunication, that.binaryCommunication);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), cursor);
return Objects.hash(super.hashCode(), cursor, binaryCommunication);
}

public static SqlClearCursorRequest fromXContent(XContentParser parser) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ protected TestSqlClearCursorRequest createXContextTestInstance(XContentType xCon

@Override
protected TestSqlClearCursorRequest createTestInstance() {
return new TestSqlClearCursorRequest(requestInfo, randomAlphaOfLength(100));
TestSqlClearCursorRequest result = new TestSqlClearCursorRequest(requestInfo, randomAlphaOfLength(100));
result.binaryCommunication(randomBoolean());
return result;
}

@Override
Expand All @@ -58,9 +60,11 @@ protected TestSqlClearCursorRequest mutateInstance(TestSqlClearCursorRequest ins
@SuppressWarnings("unchecked")
Consumer<TestSqlClearCursorRequest> mutator = randomFrom(
request -> request.requestInfo(randomValueOtherThan(request.requestInfo(), this::randomRequestInfo)),
request -> request.setCursor(randomValueOtherThan(request.getCursor(), SqlQueryResponseTests::randomStringCursor))
request -> request.setCursor(randomValueOtherThan(request.getCursor(), SqlQueryResponseTests::randomStringCursor)),
request -> request.binaryCommunication(randomValueOtherThan(request.binaryCommunication(), () -> randomBoolean()))
);
TestSqlClearCursorRequest newRequest = new TestSqlClearCursorRequest(instance.requestInfo(), instance.getCursor());
newRequest.binaryCommunication(instance.binaryCommunication());
mutator.accept(newRequest);
return newRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.sql.proto.CoreProtocol;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;

Expand Down Expand Up @@ -61,23 +62,27 @@ public void testClearCursorRequestParser() throws IOException {
"cursor": "whatever",
"mode": "%s",
"client_id": "bla",
"version": "1.2.3"
"version": "1.2.3",
"binary_format": true
luigidellaquila marked this conversation as resolved.
Show resolved Hide resolved
}""".formatted(randomMode), SqlClearCursorRequest::fromXContent);
assertNull(request.clientId());
assertNull(request.version());
assertEquals(randomMode, request.mode());
assertEquals("whatever", request.getCursor());
assertTrue(request.binaryCommunication());

randomMode = randomFrom(Mode.values());
request = generateRequest("""
{
"cursor": "whatever",
"mode": "%s",
"client_id": "bla"
"client_id": "bla",
"binary_format": false
}""".formatted(randomMode.toString()), SqlClearCursorRequest::fromXContent);
assertNull(request.clientId());
assertEquals(randomMode, request.mode());
assertEquals("whatever", request.getCursor());
assertFalse(request.binaryCommunication());

request = generateRequest("{\"cursor\" : \"whatever\"}", SqlClearCursorRequest::fromXContent);
assertNull(request.clientId());
Expand All @@ -94,6 +99,7 @@ public void testClearCursorRequestParser() throws IOException {
assertNull(request.version());
assertEquals(Mode.PLAIN, request.mode());
assertEquals("whatever", request.getCursor());
assertEquals(CoreProtocol.BINARY_COMMUNICATION, request.binaryCommunication());

request = generateRequest("""
{"cursor" : "whatever", "client_id" : "cANVAs"}""", SqlClearCursorRequest::fromXContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public TestSqlClearCursorRequest(RequestInfo requestInfo, String cursor) {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
org.elasticsearch.xpack.sql.proto.SqlClearCursorRequest protoInstance = new org.elasticsearch.xpack.sql.proto.SqlClearCursorRequest(
this.getCursor(),
this.requestInfo()
this.requestInfo(),
this.binaryCommunication()
);
return SqlTestUtils.toXContentBuilder(builder, g -> Payloads.generate(g, protoInstance));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public SqlQueryResponse nextPage(String cursor) throws SQLException {
public boolean queryClose(String cursor, Mode mode) throws SQLException {
ResponseWithWarnings<SqlClearCursorResponse> response = post(
CoreProtocol.CLEAR_CURSOR_REST_ENDPOINT,
new SqlClearCursorRequest(cursor, new RequestInfo(mode)),
new SqlClearCursorRequest(cursor, new RequestInfo(mode), cfg.binaryCommunication()),
Payloads::parseClearCursorResponse
);
return response.response().isSucceeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public static void generate(JsonGenerator generator, SqlClearCursorRequest reque
generator.writeStringField("mode", request.mode().toString());
writeIfValid(generator, "client_id", request.clientId());
writeIfValidAsString(generator, "version", request.version());
writeIfValid(generator, "binary_format", request.binaryCommunication());

generator.writeEndObject();
}
Expand Down Expand Up @@ -222,20 +223,15 @@ public static void generate(
if (request.pageTimeout() != CoreProtocol.PAGE_TIMEOUT) {
generator.writeStringField(PAGE_TIMEOUT_NAME, request.pageTimeout().getStringRep());
}
if (request.columnar() != null) {
generator.writeBooleanField(COLUMNAR_NAME, request.columnar());
}
writeIfValid(generator, COLUMNAR_NAME, request.columnar());
if (request.fieldMultiValueLeniency()) {
generator.writeBooleanField(FIELD_MULTI_VALUE_LENIENCY_NAME, request.fieldMultiValueLeniency());
}
if (request.indexIncludeFrozen()) {
generator.writeBooleanField(INDEX_INCLUDE_FROZEN_NAME, request.indexIncludeFrozen());
}
if (request.binaryCommunication() != null) {
generator.writeBooleanField(BINARY_FORMAT_NAME, request.binaryCommunication());
}
writeIfValid(generator, BINARY_FORMAT_NAME, request.binaryCommunication());
writeIfValid(generator, CURSOR_NAME, request.cursor());

writeIfValidAsString(generator, WAIT_FOR_COMPLETION_TIMEOUT_NAME, request.waitForCompletionTimeout(), TimeValue::getStringRep);

if (request.keepOnCompletion()) {
Expand All @@ -255,6 +251,12 @@ private static void writeIfValid(JsonGenerator generator, String name, String va
}
}

private static void writeIfValid(JsonGenerator generator, String name, Boolean value) throws IOException {
if (value != null) {
generator.writeBooleanField(name, value);
}
}

private static void writeIfValidAsString(JsonGenerator generator, String name, Object value) throws IOException {
writeIfValidAsString(generator, name, value, Object::toString);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,33 @@
public class SqlClearCursorRequest extends AbstractSqlRequest {

private final String cursor;
private final Boolean binaryCommunication;

public SqlClearCursorRequest(String cursor, RequestInfo requestInfo) {
public SqlClearCursorRequest(String cursor, RequestInfo requestInfo, Boolean binaryCommunication) {
super(requestInfo);
this.cursor = cursor;
this.binaryCommunication = binaryCommunication;
}

public String getCursor() {
return cursor;
}

public Boolean binaryCommunication() {
return binaryCommunication;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (super.equals(o) == false) return false;
SqlClearCursorRequest that = (SqlClearCursorRequest) o;
return Objects.equals(cursor, that.cursor);
return Objects.equals(cursor, that.cursor) && Objects.equals(binaryCommunication, that.binaryCommunication);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), cursor);
return Objects.hash(super.hashCode(), cursor, binaryCommunication);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestResponseListener;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.sql.action.Protocol;
import org.elasticsearch.xpack.sql.action.SqlClearCursorAction;
import org.elasticsearch.xpack.sql.action.SqlClearCursorRequest;
import org.elasticsearch.xpack.sql.action.SqlClearCursorResponse;
import org.elasticsearch.xpack.sql.proto.Mode;

import java.io.IOException;
import java.util.List;
Expand All @@ -40,7 +47,18 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
sqlRequest = SqlClearCursorRequest.fromXContent(parser);
}

return channel -> client.executeLocally(SqlClearCursorAction.INSTANCE, sqlRequest, new RestToXContentListener<>(channel));
return channel -> client.executeLocally(SqlClearCursorAction.INSTANCE, sqlRequest, new RestResponseListener<>(channel) {
@Override
public RestResponse buildResponse(SqlClearCursorResponse response) throws Exception {
Boolean binaryRequest = sqlRequest.binaryCommunication();
XContentType type = Boolean.TRUE.equals(binaryRequest) || (binaryRequest == null && Mode.isDriver(sqlRequest.mode()))
? XContentType.CBOR
: XContentType.JSON;
XContentBuilder builder = channel.newBuilder(request.getXContentType(), type, false);
response.toXContent(builder, request);
return new BytesRestResponse(RestStatus.OK, builder);
}
});
}

@Override
Expand Down