Skip to content

Commit

Permalink
Reindex negative TimeValue fix
Browse files Browse the repository at this point in the history
Reindex would use timeValueNanos(System.nanoTime()). The intended use
for TimeValue is as a duration, not as an absolute time. In particular,
because System.nanoTime() has an arbitrary origin and may be negative, this
could result in negative TimeValues, which are unsupported in elastic#53913.
  • Loading branch information
henningandersen committed Mar 24, 2020
1 parent d046489 commit d3ad55c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,16 +247,16 @@ public void start() {
void onScrollResponse(ScrollableHitSource.AsyncResponse asyncResponse) {
// lastBatchStartTime is essentially unused (see WorkerBulkByScrollTaskState.throttleWaitTime). Leaving it for now, since it seems
// like a bug?
onScrollResponse(new TimeValue(System.nanoTime()), this.lastBatchSize, asyncResponse);
onScrollResponse(System.nanoTime(), this.lastBatchSize, asyncResponse);
}

/**
* Process a scroll response.
* @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay.
* @param lastBatchStartTimeNS the time when the last batch started. Used to calculate the throttling delay.
* @param lastBatchSize the size of the last batch. Used to calculate the throttling delay.
* @param asyncResponse the response to process from ScrollableHitSource
*/
void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
void onScrollResponse(long lastBatchStartTimeNS, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
ScrollableHitSource.Response response = asyncResponse.response();
logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size());
if (task.isCancelled()) {
Expand Down Expand Up @@ -284,7 +284,7 @@ protected void doRun() throws Exception {
* It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time
* waiting on the scroll doesn't count against this batch in the throttle.
*/
prepareBulkRequest(timeValueNanos(System.nanoTime()), asyncResponse);
prepareBulkRequest(System.nanoTime(), asyncResponse);
}

@Override
Expand All @@ -293,15 +293,15 @@ public void onFailure(Exception e) {
}
};
prepareBulkRequestRunnable = (AbstractRunnable) threadPool.getThreadContext().preserveContext(prepareBulkRequestRunnable);
worker.delayPrepareBulkRequest(threadPool, lastBatchStartTime, lastBatchSize, prepareBulkRequestRunnable);
worker.delayPrepareBulkRequest(threadPool, lastBatchStartTimeNS, lastBatchSize, prepareBulkRequestRunnable);
}

/**
* Prepare the bulk request. Called on the generic thread pool after some preflight checks have been done on the SearchResponse and any
* delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the
* thread may be blocked by the user script.
*/
void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse) {
void prepareBulkRequest(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse) {
ScrollableHitSource.Response response = asyncResponse.response();
logger.debug("[{}]: preparing bulk request", task.getId());
if (task.isCancelled()) {
Expand All @@ -327,12 +327,12 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncR
/*
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
*/
notifyDone(thisBatchStartTime, asyncResponse, 0);
notifyDone(thisBatchStartTimeNS, asyncResponse, 0);
return;
}
request.timeout(mainRequest.getTimeout());
request.waitForActiveShards(mainRequest.getWaitForActiveShards());
sendBulkRequest(request, () -> notifyDone(thisBatchStartTime, asyncResponse, request.requests().size()));
sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request.requests().size()));
}

/**
Expand Down Expand Up @@ -418,14 +418,14 @@ void onBulkResponse(BulkResponse response, Runnable onSuccess) {
}
}

void notifyDone(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
void notifyDone(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
finishHim(null);
return;
}
this.lastBatchSize = batchSize;
asyncResponse.done(worker.throttleWaitTime(thisBatchStartTime, timeValueNanos(System.nanoTime()), batchSize));
asyncResponse.done(worker.throttleWaitTime(thisBatchStartTimeNS, System.nanoTime(), batchSize));
}

private void recordFailure(Failure failure, List<Failure> failures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -255,7 +254,7 @@ public void testScrollResponseSetsTotal() {

long total = randomIntBetween(0, Integer.MAX_VALUE);
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), 0, 0, response);
assertEquals(total, testTask.getStatus().getTotal());
}

Expand All @@ -268,7 +267,7 @@ public void testScrollResponseBatchingBehavior() throws Exception {
Hit hit = new ScrollableHitSource.BasicHit("index", "id", 0);
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
simulateScrollResponse(action, System.nanoTime(), 0, response);

// Use assert busy because the update happens on another thread
final int expectedBatches = batches;
Expand Down Expand Up @@ -354,7 +353,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
}
});
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 10, response);
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class));
assertThat(e.getCause(), hasToString(containsString("test")));
Expand All @@ -372,7 +371,7 @@ public void testShardFailuresAbortRequest() throws Exception {
SearchFailure shardFailure = new SearchFailure(new RuntimeException("test"));
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(false, singletonList(shardFailure), 0,
emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), empty());
assertThat(response.getSearchFailures(), contains(shardFailure));
Expand All @@ -386,7 +385,7 @@ public void testShardFailuresAbortRequest() throws Exception {
*/
public void testSearchTimeoutsAbortRequest() throws Exception {
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(true, emptyList(), 0, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), empty());
assertThat(response.getSearchFailures(), empty());
Expand Down Expand Up @@ -423,7 +422,7 @@ protected AbstractAsyncBulkByScrollAction.RequestWrapper<?> buildRequest(Hit doc
ScrollableHitSource.BasicHit hit = new ScrollableHitSource.BasicHit("index", "id", 0);
hit.setSource(new BytesArray("{}"), XContentType.JSON);
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
simulateScrollResponse(action, System.nanoTime(), 0, response);
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(e.getCause(), instanceOf(RuntimeException.class));
assertThat(e.getCause().getMessage(), equalTo("surprise"));
Expand Down Expand Up @@ -619,7 +618,7 @@ public void testCancelBeforeInitialSearch() throws Exception {
}

public void testCancelBeforeScrollResponse() throws Exception {
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1,
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, System.nanoTime(), 1,
new ScrollableHitSource.Response(false, emptyList(), between(1, 100000), emptyList(), null)));
}

Expand All @@ -634,7 +633,7 @@ public void testCancelBeforeOnBulkResponse() throws Exception {
}

public void testCancelBeforeStartNextScroll() throws Exception {
TimeValue now = timeValueNanos(System.nanoTime());
long now = System.nanoTime();
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.notifyDone(now, null, 0));
}

Expand Down Expand Up @@ -683,7 +682,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
// Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task
worker.rethrottle(1);
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1000, response);
simulateScrollResponse(action, System.nanoTime(), 1000, response);

// Now that we've got our cancel we'll just verify that it all came through all right
assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled());
Expand Down Expand Up @@ -712,7 +711,7 @@ private void cancelTaskCase(Consumer<DummyAsyncBulkByScrollAction> testMe) throw
/**
* Simulate a scroll response by setting the scroll id and firing the onScrollResponse method.
*/
private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize,
private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, long lastBatchTime, int lastBatchSize,
ScrollableHitSource.Response response) {
action.setScroll(scrollId());
action.onScrollResponse(lastBatchTime, lastBatchSize, new ScrollableHitSource.AsyncResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,11 @@ TimeValue throttledUntil() {
* Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be
* rescheduled over and over again.
*/
public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchStartTime, int lastBatchSize,
public void delayPrepareBulkRequest(ThreadPool threadPool, long lastBatchStartTimeNS, int lastBatchSize,
AbstractRunnable prepareBulkRequestRunnable) {
// Synchronize so we are less likely to schedule the same request twice.
synchronized (delayedPrepareBulkRequestReference) {
TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
TimeValue delay = throttleWaitTime(lastBatchStartTimeNS, System.nanoTime(), lastBatchSize);
logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay);
try {
delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
Expand All @@ -197,8 +197,8 @@ public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchSt
}
}

public TimeValue throttleWaitTime(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
long earliestNextBatchStartTime = now.nanos() + (long) perfectlyThrottledBatchTime(lastBatchSize);
public TimeValue throttleWaitTime(long lastBatchStartTimeNS, long nowNS, int lastBatchSize) {
long earliestNextBatchStartTime = nowNS + (long) perfectlyThrottledBatchTime(lastBatchSize);
long waitTime = min(MAX_THROTTLE_WAIT_TIME.nanos(), max(0, earliestNextBatchStartTime - System.nanoTime()));
return timeValueNanos(waitTime);
}
Expand Down

0 comments on commit d3ad55c

Please sign in to comment.