Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(upgrade): add ability to provide a startingOffset for RestoreIndices #8539

Merged
merged 4 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class RestoreIndices implements Upgrade {
public static final String URN_ARG_NAME = "urn";
public static final String URN_LIKE_ARG_NAME = "urnLike";

public static final String STARTING_OFFSET_ARG_NAME = "startingOffset";

private final List<UpgradeStep> _steps;

public RestoreIndices(final EbeanServer server, final EntityService entityService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class SendMAEStep implements UpgradeStep {

private static final int DEFAULT_BATCH_SIZE = 1000;
private static final long DEFAULT_BATCH_DELAY_MS = 250;

private static final int DEFAULT_STARTING_OFFSET = 0;
private static final int DEFAULT_THREADS = 1;

private final EbeanServer _server;
Expand Down Expand Up @@ -83,6 +85,7 @@ private RestoreIndicesArgs getArgs(UpgradeContext context) {
result.batchSize = getBatchSize(context.parsedArgs());
result.numThreads = getThreadCount(context.parsedArgs());
result.batchDelayMs = getBatchDelayMs(context.parsedArgs());
result.start = getStartingOffset(context.parsedArgs());
if (containsKey(context.parsedArgs(), RestoreIndices.ASPECT_NAME_ARG_NAME)) {
result.aspectName = context.parsedArgs().get(RestoreIndices.ASPECT_NAME_ARG_NAME).get();
}
Expand Down Expand Up @@ -124,7 +127,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
final int rowCount = getRowCount(args);
context.report().addLine(String.format("Found %s latest aspects in aspects table in %.2f minutes.",
rowCount, (float) (System.currentTimeMillis() - startTime) / 1000 / 60));
int start = 0;
int start = args.start;

List<Future<RestoreIndicesResult>> futures = new ArrayList<>();
startTime = System.currentTimeMillis();
Expand Down Expand Up @@ -186,6 +189,10 @@ private int getBatchSize(final Map<String, Optional<String>> parsedArgs) {
return getInt(parsedArgs, DEFAULT_BATCH_SIZE, RestoreIndices.BATCH_SIZE_ARG_NAME);
}

private int getStartingOffset(final Map<String, Optional<String>> parsedArgs) {
return getInt(parsedArgs, DEFAULT_STARTING_OFFSET, RestoreIndices.STARTING_OFFSET_ARG_NAME);
}

private long getBatchDelayMs(final Map<String, Optional<String>> parsedArgs) {
long resolvedBatchDelayMs = DEFAULT_BATCH_DELAY_MS;
if (containsKey(parsedArgs, RestoreIndices.BATCH_DELAY_MS_ARG_NAME)) {
Expand Down
Loading