Skip to content

Commit

Permalink
[Snapshot Interop] Add logic in Lock Manager to clean up stale data post index deletion.
Browse files Browse the repository at this point in the history

Signed-off-by: Harish Bhakuni <hbhakuni@amazon.com>
  • Loading branch information
Harish Bhakuni committed Aug 1, 2023
1 parent b2a7348 commit 2c7b50b
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_2_NAME));
}

public int getFileCount(Path path) throws Exception {
public static int getFileCount(Path path) throws Exception {
final AtomicInteger filesExisting = new AtomicInteger(0);
Files.walkFileTree(path, new SimpleFileVisitor<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,27 @@

import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -291,6 +297,71 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0);
}

/**
 * Verifies that once a remote-store-backed index is deleted, deleting the shallow-copy
 * snapshots that still hold locks on it eventually cleans up all of the index's data
 * from the remote store repository.
 */
public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
    disableRepoConsistencyCheck("Remote store repository is being used in the test");
    FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);

    internalCluster().startClusterManagerOnlyNode();
    internalCluster().startDataOnlyNode();
    final Client clusterManagerClient = internalCluster().clusterManagerClient();
    ensureStableCluster(2);

    final String snapshotRepoName = "snapshot-repo-name";
    final Path snapshotRepoPath = randomRepoPath();
    createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));
    final String testIndex = "index-test";
    createIndexWithContent(testIndex);

    final Path remoteStoreRepoPath = randomRepoPath();
    final String remoteStoreRepoName = "remote-store-repo-name";
    createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

    final String remoteStoreEnabledIndexName = "remote-index-1";
    final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
    createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
    indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

    // capture the index UUID before deletion; it names the index's directory in the repo
    String indexUUID = client().admin()
        .indices()
        .prepareGetSettings(remoteStoreEnabledIndexName)
        .get()
        .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

    logger.info("--> create two remote index shallow snapshots");
    List<String> shallowCopySnapshots = createNSnapshots(snapshotRepoName, 2);

    String[] lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName);
    assert (lockFiles.length == 2) : "lock files are " + Arrays.toString(lockFiles);

    // delete remote store index
    assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

    logger.info("--> delete snapshot 1");
    AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
        .cluster()
        .prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshots.get(0))
        .get();
    assertAcked(deleteSnapshotResponse);

    lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName, indexUUID);
    assert (lockFiles.length == 1) : "lock files are " + Arrays.toString(lockFiles);

    logger.info("--> delete snapshot 2");
    deleteSnapshotResponse = clusterManagerClient.admin()
        .cluster()
        .prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshots.get(1))
        .get();
    assertAcked(deleteSnapshotResponse);

    Path indexPath = remoteStoreRepoPath.resolve(indexUUID);
    // Cleanup of the remote store contents is async; wait for the index path to empty out.
    assertBusy(() -> {
        try {
            assertThat(RemoteStoreBaseIntegTestCase.getFileCount(indexPath), comparesEqualTo(0));
        } catch (NoSuchFileException e) {
            // the index directory itself was removed — cleanup is complete
        } catch (Exception e) {
            // Do not swallow unexpected failures: rethrow as AssertionError so that
            // assertBusy retries instead of letting the assertion pass vacuously.
            throw new AssertionError("failed to count files under " + indexPath, e);
        }
    }, 30, TimeUnit.SECONDS);
}

private List<String> createNSnapshots(String repoName, int count) {
final List<String> snapshotNames = new ArrayList<>(count);
final String prefix = "snap-" + UUIDs.randomBase64UUID(random()).toLowerCase(Locale.ROOT) + "-";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,29 +836,42 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
}
}

public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
deleteStaleSegmentsAsync(lastNMetadataFilesToKeep, null);
}

/**
* Delete stale segment and metadata files asynchronously.
* This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int)} in an async manner.
* @param lastNMetadataFilesToKeep number of metadata files to keep
*/
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener<Void> listener) {
if (canDeleteStaleCommits.compareAndSet(true, false)) {
try {
threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
deleteStaleSegments(lastNMetadataFilesToKeep);
if (listener != null) {
listener.onResponse(null);
}
} catch (Exception e) {
logger.info(
logger.error(
"Exception while deleting stale commits from remote segment store, will retry delete post next commit",
e
);
if (listener != null) {
listener.onFailure(e);
}
} finally {
canDeleteStaleCommits.set(true);
}
});
} catch (Exception e) {
logger.info("Exception occurred while scheduling deleteStaleCommits", e);
logger.error("Exception occurred while scheduling deleteStaleCommits", e);
canDeleteStaleCommits.set(true);
if (listener != null) {
listener.onFailure(e);
}
}
}
}
Expand Down Expand Up @@ -889,8 +902,25 @@ private boolean deleteIfEmpty() throws IOException {
return true;
}

/**
 * Cleans up the remote segment store data and metadata for a shard.
 * As the method name suggests, this must be called only once the corresponding index
 * has been deleted.
 *
 * @param threadPool thread pool used by the directory to schedule async deletion
 * @param remoteDataDirectory remote directory holding the shard's segment data files
 * @param remoteMetadataDirectory remote directory holding the shard's segment metadata files
 * @param mdLockManager lock manager guarding the shard's metadata lock files
 * @throws IOException if closing the directory (and thus triggering cleanup) fails
 */
public static void cleanupPostIndexDeletion(
    ThreadPool threadPool,
    RemoteDirectory remoteDataDirectory,
    RemoteDirectory remoteMetadataDirectory,
    RemoteStoreLockManager mdLockManager
) throws IOException {
    // Constructing the directory and immediately closing it is intentional:
    // close() schedules stale-segment deletion and then removes empty directories.
    RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(
        remoteDataDirectory,
        remoteMetadataDirectory,
        mdLockManager,
        threadPool
    );
    remoteSegmentStoreDirectory.close();
}

/**
 * Closes the directory: asynchronously deletes all remaining stale segment and metadata
 * files (keeping none), then deletes the shard-level directories if they end up empty.
 *
 * @throws IOException declared for the {@code close()} contract; the cleanup itself runs
 *         asynchronously and reports failures through the listener's error log
 */
public void close() throws IOException {
    deleteStaleSegmentsAsync(
        0,
        // log the cause — dropping the exception here would make cleanup failures undiagnosable
        ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory", e))
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -59,7 +59,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh

RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data");
RemoteDirectory metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata");
RemoteStoreMetadataLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager(
RemoteStoreLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager(
repositoriesService.get(),
repositoryName,
indexUUID,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.lockmanager;

import org.opensearch.index.store.RemoteBufferedOutputDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;

/**
 * A {@link RemoteStoreMetadataLockManager} that also holds references to the remote segment
 * store's data and metadata directories, so that releasing a lock can additionally trigger
 * cleanup of the remote store contents once the owning index has been deleted.
 * see https://github.com/opensearch-project/OpenSearch/issues/8469
 */
public class LockManagerWithRemoteStoreAccess extends RemoteStoreMetadataLockManager {
    private final RemoteBufferedOutputDirectory dataDirectory;
    private final RemoteBufferedOutputDirectory mdDirectory;
    private final ThreadPool threadPool;

    /**
     * @param dataDirectory remote directory holding segment data files
     * @param mdDirectory remote directory holding segment metadata files
     * @param lockDirectory remote directory holding lock files, passed to the parent lock manager
     * @param threadPool thread pool used to schedule async cleanup
     */
    public LockManagerWithRemoteStoreAccess(
        RemoteBufferedOutputDirectory dataDirectory,
        RemoteBufferedOutputDirectory mdDirectory,
        RemoteBufferedOutputDirectory lockDirectory,
        ThreadPool threadPool
    ) {
        super(lockDirectory);
        this.mdDirectory = mdDirectory;
        this.dataDirectory = dataDirectory;
        this.threadPool = threadPool;
    }

    /**
     * Releases the lock and cleans up the remote store directory if there are no more locks.
     * This should be called only once the index is deleted.
     * see https://github.com/opensearch-project/OpenSearch/issues/8469
     *
     * @param lockInfo lock info instance for which the lock needs to be removed.
     * @throws IOException if cleanup or releasing of the lock fails.
     */
    @Override
    public void releaseAndCleanup(LockInfo lockInfo) throws IOException {
        release(lockInfo);
        RemoteSegmentStoreDirectory.cleanupPostIndexDeletion(threadPool, dataDirectory, mdDirectory, this);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ public interface RemoteStoreLockManager {
*/
void release(LockInfo lockInfo) throws IOException;

/**
 * Releases the lock and cleans up the remote store directory if there are no more md files.
 * This should only be called from the cluster manager node, after the remote store index is
 * deleted. see https://github.com/opensearch-project/OpenSearch/issues/8469
 *
 * @param lockInfo lock info instance for which the lock needs to be removed.
 * @throws IOException in case there is a problem releasing the lock.
 * @throws UnsupportedOperationException if the implementation does not support post-deletion cleanup
 */
default void releaseAndCleanup(LockInfo lockInfo) throws IOException {
    // name the offending implementation so the failure is diagnosable
    throw new UnsupportedOperationException(getClass().getName() + " does not support releaseAndCleanup");
}

/**
*
* @param lockInfo lock info instance for which we need to check if lock is acquired.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.function.Supplier;
Expand All @@ -27,17 +28,24 @@
public class RemoteStoreLockManagerFactory {
private static final String SEGMENTS = "segments";
private static final String LOCK_FILES = "lock_files";
private static final String METADATA = "metadata";
private static final String DATA = "data";
private final Supplier<RepositoriesService> repositoriesService;

public RemoteStoreLockManagerFactory(Supplier<RepositoriesService> repositoriesService) {
this.repositoriesService = repositoriesService;
}

public RemoteStoreMetadataLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException {
public RemoteStoreLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException {
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId);
}

public static RemoteStoreMetadataLockManager newLockManager(
public RemoteStoreLockManager newLockManager(String repositoryName, String indexUUID, String shardId, ThreadPool threadPool)
throws IOException {
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId, threadPool);
}

public static RemoteStoreLockManager newLockManager(
RepositoriesService repositoriesService,
String repositoryName,
String indexUUID,
Expand All @@ -46,13 +54,27 @@ public static RemoteStoreMetadataLockManager newLockManager(
try (Repository repository = repositoriesService.repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath shardLevelBlobPath = ((BlobStoreRepository) repository).basePath().add(indexUUID).add(shardId).add(SEGMENTS);
RemoteBufferedOutputDirectory shardMDLockDirectory = createRemoteBufferedOutputDirectory(
repository,
shardLevelBlobPath,
LOCK_FILES
);
RemoteBufferedOutputDirectory lockDirectory = createRemoteBufferedOutputDirectory(repository, shardLevelBlobPath, LOCK_FILES);
return new RemoteStoreMetadataLockManager(lockDirectory);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException("Repository should be present to acquire/release lock", e);
}
}

return new RemoteStoreMetadataLockManager(shardMDLockDirectory);
public static RemoteStoreLockManager newLockManager(
RepositoriesService repositoriesService,
String repositoryName,
String indexUUID,
String shardId,
ThreadPool threadPool
) {
try (Repository repository = repositoriesService.repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath shardLevelBlobPath = ((BlobStoreRepository) repository).basePath().add(indexUUID).add(shardId).add(SEGMENTS);
RemoteBufferedOutputDirectory lockDirectory = createRemoteBufferedOutputDirectory(repository, shardLevelBlobPath, LOCK_FILES);
RemoteBufferedOutputDirectory metadataDirectory = createRemoteBufferedOutputDirectory(repository, shardLevelBlobPath, METADATA);
RemoteBufferedOutputDirectory dataDirectory = createRemoteBufferedOutputDirectory(repository, shardLevelBlobPath, DATA);
return new LockManagerWithRemoteStoreAccess(dataDirectory, metadataDirectory, lockDirectory, threadPool);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException("Repository should be present to acquire/release lock", e);
}
Expand Down
Loading

0 comments on commit 2c7b50b

Please sign in to comment.