Skip to content

Commit

Permalink
Address PR Comments
Browse files Browse the repository at this point in the history
Signed-off-by: Harish Bhakuni <hbhakuni@amazon.com>
  • Loading branch information
Harish Bhakuni committed Jun 23, 2023
1 parent d0f2aa5 commit 941208a
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ public Settings indexSettings() {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.build();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ public void testParallelRestoreOperationsShallowCopyEnabled() throws IOException
ensureYellowAndNoInitializingShards(restoredIndexName1);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1);
// indexing some new docs and validating
indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);

// restore index as seg rep enabled with remote store and remote translog disabled
RestoreSnapshotResponse restoreSnapshotResponse3 = client.admin()
Expand All @@ -288,6 +292,10 @@ public void testParallelRestoreOperationsShallowCopyEnabled() throws IOException
assertNull(indexSettings.get(SETTING_REMOTE_STORE_REPOSITORY, null));
assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
assertDocsPresentInIndex(client, restoredIndexName1Seg, numDocsInIndex1);
// indexing some new docs and validating
indexDocuments(client, restoredIndexName1Seg, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1Seg);
assertDocsPresentInIndex(client, restoredIndexName1Seg, numDocsInIndex1 + 2);

// restore index as doc rep based from shallow copy snapshot
RestoreSnapshotResponse restoreSnapshotResponse4 = client.admin()
Expand Down Expand Up @@ -315,6 +323,10 @@ public void testParallelRestoreOperationsShallowCopyEnabled() throws IOException
assertNull(indexSettings.get(SETTING_REMOTE_STORE_REPOSITORY, null));
assertNull(indexSettings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
assertDocsPresentInIndex(client, restoredIndexName1Doc, numDocsInIndex1);
// indexing some new docs and validating
indexDocuments(client, restoredIndexName1Doc, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1Doc);
assertDocsPresentInIndex(client, restoredIndexName1Doc, numDocsInIndex1 + 2);
}

public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException {
Expand Down Expand Up @@ -376,6 +388,7 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException
.get();
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1);

// deleting data for restoredIndexName1 and restoring from remote store.
stopNodeWithPrimaryShard(restoredIndexName1);
Expand All @@ -385,7 +398,11 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(restoredIndexName1), PlainActionFuture.newFuture());
ensureYellowAndNoInitializingShards(restoredIndexName1);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1);
// indexing some new docs and validating
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1);
indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

private Settings.Builder getIndexSettings(
Expand Down Expand Up @@ -500,6 +517,11 @@ public void testRestoreShallowSnapshotRepositoryOverriden() throws ExecutionExce
assertTrue(restoreSnapshotResponse2.getRestoreInfo().failedShards() == 0);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1);

// indexing some new docs and validating
indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

private void stopNodeWithPrimaryShard(String indexName) throws IOException {
Expand All @@ -524,7 +546,11 @@ private void stopNodeWithPrimaryShard(String indexName) throws IOException {
}

private void indexDocuments(Client client, String indexName, int numOfDocs) {
for (int i = 0; i < numOfDocs; i++) {
indexDocuments(client, indexName, 0, numOfDocs);
}

private void indexDocuments(Client client, String indexName, int fromId, int toId) {
for (int i = fromId; i < toId; i++) {
String id = Integer.toString(i);
client.prepareIndex(indexName).setId(id).setSource("text", "sometext").get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
storageType = in.readEnum(StorageType.class);
}
if (in.getVersion().onOrAfter(Version.V_2_9_0)) {
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
sourceRemoteStoreRepository = in.readOptionalString();
}
}
Expand All @@ -174,7 +174,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeEnum(storageType);
}
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalString(sourceRemoteStoreRepository);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public SnapshotRecoverySource(
} else {
isSearchableSnapshot = false;
}
if (in.getVersion().onOrAfter(Version.V_2_9_0)) {
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
remoteStoreIndexShallowCopy = in.readBoolean();
sourceRemoteStoreRepository = in.readOptionalString();
} else {
Expand Down Expand Up @@ -345,7 +345,7 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeBoolean(isSearchableSnapshot);
}
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(remoteStoreIndexShallowCopy);
out.writeOptionalString(sourceRemoteStoreRepository);
}
Expand Down
47 changes: 16 additions & 31 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1496,7 +1496,7 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommitAndRefresh(boolean flus
* @throws IOException if there is some failure in acquiring lock in remote store.
*/
public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard();
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteDirectory();
remoteSegmentStoreDirectory.acquireLock(primaryTerm, generation, snapshotId);
}

Expand All @@ -1508,20 +1508,10 @@ public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long ge
* @throws IOException if there is some failure in releasing lock in remote store.
*/
public void releaseLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard();
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteDirectory();
remoteSegmentStoreDirectory.releaseLock(primaryTerm, generation, snapshotId);
}

private RemoteSegmentStoreDirectory getRemoteSegmentDirectoryForShard() {
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
: "Store.directory is not enclosing an instance of FilterDirectory";
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory";
return ((RemoteSegmentStoreDirectory) remoteDirectory);
}

public Optional<NRTReplicationEngine> getReplicationEngine() {
if (getEngine() instanceof NRTReplicationEngine) {
return Optional.of((NRTReplicationEngine) getEngine());
Expand Down Expand Up @@ -2295,7 +2285,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(true, true, true);
syncSegmentsFromRemoteSegmentStore(false, true, true);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
Expand Down Expand Up @@ -3508,10 +3498,6 @@ private void executeRecovery(
threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onDone(recoveryState);
if (remoteStore != null && recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
// do something

}
}
}, e -> recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
}
Expand Down Expand Up @@ -4640,45 +4626,45 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
} finally {
store.decRef();
remoteStore.incRef();
remoteStore.decRef();
}
}

/**
* Downloads segments from given remote segment store for a specific commit.
* @param overrideLocal flag to override local segment files with those in remote store
* @param sourceRemoteSegmentDirectory RemoteSegmentDirectory Instance from which we need to sync segments
* @param sourceRemoteDirectory RemoteSegmentDirectory Instance from which we need to sync segments
* @param primaryTerm Primary Term for shard at the time of commit operation for which we are syncing segments
* @param commitGeneration commit generation at the time of commit operation for which we are syncing segments
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromGivenRemoteSegmentStore(
boolean overrideLocal,
RemoteSegmentStoreDirectory sourceRemoteSegmentDirectory,
RemoteSegmentStoreDirectory sourceRemoteDirectory,
long primaryTerm,
long commitGeneration
) throws IOException {
logger.info("Downloading segments from given remote segment store");
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = null;
RemoteSegmentStoreDirectory remoteDirectory = null;
if (remoteStore != null) {
remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard();
remoteSegmentStoreDirectory.init();
remoteDirectory = getRemoteDirectory();
remoteDirectory.init();
remoteStore.incRef();
}
sourceRemoteSegmentDirectory.init();
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = sourceRemoteSegmentDirectory
sourceRemoteDirectory.init();
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = sourceRemoteDirectory
.getSegmentsUploadedToRemoteStore(primaryTerm, commitGeneration);
final Directory storeDirectory = store.directory();
store.incRef();

try {
copySegmentFiles(storeDirectory, sourceRemoteSegmentDirectory, remoteSegmentStoreDirectory, uploadedSegments, overrideLocal);
copySegmentFiles(storeDirectory, sourceRemoteDirectory, remoteDirectory, uploadedSegments, overrideLocal);
} catch (IOException e) {
throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
} finally {
store.decRef();
if (remoteStore != null) {
remoteStore.incRef();
remoteStore.decRef();
}
}
}
Expand All @@ -4702,14 +4688,13 @@ private void copySegmentFiles(
storeDirectory.deleteFile(file);
}
storeDirectory.copyFrom(sourceRemoteDirectory, file, file, IOContext.DEFAULT);
storeDirectory.sync(Collections.singleton(file));
if (targetRemoteDirectory != null) {
targetRemoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
}
downloadedSegments.add(file);
} else {
skippedSegments.add(file);
}
if (targetRemoteDirectory != null) {
targetRemoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
}
}
} finally {
logger.info("Downloaded segments here: {}", downloadedSegments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ void recoverFromSnapshotAndRemoteStore(
String.valueOf(shardId.id())
);
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, primaryTerm, commitGeneration);
// indexShard.resetEngineToGlobalCheckpoint();
bootstrap(indexShard, indexShard.store());
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
writeEmptyRetentionLeasesFile(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,18 +546,16 @@ private void createIndices(final ClusterState state) {
AllocatedIndex<? extends Shard> indexService = null;
try {
List<IndexEventListener> updatedIndexEventListeners = new ArrayList<>(builtInIndexListener);
if (entry.getValue().get(0).recoverySource().getType() == Type.SNAPSHOT) {
if (entry.getValue().size() > 0
&& entry.getValue().get(0).recoverySource().getType() == Type.SNAPSHOT
&& indexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) {
final IndexEventListener refreshListenerAfterSnapshotRestore = new IndexEventListener() {
@Override
public void afterIndexShardStarted(IndexShard indexShard) {
indexShard.refresh("snapshot restore done");
indexShard.refresh("refresh to upload metadata to remote store");
}
};
if (indexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) {

updatedIndexEventListeners.add(refreshListenerAfterSnapshotRestore);
}

updatedIndexEventListeners.add(refreshListenerAfterSnapshotRestore);
}
indexService = indicesService.createIndex(indexMetadata, updatedIndexEventListeners, true);
if (indexService.updateMapping(null, indexMetadata) && sendRefreshMapping) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ public ClusterState execute(ClusterState currentState) {
final boolean isRemoteStoreShallowCopy = Boolean.TRUE.equals(
snapshotInfo.isRemoteStoreIndexShallowCopyEnabled()
) && metadata.index(index).getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false);
if (isRemoteStoreShallowCopy && !currentState.getNodes().getMinNodeVersion().onOrAfter(Version.V_2_9_0)) {
if (isRemoteStoreShallowCopy && !currentState.getNodes().getMinNodeVersion().onOrAfter(Version.V_3_0_0)) {
throw new SnapshotRestoreException(
snapshot,
"cannot restore shallow copy snapshot for index ["
Expand Down

0 comments on commit 941208a

Please sign in to comment.