Skip to content

Commit

Permalink
Add pinned timestamp utils and setting to enable/disable the feature (#…
Browse files Browse the repository at this point in the history
…15401) (#15549)

(cherry picked from commit 9014894)

Signed-off-by: Sachin Kale <kalsac@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
3 people committed Aug 31, 2024
1 parent a58d09b commit 4ea5a3b
Show file tree
Hide file tree
Showing 7 changed files with 754 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
package org.opensearch.remotestore;

import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -20,6 +22,14 @@
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.build();
}

ActionListener<Void> noOpActionListener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED,

SystemTemplatesService.SETTING_APPLICATION_BASED_CONFIGURATION_TEMPLATES_ENABLED,

Expand Down
176 changes: 176 additions & 0 deletions server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,27 @@
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -373,4 +380,173 @@ public static boolean isSwitchToStrictCompatibilityMode(ClusterUpdateSettingsReq
incomingSettings
) == RemoteStoreNodeService.CompatibilityMode.STRICT;
}

/**
* Determines and returns a set of metadata files that match provided pinned timestamps.
*
* This method is an overloaded version of getPinnedTimestampLockedFiles and do not use cached entries to find
* the metadata file
*
* @param metadataFiles A list of metadata file names. Expected to be sorted in descending order of timestamp.
* @param pinnedTimestampSet A set of timestamps representing pinned points in time.
* @param getTimestampFunction A function that extracts the timestamp from a metadata file name.
* @param prefixFunction A function that extracts a tuple of prefix information from a metadata file name.
* @return A set of metadata file names that are implicitly locked based on the pinned timestamps.
*/
public static Set<String> getPinnedTimestampLockedFiles(
List<String> metadataFiles,
Set<Long> pinnedTimestampSet,
Function<String, Long> getTimestampFunction,
Function<String, Tuple<String, String>> prefixFunction
) {
return getPinnedTimestampLockedFiles(metadataFiles, pinnedTimestampSet, new HashMap<>(), getTimestampFunction, prefixFunction);
}

/**
* Determines and returns a set of metadata files that match provided pinned timestamps. If pinned timestamp
* feature is not enabled, this function is a no-op.
*
* This method identifies metadata files that are considered implicitly locked due to their timestamps
* matching or being the closest preceding timestamp to the pinned timestamps. It uses a caching mechanism
* to improve performance for previously processed timestamps.
*
* The method performs the following steps:
* 1. Validates input parameters.
* 2. Updates the cache (metadataFilePinnedTimestampMap) to remove outdated entries.
* 3. Processes cached entries and identifies new timestamps to process.
* 4. For new timestamps, iterates through metadata files to find matching or closest preceding files.
* 5. Updates the cache with newly processed timestamps and their corresponding metadata files.
*
* @param metadataFiles A list of metadata file names. Expected to be sorted in descending order of timestamp.
* @param pinnedTimestampSet A set of timestamps representing pinned points in time.
* @param metadataFilePinnedTimestampMap A map used for caching processed timestamps and their corresponding metadata files.
* @param getTimestampFunction A function that extracts the timestamp from a metadata file name.
* @param prefixFunction A function that extracts a tuple of prefix information from a metadata file name.
* @return A set of metadata file names that are implicitly locked based on the pinned timestamps.
*
*/
public static Set<String> getPinnedTimestampLockedFiles(
List<String> metadataFiles,
Set<Long> pinnedTimestampSet,
Map<Long, String> metadataFilePinnedTimestampMap,
Function<String, Long> getTimestampFunction,
Function<String, Tuple<String, String>> prefixFunction
) {
Set<String> implicitLockedFiles = new HashSet<>();

if (RemoteStoreSettings.isPinnedTimestampsEnabled() == false) {
return implicitLockedFiles;
}

if (metadataFiles == null || metadataFiles.isEmpty() || pinnedTimestampSet == null) {
return implicitLockedFiles;
}

// Remove entries for timestamps that are no longer pinned
metadataFilePinnedTimestampMap.keySet().retainAll(pinnedTimestampSet);

// Add cached entries and collect new timestamps
Set<Long> newPinnedTimestamps = new TreeSet<>(Collections.reverseOrder());
for (Long pinnedTimestamp : pinnedTimestampSet) {
String cachedFile = metadataFilePinnedTimestampMap.get(pinnedTimestamp);
if (cachedFile != null) {
implicitLockedFiles.add(cachedFile);
} else {
newPinnedTimestamps.add(pinnedTimestamp);
}
}

if (newPinnedTimestamps.isEmpty()) {
return implicitLockedFiles;
}

// Sort metadata files in descending order of timestamp
// ToDo: Do we really need this? Files fetched from remote store are already lexicographically sorted.
metadataFiles.sort(String::compareTo);

// If we have metadata files from multiple writers, it can result in picking file generated by stale primary.
// To avoid this, we fail fast.
RemoteStoreUtils.verifyNoMultipleWriters(metadataFiles, prefixFunction);

Iterator<Long> timestampIterator = newPinnedTimestamps.iterator();
Long currentPinnedTimestamp = timestampIterator.next();
long prevMdTimestamp = Long.MAX_VALUE;
for (String metadataFileName : metadataFiles) {
long currentMdTimestamp = getTimestampFunction.apply(metadataFileName);
// We always prefer md file with higher values of prefix like primary term, generation etc.
if (currentMdTimestamp > prevMdTimestamp) {
continue;
}
while (currentMdTimestamp <= currentPinnedTimestamp && prevMdTimestamp > currentPinnedTimestamp) {
implicitLockedFiles.add(metadataFileName);
// Do not cache entry for latest metadata file as the next metadata can also match the same pinned timestamp
if (prevMdTimestamp != Long.MAX_VALUE) {
metadataFilePinnedTimestampMap.put(currentPinnedTimestamp, metadataFileName);
}
if (timestampIterator.hasNext() == false) {
return implicitLockedFiles;
}
currentPinnedTimestamp = timestampIterator.next();
}
prevMdTimestamp = currentMdTimestamp;
}

return implicitLockedFiles;
}

/**
* Filters out metadata files based on their age and pinned timestamps settings.
*
* This method filters a list of metadata files, keeping only those that are older
* than a certain threshold determined by the last successful fetch of pinned timestamps
* and a configured lookback interval.
*
* @param metadataFiles A list of metadata file names to be filtered.
* @param getTimestampFunction A function that extracts a timestamp from a metadata file name.
* @param lastSuccessfulFetchOfPinnedTimestamps The timestamp of the last successful fetch of pinned timestamps.
* @return A new list containing only the metadata files that meet the age criteria.
* If pinned timestamps are not enabled, returns a copy of the input list.
*/
public static List<String> filterOutMetadataFilesBasedOnAge(
List<String> metadataFiles,
Function<String, Long> getTimestampFunction,
long lastSuccessfulFetchOfPinnedTimestamps
) {
if (RemoteStoreSettings.isPinnedTimestampsEnabled() == false) {
return new ArrayList<>(metadataFiles);
}
long maximumAllowedTimestamp = lastSuccessfulFetchOfPinnedTimestamps - RemoteStoreSettings.getPinnedTimestampsLookbackInterval()
.getMillis();
List<String> metadataFilesWithMinAge = new ArrayList<>();
for (String metadataFileName : metadataFiles) {
long metadataTimestamp = getTimestampFunction.apply(metadataFileName);
if (metadataTimestamp < maximumAllowedTimestamp) {
metadataFilesWithMinAge.add(metadataFileName);
}
}
return metadataFilesWithMinAge;
}

/**
* Determines if the pinned timestamp state is stale.
*
* This method checks whether the last successful fetch of pinned timestamps
* is considered stale based on the current time and configured intervals.
* The state is considered stale if the last successful fetch occurred before
* a certain threshold, which is calculated as three times the scheduler interval
* plus the lookback interval.
*
* @return true if the pinned timestamp state is stale, false otherwise.
* Always returns false if pinned timestamps are not enabled.
*/
public static boolean isPinnedTimestampStateStale() {
if (RemoteStoreSettings.isPinnedTimestampsEnabled() == false) {
return false;
}
long lastSuccessfulFetchTimestamp = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
long staleBufferInMillis = (RemoteStoreSettings.getPinnedTimestampsSchedulerInterval().millis() * 3) + RemoteStoreSettings
.getPinnedTimestampsLookbackInterval()
.millis();
return lastSuccessfulFetchTimestamp < (System.currentTimeMillis() - staleBufferInMillis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,11 @@ static long getGeneration(String[] filenameTokens) {
return RemoteStoreUtils.invertLong(filenameTokens[2]);
}

public static long getTimestamp(String filename) {
String[] filenameTokens = filename.split(SEPARATOR);
return RemoteStoreUtils.invertLong(filenameTokens[6]);
}

public static Tuple<String, String> getNodeIdByPrimaryTermAndGen(String filename) {
String[] tokens = filename.split(SEPARATOR);
if (tokens.length < 8) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ public class RemoteStoreSettings {
Property.Dynamic
);

/**
* Controls pinned timestamp feature enablement
*/
public static final Setting<Boolean> CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED = Setting.boolSetting(
"cluster.remote_store.pinned_timestamps.enabled",
false,
Setting.Property.NodeScope
);

/**
* Controls pinned timestamp scheduler interval
*/
Expand Down Expand Up @@ -163,6 +172,7 @@ public class RemoteStoreSettings {
private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm;
private volatile int maxRemoteTranslogReaders;
private volatile boolean isTranslogMetadataEnabled;
private static volatile boolean isPinnedTimestampsEnabled;
private static volatile TimeValue pinnedTimestampsSchedulerInterval;
private static volatile TimeValue pinnedTimestampsLookbackInterval;

Expand Down Expand Up @@ -205,6 +215,7 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {

pinnedTimestampsSchedulerInterval = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.get(settings);
pinnedTimestampsLookbackInterval = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL.get(settings);
isPinnedTimestampsEnabled = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.get(settings);
}

public TimeValue getClusterRemoteTranslogBufferInterval() {
Expand Down Expand Up @@ -280,4 +291,8 @@ public static TimeValue getPinnedTimestampsSchedulerInterval() {
public static TimeValue getPinnedTimestampsLookbackInterval() {
return pinnedTimestampsLookbackInterval;
}

public static boolean isPinnedTimestampsEnabled() {
return isPinnedTimestampsEnabled;
}
}
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@
import static org.opensearch.common.util.FeatureFlags.TELEMETRY;
import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath;
import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

Expand Down Expand Up @@ -811,7 +812,7 @@ protected Node(
remoteClusterStateCleanupManager = null;
}
final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService;
if (isRemoteStoreAttributePresent(settings)) {
if (isRemoteStoreAttributePresent(settings) && CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.get(settings)) {
remoteStorePinnedTimestampService = new RemoteStorePinnedTimestampService(
repositoriesServiceReference::get,
settings,
Expand Down
Loading

0 comments on commit 4ea5a3b

Please sign in to comment.