Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add code to wait for segments generated to be loaded on historicals #14322

Merged
merged 26 commits into from
Sep 6, 2023

Conversation

adarshsanjeev
Copy link
Contributor

@adarshsanjeev adarshsanjeev commented May 22, 2023

Currently, after an MSQ query, the web console is responsible for waiting for the segments to load. It does so by checking if there are any segments loading into the datasource ingested into, which can cause some issues, like in cases where the segments would never be loaded, or would end up waiting for other ingests as well.

This PR shifts this responsibility from the web console to the controller, which would have the list of segments created.
This PR also introduces a new field in the reports, that provides a realtime update of the current progress of waiting for the task.

  • The controller will release locks before waiting for the segments so that it does not block any other tasks.
  • If an exception is thrown while waiting, this is only logged, and will not fail the entire task.
 "segmentLoadStatus": {
          "state": "SUCCESS",
          "dataSource": "kttm_simple",
          "startTime": "2022-09-14T23:12:09.266Z",
          "duration": 15,
          "totalSegments": 1,
          "usedSegments": 1,
          "precachedSegments": 0,
          "onDemandSegments": 0,
          "pendingSegments": 0,
          "unknownSegments": 0
        }

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@adarshsanjeev adarshsanjeev changed the title [WIP][Do not merge] Add code to wait for segments generated to be loaded on historicals Add code to wait for segments generated to be loaded on historicals Jun 13, 2023
@cryptoe cryptoe added the Needs web console change Backend API changes that would benefit from frontend support in the web console label Jul 8, 2023
Comment on lines 60 to 75
Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
binder -> {
JsonConfigProvider.bindInstance(
binder,
Key.get(DruidNode.class, Self.class),
node
);
binder.bind(Integer.class).annotatedWith(Names.named("port")).toInstance(node.getPlaintextPort());
binder.bind(JettyServerInitializer.class).to(DruidLeaderClientTest.TestJettyServerInitializer.class).in(
LazySingleton.class);
Jerseys.addResource(binder, DruidLeaderClientTest.SimpleResource.class);
LifecycleModule.register(binder, Server.class);
}
)
);

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note test

Invoking
Initialization.makeInjectorWithModules
should be avoided because it has been deprecated.
Copy link
Contributor

@LakshSingla LakshSingla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented a few line items on the PR. Also, can you please use DruidException class for errors that might get surfaced to the user, either due to their fault or ours? (i.e. all the exceptions that aren't caught and rethrown).

{
StringFullResponseHandler responseHandler = new StringFullResponseHandler(StandardCharsets.UTF_8);

for (int counter = 0; counter < MAX_RETRIES; counter++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should wait for a while before retrying. This would prevent bombarding the Broker with requests in a short span of time, and also allow any transient failures to auto-resolve before sending another request. We should also have a back-off strategy here.

Consider refactoring it to RetryUtils.retry which does it for us.

}
catch (IOException | ChannelException ex) {
// can happen if the node is stopped.
log.warn(ex, "Request [%s] failed.", request.getUrl());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would log it after each retry. Since the retries are happening in a short span, there's a high likelihood that we would be posting the same stack over and over. This should log once after all the retries are exhausted. If you refactor it to RetryUtils, I think it also handles that for you.

public static String pickOneHost(DruidNodeDiscovery druidNodeDiscovery)
{
Iterator<DiscoveryDruidNode> iter = druidNodeDiscovery.getAllNodes().iterator();
if (iter.hasNext()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to me that this would have the affinity of picking the first broker node all the time. It would be better if we choose this in a round-robin fashion or at random. Is there any pre-existing code that gives a server at random, seems like this would be a common use case?

Copy link
Contributor

@cryptoe cryptoe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Left some comments.

@@ -257,7 +257,15 @@ The response shows an example report for a query.
"startTime": "2022-09-14T22:12:09.266Z",
"durationMs": 28227,
"pendingTasks": 0,
"runningTasks": 2
"runningTasks": 2,
"segmentLoadWaiterStatus": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: segmentLoadStatus?
What is this start time ?
How would segments which match a drop rule get communicated to the console?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startTime is the time at which we started checking the load status. That with duration would give us a clear idea of when it started and when it ended, and this is the structure for other MSQ stages.

private static final long SLEEP_DURATION_MILLIS = TimeUnit.SECONDS.toMillis(5);
private static final long TIMEOUT_DURATION_MILLIS = TimeUnit.MINUTES.toMillis(10);
private static final String LOAD_QUERY = "SELECT COUNT(*) AS totalSegments,\n"
+ "COUNT(*) FILTER (WHERE is_available = 0 AND is_published = 1 AND replication_factor != 0) AS loadingSegments\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the replication factor filter needed here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replication factor is used to filter out cold segments. Even if a cold segment is unavailable, we don't wait for it.

performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.overwriteAction(null, null, segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
Set<String> versionsToAwait = segments.stream().map(DataSegment::getVersion).collect(Collectors.toSet());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There would always be one version rite ?
Can we add a check here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this initially as well, but looking into the code, we actually generate the version based on the lock we acquire for the segment. So if there are multiple intervals we are replacing into and therefore multiple locks, we would have more than one version.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh i did not know that. Thanks for explaining the rational. We could also document this as well :)

* <br>
* Only expected to be called from the main controller thread.
*/
public void waitForSegmentsToLoad()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would want the experience on the console to be realtime ie like how counters currently work so that the console can render the waiting segment status progress to the end user.

With this approach the main thread looks to be blocked until the segment loading is complete and the info is only included in the task report once the call returns?
You could add a method which check if the segment loading is complete. If not get the status of loading and write it in the task report.
We would want to do this until the segment loading is completed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, the controller thread updates the status periodically without blocking. This status is included from the liveReports() in the controller. I have tested this out as well and the realtime information is available to the console

Copy link
Contributor

@cryptoe cryptoe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments. Overall LGTM!

"totalSegments": 1,
"usedSegments": 1,
"precachedSegments": 0,
"asyncOnlySegments": 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"asyncOnlySegments": 0,
"onDemandSegments": 0,

Async only seems weird to me since its ties the execution mode to segment.

// If successful and there are segments created, segmentLoadWaiter should wait for them to become available.
segmentLoadWaiter.waitForSegmentsToLoad();
try {
final List<TaskLock> locks = context.taskActionClient().submit(new LockListAction());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this can go in a separate method with Exception Handling clearly mentioning unable to release locks.

private static final long SLEEP_DURATION_MILLIS = TimeUnit.SECONDS.toMillis(5);
private static final long TIMEOUT_DURATION_MILLIS = TimeUnit.MINUTES.toMillis(10);
private static final String LOAD_QUERY = "SELECT COUNT(*) AS totalSegments,\n"
+ "COUNT(*) FILTER (WHERE is_available = 0 AND is_published = 1 AND replication_factor != 0) AS loadingSegments\n"
private static final String LOAD_QUERY = "SELECT COUNT(*) AS usedSegments,\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a writeup here of what each replication_factor means and link to : #14403 would be helpful.

That PR might needs an updated description though .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added


public enum State
{
INIT,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add more dev notes to this enum.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

HttpResponseStatus responseStatus = fullResponseHolder.getResponse().getStatus();
if (HttpResponseStatus.SERVICE_UNAVAILABLE.equals(responseStatus)
|| HttpResponseStatus.GATEWAY_TIMEOUT.equals(responseStatus)) {
throw new IOE(StringUtils.format("Request to broker failed due to failed response status: [%s]", responseStatus));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DruidException defensive exceptions here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be defensive? These would be thrown if the broker is not reachable which is a condition that can happen without any bugs in the code, right?

@cryptoe cryptoe merged commit 959148a into apache:master Sep 6, 2023
12 checks passed
@cryptoe cryptoe added the Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 label Sep 6, 2023
cryptoe added a commit that referenced this pull request Sep 22, 2023
…oad (#15000)

With PR #14322 , MSQ insert/Replace q's will wait for segment to be loaded on the historical's before finishing.

The patch introduces a bug where in the main thread had a thread.sleep() which could not be interrupted via the cancel calls from the overlord.

This new patch addressed that problem by moving the thread.sleep inside a thread of its own. Thus the main thread is now waiting on the future object of this execution.

The cancel call can now shutdown the executor service via another method thus unblocking the main thread to proceed.
abhishekagarwal87 pushed a commit that referenced this pull request Oct 11, 2023
This relies on the work done in #14322 and #15076. It allows the user to set waitTillSegmentsLoad in the query context (if they want, else it defaults to true) and shows the results in the UI :
@LakshSingla LakshSingla added this to the 28.0 milestone Oct 12, 2023
LakshSingla pushed a commit to LakshSingla/druid that referenced this pull request Oct 14, 2023
This relies on the work done in apache#14322 and apache#15076. It allows the user to set waitTillSegmentsLoad in the query context (if they want, else it defaults to true) and shows the results in the UI :
LakshSingla added a commit that referenced this pull request Oct 16, 2023
This relies on the work done in #14322 and #15076. It allows the user to set waitTillSegmentsLoad in the query context (if they want, else it defaults to true) and shows the results in the UI :

Co-authored-by: Sébastien <sebastien@imply.io>
ektravel pushed a commit to ektravel/druid that referenced this pull request Oct 16, 2023
This relies on the work done in apache#14322 and apache#15076. It allows the user to set waitTillSegmentsLoad in the query context (if they want, else it defaults to true) and shows the results in the UI :
CaseyPan pushed a commit to CaseyPan/druid that referenced this pull request Nov 17, 2023
This relies on the work done in apache#14322 and apache#15076. It allows the user to set waitTillSegmentsLoad in the query context (if they want, else it defaults to true) and shows the results in the UI :
@vogievetsky vogievetsky removed the Needs web console change Backend API changes that would benefit from frontend support in the web console label Dec 14, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Area - Documentation Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants