Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reindex negative TimeValue fix #54057

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,16 +247,16 @@ public void start() {
void onScrollResponse(ScrollableHitSource.AsyncResponse asyncResponse) {
// lastBatchStartTime is essentially unused (see WorkerBulkByScrollTaskState.throttleWaitTime. Leaving it for now, since it seems
// like a bug?
onScrollResponse(new TimeValue(System.nanoTime()), this.lastBatchSize, asyncResponse);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was interesting anyway since this constructor TimeValue(long) is intended to be passed a value that represents milliseconds, which System#nanoTime does not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I double checked that and I introduced that "bug" when refactoring this last year. It never showed up, since the variable ends up unused anyway...

onScrollResponse(System.nanoTime(), this.lastBatchSize, asyncResponse);
}

/**
* Process a scroll response.
* @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay.
* @param lastBatchStartTimeNS the time when the last batch started. Used to calculate the throttling delay.
* @param lastBatchSize the size of the last batch. Used to calculate the throttling delay.
* @param asyncResponse the response to process from ScrollableHitSource
*/
void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
void onScrollResponse(long lastBatchStartTimeNS, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
ScrollableHitSource.Response response = asyncResponse.response();
logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size());
if (task.isCancelled()) {
Expand Down Expand Up @@ -284,7 +284,7 @@ protected void doRun() throws Exception {
* It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time
* waiting on the scroll doesn't count against this batch in the throttle.
*/
prepareBulkRequest(timeValueNanos(System.nanoTime()), asyncResponse);
prepareBulkRequest(System.nanoTime(), asyncResponse);
}

@Override
Expand All @@ -293,15 +293,15 @@ public void onFailure(Exception e) {
}
};
prepareBulkRequestRunnable = (AbstractRunnable) threadPool.getThreadContext().preserveContext(prepareBulkRequestRunnable);
worker.delayPrepareBulkRequest(threadPool, lastBatchStartTime, lastBatchSize, prepareBulkRequestRunnable);
worker.delayPrepareBulkRequest(threadPool, lastBatchStartTimeNS, lastBatchSize, prepareBulkRequestRunnable);
}

/**
* Prepare the bulk request. Called on the generic thread pool after some preflight checks have been done one the SearchResponse and any
* delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the
* thread may be blocked by the user script.
*/
void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse) {
void prepareBulkRequest(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse) {
ScrollableHitSource.Response response = asyncResponse.response();
logger.debug("[{}]: preparing bulk request", task.getId());
if (task.isCancelled()) {
Expand All @@ -327,12 +327,12 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncR
/*
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
*/
notifyDone(thisBatchStartTime, asyncResponse, 0);
notifyDone(thisBatchStartTimeNS, asyncResponse, 0);
return;
}
request.timeout(mainRequest.getTimeout());
request.waitForActiveShards(mainRequest.getWaitForActiveShards());
sendBulkRequest(request, () -> notifyDone(thisBatchStartTime, asyncResponse, request.requests().size()));
sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request.requests().size()));
}

/**
Expand Down Expand Up @@ -418,14 +418,14 @@ void onBulkResponse(BulkResponse response, Runnable onSuccess) {
}
}

void notifyDone(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
void notifyDone(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
finishHim(null);
return;
}
this.lastBatchSize = batchSize;
asyncResponse.done(worker.throttleWaitTime(thisBatchStartTime, timeValueNanos(System.nanoTime()), batchSize));
asyncResponse.done(worker.throttleWaitTime(thisBatchStartTimeNS, System.nanoTime(), batchSize));
}

private void recordFailure(Failure failure, List<Failure> failures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -255,7 +254,7 @@ public void testScrollResponseSetsTotal() {

long total = randomIntBetween(0, Integer.MAX_VALUE);
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), 0, 0, response);
assertEquals(total, testTask.getStatus().getTotal());
}

Expand All @@ -268,7 +267,7 @@ public void testScrollResponseBatchingBehavior() throws Exception {
Hit hit = new ScrollableHitSource.BasicHit("index", "id", 0);
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
simulateScrollResponse(action, System.nanoTime(), 0, response);

// Use assert busy because the update happens on another thread
final int expectedBatches = batches;
Expand Down Expand Up @@ -354,7 +353,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
}
});
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 10, response);
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class));
assertThat(e.getCause(), hasToString(containsString("test")));
Expand All @@ -372,7 +371,7 @@ public void testShardFailuresAbortRequest() throws Exception {
SearchFailure shardFailure = new SearchFailure(new RuntimeException("test"));
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(false, singletonList(shardFailure), 0,
emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), empty());
assertThat(response.getSearchFailures(), contains(shardFailure));
Expand All @@ -386,7 +385,7 @@ public void testShardFailuresAbortRequest() throws Exception {
*/
public void testSearchTimeoutsAbortRequest() throws Exception {
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(true, emptyList(), 0, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), empty());
assertThat(response.getSearchFailures(), empty());
Expand Down Expand Up @@ -423,7 +422,7 @@ protected AbstractAsyncBulkByScrollAction.RequestWrapper<?> buildRequest(Hit doc
ScrollableHitSource.BasicHit hit = new ScrollableHitSource.BasicHit("index", "id", 0);
hit.setSource(new BytesArray("{}"), XContentType.JSON);
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
simulateScrollResponse(action, System.nanoTime(), 0, response);
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(e.getCause(), instanceOf(RuntimeException.class));
assertThat(e.getCause().getMessage(), equalTo("surprise"));
Expand Down Expand Up @@ -619,7 +618,7 @@ public void testCancelBeforeInitialSearch() throws Exception {
}

public void testCancelBeforeScrollResponse() throws Exception {
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1,
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, System.nanoTime(), 1,
new ScrollableHitSource.Response(false, emptyList(), between(1, 100000), emptyList(), null)));
}

Expand All @@ -634,7 +633,7 @@ public void testCancelBeforeOnBulkResponse() throws Exception {
}

public void testCancelBeforeStartNextScroll() throws Exception {
TimeValue now = timeValueNanos(System.nanoTime());
long now = System.nanoTime();
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.notifyDone(now, null, 0));
}

Expand Down Expand Up @@ -683,7 +682,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
// Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task
worker.rethrottle(1);
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1000, response);
simulateScrollResponse(action, System.nanoTime(), 1000, response);

// Now that we've got our cancel we'll just verify that it all came through all right
assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled());
Expand Down Expand Up @@ -712,7 +711,7 @@ private void cancelTaskCase(Consumer<DummyAsyncBulkByScrollAction> testMe) throw
/**
* Simulate a scroll response by setting the scroll id and firing the onScrollResponse method.
*/
private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize,
private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, long lastBatchTime, int lastBatchSize,
ScrollableHitSource.Response response) {
action.setScroll(scrollId());
action.onScrollResponse(lastBatchTime, lastBatchSize, new ScrollableHitSource.AsyncResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,11 @@ TimeValue throttledUntil() {
* Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be
* rescheduled over and over again.
*/
public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchStartTime, int lastBatchSize,
public void delayPrepareBulkRequest(ThreadPool threadPool, long lastBatchStartTimeNS, int lastBatchSize,
AbstractRunnable prepareBulkRequestRunnable) {
// Synchronize so we are less likely to schedule the same request twice.
synchronized (delayedPrepareBulkRequestReference) {
TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
TimeValue delay = throttleWaitTime(lastBatchStartTimeNS, System.nanoTime(), lastBatchSize);
logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay);
try {
delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
Expand All @@ -197,8 +197,8 @@ public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchSt
}
}

public TimeValue throttleWaitTime(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
long earliestNextBatchStartTime = now.nanos() + (long) perfectlyThrottledBatchTime(lastBatchSize);
public TimeValue throttleWaitTime(long lastBatchStartTimeNS, long nowNS, int lastBatchSize) {
long earliestNextBatchStartTime = nowNS + (long) perfectlyThrottledBatchTime(lastBatchSize);
long waitTime = min(MAX_THROTTLE_WAIT_TIME.nanos(), max(0, earliestNextBatchStartTime - System.nanoTime()));
return timeValueNanos(waitTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.closeTo;
Expand Down Expand Up @@ -152,7 +151,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
}
};
try {
workerState.delayPrepareBulkRequest(threadPool, timeValueNanos(System.nanoTime()), batchSizeForMaxDelay,
workerState.delayPrepareBulkRequest(threadPool, System.nanoTime(), batchSizeForMaxDelay,
new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
Expand Down Expand Up @@ -225,7 +224,7 @@ public boolean isCancelled() {
};
try {
// Have the task use the thread pool to delay a task that does nothing
workerState.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), 1, new AbstractRunnable() {
workerState.delayPrepareBulkRequest(threadPool, 0, 1, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
}
Expand Down