Skip to content

Commit

Permalink
[Remote Store] Add file details to recoveryState while downloading se…
Browse files Browse the repository at this point in the history
…gments from remote store (#6825)

* Use existing StatsDirectoryWrapper to record recovery stats

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale committed Apr 4, 2023
1 parent 1856090 commit e12a5b9
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 25 deletions.
15 changes: 14 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4402,12 +4402,25 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
((RemoteSegmentStoreDirectory) remoteDirectory).init();
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory)
.getSegmentsUploadedToRemoteStore();
final Directory storeDirectory = store.directory();
store.incRef();
remoteStore.incRef();
List<String> downloadedSegments = new ArrayList<>();
List<String> skippedSegments = new ArrayList<>();
try {
final Directory storeDirectory;
if (recoveryState.getStage() == RecoveryState.Stage.INDEX) {
storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex());
for (String file : uploadedSegments.keySet()) {
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false);
} else {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true);
}
}
} else {
storeDirectory = store.directory();
}
String segmentInfosSnapshotFilename = null;
Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
for (String file : uploadedSegments.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ public void copyFrom(Directory from, String src, String dest, IOContext context)
in.copyFrom(new FilterDirectory(from) {
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
index.addFileDetail(dest, l, false);
if (index.getFileDetails(dest) == null) {
index.addFileDetail(dest, l, false);
}
copies.set(true);
final IndexInput input = in.openInput(name, context);
return new IndexInput("StatsDirectoryWrapper(" + input.toString() + ")") {
Expand Down Expand Up @@ -311,7 +313,7 @@ public void readBytes(byte[] b, int offset, int len) throws IOException {
};
}
}, src, dest, context);
if (copies.get() == false) {
if (copies.get() == false && index.getFileDetails(dest) == null) {
index.addFileDetail(dest, l, true); // hardlinked - we treat it as reused since the file was already somewhat there
} else {
assert index.getFileDetails(dest) != null : "File [" + dest + "] has no file details";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,25 +153,31 @@ public static class UploadedSegmentMetadata {
private final String originalFilename;
private final String uploadedFilename;
private final String checksum;
private final long length;

UploadedSegmentMetadata(String originalFilename, String uploadedFilename, String checksum) {
UploadedSegmentMetadata(String originalFilename, String uploadedFilename, String checksum, long length) {
this.originalFilename = originalFilename;
this.uploadedFilename = uploadedFilename;
this.checksum = checksum;
this.length = length;
}

@Override
public String toString() {
return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum);
return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum, String.valueOf(length));
}

public String getChecksum() {
return this.checksum;
}

public long getLength() {
return this.length;
}

public static UploadedSegmentMetadata fromString(String uploadedFilename) {
String[] values = uploadedFilename.split(SEPARATOR);
return new UploadedSegmentMetadata(values[0], values[1], values[2]);
return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3]));
}
}

Expand Down Expand Up @@ -273,6 +279,9 @@ public void deleteFile(String name) throws IOException {
*/
@Override
public long fileLength(String name) throws IOException {
if (segmentsUploadedToRemoteStore.containsKey(name)) {
return segmentsUploadedToRemoteStore.get(name).getLength();
}
String remoteFilename = getExistingRemoteFilename(name);
if (remoteFilename != null) {
return remoteDataDirectory.fileLength(remoteFilename);
Expand Down Expand Up @@ -317,7 +326,7 @@ public void copyFrom(Directory from, String src, String dest, IOContext context,
}
remoteDataDirectory.copyFrom(from, src, remoteFilename, context);
String checksum = getChecksumOfLocalFile(from, src);
UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum);
UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src));
segmentsUploadedToRemoteStore.put(src, segmentMetadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,17 @@ public void testUploadedSegmentMetadataToString() {
RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = new RemoteSegmentStoreDirectory.UploadedSegmentMetadata(
"abc",
"pqr",
"123456"
"123456",
1234
);
assertEquals("abc::pqr::123456", metadata.toString());
assertEquals("abc::pqr::123456::1234", metadata.toString());
}

public void testUploadedSegmentMetadataFromString() {
RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = RemoteSegmentStoreDirectory.UploadedSegmentMetadata.fromString(
"_0.cfe::_0.cfe__uuidxyz::4567"
"_0.cfe::_0.cfe__uuidxyz::4567::372000"
);
assertEquals("_0.cfe::_0.cfe__uuidxyz::4567", metadata.toString());
assertEquals("_0.cfe::_0.cfe__uuidxyz::4567::372000", metadata.toString());
}

public void testGetMetadataFilename() {
Expand Down Expand Up @@ -141,9 +142,42 @@ public void testInitNoMetadataFile() throws IOException {
private Map<String, String> getDummyMetadata(String prefix, int commitGeneration) {
Map<String, String> metadata = new HashMap<>();

metadata.put(prefix + ".cfe", prefix + ".cfe::" + prefix + ".cfe__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000));
metadata.put(prefix + ".cfs", prefix + ".cfs::" + prefix + ".cfs__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000));
metadata.put(prefix + ".si", prefix + ".si::" + prefix + ".si__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000));
metadata.put(
prefix + ".cfe",
prefix
+ ".cfe::"
+ prefix
+ ".cfe__"
+ UUIDs.base64UUID()
+ "::"
+ randomIntBetween(1000, 5000)
+ "::"
+ randomIntBetween(512000, 1024000)
);
metadata.put(
prefix + ".cfs",
prefix
+ ".cfs::"
+ prefix
+ ".cfs__"
+ UUIDs.base64UUID()
+ "::"
+ randomIntBetween(1000, 5000)
+ "::"
+ randomIntBetween(512000, 1024000)
);
metadata.put(
prefix + ".si",
prefix
+ ".si::"
+ prefix
+ ".si__"
+ UUIDs.base64UUID()
+ "::"
+ randomIntBetween(1000, 5000)
+ "::"
+ randomIntBetween(512000, 1024000)
);
metadata.put(
"segments_" + commitGeneration,
"segments_"
Expand All @@ -154,6 +188,8 @@ private Map<String, String> getDummyMetadata(String prefix, int commitGeneration
+ UUIDs.base64UUID()
+ "::"
+ randomIntBetween(1000, 5000)
+ "::"
+ randomIntBetween(1024, 5120)
);
return metadata;
}
Expand Down Expand Up @@ -250,7 +286,7 @@ public void testDeleteFileException() throws IOException {
assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.deleteFile("_0.si"));
}

public void testFileLenght() throws IOException {
public void testFileLength() throws IOException {
populateMetadata();
remoteSegmentStoreDirectory.init();

Expand All @@ -259,9 +295,7 @@ public void testFileLenght() throws IOException {

assertTrue(uploadedSegments.containsKey("_0.si"));

when(remoteDataDirectory.fileLength(startsWith("_0.si"))).thenReturn(1234L);

assertEquals(1234L, remoteSegmentStoreDirectory.fileLength("_0.si"));
assertEquals(uploadedSegments.get("_0.si").getLength(), remoteSegmentStoreDirectory.fileLength("_0.si"));
}

public void testFileLenghtNoSuchFile() throws IOException {
Expand Down Expand Up @@ -376,8 +410,8 @@ public void testContainsFile() throws IOException {
);

Map<String, String> metadata = new HashMap<>();
metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234");
metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345");
metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512");
metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024");

when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(createMetadataFileBytes(metadata));

Expand All @@ -390,7 +424,7 @@ public void testContainsFile() throws IOException {
UnsupportedOperationException.class,
() -> uploadedSegmentMetadataMap.put(
"_100.si",
new RemoteSegmentStoreDirectory.UploadedSegmentMetadata("_100.si", "_100.si__uuid1", "1234")
new RemoteSegmentStoreDirectory.UploadedSegmentMetadata("_100.si", "_100.si__uuid1", "1234", 500)
)
);

Expand Down Expand Up @@ -531,8 +565,8 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException {
);

Map<String, String> metadata = new HashMap<>();
metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234");
metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345");
metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512");
metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024");

BytesStreamOutput output = new BytesStreamOutput();
IndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,27 @@ private Map<String, String> getDummyData() {
String prefix = "_0";
expectedOutput.put(
prefix + ".cfe",
prefix + ".cfe::" + prefix + ".cfe__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000)
prefix
+ ".cfe::"
+ prefix
+ ".cfe__"
+ UUIDs.base64UUID()
+ "::"
+ randomIntBetween(1000, 5000)
+ "::"
+ randomIntBetween(1024, 2048)
);
expectedOutput.put(
prefix + ".cfs",
prefix + ".cfs::" + prefix + ".cfs__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000)
prefix
+ ".cfs::"
+ prefix
+ ".cfs__"
+ UUIDs.base64UUID()
+ "::"
+ randomIntBetween(1000, 5000)
+ "::"
+ randomIntBetween(1024, 2048)
);
return expectedOutput;
}
Expand Down

0 comments on commit e12a5b9

Please sign in to comment.