Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce TranslogFactory for Local/Remote Translog support #4172

Merged
merged 3 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogStats;
Expand Down Expand Up @@ -675,6 +676,7 @@ public static final IndexShard newIndexShard(
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null
);
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.cluster.IndicesClusterStateService;
Expand Down Expand Up @@ -547,6 +548,8 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
// TODO Replace with remote translog factory in the follow up PR
this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to check if the node is primary?

Copy link
Collaborator Author

@Bukhtawar Bukhtawar Aug 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally the engine should take care of using the write flavor of TranslogManager. Will ensure we add the right assertions once RemoteFactory changes are made. There is a PR in flight #4127 which will take care of primary and replica

this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null,
remoteStore
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -150,6 +152,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {

private final TranslogConfig translogConfig;

private final TranslogFactory translogFactory;

public EngineConfig(
ShardId shardId,
ThreadPool threadPool,
Expand Down Expand Up @@ -253,7 +257,8 @@ public EngineConfig(
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
false
false,
new InternalTranslogFactory()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need similar check that is added for IndexService here as well?

this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the default constructor for EngineConfig

);
}

Expand Down Expand Up @@ -284,7 +289,8 @@ public EngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
boolean isReadOnlyReplica,
TranslogFactory translogFactory
) {
if (isReadOnlyReplica && indexSettings.isSegRepEnabled() == false) {
throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled");
Expand Down Expand Up @@ -328,6 +334,7 @@ public EngineConfig(
this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier;
this.isReadOnlyReplica = isReadOnlyReplica;
this.translogFactory = translogFactory;
}

/**
Expand Down Expand Up @@ -532,6 +539,14 @@ public boolean isReadOnlyReplica() {
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
}

/**
* Returns the underlying translog factory
* @return the translog factory
*/
public TranslogFactory getTranslogFactory() {
return translogFactory;
}

/**
* A supplier supplies tombstone documents which will be used in soft-update methods.
* The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -147,7 +148,8 @@ public EngineConfig newEngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
boolean isReadOnlyReplica,
TranslogFactory translogFactory
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
Expand Down Expand Up @@ -178,7 +180,8 @@ public EngineConfig newEngineConfig(
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
isReadOnlyReplica
isReadOnlyReplica,
translogFactory
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ public void onFailure(String reason, Exception ex) {
() -> getLocalCheckpointTracker(),
translogUUID,
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
this::ensureOpen
this::ensureOpen,
engineConfig.getTranslogFactory()
);
this.translogManager = translogManagerRef;
this.softDeletesPolicy = newSoftDeletesPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public void onAfterTranslogSync() {
}
}
},
this
this,
engineConfig.getTranslogFactory()
);
this.translogManager = translogManagerRef;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,15 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
final TranslogDeletionPolicy translogDeletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (
Translog translog = new Translog(
translogConfig,
translogUuid,
translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(),
engineConfig.getPrimaryTermSupplier(),
seqNo -> {}
)
Translog translog = engineConfig.getTranslogFactory()
.newTranslog(
translogConfig,
translogUuid,
translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(),
engineConfig.getPrimaryTermSupplier(),
seqNo -> {}
)
) {
translog.trimUnreferencedReaders();
// refresh the translog stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,15 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (
Translog translog = new Translog(
translogConfig,
translogUuid,
translogDeletionPolicy,
config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(),
seqNo -> {}
)
Translog translog = config.getTranslogFactory()
.newTranslog(
translogConfig,
translogUuid,
translogDeletionPolicy,
config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(),
seqNo -> {}
)
) {
return translog.stats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.index.translog.TranslogRecoveryRunner;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.warmer.ShardIndexWarmerService;
Expand Down Expand Up @@ -308,6 +309,7 @@ Runnable getGlobalCheckpointSyncer() {
private final ReferenceManager.RefreshListener checkpointRefreshListener;

private final Store remoteStore;
private final TranslogFactory translogFactory;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -330,6 +332,7 @@ public IndexShard(
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
final TranslogFactory translogFactory,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore
) throws IOException {
Expand Down Expand Up @@ -420,6 +423,7 @@ public boolean shouldCache(Query query) {
this.checkpointRefreshListener = null;
}
this.remoteStore = remoteStore;
this.translogFactory = translogFactory;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3247,7 +3251,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
indexSettings.isSegRepEnabled() && shardRouting.primary() == false
indexSettings.isSegRepEnabled() && shardRouting.primary() == false,
translogFactory
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog;

import java.io.IOException;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/**
* Translog Factory for the local on-disk {@link Translog}
*
* @opensearch.internal
*/
public class InternalTranslogFactory implements TranslogFactory {

@Override
public Translog newTranslog(
TranslogConfig translogConfig,
String translogUUID,
TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer
) throws IOException {

return new Translog(
translogConfig,
translogUUID,
translogDeletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public InternalTranslogManager(
Supplier<LocalCheckpointTracker> localCheckpointTrackerSupplier,
String translogUUID,
TranslogEventListener translogEventListener,
LifecycleAware engineLifeCycleAware
LifecycleAware engineLifeCycleAware,
TranslogFactory translogFactory
) throws IOException {
this.shardId = shardId;
this.readLock = readLock;
Expand All @@ -67,7 +68,7 @@ public InternalTranslogManager(
if (tracker != null) {
tracker.markSeqNoAsPersisted(seqNo);
}
}, translogUUID);
}, translogUUID, translogFactory);
assert translog.getGeneration() != null;
this.translog = translog;
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
Expand Down Expand Up @@ -333,10 +334,11 @@ protected Translog openTranslog(
TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier,
LongConsumer persistedSequenceNumberConsumer,
String translogUUID
String translogUUID,
TranslogFactory translogFactory
) throws IOException {

return new Translog(
return translogFactory.newTranslog(
translogConfig,
translogUUID,
translogDeletionPolicy,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog;

import java.io.IOException;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/**
* Translog Factory to enable creation of various local on-disk
* and remote store flavors of {@link Translog}
*
* @opensearch.internal
*/
@FunctionalInterface
public interface TranslogFactory {

Translog newTranslog(
final TranslogConfig config,
final String translogUUID,
final TranslogDeletionPolicy deletionPolicy,
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer
) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public WriteOnlyTranslogManager(
Supplier<LocalCheckpointTracker> localCheckpointTrackerSupplier,
String translogUUID,
TranslogEventListener translogEventListener,
LifecycleAware engineLifecycleAware
LifecycleAware engineLifecycleAware,
TranslogFactory translogFactory
) throws IOException {
super(
translogConfig,
Expand All @@ -47,7 +48,8 @@ public WriteOnlyTranslogManager(
localCheckpointTrackerSupplier,
translogUUID,
translogEventListener,
engineLifecycleAware
engineLifecycleAware,
translogFactory
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.codec.CodecServiceFactory;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogReader;
Expand Down Expand Up @@ -66,7 +67,8 @@ public void testCreateEngineConfigFromFactory() {
() -> new RetentionLeases(0, 0, Collections.emptyList()),
null,
null,
false
false,
new InternalTranslogFactory()
);

assertNotNull(config.getCodec());
Expand Down Expand Up @@ -143,7 +145,8 @@ public void testCreateCodecServiceFromFactory() {
() -> new RetentionLeases(0, 0, Collections.emptyList()),
null,
null,
false
false,
new InternalTranslogFactory()
);
assertNotNull(config.getCodec());
}
Expand Down
Loading