Skip to content

Commit

Permalink
[Remote Store] Make translog transfer timeout configurable using dyna…
Browse files Browse the repository at this point in the history
…mic setting (#12704)

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale committed Apr 3, 2024
1 parent ef2a9e5 commit b7396e1
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Derived fields support to derive field values at query time without indexing ([#12569](https://github.com/opensearch-project/OpenSearch/pull/12569))
- Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044))
- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))
- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,8 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4969,7 +4969,7 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException {
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathStrategy());
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathStrategy(), remoteStoreSettings);
}

/*
Expand All @@ -4992,6 +4992,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
getThreadPool(),
shardPath().resolveTranslog(),
indexSettings.getRemoteStorePathStrategy(),
remoteStoreSettings,
logger
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
Expand All @@ -34,11 +35,14 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory {

private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

private final RemoteStoreSettings remoteStoreSettings;

public RemoteBlobStoreInternalTranslogFactory(
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
String repositoryName,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) {
Repository repository;
try {
Expand All @@ -49,6 +53,7 @@ public RemoteBlobStoreInternalTranslogFactory(
this.repository = repository;
this.threadPool = threadPool;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
Expand All @@ -74,7 +79,8 @@ public Translog newTranslog(
blobStoreRepository,
threadPool,
startedPrimarySupplier,
remoteTranslogTransferTracker
remoteTranslogTransferTracker,
remoteStoreSettings
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -100,7 +101,8 @@ public RemoteFsTranslog(
BlobStoreRepository blobStoreRepository,
ThreadPool threadPool,
BooleanSupplier startedPrimarySupplier,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
logger = Loggers.getLogger(getClass(), shardId);
Expand All @@ -113,7 +115,8 @@ public RemoteFsTranslog(
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
indexSettings().getRemoteStorePathStrategy()
indexSettings().getRemoteStorePathStrategy(),
remoteStoreSettings
);
try {
download(translogTransferManager, location, logger);
Expand Down Expand Up @@ -163,6 +166,7 @@ public static void download(
ThreadPool threadPool,
Path location,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings,
Logger logger
) throws IOException {
assert repository instanceof BlobStoreRepository : String.format(
Expand All @@ -181,7 +185,8 @@ public static void download(
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
pathStrategy
pathStrategy,
remoteStoreSettings
);
RemoteFsTranslog.download(translogTransferManager, location, logger);
logger.trace(remoteTranslogTransferTracker.toString());
Expand Down Expand Up @@ -259,7 +264,8 @@ public static TranslogTransferManager buildTranslogTransferManager(
ShardId shardId,
FileTransferTracker fileTransferTracker,
RemoteTranslogTransferTracker tracker,
RemoteStorePathStrategy pathStrategy
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings
) {
assert Objects.nonNull(pathStrategy);
String indexUUID = shardId.getIndex().getUUID();
Expand All @@ -281,7 +287,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
.build();
BlobPath mdPath = pathStrategy.generatePath(mdPathInput);
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool);
return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker);
return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker, remoteStoreSettings);
}

@Override
Expand Down Expand Up @@ -553,8 +559,13 @@ private void deleteStaleRemotePrimaryTerms() {
}
}

public static void cleanup(Repository repository, ShardId shardId, ThreadPool threadPool, RemoteStorePathStrategy pathStrategy)
throws IOException {
public static void cleanup(
Repository repository,
ShardId shardId,
ThreadPool threadPool,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
// We use a dummy stats tracker to ensure the flow doesn't break.
Expand All @@ -567,7 +578,8 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
pathStrategy
pathStrategy,
remoteStoreSettings
);
// clean up all remote translog files
translogTransferManager.deleteTranslogFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -60,9 +61,7 @@ public class TranslogTransferManager {
private final BlobPath remoteMetadataTransferPath;
private final FileTransferTracker fileTransferTracker;
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000;

private final RemoteStoreSettings remoteStoreSettings;
private static final int METADATA_FILES_TO_FETCH = 10;

private final Logger logger;
Expand All @@ -79,7 +78,8 @@ public TranslogTransferManager(
BlobPath remoteDataTransferPath,
BlobPath remoteMetadataTransferPath,
FileTransferTracker fileTransferTracker,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) {
this.shardId = shardId;
this.transferService = transferService;
Expand All @@ -88,6 +88,7 @@ public TranslogTransferManager(
this.fileTransferTracker = fileTransferTracker;
this.logger = Loggers.getLogger(getClass(), shardId);
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.remoteStoreSettings = remoteStoreSettings;
}

public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker() {
Expand Down Expand Up @@ -151,7 +152,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH);

try {
if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) {
if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
Exception ex = new TranslogUploadFailedException(
"Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"
);
Expand Down
12 changes: 8 additions & 4 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@ protected void closeInternal() {
repositoriesServiceSupplier,
threadPool,
remoteStoreStatsTrackerFactory,
settings
settings,
remoteStoreSettings
);
this.searchRequestStats = searchRequestStats;
this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings());
Expand Down Expand Up @@ -528,22 +529,25 @@ private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTrans
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
Settings settings
Settings settings,
RemoteStoreSettings remoteStoreSettings
) {
return (indexSettings, shardRouting) -> {
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
indexSettings.getRemoteStoreTranslogRepository(),
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId())
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),
remoteStoreSettings
);
} else if (isRemoteDataAttributePresent(settings) && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(indexSettings.getNodeSettings()),
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId())
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),
remoteStoreSettings
);
}
return new InternalTranslogFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,20 @@ public class RemoteStoreSettings {
Property.Dynamic
);

/**
* Controls timeout value while uploading translog and checkpoint files to remote translog
*/
public static final Setting<TimeValue> CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.translog.transfer_timeout",
TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30),
Property.NodeScope,
Property.Dynamic
);

private volatile TimeValue clusterRemoteTranslogBufferInterval;
private volatile int minRemoteSegmentMetadataFiles;
private volatile TimeValue clusterRemoteTranslogTransferTimeout;

public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
Expand All @@ -69,9 +81,14 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
this::setMinRemoteSegmentMetadataFiles
);

this.clusterRemoteTranslogTransferTimeout = CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
this::setClusterRemoteTranslogTransferTimeout
);
}

// Exclusively for testing, please do not use it elsewhere.
public TimeValue getClusterRemoteTranslogBufferInterval() {
return clusterRemoteTranslogBufferInterval;
}
Expand All @@ -87,4 +104,12 @@ private void setMinRemoteSegmentMetadataFiles(int minRemoteSegmentMetadataFiles)
public int getMinRemoteSegmentMetadataFiles() {
return this.minRemoteSegmentMetadataFiles;
}

public TimeValue getClusterRemoteTranslogTransferTimeout() {
return clusterRemoteTranslogTransferTimeout;
}

private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) {
this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ private IndexService newIndexService(IndexModule module) throws IOException {
repositoriesServiceReference::get,
threadPool,
indexSettings.getRemoteStoreTranslogRepository(),
new RemoteTranslogTransferTracker(shardRouting.shardId(), 10)
new RemoteTranslogTransferTracker(shardRouting.shardId(), 10),
DefaultRemoteStoreSettings.INSTANCE
);
}
return new InternalTranslogFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.TranslogUploadFailedException;
import org.opensearch.indices.DefaultRemoteStoreSettings;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand Down Expand Up @@ -188,7 +189,8 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin
repository,
threadPool,
primaryMode::get,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
);
}

Expand Down Expand Up @@ -459,7 +461,8 @@ public void testExtraGenToKeep() throws Exception {
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down Expand Up @@ -1508,7 +1511,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down Expand Up @@ -1616,7 +1620,8 @@ public void force(boolean metaData) throws IOException {
repository,
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10)
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down
Loading

0 comments on commit b7396e1

Please sign in to comment.