Skip to content

Commit

Permalink
Modify DataSegmentProvider to also return DataSegment (#17021) (#17217)
Browse files Browse the repository at this point in the history
Currently, TaskDataSegmentProvider fetches the DataSegment from the Coordinator while loading the segment, but discards it afterwards. This PR refactors the provider to return the DataSegment alongside the loaded segment, so that workers can use it without performing a separate fetch.

Co-authored-by: Adarsh Sanjeev <adarshsanjeev@gmail.com>
  • Loading branch information
kfaraz and adarshsanjeev authored Oct 2, 2024
1 parent cf44747 commit 491087f
Show file tree
Hide file tree
Showing 15 changed files with 209 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -70,7 +70,7 @@ public TaskDataSegmentProvider(
}

@Override
public Supplier<ResourceHolder<Segment>> fetchSegment(
public Supplier<ResourceHolder<CompleteSegment>> fetchSegment(
final SegmentId segmentId,
final ChannelCounters channelCounters,
final boolean isReindex
Expand All @@ -79,7 +79,7 @@ public Supplier<ResourceHolder<Segment>> fetchSegment(
// Returns Supplier<ResourceHolder> instead of ResourceHolder, so the Coordinator calls and segment downloads happen
// in processing threads, rather than the main thread. (They happen when fetchSegmentInternal is called.)
return () -> {
ResourceHolder<Segment> holder = null;
ResourceHolder<CompleteSegment> holder = null;

while (holder == null) {
holder = holders.computeIfAbsent(
Expand All @@ -99,7 +99,7 @@ public Supplier<ResourceHolder<Segment>> fetchSegment(
* Helper used by {@link #fetchSegment(SegmentId, ChannelCounters, boolean)}. Does the actual fetching of a segment, once it
* is determined that we definitely need to go out and get one.
*/
private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
private ReferenceCountingResourceHolder<CompleteSegment> fetchSegmentInternal(
final SegmentId segmentId,
final ChannelCounters channelCounters,
final boolean isReindex
Expand Down Expand Up @@ -133,7 +133,7 @@ private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
final int numRows = index.getNumRows();
final long size = dataSegment.getSize();
closer.register(() -> channelCounters.addFile(numRows, size));
return new ReferenceCountingResourceHolder<>(segment, closer);
return new ReferenceCountingResourceHolder<>(new CompleteSegment(dataSegment, segment), closer);
}
catch (IOException | SegmentLoadingException e) {
throw CloseableUtils.closeInCatch(
Expand All @@ -143,29 +143,29 @@ private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
}
}

private static class SegmentHolder implements Supplier<ResourceHolder<Segment>>
private static class SegmentHolder implements Supplier<ResourceHolder<CompleteSegment>>
{
private final Supplier<ResourceHolder<Segment>> holderSupplier;
private final Supplier<ResourceHolder<CompleteSegment>> holderSupplier;
private final Closeable cleanupFn;

@GuardedBy("this")
private ReferenceCountingResourceHolder<Segment> holder;
private ReferenceCountingResourceHolder<CompleteSegment> holder;

@GuardedBy("this")
private boolean closing;

@GuardedBy("this")
private boolean closed;

public SegmentHolder(Supplier<ResourceHolder<Segment>> holderSupplier, Closeable cleanupFn)
public SegmentHolder(Supplier<ResourceHolder<CompleteSegment>> holderSupplier, Closeable cleanupFn)
{
this.holderSupplier = holderSupplier;
this.cleanupFn = cleanupFn;
}

@Override
@Nullable
public ResourceHolder<Segment> get()
public ResourceHolder<CompleteSegment> get()
{
synchronized (this) {
if (closing) {
Expand All @@ -183,7 +183,7 @@ public ResourceHolder<Segment> get()
// Then, return null so "fetchSegment" will try again.
return null;
} else if (holder == null) {
final ResourceHolder<Segment> segmentHolder = holderSupplier.get();
final ResourceHolder<CompleteSegment> segmentHolder = holderSupplier.get();
holder = new ReferenceCountingResourceHolder<>(
segmentHolder.get(),
() -> {
Expand All @@ -210,7 +210,7 @@ public ResourceHolder<Segment> get()
}
}
);
final ResourceHolder<Segment> retVal = holder.increment();
final ResourceHolder<CompleteSegment> retVal = holder.increment();
// Store already-closed holder, so it disappears when the last reference is closed.
holder.close();
return retVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.ColumnHolder;
Expand Down Expand Up @@ -163,7 +164,7 @@ private static Iterator<SegmentWithDescriptor> inputSourceSegmentIterator(
signature
);
return new SegmentWithDescriptor(
() -> ResourceHolder.fromCloseable(segment),
() -> ResourceHolder.fromCloseable(new CompleteSegment(null, segment)),
new RichSegmentDescriptor(segmentId.toDescriptor(), null)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.InlineSegmentWrangler;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.timeline.SegmentId;
Expand Down Expand Up @@ -74,7 +75,7 @@ public ReadableInputs attach(
segmentWrangler.getSegmentsForIntervals(dataSource, Intervals.ONLY_ETERNITY),
segment -> ReadableInput.segment(
new SegmentWithDescriptor(
() -> ResourceHolder.fromCloseable(segment),
() -> ResourceHolder.fromCloseable(new CompleteSegment(null, segment)),
DUMMY_SEGMENT_DESCRIPTOR
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.timeline.SegmentId;
Expand Down Expand Up @@ -98,7 +99,7 @@ public ReadableInputs attach(
throw new ISE("Lookup[%s] has multiple segments; cannot read", lookupName);
}

return ResourceHolder.fromCloseable(segment);
return ResourceHolder.fromCloseable(new CompleteSegment(null, segment));
},
new RichSegmentDescriptor(SegmentId.dummy(lookupName).toDescriptor(), null)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.Segment;

import java.util.Objects;
Expand All @@ -31,7 +32,7 @@
*/
public class SegmentWithDescriptor
{
private final Supplier<? extends ResourceHolder<Segment>> segmentSupplier;
private final Supplier<? extends ResourceHolder<CompleteSegment>> segmentSupplier;
private final RichSegmentDescriptor descriptor;

/**
Expand All @@ -42,7 +43,7 @@ public class SegmentWithDescriptor
* @param descriptor segment descriptor
*/
public SegmentWithDescriptor(
final Supplier<? extends ResourceHolder<Segment>> segmentSupplier,
final Supplier<? extends ResourceHolder<CompleteSegment>> segmentSupplier,
final RichSegmentDescriptor descriptor
)
{
Expand All @@ -59,7 +60,7 @@ public SegmentWithDescriptor(
* It is not necessary to call {@link Segment#close()} on the returned segment. Calling {@link ResourceHolder#close()}
* is enough.
*/
public ResourceHolder<Segment> getOrLoad()
public ResourceHolder<CompleteSegment> getOrLoad()
{
return segmentSupplier.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.timeline.SegmentId;

import java.util.function.Supplier;
Expand All @@ -35,7 +35,7 @@ public interface DataSegmentProvider
* <br>
* It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}.
*/
Supplier<ResourceHolder<Segment>> fetchSegment(
Supplier<ResourceHolder<CompleteSegment>> fetchSegment(
SegmentId segmentId,
ChannelCounters channelCounters,
boolean isReindex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.TimeBoundaryInspector;
import org.apache.druid.segment.column.RowSignature;
Expand Down Expand Up @@ -152,8 +152,8 @@ protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(DataServerQue
protected ReturnOrAwait<Unit> runWithSegment(final SegmentWithDescriptor segment) throws IOException
{
if (resultYielder == null) {
final ResourceHolder<Segment> segmentHolder = closer.register(segment.getOrLoad());
final SegmentReference mappedSegment = mapSegment(segmentHolder.get());
final ResourceHolder<CompleteSegment> segmentHolder = closer.register(segment.getOrLoad());
final SegmentReference mappedSegment = mapSegment(segmentHolder.get().getSegment());

final Sequence<ResultRow> rowSequence =
groupingEngine.process(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
Expand Down Expand Up @@ -245,9 +246,9 @@ protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final DataSer
protected ReturnOrAwait<Unit> runWithSegment(final SegmentWithDescriptor segment) throws IOException
{
if (cursor == null) {
final ResourceHolder<Segment> segmentHolder = closer.register(segment.getOrLoad());
final ResourceHolder<CompleteSegment> segmentHolder = closer.register(segment.getOrLoad());

final Segment mappedSegment = mapSegment(segmentHolder.get());
final Segment mappedSegment = mapSegment(segmentHolder.get().getSegment());
final CursorFactory cursorFactory = mappedSegment.asCursorFactory();
if (cursorFactory == null) {
throw new ISE(
Expand All @@ -264,7 +265,7 @@ protected ReturnOrAwait<Unit> runWithSegment(final SegmentWithDescriptor segment
// No cursors!
return ReturnOrAwait.returnObject(Unit.instance());
} else {
final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get());
final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get().getSegment());
assert rowsFlushed == 0; // There's only ever one cursor when running with a segment
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.query.OrderBy;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.ColumnHolder;
Expand Down Expand Up @@ -187,16 +187,16 @@ public void testConcurrency()
for (int i = 0; i < iterations; i++) {
final int expectedSegmentNumber = i % NUM_SEGMENTS;
final DataSegment segment = segments.get(expectedSegmentNumber);
final ListenableFuture<Supplier<ResourceHolder<Segment>>> f =
final ListenableFuture<Supplier<ResourceHolder<CompleteSegment>>> f =
exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters(), false));

testFutures.add(
FutureUtils.transform(
f,
holderSupplier -> {
try {
final ResourceHolder<Segment> holder = holderSupplier.get();
Assert.assertEquals(segment.getId(), holder.get().getId());
final ResourceHolder<CompleteSegment> holder = holderSupplier.get();
Assert.assertEquals(segment.getId(), holder.get().getSegment().getId());

final String expectedStorageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
final File expectedFile = new File(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,14 @@
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexCursorFactory;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
Expand All @@ -86,6 +85,7 @@
import org.apache.druid.sql.calcite.util.TestDataBuilder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.mockito.Mockito;
Expand Down Expand Up @@ -161,11 +161,10 @@ public String getFormatString()
)
);
ObjectMapper testMapper = MSQTestBase.setupObjectMapper(dummyInjector);
IndexIO indexIO = new IndexIO(testMapper, ColumnConfig.DEFAULT);
SegmentCacheManager segmentCacheManager = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, testMapper)
.manufacturate(cacheManagerDir);
LocalDataSegmentPusherConfig config = new LocalDataSegmentPusherConfig();
MSQTestSegmentManager segmentManager = new MSQTestSegmentManager(segmentCacheManager, indexIO);
MSQTestSegmentManager segmentManager = new MSQTestSegmentManager(segmentCacheManager);
config.storageDirectory = storageDir;
binder.bind(DataSegmentPusher.class).toProvider(() -> new MSQTestDelegateDataSegmentPusher(
new LocalDataSegmentPusher(config),
Expand Down Expand Up @@ -206,7 +205,10 @@ private static DataServerQueryHandlerFactory getTestDataServerQueryHandlerFactor
return mockFactory;
}

private static Supplier<ResourceHolder<Segment>> getSupplierForSegment(Function<String, File> tempFolderProducer, SegmentId segmentId)
protected static Supplier<ResourceHolder<CompleteSegment>> getSupplierForSegment(
Function<String, File> tempFolderProducer,
SegmentId segmentId
)
{
final QueryableIndex index;
switch (segmentId.getDataSource()) {
Expand Down Expand Up @@ -450,6 +452,13 @@ public void close()
{
}
};
return () -> new ReferenceCountingResourceHolder<>(segment, Closer.create());
DataSegment dataSegment = DataSegment.builder()
.dataSource(segmentId.getDataSource())
.interval(segmentId.getInterval())
.version(segmentId.getVersion())
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();
return () -> new ReferenceCountingResourceHolder<>(new CompleteSegment(dataSegment, segment), Closer.create());
}
}
Loading

0 comments on commit 491087f

Please sign in to comment.