Skip to content

Commit

Permalink
Set target routing shard by partition key (#316)
Browse files Browse the repository at this point in the history
* Update documentation for _routing

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* Proof of Concept: request routing shard through SQL partition

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* Fix core refactor: StreamIO from common to core.common (#296)

* Fix core refactor: StreamIO from common to core.common

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* Fix core refactor: StreamIO from common to core.common

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

---------

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* Add IT tests for multi-cluster

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* Fix cross-cluster tests

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* Revert query parameter 'routing'

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* Fix unit tests with partition add

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* clean up checkstyle and add test coverage

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* Fix IT tests

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* Add multi-cluster search tests to build

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* Updates for comments

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* Update comment

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

* Fix doctests

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>

---------

Signed-off-by: acarbonetto <andrewc@bitquilltech.com>
  • Loading branch information
acarbonetto committed Aug 23, 2023
1 parent 31b2b28 commit f6643f7
Show file tree
Hide file tree
Showing 51 changed files with 772 additions and 139 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/sql-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ jobs:
./gradlew :core:jacocoTestCoverageVerification || echo "* Jacoco failed for core" >> report.log
./gradlew :protocol:jacocoTestCoverageVerification || echo "* Jacoco failed for protocol" >> report.log
./gradlew :opensearch-sql-plugin:jacocoTestCoverageVerification || echo "* Jacoco failed for plugin" >> report.log
# Misc tests
# Misc/Additional Integration tests
./gradlew :integ-test:integTest || echo "* Integration test failed" >> report.log
./gradlew :integ-test:multiClusterSearch || echo "* Multi-Cluster Search tests failed" >> report.log
./gradlew :doctest:doctest || echo "* Doctest failed" >> report.log
./scripts/bwctest.sh || echo "* Backward compatibility test failed" >> report.log
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
@Override
public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
QualifiedName qualifiedName = node.getTableQualifiedName();
String partitionName = node.getTablePartitionKeys();
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
= new DataSourceSchemaIdentifierNameResolver(dataSourceService, qualifiedName.getParts());
String tableName = dataSourceSchemaIdentifierNameResolver.getIdentifierName();
Expand All @@ -156,9 +157,11 @@ public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
.getDataSource(dataSourceSchemaIdentifierNameResolver.getDataSourceName())
.getStorageEngine()
.getTable(new DataSourceSchemaName(
dataSourceSchemaIdentifierNameResolver.getDataSourceName(),
dataSourceSchemaIdentifierNameResolver.getSchemaName()),
dataSourceSchemaIdentifierNameResolver.getIdentifierName());
dataSourceSchemaIdentifierNameResolver.getDataSourceName(),
dataSourceSchemaIdentifierNameResolver.getSchemaName()
),
dataSourceSchemaIdentifierNameResolver.getIdentifierName(),
partitionName);
}
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
table.getReservedFieldTypes().forEach(
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public UnresolvedPlan relation(String tableName, String alias) {
return new Relation(qualifiedName(tableName), alias);
}

public UnresolvedPlan relation(String tableName, String alias, List<String> partitionKeys) {
return new Relation(qualifiedName(tableName), alias, partitionKeys);
}

public UnresolvedPlan tableFunction(List<String> functionName, UnresolvedExpression... args) {
return new TableFunction(new QualifiedName(functionName), Arrays.asList(args));
}
Expand Down
28 changes: 28 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Relation.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,33 @@ public Relation(UnresolvedExpression tableName) {
}

public Relation(UnresolvedExpression tableName, String alias) {
this(tableName, alias, null);
}

/**
* Constructor with Partition Keys for the relation.
*
* @param tableName - name of the name relation
* @param alias - alias name for the relation
* @param partitionKeys - partition or routing keys for the relation shard
*/
public Relation(UnresolvedExpression tableName, String alias, List<String> partitionKeys) {
this.tableName = Arrays.asList(tableName);
this.alias = alias;
this.partitionKeys = partitionKeys;
}

/**
* Optional alias name for the relation.
*/
private String alias;


/**
* Optional partition key(s) for the relation.
*/
private List<String> partitionKeys;

/**
* Return table name.
*
Expand Down Expand Up @@ -88,6 +106,16 @@ public QualifiedName getTableQualifiedName() {
}
}

/**
* Retrieve the partition keys associated with the table/relation.
*
* @return TablePartitionKeys | null
*/
public String getTablePartitionKeys() {
return partitionKeys == null
? null : String.join(COMMA, partitionKeys);
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.util.Collection;
import java.util.Collections;
import javax.annotation.Nullable;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.expression.function.FunctionResolver;

Expand All @@ -19,7 +20,10 @@ public interface StorageEngine {
/**
* Get {@link Table} from storage engine.
*/
Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName);
Table getTable(
DataSourceSchemaName dataSourceSchemaName,
String tableName,
@Nullable String partition);

/**
* Get list of datasource related functions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
Expand Down Expand Up @@ -55,7 +56,7 @@ protected Map<String, ExprType> typeMapping() {
}

protected StorageEngine storageEngine() {
return (dataSourceSchemaName, tableName) -> table;
return (dataSourceSchemaName, tableName, partition) -> table;
}

protected StorageEngine prometheusStorageEngine() {
Expand Down Expand Up @@ -85,7 +86,9 @@ public FunctionName getFunctionName() {
}

@Override
public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) {
public Table getTable(DataSourceSchemaName dataSourceSchemaName,
String tableName,
@Nullable String partition) {
return table;
}
};
Expand Down
5 changes: 4 additions & 1 deletion core/src/test/java/org/opensearch/sql/config/TestConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
Expand Down Expand Up @@ -66,7 +67,9 @@ public class TestConfig {
protected StorageEngine storageEngine() {
return new StorageEngine() {
@Override
public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name) {
public Table getTable(DataSourceSchemaName dataSourceSchemaName,
String name,
@Nullable String partition) {
return new Table() {
@Override
public boolean exists() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class PlannerTest extends PhysicalPlanTestBase {

@BeforeEach
public void setUp() {
when(storageEngine.getTable(any(), any())).thenReturn(new MockTable());
when(storageEngine.getTable(any(), any(), any())).thenReturn(new MockTable());
}

@Test
Expand All @@ -82,7 +82,7 @@ public void planner_test() {
LogicalPlanDSL.relation("schema",
storageEngine.getTable(
new DataSourceSchemaName(DEFAULT_DATASOURCE_NAME, "default"),
"schema")),
"schema", "partition")),
DSL.equal(DSL.ref("response", INTEGER), DSL.literal(10))
),
ImmutableList.of(DSL.named("avg(response)", DSL.avg(DSL.ref("response", INTEGER)))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class StorageEngineTest {

@Test
void testFunctionsMethod() {
StorageEngine k = (dataSourceSchemaName, tableName) -> null;
StorageEngine k = (dataSourceSchemaName, tableName, partition) -> null;
Assertions.assertEquals(Collections.emptyList(), k.getFunctions());
}
}
11 changes: 11 additions & 0 deletions docs/user/dql/basics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,17 @@ SQL query::
{
"query" : "SELECT account_number FROM accounts/account"
}
Example 4: Selecting From Index using Partition Shard
-----------------------------------------------------------

You can also specify a specific shard or partition to target using a routing hash key in ``PARTITION``. You can target multiple shards by providing a list separated by commas.

SQL query::

POST /_plugins/_sql
{
"query" : "SELECT account_number FROM account PARTITION(shard1, shard2)"
}

WHERE
=====
Expand Down
22 changes: 11 additions & 11 deletions docs/user/optimization/optimization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ The consecutive Filter operator will be merged as one Filter operator::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false, routingId=null)"
},
"children": []
}
Expand All @@ -71,7 +71,7 @@ The Filter operator should be push down under Sort operator::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down Expand Up @@ -102,7 +102,7 @@ The Project list will push down to Query DSL to `filter the source <https://www.
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false, routingId=null)"
},
"children": []
}
Expand All @@ -128,7 +128,7 @@ The Filter operator will merge into OpenSearch Query DSL::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false, routingId=null)"
},
"children": []
}
Expand All @@ -154,7 +154,7 @@ The Sort operator will merge into OpenSearch Query DSL::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down Expand Up @@ -188,7 +188,7 @@ Because the OpenSearch Script Based Sorting can't handle NULL/MISSING value, the
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\"}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\"}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down Expand Up @@ -216,7 +216,7 @@ The Limit operator will merge in OpenSearch Query DSL::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":5,\"size\":10,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":5,\"size\":10,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down Expand Up @@ -257,7 +257,7 @@ If sort that includes expression, which cannot be merged into query DSL, also ex
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\"}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\"}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down Expand Up @@ -287,7 +287,7 @@ The Aggregation operator will merge into OpenSearch Aggregation::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false, routingId=null)"
},
"children": []
}
Expand All @@ -313,7 +313,7 @@ The Sort operator will merge into OpenSearch Aggregation.::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"last\",\"order\":\"desc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"last\",\"order\":\"desc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down Expand Up @@ -348,7 +348,7 @@ Because the OpenSearch Composite Aggregation doesn't support order by metrics fi
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down
2 changes: 1 addition & 1 deletion docs/user/ppl/interfaces/endpoint.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ The following PPL query demonstrated that where and stats command were pushed do
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down
Loading

0 comments on commit f6643f7

Please sign in to comment.