Skip to content

Commit

Permalink
fix: refresh the rollup index as part of the rollup indexer actions (#…
Browse files Browse the repository at this point in the history
…86992)

Instead of waiting in the yaml test we refresh once the
indexing operation is complete in the callback. This way
we avoid possible timeout issues which make the test
unstable.

Resolves #81983
Resolves #53412
  • Loading branch information
salvatore-campagna committed Jul 6, 2022
1 parent d1bb8f4 commit 6aca35c
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -161,8 +163,27 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi

@Override
protected void onFinish(ActionListener<Void> listener) {
logger.debug("Finished indexing for job [" + job.getConfig().getId() + "]");
listener.onResponse(null);
final RollupJobConfig jobConfig = job.getConfig();
final ActionListener<RefreshResponse> refreshResponseActionListener = new ActionListener<>() {

@Override
public void onResponse(RefreshResponse refreshResponse) {
logger.trace("refreshing rollup index {} successful for job {}", jobConfig.getRollupIndex(), jobConfig.getId());
listener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
logger.warn(
"refreshing rollup index {} failed for job {} with exception {}",
jobConfig.getRollupIndex(),
jobConfig.getId(),
e
);
listener.onResponse(null);
}
};
client.admin().indices().refresh(new RefreshRequest(jobConfig.getRollupIndex()), refreshResponseActionListener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ private void executeTestCase(
String dateHistoField = config.getGroupConfig().getDateHistogram().getField();
final ThreadPool threadPool = new TestThreadPool(getTestName());

try {
try (dir; reader) {
RollupJob job = new RollupJob(config, Collections.emptyMap());
final SyncRollupIndexer action = new SyncRollupIndexer(
threadPool,
Expand All @@ -718,8 +718,6 @@ private void executeTestCase(
rollupConsumer.accept(action.triggerAndWaitForCompletion(now));
} finally {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
reader.close();
dir.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ public void testAbortAfterCompletion() throws Exception {

// Don't use the indexer's latch because we completely change doNextSearch()
final CountDownLatch doNextSearchLatch = new CountDownLatch(1);

try {
DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, job, state, null) {
@Override
Expand Down Expand Up @@ -840,7 +839,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

final ThreadPool threadPool = new TestThreadPool(getTestName());
try {

NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(
threadPool,
job,
Expand Down Expand Up @@ -897,7 +895,6 @@ public void testSearchShardFailure() throws Exception {

final ThreadPool threadPool = new TestThreadPool(getTestName());
try {

NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(
threadPool,
job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.internal.Client;
Expand Down Expand Up @@ -49,6 +52,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -584,6 +588,11 @@ public void testTriggerWithoutHeaders() throws Exception {
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
Client client = mock(Client.class);
doAnswer(invocationOnMock -> {
RefreshResponse r = new RefreshResponse(2, 2, 0, Collections.emptyList());
((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r);
return null;
}).when(client).execute(eq(RefreshAction.INSTANCE), any(), any());
when(client.settings()).thenReturn(Settings.EMPTY);

AtomicBoolean started = new AtomicBoolean(false);
Expand All @@ -609,7 +618,7 @@ public void testTriggerWithoutHeaders() throws Exception {

((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r);
return null;
}).when(client).execute(any(), any(), any());
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());

SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
TaskId taskId = new TaskId("node", 123);
Expand Down Expand Up @@ -686,7 +695,11 @@ public void testTriggerWithHeaders() throws Exception {
headers.put("_xpack_security_authentication", "bar");
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), headers);
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
doAnswer(invocationOnMock -> {
RefreshResponse r = new RefreshResponse(2, 2, 0, Collections.emptyList());
((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r);
return null;
}).when(client).execute(eq(RefreshAction.INSTANCE), any(), any());

AtomicBoolean started = new AtomicBoolean(false);
AtomicBoolean finished = new AtomicBoolean(false);
Expand Down Expand Up @@ -714,7 +727,7 @@ public void testTriggerWithHeaders() throws Exception {

((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r);
return null;
}).when(client).execute(any(), any(), any());
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());

SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
TaskId taskId = new TaskId("node", 123);
Expand Down Expand Up @@ -780,7 +793,7 @@ public void onFailure(Exception e) {
latch.countDown();

// Wait for the final persistent status to finish
assertBusy(() -> assertTrue(finished.get()));
assertBusy(() -> assertTrue(finished.get()), 30, TimeUnit.SECONDS);
}

@SuppressWarnings("unchecked")
Expand All @@ -791,6 +804,11 @@ public void testSaveStateChangesIDScheme() throws Exception {
headers.put("_xpack_security_authentication", "bar");
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), headers);
Client client = mock(Client.class);
doAnswer(invocationOnMock -> {
RefreshResponse r = new RefreshResponse(2, 2, 0, Collections.emptyList());
((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r);
return null;
}).when(client).execute(eq(RefreshAction.INSTANCE), any(), any());
when(client.settings()).thenReturn(Settings.EMPTY);

AtomicBoolean started = new AtomicBoolean(false);
Expand Down Expand Up @@ -819,7 +837,7 @@ public void testSaveStateChangesIDScheme() throws Exception {

((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r);
return null;
}).when(client).execute(any(), any(), any());
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());

SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null);
Expand Down

0 comments on commit 6aca35c

Please sign in to comment.