Skip to content

Commit

Permalink
Allow missing intervals for Parallel task with hash/range partitioning (
Browse files Browse the repository at this point in the history
apache#10592)

* Allow missing intervals for Parallel task

* fix row filter

* fix tests

* fix log
  • Loading branch information
jihoonson authored and JulianJaffePinterest committed Jan 22, 2021
1 parent 0f6a3f6 commit 741a4ef
Show file tree
Hide file tree
Showing 23 changed files with 521 additions and 242 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.actions;

import java.io.IOException;

/**
* A {@link TaskActionClient} that wraps a given {@link TaskAction} with {@link SurrogateAction}.
* All subtasks of {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask} must
* use this client or wrap taskActions manually.
*/
public class SurrogateTaskActionClient implements TaskActionClient
{
private final String supervisorTaskId;
private final TaskActionClient delegate;

public SurrogateTaskActionClient(String supervisorTaskId, TaskActionClient delegate)
{
this.supervisorTaskId = supervisorTaskId;
this.delegate = delegate;
}

@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
return delegate.submit(new SurrogateAction<>(supervisorTaskId, taskAction));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SurrogateAction;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.TaskLockHelper.OverwritingRootGenerationPartitions;
import org.apache.druid.indexing.common.task.batch.parallel.SupervisorTaskAccess;
import org.apache.druid.java.util.common.ISE;
Expand Down Expand Up @@ -58,8 +59,12 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
final PartitionsSpec partitionsSpec
)
{
final TaskActionClient taskActionClient =
supervisorTaskAccess == null
? toolbox.getTaskActionClient()
: new SurrogateTaskActionClient(supervisorTaskAccess.getSupervisorTaskId(), toolbox.getTaskActionClient());
this.internalAllocator = new ActionBasedSegmentAllocator(
toolbox.getTaskActionClient(),
taskActionClient,
dataSchema,
(schema, row, sequenceName, previousSegmentId, skipSegmentLineageCheck) -> {
final GranularitySpec granularitySpec = schema.getGranularitySpec();
Expand All @@ -72,34 +77,17 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
taskLockHelper,
interval
);
if (supervisorTaskAccess != null) {
return new SurrogateAction<>(
supervisorTaskAccess.getSupervisorTaskId(),
new SegmentAllocateAction(
schema.getDataSource(),
row.getTimestamp(),
schema.getGranularitySpec().getQueryGranularity(),
schema.getGranularitySpec().getSegmentGranularity(),
sequenceName,
previousSegmentId,
skipSegmentLineageCheck,
partialShardSpec,
taskLockHelper.getLockGranularityToUse()
)
);
} else {
return new SegmentAllocateAction(
schema.getDataSource(),
row.getTimestamp(),
schema.getGranularitySpec().getQueryGranularity(),
schema.getGranularitySpec().getSegmentGranularity(),
sequenceName,
previousSegmentId,
skipSegmentLineageCheck,
partialShardSpec,
taskLockHelper.getLockGranularityToUse()
);
}
return new SegmentAllocateAction(
schema.getDataSource(),
row.getTimestamp(),
schema.getGranularitySpec().getQueryGranularity(),
schema.getGranularitySpec().getSegmentGranularity(),
sequenceName,
previousSegmentId,
skipSegmentLineageCheck,
partialShardSpec,
taskLockHelper.getLockGranularityToUse()
);
}
);
this.sequenceNameFunction = new LinearlyPartitionedSequenceNameFunction(taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public ParallelIndexIngestionSpec(
this.tuningConfig = tuningConfig == null ? ParallelIndexTuningConfig.defaultConfig() : tuningConfig;
}

public ParallelIndexIngestionSpec withDataSchema(DataSchema dataSchema)
{
return new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig);
}

@Override
@JsonProperty("dataSchema")
public DataSchema getDataSchema()
Expand Down
Loading

0 comments on commit 741a4ef

Please sign in to comment.