Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shuffle metrics for parallel indexing #10359

Merged
merged 10 commits into from
Oct 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,12 @@
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,10 @@ public interface Monitor

void stop();

/**
* Emit metrics using the given emitter.
*
* @return true if this monitor needs to continue monitoring. False otherwise.
*/
boolean monitor(ServiceEmitter emitter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -93,6 +94,17 @@ public void removeMonitor(final Monitor monitor)
}
}

/**
* Returns a {@link Monitor} instance of the given class if any. Note that this method searches for the monitor
* from the current snapshot of {@link #monitors}.
*/
public <T extends Monitor> Optional<T> findMonitor(Class<T> monitorClass)
{
synchronized (lock) {
return (Optional<T>) monitors.stream().filter(m -> m.getClass() == monitorClass).findFirst();
}
}

@LifecycleStop
public void stop()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.metrics;

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.Optional;

public class MonitorSchedulerTest
{
@Test
public void testFindMonitor()
{
class Monitor1 extends NoopMonitor
{
}
class Monitor2 extends NoopMonitor
{
}
class Monitor3 extends NoopMonitor
{
}

final Monitor1 monitor1 = new Monitor1();
final Monitor2 monitor2 = new Monitor2();

final MonitorScheduler scheduler = new MonitorScheduler(
Mockito.mock(MonitorSchedulerConfig.class),
Execs.scheduledSingleThreaded("monitor-scheduler-test"),
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor1, monitor2)
);

final Optional<Monitor1> maybeFound1 = scheduler.findMonitor(Monitor1.class);
final Optional<Monitor2> maybeFound2 = scheduler.findMonitor(Monitor2.class);
Assert.assertTrue(maybeFound1.isPresent());
Assert.assertTrue(maybeFound2.isPresent());
Assert.assertSame(monitor1, maybeFound1.get());
Assert.assertSame(monitor2, maybeFound2.get());

Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent());
}

private static class NoopMonitor implements Monitor
{
@Override
public void start()
{

}

@Override
public void stop()
{

}

@Override
public boolean monitor(ServiceEmitter emitter)
{
return true;
}
}
}
12 changes: 11 additions & 1 deletion docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ These metrics are only available if the RealtimeMetricsMonitor is included in th

Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.

### Indexing service
## Indexing service

|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
Expand All @@ -187,6 +187,16 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|`task/waiting/count`|Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|

## Shuffle metrics (Native parallel task)

The shuffle metrics can be enabled by adding `org.apache.druid.indexing.worker.shuffle.ShuffleMonitor` in `druid.monitoring.monitors`
See [Enabling Metrics](../configuration/index.md#enabling-metrics) for more details.

|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/shuffle/bytes`|Number of bytes shuffled per emission period.|supervisorTaskId|Varies|
|`ingest/shuffle/requests`|Number of shuffle requests per emission period.|supervisorTaskId|Varies|

## Coordination

These metrics are for the Druid Coordinator and are reset each time the Coordinator runs the coordination logic.
Expand Down
6 changes: 6 additions & 0 deletions indexing-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,12 @@
<artifactId>system-rules</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.MonitorScheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistributionMerger;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketchMerger;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
Expand Down Expand Up @@ -486,7 +487,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
* - In the first phase, each task partitions input data and stores those partitions in local storage.
* - The partition is created based on the segment granularity (primary partition key) and the partition dimension
* values in {@link PartitionsSpec} (secondary partition key).
* - Partitioned data is maintained by {@link org.apache.druid.indexing.worker.IntermediaryDataManager}.
* - Partitioned data is maintained by {@link IntermediaryDataManager}.
* - In the second phase, each task reads partitioned data from the intermediary data server (middleManager
* or indexer) and merges them to create the final segments.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
Expand All @@ -46,7 +47,7 @@
/**
* The worker task of {@link PartialHashSegmentGenerateParallelIndexTaskRunner}. This task partitions input data by
* hashing the segment granularity and partition dimensions in {@link HashedPartitionsSpec}. Partitioned segments are
* stored in local storage using {@link org.apache.druid.indexing.worker.ShuffleDataSegmentPusher}.
* stored in local storage using {@link ShuffleDataSegmentPusher}.
*/
public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<GeneratedPartitionsMetadataReport>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis;
import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.druid.indexing.common.task.SequenceNameFunction;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.indexing.common.task.batch.parallel;

import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;

import java.io.File;
import java.io.IOException;

Expand All @@ -27,7 +29,7 @@
* The only available implementation for production code is {@link HttpShuffleClient} and
* this interface is more for easier testing.
*
* @see org.apache.druid.indexing.worker.IntermediaryDataManager
* @see IntermediaryDataManager
* @see PartialSegmentMergeTask
*/
public interface ShuffleClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.druid.indexing.worker;
package org.apache.druid.indexing.worker.shuffle;

import com.google.common.collect.Iterators;
import com.google.common.io.Files;
Expand All @@ -41,6 +41,7 @@
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
Expand All @@ -67,7 +68,7 @@
/**
* This class manages intermediary segments for data shuffle between native parallel index tasks.
* In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer)
* and phase 2 tasks read those files via HTTP.
* and phase 2 tasks read those files over HTTP.
*
* The directory where segment files are placed is structured as
* {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment.
Expand Down Expand Up @@ -100,7 +101,7 @@ public class IntermediaryDataManager
// but middleManager or indexer could miss the request. This executor is to automatically clean up unused intermediary
// partitions.
// This can be null until IntermediaryDataManager is started.
@Nullable
@MonotonicNonNull
private ScheduledExecutorService supervisorTaskChecker;

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.druid.indexing.worker;
package org.apache.druid.indexing.worker.shuffle;

import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.loading.DataSegmentPusher;
Expand Down
Loading