
[multistage] Make Intermediate Stage Worker Assignment Tenant Aware #10617

Merged: 2 commits into apache:master from tenant_aware_routing, Apr 19, 2023

Conversation

@vvivekiyer (Contributor) commented Apr 14, 2023

In OSS issue #10605, it was discovered that worker assignment for intermediate stages in a multistage query is not tenant aware: servers are picked from the entire cluster, which means the query could execute on servers belonging to a completely unrelated tenant.

This PR addresses the above issue.

Notes

  1. With this PR, the candidate servers for intermediate-stage assignment are all servers belonging to the server tenants of all the tables in the query (see the sketch after this list). We went with this approach because the server tenants of two tables can be disjoint. We currently assign all candidate servers for intermediate-stage processing, but if we later implement a smarter worker-assignment strategy, it could be impossible to pick a single tenant common to all the tables in the query.
  2. Created a new data structure in BrokerRoutingManager to store the server hosts for each table's tenant.
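
To make note 1 concrete, here is a minimal, hedged sketch of the union-of-tenants idea. The names are illustrative: the PR's actual map lives in BrokerRoutingManager and holds ServerInstance values, while this sketch uses plain instance-id-to-hostname maps.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: the candidate set for intermediate-stage worker
// assignment is the UNION of every queried table's tenant servers. A union
// (not an intersection) is used because two tables' server tenants may be
// disjoint, in which case an intersection could be empty.
public class IntermediateStageCandidatesSketch {

  // tableNameWithType -> (instanceId -> hostname); a simplified stand-in for
  // the PR's Map<String, Map<String, ServerInstance>> in BrokerRoutingManager.
  public static Map<String, String> candidateServers(List<String> tableNamesWithType,
      Map<String, Map<String, String>> tableTenantServersMap) {
    Map<String, String> candidates = new HashMap<>();
    for (String tableNameWithType : tableNamesWithType) {
      candidates.putAll(tableTenantServersMap.getOrDefault(tableNameWithType, Map.of()));
    }
    return candidates;
  }
}
```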

Additional changes in the PR
After discussing with @tibrewalpratik17 and @ankitsultana, we removed the restriction enforced in #10336 that required a multistage query to have at least one common server tenant. That restriction does not work when the tables in the query belong to disjoint server tenants.

Testing
I have manually tested the changes on our prod deployment. I'll add unit tests while the review is in progress.

@vvivekiyer force-pushed the tenant_aware_routing branch 3 times, most recently from 5a2b322 to 2ff0124, on April 14, 2023 at 18:49
@vvivekiyer marked this pull request as ready for review on April 14, 2023 at 18:53
@codecov-commenter commented Apr 14, 2023

Codecov Report

Merging #10617 (42fcd03) into master (f2afe21) will decrease coverage by 1.43%.
The diff coverage is 86.51%.

```
@@             Coverage Diff              @@
##             master   #10617      +/-   ##
============================================
- Coverage     70.31%   68.89%   -1.43%
  Complexity     6495     6495
============================================
  Files          2106     2106
  Lines        113381   113444      +63
  Branches      17090    17097       +7
============================================
- Hits          79728    78157    -1571
- Misses        28059    29787    +1728
+ Partials       5594     5500      -94
```
| Flag | Coverage Δ |
|---|---|
| integration1 | ? |
| integration2 | 23.87% <41.57%> (-0.09%) ⬇️ |
| unittests1 | 67.88% <90.90%> (+<0.01%) ⬆️ |
| unittests2 | 13.88% <25.84%> (+0.01%) ⬆️ |

Flags with carried forward coverage won't be shown.

| Impacted Files | Coverage Δ |
|---|---|
| ...requesthandler/MultiStageBrokerRequestHandler.java | 15.67% <0.00%> (-47.30%) ⬇️ |
| ...t/controller/api/resources/PinotQueryResource.java | 41.47% <0.00%> (-19.40%) ⬇️ |
| ...ache/pinot/query/planner/logical/StagePlanner.java | 94.33% <ø> (ø) |
| .../org/apache/pinot/query/routing/WorkerManager.java | 86.86% <88.00%> (-0.32%) ⬇️ |
| ...che/pinot/broker/routing/BrokerRoutingManager.java | 73.61% <90.24%> (-12.14%) ⬇️ |
| .../java/org/apache/pinot/query/QueryEnvironment.java | 93.47% <93.33%> (+0.70%) ⬆️ |
| ...uery/planner/physical/DispatchablePlanContext.java | 100.00% <100.00%> (ø) |
| ...uery/planner/physical/DispatchablePlanVisitor.java | 100.00% <100.00%> (ø) |

... and 146 files with indirect coverage changes


@siddharthteotia (Contributor) commented Apr 14, 2023

I wonder how the decision can always be random? Shouldn't it be deterministic, otherwise it won't even work?

For example, if the intermediate stage is running a BROADCAST JOIN on tables that are in two different tenants, then the leaf-stage sender should be broadcasting the right table T1 from tenant1 to the instances of the left table T2 in tenant2. So here it is clear that the intermediate stage (which will run the JOIN) will run on the enabled instances of tenant2? Is this scenario handled?

@siddharthteotia (Contributor) commented:

More generally speaking: for the scenario where tables from different tenants (with disjoint instances) participate in the intermediary stage and it is not clear to the planner where to set up that stage, can we pick a fair share of hosts from each tenant rather than choosing completely at random? Otherwise the stage may end up running entirely on the hosts belonging to one of the tenants.

@vvivekiyer force-pushed the tenant_aware_routing branch 2 times, most recently from 9c9f7f5 to 94a5dc0, on April 14, 2023 at 22:04
@vvivekiyer (Contributor, Author) commented:

@siddharthteotia The current worker-assignment strategy assigns all the candidate servers for intermediate-stage processing. The only case where we assign a random server is for SINGLETON (e.g., an empty OVER()).

Just to clarify, this PR doesn't modify the intermediate stage's worker assignment strategy. This PR ensures that server hosts outside the tables' tenants are not considered when the worker assignment is done.

@siddharthteotia (Contributor) commented:

> @siddharthteotia The current WorkerAssignment strategy assigns all the servers for intermediate stage processing. The only case where we assign a random server is for SINGLETON (eg: empty over())
>
> Just to clarify, this PR doesn't modify the intermediate stage's worker assignment strategy. This PR ensures that server hosts outside the tables' tenants are not considered when the worker assignment is done.

Yes, I am aligned on the scope of this PR. What I was asking about, especially for the broadcast-join scenario, should be done as a follow-up to improve the strategy itself, IMO.

```java
LOGGER.info("Iterating for table={}, tag={}, server_instance_id={}, host={}", tableNameWithType, tableServerTag,
    instanceId, serverInstance.getHostname());

if (tags.contains(tableServerTag) && !tenantServerMap.containsKey(instanceId)) {
```
Contributor:
Maybe tags should be converted to a Set?

Contributor:
Should the second check be the first check?

vvivekiyer (Author):
Changed the order of the conditions. I think keeping a list is fine since that's what the Helix function returns. It's highly unlikely that an instance is configured with duplicate tags, and correctness-wise it shouldn't be a problem.
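
For reference, a sketch of the reordered condition being discussed; variable names follow the snippet above, and the body of the if is an assumption for illustration, not a quote of the PR.

```java
// Cheap O(1) map lookup first, so the short-circuit skips the O(n) list scan
// over tags for instances already recorded for this tenant. The put() below
// is an assumed body, shown only to make the fragment self-explanatory.
if (!tenantServerMap.containsKey(instanceId) && tags.contains(tableServerTag)) {
  tenantServerMap.put(instanceId, serverInstance);
}
```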

```java
@@ -183,4 +197,34 @@ private RoutingTable getRoutingTable(String tableName, TableType tableType, long
  return _routingManager.getRoutingTable(
      CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + tableNameWithType), requestId);
}

private boolean isLeafStage(StageMetadata stageMetadata) {
  return stageMetadata.getScannedTables().size() == 1;
```
@siddharthteotia (Contributor) commented Apr 19, 2023:

I don't think this is a reliable indicator. It may be true for now, but it's unlikely to give the right answer as functionality evolves.

Consider a standard GROUP BY query run in the multistage engine, where the intermediary stage also processes the same single table after a hash exchange. Even there the stageMetadata will indicate a single table.

Also, does getScannedTables() actually refer to a true scan of the tables, which will only happen on the leaf stage?

Since non-leaf stages will always work off blocks and thus getScannedTables() will return empty, maybe one way to distinguish leaf vs. non-leaf is whether getScannedTables().size() > 0? It depends on the implementation of getScannedTables().

Contributor:

We can also have stages terminate in a value node, in which case there wouldn't be any scanned tables.

vvivekiyer (Author):

Agreed that we need a better way to figure out whether it is a leaf stage or not.

> We can also have stages terminate in a value node in which case there wouldn't be any scanned tables.

True, we actually treat these stages as intermediate stages today.

Note that this PR only moved this check into a function. I'm leaving a TODO to address this.
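
For context, a sketch of the alternative check suggested in this thread, assuming getScannedTables() returns an empty list for non-leaf stages; note the value-node caveat raised above, which this alternative does not fully resolve either.

```java
// Suggested alternative (not what the PR implements): treat a stage as a leaf
// stage iff it scans at least one table. Caveat from the discussion: a stage
// terminating in a value node also has no scanned tables and would be
// classified as intermediate, which matches today's behavior.
private boolean isLeafStage(StageMetadata stageMetadata) {
  return !stageMetadata.getScannedTables().isEmpty();
}
```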

```java
@@ -102,11 +104,16 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
  private final ServerRoutingStatsManager _serverRoutingStatsManager;
  private final PinotConfiguration _pinotConfig;

  // Map that contains the tableNameWithType as key and the enabled serverInstances that are tagged with the table's
  // tenant.
  private final Map<String, Map<String, ServerInstance>> _tableTenantServersMap = new ConcurrentHashMap<>();
```
Contributor:

Is it possible / worth it to invoke the new code path that works on this map only when the multistage query engine feature flag is enabled?

vvivekiyer (Author):

IMO, gating it on the flag in many places might make the code unreadable. I would prefer not putting it behind the condition. Thoughts?
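
For readers following along, a self-contained sketch of how such a per-table tenant map could be populated from instance tags. Everything here is simplified: in the PR, ServerInstance comes from Pinot and the tags come from Helix instance configs; this record and helper method are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of maintaining a per-table map of the enabled servers
// tagged with that table's tenant, mirroring the shape of the PR's
// _tableTenantServersMap (tableNameWithType -> instanceId -> server).
public class TenantServerMapSketch {
  record ServerInstance(String instanceId, String hostname, List<String> tags) {}

  private final Map<String, Map<String, ServerInstance>> _tableTenantServersMap =
      new ConcurrentHashMap<>();

  void updateTableTenantServers(String tableNameWithType, String tableServerTag,
      List<ServerInstance> enabledServers) {
    Map<String, ServerInstance> tenantServerMap = new ConcurrentHashMap<>();
    for (ServerInstance server : enabledServers) {
      // Keep only the enabled servers tagged with this table's tenant.
      if (server.tags().contains(tableServerTag)) {
        tenantServerMap.put(server.instanceId(), server);
      }
    }
    _tableTenantServersMap.put(tableNameWithType, tenantServerMap);
  }
}
```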

```java
@@ -258,25 +261,6 @@ private String getCommonBrokerTenant(List<String> tableNames) {
  return (String) (tableBrokers.toArray()[0]);
}

// return the serverTenant if all tables point to the same broker, else returns null
```
Contributor:

Why delete this?

vvivekiyer (Author):

As stated in the PR description, it's not the right condition to enforce. We could have tables that are hosted on disjoint server tenants.

> After discussing with the authors, removed the restriction enforced in #10336 where a multistage query should have at least one common server tenant. This restriction might not work when the tables in the query belong to disjoint server tenants.

```java
Set<String> tableNames = new HashSet<>();
List<String> qualifiedTableNames = RelOptUtil.findAllTableQualifiedNames(relRoot);
for (String qualifiedTableName : qualifiedTableNames) {
  String tableName = qualifiedTableName.replaceAll("^\\[(.*)\\]$", "$1");
```
Contributor:

Can you elaborate on why this regex is needed?

vvivekiyer (Author):

Good callout. I added a comment in the code explaining why it is needed.
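
For readers of this thread, a small demo of what the regex does. The bracketed input form is an assumption based on the snippet above: Calcite's RelOptUtil.findAllTableQualifiedNames appears to return each qualified name as the string form of a name list, e.g. "[myTable]", so the regex strips the surrounding brackets.

```java
// Hypothetical demo of the bracket-stripping regex from the snippet above.
public class QualifiedNameRegexDemo {
  public static void main(String[] args) {
    String qualifiedTableName = "[myTable]";
    // "^\[(.*)\]$" captures everything between a leading "[" and a trailing "]".
    String tableName = qualifiedTableName.replaceAll("^\\[(.*)\\]$", "$1");
    System.out.println(tableName); // prints: myTable
  }
}
```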

@siddharthteotia merged commit 18adba0 into apache:master on Apr 19, 2023
@vvivekiyer deleted the tenant_aware_routing branch on April 19, 2023 at 17:27