Add shuffle metrics for parallel indexing #10359

Merged · 10 commits · Oct 11, 2020
Changes from 1 commit
@@ -29,5 +29,10 @@ public interface Monitor

void stop();

+/**
+ * Emit metrics using the given emitter.
+ *
+ * @return true if this monitor needs to continue monitoring. False otherwise.
+ */
boolean monitor(ServiceEmitter emitter);
}
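For illustration, a minimal sketch of a monitor honoring this contract; OneShotMonitor is a hypothetical example, not part of this PR:

import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.Monitor;

// Hypothetical: emits a single metric, then returns false so the
// MonitorScheduler stops scheduling this monitor.
public class OneShotMonitor implements Monitor
{
  @Override
  public void start()
  {
    // no resources to set up
  }

  @Override
  public void stop()
  {
    // no resources to tear down
  }

  @Override
  public boolean monitor(ServiceEmitter emitter)
  {
    emitter.emit(ServiceMetricEvent.builder().build("example/metric", 1));
    return false; // done after a single emission
  }
}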
@@ -43,7 +43,7 @@
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.MonitorScheduler;
@@ -42,7 +42,7 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -58,6 +58,7 @@
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistributionMerger;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketchMerger;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
@@ -486,7 +487,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
* - In the first phase, each task partitions input data and stores those partitions in local storage.
* - The partition is created based on the segment granularity (primary partition key) and the partition dimension
* values in {@link PartitionsSpec} (secondary partition key).
-* - Partitioned data is maintained by {@link org.apache.druid.indexing.worker.IntermediaryDataManager}.
+* - Partitioned data is maintained by {@link IntermediaryDataManager}.
* - In the second phase, each task reads partitioned data from the intermediary data server (middleManager
* or indexer) and merges them to create the final segments.
*/
@@ -29,6 +29,7 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
@@ -46,7 +47,7 @@
/**
* The worker task of {@link PartialHashSegmentGenerateParallelIndexTaskRunner}. This task partitions input data by
* hashing the segment granularity and partition dimensions in {@link HashedPartitionsSpec}. Partitioned segments are
-* stored in local storage using {@link org.apache.druid.indexing.worker.ShuffleDataSegmentPusher}.
+* stored in local storage using {@link ShuffleDataSegmentPusher}.
*/
public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<GeneratedPartitionsMetadataReport>
{
@@ -31,7 +31,7 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis;
-import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
@@ -31,7 +31,7 @@
import org.apache.druid.indexing.common.task.SequenceNameFunction;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
-import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
@@ -19,6 +19,8 @@

package org.apache.druid.indexing.common.task.batch.parallel;

+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;

import java.io.File;
import java.io.IOException;

@@ -27,7 +29,7 @@
* The only available implementation for production code is {@link HttpShuffleClient} and
* this interface is more for easier testing.
*
-* @see org.apache.druid.indexing.worker.IntermediaryDataManager
+* @see IntermediaryDataManager
* @see PartialSegmentMergeTask
*/
public interface ShuffleClient
@@ -17,7 +17,7 @@
* under the License.
*/

-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;

import com.google.common.collect.Iterators;
import com.google.common.io.Files;
@@ -41,6 +41,7 @@
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -67,7 +68,7 @@
/**
* This class manages intermediary segments for data shuffle between native parallel index tasks.
* In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer)
-* and phase 2 tasks read those files via HTTP.
+* and phase 2 tasks read those files over HTTP.
*
* The directory where segment files are placed is structured as
* {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment.
@@ -100,7 +101,7 @@ public class IntermediaryDataManager
// but middleManager or indexer could miss the request. This executor is to automatically clean up unused intermediary
// partitions.
// This can be null until IntermediaryDataManager is started.
-@Nullable
+@MonotonicNonNull
private ScheduledExecutorService supervisorTaskChecker;

@Inject
@@ -17,7 +17,7 @@
* under the License.
*/

-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;

import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.worker.shuffle;

import com.google.common.annotations.VisibleForTesting;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class ShuffleMetrics
{
private final AtomicReference<ConcurrentHashMap<String, PerDatasourceShuffleMetrics>> datasourceMetrics =
new AtomicReference<>();

public ShuffleMetrics()
{
datasourceMetrics.set(new ConcurrentHashMap<>());
}

public void shuffleRequested(String supervisorTaskId, long fileLength)
{
datasourceMetrics
.get()
.computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics()).accumulate(fileLength);
Contributor: It's still possible to miss an update in reporting because of a race condition, right? Since the reference could be reset while the accumulation is happening.

Contributor Author: The race condition exists, but it should be fine because the missed update would be included in the next call to snapshotAndReset(). I added javadocs explaining why.

Member: I think this needs to use something like AtomicReference.getAndUpdate so that it isn't racy with the monitor/emitter? Though I'm not sure getAndUpdate or the similar methods are actually appropriate, since they are supposed to be side-effect free, so I'm not really sure how exactly to resolve this.

Like, the potentially problematic scenario I'm thinking of is one where shuffleRequested is called "before" snapshotAndReset. It seems like once AtomicReference.get has completed, snapshotAndReset can proceed, so now the shuffle monitor holds the same concurrent map we are still actively updating while it prepares the metrics to emit. It seems super unlikely to be a problem in practice, but unless I'm missing something it does seem possible.

Contributor Author: Ah, you guys are right. Will fix it.

Contributor Author: The problem is that any update to the reference to datasourceMetrics must be synchronized with updates to the map itself and its values. I could use ConcurrentHashMap.compute() if I didn't have to reset the reference to the map when a snapshot is taken, but that reset seems necessary since the map could otherwise keep growing over time. I don't see a way to do this other than a big lock. I made this change; let me know if you have a better idea.

Contributor: The lock should suffice; shuffleRequested doesn't need to be a high-throughput call.
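A minimal sketch of the big-lock approach settled on above, using the snapshotAndReset() name adopted later in this review; the exact committed code may differ:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ShuffleMetrics
{
  private final Object lock = new Object();

  // Guarded by lock: the reference, the map, and its values all change under
  // the same lock, so an in-flight accumulate() cannot race with a reset.
  private Map<String, PerDatasourceShuffleMetrics> datasourceMetrics = new HashMap<>();

  public void shuffleRequested(String supervisorTaskId, long fileLength)
  {
    synchronized (lock) {
      datasourceMetrics
          .computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics())
          .accumulate(fileLength);
    }
  }

  public Map<String, PerDatasourceShuffleMetrics> snapshotAndReset()
  {
    synchronized (lock) {
      final Map<String, PerDatasourceShuffleMetrics> snapshot =
          Collections.unmodifiableMap(datasourceMetrics);
      datasourceMetrics = new HashMap<>();
      return snapshot;
    }
  }
}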

}

public Map<String, PerDatasourceShuffleMetrics> snapshot()
Contributor: Should this be renamed to snapshotAndReset(), or maybe just reset()?

public Map<String, PerDatasourceShuffleMetrics> reset()
{
   return Collections.unmodifiableMap(datasourceMetrics.getAndSet(new ConcurrentHashMap<>()));
}

Contributor Author: Sounds good. Changed to snapshotAndReset() since it sounds more intuitive to me.
{
return Collections.unmodifiableMap(datasourceMetrics.getAndSet(new ConcurrentHashMap<>()));
}

/**
* This method is visible only for testing. Use {@link #snapshot()} instead to get the current snapshot.
*/
@VisibleForTesting
Map<String, PerDatasourceShuffleMetrics> getDatasourceMetrics()
{
return datasourceMetrics.get();
}

public static class PerDatasourceShuffleMetrics
{
private long shuffleBytes;
private int shuffleRequests;

public void accumulate(long shuffleBytes)
{
this.shuffleBytes += shuffleBytes;
this.shuffleRequests++;
}

public long getShuffleBytes()
{
return shuffleBytes;
}

public int getShuffleRequests()
{
return shuffleRequests;
}
}
}
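For illustration, how this class is used as of this commit; the supervisor task id and file lengths below are made-up values:

ShuffleMetrics metrics = new ShuffleMetrics();

// The shuffle resource records one entry per served partition request,
// keyed by supervisor task id.
metrics.shuffleRequested("index_parallel_wiki_abc", 1024L);
metrics.shuffleRequested("index_parallel_wiki_abc", 2048L);

// The monitor periodically drains the accumulated counts.
Map<String, PerDatasourceShuffleMetrics> snapshot = metrics.snapshot();
snapshot.get("index_parallel_wiki_abc").getShuffleBytes();    // 3072
snapshot.get("index_parallel_wiki_abc").getShuffleRequests(); // 2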
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.worker.shuffle;

import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.server.metrics.MetricsModule;

public class ShuffleModule implements Module
{
@Override
public void configure(Binder binder)
{
Jerseys.addResource(binder, ShuffleResource.class);
Contributor: Can you add a ModuleTest that validates that ShuffleResource and Optional<ShuffleMetrics> are injectable? I think I've written AuthorizerMapperModuleTest, which would be a similar example.
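A minimal sketch of what such an injection test could look like, assuming EasyMock (used widely in Druid tests) to stub IntermediaryDataManager; it only checks that the @Inject constructors are satisfiable, and the test actually added to the PR may be structured differently:

import com.google.inject.Guice;
import com.google.inject.Injector;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

public class ShuffleModuleTest
{
  @Test
  public void testShuffleResourceAndMetricsAreInjectable()
  {
    // Bind a mock manager; ShuffleMetrics and ShuffleResource can then be
    // created via just-in-time bindings from their @Inject constructors.
    final Injector injector = Guice.createInjector(
        binder -> binder.bind(IntermediaryDataManager.class)
                        .toInstance(EasyMock.createNiceMock(IntermediaryDataManager.class))
    );
    Assert.assertNotNull(injector.getInstance(ShuffleMetrics.class));
    Assert.assertNotNull(injector.getInstance(ShuffleResource.class));
  }
}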


binder.bind(ShuffleMetrics.class).in(LazySingleton.class);
binder.bind(ShuffleMonitor.class).in(LazySingleton.class);
MetricsModule.register(binder, ShuffleMonitor.class);
}
}
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.worker.shuffle;

import com.google.inject.Inject;
import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder;
import org.apache.druid.java.util.metrics.AbstractMonitor;

import java.util.Map;

public class ShuffleMonitor extends AbstractMonitor
{
private static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId";
private static final String SHUFFLE_BYTES_KEY = "shuffle/bytes";
private static final String SHUFFLE_REQUESTS_KEY = "shuffle/requests";
Contributor: Other ingestion-related metrics start with "ingest/". Any thoughts on whether these metrics fall under the ingestion metrics category?

I was thinking about where the metrics would live in the docs, which is why I was asking. I thought maybe they belonged here: https://druid.apache.org/docs/latest/operations/metrics.html#ingestion-metrics-realtime-process

Contributor Author: Good question. The new metrics don't seem to belong to any existing section, so I added a new one. But our current doc doesn't seem well organized (for example, the metrics in the above link are not only for realtime processes but for all task types as well); maybe we need to tidy it up at some point after #10352 is done.

Contributor Author: Also, I modified the metrics to start with ingest/, similar to other ingestion metrics.


private final ShuffleMetrics shuffleMetrics;

@Inject
public ShuffleMonitor(ShuffleMetrics shuffleMetrics)
{
this.shuffleMetrics = shuffleMetrics;
}

@Override
public boolean doMonitor(ServiceEmitter emitter)
{
final Map<String, PerDatasourceShuffleMetrics> snapshot = shuffleMetrics.snapshot();
snapshot.forEach((supervisorTaskId, perDatasourceShuffleMetrics) -> {
final Builder metricBuilder = ServiceMetricEvent
.builder()
.setDimension(SUPERVISOR_TASK_ID_DIMENSION, supervisorTaskId);
emitter.emit(metricBuilder.build(SHUFFLE_BYTES_KEY, perDatasourceShuffleMetrics.getShuffleBytes()));
emitter.emit(metricBuilder.build(SHUFFLE_REQUESTS_KEY, perDatasourceShuffleMetrics.getShuffleRequests()));
});

return true;
Contributor: Should we add unit tests for this function?

Contributor Author: Oops, I thought I added one already. Added now.
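A sketch of what such a test could look like, again assuming EasyMock; the test actually added in the PR may differ:

import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

public class ShuffleMonitorTest
{
  @Test
  public void testDoMonitorEmitsPerDatasourceMetrics()
  {
    final ShuffleMetrics metrics = new ShuffleMetrics();
    metrics.shuffleRequested("supervisor1", 1024L); // made-up id and length

    // One datasource with recorded activity should yield exactly two events:
    // shuffle/bytes and shuffle/requests.
    final ServiceEmitter emitter = EasyMock.createStrictMock(ServiceEmitter.class);
    emitter.emit(EasyMock.anyObject(ServiceEventBuilder.class));
    EasyMock.expectLastCall().times(2);
    EasyMock.replay(emitter);

    final ShuffleMonitor monitor = new ShuffleMonitor(metrics);
    Assert.assertTrue(monitor.doMonitor(emitter));
    EasyMock.verify(emitter);
  }
}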

}
}
@@ -17,12 +17,11 @@
* under the License.
*/

-package org.apache.druid.indexing.worker.http;
+package org.apache.druid.indexing.worker.shuffle;

import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -60,11 +59,13 @@ public class ShuffleResource
private static final Logger log = new Logger(ShuffleResource.class);

private final IntermediaryDataManager intermediaryDataManager;
+private final ShuffleMetrics shuffleMetrics;

@Inject
-public ShuffleResource(IntermediaryDataManager intermediaryDataManager)
+public ShuffleResource(IntermediaryDataManager intermediaryDataManager, ShuffleMetrics shuffleMetrics)
{
this.intermediaryDataManager = intermediaryDataManager;
+this.shuffleMetrics = shuffleMetrics;
}

@GET
@@ -96,6 +97,7 @@ public Response getPartition(
);
return Response.status(Status.NOT_FOUND).entity(errorMessage).build();
} else {
+shuffleMetrics.shuffleRequested(supervisorTaskId, partitionFile.length());
return Response.ok(
(StreamingOutput) output -> {
try (final FileInputStream fileInputStream = new FileInputStream(partitionFile)) {
@@ -65,8 +65,8 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.Segments;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@@ -17,7 +17,7 @@
* under the License.
*/

-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;

import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
@@ -17,7 +17,7 @@
* under the License.
*/

-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;

import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
@@ -17,7 +17,7 @@
* under the License.
*/

-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;

import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;