-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add shuffle metrics for parallel indexing #10359
Changes from 1 commit
7e0fc0a
b73677f
fcd8cb2
bf23cbc
80b2949
e258dd3
e205c1d
d70529d
394b473
23312bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.indexing.worker.shuffle; | ||
|
||
import com.google.common.annotations.VisibleForTesting;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
|
||
public class ShuffleMetrics | ||
{ | ||
private final AtomicReference<ConcurrentHashMap<String, PerDatasourceShuffleMetrics>> datasourceMetrics = | ||
new AtomicReference<>(); | ||
|
||
public ShuffleMetrics() | ||
{ | ||
datasourceMetrics.set(new ConcurrentHashMap<>()); | ||
} | ||
|
||
public void shuffleRequested(String supervisorTaskId, long fileLength) | ||
{ | ||
datasourceMetrics | ||
.get() | ||
.computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics()).accumulate(fileLength); | ||
} | ||
|
||
public Map<String, PerDatasourceShuffleMetrics> snapshot() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be renamed to
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. Changed to |
||
{ | ||
return Collections.unmodifiableMap(datasourceMetrics.getAndSet(new ConcurrentHashMap<>())); | ||
} | ||
|
||
/** | ||
* This method is visible only for testing. Use {@link #snapshot()} instead to get the current snapshot. | ||
*/ | ||
@VisibleForTesting | ||
Map<String, PerDatasourceShuffleMetrics> getDatasourceMetrics() | ||
{ | ||
return datasourceMetrics.get(); | ||
} | ||
|
||
public static class PerDatasourceShuffleMetrics | ||
{ | ||
private long shuffleBytes; | ||
private int shuffleRequests; | ||
|
||
public void accumulate(long shuffleBytes) | ||
{ | ||
this.shuffleBytes += shuffleBytes; | ||
this.shuffleRequests++; | ||
} | ||
|
||
public long getShuffleBytes() | ||
{ | ||
return shuffleBytes; | ||
} | ||
|
||
public int getShuffleRequests() | ||
{ | ||
return shuffleRequests; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.indexing.worker.shuffle; | ||
|
||
import com.google.inject.Binder; | ||
import com.google.inject.Module; | ||
import org.apache.druid.guice.Jerseys; | ||
import org.apache.druid.guice.LazySingleton; | ||
import org.apache.druid.server.metrics.MetricsModule; | ||
|
||
public class ShuffleModule implements Module | ||
{ | ||
@Override | ||
public void configure(Binder binder) | ||
{ | ||
Jerseys.addResource(binder, ShuffleResource.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a ModuleTest that validates the |
||
|
||
binder.bind(ShuffleMetrics.class).in(LazySingleton.class); | ||
binder.bind(ShuffleMonitor.class).in(LazySingleton.class); | ||
MetricsModule.register(binder, ShuffleMonitor.class); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.indexing.worker.shuffle; | ||
|
||
import com.google.inject.Inject; | ||
import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics; | ||
import org.apache.druid.java.util.emitter.service.ServiceEmitter; | ||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; | ||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder; | ||
import org.apache.druid.java.util.metrics.AbstractMonitor; | ||
|
||
import java.util.Map; | ||
|
||
public class ShuffleMonitor extends AbstractMonitor | ||
{ | ||
private static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId"; | ||
private static final String SHUFFLE_BYTES_KEY = "shuffle/bytes"; | ||
private static final String SHUFFLE_REQUESTS_KEY = "shuffle/requests"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. other ingestion related metrics start with "ingest/" any thoughts on whether these metrics fall under the ingestion metrics category? I was thinking about where the metrics would live in the docs which is why I was asking this question. I thought maybe it belonged here https://druid.apache.org/docs/latest/operations/metrics.html#ingestion-metrics-realtime-process ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question. The new metrics don't seem to belong to any existing section, so I added a new one. But our current doc doesn't seem organized well (for example, the metrics in the above link are not only for realtime processes, but for all task types as well), maybe we need to tidy up at some point after #10352 is done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, I modified the metrics to start with |
||
|
||
private final ShuffleMetrics shuffleMetrics; | ||
|
||
@Inject | ||
public ShuffleMonitor(ShuffleMetrics shuffleMetrics) | ||
{ | ||
this.shuffleMetrics = shuffleMetrics; | ||
} | ||
|
||
@Override | ||
public boolean doMonitor(ServiceEmitter emitter) | ||
{ | ||
final Map<String, PerDatasourceShuffleMetrics> snapshot = shuffleMetrics.snapshot(); | ||
snapshot.forEach((supervisorTaskId, perDatasourceShuffleMetrics) -> { | ||
final Builder metricBuilder = ServiceMetricEvent | ||
.builder() | ||
.setDimension(SUPERVISOR_TASK_ID_DIMENSION, supervisorTaskId); | ||
emitter.emit(metricBuilder.build(SHUFFLE_BYTES_KEY, perDatasourceShuffleMetrics.getShuffleBytes())); | ||
emitter.emit(metricBuilder.build(SHUFFLE_REQUESTS_KEY, perDatasourceShuffleMetrics.getShuffleRequests())); | ||
}); | ||
|
||
return true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add unit tests for this function? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops, I thought I added one already. Added now. |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's still possible to miss an update in reporting because of a race condition, right? The reference could be reset while the accumulation is happening.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The race condition exists, but it should be fine because the missing update should be included in the next call to `snapshotAndReset()`. I added javadocs explaining why.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs to use something like `AtomicReference.getAndUpdate` so that it isn't racy with the monitor/emitter? Though I'm not sure `getAndUpdate` or the similar methods are actually appropriate since they are supposed to be side-effect free, so I'm not really sure how exactly to resolve this.

Like, the potentially problematic scenario I'm thinking of is where `shuffleRequested` is called "before" `snapshotAndReset`. It seems like once `AtomicReference.get` has completed, `snapshotAndReset` can proceed, so now the shuffle monitor has the same concurrent map we are still actively updating, and it is preparing to build the metrics to emit. It seems super unlikely that it would be a problem, but unless I'm missing something it does seem possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, you guys are right. Will fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that any updates on the reference to `datasourceMetrics` should be synchronized with any updates on the map itself and its values. I could use `ConcurrentHashMap.compute()` if I didn't have to reset the reference to the map when a snapshot is taken, but I think it's needed since the map can keep growing over time otherwise. I'm not sure if there is any other way than using a big lock. I made this change, let me know if you have a better idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lock should suffice. `shuffleRequested` doesn't need to be a high throughput call.