Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shuffle metrics for parallel indexing #10359

Merged
merged 10 commits into from
Oct 11, 2020

Conversation

jihoonson
Copy link
Contributor

@jihoonson jihoonson commented Sep 4, 2020

Description

Part of #10352. This PR adds these metrics for middleManagers. These metrics have the supervisorTaskId as their dimension.

  • ingest/shuffle/bytes: Number of bytes shuffled per emissionPeriod.
  • ingest/shuffle/requests: Number of shuffle requests per emissionPeriod.

I haven't updated document yet, will add them with missing shuffle configurations together in a follow-up PR.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

.computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics()).accumulate(fileLength);
}

public Map<String, PerDatasourceShuffleMetrics> snapshot()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be renamed to snapshotAndReset or may be just reset ?

public Map<String, PerDatasourceShuffleMetrics> reset()
{
   return Collections.unmodifiableMap(datasourceMetrics.getAndSet(new ConcurrentHashMap<>()));
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Changed to snapshotAndReset() since it sounds more intuitive to me.

{
datasourceMetrics
.get()
.computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics()).accumulate(fileLength);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's still possible to miss an update in reporting because of race condition, right? Since the reference could be reset while the accumulation is happening.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The race condition exists, but it should be fine because the missing update should be included in the next call to snapshotAndReset(). I added javadocs explaining why.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to use something like AtomicReference.getAndUpdate so that it isn't racy with the monitor/emitter? Though I'm not sure getAndUpdate or the similar methods are actually appropriate since they are supposed to be side-effect free, so I'm not really sure how exactly to resolve this.

Like, the potentially problematic scenario I'm thinking of is where shuffleRequested is called "before" snapshotAndReset. It seems like once AtomicReference.get has completed, snapshotAndReset can proceed, so now the shuffle monitor has the same concurrent map we are still actively updating, and it is preparing to build the metrics to emit. It seems super unlikely that it would be a problem, but unless I'm missing something it does seem possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you guys are right. Will fix it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that any updates on the reference to datasourceMetrics should be synchronized with any updates on the map itself and its values. I could use ConcurrentHashMap.compute() if I didn't have to reset the reference to the map when a snapshot is taken, but I think it's needed since the map can keep growing over time otherwise. I'm not sure if there is any other way than using a big lock. I made this change, let me know if you have a better idea.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the lock should suffice. shuffleRequested doesn't need to be a high throughput call.

Copy link
Member

@clintropolis clintropolis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall lgtm

{
datasourceMetrics
.get()
.computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics()).accumulate(fileLength);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to use something like AtomicReference.getAndUpdate so that it isn't racy with the monitor/emitter? Though I'm not sure getAndUpdate or the similar methods are actually appropriate since they are supposed to be side-effect free, so I'm not really sure how exactly to resolve this.

Like, the potentially problematic scenario I'm thinking of is where shuffleRequested is called "before" snapshotAndReset. It seems like once AtomicReference.get has completed, snapshotAndReset can proceed, so now the shuffle monitor has the same concurrent map we are still actively updating, and it is preparing to build the metrics to emit. It seems super unlikely that it would be a problem, but unless I'm missing something it does seem possible.


/**
* This method is called whenever the monitoring thread takes a snapshot of the current metrics. The map inside
* AtomicReference will be reset to an empty map after this call. This is to return the snapshot metrics collected
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment needs an update after the latest changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Fixed.

Copy link
Contributor

@suneet-s suneet-s left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, looks very nice! Just one ask about a feature flag. I don't have a strong opinion on the name of the metric, but would love to know your thoughts

*/
public void shuffleRequested(String supervisorTaskId, long fileLength)
{
synchronized (lock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there is a risk of the locking introducing a slow down here because of contention, can we update this to include a feature flag check?

This way, if there are some unforeseen issues with locking, we can disable metric computation and reporting. I think a static feature flag - like a system property would be good enough for this use case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this locking would introduce any noticeable slow down, but feature flag sounds good. Now, ShuffleMetrics and ShuffleMonitor will work only when ShuffleMonitor is defined in druid.monitoring.monitors. Added some doc for that too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this approach a lot 🤘

* whenever a snapshot is taken since the map can keep growing over time otherwise. For concurrent access pattern,
* see {@link #shuffleRequested} and {@link #snapshotAndReset()}.
*/
@GuardedBy("lock")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious - why did you choose to use the guarded by pattern instead of a ConcurrentMap?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was some prior discussion about it. It was mainly because not only updating the datasourceMetrics map, but also updating PerDatasourceShuffleMetrics should be synchronized as well. For example, if it was updating PerDatasourceShuffleMetrics when snapshotAndReset() is called, it should guarantee that the updating will be done before snapshotAndReset().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - that makes sense. Thanks for the explanation

{
private static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId";
private static final String SHUFFLE_BYTES_KEY = "shuffle/bytes";
private static final String SHUFFLE_REQUESTS_KEY = "shuffle/requests";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other ingestion related metrics start with "ingest/" any thoughts on whether these metrics fall under the ingestion metrics category?

I was thinking about where the metrics would live in the docs which is why I was asking this question. I thought maybe it belonged here https://druid.apache.org/docs/latest/operations/metrics.html#ingestion-metrics-realtime-process ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. The new metrics don't seem to belong to any existing section, so I added a new one. But our current doc doesn't seem organized well (for example, the metrics in the above link are not only for realtime processes, but for all task types as well), maybe we need to tidy up at some point after #10352 is done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I modified the metrics to start with ingest/ similar to other ingestion metrics.

{
// ShuffleMonitor cannot be registered dynamically, but can only via the static configuration (MonitorsConfig).
// As a result, it is safe to check only one time if it is registered in MonitorScheduler.
final Optional<ShuffleMonitor> maybeMonitor = monitorScheduler.findMonitor(ShuffleMonitor.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that MonitorScheduler has a removeMonitor method, and ShuffleMetrics is provided as a Singleton. Can someone remove the ShuffleMonitor while Druid is running? If they do that how would it impact ShuffleMetrics being reported

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, a monitor can be removed when 1) the monitor() method returns false or 2) tasks de-register task-specific monitors such as TaskRealtimeMetricsMonitor which is used in the deprecated Tranquility. So, ShuffleMonitor cannot be removed once a node is started.

In the future, I think we may want to dynamically register and remove monitors (because it's cool). In that case, we probably need to check all monitor implementations we have if they have any issues to do that. We can come back to ShuffleMonitor later to handle the case you mentioned.

Copy link
Contributor

@suneet-s suneet-s left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM with some asks for unit tests

@Override
public void configure(Binder binder)
{
Jerseys.addResource(binder, ShuffleResource.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a ModuleTest that validates the ShuffleResource and Optional<ShuffleMetrics>is injectable a. I think I've written AuthorizerMapperModuleTest that would be a similar example

emitter.emit(metricBuilder.build(SHUFFLE_REQUESTS_KEY, perDatasourceShuffleMetrics.getShuffleRequests()));
});
}
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add unit tests for this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I thought I added one already. Added now.

{
final ShuffleMonitor shuffleMonitor = new ShuffleMonitor();
final MonitorScheduler monitorScheduler = Mockito.mock(MonitorScheduler.class);
Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class)))
Copy link
Contributor

@suneet-s suneet-s Oct 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class)))
Mockito.when(monitorScheduler.findMonitor(ShuffleMonitor.class))

Copy link
Contributor

@suneet-s suneet-s left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the analyzeDependencies job is failing

[WARNING] Unused declared dependencies found:
[WARNING]    org.checkerframework:checker-qual:jar:2.5.7:compile

Comment on lines 51 to 62
final ShuffleMonitor shuffleMonitor = new ShuffleMonitor();
final MonitorScheduler monitorScheduler = Mockito.mock(MonitorScheduler.class);
Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class)))
.thenReturn(Optional.of(shuffleMonitor));
injector = Guice.createInjector(
binder -> {
binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
binder.bind(MonitorScheduler.class).toInstance(monitorScheduler);
binder.bind(IntermediaryDataManager.class).toInstance(Mockito.mock(IntermediaryDataManager.class));
},
shuffleModule
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can move this into a @Before method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As monitorScheduler behaves differently in tests, I think it's better to make them in each test. I extracted other common codes as a util method.

@jihoonson
Copy link
Contributor Author

Looks like the analyzeDependencies job is failing

[WARNING] Unused declared dependencies found:
[WARNING]    org.checkerframework:checker-qual:jar:2.5.7:compile

Thanks, fixed now.

@suneet-s suneet-s merged commit ad437dd into apache:master Oct 11, 2020
@jihoonson jihoonson added this to the 0.21.0 milestone Jan 4, 2021
JulianJaffePinterest pushed a commit to JulianJaffePinterest/druid that referenced this pull request Jan 22, 2021
* Add shuffle metrics for parallel indexing

* javadoc and concurrency test

* concurrency

* fix javadoc

* Feature flag

* doc

* fix doc and add a test

* checkstyle

* add tests

* fix build and address comments
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants