Skip to content

Commit

Permalink
[Transform] do not fail checkpoint creation due to global checkpoint …
Browse files Browse the repository at this point in the history
…mismatch (#48423)

Take the max if global checkpoints mismatch instead of throwing an exception. It turned out global
checkpoints can mismatch by design

fixes #48379
  • Loading branch information
Hendrik Muhs committed Oct 24, 2019
1 parent 65c58ed commit ba1c13c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
Expand Down Expand Up @@ -188,14 +188,12 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri
if (checkpointsByIndex.containsKey(indexName)) {
// we have already seen this index, just check/add shards
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
if (checkpoints.containsKey(shard.getShardRouting().getId())) {
// there is already a checkpoint entry for this index/shard combination, check if they match
if (checkpoints.get(shard.getShardRouting().getId()) != globalCheckpoint) {
throw new CheckpointException("Global checkpoints mismatch for index [" + indexName + "] between shards of id ["
+ shard.getShardRouting().getId() + "]");
}
} else {
// 1st time we see this shard for this index, add the entry for the shard
// 1st time we see this shard for this index, add the entry for the shard
// or there is already a checkpoint entry for this index/shard combination
// but with a higher global checkpoint. This is by design(not a problem) and
// we take the higher value
if (checkpoints.containsKey(shard.getShardRouting().getId()) == false
|| checkpoints.get(shard.getShardRouting().getId()) < globalCheckpoint) {
checkpoints.put(shard.getShardRouting().getId(), globalCheckpoint);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import java.util.Map.Entry;
import java.util.Set;

import static org.hamcrest.Matchers.containsString;

public class TransformsCheckpointServiceTests extends ESTestCase {

public void testExtractIndexCheckpoints() {
Expand Down Expand Up @@ -104,11 +102,15 @@ public void testExtractIndexCheckpointsInconsistentGlobalCheckpoints() {

ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true, false);

// fail
CheckpointException e = expectThrows(CheckpointException.class,
() -> DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices));
Map<String, long[]> checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices);

assertEquals(expectedCheckpoints.size(), checkpoints.size());
assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet());

assertThat(e.getMessage(), containsString("Global checkpoints mismatch"));
// global checkpoints should be max() of all global checkpoints
for (Entry<String, long[]> entry : expectedCheckpoints.entrySet()) {
assertArrayEquals(entry.getValue(), checkpoints.get(entry.getKey()));
}
}

/**
Expand Down Expand Up @@ -176,8 +178,8 @@ private static ShardStats[] createRandomShardStats(Map<String, long[]> expectedC
}

// SeqNoStats asserts that checkpoints are logical
long localCheckpoint = randomLongBetween(0L, 100000000L);
long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(0L, 100000000L);
long localCheckpoint = randomLongBetween(100L, 100000000L);
long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(100L, 100000000L);
long maxSeqNo = Math.max(localCheckpoint, globalCheckpoint);

SeqNoStats validSeqNoStats = null;
Expand Down Expand Up @@ -221,7 +223,7 @@ private static ShardStats[] createRandomShardStats(Map<String, long[]> expectedC
if (inconsistentReplica == replica) {
// overwrite
SeqNoStats invalidSeqNoStats =
new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint + randomLongBetween(10L, 100L));
new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint - randomLongBetween(10L, 100L));
shardStats.add(
new ShardStats(shardRouting,
new ShardPath(false, path, path, shardId), stats, null, invalidSeqNoStats, null));
Expand Down

0 comments on commit ba1c13c

Please sign in to comment.