From 08c01f1dae9732859ee69f12b4612c6df05c4dde Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Sun, 14 Jan 2024 20:39:01 -0800 Subject: [PATCH] Handle and map errors in delete pending segments API (#15673) Changes: - Handle exception in deletePendingSegments API and map to correct HTTP status code - Clean up exception message using `DruidException` - Add unit tests --- .../IndexerMetadataStorageAdapter.java | 39 +++-- .../druid/indexing/overlord/TaskMaster.java | 2 +- .../overlord/http/OverlordResource.java | 21 ++- .../IndexerMetadataStorageAdapterTest.java | 75 ++++++++-- .../overlord/http/OverlordResourceTest.java | 136 +++++++++++++++++- 5 files changed, 238 insertions(+), 35 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java index 4671c5bb7a96..6d23fcec327e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java @@ -19,11 +19,12 @@ package org.apache.druid.indexing.overlord; -import com.google.common.base.Preconditions; import com.google.inject.Inject; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.TaskInfo; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.DateTimes; -import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.Comparator; @@ -46,23 +47,31 @@ public IndexerMetadataStorageAdapter( public int deletePendingSegments(String dataSource, Interval deleteInterval) { - // Check the given interval overlaps the interval(minCreatedDateOfActiveTasks, MAX) - final Optional minCreatedDateOfActiveTasks = taskStorageQueryAdapter + // Find the earliest active task created for the specified datasource; if one exists, + // check if its interval overlaps with the delete interval. + final Optional> earliestActiveTaskOptional = taskStorageQueryAdapter .getActiveTaskInfo(dataSource) .stream() - .map(TaskInfo::getCreatedTime) - .min(Comparator.naturalOrder()); + .min(Comparator.comparing(TaskInfo::getCreatedTime)); - final Interval activeTaskInterval = new Interval( - minCreatedDateOfActiveTasks.orElse(DateTimes.MAX), - DateTimes.MAX - ); + if (earliestActiveTaskOptional.isPresent()) { + final TaskInfo earliestActiveTask = earliestActiveTaskOptional.get(); + final Interval activeTaskInterval = new Interval( + earliestActiveTask.getCreatedTime(), + DateTimes.MAX + ); - Preconditions.checkArgument( - !deleteInterval.overlaps(activeTaskInterval), - "Cannot delete pendingSegments because there is at least one active task created at %s", - activeTaskInterval.getStart() - ); + if (deleteInterval.overlaps(activeTaskInterval)) { + throw InvalidInput.exception( + "Cannot delete pendingSegments for datasource[%s] as there is at least one active task[%s] created at[%s] " + + "that overlaps with the delete interval[%s]. Please retry when there are no active tasks.", + dataSource, + earliestActiveTask.getId(), + activeTaskInterval.getStart(), + deleteInterval + ); + } + } return indexerMetadataStorageCoordinator.deletePendingSegmentsCreatedInInterval(dataSource, deleteInterval); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index e4d26b37573a..4798513aafd4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -238,7 +238,7 @@ public void stop() } /** - * Returns true if it's the leader and its all services have been properly initialized. + * Returns true if it's the leader and all its services have been initialized. */ public boolean isLeader() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 7afaaa905264..b589dc5ffd81 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -939,10 +939,25 @@ public Response killPendingSegments( } if (taskMaster.isLeader()) { - final int numDeleted = indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval); - return Response.ok().entity(ImmutableMap.of("numDeleted", numDeleted)).build(); + try { + final int numDeleted = indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval); + return Response.ok().entity(ImmutableMap.of("numDeleted", numDeleted)).build(); + } + catch (DruidException e) { + return Response.status(e.getStatusCode()) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + catch (Exception e) { + log.warn(e, "Failed to delete pending segments for datasource[%s] and interval[%s].", dataSource, deleteInterval); + return Response.status(Status.INTERNAL_SERVER_ERROR) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } } else { - return Response.status(Status.SERVICE_UNAVAILABLE).build(); + return Response.status(Status.SERVICE_UNAVAILABLE) + .entity(ImmutableMap.of("error", "overlord is not the leader or not initialized yet")) + .build(); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java index 196056341176..633d861410d8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java @@ -20,6 +20,8 @@ package org.apache.druid.indexing.overlord; import com.google.common.collect.ImmutableList; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.NoopTask; @@ -27,21 +29,16 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.easymock.EasyMock; -import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.util.List; public class IndexerMetadataStorageAdapterTest { - @Rule - public ExpectedException expectedException = ExpectedException.none(); - private TaskStorageQueryAdapter taskStorageQueryAdapter; private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter; @@ -69,7 +66,7 @@ public void testDeletePendingSegments() NoopTask.create() ), new TaskInfo<>( - "id1", + "id2", DateTimes.of("2017-12-02"), TaskStatus.running("id2"), "dataSource", @@ -93,7 +90,7 @@ public void testDeletePendingSegments() } @Test - public void testDeletePendingSegmentsOfRunningTasks() + public void testDeletePendingSegmentsOfOneOverlappingRunningTask() { final ImmutableList> taskInfos = ImmutableList.of( new TaskInfo<>( @@ -104,7 +101,7 @@ public void testDeletePendingSegmentsOfRunningTasks() NoopTask.create() ), new TaskInfo<>( - "id1", + "id2", DateTimes.of("2017-12-02"), TaskStatus.running("id2"), "dataSource", @@ -125,8 +122,62 @@ public void testDeletePendingSegmentsOfRunningTasks() .andReturn(10); EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator); - expectedException.expect(CoreMatchers.instanceOf(IllegalArgumentException.class)); - expectedException.expectMessage("Cannot delete pendingSegments because there is at least one active task created"); - indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval); + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "Cannot delete pendingSegments for datasource[dataSource] as there is at least one active task[id1]" + + " created at[2017-11-01T00:00:00.000Z] that overlaps with the delete " + + "interval[2017-01-01T00:00:00.000Z/2017-12-01T00:00:00.000Z]. Please retry when there are no active tasks." + ) + ); + } + + @Test + public void testDeletePendingSegmentsOfMultipleOverlappingRunningTasks() + { + final ImmutableList> taskInfos = ImmutableList.of( + new TaskInfo<>( + "id1", + DateTimes.of("2017-12-01"), + TaskStatus.running("id1"), + "dataSource", + NoopTask.create() + ), + new TaskInfo<>( + "id2", + DateTimes.of("2017-11-01"), + TaskStatus.running("id2"), + "dataSource", + NoopTask.create() + ) + ); + + EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos); + + final Interval deleteInterval = Intervals.of("2017-01-01/2018-12-01"); + EasyMock + .expect( + indexerMetadataStorageCoordinator.deletePendingSegmentsCreatedInInterval( + EasyMock.anyString(), + EasyMock.eq(deleteInterval) + ) + ) + .andReturn(10); + EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator); + + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "Cannot delete pendingSegments for datasource[dataSource] as there is at least one active task[id2]" + + " created at[2017-11-01T00:00:00.000Z] that overlaps with the delete" + + " interval[2017-01-01T00:00:00.000Z/2018-12-01T00:00:00.000Z]. Please retry when there are no active tasks." + ) + ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 561e8dd9b199..cdaf5b9b359a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -28,6 +28,8 @@ import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditManager; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskLocation; @@ -983,10 +985,136 @@ public void testKillPendingSegments() authConfig ); - final Map response = (Map) overlordResource - .killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req) - .getEntity(); - Assert.assertEquals(2, response.get("numDeleted").intValue()); + Response response = overlordResource + .killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("numDeleted", 2), response.getEntity()); + } + + @Test + public void testKillPendingSegmentsThrowsInvalidInputDruidException() + { + expectAuthorizationTokenCheck(); + + EasyMock.expect(taskMaster.isLeader()).andReturn(true); + final String exceptionMsg = "Some exception msg"; + EasyMock + .expect( + indexerMetadataStorageAdapter.deletePendingSegments( + EasyMock.eq("allow"), + EasyMock.anyObject(Interval.class) + ) + ) + .andThrow(InvalidInput.exception(exceptionMsg)) + .once(); + + EasyMock.replay( + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter, + authConfig + ); + + Response response = overlordResource + .killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req); + + Assert.assertEquals(400, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", exceptionMsg), response.getEntity()); + } + + @Test + public void testKillPendingSegmentsThrowsDefensiveDruidException() + { + expectAuthorizationTokenCheck(); + + EasyMock.expect(taskMaster.isLeader()).andReturn(true); + final String exceptionMsg = "An internal defensive exception"; + EasyMock + .expect( + indexerMetadataStorageAdapter.deletePendingSegments( + EasyMock.eq("allow"), + EasyMock.anyObject(Interval.class) + ) + ) + .andThrow(DruidException.defensive(exceptionMsg)) + .once(); + + EasyMock.replay( + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter, + authConfig + ); + + Response response = overlordResource + .killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req); + + Assert.assertEquals(500, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", exceptionMsg), response.getEntity()); + } + + @Test + public void testKillPendingSegmentsThrowsArbitraryException() + { + expectAuthorizationTokenCheck(); + + EasyMock.expect(taskMaster.isLeader()).andReturn(true); + final String exceptionMsg = "An unexpected illegal state exception"; + EasyMock + .expect( + indexerMetadataStorageAdapter.deletePendingSegments( + EasyMock.eq("allow"), + EasyMock.anyObject(Interval.class) + ) + ) + .andThrow(new IllegalStateException(exceptionMsg)) + .once(); + + EasyMock.replay( + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter, + authConfig + ); + + Response response = overlordResource + .killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req); + + Assert.assertEquals(500, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", exceptionMsg), response.getEntity()); + } + + @Test + public void testKillPendingSegmentsToNonLeader() + { + expectAuthorizationTokenCheck(); + + EasyMock.expect(taskMaster.isLeader()).andReturn(false); + + EasyMock.replay( + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter, + authConfig + ); + + Response response = overlordResource + .killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req); + + Assert.assertEquals(503, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", "overlord is not the leader or not initialized yet"), response.getEntity()); } @Test