From 7e0fc0a9eda6a4e7bbfc391252567f1276127081 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 4 Sep 2020 16:28:26 -0700 Subject: [PATCH 01/10] Add shuffle metrics for parallel indexing --- .../druid/java/util/metrics/Monitor.java | 5 + .../druid/indexing/common/TaskToolbox.java | 2 +- .../indexing/common/TaskToolboxFactory.java | 2 +- .../parallel/ParallelIndexSupervisorTask.java | 3 +- .../PartialHashSegmentGenerateTask.java | 3 +- .../PartialRangeSegmentGenerateTask.java | 2 +- .../parallel/PartialSegmentGenerateTask.java | 2 +- .../task/batch/parallel/ShuffleClient.java | 4 +- .../IntermediaryDataManager.java | 7 +- .../ShuffleDataSegmentPusher.java | 2 +- .../worker/shuffle/ShuffleMetrics.java | 81 +++++++ .../worker/shuffle/ShuffleModule.java | 39 ++++ .../worker/shuffle/ShuffleMonitor.java | 59 +++++ .../{http => shuffle}/ShuffleResource.java | 8 +- ...stractParallelIndexSupervisorTaskTest.java | 2 +- ...ntermediaryDataManagerAutoCleanupTest.java | 2 +- ...iaryDataManagerManualAddAndDeleteTest.java | 2 +- .../ShuffleDataSegmentPusherTest.java | 2 +- .../worker/shuffle/ShuffleMetricsTest.java | 84 +++++++ .../worker/shuffle/ShuffleResourceTest.java | 211 ++++++++++++++++++ .../java/org/apache/druid/cli/CliIndexer.java | 4 +- .../apache/druid/cli/CliMiddleManager.java | 5 +- 22 files changed, 508 insertions(+), 23 deletions(-) rename indexing-service/src/main/java/org/apache/druid/indexing/worker/{ => shuffle}/IntermediaryDataManager.java (98%) rename indexing-service/src/main/java/org/apache/druid/indexing/worker/{ => shuffle}/ShuffleDataSegmentPusher.java (97%) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java rename 
indexing-service/src/main/java/org/apache/druid/indexing/worker/{http => shuffle}/ShuffleResource.java (94%) rename indexing-service/src/test/java/org/apache/druid/indexing/worker/{ => shuffle}/IntermediaryDataManagerAutoCleanupTest.java (98%) rename indexing-service/src/test/java/org/apache/druid/indexing/worker/{ => shuffle}/IntermediaryDataManagerManualAddAndDeleteTest.java (99%) rename indexing-service/src/test/java/org/apache/druid/indexing/worker/{ => shuffle}/ShuffleDataSegmentPusherTest.java (98%) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java index ac926bee749d..2ccd5db3ca6f 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java @@ -29,5 +29,10 @@ public interface Monitor void stop(); + /** + * Emit metrics using the given emitter. + * + * @return true if this monitor needs to continue monitoring. False otherwise. 
+ */ boolean monitor(ServiceEmitter emitter); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index 009a407d8547..00c9fe78e18c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -43,7 +43,7 @@ import org.apache.druid.indexing.common.task.IndexTaskClientFactory; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient; import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient; -import org.apache.druid.indexing.worker.IntermediaryDataManager; +import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.Monitor; import org.apache.druid.java.util.metrics.MonitorScheduler; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java index 3f8063f6a6c8..61b647f0e9ba 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java @@ -42,7 +42,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient; import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient; -import org.apache.druid.indexing.worker.IntermediaryDataManager; +import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.MonitorScheduler; import org.apache.druid.query.QueryRunnerFactoryConglomerate; 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 7b72895ed857..9fae63d61e2e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -58,6 +58,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution; import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistributionMerger; import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketchMerger; +import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; @@ -486,7 +487,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception * - In the first phase, each task partitions input data and stores those partitions in local storage. * - The partition is created based on the segment granularity (primary partition key) and the partition dimension * values in {@link PartitionsSpec} (secondary partition key). - * - Partitioned data is maintained by {@link org.apache.druid.indexing.worker.IntermediaryDataManager}. + * - Partitioned data is maintained by {@link IntermediaryDataManager}. * - In the second phase, each task reads partitioned data from the intermediary data server (middleManager * or indexer) and merges them to create the final segments. 
*/ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index 98dae9926cf3..11cf4a1e3ca5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -29,6 +29,7 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder; import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis; +import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.BucketNumberedShardSpec; @@ -46,7 +47,7 @@ /** * The worker task of {@link PartialHashSegmentGenerateParallelIndexTaskRunner}. This task partitions input data by * hashing the segment granularity and partition dimensions in {@link HashedPartitionsSpec}. Partitioned segments are - * stored in local storage using {@link org.apache.druid.indexing.worker.ShuffleDataSegmentPusher}. + * stored in local storage using {@link ShuffleDataSegmentPusher}. 
*/ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java index 89b9f80fcf46..57978f4b8afc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java @@ -31,7 +31,7 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder; import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis; -import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher; +import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.BucketNumberedShardSpec; import org.apache.druid.timeline.partition.PartitionBoundaries; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index b65d4f2eeec1..1923bc79c991 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -31,7 +31,7 @@ import org.apache.druid.indexing.common.task.SequenceNameFunction; import org.apache.druid.indexing.common.task.TaskResource; import 
org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder; -import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher; +import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher; import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeIOConfig; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java index 4395c7c6f753..b6ea7aad6cbf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java @@ -19,6 +19,8 @@ package org.apache.druid.indexing.common.task.batch.parallel; +import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager; + import java.io.File; import java.io.IOException; @@ -27,7 +29,7 @@ * The only available implementation for production code is {@link HttpShuffleClient} and * this interface is more for easier testing. 
* - * @see org.apache.druid.indexing.worker.IntermediaryDataManager + * @see IntermediaryDataManager * @see PartialSegmentMergeTask */ public interface ShuffleClient diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java similarity index 98% rename from indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java rename to indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java index 408fba5fa6ea..9afa1c03d3e0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.indexing.worker; +package org.apache.druid.indexing.worker.shuffle; import com.google.common.collect.Iterators; import com.google.common.io.Files; @@ -41,6 +41,7 @@ import org.apache.druid.segment.loading.StorageLocation; import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.CompressionUtils; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -67,7 +68,7 @@ /** * This class manages intermediary segments for data shuffle between native parallel index tasks. * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer) - * and phase 2 tasks read those files via HTTP. + * and phase 2 tasks read those files over HTTP. * * The directory where segment files are placed is structured as * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment. 
@@ -100,7 +101,7 @@ public class IntermediaryDataManager // but middleManager or indexer could miss the request. This executor is to automatically clean up unused intermediary // partitions. // This can be null until IntermediaryDataManager is started. - @Nullable + @MonotonicNonNull private ScheduledExecutorService supervisorTaskChecker; @Inject diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java similarity index 97% rename from indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java rename to indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java index fcbdf9d6c3fd..6bc83ba17baf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.indexing.worker; +package org.apache.druid.indexing.worker.shuffle; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.segment.loading.DataSegmentPusher; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java new file mode 100644 index 000000000000..0986efaa7d8c --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.worker.shuffle; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +public class ShuffleMetrics +{ + private final AtomicReference> datasourceMetrics = + new AtomicReference<>(); + + public ShuffleMetrics() + { + datasourceMetrics.set(new ConcurrentHashMap<>()); + } + + public void shuffleRequested(String supervisorTaskId, long fileLength) + { + datasourceMetrics + .get() + .computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics()).accumulate(fileLength); + } + + public Map snapshot() + { + return Collections.unmodifiableMap(datasourceMetrics.getAndSet(new ConcurrentHashMap<>())); + } + + /** + * This method is visible only for testing. Use {@link #snapshot()} instead to get the current snapshot. 
+ */ + @VisibleForTesting + Map getDatasourceMetrics() + { + return datasourceMetrics.get(); + } + + public static class PerDatasourceShuffleMetrics + { + private long shuffleBytes; + private int shuffleRequests; + + public void accumulate(long shuffleBytes) + { + this.shuffleBytes += shuffleBytes; + this.shuffleRequests++; + } + + public long getShuffleBytes() + { + return shuffleBytes; + } + + public int getShuffleRequests() + { + return shuffleRequests; + } + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java new file mode 100644 index 000000000000..bcf3124e8d92 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.worker.shuffle; + +import com.google.inject.Binder; +import com.google.inject.Module; +import org.apache.druid.guice.Jerseys; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.server.metrics.MetricsModule; + +public class ShuffleModule implements Module +{ + @Override + public void configure(Binder binder) + { + Jerseys.addResource(binder, ShuffleResource.class); + + binder.bind(ShuffleMetrics.class).in(LazySingleton.class); + binder.bind(ShuffleMonitor.class).in(LazySingleton.class); + MetricsModule.register(binder, ShuffleMonitor.class); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java new file mode 100644 index 000000000000..221b0725f971 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.worker.shuffle; + +import com.google.inject.Inject; +import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder; +import org.apache.druid.java.util.metrics.AbstractMonitor; + +import java.util.Map; + +public class ShuffleMonitor extends AbstractMonitor +{ + private static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId"; + private static final String SHUFFLE_BYTES_KEY = "shuffle/bytes"; + private static final String SHUFFLE_REQUESTS_KEY = "shuffle/requests"; + + private final ShuffleMetrics shuffleMetrics; + + @Inject + public ShuffleMonitor(ShuffleMetrics shuffleMetrics) + { + this.shuffleMetrics = shuffleMetrics; + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + final Map snapshot = shuffleMetrics.snapshot(); + snapshot.forEach((supervisorTaskId, perDatasourceShuffleMetrics) -> { + final Builder metricBuilder = ServiceMetricEvent + .builder() + .setDimension(SUPERVISOR_TASK_ID_DIMENSION, supervisorTaskId); + emitter.emit(metricBuilder.build(SHUFFLE_BYTES_KEY, perDatasourceShuffleMetrics.getShuffleBytes())); + emitter.emit(metricBuilder.build(SHUFFLE_REQUESTS_KEY, perDatasourceShuffleMetrics.getShuffleRequests())); + }); + + return true; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java similarity index 94% rename from indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java rename to indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java index 0e0e9364e211..262126ebca0f 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java @@ -17,12 +17,11 @@ * under the License. */ -package org.apache.druid.indexing.worker.http; +package org.apache.druid.indexing.worker.shuffle; import com.google.common.io.ByteStreams; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; -import org.apache.druid.indexing.worker.IntermediaryDataManager; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -60,11 +59,13 @@ public class ShuffleResource private static final Logger log = new Logger(ShuffleResource.class); private final IntermediaryDataManager intermediaryDataManager; + private final ShuffleMetrics shuffleMetrics; @Inject - public ShuffleResource(IntermediaryDataManager intermediaryDataManager) + public ShuffleResource(IntermediaryDataManager intermediaryDataManager, ShuffleMetrics shuffleMetrics) { this.intermediaryDataManager = intermediaryDataManager; + this.shuffleMetrics = shuffleMetrics; } @GET @@ -96,6 +97,7 @@ public Response getPartition( ); return Response.status(Status.NOT_FOUND).entity(errorMessage).build(); } else { + shuffleMetrics.shuffleRequested(supervisorTaskId, partitionFile.length()); return Response.ok( (StreamingOutput) output -> { try (final FileInputStream fileInputStream = new FileInputStream(partitionFile)) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 94d1b3fa1916..5ca96cae84c1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java 
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -65,8 +65,8 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.TestAppenderatorsManager; import org.apache.druid.indexing.overlord.Segments; -import org.apache.druid.indexing.worker.IntermediaryDataManager; import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java similarity index 98% rename from indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java rename to indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java index 7d0233b6b16d..dcce4809c4aa 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.druid.indexing.worker; +package org.apache.druid.indexing.worker.shuffle; import com.google.common.collect.ImmutableList; import org.apache.commons.io.FileUtils; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java similarity index 99% rename from indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java rename to indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java index 15aad92b6a3c..4db1b39b809e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.indexing.worker; +package org.apache.druid.indexing.worker.shuffle; import com.google.common.collect.ImmutableList; import org.apache.commons.io.FileUtils; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java similarity index 98% rename from indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java rename to indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java index 153192633967..0604742f100c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.druid.indexing.worker; +package org.apache.druid.indexing.worker.shuffle; import com.google.common.collect.ImmutableList; import com.google.common.io.Files; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java new file mode 100644 index 000000000000..d8352f5a216b --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.worker.shuffle; + +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; +import java.util.Map; + +public class ShuffleMetricsTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testShuffleRequested() + { + final ShuffleMetrics metrics = new ShuffleMetrics(); + final String supervisorTask1 = "supervisor1"; + final String supervisorTask2 = "supervisor2"; + final String supervisorTask3 = "supervisor3"; + metrics.shuffleRequested(supervisorTask1, 1024); + metrics.shuffleRequested(supervisorTask2, 10); + metrics.shuffleRequested(supervisorTask1, 512); + metrics.shuffleRequested(supervisorTask3, 10000); + metrics.shuffleRequested(supervisorTask2, 30); + + final Map snapshot = metrics.snapshot(); + Assert.assertEquals(ImmutableSet.of(supervisorTask1, supervisorTask2, supervisorTask3), snapshot.keySet()); + + PerDatasourceShuffleMetrics perDatasourceShuffleMetrics = snapshot.get(supervisorTask1); + Assert.assertEquals(2, perDatasourceShuffleMetrics.getShuffleRequests()); + Assert.assertEquals(1536, perDatasourceShuffleMetrics.getShuffleBytes()); + + perDatasourceShuffleMetrics = snapshot.get(supervisorTask2); + Assert.assertEquals(2, perDatasourceShuffleMetrics.getShuffleRequests()); + Assert.assertEquals(40, perDatasourceShuffleMetrics.getShuffleBytes()); + + perDatasourceShuffleMetrics = snapshot.get(supervisorTask3); + Assert.assertEquals(1, perDatasourceShuffleMetrics.getShuffleRequests()); + Assert.assertEquals(10000, perDatasourceShuffleMetrics.getShuffleBytes()); + } + + @Test + public void testSnapshotUnmodifiable() + { + expectedException.expect(UnsupportedOperationException.class); + new ShuffleMetrics().snapshot().put("k", new 
PerDatasourceShuffleMetrics()); + } + + @Test + public void testResetDatasourceMetricsAfterSnapshot() + { + final ShuffleMetrics shuffleMetrics = new ShuffleMetrics(); + shuffleMetrics.shuffleRequested("supervisor", 10); + shuffleMetrics.shuffleRequested("supervisor", 10); + shuffleMetrics.shuffleRequested("supervisor2", 10); + shuffleMetrics.snapshot(); + + Assert.assertEquals(Collections.emptyMap(), shuffleMetrics.getDatasourceMetrics()); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java new file mode 100644 index 000000000000..fd00f7da1dbd --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.worker.shuffle; + +import com.google.common.collect.ImmutableList; +import org.apache.commons.io.FileUtils; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.NoopIndexingServiceClient; +import org.apache.druid.client.indexing.TaskStatus; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.loading.StorageLocationConfig; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class ShuffleResourceTest +{ + private static final String DATASOURCE = "datasource"; + + @Rule + public TemporaryFolder tempDir = new TemporaryFolder(); + + private IntermediaryDataManager intermediaryDataManager; + private ShuffleMetrics shuffleMetrics; + private ShuffleResource shuffleResource; + + @Before + public void setup() throws IOException + { + final WorkerConfig workerConfig = new WorkerConfig() + { + @Override + public long getIntermediaryPartitionDiscoveryPeriodSec() + { + return 1; + } + + @Override + public long getIntermediaryPartitionCleanupPeriodSec() + { + return 2; + } + + @Override + public Period getIntermediaryPartitionTimeout() + { + return new 
Period("PT2S"); + } + + }; + final TaskConfig taskConfig = new TaskConfig( + null, + null, + null, + null, + null, + false, + null, + null, + ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)) + ); + final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient() + { + @Override + public Map getTaskStatuses(Set taskIds) + { + final Map result = new HashMap<>(); + for (String taskId : taskIds) { + result.put(taskId, new TaskStatus(taskId, TaskState.SUCCESS, 10)); + } + return result; + } + }; + intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); + shuffleMetrics = new ShuffleMetrics(); + shuffleResource = new ShuffleResource(intermediaryDataManager, shuffleMetrics); + } + + @Test + public void testGetUnknownPartitionReturnNotFound() + { + final Response response = shuffleResource.getPartition( + "unknownSupervisorTask", + "unknownSubtask", + "2020-01-01", + "2020-01-02", + 0 + ); + Assert.assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); + Assert.assertNotNull(response.getEntity()); + final String errorMessage = (String) response.getEntity(); + Assert.assertTrue(errorMessage.contains("Can't find the partition for supervisorTask")); + } + + @Test + public void testGetPartitionWithValidParamsReturnOk() throws IOException + { + final String supervisorTaskId = "supervisorTask"; + final String subtaskId = "subtaskId"; + final Interval interval = Intervals.of("2020-01-01/P1D"); + final DataSegment segment = newSegment(interval); + final File segmentDir = generateSegmentDir("test"); + intermediaryDataManager.addSegment(supervisorTaskId, subtaskId, segment, segmentDir); + + final Response response = shuffleResource.getPartition( + supervisorTaskId, + subtaskId, + interval.getStart().toString(), + interval.getEnd().toString(), + segment.getId().getPartitionNum() + ); + final Map snapshot = shuffleMetrics.snapshot(); + 
Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + Assert.assertEquals(1, snapshot.get(supervisorTaskId).getShuffleRequests()); + Assert.assertEquals(134, snapshot.get(supervisorTaskId).getShuffleBytes()); + } + + @Test + public void testDeleteUnknownPartitionReturnOk() + { + final Response response = shuffleResource.deletePartitions("unknownSupervisorTask"); + Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } + + @Test + public void testDeletePartitionWithValidParamsReturnOk() throws IOException + { + final String supervisorTaskId = "supervisorTask"; + final String subtaskId = "subtaskId"; + final Interval interval = Intervals.of("2020-01-01/P1D"); + final DataSegment segment = newSegment(interval); + final File segmentDir = generateSegmentDir("test"); + intermediaryDataManager.addSegment(supervisorTaskId, subtaskId, segment, segmentDir); + + final Response response = shuffleResource.deletePartitions(supervisorTaskId); + Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } + + @Test + public void testDeletePartitionThrowingExceptionReturnIntervalServerError() throws IOException + { + final IntermediaryDataManager exceptionThrowingManager = EasyMock.niceMock(IntermediaryDataManager.class); + exceptionThrowingManager.deletePartitions(EasyMock.anyString()); + EasyMock.expectLastCall().andThrow(new IOException("test")); + EasyMock.replay(exceptionThrowingManager); + final ShuffleResource shuffleResource = new ShuffleResource(exceptionThrowingManager, shuffleMetrics); + + final Response response = shuffleResource.deletePartitions("supervisorTask"); + Assert.assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus()); + } + + private static DataSegment newSegment(Interval interval) + { + return new DataSegment( + DATASOURCE, + interval, + "version", + null, + null, + null, + new NumberedShardSpec(0, 0), + 0, + 10 + ); + } + + private File generateSegmentDir(String fileName) throws 
IOException + { + // Each file size is 134 bytes after compression (the size asserted via getShuffleBytes() in this test class) + final File segmentDir = tempDir.newFolder(); + FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8); + return segmentDir; + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 953c03fb055d..10a701453310 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -57,7 +57,7 @@ import org.apache.druid.indexing.overlord.ThreadingTaskRunner; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.config.WorkerConfig; -import org.apache.druid.indexing.worker.http.ShuffleResource; +import org.apache.druid.indexing.worker.shuffle.ShuffleModule; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.query.QuerySegmentWalker; @@ -143,7 +143,6 @@ public void configure(Binder binder) binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class); Jerseys.addResource(binder, SegmentListerResource.class); - Jerseys.addResource(binder, ShuffleResource.class); LifecycleModule.register(binder, Server.class, RemoteChatHandler.class); @@ -201,6 +200,7 @@ public DataNodeService getDataNodeService(DruidServerConfig serverConfig) ); } }, + new ShuffleModule(), new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index 19ed6bc4c19e..84b023e2fed7 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -57,9 +57,9 @@ import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
import org.apache.druid.indexing.worker.WorkerTaskMonitor; import org.apache.druid.indexing.worker.config.WorkerConfig; -import org.apache.druid.indexing.worker.http.ShuffleResource; import org.apache.druid.indexing.worker.http.TaskManagementResource; import org.apache.druid.indexing.worker.http.WorkerResource; +import org.apache.druid.indexing.worker.shuffle.ShuffleModule; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.query.lookup.LookupSerdeModule; @@ -142,8 +142,6 @@ public void configure(Binder binder) .to(DummyForInjectionAppenderatorsManager.class) .in(LazySingleton.class); - Jerseys.addResource(binder, ShuffleResource.class); - LifecycleModule.register(binder, Server.class); bindNodeRoleAndAnnouncer( @@ -184,6 +182,7 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) ); } }, + new ShuffleModule(), new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), From b73677ffa0dc9fa11639ceef35b952ae1a1d9dc1 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 8 Sep 2020 12:38:56 -0700 Subject: [PATCH 02/10] javadoc and concurrency test --- .../worker/shuffle/ShuffleMetrics.java | 40 ++++++- .../worker/shuffle/ShuffleMonitor.java | 2 +- .../worker/shuffle/ShuffleMetricsTest.java | 108 +++++++++++++++++- .../worker/shuffle/ShuffleResourceTest.java | 2 +- 4 files changed, 144 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java index 0986efaa7d8c..b48d1bd94a7e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java @@ -26,8 +26,19 @@ import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.atomic.AtomicReference; +/** + * Shuffle metrcis for middleManagers and indexers. This class is thread-safe because shuffle can be performed by + * multiple HTTP threads while a monitoring thread periodically emits the snapshot of metrics. + * + * @see ShuffleResource + * @see org.apache.druid.java.util.metrics.MonitorScheduler + */ public class ShuffleMetrics { + /** + * A reference to a map of (datasource name) -> {@link PerDatasourceShuffleMetrics}. See {@link #shuffleRequested} + * and {@link #snapshotAndReset()} for details of the concurrent access pattern. + */ private final AtomicReference> datasourceMetrics = new AtomicReference<>(); @@ -36,6 +47,14 @@ public ShuffleMetrics() datasourceMetrics.set(new ConcurrentHashMap<>()); } + /** + * This method is called whenever a new shuffle is requested. Multiple tasks can request shuffle at the same time, + * while the monitoring thread takes a snapshot of the metrics. When {@link #snapshotAndReset()} is called + * before shuffleRequested(), the result map of snapshotAndReset() will not include the update made by + * shuffleRequested(). When this method is called before snapshotAndReset(), the result map of snapshotAndReset() + * can either include the update of the last shuffleRequested() or not. If the update is not in the result map, + * it will be included in the next call to snapshotAndReset(). + */ public void shuffleRequested(String supervisorTaskId, long fileLength) { datasourceMetrics @@ -43,13 +62,24 @@ public void shuffleRequested(String supervisorTaskId, long fileLength) .computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics()).accumulate(fileLength); } - public Map snapshot() + /** + * This method is called whenever the monitoring thread takes a snapshot of the current metrics. The map inside + * AtomicReference will be reset to an empty map after this call. This is to return the snapshot metrics collected + * during the monitornig period. 
+ * + * This method can be called while {@link #shuffleRequested} is called. When snapshotAndReset() is called + * before shuffleRequested(), the result map of snapshotAndReset() will not include the update made by + * shuffleRequested(). When shuffleRequested() is called before snapshotAndReset(), the result map of + * snapshotAndReset() can either include the update of the last shuffleRequested() or not. If the update is not + * in the result map, it will be included in the next call to snapshotAndReset(). + */ + public Map snapshotAndReset() { return Collections.unmodifiableMap(datasourceMetrics.getAndSet(new ConcurrentHashMap<>())); } /** - * This method is visible only for testing. Use {@link #snapshot()} instead to get the current snapshot. + * This method is visible only for testing. Use {@link #snapshotAndReset()} instead to get the current snapshot. */ @VisibleForTesting Map getDatasourceMetrics() @@ -57,12 +87,16 @@ Map getDatasourceMetrics() return datasourceMetrics.get(); } + /** + * This class represents shuffle metrics of one datasource. This class is not thread-safe and should never accessed + * by multiple threads at the same time. 
+ */ public static class PerDatasourceShuffleMetrics { private long shuffleBytes; private int shuffleRequests; - public void accumulate(long shuffleBytes) + private void accumulate(long shuffleBytes) { this.shuffleBytes += shuffleBytes; this.shuffleRequests++; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java index 221b0725f971..f0eba2108683 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java @@ -45,7 +45,7 @@ public ShuffleMonitor(ShuffleMetrics shuffleMetrics) @Override public boolean doMonitor(ServiceEmitter emitter) { - final Map snapshot = shuffleMetrics.snapshot(); + final Map snapshot = shuffleMetrics.snapshotAndReset(); snapshot.forEach((supervisorTaskId, perDatasourceShuffleMetrics) -> { final Builder metricBuilder = ServiceMetricEvent .builder() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java index d8352f5a216b..d4f625893b49 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java @@ -21,13 +21,21 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics; +import org.apache.druid.java.util.common.concurrent.Execs; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; 
+import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; public class ShuffleMetricsTest { @@ -47,7 +55,7 @@ public void testShuffleRequested() metrics.shuffleRequested(supervisorTask3, 10000); metrics.shuffleRequested(supervisorTask2, 30); - final Map snapshot = metrics.snapshot(); + final Map snapshot = metrics.snapshotAndReset(); Assert.assertEquals(ImmutableSet.of(supervisorTask1, supervisorTask2, supervisorTask3), snapshot.keySet()); PerDatasourceShuffleMetrics perDatasourceShuffleMetrics = snapshot.get(supervisorTask1); @@ -67,7 +75,7 @@ public void testShuffleRequested() public void testSnapshotUnmodifiable() { expectedException.expect(UnsupportedOperationException.class); - new ShuffleMetrics().snapshot().put("k", new PerDatasourceShuffleMetrics()); + new ShuffleMetrics().snapshotAndReset().put("k", new PerDatasourceShuffleMetrics()); } @Test @@ -77,8 +85,102 @@ public void testResetDatasourceMetricsAfterSnapshot() shuffleMetrics.shuffleRequested("supervisor", 10); shuffleMetrics.shuffleRequested("supervisor", 10); shuffleMetrics.shuffleRequested("supervisor2", 10); - shuffleMetrics.snapshot(); + shuffleMetrics.snapshotAndReset(); Assert.assertEquals(Collections.emptyMap(), shuffleMetrics.getDatasourceMetrics()); } + + @Test(timeout = 5000L) + public void testConcurrency() throws ExecutionException, InterruptedException + { + final ExecutorService exec = Execs.multiThreaded(3, "shuffle-metrics-test-%d"); // 2 for write, 1 for read + + try { + final ShuffleMetrics metrics = new ShuffleMetrics(); + final String supervisorTask1 = "supervisor1"; + final String supervisorTask2 = "supervisor2"; + + final CountDownLatch firstUpdatelatch = new CountDownLatch(2); + final List> futures = new ArrayList<>(); + + futures.add( + exec.submit(() -> { + metrics.shuffleRequested(supervisorTask1, 1024); + metrics.shuffleRequested(supervisorTask2, 30); + 
firstUpdatelatch.countDown(); + Thread.sleep(ThreadLocalRandom.current().nextInt(10)); + metrics.shuffleRequested(supervisorTask2, 10); + return null; + }) + ); + futures.add( + exec.submit(() -> { + metrics.shuffleRequested(supervisorTask2, 30); + metrics.shuffleRequested(supervisorTask1, 1024); + firstUpdatelatch.countDown(); + Thread.sleep(ThreadLocalRandom.current().nextInt(10)); + metrics.shuffleRequested(supervisorTask1, 32); + return null; + }) + ); + final Map firstSnapshot = exec.submit(() -> { + firstUpdatelatch.await(); + Thread.sleep(ThreadLocalRandom.current().nextInt(10)); + return metrics.snapshotAndReset(); + }).get(); + + int expectedSecondSnapshotSize = 0; + boolean task1ShouldBeInSecondSnapshot = false; + boolean task2ShouldBeInSecondSnapshot = false; + + Assert.assertEquals(2, firstSnapshot.size()); + Assert.assertNotNull(firstSnapshot.get(supervisorTask1)); + Assert.assertTrue( + 2048 == firstSnapshot.get(supervisorTask1).getShuffleBytes() + || 2080 == firstSnapshot.get(supervisorTask1).getShuffleBytes() + ); + Assert.assertTrue( + 2 == firstSnapshot.get(supervisorTask1).getShuffleRequests() + || 3 == firstSnapshot.get(supervisorTask1).getShuffleRequests() + ); + if (firstSnapshot.get(supervisorTask1).getShuffleRequests() == 2) { + expectedSecondSnapshotSize++; + task1ShouldBeInSecondSnapshot = true; + } + Assert.assertNotNull(firstSnapshot.get(supervisorTask2)); + Assert.assertTrue( + 60 == firstSnapshot.get(supervisorTask2).getShuffleBytes() + || 70 == firstSnapshot.get(supervisorTask2).getShuffleBytes() + ); + Assert.assertTrue( + 2 == firstSnapshot.get(supervisorTask2).getShuffleRequests() + || 3 == firstSnapshot.get(supervisorTask2).getShuffleRequests() + ); + if (firstSnapshot.get(supervisorTask2).getShuffleRequests() == 2) { + expectedSecondSnapshotSize++; + task2ShouldBeInSecondSnapshot = true; + } + + for (Future future : futures) { + future.get(); + } + final Map secondSnapshot = metrics.snapshotAndReset(); + + 
Assert.assertEquals(expectedSecondSnapshotSize, secondSnapshot.size()); + Assert.assertEquals(task1ShouldBeInSecondSnapshot, secondSnapshot.containsKey(supervisorTask1)); + if (task1ShouldBeInSecondSnapshot) { + Assert.assertEquals(32, secondSnapshot.get(supervisorTask1).getShuffleBytes()); + Assert.assertEquals(1, secondSnapshot.get(supervisorTask1).getShuffleRequests()); + } + Assert.assertEquals(task2ShouldBeInSecondSnapshot, secondSnapshot.containsKey(supervisorTask2)); + if (task2ShouldBeInSecondSnapshot) { + Assert.assertEquals(10, secondSnapshot.get(supervisorTask2).getShuffleBytes()); + Assert.assertEquals(1, secondSnapshot.get(supervisorTask2).getShuffleRequests()); + } + + } + finally { + exec.shutdown(); + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java index fd00f7da1dbd..c6a867e8ee5d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java @@ -146,7 +146,7 @@ public void testGetPartitionWithValidParamsReturnOk() throws IOException interval.getEnd().toString(), segment.getId().getPartitionNum() ); - final Map snapshot = shuffleMetrics.snapshot(); + final Map snapshot = shuffleMetrics.snapshotAndReset(); Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus()); Assert.assertEquals(1, snapshot.get(supervisorTaskId).getShuffleRequests()); Assert.assertEquals(134, snapshot.get(supervisorTaskId).getShuffleBytes()); From fcd8cb2cff3333478f2af47340ade08550457ebd Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 9 Sep 2020 10:25:35 -0700 Subject: [PATCH 03/10] concurrency --- .../worker/shuffle/ShuffleMetrics.java | 60 ++++++++++--------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java index b48d1bd94a7e..36844c7f078e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java @@ -20,11 +20,11 @@ package org.apache.druid.indexing.worker.shuffle; import com.google.common.annotations.VisibleForTesting; +import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.Collections; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; /** * Shuffle metrcis for middleManagers and indexers. This class is thread-safe because shuffle can be performed by @@ -36,46 +36,48 @@ public class ShuffleMetrics { /** - * A reference to a map of (datasource name) -> {@link PerDatasourceShuffleMetrics}. See {@link #shuffleRequested} - * and {@link #snapshotAndReset()} for details of the concurrent access pattern. + * This lock is used to synchronize accesses to the reference to {@link #datasourceMetrics} and the + * {@link PerDatasourceShuffleMetrics} values of the map. This means, + * + * - Any updates on PerDatasourceShuffleMetrics in the map (and thus its key) should be synchronized under this lock. + * - Any updates on the reference to datasourceMetrics should be synchronized under this lock. */ - private final AtomicReference> datasourceMetrics = - new AtomicReference<>(); + private final Object lock = new Object(); - public ShuffleMetrics() - { - datasourceMetrics.set(new ConcurrentHashMap<>()); - } + /** + * A map of (datasource name) -> {@link PerDatasourceShuffleMetrics}. This map is replaced with an empty map + * whenever a snapshot is taken since the map can keep growing over time otherwise. 
For concurrent access pattern, + * see {@link #shuffleRequested} and {@link #snapshotAndReset()}. + */ + @GuardedBy("lock") + private Map datasourceMetrics = new HashMap<>(); /** * This method is called whenever a new shuffle is requested. Multiple tasks can request shuffle at the same time, - * while the monitoring thread takes a snapshot of the metrics. When {@link #snapshotAndReset()} is called - * before shuffleRequested(), the result map of snapshotAndReset() will not include the update made by - * shuffleRequested(). When this method is called before snapshotAndReset(), the result map of snapshotAndReset() - * can either include the update of the last shuffleRequested() or not. If the update is not in the result map, - * it will be included in the next call to snapshotAndReset(). + * while the monitoring thread takes a snapshot of the metrics. There is a happens-before relationship between + * shuffleRequested and {@link #snapshotAndReset()}. */ public void shuffleRequested(String supervisorTaskId, long fileLength) { - datasourceMetrics - .get() - .computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics()).accumulate(fileLength); + synchronized (lock) { + datasourceMetrics.computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics()) + .accumulate(fileLength); + } } /** * This method is called whenever the monitoring thread takes a snapshot of the current metrics. The map inside * AtomicReference will be reset to an empty map after this call. This is to return the snapshot metrics collected - * during the monitornig period. - * - * This method can be called while {@link #shuffleRequested} is called. When snapshotAndReset() is called - * before shuffleRequested(), the result map of snapshotAndReset() will not include the update made by - * shuffleRequested(). When shuffleRequested() is called before snapshotAndReset(), the result map of - * snapshotAndReset() can either include the update of the last shuffleRequested() or not. 
If the update is not - * in the result map, it will be included in the next call to snapshotAndReset(). + * during the monitornig period. There is a happens-before relationship between snapshotAndReset() and + * {@link #shuffleRequested}. */ public Map snapshotAndReset() { - return Collections.unmodifiableMap(datasourceMetrics.getAndSet(new ConcurrentHashMap<>())); + synchronized (lock) { + final Map snapshot = Collections.unmodifiableMap(datasourceMetrics); + datasourceMetrics = new HashMap<>(); + return snapshot; + } } /** @@ -84,11 +86,13 @@ public Map snapshotAndReset() @VisibleForTesting Map getDatasourceMetrics() { - return datasourceMetrics.get(); + synchronized (lock) { + return datasourceMetrics; + } } /** - * This class represents shuffle metrics of one datasource. This class is not thread-safe and should never accessed + * This class represents shuffle metrics of one datasource. This class is not thread-safe and should never be accessed * by multiple threads at the same time. */ public static class PerDatasourceShuffleMetrics From bf23cbc0675e44afbb1ea7d9be4779779e53d45e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 10 Sep 2020 11:48:03 -0700 Subject: [PATCH 04/10] fix javadoc --- .../druid/indexing/worker/shuffle/ShuffleMetrics.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java index 36844c7f078e..18dce4b2c603 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java @@ -39,7 +39,8 @@ public class ShuffleMetrics * This lock is used to synchronize accesses to the reference to {@link #datasourceMetrics} and the * {@link PerDatasourceShuffleMetrics} values of the map. 
This means, * - * - Any updates on PerDatasourceShuffleMetrics in the map (and thus its key) should be synchronized under this lock. + * - Any updates on PerDatasourceShuffleMetrics in the map (and thus its key as well) should be synchronized + * under this lock. * - Any updates on the reference to datasourceMetrics should be synchronized under this lock. */ private final Object lock = new Object(); @@ -66,10 +67,10 @@ public void shuffleRequested(String supervisorTaskId, long fileLength) } /** - * This method is called whenever the monitoring thread takes a snapshot of the current metrics. The map inside - * AtomicReference will be reset to an empty map after this call. This is to return the snapshot metrics collected - * during the monitornig period. There is a happens-before relationship between snapshotAndReset() and - * {@link #shuffleRequested}. + * This method is called whenever the monitoring thread takes a snapshot of the current metrics. + * {@link #datasourceMetrics} will be reset to an empty map after this call. This is to return the snapshot + * metrics collected during the monitornig period. There is a happens-before relationship between snapshotAndReset() + * and {@link #shuffleRequested}. 
*/ public Map snapshotAndReset() { From 80b29498386881c112574aaec207af59b994c7ea Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 25 Sep 2020 22:05:25 -0700 Subject: [PATCH 05/10] Feature flag --- .../java/util/metrics/MonitorScheduler.java | 12 +++++ .../worker/shuffle/ShuffleModule.java | 29 ++++++++++-- .../worker/shuffle/ShuffleMonitor.java | 45 ++++++++++++------- .../worker/shuffle/ShuffleResource.java | 7 +-- .../worker/shuffle/ShuffleResourceTest.java | 5 ++- 5 files changed, 74 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java index 118f283ac2c3..2adbe9510a36 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; @@ -93,6 +94,17 @@ public void removeMonitor(final Monitor monitor) } } + /** + * Returns a {@link Monitor} instance of the given class if any. Note that this method searches for the monitor + * from the current snapshot of {@link #monitors}. 
+ */ + public Optional findMonitor(Class monitorClass) + { + synchronized (lock) { + return (Optional) monitors.stream().filter(m -> m.getClass() == monitorClass).findFirst(); + } + } + @LifecycleStop public void stop() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java index bcf3124e8d92..1c2afb15e6d8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java @@ -21,9 +21,12 @@ import com.google.inject.Binder; import com.google.inject.Module; +import com.google.inject.Provides; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.LazySingleton; -import org.apache.druid.server.metrics.MetricsModule; +import org.apache.druid.java.util.metrics.MonitorScheduler; + +import java.util.Optional; public class ShuffleModule implements Module { @@ -31,9 +34,27 @@ public class ShuffleModule implements Module public void configure(Binder binder) { Jerseys.addResource(binder, ShuffleResource.class); + } - binder.bind(ShuffleMetrics.class).in(LazySingleton.class); - binder.bind(ShuffleMonitor.class).in(LazySingleton.class); - MetricsModule.register(binder, ShuffleMonitor.class); + /** + * {@link ShuffleMetrics} is used in {@link ShuffleResource} and {@link ShuffleMonitor} to collect metrics + * and report them, respectively. Unlike ShuffleResource, ShuffleMonitor can be created via a user config + * ({@link org.apache.druid.server.metrics.MonitorsConfig}) in potentially any node types, where it is not + * possible to create ShuffleMetrics. This method checks the {@link MonitorScheduler} if ShuffleMonitor is + * registered on it, and sets the proper ShuffleMetrics. 
+ */ + @Provides + @LazySingleton + public Optional getShuffleMetrics(MonitorScheduler monitorScheduler) + { + // ShuffleMonitor cannot be registered dynamically, but only via the static configuration (MonitorsConfig). + // As a result, it is safe to check only once whether it is registered in MonitorScheduler. + final Optional maybeMonitor = monitorScheduler.findMonitor(ShuffleMonitor.class); + if (maybeMonitor.isPresent()) { + final ShuffleMetrics metrics = new ShuffleMetrics(); + maybeMonitor.get().setShuffleMetrics(metrics); + return Optional.of(metrics); + } + return Optional.empty(); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java index f0eba2108683..5f09393353e6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java @@ -19,41 +19,56 @@ package org.apache.druid.indexing.worker.shuffle; -import com.google.inject.Inject; import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder; import org.apache.druid.java.util.metrics.AbstractMonitor; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; import java.util.Map; public class ShuffleMonitor extends AbstractMonitor { private static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId"; - private static final String SHUFFLE_BYTES_KEY = "shuffle/bytes"; - private static final String SHUFFLE_REQUESTS_KEY = "shuffle/requests"; + private static final String SHUFFLE_BYTES_KEY =
"ingest/shuffle/bytes"; + private static final String SHUFFLE_REQUESTS_KEY = "ingest/shuffle/requests"; - private final ShuffleMetrics shuffleMetrics; + /** + * ShuffleMonitor can be instantiated on any node type if it is defined in + * {@link org.apache.druid.server.metrics.MonitorsConfig}. Since {@link ShuffleMetrics} is defined + * in the `indexing-service` module, some node types (such as broker) would fail to create it + * if they don't have the required dependencies. To avoid this problem, this variable is lazily initialized + * only on the node types that have the {@link ShuffleModule}. + */ + @MonotonicNonNull + private ShuffleMetrics shuffleMetrics; - @Inject - public ShuffleMonitor(ShuffleMetrics shuffleMetrics) + public void setShuffleMetrics(ShuffleMetrics shuffleMetrics) { this.shuffleMetrics = shuffleMetrics; } + @Nullable + public ShuffleMetrics getShuffleMetrics() + { + return shuffleMetrics; + } + @Override public boolean doMonitor(ServiceEmitter emitter) { - final Map snapshot = shuffleMetrics.snapshotAndReset(); - snapshot.forEach((supervisorTaskId, perDatasourceShuffleMetrics) -> { - final Builder metricBuilder = ServiceMetricEvent - .builder() - .setDimension(SUPERVISOR_TASK_ID_DIMENSION, supervisorTaskId); - emitter.emit(metricBuilder.build(SHUFFLE_BYTES_KEY, perDatasourceShuffleMetrics.getShuffleBytes())); - emitter.emit(metricBuilder.build(SHUFFLE_REQUESTS_KEY, perDatasourceShuffleMetrics.getShuffleRequests())); - }); - + if (shuffleMetrics != null) { + final Map snapshot = shuffleMetrics.snapshotAndReset(); + snapshot.forEach((supervisorTaskId, perDatasourceShuffleMetrics) -> { + final Builder metricBuilder = ServiceMetricEvent + .builder() + .setDimension(SUPERVISOR_TASK_ID_DIMENSION, supervisorTaskId); + emitter.emit(metricBuilder.build(SHUFFLE_BYTES_KEY, perDatasourceShuffleMetrics.getShuffleBytes())); + emitter.emit(metricBuilder.build(SHUFFLE_REQUESTS_KEY, perDatasourceShuffleMetrics.getShuffleRequests())); + }); + } return true; } } diff
--git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java index 262126ebca0f..dd885a2ab0d6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java @@ -41,6 +41,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.util.Optional; /** * HTTP endpoints for shuffle system. The MiddleManager and Indexer use this resource to serve intermediary shuffle @@ -59,10 +60,10 @@ public class ShuffleResource private static final Logger log = new Logger(ShuffleResource.class); private final IntermediaryDataManager intermediaryDataManager; - private final ShuffleMetrics shuffleMetrics; + private final Optional shuffleMetrics; @Inject - public ShuffleResource(IntermediaryDataManager intermediaryDataManager, ShuffleMetrics shuffleMetrics) + public ShuffleResource(IntermediaryDataManager intermediaryDataManager, Optional shuffleMetrics) { this.intermediaryDataManager = intermediaryDataManager; this.shuffleMetrics = shuffleMetrics; @@ -97,7 +98,7 @@ public Response getPartition( ); return Response.status(Status.NOT_FOUND).entity(errorMessage).build(); } else { - shuffleMetrics.shuffleRequested(supervisorTaskId, partitionFile.length()); + shuffleMetrics.ifPresent(metrics -> metrics.shuffleRequested(supervisorTaskId, partitionFile.length())); return Response.ok( (StreamingOutput) output -> { try (final FileInputStream fileInputStream = new FileInputStream(partitionFile)) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java index c6a867e8ee5d..bd1b2117042f 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java @@ -48,6 +48,7 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Set; public class ShuffleResourceTest @@ -110,7 +111,7 @@ public Map getTaskStatuses(Set taskIds) }; intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); shuffleMetrics = new ShuffleMetrics(); - shuffleResource = new ShuffleResource(intermediaryDataManager, shuffleMetrics); + shuffleResource = new ShuffleResource(intermediaryDataManager, Optional.of(shuffleMetrics)); } @Test @@ -180,7 +181,7 @@ public void testDeletePartitionThrowingExceptionReturnIntervalServerError() thro exceptionThrowingManager.deletePartitions(EasyMock.anyString()); EasyMock.expectLastCall().andThrow(new IOException("test")); EasyMock.replay(exceptionThrowingManager); - final ShuffleResource shuffleResource = new ShuffleResource(exceptionThrowingManager, shuffleMetrics); + final ShuffleResource shuffleResource = new ShuffleResource(exceptionThrowingManager, Optional.of(shuffleMetrics)); final Response response = shuffleResource.deletePartitions("supervisorTask"); Assert.assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus()); From e258dd32d948b0f3276cd3e49051879d78732775 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 6 Oct 2020 14:56:53 -0700 Subject: [PATCH 06/10] doc --- docs/operations/metrics.md | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 2ddfd7462073..878b966aa47a 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -171,7 +171,7 @@ These metrics are only available if the RealtimeMetricsMonitor is included in th Note: If the JVM does not 
support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0. -### Indexing service +## Indexing service |Metric|Description|Dimensions|Normal Value| |------|-----------|----------|------------| @@ -187,6 +187,16 @@ Note: If the JVM does not support CPU time measurement for the current thread, i |`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| |`task/waiting/count`|Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| +## Shuffle metrics (Native parallel task) + +The shuffle metrics can be enabled by adding `org.apache.druid.indexing.worker.shuffle.ShuffleMonitor` to `druid.monitoring.monitors`. +See [Enabling Metrics](../configuration/index.md#enabling-metrics) for more details. + +|Metric|Description|Dimensions|Normal Value| +|------|-----------|----------|------------| +|`ingest/shuffle/bytes`|Number of bytes shuffled per emissionPeriod.|supervisorTaskId|Varies| +|`ingest/shuffle/requests`|Number of shuffle requests per emissionPeriod.|supervisorTaskId|Varies| + ## Coordination These metrics are for the Druid Coordinator and are reset each time the Coordinator runs the coordination logic. 
From e205c1df2e4e813938485435dd590b952bc14ca1 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 6 Oct 2020 15:55:12 -0700 Subject: [PATCH 07/10] fix doc and add a test --- core/pom.xml | 6 ++ .../util/metrics/MonitorSchedulerTest.java | 86 +++++++++++++++++++ docs/operations/metrics.md | 4 +- indexing-service/pom.xml | 5 ++ website/.spelling | 1 + 5 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java diff --git a/core/pom.xml b/core/pom.xml index 86dcd3c4c5b3..36e08d54a843 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -333,6 +333,12 @@ com.google.errorprone error_prone_annotations + + org.mockito + mockito-core + ${mockito.version} + test + diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java new file mode 100644 index 000000000000..f5722b031c28 --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.java.util.metrics; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Optional; + +public class MonitorSchedulerTest +{ + @Test + public void testFindMonitor() + { + class Monitor1 extends NoopMonitor + { + } + class Monitor2 extends NoopMonitor + { + } + class Monitor3 extends NoopMonitor + { + } + + final Monitor1 monitor1 = new Monitor1(); + final Monitor2 monitor2 = new Monitor2(); + + final MonitorScheduler scheduler = new MonitorScheduler( + Mockito.mock(MonitorSchedulerConfig.class), + Execs.scheduledSingleThreaded("monitor-scheduler-test"), + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor1, monitor2) + ); + + final Optional maybeFound1 = scheduler.findMonitor(Monitor1.class); + final Optional maybeFound2 = scheduler.findMonitor(Monitor2.class); + Assert.assertTrue(maybeFound1.isPresent()); + Assert.assertTrue(maybeFound2.isPresent()); + Assert.assertSame(monitor1, maybeFound1.get()); + Assert.assertSame(monitor2, maybeFound2.get()); + + Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent()); + } + + private static class NoopMonitor implements Monitor + { + @Override + public void start() + { + + } + + @Override + public void stop() + { + + } + + @Override + public boolean monitor(ServiceEmitter emitter) + { + return true; + } + } +} \ No newline at end of file diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 878b966aa47a..4ca3b6026b06 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -194,8 +194,8 @@ See [Enabling Metrics](../configuration/index.md#enabling-metrics) for more deta |Metric|Description|Dimensions|Normal Value| |------|-----------|----------|------------| -|`ingest/shuffle/bytes`|Number of bytes shuffled per 
emissionPeriod.|supervisorTaskId|Varies| -|`ingest/shuffle/requests`|Number of shuffle requests per emissionPeriod.|supervisorTaskId|Varies| +|`ingest/shuffle/bytes`|Number of bytes shuffled per emission period.|supervisorTaskId|Varies| +|`ingest/shuffle/requests`|Number of shuffle requests per emission period.|supervisorTaskId|Varies| ## Coordination diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index 9da439df611b..6c6cd4033112 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -203,6 +203,11 @@ org.apache.logging.log4j log4j-api + + org.checkerframework + checker-qual + ${checkerframework.version} + org.apache.datasketches diff --git a/website/.spelling b/website/.spelling index 249db61de5f5..da47144fe896 100644 --- a/website/.spelling +++ b/website/.spelling @@ -379,6 +379,7 @@ subsecond substring subtask subtasks +supervisorTaskId symlink tiering timeseries From d70529d0f0d2a8dd305074aff5cfb7f3f4e8aa2f Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 6 Oct 2020 17:04:56 -0700 Subject: [PATCH 08/10] checkstyle --- .../apache/druid/java/util/metrics/MonitorSchedulerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java index f5722b031c28..76671968b911 100644 --- a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java +++ b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java @@ -83,4 +83,4 @@ public boolean monitor(ServiceEmitter emitter) return true; } } -} \ No newline at end of file +} From 394b4734f01115d11279e38dd3594663bd78df4e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 8 Oct 2020 22:16:07 -0700 Subject: [PATCH 09/10] add tests --- indexing-service/pom.xml | 6 ++ .../worker/shuffle/ShuffleMetrics.java | 3 +- .../worker/shuffle/ShuffleMonitor.java | 13 +-- 
.../worker/shuffle/ShuffleModuleTest.java | 88 +++++++++++++++++++ .../worker/shuffle/ShuffleMonitorTest.java | 68 ++++++++++++++ 5 files changed, 167 insertions(+), 11 deletions(-) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index 6c6cd4033112..9a52f87a2338 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -283,6 +283,12 @@ system-rules test + + org.mockito + mockito-core + ${mockito.version} + test + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java index 18dce4b2c603..500d8e96a8dc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java @@ -101,7 +101,8 @@ public static class PerDatasourceShuffleMetrics private long shuffleBytes; private int shuffleRequests; - private void accumulate(long shuffleBytes) + @VisibleForTesting + void accumulate(long shuffleBytes) { this.shuffleBytes += shuffleBytes; this.shuffleRequests++; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java index 5f09393353e6..6157698d6271 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java @@ -25,15 +25,14 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder; import org.apache.druid.java.util.metrics.AbstractMonitor; import 
org.checkerframework.checker.nullness.qual.MonotonicNonNull; -import org.checkerframework.checker.nullness.qual.Nullable; import java.util.Map; public class ShuffleMonitor extends AbstractMonitor { - private static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId"; - private static final String SHUFFLE_BYTES_KEY = "ingest/shuffle/bytes"; - private static final String SHUFFLE_REQUESTS_KEY = "ingest/shuffle/requests"; + static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId"; + static final String SHUFFLE_BYTES_KEY = "ingest/shuffle/bytes"; + static final String SHUFFLE_REQUESTS_KEY = "ingest/shuffle/requests"; /** * ShuffleMonitor can be instantiated in any node types if it is defined in @@ -50,12 +49,6 @@ public void setShuffleMetrics(ShuffleMetrics shuffleMetrics) this.shuffleMetrics = shuffleMetrics; } - @Nullable - public ShuffleMetrics getShuffleMetrics() - { - return shuffleMetrics; - } - @Override public boolean doMonitor(ServiceEmitter emitter) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java new file mode 100644 index 000000000000..aff3ee95cb30 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.worker.shuffle; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Scopes; +import com.google.inject.TypeLiteral; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.metrics.MonitorScheduler; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.util.Optional; + +public class ShuffleModuleTest +{ + private ShuffleModule shuffleModule; + private Injector injector; + + @Before + public void setup() + { + shuffleModule = new ShuffleModule(); + } + + @Test + public void testGetShuffleMetricsWhenShuffleMonitorExists() + { + final ShuffleMonitor shuffleMonitor = new ShuffleMonitor(); + final MonitorScheduler monitorScheduler = Mockito.mock(MonitorScheduler.class); + Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class))) + .thenReturn(Optional.of(shuffleMonitor)); + injector = Guice.createInjector( + binder -> { + binder.bindScope(LazySingleton.class, Scopes.SINGLETON); + binder.bind(MonitorScheduler.class).toInstance(monitorScheduler); + binder.bind(IntermediaryDataManager.class).toInstance(Mockito.mock(IntermediaryDataManager.class)); + }, + shuffleModule + ); + final Optional optional = injector.getInstance( + Key.get(new TypeLiteral>() {}) + ); + Assert.assertTrue(optional.isPresent()); + } + + @Test + public void testGetShuffleMetricsWithNoShuffleMonitor() + { + final 
MonitorScheduler monitorScheduler = Mockito.mock(MonitorScheduler.class); + Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class))) + .thenReturn(Optional.empty()); + injector = Guice.createInjector( + binder -> { + binder.bindScope(LazySingleton.class, Scopes.SINGLETON); + binder.bind(MonitorScheduler.class).toInstance(monitorScheduler); + binder.bind(IntermediaryDataManager.class).toInstance(Mockito.mock(IntermediaryDataManager.class)); + }, + shuffleModule + ); + final Optional optional = injector.getInstance( + Key.get(new TypeLiteral>() {}) + ); + Assert.assertFalse(optional.isPresent()); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java new file mode 100644 index 000000000000..1174bc842ede --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.worker.shuffle; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.List; + +public class ShuffleMonitorTest +{ + @Test + public void testDoMonitor() + { + final ShuffleMetrics shuffleMetrics = Mockito.mock(ShuffleMetrics.class); + final PerDatasourceShuffleMetrics perDatasourceShuffleMetrics = new PerDatasourceShuffleMetrics(); + perDatasourceShuffleMetrics.accumulate(100); + perDatasourceShuffleMetrics.accumulate(200); + perDatasourceShuffleMetrics.accumulate(10); + Mockito.when(shuffleMetrics.snapshotAndReset()) + .thenReturn(ImmutableMap.of("supervisor", perDatasourceShuffleMetrics)); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final ShuffleMonitor monitor = new ShuffleMonitor(); + monitor.setShuffleMetrics(shuffleMetrics); + Assert.assertTrue(monitor.doMonitor(emitter)); + final List events = emitter.getEvents(); + Assert.assertEquals(2, events.size()); + Assert.assertSame(ServiceMetricEvent.class, events.get(0).getClass()); + ServiceMetricEvent event = (ServiceMetricEvent) events.get(0); + Assert.assertEquals(ShuffleMonitor.SHUFFLE_BYTES_KEY, event.getMetric()); + Assert.assertEquals(310L, event.getValue()); + Assert.assertEquals( + ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"), + event.getUserDims() + ); + Assert.assertSame(ServiceMetricEvent.class, events.get(1).getClass()); + event = (ServiceMetricEvent) events.get(1); + Assert.assertEquals(ShuffleMonitor.SHUFFLE_REQUESTS_KEY, event.getMetric()); + Assert.assertEquals(3, event.getValue()); + Assert.assertEquals( + 
ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"), + event.getUserDims() + ); + } +} From 23312bb1ff09302bae1169b2d99bf797b4295f19 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 9 Oct 2020 12:06:19 -0700 Subject: [PATCH 10/10] fix build and address comments --- indexing-service/pom.xml | 5 ---- .../worker/shuffle/ShuffleModuleTest.java | 27 +++++++++---------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index 9a52f87a2338..9a075a56a788 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -203,11 +203,6 @@ org.apache.logging.log4j log4j-api - - org.checkerframework - checker-qual - ${checkerframework.version} - org.apache.datasketches diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java index aff3ee95cb30..fec860ab502d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java @@ -37,7 +37,6 @@ public class ShuffleModuleTest { private ShuffleModule shuffleModule; - private Injector injector; @Before public void setup() @@ -50,16 +49,9 @@ public void testGetShuffleMetricsWhenShuffleMonitorExists() { final ShuffleMonitor shuffleMonitor = new ShuffleMonitor(); final MonitorScheduler monitorScheduler = Mockito.mock(MonitorScheduler.class); - Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class))) + Mockito.when(monitorScheduler.findMonitor(ShuffleMonitor.class)) .thenReturn(Optional.of(shuffleMonitor)); - injector = Guice.createInjector( - binder -> { - binder.bindScope(LazySingleton.class, Scopes.SINGLETON); - binder.bind(MonitorScheduler.class).toInstance(monitorScheduler); - 
binder.bind(IntermediaryDataManager.class).toInstance(Mockito.mock(IntermediaryDataManager.class)); - }, - shuffleModule - ); + final Injector injector = createInjector(monitorScheduler); final Optional optional = injector.getInstance( Key.get(new TypeLiteral>() {}) ); @@ -72,7 +64,16 @@ public void testGetShuffleMetricsWithNoShuffleMonitor() final MonitorScheduler monitorScheduler = Mockito.mock(MonitorScheduler.class); Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class))) .thenReturn(Optional.empty()); - injector = Guice.createInjector( + final Injector injector = createInjector(monitorScheduler); + final Optional optional = injector.getInstance( + Key.get(new TypeLiteral>() {}) + ); + Assert.assertFalse(optional.isPresent()); + } + + private Injector createInjector(MonitorScheduler monitorScheduler) + { + return Guice.createInjector( binder -> { binder.bindScope(LazySingleton.class, Scopes.SINGLETON); binder.bind(MonitorScheduler.class).toInstance(monitorScheduler); @@ -80,9 +81,5 @@ public void testGetShuffleMetricsWithNoShuffleMonitor() }, shuffleModule ); - final Optional optional = injector.getInstance( - Key.get(new TypeLiteral>() {}) - ); - Assert.assertFalse(optional.isPresent()); } }