[multistage] Make Intermediate Stage Worker Assignment Tenant Aware #10617
Conversation
Codecov Report

```
@@             Coverage Diff              @@
##             master   #10617      +/-   ##
============================================
- Coverage     70.31%   68.89%    -1.43%
  Complexity     6495     6495
============================================
  Files          2106     2106
  Lines        113381   113444       +63
  Branches      17090    17097        +7
============================================
- Hits          79728    78157     -1571
- Misses        28059    29787     +1728
+ Partials       5594     5500       -94
```

... and 146 files with indirect coverage changes.
I wonder how the decision can always be random? I mean, shouldn't it be deterministic, otherwise it won't even work? For example, if the intermediate stage is running

More generally speaking: for the scenario where tables from different tenants (with disjoint instances) are participating in the intermediate stage and it is not clear to the planner where to set up that stage, can we pick a fair share of hosts from each tenant rather than doing a completely random pick? Otherwise it may end up running entirely on the set of hosts belonging to one of the tenants.
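The fair-share idea suggested above could be sketched as a round-robin over per-tenant host pools. This is an illustrative sketch only, not Pinot's worker-assignment code; all names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: interleave hosts from each tenant's pool so an
// intermediate stage draws a fair share from every participating tenant,
// instead of sampling uniformly from the union (which skews toward the
// larger tenant). Not a Pinot API.
public class FairShareAssignment {
  public static List<String> pickFairShare(List<List<String>> hostsPerTenant, int numWorkers) {
    List<String> picked = new ArrayList<>();
    int i = 0;
    // Take the i-th host from each tenant's pool in turn until we have enough.
    while (picked.size() < numWorkers) {
      boolean progressed = false;
      for (List<String> pool : hostsPerTenant) {
        if (i < pool.size() && picked.size() < numWorkers) {
          picked.add(pool.get(i));
          progressed = true;
        }
      }
      if (!progressed) {
        break; // every pool is exhausted
      }
      i++;
    }
    return picked;
  }
}
```

With two tenants and two workers requested, each tenant contributes one host rather than the selection possibly landing entirely in one tenant.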
@siddharthteotia The current WorkerAssignment strategy assigns all the servers for intermediate stage processing. The only case where we assign a random server is for SINGLETON (e.g., an empty `over()`). Just to clarify, this PR doesn't modify the intermediate stage's worker assignment strategy; it ensures that server hosts outside the tables' tenants are not considered when the worker assignment is done.
Yes, I am aligned on the scope of this PR. What I was asking about, especially for the broadcast join scenario, should be done as a follow-up to improve the strategy itself, IMO.
```java
LOGGER.info("Iterating for table={}, tag={}, server_instance_id={}, host={}", tableNameWithType, tableServerTag,
    instanceId, serverInstance.getHostname());

if (tags.contains(tableServerTag) && !tenantServerMap.containsKey(instanceId)) {
```
Maybe `tags` should be converted to a `Set`?
Should the second check be the first check?
Changed the order of the conditions. I think having a list is fine since that's what the Helix function returns. It's highly unlikely that an instance is configured with duplicate tags, and correctness-wise it shouldn't be a problem.
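The two review points above (do the map-membership check first, and use a `Set` for the tag lookup) could be sketched as follows. The surrounding class and signature are placeholders, not the actual `BrokerRoutingManager` code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the reviewed condition: skip already-recorded instances first
// (cheap map lookup), then test tag membership via a Set so the check is
// O(1) even if the Helix tag list grows. Names mirror the diff; the class
// itself is hypothetical.
public class TagCheckSketch {
  public static boolean shouldAdd(List<String> tags, String tableServerTag,
      Map<String, ?> tenantServerMap, String instanceId) {
    // Second check moved first: instance already recorded for this tenant.
    if (tenantServerMap.containsKey(instanceId)) {
      return false;
    }
    Set<String> tagSet = new HashSet<>(tags); // duplicates collapse; O(1) contains
    return tagSet.contains(tableServerTag);
  }
}
```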
```java
// @@ -183,4 +197,34 @@ private RoutingTable getRoutingTable(String tableName, TableType tableType, long
  return _routingManager.getRoutingTable(
      CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + tableNameWithType), requestId);
}

private boolean isLeafStage(StageMetadata stageMetadata) {
  return stageMetadata.getScannedTables().size() == 1;
```
I don't think this is a reliable indicator. It may be true for now but is unlikely to give the right answer as functionality evolves. Consider a standard GROUP BY query run in the multi-stage engine where the intermediate stage also processes the same single table after a hash exchange; even there the stageMetadata will indicate a single table.

Also, does `getScannedTables()` actually refer to a true scan of the tables, which will only happen in the leaf stage? Since the non-leaf stages will always work off blocks and thus `getScannedTables()` will return empty, maybe one way to distinguish leaf vs. non-leaf is `getScannedTables().size() > 0`? It depends on the implementation of `getScannedTables`.
We can also have stages terminate in a value node in which case there wouldn't be any scanned tables.
Agreed that we need a better way to figure out whether it is a leaf stage or not.

> We can also have stages terminate in a value node in which case there wouldn't be any scanned tables.

True, we actually treat these stages as intermediate stages today. Note that this PR only moved this check into a function; I'm leaving a TODO to address this.
```java
// @@ -102,11 +104,16 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
private final ServerRoutingStatsManager _serverRoutingStatsManager;
private final PinotConfiguration _pinotConfig;

// Map that contains the tableNameWithType as key and the enabled serverInstances that are tagged with the table's
// tenant.
private final Map<String, Map<String, ServerInstance>> _tableTenantServersMap = new ConcurrentHashMap<>();
```
Is it possible / worth it to invoke the new code path that works on this map only when the multi-stage query engine feature flag is enabled?
IMO, using the condition in many places might make the code unreadable. I would prefer not having it behind the condition. Thoughts?
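For context on the map being debated, its shape and population could look roughly like this. `ServerInstance` is reduced to a hostname holder here and the class is a stand-in, not the real `BrokerRoutingManager`; the sketch just shows why an always-maintained nested `ConcurrentHashMap` avoids per-query flag checks.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the map's shape: tableNameWithType -> (instanceId -> server).
// A stand-in for the BrokerRoutingManager field discussed above.
public class TableTenantServersSketch {
  record ServerInstance(String hostname) { }

  private final Map<String, Map<String, ServerInstance>> _tableTenantServersMap =
      new ConcurrentHashMap<>();

  // Record an enabled server tagged with the table's tenant.
  public void addServer(String tableNameWithType, String instanceId, ServerInstance server) {
    _tableTenantServersMap
        .computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>())
        .put(instanceId, server);
  }

  // Tenant-scoped lookup used at worker-assignment time.
  public Map<String, ServerInstance> getServers(String tableNameWithType) {
    return _tableTenantServersMap.getOrDefault(tableNameWithType, Map.of());
  }
}
```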
```java
// @@ -258,25 +261,6 @@ private String getCommonBrokerTenant(List<String> tableNames) {
  return (String) (tableBrokers.toArray()[0]);
}

// return the serverTenant if all tables point to the same broker, else returns null
```
Why delete this?
As stated in the description of the PR, it's not the right condition to enforce; we could have tables that are hosted in disjoint server tenants.

> After discussing with the authors, removed the restriction enforced in #10336 where a multistage query should have at least one common server tenant. This restriction might not work when the tables in the query belong to disjoint server tenants.
```java
Set<String> tableNames = new HashSet<>();
List<String> qualifiedTableNames = RelOptUtil.findAllTableQualifiedNames(relRoot);
for (String qualifiedTableName : qualifiedTableNames) {
  String tableName = qualifiedTableName.replaceAll("^\\[(.*)\\]$", "$1");
```
Can you elaborate on why this regex is needed?
Good callout. I added a comment in the code explaining why it is needed.
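For readers following along, the regex in the diff strips a surrounding pair of square brackets. My reading (not verified against Calcite here) is that `RelOptUtil.findAllTableQualifiedNames` yields each qualified name as the string form of a name list, e.g. `"[myTable]"`, so the brackets must be removed to recover the raw table name.

```java
// Standalone illustration of the bracket-stripping regex from the diff.
public class QualifiedNameSketch {
  public static String stripBrackets(String qualifiedTableName) {
    // "^\[(.*)\]$" captures everything between a leading '[' and a trailing ']'.
    // If the input has no surrounding brackets, the pattern does not match
    // and the string is returned unchanged.
    return qualifiedTableName.replaceAll("^\\[(.*)\\]$", "$1");
  }
}
```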
In OSS issue #10605, it was discovered that the worker assignment for intermediate stages in a multistage query is not tenant aware: we pick servers from the entire cluster, which could mean the query gets executed on a completely unrelated server tenant.

This PR addresses the above issue.
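The core idea of the fix, restricting intermediate-stage worker candidates to servers tagged with one of the query's table tenants instead of the whole cluster, could be sketched as below. Types and names are illustrative stand-ins, not the Pinot classes this PR actually touches.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hedged sketch of tenant-aware candidate filtering: given each server's
// Helix-style tenant tags and the tags of the tenants participating in the
// query, keep only servers whose tags intersect the query's tenants.
public class TenantAwareCandidates {
  public static List<String> candidateServers(
      Map<String, List<String>> serverToTags, List<String> queryTenantTags) {
    return serverToTags.entrySet().stream()
        // Drop servers from unrelated tenants.
        .filter(e -> e.getValue().stream().anyMatch(queryTenantTags::contains))
        .map(Map.Entry::getKey)
        .sorted() // deterministic order for the sketch
        .collect(Collectors.toList());
  }
}
```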
Notes
Additional changes in the PR
After discussing with @tibrewalpratik17 and @ankitsultana, removed the restriction enforced in #10336 where a multistage query should have at least one common server tenant. This restriction might not work when the tables in the query belong to disjoint server tenants.
Testing
I have manually tested the changes on our prod deployment. I'll figure out how to add unit tests while the review is in progress.