From e12a5b92c9e1638685c684d8bd5a75964c23a150 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 4 Apr 2023 18:45:54 +0530 Subject: [PATCH] [Remote Store] Add file details to recoveryState while downloading segments from remote store (#6825) * Use existing StatsDirectoryWrapper to record recovery stats Signed-off-by: Sachin Kale --- .../opensearch/index/shard/IndexShard.java | 15 ++++- .../opensearch/index/shard/StoreRecovery.java | 6 +- .../store/RemoteSegmentStoreDirectory.java | 17 +++-- .../RemoteSegmentStoreDirectoryTests.java | 66 ++++++++++++++----- .../RemoteSegmentMetadataHandlerTests.java | 20 +++++- 5 files changed, 99 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 2629b2c954015..3be1816e1c4b8 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4402,12 +4402,25 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re ((RemoteSegmentStoreDirectory) remoteDirectory).init(); Map uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory) .getSegmentsUploadedToRemoteStore(); - final Directory storeDirectory = store.directory(); store.incRef(); remoteStore.incRef(); List downloadedSegments = new ArrayList<>(); List skippedSegments = new ArrayList<>(); try { + final Directory storeDirectory; + if (recoveryState.getStage() == RecoveryState.Stage.INDEX) { + storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex()); + for (String file : uploadedSegments.keySet()) { + long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum()); + if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) { + recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false); + } else { + recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true); + } + } + } else { + storeDirectory = store.directory(); + } String segmentInfosSnapshotFilename = null; Set localSegmentFiles = Sets.newHashSet(storeDirectory.listAll()); for (String file : uploadedSegments.keySet()) { diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 972a76bc17eb5..31a863129cc8c 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -268,7 +268,9 @@ public void copyFrom(Directory from, String src, String dest, IOContext context) in.copyFrom(new FilterDirectory(from) { @Override public IndexInput openInput(String name, IOContext context) throws IOException { - index.addFileDetail(dest, l, false); + if (index.getFileDetails(dest) == null) { + index.addFileDetail(dest, l, false); + } copies.set(true); final IndexInput input = in.openInput(name, context); return new IndexInput("StatsDirectoryWrapper(" + input.toString() + ")") { @@ -311,7 +313,7 @@ public void readBytes(byte[] b, int offset, int len) throws IOException { }; } }, src, dest, context); - if (copies.get() == false) { + if (copies.get() == false && index.getFileDetails(dest) == null) { index.addFileDetail(dest, l, true); // hardlinked - we treat it as reused since the file was already somewhat there } else { assert index.getFileDetails(dest) != null : "File [" + dest + "] has no file details"; diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 095f1c01792e6..c385303813844 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -153,25 +153,31 @@ public static class UploadedSegmentMetadata { private final String originalFilename; private final String uploadedFilename; private final String checksum; + private final long length; - UploadedSegmentMetadata(String originalFilename, String uploadedFilename, String checksum) { + UploadedSegmentMetadata(String originalFilename, String uploadedFilename, String checksum, long length) { this.originalFilename = originalFilename; this.uploadedFilename = uploadedFilename; this.checksum = checksum; + this.length = length; } @Override public String toString() { - return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum); + return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum, String.valueOf(length)); } public String getChecksum() { return this.checksum; } + public long getLength() { + return this.length; + } + public static UploadedSegmentMetadata fromString(String uploadedFilename) { String[] values = uploadedFilename.split(SEPARATOR); - return new UploadedSegmentMetadata(values[0], values[1], values[2]); + return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3])); } } @@ -273,6 +279,9 @@ public void deleteFile(String name) throws IOException { */ @Override public long fileLength(String name) throws IOException { + if (segmentsUploadedToRemoteStore.containsKey(name)) { + return segmentsUploadedToRemoteStore.get(name).getLength(); + } String remoteFilename = getExistingRemoteFilename(name); if (remoteFilename != null) { return remoteDataDirectory.fileLength(remoteFilename); @@ -317,7 +326,7 @@ public void copyFrom(Directory from, String src, String dest, IOContext context, } remoteDataDirectory.copyFrom(from, src, remoteFilename, context); String checksum = getChecksumOfLocalFile(from, src); - UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum); + UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src)); segmentsUploadedToRemoteStore.put(src, segmentMetadata); } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 956279c3ea048..49a2d50dfae06 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -64,16 +64,17 @@ public void testUploadedSegmentMetadataToString() { RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = new RemoteSegmentStoreDirectory.UploadedSegmentMetadata( "abc", "pqr", - "123456" + "123456", + 1234 ); - assertEquals("abc::pqr::123456", metadata.toString()); + assertEquals("abc::pqr::123456::1234", metadata.toString()); } public void testUploadedSegmentMetadataFromString() { RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = RemoteSegmentStoreDirectory.UploadedSegmentMetadata.fromString( - "_0.cfe::_0.cfe__uuidxyz::4567" + "_0.cfe::_0.cfe__uuidxyz::4567::372000" ); - assertEquals("_0.cfe::_0.cfe__uuidxyz::4567", metadata.toString()); + assertEquals("_0.cfe::_0.cfe__uuidxyz::4567::372000", metadata.toString()); } public void testGetMetadataFilename() { @@ -141,9 +142,42 @@ public void testInitNoMetadataFile() throws IOException { private Map getDummyMetadata(String prefix, int commitGeneration) { Map metadata = new HashMap<>(); - metadata.put(prefix + ".cfe", prefix + ".cfe::" + prefix + ".cfe__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000)); - metadata.put(prefix + ".cfs", prefix + ".cfs::" + prefix + ".cfs__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000)); - metadata.put(prefix + ".si", prefix + ".si::" + prefix + ".si__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000)); + metadata.put( + prefix + ".cfe", + prefix + + ".cfe::" + + prefix + + ".cfe__" + + UUIDs.base64UUID() + + "::" + + randomIntBetween(1000, 5000) + + "::" + + randomIntBetween(512000, 1024000) + ); + metadata.put( + prefix + ".cfs", + prefix + + ".cfs::" + + prefix + + ".cfs__" + + UUIDs.base64UUID() + + "::" + + randomIntBetween(1000, 5000) + + "::" + + randomIntBetween(512000, 1024000) + ); + metadata.put( + prefix + ".si", + prefix + + ".si::" + + prefix + + ".si__" + + UUIDs.base64UUID() + + "::" + + randomIntBetween(1000, 5000) + + "::" + + randomIntBetween(512000, 1024000) + ); metadata.put( "segments_" + commitGeneration, "segments_" @@ -154,6 +188,8 @@ private Map getDummyMetadata(String prefix, int commitGeneration + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000) + + "::" + + randomIntBetween(1024, 5120) ); return metadata; } @@ -250,7 +286,7 @@ public void testDeleteFileException() throws IOException { assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.deleteFile("_0.si")); } - public void testFileLenght() throws IOException { + public void testFileLength() throws IOException { populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -259,9 +295,7 @@ public void testFileLenght() throws IOException { assertTrue(uploadedSegments.containsKey("_0.si")); - when(remoteDataDirectory.fileLength(startsWith("_0.si"))).thenReturn(1234L); - - assertEquals(1234L, remoteSegmentStoreDirectory.fileLength("_0.si")); + assertEquals(uploadedSegments.get("_0.si").getLength(), remoteSegmentStoreDirectory.fileLength("_0.si")); } public void testFileLenghtNoSuchFile() throws IOException { @@ -376,8 +410,8 @@ public void testContainsFile() throws IOException { ); Map metadata = new HashMap<>(); - metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); - metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512"); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024"); when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(createMetadataFileBytes(metadata)); @@ -390,7 +424,7 @@ public void testContainsFile() throws IOException { UnsupportedOperationException.class, () -> uploadedSegmentMetadataMap.put( "_100.si", - new RemoteSegmentStoreDirectory.UploadedSegmentMetadata("_100.si", "_100.si__uuid1", "1234") + new RemoteSegmentStoreDirectory.UploadedSegmentMetadata("_100.si", "_100.si__uuid1", "1234", 500) ) ); @@ -531,8 +565,8 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException { ); Map metadata = new HashMap<>(); - metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); - metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512"); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024"); BytesStreamOutput output = new BytesStreamOutput(); IndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); diff --git a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java index 2a30e58b8802c..3a73015c25589 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java @@ -59,11 +59,27 @@ private Map getDummyData() { String prefix = "_0"; expectedOutput.put( prefix + ".cfe", - prefix + ".cfe::" + prefix + ".cfe__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000) + prefix + + ".cfe::" + + prefix + + ".cfe__" + + UUIDs.base64UUID() + + "::" + + randomIntBetween(1000, 5000) + + "::" + + randomIntBetween(1024, 2048) ); expectedOutput.put( prefix + ".cfs", - prefix + ".cfs::" + prefix + ".cfs__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000) + prefix + + ".cfs::" + + prefix + + ".cfs__" + + UUIDs.base64UUID() + + "::" + + randomIntBetween(1000, 5000) + + "::" + + randomIntBetween(1024, 2048) ); return expectedOutput; }