From 9272aa21493eb16734b43d461ebc1f111087d60b Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 23 Aug 2023 13:47:26 +0000 Subject: [PATCH] [Remote Store] Fix tests when we restore index without any refresh (#9480) Signed-off-by: Sachin Kale --- .../RemoteStoreBaseIntegTestCase.java | 2 +- .../remotestore/RemoteStoreRestoreIT.java | 36 ++++++++-------- .../opensearch/index/shard/StoreRecovery.java | 20 ++++++--- .../org/opensearch/index/store/Store.java | 15 +++++-- .../index/translog/TranslogHeader.java | 32 ++++++++------ .../opensearch/index/store/StoreTests.java | 37 ++++++++++++++++ .../index/translog/TranslogHeaderTests.java | 43 +++++++++++++++++++ 7 files changed, 143 insertions(+), 42 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index f81edc1ff0e4d..90efafe9423c6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -72,7 +72,7 @@ protected Map indexData(int numberOfIterations, boolean invokeFlus Map indexingStats = new HashMap<>(); for (int i = 0; i < numberOfIterations; i++) { if (invokeFlush) { - flush(index); + flushAndRefresh(index); } else { refresh(index); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index 507ab40084355..e9d8933961073 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -38,7 +38,6 @@ public class RemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase { private static final String TOTAL_OPERATIONS = 
"total-operations"; private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations"; private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total"; - private static final String MAX_SEQ_NO_REFRESHED_OR_FLUSHED = "max-seq-no-refreshed-or-flushed"; @Override public Settings indexSettings() { @@ -68,18 +67,18 @@ private void restore(String... indices) { ); } - private void verifyRestoredData(Map indexStats, boolean checkTotal, String indexName) { + private void verifyRestoredData(Map indexStats, String indexName) { // This is required to get updated number from already active shards which were not restored refresh(indexName); - String statsGranularity = checkTotal ? TOTAL_OPERATIONS : REFRESHED_OR_FLUSHED_OPERATIONS; - String maxSeqNoGranularity = checkTotal ? MAX_SEQ_NO_TOTAL : MAX_SEQ_NO_REFRESHED_OR_FLUSHED; ensureYellowAndNoInitializingShards(indexName); ensureGreen(indexName); - assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity)); + assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)); IndexResponse response = indexSingleDoc(indexName); - assertEquals(indexStats.get(maxSeqNoGranularity + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo()); + if (indexStats.containsKey(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id())) { + assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo()); + } refresh(indexName); - assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity) + 1); + assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1); } private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { @@ -96,7 +95,6 @@ private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, St * Simulates 
all data restored using Remote Translog Store. * @throws IOException IO Exception. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188") public void testRemoteTranslogRestoreWithNoDataPostCommit() throws IOException { testRestoreFlow(1, true, randomIntBetween(1, 5)); } @@ -131,7 +129,6 @@ public void testRemoteTranslogRestoreWithCommittedData() throws IOException { * Simulates all data restored using Remote Translog Store. * @throws IOException IO Exception. */ - // @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188") @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOException { testRestoreFlowBothPrimaryReplicasDown(1, true, randomIntBetween(1, 5)); @@ -172,7 +169,7 @@ private void restoreAndVerify(int shardCount, int replicaCount, Map indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME); assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(REFRESHED_OR_FLUSHED_OPERATIONS)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); ensureRed(INDEX_NAME); @@ -256,7 +255,7 @@ private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invo ensureGreen(indices); for (String index : indices) { assertEquals(shardCount, getNumShards(index).totalNumShards); - verifyRestoredData(indicesStats.get(index), true, index); + verifyRestoredData(indicesStats.get(index), index); } } @@ -288,7 +287,7 @@ public void testRestoreFlowNoRedIndex() { ensureGreen(INDEX_NAME); assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); - verifyRestoredData(indexStats, true, INDEX_NAME); + verifyRestoredData(indexStats, INDEX_NAME); } /** @@ -340,7 +339,7 @@ public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws IOExceptio for 
(String index : indices) { assertEquals(shardCount, getNumShards(index).totalNumShards); - verifyRestoredData(indicesStats.get(index), true, index); + verifyRestoredData(indicesStats.get(index), index); } } @@ -384,9 +383,9 @@ public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws IOExc ); ensureGreen(indices[0], indices[1]); assertEquals(shardCount, getNumShards(indices[0]).totalNumShards); - verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]); + verifyRestoredData(indicesStats.get(indices[0]), indices[0]); assertEquals(shardCount, getNumShards(indices[1]).totalNumShards); - verifyRestoredData(indicesStats.get(indices[1]), true, indices[1]); + verifyRestoredData(indicesStats.get(indices[1]), indices[1]); ensureRed(indices[2], indices[3]); } @@ -436,9 +435,9 @@ public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOExc ); ensureGreen(indices[0], indices[1]); assertEquals(shardCount, getNumShards(indices[0]).totalNumShards); - verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]); + verifyRestoredData(indicesStats.get(indices[0]), indices[0]); assertEquals(shardCount, getNumShards(indices[1]).totalNumShards); - verifyRestoredData(indicesStats.get(indices[1]), true, indices[1]); + verifyRestoredData(indicesStats.get(indices[1]), indices[1]); ensureRed(indices[2], indices[3]); } @@ -447,8 +446,7 @@ public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOExc * when the index has no data. * @throws IOException IO Exception. 
*/ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188") - public void testRTSRestoreNoData() throws IOException { + public void testRTSRestoreDataOnlyInTranslog() throws IOException { testRestoreFlow(0, true, randomIntBetween(1, 5)); } diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index b565ddd6c819a..d0c083390ab70 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -64,7 +64,9 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.Store; +import org.opensearch.index.translog.Checkpoint; import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.TranslogHeader; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.repositories.IndexId; @@ -74,6 +76,8 @@ import java.io.IOException; import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -83,6 +87,7 @@ import java.util.stream.Collectors; import static org.opensearch.common.unit.TimeValue.timeValueMillis; +import static org.opensearch.index.translog.Translog.CHECKPOINT_FILE_NAME; /** * This package private utility class encapsulates the logic to recover an index shard from either an existing index on @@ -532,13 +537,16 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco // Download segments from remote segment store indexShard.syncSegmentsFromRemoteSegmentStore(true, true); + indexShard.syncTranslogFilesFromRemoteTranslog(); + if (store.directory().listAll().length == 0) { - 
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion); - } - if (indexShard.indexSettings.isRemoteTranslogStoreEnabled()) { - indexShard.syncTranslogFilesFromRemoteTranslog(); - } else { - bootstrap(indexShard, store); + Path location = indexShard.shardPath().resolveTranslog(); + Checkpoint checkpoint = Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)); + final Path translogFile = location.resolve(Translog.getFilename(checkpoint.getGeneration())); + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) { + TranslogHeader translogHeader = TranslogHeader.read(translogFile, channel); + store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion, translogHeader.getTranslogUUID()); + } } assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 16059e0604072..4f51994a6ac2f 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -1747,13 +1747,13 @@ public void accept(ShardLock Lock) {} }; } - /** - * creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted. 
- */ - public void createEmpty(Version luceneVersion) throws IOException { + public void createEmpty(Version luceneVersion, String translogUUID) throws IOException { metadataLock.writeLock().lock(); try (IndexWriter writer = newEmptyIndexWriter(directory, luceneVersion)) { final Map map = new HashMap<>(); + if (translogUUID != null) { + map.put(Translog.TRANSLOG_UUID_KEY, translogUUID); + } map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); @@ -1764,6 +1764,13 @@ public void createEmpty(Version luceneVersion) throws IOException { } } + /** + * creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted. + */ + public void createEmpty(Version luceneVersion) throws IOException { + createEmpty(luceneVersion, null); + } + /** * Marks an existing lucene index with a new history uuid. * This is used to make sure no existing shard will recovery from this index using ops based recovery. 
diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java index 1090a994bf6ad..42bda11d75783 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java @@ -57,7 +57,7 @@ * * @opensearch.internal */ -final class TranslogHeader { +public final class TranslogHeader { public static final String TRANSLOG_CODEC = "translog"; public static final int VERSION_CHECKSUMS = 1; // pre-2.0 - unsupported @@ -137,9 +137,26 @@ static int readHeaderVersion(final Path path, final FileChannel channel, final S } /** - * Read a translog header from the given path and file channel + * Read a translog header from the given path and file channel and compare the given UUID */ static TranslogHeader read(final String translogUUID, final Path path, final FileChannel channel) throws IOException { + TranslogHeader translogHeader = read(path, channel); + // verify UUID only after checksum, to ensure that UUID is not corrupted + final BytesRef expectedUUID = new BytesRef(translogUUID); + final BytesRef actualUUID = new BytesRef(translogHeader.translogUUID); + if (actualUUID.bytesEquals(expectedUUID) == false) { + throw new TranslogCorruptedException( + path.toString(), + "expected shard UUID " + expectedUUID + " but got: " + actualUUID + " this translog file belongs to a different translog" + ); + } + return translogHeader; + } + + /** + * Read a translog header from the given path and file channel without verifying the translog UUID + */ + public static TranslogHeader read(final Path path, final FileChannel channel) throws IOException { try { // This input is intentionally not closed because closing it will close the FileChannel.
final BufferedChecksumStreamInput in = new BufferedChecksumStreamInput( @@ -179,16 +196,7 @@ static TranslogHeader read(final String translogUUID, final Path path, final Fil + channel.position() + "]"; - // verify UUID only after checksum, to ensure that UUID is not corrupted - final BytesRef expectedUUID = new BytesRef(translogUUID); - if (uuid.bytesEquals(expectedUUID) == false) { - throw new TranslogCorruptedException( - path.toString(), - "expected shard UUID " + expectedUUID + " but got: " + uuid + " this translog file belongs to a different translog" - ); - } - - return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes); + return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes); } catch (EOFException e) { throw new TranslogCorruptedException(path.toString(), "translog header truncated", e); } diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 957a3bdb08501..8395b3e8ac08e 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -84,6 +84,7 @@ import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.test.DummyShardLock; @@ -1166,6 +1167,42 @@ public void testGetMetadataWithSegmentInfos() throws IOException { store.close(); } + public void testCreateEmptyStore() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + Store store = new Store(shardId, INDEX_SETTINGS, new NIOFSDirectory(createTempDir()), new DummyShardLock(shardId)); + store.createEmpty(Version.LATEST); + SegmentInfos segmentInfos = 
Lucene.readSegmentInfos(store.directory()); + assertFalse(segmentInfos.getUserData().containsKey(Translog.TRANSLOG_UUID_KEY)); + testDefaultUserData(segmentInfos); + store.close(); + } + + public void testCreateEmptyStoreWithTranlogUUID() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + Store store = new Store(shardId, INDEX_SETTINGS, new NIOFSDirectory(createTempDir()), new DummyShardLock(shardId)); + store.createEmpty(Version.LATEST, "dummy-translog-UUID"); + SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory()); + assertEquals("dummy-translog-UUID", segmentInfos.getUserData().get(Translog.TRANSLOG_UUID_KEY)); + testDefaultUserData(segmentInfos); + store.close(); + } + + public void testCreateEmptyWithNullTranlogUUID() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + Store store = new Store(shardId, INDEX_SETTINGS, new NIOFSDirectory(createTempDir()), new DummyShardLock(shardId)); + store.createEmpty(Version.LATEST, null); + SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory()); + assertFalse(segmentInfos.getUserData().containsKey(Translog.TRANSLOG_UUID_KEY)); + testDefaultUserData(segmentInfos); + store.close(); + } + + private void testDefaultUserData(SegmentInfos segmentInfos) { + assertEquals("-1", segmentInfos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + assertEquals("-1", segmentInfos.getUserData().get(SequenceNumbers.MAX_SEQ_NO)); + assertEquals("-1", segmentInfos.getUserData().get(Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)); + } + public void testGetSegmentMetadataMap() throws IOException { final ShardId shardId = new ShardId("index", "_na_", 1); Store store = new Store( diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogHeaderTests.java b/server/src/test/java/org/opensearch/index/translog/TranslogHeaderTests.java index 4441e30ea639d..a5d6ee7a06e23 100644 --- 
a/server/src/test/java/org/opensearch/index/translog/TranslogHeaderTests.java +++ b/server/src/test/java/org/opensearch/index/translog/TranslogHeaderTests.java @@ -132,6 +132,49 @@ public void testHeaderWithoutPrimaryTerm() throws Exception { }); } + public void testCurrentHeaderVersionWithoutUUIDComparison() throws Exception { + final String translogUUID = UUIDs.randomBase64UUID(); + final TranslogHeader outHeader = new TranslogHeader(translogUUID, randomNonNegativeLong()); + final long generation = randomNonNegativeLong(); + final Path translogFile = createTempDir().resolve(Translog.getFilename(generation)); + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) { + outHeader.write(channel, true); + assertThat(outHeader.sizeInBytes(), equalTo((int) channel.position())); + } + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) { + final TranslogHeader inHeader = TranslogHeader.read(translogFile, channel); + assertThat(inHeader.getTranslogUUID(), equalTo(translogUUID)); + assertThat(inHeader.getPrimaryTerm(), equalTo(outHeader.getPrimaryTerm())); + assertThat(inHeader.sizeInBytes(), equalTo((int) channel.position())); + } + + TestTranslog.corruptFile(logger, random(), translogFile, false); + final TranslogCorruptedException corruption = expectThrows(TranslogCorruptedException.class, () -> { + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) { + final TranslogHeader translogHeader = TranslogHeader.read(translogFile, channel); + assertThat( + "version " + TranslogHeader.VERSION_CHECKPOINTS + " translog", + translogHeader.getPrimaryTerm(), + equalTo(SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + ); + throw new TranslogCorruptedException(translogFile.toString(), "adjusted translog version"); + } catch (IllegalStateException e) { + // corruption corrupted the version byte making this look like a v2, v1 or v0 translog + assertThat( + "version " + 
TranslogHeader.VERSION_CHECKPOINTS + "-or-earlier translog", + e.getMessage(), + anyOf( + containsString("pre-2.0 translog found"), + containsString("pre-1.4 translog found"), + containsString("pre-6.3 translog found") + ) + ); + throw new TranslogCorruptedException(translogFile.toString(), "adjusted translog version", e); + } + }); + assertThat(corruption.getMessage(), not(containsString("this translog file belongs to a different translog"))); + } + static void writeHeaderWithoutTerm(FileChannel channel, String translogUUID) throws IOException { final OutputStreamStreamOutput out = new OutputStreamStreamOutput(Channels.newOutputStream(channel)); CodecUtil.writeHeader(new OutputStreamDataOutput(out), TranslogHeader.TRANSLOG_CODEC, TranslogHeader.VERSION_CHECKPOINTS);