Skip to content

Commit

Permalink
Proceed to next time window when no segment match for RealtimeToOfflineTask (#7814)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Nov 23, 2021
1 parent cce4932 commit 767aa8a
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
Map<String, TaskState> incompleteTasks =
TaskGeneratorUtils.getIncompleteTasks(taskType, realtimeTableName, _clusterInfoAccessor);
if (!incompleteTasks.isEmpty()) {
LOGGER
.warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.", incompleteTasks.keySet(),
realtimeTableName);
LOGGER.warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.",
incompleteTasks.keySet(), realtimeTableName);
continue;
}

Expand All @@ -132,16 +131,15 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
getCompletedSegmentsInfo(realtimeTableName, completedSegmentsZKMetadata, partitionToLatestCompletedSegmentName,
allPartitions);
if (completedSegmentsZKMetadata.isEmpty()) {
LOGGER
.info("No realtime-completed segments found for table: {}, skipping task generation: {}", realtimeTableName,
taskType);
LOGGER.info("No realtime-completed segments found for table: {}, skipping task generation: {}",
realtimeTableName, taskType);
continue;
}
allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
if (!allPartitions.isEmpty()) {
LOGGER
.info("Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
allPartitions, realtimeTableName, taskType);
LOGGER.info(
"Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
allPartitions, realtimeTableName, taskType);
continue;
}

Expand All @@ -163,47 +161,53 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs);
long windowEndMs = windowStartMs + bucketMs;

// Check that execution window is older than bufferTime
if (windowEndMs > System.currentTimeMillis() - bufferMs) {
LOGGER.info(
"Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task "
+ "generation: {}",
windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
continue;
}

// Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd
// (exclusive)
List<String> segmentNames = new ArrayList<>();
List<String> downloadURLs = new ArrayList<>();
Set<String> lastCompletedSegmentPerPartition = new HashSet<>(partitionToLatestCompletedSegmentName.values());
boolean skipGenerate = false;
for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
String segmentName = segmentZKMetadata.getSegmentName();
long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();

// Check overlap with window
if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
// If last completed segment is being used, make sure that segment crosses over end of window.
// In the absence of this check, CONSUMING segments could contain some portion of the window. That data
// would be skipped forever.
if (lastCompletedSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
LOGGER.info(
"Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task "
+ "generation: {}",
segmentName, taskType);
skipGenerate = true;
break;
while (true) {
// Check that execution window is older than bufferTime
if (windowEndMs > System.currentTimeMillis() - bufferMs) {
LOGGER.info(
"Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task "
+ "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
skipGenerate = true;
break;
}

for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
String segmentName = segmentZKMetadata.getSegmentName();
long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();

// Check overlap with window
if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
// If last completed segment is being used, make sure that segment crosses over end of window.
// In the absence of this check, CONSUMING segments could contain some portion of the window. That data
// would be skipped forever.
if (lastCompletedSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task "
+ "generation: {}", segmentName, taskType);
skipGenerate = true;
break;
}
segmentNames.add(segmentName);
downloadURLs.add(segmentZKMetadata.getDownloadUrl());
}
segmentNames.add(segmentName);
downloadURLs.add(segmentZKMetadata.getDownloadUrl());
}
if (skipGenerate || !segmentNames.isEmpty()) {
break;
}

LOGGER.info("Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket",
taskType, windowStartMs, windowEndMs);
windowStartMs = windowEndMs;
windowEndMs += bucketMs;
}

if (segmentNames.isEmpty() || skipGenerate) {
LOGGER.info("Found no eligible segments for task: {} with window [{} - {}). Skipping task generation", taskType,
windowStartMs, windowEndMs);
if (skipGenerate) {
continue;
}

Expand Down Expand Up @@ -264,8 +268,8 @@ private void getCompletedSegmentsInfo(String realtimeTableName, List<SegmentZKMe

if (segmentZKMetadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
completedSegmentsZKMetadata.add(segmentZKMetadata);
latestLLCSegmentNameMap
.compute(llcSegmentName.getPartitionGroupId(), (partitionGroupId, latestLLCSegmentName) -> {
latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(),
(partitionGroupId, latestLLCSegmentName) -> {
if (latestLLCSegmentName == null) {
return llcSegmentName;
} else {
Expand All @@ -291,11 +295,12 @@ private void getCompletedSegmentsInfo(String realtimeTableName, List<SegmentZKMe
*/
private long getWatermarkMs(String realtimeTableName, List<SegmentZKMetadata> completedSegmentsZKMetadata,
long bucketMs) {
ZNRecord realtimeToOfflineZNRecord = _clusterInfoAccessor
.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, realtimeTableName);
ZNRecord realtimeToOfflineZNRecord =
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
realtimeTableName);
RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata
.fromZNRecord(realtimeToOfflineZNRecord) : null;
realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(
realtimeToOfflineZNRecord) : null;

if (realtimeToOfflineSegmentsTaskMetadata == null) {
// No ZNode exists. Cold-start.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;

import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -403,6 +404,37 @@ public void testOverflowIntoConsuming() {
assertEquals(pinotTaskConfigs.size(), 1);
}

/**
 * Verifies that task generation advances past empty time buckets when there is a time gap
 * between the watermark and the earliest completed segment, instead of skipping generation.
 */
@Test
public void testTimeGap() {
  Map<String, Map<String, String>> tableTaskConfigs = new HashMap<>();
  tableTaskConfigs.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
  TableConfig tableConfig = getRealtimeTableConfig(tableTaskConfigs);

  // Watermark is 21 May 2020 00:00:00 UTC; the only completed segment covers
  // 05-23-2020T08:00:00 UTC to 05-24-2020T08:00:00 UTC, leaving a two-day gap.
  ClusterInfoAccessor clusterInfoAccessor = mock(ClusterInfoAccessor.class);
  when(clusterInfoAccessor.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
  when(clusterInfoAccessor.getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE,
      REALTIME_TABLE_NAME)).thenReturn(
      new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L).toZNRecord());
  SegmentZKMetadata segmentZKMetadata =
      getSegmentZKMetadata("testTable__0__1__12345", Status.DONE, 1590220800000L, 1590307200000L,
          TimeUnit.MILLISECONDS, "download2");
  when(clusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
      Collections.singletonList(segmentZKMetadata));

  RealtimeToOfflineSegmentsTaskGenerator taskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
  taskGenerator.init(clusterInfoAccessor);

  // Exactly one task is expected, whose window skipped the two empty buckets and
  // landed on [23 May 2020 00:00 UTC, 24 May 2020 00:00 UTC).
  List<PinotTaskConfig> generatedTaskConfigs = taskGenerator.generateTasks(Lists.newArrayList(tableConfig));
  assertEquals(generatedTaskConfigs.size(), 1);
  Map<String, String> generatedConfigs = generatedTaskConfigs.get(0).getConfigs();
  assertEquals(generatedConfigs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY), "1590192000000");
  assertEquals(generatedConfigs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), "1590278400000");
}

@Test
public void testBuffer() {
Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
Expand Down

0 comments on commit 767aa8a

Please sign in to comment.