Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set target routing shard by partition key #316

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/sql-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ jobs:
./gradlew :core:jacocoTestCoverageVerification || echo "* Jacoco failed for core" >> report.log
./gradlew :protocol:jacocoTestCoverageVerification || echo "* Jacoco failed for protocol" >> report.log
./gradlew :opensearch-sql-plugin:jacocoTestCoverageVerification || echo "* Jacoco failed for plugin" >> report.log
# Misc tests
# Misc/Additional Integration tests
./gradlew :integ-test:integTest || echo "* Integration test failed" >> report.log
./gradlew :integ-test:multiClusterSearch || echo "* Multi-Cluster Search tests failed" >> report.log
./gradlew :doctest:doctest || echo "* Doctest failed" >> report.log
./scripts/bwctest.sh || echo "* Backward compatibility test failed" >> report.log

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
@Override
public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
QualifiedName qualifiedName = node.getTableQualifiedName();
String partitionName = node.getTablePartitionKeys();
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
= new DataSourceSchemaIdentifierNameResolver(dataSourceService, qualifiedName.getParts());
String tableName = dataSourceSchemaIdentifierNameResolver.getIdentifierName();
Expand All @@ -156,9 +157,11 @@ public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
.getDataSource(dataSourceSchemaIdentifierNameResolver.getDataSourceName())
.getStorageEngine()
.getTable(new DataSourceSchemaName(
dataSourceSchemaIdentifierNameResolver.getDataSourceName(),
dataSourceSchemaIdentifierNameResolver.getSchemaName()),
dataSourceSchemaIdentifierNameResolver.getIdentifierName());
dataSourceSchemaIdentifierNameResolver.getDataSourceName(),
dataSourceSchemaIdentifierNameResolver.getSchemaName()
),
dataSourceSchemaIdentifierNameResolver.getIdentifierName(),
partitionName);
}
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
table.getReservedFieldTypes().forEach(
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public UnresolvedPlan relation(String tableName, String alias) {
return new Relation(qualifiedName(tableName), alias);
}

/**
 * Build a relation for the given table name with an alias and partition keys.
 *
 * @param tableName name of the table, converted via {@code qualifiedName}
 * @param alias alias for the relation
 * @param partitionKeys partition keys passed through to the {@link Relation}
 * @return the newly created {@link Relation}
 */
public UnresolvedPlan relation(String tableName, String alias, List<String> partitionKeys) {
return new Relation(qualifiedName(tableName), alias, partitionKeys);
}

public UnresolvedPlan tableFunction(List<String> functionName, UnresolvedExpression... args) {
return new TableFunction(new QualifiedName(functionName), Arrays.asList(args));
}
Expand Down
28 changes: 28 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Relation.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,33 @@ public Relation(UnresolvedExpression tableName) {
}

/**
 * Constructor without partition keys; delegates to the three-argument
 * constructor with {@code null} partition keys.
 */
public Relation(UnresolvedExpression tableName, String alias) {
this(tableName, alias, null);
}

/**
 * Create a relation over a single table, with an alias and partition keys.
 *
 * @param tableName - name of the relation
 * @param alias - alias name for the relation
 * @param partitionKeys - partition or routing keys for the relation shard;
 *                      stored as given, may be {@code null}
 */
public Relation(UnresolvedExpression tableName, String alias, List<String> partitionKeys) {
  this.partitionKeys = partitionKeys;
  this.alias = alias;
  this.tableName = Arrays.asList(tableName);
}

/**
 * Optional alias name for the relation.
 */
private String alias;


/**
 * Optional partition key(s) for the relation.
 * May be {@code null} when the relation was created without partition keys.
 */
private List<String> partitionKeys;

/**
* Return table name.
*
Expand Down Expand Up @@ -88,6 +106,16 @@ public QualifiedName getTableQualifiedName() {
}
}

/**
 * Get the partition keys of this table/relation as a single string.
 *
 * @return the partition keys joined with the {@code COMMA} separator,
 *     or {@code null} if no partition keys were set
 */
public String getTablePartitionKeys() {
  if (partitionKeys == null) {
    return null;
  }
  return String.join(COMMA, partitionKeys);
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.util.Collection;
import java.util.Collections;
import javax.annotation.Nullable;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.expression.function.FunctionResolver;

Expand All @@ -19,7 +20,10 @@ public interface StorageEngine {
/**
* Get {@link Table} from storage engine.
*/
Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName);
Table getTable(
DataSourceSchemaName dataSourceSchemaName,
String tableName,
@Nullable String partition);

/**
* Get list of datasource related functions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
Expand Down Expand Up @@ -55,7 +56,7 @@ protected Map<String, ExprType> typeMapping() {
}

protected StorageEngine storageEngine() {
return (dataSourceSchemaName, tableName) -> table;
return (dataSourceSchemaName, tableName, partition) -> table;
}

protected StorageEngine prometheusStorageEngine() {
Expand Down Expand Up @@ -85,7 +86,9 @@ public FunctionName getFunctionName() {
}

@Override
public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) {
public Table getTable(DataSourceSchemaName dataSourceSchemaName,
String tableName,
@Nullable String partition) {
return table;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
Expand Down Expand Up @@ -66,7 +67,9 @@ public class TestConfig {
protected StorageEngine storageEngine() {
return new StorageEngine() {
@Override
public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name) {
public Table getTable(DataSourceSchemaName dataSourceSchemaName,
String name,
@Nullable String partition) {
return new Table() {
@Override
public boolean exists() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class PlannerTest extends PhysicalPlanTestBase {

@BeforeEach
public void setUp() {
when(storageEngine.getTable(any(), any())).thenReturn(new MockTable());
when(storageEngine.getTable(any(), any(), any())).thenReturn(new MockTable());
}

@Test
Expand All @@ -82,7 +82,7 @@ public void planner_test() {
LogicalPlanDSL.relation("schema",
storageEngine.getTable(
new DataSourceSchemaName(DEFAULT_DATASOURCE_NAME, "default"),
"schema")),
"schema", "partition")),
DSL.equal(DSL.ref("response", INTEGER), DSL.literal(10))
),
ImmutableList.of(DSL.named("avg(response)", DSL.avg(DSL.ref("response", INTEGER)))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class StorageEngineTest {

@Test
void testFunctionsMethod() {
StorageEngine k = (dataSourceSchemaName, tableName) -> null;
StorageEngine k = (dataSourceSchemaName, tableName, partition) -> null;
Assertions.assertEquals(Collections.emptyList(), k.getFunctions());
}
}
11 changes: 11 additions & 0 deletions docs/user/dql/basics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,17 @@ SQL query::
{
"query" : "SELECT account_number FROM accounts/account"
}
Example 4: Selecting From Index using Partition Shard

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when PARTITION is provided for another data source, like Prometheus or Spark?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They will need to implement a sharding or partition option separately. Most datasources would have such an option.

-----------------------------------------------------------

You can also target a specific shard or partition by providing a routing hash key in ``PARTITION``. To target multiple shards, provide a comma-separated list of keys.

SQL query::

POST /_plugins/_sql
{
"query" : "SELECT account_number FROM account PARTITION(shard1, shard2)"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be confusing to use PARTITION, an existing operation in SQL, for this.

What about introducing a general notion of data source options, keeping them as key-value pairs in Relation that get analyzed by each StorageEngine?

This would allow other data sources to benefit from this and avoid questions about features that only apply to OpenSearch.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exactly what the PARTITION in SQL is designed for. Also, the proposal in the issue suggested using this syntax.

I DO like your idea, but I'd rather stick with well-understood SQL syntax than introduce something new.

}

WHERE
=====
Expand Down
22 changes: 11 additions & 11 deletions docs/user/optimization/optimization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ The consecutive Filter operator will be merged as one Filter operator::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false, routingId=null)"
},
"children": []
}
Expand All @@ -71,7 +71,7 @@ The Filter operator should be push down under Sort operator::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down Expand Up @@ -102,7 +102,7 @@ The Project list will push down to Query DSL to `filter the source <https://www.
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false, routingId=null)"
},
"children": []
}
Expand All @@ -128,7 +128,7 @@ The Filter operator will merge into OpenSearch Query DSL::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false, routingId=null)"
},
"children": []
}
Expand All @@ -154,7 +154,7 @@ The Sort operator will merge into OpenSearch Query DSL::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down Expand Up @@ -188,7 +188,7 @@ Because the OpenSearch Script Based Sorting can't handle NULL/MISSING value, the
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\"}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\"}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down Expand Up @@ -216,7 +216,7 @@ The Limit operator will merge in OpenSearch Query DSL::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":5,\"size\":10,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":5,\"size\":10,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down Expand Up @@ -257,7 +257,7 @@ If sort that includes expression, which cannot be merged into query DSL, also ex
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\"}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\"}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down Expand Up @@ -287,7 +287,7 @@ The Aggregation operator will merge into OpenSearch Aggregation::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false, routingId=null)"
},
"children": []
}
Expand All @@ -313,7 +313,7 @@ The Sort operator will merge into OpenSearch Aggregation.::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"last\",\"order\":\"desc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"last\",\"order\":\"desc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down Expand Up @@ -348,7 +348,7 @@ Because the OpenSearch Composite Aggregation doesn't support order by metrics fi
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down
2 changes: 1 addition & 1 deletion docs/user/ppl/interfaces/endpoint.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ The following PPL query demonstrated that where and stats command were pushed do
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}, searchDone=false, routingId=null)"
},
"children": []
}
Expand Down
Loading
Loading