Add shuffle metrics for parallel indexing #10359

Merged: 10 commits, Oct 11, 2020

Changes from 3 commits
@@ -29,5 +29,10 @@ public interface Monitor

void stop();

+/**
+ * Emit metrics using the given emitter.
+ *
+ * @return true if this monitor needs to continue monitoring. False otherwise.
+ */
boolean monitor(ServiceEmitter emitter);
}
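
For context, the boolean return value is what the MonitorScheduler uses to decide whether to keep a monitor scheduled. A minimal sketch of an implementation (the class and metric name are illustrative, not part of this PR):

import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.Monitor;

// Illustrative monitor: emits one JVM heap gauge per monitoring period and keeps running.
public class HeapUsedMonitor implements Monitor
{
  @Override
  public void start()
  {
    // Nothing to set up for this sketch.
  }

  @Override
  public void stop()
  {
    // Nothing to release for this sketch.
  }

  @Override
  public boolean monitor(ServiceEmitter emitter)
  {
    final Runtime runtime = Runtime.getRuntime();
    emitter.emit(ServiceMetricEvent.builder().build("jvm/heap/used", runtime.totalMemory() - runtime.freeMemory()));
    // Returning true keeps this monitor scheduled; false would remove it from the MonitorScheduler.
    return true;
  }
}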
@@ -43,7 +43,7 @@
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.MonitorScheduler;
@@ -42,7 +42,7 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -58,6 +58,7 @@
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistributionMerger;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketchMerger;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
@@ -486,7 +487,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
* - In the first phase, each task partitions input data and stores those partitions in local storage.
* - The partition is created based on the segment granularity (primary partition key) and the partition dimension
* values in {@link PartitionsSpec} (secondary partition key).
- * - Partitioned data is maintained by {@link org.apache.druid.indexing.worker.IntermediaryDataManager}.
+ * - Partitioned data is maintained by {@link IntermediaryDataManager}.
* - In the second phase, each task reads partitioned data from the intermediary data server (middleManager
* or indexer) and merges them to create the final segments.
*/
@@ -29,6 +29,7 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
@@ -46,7 +47,7 @@
/**
* The worker task of {@link PartialHashSegmentGenerateParallelIndexTaskRunner}. This task partitions input data by
* hashing the segment granularity and partition dimensions in {@link HashedPartitionsSpec}. Partitioned segments are
- * stored in local storage using {@link org.apache.druid.indexing.worker.ShuffleDataSegmentPusher}.
+ * stored in local storage using {@link ShuffleDataSegmentPusher}.
*/
public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<GeneratedPartitionsMetadataReport>
{
@@ -31,7 +31,7 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis;
-import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
@@ -31,7 +31,7 @@
import org.apache.druid.indexing.common.task.SequenceNameFunction;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
-import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
@@ -19,6 +19,8 @@

package org.apache.druid.indexing.common.task.batch.parallel;

+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
+
import java.io.File;
import java.io.IOException;

@@ -27,7 +29,7 @@
* The only available implementation for production code is {@link HttpShuffleClient};
* this interface exists mainly to make testing easier.
*
- * @see org.apache.druid.indexing.worker.IntermediaryDataManager
+ * @see IntermediaryDataManager
* @see PartialSegmentMergeTask
*/
public interface ShuffleClient
@@ -17,7 +17,7 @@
* under the License.
*/

-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;

import com.google.common.collect.Iterators;
import com.google.common.io.Files;
@@ -41,6 +41,7 @@
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -67,7 +68,7 @@
/**
* This class manages intermediary segments for data shuffle between native parallel index tasks.
* In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer)
- * and phase 2 tasks read those files via HTTP.
+ * and phase 2 tasks read those files over HTTP.
*
* The directory where segment files are placed is structured as
* {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment.
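
As a concrete illustration of that layout, a partition for a day-granularity segment might land at a path like the following (the supervisor task id, interval, and bucket id are hypothetical):

{StorageLocation#path}/index_parallel_wikipedia_abc123/2020-01-01T00:00:00.000Z/2020-01-02T00:00:00.000Z/0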
@@ -100,7 +101,7 @@ public class IntermediaryDataManager
// but middleManager or indexer could miss the request. This executor is to automatically clean up unused intermediary
// partitions.
// This can be null until IntermediaryDataManager is started.
-@Nullable
+@MonotonicNonNull
private ScheduledExecutorService supervisorTaskChecker;

@Inject
@@ -17,7 +17,7 @@
* under the License.
*/

-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;

import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.worker.shuffle;

import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Shuffle metrics for middleManagers and indexers. This class is thread-safe because shuffle can be performed by
* multiple HTTP threads while a monitoring thread periodically emits a snapshot of the metrics.
*
* @see ShuffleResource
* @see org.apache.druid.java.util.metrics.MonitorScheduler
*/
public class ShuffleMetrics
{
/**
* This lock is used to synchronize accesses to the reference to {@link #datasourceMetrics} and the
* {@link PerDatasourceShuffleMetrics} values of the map. This means,
*
* - Any updates on PerDatasourceShuffleMetrics in the map (and thus its key) should be synchronized under this lock.
* - Any updates on the reference to datasourceMetrics should be synchronized under this lock.
*/
private final Object lock = new Object();

/**
* A map of (datasource name) -> {@link PerDatasourceShuffleMetrics}. This map is replaced with an empty map
* whenever a snapshot is taken since the map can keep growing over time otherwise. For concurrent access pattern,
* see {@link #shuffleRequested} and {@link #snapshotAndReset()}.
*/
@GuardedBy("lock")

Contributor: Just curious - why did you choose to use the guarded-by pattern instead of a ConcurrentMap?

Contributor Author: There was some prior discussion about it. It was mainly because not only updates to the datasourceMetrics map but also updates to PerDatasourceShuffleMetrics must be synchronized. For example, if a PerDatasourceShuffleMetrics is being updated when snapshotAndReset() is called, the update must be guaranteed to complete before snapshotAndReset() runs.

Contributor: Ah - that makes sense. Thanks for the explanation.

private Map<String, PerDatasourceShuffleMetrics> datasourceMetrics = new HashMap<>();

/**
* This method is called whenever a new shuffle is requested. Multiple tasks can request shuffle at the same time,
* while the monitoring thread takes a snapshot of the metrics. There is a happens-before relationship between
* shuffleRequested and {@link #snapshotAndReset()}.
*/
public void shuffleRequested(String supervisorTaskId, long fileLength)
{
synchronized (lock) {

Contributor: Since there is a risk of the locking introducing a slowdown here because of contention, can we update this to include a feature-flag check? That way, if there are unforeseen issues with locking, we can disable metric computation and reporting. I think a static feature flag, like a system property, would be good enough for this use case.

Contributor Author: I don't think this locking would introduce any noticeable slowdown, but a feature flag sounds good. Now ShuffleMetrics and ShuffleMonitor work only when ShuffleMonitor is defined in druid.monitoring.monitors. Added some docs for that too.

Contributor: I like this approach a lot 🤘

datasourceMetrics.computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics())
.accumulate(fileLength);
}
}

/**
* This method is called whenever the monitoring thread takes a snapshot of the current metrics. The
* {@link #datasourceMetrics} map is replaced with an empty map after this call. This is to return the snapshot metrics collected

Contributor: This comment needs an update after the latest changes.

Contributor Author: Good catch. Fixed.

* during the monitoring period. There is a happens-before relationship between snapshotAndReset() and
* {@link #shuffleRequested}.
*/
public Map<String, PerDatasourceShuffleMetrics> snapshotAndReset()
{
synchronized (lock) {
final Map<String, PerDatasourceShuffleMetrics> snapshot = Collections.unmodifiableMap(datasourceMetrics);
datasourceMetrics = new HashMap<>();
return snapshot;
}
}

/**
* This method is visible only for testing. Use {@link #snapshotAndReset()} instead to get the current snapshot.
*/
@VisibleForTesting
Map<String, PerDatasourceShuffleMetrics> getDatasourceMetrics()
{
synchronized (lock) {
return datasourceMetrics;
}
}

/**
* This class represents shuffle metrics of one datasource. This class is not thread-safe and should never be accessed
* by multiple threads at the same time.
*/
public static class PerDatasourceShuffleMetrics
{
private long shuffleBytes;
private int shuffleRequests;

private void accumulate(long shuffleBytes)
{
this.shuffleBytes += shuffleBytes;
this.shuffleRequests++;
}

public long getShuffleBytes()
{
return shuffleBytes;
}

public int getShuffleRequests()
{
return shuffleRequests;
}
}
}
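
A usage sketch of the intended interaction, not part of the patch (the task id and file sizes are made up): HTTP threads accumulate under the lock, and the monitoring thread atomically swaps the map out.

final ShuffleMetrics metrics = new ShuffleMetrics();

// Two shuffle requests served by HTTP threads for the same supervisor task.
metrics.shuffleRequested("supervisor_task_1", 1024L);
metrics.shuffleRequested("supervisor_task_1", 2048L);

// The monitoring thread drains everything accumulated so far:
// snapshot.get("supervisor_task_1") now reports 3072 bytes over 2 requests.
final Map<String, ShuffleMetrics.PerDatasourceShuffleMetrics> snapshot = metrics.snapshotAndReset();

// A fresh map backs the next period, so an immediate second snapshot is empty.
// metrics.snapshotAndReset().isEmpty() -> true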
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.worker.shuffle;

import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.server.metrics.MetricsModule;

public class ShuffleModule implements Module
{
@Override
public void configure(Binder binder)
{
Jerseys.addResource(binder, ShuffleResource.class);

Contributor: Can you add a ModuleTest that validates that ShuffleResource and Optional<ShuffleMetrics> are injectable? I think the AuthorizerMapperModuleTest I've written would be a similar example.

binder.bind(ShuffleMetrics.class).in(LazySingleton.class);
binder.bind(ShuffleMonitor.class).in(LazySingleton.class);
MetricsModule.register(binder, ShuffleMonitor.class);
}
}
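
Per the feature-flag discussion above, the monitor only emits when it is enabled in the runtime properties. On a middleManager or indexer that would look roughly like the line below (shown in the standard Druid monitors-list format; the exact final wiring may differ from this 3-commit snapshot):

druid.monitoring.monitors=["org.apache.druid.indexing.worker.shuffle.ShuffleMonitor"]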
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.worker.shuffle;

import com.google.inject.Inject;
import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder;
import org.apache.druid.java.util.metrics.AbstractMonitor;

import java.util.Map;

public class ShuffleMonitor extends AbstractMonitor
{
private static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId";
private static final String SHUFFLE_BYTES_KEY = "shuffle/bytes";
private static final String SHUFFLE_REQUESTS_KEY = "shuffle/requests";

Contributor: Other ingestion-related metrics start with "ingest/" - any thoughts on whether these metrics fall under the ingestion metrics category? I was thinking about where the metrics would live in the docs, which is why I was asking. I thought maybe they belonged here: https://druid.apache.org/docs/latest/operations/metrics.html#ingestion-metrics-realtime-process

Contributor Author: Good question. The new metrics don't seem to belong to any existing section, so I added a new one. But our current docs don't seem well organized (for example, the metrics in the above link are not only for realtime processes but for all task types as well); maybe we need to tidy up at some point after #10352 is done.

Contributor Author: Also, I modified the metrics to start with ingest/ similar to other ingestion metrics.

private final ShuffleMetrics shuffleMetrics;

@Inject
public ShuffleMonitor(ShuffleMetrics shuffleMetrics)
{
this.shuffleMetrics = shuffleMetrics;
}

@Override
public boolean doMonitor(ServiceEmitter emitter)
{
final Map<String, PerDatasourceShuffleMetrics> snapshot = shuffleMetrics.snapshotAndReset();
snapshot.forEach((supervisorTaskId, perDatasourceShuffleMetrics) -> {
final Builder metricBuilder = ServiceMetricEvent
.builder()
.setDimension(SUPERVISOR_TASK_ID_DIMENSION, supervisorTaskId);
emitter.emit(metricBuilder.build(SHUFFLE_BYTES_KEY, perDatasourceShuffleMetrics.getShuffleBytes()));
emitter.emit(metricBuilder.build(SHUFFLE_REQUESTS_KEY, perDatasourceShuffleMetrics.getShuffleRequests()));
});

return true;

Contributor: Should we add unit tests for this function?

Contributor Author: Oops, I thought I added one already. Added now.

}
}
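
A sketch of the kind of unit test requested in the thread above (Mockito and JUnit are assumed here for brevity; the project's actual test may use different utilities):

ShuffleMetrics metrics = new ShuffleMetrics();
metrics.shuffleRequested("supervisor_task_1", 1024L);

ShuffleMonitor monitor = new ShuffleMonitor(metrics);
ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);

// doMonitor should emit shuffle/bytes and shuffle/requests for the task
// and ask to remain scheduled.
Assert.assertTrue(monitor.doMonitor(emitter));

// The snapshot was drained, so the next monitoring period starts from empty.
Assert.assertTrue(metrics.snapshotAndReset().isEmpty());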
@@ -17,12 +17,11 @@
* under the License.
*/

-package org.apache.druid.indexing.worker.http;
+package org.apache.druid.indexing.worker.shuffle;

import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -60,11 +59,13 @@ public class ShuffleResource
private static final Logger log = new Logger(ShuffleResource.class);

private final IntermediaryDataManager intermediaryDataManager;
+private final ShuffleMetrics shuffleMetrics;

@Inject
-public ShuffleResource(IntermediaryDataManager intermediaryDataManager)
+public ShuffleResource(IntermediaryDataManager intermediaryDataManager, ShuffleMetrics shuffleMetrics)
{
this.intermediaryDataManager = intermediaryDataManager;
+this.shuffleMetrics = shuffleMetrics;
}

@GET
@@ -96,6 +97,7 @@ public Response getPartition(
);
return Response.status(Status.NOT_FOUND).entity(errorMessage).build();
} else {
+shuffleMetrics.shuffleRequested(supervisorTaskId, partitionFile.length());
return Response.ok(
(StreamingOutput) output -> {
try (final FileInputStream fileInputStream = new FileInputStream(partitionFile)) {