Skip to content

Commit

Permalink
Implement binary format support for SQL clear cursor
Browse files Browse the repository at this point in the history
- support binary_format parameter on _sql/close
- make cursor close response consistent with the query protocol
  (ie. in ODBC/JDBC/CLI return CBOR response for cursor close - as for query)
  • Loading branch information
luigidellaquila committed Apr 1, 2022
1 parent 85359f3 commit d6486ab
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 20 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/84230.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 84230
summary: Implement binary format support for SQL clear cursor
area: SQL
type: bug
issues:
- 53359
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ private void assertBinaryRequest(boolean isBinary, XContentType xContentType) th
logger.info("Ignored SQLException", e);
}
assertValues(isBinary, xContentType);

prepareMockResponse();
try {
httpClient.queryClose("");
} catch (SQLException e) {
logger.info("Ignored SQLException", e);
}
assertValues(isBinary, xContentType);
}

private void assertValues(boolean isBinary, XContentType xContentType) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.sql.qa.jdbc.single_node;

import org.elasticsearch.xpack.sql.qa.jdbc.CloseCursorTestCase;

public class JdbcCloseCursorIT extends CloseCursorTestCase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.sql.qa.jdbc;

import org.elasticsearch.core.CheckedConsumer;
import org.junit.Before;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public abstract class CloseCursorTestCase extends JdbcIntegrationTestCase {

@Before
public void initIndex() throws IOException {
index("library", "1", builder -> { builder.field("name", "foo"); });
index("library", "2", builder -> { builder.field("name", "bar"); });
index("library", "3", builder -> { builder.field("name", "baz"); });
}

public void testCloseCursor() throws SQLException {
doWithQuery("SELECT name FROM library", results -> {
assertTrue(results.next());
results.close(); // force sending a cursor close since more pages are available
assertTrue(results.isClosed());
});
}

public void testCloseConsumedCursor() throws SQLException {
doWithQuery("SELECT name FROM library", results -> {
for (int i = 0; i < 3; i++) {
assertTrue(results.next());
}
assertFalse(results.next());
results.close();
assertTrue(results.isClosed());
});
}

public void testCloseNoCursor() throws SQLException {
doWithQuery("SELECT name FROM library WHERE name = 'zzz'", results -> {
results.close();
assertTrue(results.isClosed());
});
}

private void doWithQuery(String query, CheckedConsumer<ResultSet, SQLException> consumer) throws SQLException {
try (Connection connection = createConnection(connectionProperties()); Statement statement = connection.createStatement()) {
statement.setFetchSize(1);
ResultSet results = statement.executeQuery(query);
consumer.accept(results);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.RequestInfo;
Expand All @@ -24,12 +25,16 @@
import static org.elasticsearch.xpack.sql.action.AbstractSqlQueryRequest.CURSOR;
import static org.elasticsearch.xpack.sql.action.AbstractSqlQueryRequest.MODE;
import static org.elasticsearch.xpack.sql.action.AbstractSqlQueryRequest.VERSION;
import static org.elasticsearch.xpack.sql.proto.CoreProtocol.BINARY_COMMUNICATION;
import static org.elasticsearch.xpack.sql.proto.CoreProtocol.BINARY_FORMAT_NAME;

/**
* Request to clean all SQL resources associated with the cursor
*/
public class SqlClearCursorRequest extends AbstractSqlRequest {

static final ParseField BINARY_FORMAT = new ParseField(BINARY_FORMAT_NAME);

private static final ConstructingObjectParser<SqlClearCursorRequest, Void> PARSER =
// here the position in "objects" is the same as the fields parser declarations below
new ConstructingObjectParser<>(SqlClearCursorAction.NAME, objects -> {
Expand All @@ -43,9 +48,11 @@ public class SqlClearCursorRequest extends AbstractSqlRequest {
PARSER.declareString(optionalConstructorArg(), MODE);
PARSER.declareString(optionalConstructorArg(), CLIENT_ID);
PARSER.declareString(optionalConstructorArg(), VERSION);
PARSER.declareBoolean(SqlClearCursorRequest::binaryCommunication, BINARY_FORMAT);
}

private String cursor;
private Boolean binaryCommunication = BINARY_COMMUNICATION;

public SqlClearCursorRequest() {}

Expand Down Expand Up @@ -80,12 +87,23 @@ public String getDescription() {
public SqlClearCursorRequest(StreamInput in) throws IOException {
super(in);
cursor = in.readString();
binaryCommunication = in.readOptionalBoolean();
}

public SqlClearCursorRequest binaryCommunication(Boolean binaryCommunication) {
this.binaryCommunication = binaryCommunication;
return this;
}

public Boolean binaryCommunication() {
return this.binaryCommunication;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(cursor);
out.writeOptionalBoolean(binaryCommunication);
}

@Override
Expand All @@ -94,12 +112,12 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
if (super.equals(o) == false) return false;
SqlClearCursorRequest that = (SqlClearCursorRequest) o;
return Objects.equals(cursor, that.cursor);
return Objects.equals(cursor, that.cursor) && Objects.equals(binaryCommunication, that.binaryCommunication);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), cursor);
return Objects.hash(super.hashCode(), cursor, binaryCommunication);
}

public static SqlClearCursorRequest fromXContent(XContentParser parser) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ protected TestSqlClearCursorRequest createXContextTestInstance(XContentType xCon

@Override
protected TestSqlClearCursorRequest createTestInstance() {
return new TestSqlClearCursorRequest(requestInfo, randomAlphaOfLength(100));
TestSqlClearCursorRequest result = new TestSqlClearCursorRequest(requestInfo, randomAlphaOfLength(100));
result.binaryCommunication(randomBoolean());
return result;
}

@Override
Expand All @@ -58,9 +60,11 @@ protected TestSqlClearCursorRequest mutateInstance(TestSqlClearCursorRequest ins
@SuppressWarnings("unchecked")
Consumer<TestSqlClearCursorRequest> mutator = randomFrom(
request -> request.requestInfo(randomValueOtherThan(request.requestInfo(), this::randomRequestInfo)),
request -> request.setCursor(randomValueOtherThan(request.getCursor(), SqlQueryResponseTests::randomStringCursor))
request -> request.setCursor(randomValueOtherThan(request.getCursor(), SqlQueryResponseTests::randomStringCursor)),
request -> request.binaryCommunication(randomValueOtherThan(request.binaryCommunication(), () -> randomBoolean()))
);
TestSqlClearCursorRequest newRequest = new TestSqlClearCursorRequest(instance.requestInfo(), instance.getCursor());
newRequest.binaryCommunication(instance.binaryCommunication());
mutator.accept(newRequest);
return newRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.sql.proto.CoreProtocol;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;

Expand Down Expand Up @@ -61,23 +62,27 @@ public void testClearCursorRequestParser() throws IOException {
"cursor": "whatever",
"mode": "%s",
"client_id": "bla",
"version": "1.2.3"
"version": "1.2.3",
"binary_format": true
}""".formatted(randomMode), SqlClearCursorRequest::fromXContent);
assertNull(request.clientId());
assertNull(request.version());
assertEquals(randomMode, request.mode());
assertEquals("whatever", request.getCursor());
assertTrue(request.binaryCommunication());

randomMode = randomFrom(Mode.values());
request = generateRequest("""
{
"cursor": "whatever",
"mode": "%s",
"client_id": "bla"
"client_id": "bla",
"binary_format": false
}""".formatted(randomMode.toString()), SqlClearCursorRequest::fromXContent);
assertNull(request.clientId());
assertEquals(randomMode, request.mode());
assertEquals("whatever", request.getCursor());
assertFalse(request.binaryCommunication());

request = generateRequest("{\"cursor\" : \"whatever\"}", SqlClearCursorRequest::fromXContent);
assertNull(request.clientId());
Expand All @@ -94,6 +99,7 @@ public void testClearCursorRequestParser() throws IOException {
assertNull(request.version());
assertEquals(Mode.PLAIN, request.mode());
assertEquals("whatever", request.getCursor());
assertEquals(CoreProtocol.BINARY_COMMUNICATION, request.binaryCommunication());

request = generateRequest("""
{"cursor" : "whatever", "client_id" : "cANVAs"}""", SqlClearCursorRequest::fromXContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public TestSqlClearCursorRequest(RequestInfo requestInfo, String cursor) {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
org.elasticsearch.xpack.sql.proto.SqlClearCursorRequest protoInstance = new org.elasticsearch.xpack.sql.proto.SqlClearCursorRequest(
this.getCursor(),
this.requestInfo()
this.requestInfo(),
this.binaryCommunication()
);
return SqlTestUtils.toXContentBuilder(builder, g -> Payloads.generate(g, protoInstance));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public SqlQueryResponse nextPage(String cursor) throws SQLException {
public boolean queryClose(String cursor, Mode mode) throws SQLException {
ResponseWithWarnings<SqlClearCursorResponse> response = post(
CoreProtocol.CLEAR_CURSOR_REST_ENDPOINT,
new SqlClearCursorRequest(cursor, new RequestInfo(mode)),
new SqlClearCursorRequest(cursor, new RequestInfo(mode), cfg.binaryCommunication()),
Payloads::parseClearCursorResponse
);
return response.response().isSucceeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public static void generate(JsonGenerator generator, SqlClearCursorRequest reque
generator.writeStringField("mode", request.mode().toString());
writeIfValid(generator, "client_id", request.clientId());
writeIfValidAsString(generator, "version", request.version());
writeIfValid(generator, "binary_format", request.binaryCommunication());

generator.writeEndObject();
}
Expand Down Expand Up @@ -222,20 +223,15 @@ public static void generate(
if (request.pageTimeout() != CoreProtocol.PAGE_TIMEOUT) {
generator.writeStringField(PAGE_TIMEOUT_NAME, request.pageTimeout().getStringRep());
}
if (request.columnar() != null) {
generator.writeBooleanField(COLUMNAR_NAME, request.columnar());
}
writeIfValid(generator, COLUMNAR_NAME, request.columnar());
if (request.fieldMultiValueLeniency()) {
generator.writeBooleanField(FIELD_MULTI_VALUE_LENIENCY_NAME, request.fieldMultiValueLeniency());
}
if (request.indexIncludeFrozen()) {
generator.writeBooleanField(INDEX_INCLUDE_FROZEN_NAME, request.indexIncludeFrozen());
}
if (request.binaryCommunication() != null) {
generator.writeBooleanField(BINARY_FORMAT_NAME, request.binaryCommunication());
}
writeIfValid(generator, BINARY_FORMAT_NAME, request.binaryCommunication());
writeIfValid(generator, CURSOR_NAME, request.cursor());

writeIfValidAsString(generator, WAIT_FOR_COMPLETION_TIMEOUT_NAME, request.waitForCompletionTimeout(), TimeValue::getStringRep);

if (request.keepOnCompletion()) {
Expand All @@ -255,6 +251,12 @@ private static void writeIfValid(JsonGenerator generator, String name, String va
}
}

private static void writeIfValid(JsonGenerator generator, String name, Boolean value) throws IOException {
if (value != null) {
generator.writeBooleanField(name, value);
}
}

private static void writeIfValidAsString(JsonGenerator generator, String name, Object value) throws IOException {
writeIfValidAsString(generator, name, value, Object::toString);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,33 @@
public class SqlClearCursorRequest extends AbstractSqlRequest {

private final String cursor;
private final Boolean binaryCommunication;

public SqlClearCursorRequest(String cursor, RequestInfo requestInfo) {
public SqlClearCursorRequest(String cursor, RequestInfo requestInfo, Boolean binaryCommunication) {
super(requestInfo);
this.cursor = cursor;
this.binaryCommunication = binaryCommunication;
}

public String getCursor() {
return cursor;
}

public Boolean binaryCommunication() {
return binaryCommunication;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (super.equals(o) == false) return false;
SqlClearCursorRequest that = (SqlClearCursorRequest) o;
return Objects.equals(cursor, that.cursor);
return Objects.equals(cursor, that.cursor) && Objects.equals(binaryCommunication, that.binaryCommunication);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), cursor);
return Objects.hash(super.hashCode(), cursor, binaryCommunication);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestResponseListener;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.sql.action.Protocol;
import org.elasticsearch.xpack.sql.action.SqlClearCursorAction;
import org.elasticsearch.xpack.sql.action.SqlClearCursorRequest;
import org.elasticsearch.xpack.sql.action.SqlClearCursorResponse;
import org.elasticsearch.xpack.sql.proto.Mode;

import java.io.IOException;
import java.util.List;
Expand All @@ -40,7 +47,18 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
sqlRequest = SqlClearCursorRequest.fromXContent(parser);
}

return channel -> client.executeLocally(SqlClearCursorAction.INSTANCE, sqlRequest, new RestToXContentListener<>(channel));
return channel -> client.executeLocally(SqlClearCursorAction.INSTANCE, sqlRequest, new RestResponseListener<>(channel) {
@Override
public RestResponse buildResponse(SqlClearCursorResponse response) throws Exception {
Boolean binaryRequest = sqlRequest.binaryCommunication();
XContentType type = Boolean.TRUE.equals(binaryRequest) || (binaryRequest == null && Mode.isDriver(sqlRequest.mode()))
? XContentType.CBOR
: XContentType.JSON;
XContentBuilder builder = channel.newBuilder(request.getXContentType(), type, false);
response.toXContent(builder, request);
return new BytesRestResponse(RestStatus.OK, builder);
}
});
}

@Override
Expand Down

0 comments on commit d6486ab

Please sign in to comment.