diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
index 6daaad9d73e3..eec08ff133df 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
@@ -199,7 +199,8 @@ public HadoopIndexTask createTask(Interval interval, String version, List
 {
   @Nullable
   private Granularity rollupGranularity = null;
+  private DeterminePartitionsJobSampler sampler;
   @Override
   protected void setup(Context context)
@@ -343,6 +355,7 @@ protected void setup(Context context)
   {
     super.setup(context);
     rollupGranularity = getConfig().getGranularitySpec().getQueryGranularity();
+    sampler = createSampler(getConfig());
   }
   @Override
@@ -355,10 +368,14 @@ protected void innerMap(
         rollupGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
         inputRow
     );
-    context.write(
-        new BytesWritable(HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(groupKey)),
-        NullWritable.get()
-    );
+
+    final byte[] groupKeyBytes = HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(groupKey);
+    if (sampler.shouldEmitRow(groupKeyBytes)) {
+      context.write(
+          new BytesWritable(groupKeyBytes),
+          NullWritable.get()
+      );
+    }
     context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).increment(1);
   }
@@ -413,6 +430,7 @@ public static class DeterminePartitionsDimSelectionAssumeGroupedMapper extends HadoopDruidIndexerMapper
 {
   private DeterminePartitionsDimSelectionMapperHelper helper;
+  private DeterminePartitionsJobSampler sampler;
   @Override
   protected void setup(Context context)
@@ -421,6 +439,7 @@ protected void setup(Context context)
   {
     super.setup(context);
     final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
     helper = new DeterminePartitionsDimSelectionMapperHelper(config);
+    sampler = createSampler(getConfig());
   }
   @Override
@@ -429,11 +448,13 @@ protected void innerMap(
       Context context
   ) throws IOException, InterruptedException
   {
-    final Map> dims = new HashMap<>();
-    for (final String dim : inputRow.getDimensions()) {
-      dims.put(dim, inputRow.getDimension(dim));
+    if (sampler.shouldEmitRow()) {
+      final Map> dims = new HashMap<>();
+      for (final String dim : inputRow.getDimensions()) {
+        dims.put(dim, inputRow.getDimension(dim));
+      }
+      helper.emitDimValueCounts(context, DateTimes.utc(inputRow.getTimestampFromEpoch()), dims);
     }
-    helper.emitDimValueCounts(context, DateTimes.utc(inputRow.getTimestampFromEpoch()), dims);
   }
 }
@@ -705,6 +726,7 @@ protected void innerReduce(Context context, SortableBytes keyBytes, Iterable, DimPartitions> dimPartitionss = new HashMap<>();
     final DimensionRangePartitionsSpec partitionsSpec = (DimensionRangePartitionsSpec) config.getPartitionsSpec();
+    final DeterminePartitionsJobSampler sampler = createSampler(config);
     while (iterator.hasNext()) {
       final DimValueCount dvc = iterator.next();
@@ -728,7 +750,7 @@ protected void innerReduce(Context context, SortableBytes keyBytes, Iterable 0 && currentDimPartition.rows + dvc.numRows > config.getTargetPartitionSize()) {
+      if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows > sampler.getSampledTargetPartitionSize()) {
         final ShardSpec shardSpec = createShardSpec(
             partitionsSpec instanceof SingleDimensionPartitionsSpec,
             currentDimPartitions.dims,
@@ -764,7 +786,7 @@ protected void innerReduce(Context context, SortableBytes keyBytes, Iterable partitionsSpec.getMaxRowsPerSegment()) {
+      if (partition.rows > sampler.getSampledMaxRowsPerSegment()) {
        log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dims, partition.shardSpec);
        oversized = true;
      }
@@ -861,7 +883,7 @@ protected void innerReduce(Context context, SortableBytes keyBytes, Iterable maxCardinality) {
       maxCardinality = cardinality;
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java
new file mode 100644
index 000000000000..d26e818e9246
--- /dev/null
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DeterminePartitionsJobSampler
+{
+  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
+
+  private final int samplingFactor;
+
+  private final int sampledTargetPartitionSize;
+
+  private final int sampledMaxRowsPerSegment;
+
+  public DeterminePartitionsJobSampler(int samplingFactor, int targetPartitionSize, int maxRowsPerSegment)
+  {
+    this.samplingFactor = Math.max(samplingFactor, 1);
+    this.sampledTargetPartitionSize = targetPartitionSize / this.samplingFactor;
+    this.sampledMaxRowsPerSegment = maxRowsPerSegment / this.samplingFactor;
+  }
+
+  /**
+   * If the input rows contain duplicates, we sample by hashing the whole group key and taking a modulus.
+   * Since the hash covers the entire group key, a reasonably balanced hash function is unlikely to cause data skew.
+   */
+  boolean shouldEmitRow(byte[] groupKeyBytes)
+  {
+    return samplingFactor == 1 || HASH_FUNCTION.hashBytes(groupKeyBytes).asInt() % samplingFactor == 0;
+  }
+
+  /**
+   * If the input rows contain no duplicates, we can simply sample at random.
+   */
+  boolean shouldEmitRow()
+  {
+    return samplingFactor == 1 || ThreadLocalRandom.current().nextInt(samplingFactor) == 0;
+  }
+
+  public int getSampledTargetPartitionSize()
+  {
+    return sampledTargetPartitionSize;
+  }
+
+  public int getSampledMaxRowsPerSegment()
+  {
+    return sampledMaxRowsPerSegment;
+  }
+}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
index 7371e70653f2..3a18bf10d9fe 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
@@ -41,6 +41,8 @@
 @JsonTypeName("hadoop")
 public class HadoopTuningConfig implements TuningConfig
 {
+  public static final int DEFAULT_DETERMINE_PARTITIONS_SAMPLING_FACTOR = 1;
+
   private static final DimensionBasedPartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.defaultSpec();
   private static final Map> DEFAULT_SHARD_SPECS = ImmutableMap.of();
   private static final IndexSpec DEFAULT_INDEX_SPEC = IndexSpec.DEFAULT;
@@ -74,7 +76,8 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
         null,
         null,
         null,
-        null
+        null,
+        DEFAULT_DETERMINE_PARTITIONS_SAMPLING_FACTOR
     );
   }
   @Nullable
@@ -102,6 +105,15 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
   private final int maxParseExceptions;
   private final boolean useYarnRMJobStatusFallback;
   private final long awaitSegmentAvailabilityTimeoutMillis;
+  // The sampling factor is currently only used with the range partition spec. When range
+  // partitioning is used, the determine-partitions job launches many mappers and a single
+  // reducer to sort globally and find the upper and lower bound of every segment. That MR
+  // job can take a long time when the input is large, so we can sample the input to make
+  // it run faster; after all, the segment size does not need to equal targetRowsPerSegment
+  // exactly. For example, when ingesting 10,000,000,000 rows with a targetRowsPerSegment
+  // of 5,000,000, sampling by 500 means the MR job only has to process about 20,000,000
+  // rows, which saves a lot of time.
+  private final int determinePartitionsSamplingFactor;
   @JsonCreator
   public HadoopTuningConfig(
@@ -130,7 +142,8 @@ public HadoopTuningConfig(
       final @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
       final @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
       final @JsonProperty("useYarnRMJobStatusFallback") @Nullable Boolean useYarnRMJobStatusFallback,
-      final @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis
+      final @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis,
+      final @JsonProperty("determinePartitionsSamplingFactor") @Nullable Integer determinePartitionsSamplingFactor
   )
   {
     this.workingPath = workingPath;
@@ -182,6 +195,11 @@ public HadoopTuningConfig(
     } else {
       this.awaitSegmentAvailabilityTimeoutMillis = awaitSegmentAvailabilityTimeoutMillis;
     }
+    if (determinePartitionsSamplingFactor == null || determinePartitionsSamplingFactor < 1) {
+      this.determinePartitionsSamplingFactor = 1;
+    } else {
+      this.determinePartitionsSamplingFactor = determinePartitionsSamplingFactor;
+    }
   }
   @Nullable
@@ -336,6 +354,12 @@ public long getAwaitSegmentAvailabilityTimeoutMillis()
     return awaitSegmentAvailabilityTimeoutMillis;
   }
+  @JsonProperty
+  public int getDeterminePartitionsSamplingFactor()
+  {
+    return determinePartitionsSamplingFactor;
+  }
+
   public HadoopTuningConfig withWorkingPath(String path)
   {
     return new HadoopTuningConfig(
@@ -363,7 +387,8 @@ public HadoopTuningConfig withWorkingPath(String path)
         logParseExceptions,
         maxParseExceptions,
         useYarnRMJobStatusFallback,
-        awaitSegmentAvailabilityTimeoutMillis
+        awaitSegmentAvailabilityTimeoutMillis,
+        determinePartitionsSamplingFactor
     );
   }
@@ -394,7 +419,8 @@ public HadoopTuningConfig withVersion(String ver)
         logParseExceptions,
         maxParseExceptions,
         useYarnRMJobStatusFallback,
-        awaitSegmentAvailabilityTimeoutMillis
+        awaitSegmentAvailabilityTimeoutMillis,
+        determinePartitionsSamplingFactor
     );
   }
@@ -425,7 +451,8 @@ public HadoopTuningConfig withShardSpecs(Map> specs
         logParseExceptions,
         maxParseExceptions,
         useYarnRMJobStatusFallback,
-        awaitSegmentAvailabilityTimeoutMillis
+        awaitSegmentAvailabilityTimeoutMillis,
+        determinePartitionsSamplingFactor
     );
   }
 }
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
index 1e92584e7195..ce725230b963 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
@@ -489,7 +489,8 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(
             null,
             null,
             null,
-            null
+            null,
+            1
         )
     )
 );
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
index 5f7b0157fba0..fb1ff1520a28 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
@@ -231,7 +231,8 @@ public DetermineHashedPartitionsJobTest(
         null,
         null,
         null,
-        null
+        null,
+        1
     )
 );
 this.indexerConfig = new HadoopDruidIndexerConfig(ingestionSpec);
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobSamplerTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobSamplerTest.java
new file mode 100644
index 000000000000..21f6be8114d5
--- /dev/null
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobSamplerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+public class DeterminePartitionsJobSamplerTest
+{
+  @Test
+  public void testSampled()
+  {
+    int samplingFactor = 10;
+    int targetPartitionSize = 1000000;
+    int maxRowsPerSegment = 5000000;
+    DeterminePartitionsJobSampler sampler = new DeterminePartitionsJobSampler(
+        samplingFactor,
+        targetPartitionSize,
+        maxRowsPerSegment
+    );
+    Assert.assertEquals(100000, sampler.getSampledTargetPartitionSize());
+    Assert.assertEquals(500000, sampler.getSampledMaxRowsPerSegment());
+  }
+
+  @Test
+  public void testNotSampled()
+  {
+    int samplingFactor = 0;
+    int targetPartitionSize = 1000000;
+    int maxRowsPerSegment = 5000000;
+    DeterminePartitionsJobSampler sampler = new DeterminePartitionsJobSampler(
+        samplingFactor,
+        targetPartitionSize,
+        maxRowsPerSegment
+    );
+    Assert.assertEquals(targetPartitionSize, sampler.getSampledTargetPartitionSize());
+    Assert.assertEquals(maxRowsPerSegment, sampler.getSampledMaxRowsPerSegment());
+  }
+
+  @Test
+  public void testShouldEmitRowByHash()
+  {
+    int samplingFactor = 10;
+    DeterminePartitionsJobSampler sampler = new DeterminePartitionsJobSampler(
+        samplingFactor,
+        1000,
+        5000
+    );
+    long total = 100000L;
+    long hit = 0;
+    for (long i = 0; i < total; i++) {
+      String str = UUID.randomUUID().toString();
+      if (sampler.shouldEmitRow(str.getBytes(StandardCharsets.UTF_8))) {
+        hit++;
+      }
+    }
+    double expect = total * 1.0 / samplingFactor;
+    double error = Math.abs(hit - expect) / expect;
+    Assert.assertTrue(error < 0.01);
+  }
+
+  @Test
+  public void testShouldEmitRowByRandom()
+  {
+    int samplingFactor = 10;
+    DeterminePartitionsJobSampler sampler = new DeterminePartitionsJobSampler(
+        samplingFactor,
+        1000,
+        5000
+    );
+    long total = 1000000L;
+    long hit = 0;
+    for (long i = 0; i < total; i++) {
+      if (sampler.shouldEmitRow()) {
+        hit++;
+      }
+    }
+    double expect = total * 1.0 / samplingFactor;
+    double error = Math.abs(hit - expect) / expect;
+    Assert.assertTrue(error < 0.01);
+  }
+}
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
index aedc3695d207..5f338936a409 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
@@ -342,7 +342,8 @@ public DeterminePartitionsJobTest(
             null,
             null,
             null,
-            null
+            null,
+            1
         )
     )
 );
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java
index 83a9bd58e57a..f10a898d1260 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java
@@ -397,7 +397,8 @@ public DetermineRangePartitionsJobTest(
             null,
             null,
             null,
-            null
+            null,
+            1
         )
     )
 );
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
index f1352cece9d7..f69ad04915c5 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -280,7 +280,8 @@ HadoopIngestionSpec build()
         null,
         null,
         null,
-        null
+        null,
+        1
     );
     return new HadoopIngestionSpec(
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
index 7bd993d2382e..97c764dd10b1 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
@@ -63,7 +63,8 @@ public void testSerde() throws Exception
         null,
         null,
         null,
-        null
+        null,
+        1
     );
     HadoopTuningConfig actual = jsonReadWriteRead(JSON_MAPPER.writeValueAsString(expected), HadoopTuningConfig.class);
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
index e38bc1b1ad6f..2b8fc6749e31 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
@@ -547,7 +547,8 @@ public void setUp() throws Exception
             null,
             null,
             null,
-            null
+            null,
+            1
         )
     )
 );
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
index 5956bbe6ffa5..3c1b80166c51 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
@@ -185,7 +185,8 @@ public void setup() throws Exception
             null,
             null,
             null,
-            null
+            null,
+            1
         )
     )
 );
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
index fcecb0bf0782..d1aee3d66992 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
@@ -79,7 +79,8 @@ public class GranularityPathSpecTest
       null,
       null,
       null,
-      null
+      null,
+      1
   );
 private GranularityPathSpec granularityPathSpec;
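For reference, a minimal sketch of the arithmetic behind the tuning-config comment above. The row counts and thresholds are illustrative values rather than anything mandated by the patch; only DeterminePartitionsJobSampler, its constructor, and its public getters come from this change.

import org.apache.druid.indexer.DeterminePartitionsJobSampler;

// Illustrative numbers mirroring the worked example in the HadoopTuningConfig comment.
public class SamplingFactorArithmeticSketch
{
  public static void main(String[] args)
  {
    long inputRows = 10_000_000_000L;      // total rows to ingest (assumed)
    int targetRowsPerSegment = 5_000_000;  // desired segment size (assumed)
    int samplingFactor = 500;              // determinePartitionsSamplingFactor

    DeterminePartitionsJobSampler sampler =
        new DeterminePartitionsJobSampler(samplingFactor, targetRowsPerSegment, targetRowsPerSegment);

    // The mappers emit roughly one row in 500, so the determine-partitions job
    // sees about 20,000,000 rows instead of 10,000,000,000.
    System.out.println(inputRows / samplingFactor);               // 20000000

    // The reducer compares partition sizes against the scaled-down thresholds,
    // so segment boundaries still correspond to roughly 5,000,000 original rows.
    System.out.println(sampler.getSampledTargetPartitionSize());  // 10000
    System.out.println(sampler.getSampledMaxRowsPerSegment());    // 10000
  }
}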
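And a standalone sketch of the hash-based decision used by shouldEmitRow(byte[]): hashing the serialized group key gives every duplicate of a row the same keep-or-drop verdict, so the surviving groups keep their relative sizes, whereas already-grouped input can safely be sampled at random. The class, key string, and factor below are hypothetical; the code only mirrors the patch's hash-and-mod rule with Guava's murmur3_32.

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;

// Illustrative only: mirrors the decision rule in DeterminePartitionsJobSampler#shouldEmitRow(byte[]).
public class GroupKeyHashSamplingSketch
{
  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
  private static final int SAMPLING_FACTOR = 500; // assumed value

  static boolean shouldEmit(byte[] groupKeyBytes)
  {
    // Deterministic: identical group keys always hash to the same value,
    // so duplicate rows are either all emitted or all skipped.
    return HASH_FUNCTION.hashBytes(groupKeyBytes).asInt() % SAMPLING_FACTOR == 0;
  }

  public static void main(String[] args)
  {
    byte[] key = "1356998400000|page=foo|user=bar".getBytes(StandardCharsets.UTF_8); // hypothetical group key
    System.out.println(shouldEmit(key) == shouldEmit(key)); // always true
  }
}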