Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refresh the rollup index as part of the rollup indexer actions #86992

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -161,8 +163,27 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi

@Override
protected void onFinish(ActionListener<Void> listener) {
logger.debug("Finished indexing for job [" + job.getConfig().getId() + "]");
listener.onResponse(null);
final RollupJobConfig jobConfig = job.getConfig();
final ActionListener<RefreshResponse> refreshResponseActionListener = new ActionListener<>() {

@Override
public void onResponse(RefreshResponse refreshResponse) {
logger.trace("refreshing rollup index {} successful for job {}", jobConfig.getRollupIndex(), jobConfig.getId());
listener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
logger.warn(
"refreshing rollup index {} failed for job {} with exception {}",
jobConfig.getRollupIndex(),
jobConfig.getId(),
e
);
listener.onResponse(null);
}
};
client.admin().indices().refresh(new RefreshRequest(jobConfig.getRollupIndex()), refreshResponseActionListener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ private void executeTestCase(
String dateHistoField = config.getGroupConfig().getDateHistogram().getField();
final ThreadPool threadPool = new TestThreadPool(getTestName());

try {
try (dir; reader) {
RollupJob job = new RollupJob(config, Collections.emptyMap());
final SyncRollupIndexer action = new SyncRollupIndexer(
threadPool,
Expand All @@ -718,8 +718,6 @@ private void executeTestCase(
rollupConsumer.accept(action.triggerAndWaitForCompletion(now));
} finally {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
reader.close();
dir.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ public void testAbortAfterCompletion() throws Exception {

// Don't use the indexer's latch because we completely change doNextSearch()
final CountDownLatch doNextSearchLatch = new CountDownLatch(1);

try {
DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, job, state, null) {
@Override
Expand Down Expand Up @@ -840,7 +839,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

final ThreadPool threadPool = new TestThreadPool(getTestName());
try {

NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(
threadPool,
job,
Expand Down Expand Up @@ -897,7 +895,6 @@ public void testSearchShardFailure() throws Exception {

final ThreadPool threadPool = new TestThreadPool(getTestName());
try {

NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(
threadPool,
job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.internal.Client;
Expand Down Expand Up @@ -49,6 +52,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -584,6 +588,11 @@ public void testTriggerWithoutHeaders() throws Exception {
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
Client client = mock(Client.class);
doAnswer(invocationOnMock -> {
RefreshResponse r = new RefreshResponse(2, 2, 0, Collections.emptyList());
((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r);
return null;
}).when(client).execute(eq(RefreshAction.INSTANCE), any(), any());
when(client.settings()).thenReturn(Settings.EMPTY);

AtomicBoolean started = new AtomicBoolean(false);
Expand All @@ -609,7 +618,7 @@ public void testTriggerWithoutHeaders() throws Exception {

((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r);
return null;
}).when(client).execute(any(), any(), any());
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());

SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
TaskId taskId = new TaskId("node", 123);
Expand Down Expand Up @@ -686,7 +695,11 @@ public void testTriggerWithHeaders() throws Exception {
headers.put("_xpack_security_authentication", "bar");
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), headers);
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
doAnswer(invocationOnMock -> {
RefreshResponse r = new RefreshResponse(2, 2, 0, Collections.emptyList());
((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r);
return null;
}).when(client).execute(eq(RefreshAction.INSTANCE), any(), any());

AtomicBoolean started = new AtomicBoolean(false);
AtomicBoolean finished = new AtomicBoolean(false);
Expand Down Expand Up @@ -714,7 +727,7 @@ public void testTriggerWithHeaders() throws Exception {

((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r);
return null;
}).when(client).execute(any(), any(), any());
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());

SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
TaskId taskId = new TaskId("node", 123);
Expand Down Expand Up @@ -780,7 +793,7 @@ public void onFailure(Exception e) {
latch.countDown();

// Wait for the final persistent status to finish
assertBusy(() -> assertTrue(finished.get()));
assertBusy(() -> assertTrue(finished.get()), 30, TimeUnit.SECONDS);
}

@SuppressWarnings("unchecked")
Expand All @@ -791,6 +804,11 @@ public void testSaveStateChangesIDScheme() throws Exception {
headers.put("_xpack_security_authentication", "bar");
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), headers);
Client client = mock(Client.class);
doAnswer(invocationOnMock -> {
RefreshResponse r = new RefreshResponse(2, 2, 0, Collections.emptyList());
((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r);
return null;
}).when(client).execute(eq(RefreshAction.INSTANCE), any(), any());
when(client.settings()).thenReturn(Settings.EMPTY);

AtomicBoolean started = new AtomicBoolean(false);
Expand Down Expand Up @@ -819,7 +837,7 @@ public void testSaveStateChangesIDScheme() throws Exception {

((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r);
return null;
}).when(client).execute(any(), any(), any());
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());

SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null);
Expand Down