backport projections (#17257)
* abstract `IncrementalIndex` cursor stuff to prepare for using different "views" of the data based on the cursor build spec (#17064)

* abstract `IncrementalIndex` cursor stuff to prepare for the possibility of using different "views" of the data based on the cursor build spec
changes:
* introduce `IncrementalIndexRowSelector` interface to capture how `IncrementalIndexCursor` and `IncrementalIndexColumnSelectorFactory` read data
* `IncrementalIndex` implements `IncrementalIndexRowSelector`
* move `FactsHolder` interface to separate file
* other minor refactorings
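
None of the new `IncrementalIndexRowSelector` code appears in the excerpt below, so the following is only a rough sketch of the idea: a read-only view that the cursor and column-selector factory consume instead of reaching into `IncrementalIndex` directly. The interface name, methods, and signatures below are assumptions for illustration, not the actual Druid API.

// Illustrative only: an approximation of what a row-selector style read interface
// could look like. Names and signatures are assumptions, not taken from this commit.
import java.util.List;
import org.apache.druid.segment.incremental.FactsHolder;

interface RowSelectorSketch
{
  // the ingested rows themselves (FactsHolder is the interface moved to its own file, per the note above)
  FactsHolder getFacts();

  // dimension metadata the cursor needs in order to build per-column value selectors
  List<String> getDimensionNames();

  // per-row metric reads used by the column selector factory
  Object getMetricObjectValue(int rowOffset, int aggregatorOffset);
}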

* add DataSchema.Builder to tidy stuff up a bit (#17065)

* add DataSchema.Builder to tidy stuff up a bit

* fixes

* fixes

* more style fixes

* review stuff

* Projections prototype (#17214)
clintropolis authored Oct 5, 2024
1 parent 1435b9f commit 7b3fc4e
Showing 191 changed files with 7,451 additions and 2,580 deletions.
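
Most of the diff below is a mechanical migration from the positional DataSchema constructors to the new DataSchema.Builder introduced by the second commit above. As a stand-alone sketch of the pattern, assembled from the builder calls that actually appear in the hunks below (the class name and example values here are illustrative, not part of the commit):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;

public class DataSchemaBuilderExample
{
  public static void main(String[] args)
  {
    // Before: positional constructors such as
    // new DataSchema(dataSource, timestampSpec, dimensionsSpec, aggregators,
    //                granularitySpec, transformSpec, parserMap, objectMapper),
    // with null passed for every unused argument.

    // After: only the pieces that matter are named; everything else is defaulted.
    DataSchema schema = DataSchema.builder()
                                  .withDataSource("example_datasource")
                                  .withTimestamp(new TimestampSpec("timestamp", "iso", null))
                                  .withDimensions(DimensionsSpec.EMPTY)
                                  .withAggregators(new CountAggregatorFactory("rows"))
                                  .withGranularity(
                                      new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)
                                  )
                                  .build();

    // A builder can also be seeded from an existing schema to copy it while
    // overriding a single field, as cloneDataSchema does near the end of this diff.
    DataSchema copy = DataSchema.builder(schema)
                                .withObjectMapper(new ObjectMapper())
                                .build();

    System.out.println(schema.getDataSource() + " / " + copy.getDataSource());
  }
}
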
@@ -34,7 +34,6 @@
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.DataSchema;
@@ -66,18 +65,19 @@ public static Task getTask()
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"foo",
new TimestampSpec(null, null, null),
DimensionsSpec.EMPTY,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new UniformGranularitySpec(
Granularities.DAY,
null,
ImmutableList.of(Intervals.of("2010-01-01/P2D"))
),
null
),
DataSchema.builder()
.withDataSource("foo")
.withTimestamp(new TimestampSpec(null, null, null))
.withDimensions(DimensionsSpec.EMPTY)
.withAggregators(new DoubleSumAggregatorFactory("met", "met"))
.withGranularity(
new UniformGranularitySpec(
Granularities.DAY,
null,
ImmutableList.of(Intervals.of("2010-01-01/P2D"))
)
)
.build(),
new IndexTask.IndexIOConfig(
new LocalInputSource(new File("lol"), "rofl"),
new NoopInputFormat(),
@@ -48,7 +48,6 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
@@ -211,14 +210,13 @@ public HadoopIndexTask createTask(Interval interval, String version, List<DataSe
);

// generate DataSchema
DataSchema dataSchema = new DataSchema(
dataSourceName,
parser,
aggregators,
granularitySpec,
TransformSpec.NONE,
objectMapper
);
DataSchema dataSchema = DataSchema.builder()
.withDataSource(dataSourceName)
.withParserMap(parser)
.withAggregators(aggregators)
.withGranularity(granularitySpec)
.withObjectMapper(objectMapper)
.build();

// generate DatasourceIngestionSpec
DatasourceIngestionSpec datasourceIngestionSpec = new DatasourceIngestionSpec(
@@ -53,7 +53,6 @@
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -237,14 +236,10 @@ public void testCheckSegmentsAndSubmitTasks()
Map<Interval, HadoopIndexTask> runningTasks = runningTasksPair.lhs;
Map<Interval, String> runningVersion = runningTasksPair.rhs;

DataSchema dataSchema = new DataSchema(
"test_datasource",
null,
null,
null,
TransformSpec.NONE,
objectMapper
);
DataSchema dataSchema = DataSchema.builder()
.withDataSource("test_datasource")
.withObjectMapper(objectMapper)
.build();
HadoopIOConfig hadoopIOConfig = new HadoopIOConfig(new HashMap<>(), null, null);
HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, hadoopIOConfig, null);
HadoopIndexTask task1 = new HadoopIndexTask(
@@ -24,7 +24,6 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
@@ -44,7 +43,6 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
@@ -102,16 +100,19 @@ private static DataSchema getDataSchema(String dataSource)
dimensions.add(StringDimensionSchema.create("dim1"));
dimensions.add(StringDimensionSchema.create("dim2"));

return new DataSchema(
dataSource,
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(dimensions),
new AggregatorFactory[] {new CountAggregatorFactory("rows")},
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
ImmutableList.of()),
null);
return DataSchema.builder()
.withDataSource(dataSource)
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(dimensions)
.withAggregators(new CountAggregatorFactory("rows"))
.withGranularity(
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
ImmutableList.of()
)
)
.build();
}

@BeforeClass
@@ -278,6 +278,26 @@ public boolean equals(Object o)
&& stringEncoding == that.stringEncoding;
}

@Nullable
@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
{
if (this == preAggregated) {
return getCombiningFactory();
}
if (getClass() != preAggregated.getClass()) {
return null;
}
HllSketchAggregatorFactory that = (HllSketchAggregatorFactory) preAggregated;
if (lgK == that.lgK && tgtHllType == that.tgtHllType && stringEncoding == that.stringEncoding && Objects.equals(
fieldName,
that.fieldName
)) {
return getCombiningFactory();
}
return null;
}

@Override
public int hashCode()
{
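The substituteCombiningFactory override added above ties the HLL sketch aggregators into the projections work: substitution succeeds only when the pre-aggregated factory is the same instance, or the same concrete class with matching lgK, target HLL type, string encoding, and input field, in which case the combining factory is returned; otherwise the method returns null and the query cannot be served from the pre-aggregated column. A rough caller-side sketch follows (the constructor shape mirrors the test further down; the class name, column names, and values here are illustrative):

import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;

public class HllSubstitutionSketch
{
  public static void main(String[] args)
  {
    // Aggregator that produced a pre-aggregated sketch column
    // (null sketch parameters fall back to the factory defaults).
    AggregatorFactory preAggregated =
        new HllSketchBuildAggregatorFactory("daily_uniques", "user_id", null, null, null, true, true);

    // Query-time aggregator over the same input column with the same sketch parameters.
    AggregatorFactory queryTime =
        new HllSketchBuildAggregatorFactory("uniques", "user_id", null, null, null, true, true);

    // Matching class, lgK, tgtHllType, stringEncoding, and fieldName:
    // the query aggregator can be swapped for its combining form.
    System.out.println(queryTime.substituteCombiningFactory(preAggregated) != null); // true

    // Different input column: no substitution is possible.
    AggregatorFactory otherColumn =
        new HllSketchBuildAggregatorFactory("uniques", "session_id", null, null, null, true, true);
    System.out.println(queryTime.substituteCombiningFactory(otherColumn)); // null
  }
}
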
@@ -231,6 +231,42 @@ public void testEqualsOtherMatches()
Assert.assertArrayEquals(target.getCacheKey(), other.getCacheKey());
}

@Test
public void testCanSubstitute()
{
HllSketchBuildAggregatorFactory factory = new HllSketchBuildAggregatorFactory(
NAME,
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
STRING_ENCODING,
true,
true
);
HllSketchBuildAggregatorFactory other = new HllSketchBuildAggregatorFactory(
"other name",
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
STRING_ENCODING,
false,
false
);

HllSketchBuildAggregatorFactory incompatible = new HllSketchBuildAggregatorFactory(
NAME,
"different field",
LG_K,
TGT_HLL_TYPE,
STRING_ENCODING,
false,
false
);
Assert.assertNotNull(other.substituteCombiningFactory(factory));
Assert.assertNotNull(factory.substituteCombiningFactory(other));
Assert.assertNull(factory.substituteCombiningFactory(incompatible));
}

@Test
public void testToString()
{
@@ -92,7 +92,6 @@
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
@@ -1262,28 +1261,27 @@ public void testKafkaRecordEntityInputFormat() throws Exception

final KafkaIndexTask task = createTask(
null,
new DataSchema(
"test_ds",
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.topic"),
new LongDimensionSchema("kafka.offset"),
new StringDimensionSchema("kafka.header.encoding")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
),
DataSchema.builder()
.withDataSource("test_ds")
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.topic"),
new LongDimensionSchema("kafka.offset"),
new StringDimensionSchema("kafka.header.encoding")
)
.withAggregators(
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
)
.withGranularity(
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)
)
.build(),
new KafkaIndexTaskIOConfig(
0,
"sequence0",
@@ -1337,26 +1335,25 @@ public void testKafkaInputFormat() throws Exception

final KafkaIndexTask task = createTask(
null,
new DataSchema(
"test_ds",
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.testheader.encoding")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
),
DataSchema.builder()
.withDataSource("test_ds")
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.testheader.encoding")
)
.withAggregators(
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
)
.withGranularity(
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)
)
.build(),
new KafkaIndexTaskIOConfig(
0,
"sequence0",
@@ -2887,16 +2884,7 @@ private KafkaIndexTask createTask(

private static DataSchema cloneDataSchema(final DataSchema dataSchema)
{
return new DataSchema(
dataSchema.getDataSource(),
dataSchema.getTimestampSpec(),
dataSchema.getDimensionsSpec(),
dataSchema.getAggregators(),
dataSchema.getGranularitySpec(),
dataSchema.getTransformSpec(),
dataSchema.getParserMap(),
OBJECT_MAPPER
);
return DataSchema.builder(dataSchema).withObjectMapper(OBJECT_MAPPER).build();
}

@Override
(Diffs for the remaining changed files are not shown in this excerpt.)
