diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java index 19c0822ce9..b2dd7a5541 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java @@ -97,17 +97,21 @@ public void shutdown() { shutdownServers(); } + private void shutdownPipelines() { + shutdownPipelines(DataPrepperShutdownOptions.defaultOptions()); + } + /** * Triggers the shutdown of all configured valid pipelines. */ - public void shutdownPipelines() { + public void shutdownPipelines(final DataPrepperShutdownOptions shutdownOptions) { transformationPipelines.forEach((name, pipeline) -> { pipeline.removeShutdownObserver(pipelinesObserver); }); for (final Pipeline pipeline : transformationPipelines.values()) { LOG.info("Shutting down pipeline: {}", pipeline.getName()); - pipeline.shutdown(); + pipeline.shutdown(shutdownOptions); } } @@ -127,11 +131,12 @@ public void shutdownServers() { * * @param pipeline name of the pipeline */ - public void shutdownPipelines(final String pipeline) { + public void shutdownPipeline(final String pipeline) { if (transformationPipelines.containsKey(pipeline)) { transformationPipelines.get(pipeline).shutdown(); } } + public PluginFactory getPluginFactory() { return pluginFactory; } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepperShutdownOptions.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepperShutdownOptions.java new file mode 100644 index 0000000000..ea3edbf4f5 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepperShutdownOptions.java @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper; + +import java.time.Duration; + +public class DataPrepperShutdownOptions { + private final Duration bufferReadTimeout; + private final Duration bufferDrainTimeout; + + public static DataPrepperShutdownOptions defaultOptions() { + return new DataPrepperShutdownOptions(builder()); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Duration bufferReadTimeout; + private Duration bufferDrainTimeout; + + private Builder() { + } + + public Builder withBufferReadTimeout(final Duration bufferReadTimeout) { + this.bufferReadTimeout = bufferReadTimeout; + return this; + } + + public Builder withBufferDrainTimeout(final Duration bufferDrainTimeout) { + this.bufferDrainTimeout = bufferDrainTimeout; + return this; + } + + public DataPrepperShutdownOptions build() { + return new DataPrepperShutdownOptions(this); + } + } + + private DataPrepperShutdownOptions(final Builder builder) { + this.bufferReadTimeout = builder.bufferReadTimeout; + this.bufferDrainTimeout = builder.bufferDrainTimeout; + + if(bufferReadTimeout != null && bufferDrainTimeout != null) { + if (bufferReadTimeout.compareTo(bufferDrainTimeout) > 0) { + throw new IllegalArgumentException("Buffer read timeout cannot be greater than buffer drain timeout"); + } + } + } + + public Duration getBufferReadTimeout() { + return bufferReadTimeout; + } + + public Duration getBufferDrainTimeout() { + return bufferDrainTimeout; + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java index 29bb69db46..de22876041 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.pipeline; import com.google.common.base.Preconditions; +import org.opensearch.dataprepper.DataPrepperShutdownOptions; import org.opensearch.dataprepper.acknowledgements.InactiveAcknowledgementSetManager; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; @@ -41,7 +42,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; @@ -55,7 +55,7 @@ public class Pipeline { private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis(); - private volatile AtomicBoolean stopRequested; + private final PipelineShutdown pipelineShutdown; private final String name; private final Source source; @@ -137,7 +137,7 @@ public Pipeline( this.sinkExecutorService = PipelineThreadPoolExecutor.newFixedThreadPool(processorThreads, new PipelineThreadFactory(format("%s-sink-worker", name)), this); - stopRequested = new AtomicBoolean(false); + this.pipelineShutdown = new PipelineShutdown(buffer); } AcknowledgementSetManager getAcknowledgementSetManager() { @@ -176,7 +176,11 @@ public Collection getSinks() { } public boolean isStopRequested() { - return stopRequested.get(); + return pipelineShutdown.isStopRequested(); + } + + public boolean isForceStopReadingBuffers() { + return pipelineShutdown.isForceStopReadingBuffers(); } public Duration getPeerForwarderDrainTimeout() { @@ -267,6 +271,10 @@ public void execute() { } } + public synchronized void shutdown() { + shutdown(DataPrepperShutdownOptions.defaultOptions()); + } + /** * Initiates shutdown of the pipeline by: * 1. Stopping the source to prevent new items from being consumed @@ -276,19 +284,20 @@ public void execute() { * 5. Shutting down processors and sinks * 6. Stopping the sink ExecutorService */ - public synchronized void shutdown() { + public synchronized void shutdown(final DataPrepperShutdownOptions dataPrepperShutdownOptions) { LOG.info("Pipeline [{}] - Received shutdown signal with buffer drain timeout {}, processor shutdown timeout {}, " + "and sink shutdown timeout {}. Initiating the shutdown process", name, buffer.getDrainTimeout(), processorShutdownTimeout, sinkShutdownTimeout); try { source.stop(); - stopRequested.set(true); } catch (Exception ex) { LOG.error("Pipeline [{}] - Encountered exception while stopping the source, " + "proceeding with termination of process workers", name, ex); } - shutdownExecutorService(processorExecutorService, buffer.getDrainTimeout().toMillis() + processorShutdownTimeout.toMillis(), "processor"); + pipelineShutdown.shutdown(dataPrepperShutdownOptions); + + shutdownExecutorService(processorExecutorService, pipelineShutdown.getBufferDrainTimeout().plus(processorShutdownTimeout), "processor"); processorSets.forEach(processorSet -> processorSet.forEach(Processor::shutdown)); buffer.shutdown(); @@ -297,7 +306,7 @@ public synchronized void shutdown() { .map(DataFlowComponent::getComponent) .forEach(Sink::shutdown); - shutdownExecutorService(sinkExecutorService, sinkShutdownTimeout.toMillis(), "sink"); + shutdownExecutorService(sinkExecutorService, sinkShutdownTimeout, "sink"); LOG.info("Pipeline [{}] - Pipeline fully shutdown.", name); @@ -312,13 +321,13 @@ public void removeShutdownObserver(final PipelineObserver pipelineObserver) { observers.remove(pipelineObserver); } - private void shutdownExecutorService(final ExecutorService executorService, final long timeoutForTerminationInMillis, final String workerName) { + private void shutdownExecutorService(final ExecutorService executorService, final Duration timeoutForTermination, final String workerName) { LOG.info("Pipeline [{}] - Shutting down {} process workers.", name, workerName); executorService.shutdown(); try { - if (!executorService.awaitTermination(timeoutForTerminationInMillis, TimeUnit.MILLISECONDS)) { - LOG.warn("Pipeline [{}] - Workers did not terminate in time, forcing termination of {} workers.", name, workerName); + if (!executorService.awaitTermination(timeoutForTermination.toMillis(), TimeUnit.MILLISECONDS)) { + LOG.warn("Pipeline [{}] - Workers did not terminate in {}, forcing termination of {} workers.", name, timeoutForTermination, workerName); executorService.shutdownNow(); } } catch (InterruptedException ex) { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineShutdown.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineShutdown.java new file mode 100644 index 0000000000..f3731e9d67 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineShutdown.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.pipeline; + +import org.opensearch.dataprepper.DataPrepperShutdownOptions; +import org.opensearch.dataprepper.model.buffer.Buffer; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +class PipelineShutdown { + private final AtomicBoolean stopRequested = new AtomicBoolean(false); + private final Duration bufferDrainTimeout; + private final Clock clock; + private Instant shutdownRequestedAt; + private Instant forceStopReadingBuffersAt; + private Duration bufferDrainTimeoutOverride; + + PipelineShutdown(final Buffer buffer) { + this(buffer, Clock.systemDefaultZone()); + } + + PipelineShutdown(final Buffer buffer, final Clock clock) { + bufferDrainTimeout = Objects.requireNonNull(buffer.getDrainTimeout()); + this.clock = clock; + } + + public void shutdown(final DataPrepperShutdownOptions dataPrepperShutdownOptions) { + final boolean stopPreviouslyRequested = stopRequested.get(); + if(stopPreviouslyRequested) { + return; + } + + stopRequested.set(true); + shutdownRequestedAt = now(); + + final Duration bufferReadTimeout = dataPrepperShutdownOptions.getBufferReadTimeout(); + if(bufferReadTimeout != null) { + forceStopReadingBuffersAt = shutdownRequestedAt.plus(bufferReadTimeout); + } + + final Duration bufferDrainTimeoutOverride = dataPrepperShutdownOptions.getBufferDrainTimeout(); + if(bufferDrainTimeoutOverride != null) { + this.bufferDrainTimeoutOverride = bufferDrainTimeoutOverride; + } + } + + boolean isStopRequested() { + return stopRequested.get(); + } + + boolean isForceStopReadingBuffers() { + return forceStopReadingBuffersAt != null && now().isAfter(forceStopReadingBuffersAt); + } + + public Duration getBufferDrainTimeout() { + return bufferDrainTimeoutOverride != null ? + bufferDrainTimeoutOverride : bufferDrainTimeout; + } + + private Instant now() { + return Instant.ofEpochMilli(clock.millis()); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java index b5538dfe73..8117848f9a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java @@ -60,37 +60,41 @@ public void run() { while (!pipeline.isStopRequested()) { doRun(); } - LOG.info("Processor shutdown phase 1 complete."); + executeShutdownProcess(); + } catch (final Exception e) { + LOG.error("Encountered exception during pipeline {} processing", pipeline.getName(), e); + } + } - // Phase 2 - execute until buffers are empty - LOG.info("Beginning processor shutdown phase 2, iterating until buffers empty."); - while (!readBuffer.isEmpty()) { - doRun(); - } - LOG.info("Processor shutdown phase 2 complete."); + private void executeShutdownProcess() { + LOG.info("Processor shutdown phase 1 complete."); - // Phase 3 - execute until peer forwarder drain period expires (best effort to process all peer forwarder data) - final long drainTimeoutExpiration = System.currentTimeMillis() + pipeline.getPeerForwarderDrainTimeout().toMillis(); - LOG.info("Beginning processor shutdown phase 3, iterating until {}.", drainTimeoutExpiration); - while (System.currentTimeMillis() < drainTimeoutExpiration) { - doRun(); - } - LOG.info("Processor shutdown phase 3 complete."); + // Phase 2 - execute until buffers are empty + LOG.info("Beginning processor shutdown phase 2, iterating until buffers empty."); + while (!isBufferReadyForShutdown()) { + doRun(); + } + LOG.info("Processor shutdown phase 2 complete."); - // Phase 4 - prepare processors for shutdown - LOG.info("Beginning processor shutdown phase 4, preparing processors for shutdown."); - processors.forEach(Processor::prepareForShutdown); - LOG.info("Processor shutdown phase 4 complete."); + // Phase 3 - execute until peer forwarder drain period expires (best effort to process all peer forwarder data) + final long drainTimeoutExpiration = System.currentTimeMillis() + pipeline.getPeerForwarderDrainTimeout().toMillis(); + LOG.info("Beginning processor shutdown phase 3, iterating until {}.", drainTimeoutExpiration); + while (System.currentTimeMillis() < drainTimeoutExpiration) { + doRun(); + } + LOG.info("Processor shutdown phase 3 complete."); - // Phase 5 - execute until processors are ready to shutdown - LOG.info("Beginning processor shutdown phase 5, iterating until processors are ready to shutdown."); - while (!areComponentsReadyForShutdown()) { - doRun(); - } - LOG.info("Processor shutdown phase 5 complete."); - } catch (final Exception e) { - LOG.error("Encountered exception during pipeline {} processing", pipeline.getName(), e); + // Phase 4 - prepare processors for shutdown + LOG.info("Beginning processor shutdown phase 4, preparing processors for shutdown."); + processors.forEach(Processor::prepareForShutdown); + LOG.info("Processor shutdown phase 4 complete."); + + // Phase 5 - execute until processors are ready to shutdown + LOG.info("Beginning processor shutdown phase 5, iterating until processors are ready to shutdown."); + while (!areComponentsReadyForShutdown()) { + doRun(); } + LOG.info("Processor shutdown phase 5 complete."); } private void processAcknowledgements(List inputEvents, Collection> outputRecords) { @@ -153,11 +157,19 @@ private void doRun() { } private boolean areComponentsReadyForShutdown() { - return readBuffer.isEmpty() && processors.stream() + return isBufferReadyForShutdown() && processors.stream() .map(Processor::isReadyForShutdown) .allMatch(result -> result == true); } + private boolean isBufferReadyForShutdown() { + final boolean isBufferEmpty = readBuffer.isEmpty(); + final boolean forceStopReadingBuffers = pipeline.isForceStopReadingBuffers(); + final boolean isBufferReadyForShutdown = isBufferEmpty || forceStopReadingBuffers; + LOG.debug("isBufferReadyForShutdown={}, isBufferEmpty={}, forceStopReadingBuffers={}", isBufferReadyForShutdown, isBufferEmpty, forceStopReadingBuffers); + return isBufferReadyForShutdown; + } + /** * TODO Add isolator pattern - Fail if one of the Sink fails [isolator Pattern] * Uses the pipeline method to publish to sinks, waits for each of the sink result to be true before attempting to diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandler.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandler.java index 08449e0b21..e3da8fc51d 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandler.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandler.java @@ -7,16 +7,23 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; import org.opensearch.dataprepper.DataPrepper; +import org.opensearch.dataprepper.DataPrepperShutdownOptions; +import org.opensearch.dataprepper.pipeline.parser.DataPrepperDurationParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.HttpMethod; import java.io.IOException; import java.net.HttpURLConnection; +import java.net.URI; +import java.nio.charset.Charset; +import java.util.List; /** - * HttpHandler to handle requests to shut down the data prepper instance + * HttpHandler to handle requests to shut down the Data Prepper instance */ public class ShutdownHandler implements HttpHandler { private final DataPrepper dataPrepper; @@ -40,7 +47,8 @@ public void handle(final HttpExchange exchange) throws IOException { LOG.info("Received HTTP shutdown request to shutdown Data Prepper. Shutdown pipelines and server. User-Agent='{}'", exchange.getRequestHeaders().getFirst("User-Agent")); } - dataPrepper.shutdownPipelines(); + final DataPrepperShutdownOptions dataPrepperShutdownOptions = mapShutdownOptions(exchange.getRequestURI()); + dataPrepper.shutdownPipelines(dataPrepperShutdownOptions); exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, 0); } catch (final Exception e) { LOG.error("Caught exception shutting down data prepper", e); @@ -50,4 +58,28 @@ public void handle(final HttpExchange exchange) throws IOException { dataPrepper.shutdownServers(); } } + + private DataPrepperShutdownOptions mapShutdownOptions(final URI requestURI) { + final List queryParams = URLEncodedUtils.parse(requestURI, Charset.defaultCharset()); + + DataPrepperShutdownOptions.Builder shutdownOptionsBuilder + = DataPrepperShutdownOptions.builder(); + + for (final NameValuePair queryParam : queryParams) { + final String value = queryParam.getValue(); + switch(queryParam.getName()) { + case "bufferReadTimeout": + shutdownOptionsBuilder = + shutdownOptionsBuilder.withBufferReadTimeout(DataPrepperDurationParser.parse(value)); + break; + case "bufferDrainTimeout": + shutdownOptionsBuilder = + shutdownOptionsBuilder.withBufferDrainTimeout(DataPrepperDurationParser.parse(value)); + break; + default: + LOG.warn("Unrecognized query parameter '{}'", queryParam.getName()); + } + } + return shutdownOptionsBuilder.build(); + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperShutdownOptionsTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperShutdownOptionsTest.java new file mode 100644 index 0000000000..42ea27e97b --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperShutdownOptionsTest.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Random; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class DataPrepperShutdownOptionsTest { + private Random random; + + @BeforeEach + void setUp() { + random = new Random(); + } + + @Test + void defaultOptions_returns_correct_defaults() { + final DataPrepperShutdownOptions options = DataPrepperShutdownOptions.defaultOptions(); + + assertThat(options.getBufferDrainTimeout(), nullValue()); + assertThat(options.getBufferReadTimeout(), nullValue()); + } + + @Test + void builder_returns_valid_builder() { + final DataPrepperShutdownOptions.Builder builder = DataPrepperShutdownOptions.builder(); + + assertThat(builder, notNullValue()); + } + + @Test + void build_throws_if_bufferReadTimeout_is_greater_than_bufferDrainTimeout() { + final Duration bufferDrainTimeout = Duration.ofSeconds(random.nextInt(20)); + final Duration bufferReadTimeout = bufferDrainTimeout.plus(1, ChronoUnit.MILLIS); + final DataPrepperShutdownOptions.Builder builder = DataPrepperShutdownOptions.builder() + .withBufferDrainTimeout(bufferDrainTimeout) + .withBufferReadTimeout(bufferReadTimeout); + assertThrows(IllegalArgumentException.class, builder::build); + } + + @Test + void build_creates_new_options_with_bufferReadTimeout_equal_to_bufferDrainTimeout() { + final Duration timeout = Duration.ofSeconds(random.nextInt(20)); + final DataPrepperShutdownOptions dataPrepperShutdownOptions = DataPrepperShutdownOptions.builder() + .withBufferDrainTimeout(timeout) + .withBufferReadTimeout(timeout) + .build(); + + + assertThat(dataPrepperShutdownOptions, notNullValue()); + assertThat(dataPrepperShutdownOptions.getBufferReadTimeout(), equalTo(timeout)); + assertThat(dataPrepperShutdownOptions.getBufferDrainTimeout(), equalTo(timeout)); + } + + @Test + void build_creates_new_options_with_bufferReadTimeout_less_than_bufferDrainTimeout() { + final Duration bufferReadTimeout = Duration.ofSeconds(random.nextInt(20)); + final Duration bufferDrainTimeout = Duration.ofSeconds(random.nextInt(20)).plus(bufferReadTimeout); + final DataPrepperShutdownOptions dataPrepperShutdownOptions = DataPrepperShutdownOptions.builder() + .withBufferDrainTimeout(bufferDrainTimeout) + .withBufferReadTimeout(bufferReadTimeout) + .build(); + + + assertThat(dataPrepperShutdownOptions, notNullValue()); + assertThat(dataPrepperShutdownOptions.getBufferReadTimeout(), equalTo(bufferReadTimeout)); + assertThat(dataPrepperShutdownOptions.getBufferDrainTimeout(), equalTo(bufferDrainTimeout)); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperTests.java index 670d9664c6..3332be605f 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperTests.java @@ -111,8 +111,9 @@ public void testGivenValidPipelineParserWhenExecuteThenAllPipelinesExecuteAndSer @Test public void testDataPrepperShutdown() throws NoSuchFieldException, IllegalAccessException { - createObjectUnderTest().shutdownPipelines(); - verify(pipeline).shutdown(); + final DataPrepperShutdownOptions dataPrepperShutdownOptions = mock(DataPrepperShutdownOptions.class); + createObjectUnderTest().shutdownPipelines(dataPrepperShutdownOptions); + verify(pipeline).shutdown(dataPrepperShutdownOptions); } @Test @@ -120,14 +121,14 @@ public void testDataPrepperShutdownPipeline() throws NoSuchFieldException, Illeg final Pipeline randomPipeline = mock(Pipeline.class); lenient().when(randomPipeline.isReady()).thenReturn(true); parseConfigurationFixture.put("Random Pipeline", randomPipeline); - createObjectUnderTest().shutdownPipelines("Random Pipeline"); + createObjectUnderTest().shutdownPipeline("Random Pipeline"); verify(randomPipeline).shutdown(); } @Test public void testDataPrepperShutdownNonExistentPipelineWithoutException() throws NoSuchFieldException, IllegalAccessException { - createObjectUnderTest().shutdownPipelines("Missing Pipeline"); + createObjectUnderTest().shutdownPipeline("Missing Pipeline"); } @Test diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineShutdownTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineShutdownTest.java new file mode 100644 index 0000000000..36ac4aa3d1 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineShutdownTest.java @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.pipeline; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.DataPrepperShutdownOptions; +import org.opensearch.dataprepper.model.buffer.Buffer; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Random; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class PipelineShutdownTest { + @Mock + private Buffer buffer; + + @Mock + private Clock clock; + + @Mock + private DataPrepperShutdownOptions dataPrepperShutdownOptions; + + private Duration bufferDrainTimeout; + private Random random; + + @BeforeEach + void setUp() { + random = new Random(); + bufferDrainTimeout = Duration.ofSeconds(random.nextInt(100) + 1_000); + + when(buffer.getDrainTimeout()).thenReturn(bufferDrainTimeout); + } + + private PipelineShutdown createObjectUnderTest() { + return new PipelineShutdown(buffer, clock); + } + + @Test + void constructor_throws_if_drainTimeout_is_null() { + reset(buffer); + when(buffer.getDrainTimeout()).thenReturn(null); + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void isStopRequested_returns_false() { + assertThat(createObjectUnderTest().isStopRequested(), equalTo(false)); + } + + @Test + void isForceStopReadingBuffers_returns_false() { + assertThat(createObjectUnderTest().isForceStopReadingBuffers(), equalTo(false)); + } + + @Test + void isStopRequested_returns_true_after_shutdown() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + when(clock.millis()).thenReturn(Clock.systemUTC().millis()); + objectUnderTest.shutdown(dataPrepperShutdownOptions); + assertThat(objectUnderTest.isStopRequested(), equalTo(true)); + } + + @Test + void isStopRequested_returns_true_after_multiple_shutdown_calls() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + when(clock.millis()).thenReturn(Clock.systemUTC().millis()); + for (int i = 0; i < 10; i++) { + objectUnderTest.shutdown(dataPrepperShutdownOptions); + } + assertThat(objectUnderTest.isStopRequested(), equalTo(true)); + } + + @Test + void isForceStopReadingBuffers_returns_false_after_shutdown_if_getBufferReadTimeout_is_null() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + + when(dataPrepperShutdownOptions.getBufferReadTimeout()).thenReturn(null); + objectUnderTest.shutdown(dataPrepperShutdownOptions); + + assertThat(objectUnderTest.isForceStopReadingBuffers(), equalTo(false)); + } + + @Test + void isForceStopReadingBuffers_returns_false_after_shutdown_if_time_is_before_shutdown_plus_getBufferReadTimeout() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + + when(dataPrepperShutdownOptions.getBufferReadTimeout()).thenReturn(Duration.ofSeconds(1)); + final Instant baseTime = Instant.now(); + when(clock.millis()) + .thenReturn(baseTime.toEpochMilli()); + + objectUnderTest.shutdown(dataPrepperShutdownOptions); + + assertThat(objectUnderTest.isForceStopReadingBuffers(), equalTo(false)); + } + + @Test + void isForceStopReadingBuffers_returns_true_after_shutdown_if_time_is_after_shutdown_plus_getBufferReadTimeout() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + + when(dataPrepperShutdownOptions.getBufferReadTimeout()).thenReturn(Duration.ofSeconds(1)); + final Instant baseTime = Instant.now(); + when(clock.millis()) + .thenReturn(baseTime.toEpochMilli()) + .thenReturn(baseTime.plusSeconds(2).toEpochMilli()); + + objectUnderTest.shutdown(dataPrepperShutdownOptions); + + assertThat(objectUnderTest.isForceStopReadingBuffers(), equalTo(true)); + } + + @Test + void isForceStopReadingBuffers_returns_true_if_shutdown_is_called_multiple_times() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + + when(dataPrepperShutdownOptions.getBufferReadTimeout()) + .thenReturn(Duration.ofSeconds(1)) + .thenReturn(Duration.ofSeconds(5)); + final Instant baseTime = Instant.now(); + when(clock.millis()) + .thenReturn(baseTime.toEpochMilli()) + .thenReturn(baseTime.plusSeconds(2).toEpochMilli()); + + objectUnderTest.shutdown(dataPrepperShutdownOptions); + objectUnderTest.shutdown(dataPrepperShutdownOptions); + + assertThat(objectUnderTest.isForceStopReadingBuffers(), equalTo(true)); + } + + @Test + void isForceStopReadingBuffers_returns_true_if_shutdown_is_called_in_between_isForceStopReadingBuffers_calls() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + + when(dataPrepperShutdownOptions.getBufferReadTimeout()) + .thenReturn(Duration.ofSeconds(1)) + .thenReturn(Duration.ofSeconds(5)); + final Instant baseTime = Instant.now(); + when(clock.millis()) + .thenReturn(baseTime.toEpochMilli()) + .thenReturn(baseTime.plusSeconds(2).toEpochMilli()); + + objectUnderTest.shutdown(dataPrepperShutdownOptions); + assertThat(objectUnderTest.isForceStopReadingBuffers(), equalTo(true)); + + objectUnderTest.shutdown(dataPrepperShutdownOptions); + assertThat(objectUnderTest.isForceStopReadingBuffers(), equalTo(true)); + } + + + @Test + void getBufferDrainTimeout_returns_buffer_getDrainTimeout_if_shutdown_not_called() { + assertThat(createObjectUnderTest().getBufferDrainTimeout(), equalTo(bufferDrainTimeout)); + } + + @Test + void getBufferDrainTimeout_returns_buffer_getDrainTimeout_if_shutdown_called_without_bufferDrainTimeout() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + when(dataPrepperShutdownOptions.getBufferDrainTimeout()).thenReturn(null); + objectUnderTest.shutdown(dataPrepperShutdownOptions); + assertThat(objectUnderTest.getBufferDrainTimeout(), equalTo(bufferDrainTimeout)); + } + + @Test + void getBufferDrainTimeout_returns_buffer_shutdownOptions_bufferDrainTimeout_if_provided() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + Duration bufferDrainTimeoutFromOptions = Duration.ofSeconds(random.nextInt(100) + 100); + when(dataPrepperShutdownOptions.getBufferDrainTimeout()).thenReturn(bufferDrainTimeoutFromOptions); + objectUnderTest.shutdown(dataPrepperShutdownOptions); + assertThat(objectUnderTest.getBufferDrainTimeout(), equalTo(bufferDrainTimeoutFromOptions)); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java index c2e0ad769f..66300969c0 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java @@ -9,6 +9,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.DataPrepperShutdownOptions; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; @@ -93,9 +94,9 @@ void setup() { eventFactory = mock(EventFactory.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); sourceCoordinatorFactory = mock(SourceCoordinatorFactory.class); - processorShutdownTimeout = Duration.ofSeconds(Math.abs(new Random().nextInt(10))); - sinkShutdownTimeout = Duration.ofSeconds(Math.abs(new Random().nextInt(10))); - peerForwarderDrainTimeout = Duration.ofSeconds(Math.abs(new Random().nextInt(10))); + processorShutdownTimeout = Duration.ofMillis(Math.abs(new Random().nextInt(10))); + sinkShutdownTimeout = Duration.ofMillis(Math.abs(new Random().nextInt(10))); + peerForwarderDrainTimeout = Duration.ofMillis(Math.abs(new Random().nextInt(10))); } @AfterEach @@ -620,4 +621,33 @@ void shutdown_does_not_call_removed_PipelineObservers() { testPipeline.shutdown(); verifyNoInteractions(pipelineObserver); } + + @Test + void isForceStopReadingBuffers_returns_false_if_not_in_shutdown() { + final Source> testSource = new TestSource(); + final DataFlowComponent sinkDataFlowComponent = mock(DataFlowComponent.class); + final TestSink testSink = new TestSink(); + when(sinkDataFlowComponent.getComponent()).thenReturn(testSink); + testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, new BlockingBuffer(TEST_PIPELINE_NAME), + Collections.emptyList(), Collections.singletonList(sinkDataFlowComponent), router, + eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, TEST_PROCESSOR_THREADS, TEST_READ_BATCH_TIMEOUT, + processorShutdownTimeout, sinkShutdownTimeout, peerForwarderDrainTimeout); + assertThat(testPipeline.isForceStopReadingBuffers(), equalTo(false)); + } + + @Test + void isForceStopReadingBuffers_returns_true_if_bufferReadTimeout_is_exceeded() throws InterruptedException { + final Source> testSource = new TestSource(); + final DataFlowComponent sinkDataFlowComponent = mock(DataFlowComponent.class); + final TestSink testSink = new TestSink(); + when(sinkDataFlowComponent.getComponent()).thenReturn(testSink); + testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, new BlockingBuffer(TEST_PIPELINE_NAME), + Collections.emptyList(), Collections.singletonList(sinkDataFlowComponent), router, + eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, TEST_PROCESSOR_THREADS, TEST_READ_BATCH_TIMEOUT, + processorShutdownTimeout, sinkShutdownTimeout, peerForwarderDrainTimeout); + + testPipeline.shutdown(DataPrepperShutdownOptions.builder().withBufferReadTimeout(Duration.ofMillis(1)).build()); + Thread.sleep(2); + assertThat(testPipeline.isForceStopReadingBuffers(), is(true)); + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandlerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandlerTest.java index 19f1e839e1..0d36d05b1d 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandlerTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandlerTest.java @@ -8,20 +8,29 @@ import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.DataPrepper; +import org.opensearch.dataprepper.DataPrepperShutdownOptions; import javax.ws.rs.HttpMethod; import java.io.IOException; import java.io.OutputStream; import java.net.HttpURLConnection; +import java.net.URI; +import java.time.Duration; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; @@ -51,23 +60,6 @@ public void beforeEach() { .thenReturn(new Headers()); } - @Test - public void testWhenShutdownWithPostRequestThenResponseWritten() throws IOException { - when(exchange.getRequestMethod()) - .thenReturn(HttpMethod.POST); - - shutdownHandler.handle(exchange); - - verify(dataPrepper, times(1)) - .shutdownPipelines(); - verify(exchange, times(1)) - .sendResponseHeaders(eq(HttpURLConnection.HTTP_OK), eq(0L)); - verify(responseBody, times(1)) - .close(); - verify(dataPrepper, times(1)) - .shutdownServers(); - } - @ParameterizedTest @ValueSource(strings = { HttpMethod.DELETE, HttpMethod.GET, HttpMethod.PATCH, HttpMethod.PUT }) public void testWhenShutdownWithProhibitedHttpMethodThenErrorResponseWritten(String httpMethod) throws IOException { @@ -82,17 +74,91 @@ public void testWhenShutdownWithProhibitedHttpMethodThenErrorResponseWritten(Str .close(); } - @Test - public void testHandleException() throws IOException { - when(exchange.getRequestMethod()) - .thenReturn(HttpMethod.POST); - doThrow(RuntimeException.class).when(dataPrepper).shutdownPipelines(); - - shutdownHandler.handle(exchange); + @Nested + class WithoutQueryParameters { + @BeforeEach + void setUp() { + when(exchange.getRequestURI()).thenReturn(URI.create("/shutdown")); + } + + @Test + public void testWhenShutdownWithPostRequestThenResponseWritten() throws IOException { + when(exchange.getRequestMethod()) + .thenReturn(HttpMethod.POST); + + shutdownHandler.handle(exchange); + + ArgumentCaptor shutdownOptionsArgumentCaptor = ArgumentCaptor.forClass(DataPrepperShutdownOptions.class); + verify(dataPrepper, times(1)) + .shutdownPipelines(shutdownOptionsArgumentCaptor.capture()); + verify(exchange, times(1)) + .sendResponseHeaders(eq(HttpURLConnection.HTTP_OK), eq(0L)); + verify(responseBody, times(1)) + .close(); + verify(dataPrepper, times(1)) + .shutdownServers(); + + DataPrepperShutdownOptions actualShutdownOptions = shutdownOptionsArgumentCaptor.getValue(); + assertThat(actualShutdownOptions.getBufferDrainTimeout(), nullValue()); + assertThat(actualShutdownOptions.getBufferReadTimeout(), nullValue()); + } + + @Test + public void testHandleException() throws IOException { + when(exchange.getRequestMethod()) + .thenReturn(HttpMethod.POST); + doThrow(RuntimeException.class).when(dataPrepper).shutdownPipelines(any(DataPrepperShutdownOptions.class)); + + shutdownHandler.handle(exchange); + + verify(exchange, times(1)) + .sendResponseHeaders(eq(HttpURLConnection.HTTP_INTERNAL_ERROR), eq(0L)); + verify(responseBody, times(1)) + .close(); + } + } - verify(exchange, times(1)) - .sendResponseHeaders(eq(HttpURLConnection.HTTP_INTERNAL_ERROR), eq(0L)); - verify(responseBody, times(1)) - .close(); + @Nested + class WithoutShutdownQueryParameters { + @BeforeEach + void setUp() { + when(exchange.getRequestURI()).thenReturn(URI.create("/shutdown?bufferReadTimeout=1500ms&bufferDrainTimeout=20s")); + } + + @Test + public void testWhenShutdownWithPostRequestThenResponseWritten() throws IOException { + when(exchange.getRequestMethod()) + .thenReturn(HttpMethod.POST); + + shutdownHandler.handle(exchange); + + final ArgumentCaptor shutdownOptionsArgumentCaptor = ArgumentCaptor.forClass(DataPrepperShutdownOptions.class); + verify(dataPrepper, times(1)) + .shutdownPipelines(shutdownOptionsArgumentCaptor.capture()); + verify(exchange, times(1)) + .sendResponseHeaders(eq(HttpURLConnection.HTTP_OK), eq(0L)); + verify(responseBody, times(1)) + .close(); + verify(dataPrepper, times(1)) + .shutdownServers(); + + final DataPrepperShutdownOptions actualShutdownOptions = shutdownOptionsArgumentCaptor.getValue(); + assertThat(actualShutdownOptions.getBufferDrainTimeout(), equalTo(Duration.ofSeconds(20))); + assertThat(actualShutdownOptions.getBufferReadTimeout(), equalTo(Duration.ofMillis(1500))); + } + + @Test + public void testHandleException() throws IOException { + when(exchange.getRequestMethod()) + .thenReturn(HttpMethod.POST); + doThrow(RuntimeException.class).when(dataPrepper).shutdownPipelines(any(DataPrepperShutdownOptions.class)); + + shutdownHandler.handle(exchange); + + verify(exchange, times(1)) + .sendResponseHeaders(eq(HttpURLConnection.HTTP_INTERNAL_ERROR), eq(0L)); + verify(responseBody, times(1)) + .close(); + } } } diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationDeserializer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationDeserializer.java index 5005eb9f96..04d6e05867 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationDeserializer.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationDeserializer.java @@ -24,54 +24,17 @@ */ public class DataPrepperDurationDeserializer extends StdDeserializer { - private static final String SIMPLE_DURATION_REGEX = "^(0|[1-9]\\d*)(s|ms)$"; - private static final Pattern SIMPLE_DURATION_PATTERN = Pattern.compile(SIMPLE_DURATION_REGEX); - public DataPrepperDurationDeserializer() { this(null); } - protected DataPrepperDurationDeserializer(Class vc) { + protected DataPrepperDurationDeserializer(final Class vc) { super(vc); } @Override - public Duration deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + public Duration deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { final String durationString = p.getValueAsString(); - Duration duration; - - try { - duration = Duration.parse(durationString); - } catch (final DateTimeParseException e) { - duration = parseSimpleDuration(durationString); - if (duration == null) { - throw new IllegalArgumentException("Durations must use either ISO 8601 notation or simple notations for seconds (60s) or milliseconds (100ms). Whitespace is ignored."); - } - } - - return duration; - } - - private Duration parseSimpleDuration(final String durationString) throws IllegalArgumentException { - final String durationStringNoSpaces = durationString.replaceAll("\\s", ""); - final Matcher matcher = SIMPLE_DURATION_PATTERN.matcher(durationStringNoSpaces); - if (!matcher.find()) { - return null; - } - - final long durationNumber = Long.parseLong(matcher.group(1)); - final String durationUnit = matcher.group(2); - - return getDurationFromUnitAndNumber(durationNumber, durationUnit); - } - - private Duration getDurationFromUnitAndNumber(final long durationNumber, final String durationUnit) { - switch (durationUnit) { - case "s": - return Duration.ofSeconds(durationNumber); - case "ms": - return Duration.ofMillis(durationNumber); - } - return null; + return DataPrepperDurationParser.parse(durationString); } } diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParser.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParser.java new file mode 100644 index 0000000000..e758278cf7 --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParser.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.pipeline.parser; + +import java.time.Duration; +import java.time.format.DateTimeParseException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Parses strings into {@link Duration} supporting the Data Prepper duration format. + * It supports ISO 8601 notation ("PT20.345S", "PT15M", etc.) and simple durations for + * seconds (60s) and milliseconds (100ms). It does not support combining the units for simple durations ("60s 100ms" is not allowed). + * Whitespace is ignored and leading zeroes are not allowed. + * @since 2.10 + */ +public class DataPrepperDurationParser { + private static final String SIMPLE_DURATION_REGEX = "^(0|[1-9]\\d*)(s|ms)$"; + private static final Pattern SIMPLE_DURATION_PATTERN = Pattern.compile(SIMPLE_DURATION_REGEX); + + public static Duration parse(final String durationString) { + try { + return Duration.parse(durationString); + } catch (final DateTimeParseException e) { + final Duration duration = parseSimpleDuration(durationString); + if (duration == null) { + throw new IllegalArgumentException("Durations must use either ISO 8601 notation or simple notations for seconds (60s) or milliseconds (100ms). Whitespace is ignored."); + } + return duration; + } + } + + private static Duration parseSimpleDuration(final String durationString) throws IllegalArgumentException { + final String durationStringNoSpaces = durationString.replaceAll("\\s", ""); + final Matcher matcher = SIMPLE_DURATION_PATTERN.matcher(durationStringNoSpaces); + if (!matcher.find()) { + return null; + } + + final long durationNumber = Long.parseLong(matcher.group(1)); + final String durationUnit = matcher.group(2); + + return getDurationFromUnitAndNumber(durationNumber, durationUnit); + } + + private static Duration getDurationFromUnitAndNumber(final long durationNumber, final String durationUnit) { + switch (durationUnit) { + case "s": + return Duration.ofSeconds(durationNumber); + case "ms": + return Duration.ofMillis(durationNumber); + } + return null; + } + +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParserTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParserTest.java new file mode 100644 index 0000000000..2ed41661a7 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParserTest.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.pipeline.parser; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.time.Duration; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +class DataPrepperDurationParserTest { + + @ParameterizedTest + @ValueSource(strings = {"6s1s", "60ms 100s", "20.345s", "-1s", "06s", "100m", "100sm", "100"}) + void invalidDurationStringsThrowIllegalArgumentException(final String durationString) { + assertThrows(IllegalArgumentException.class, () -> DataPrepperDurationParser.parse(durationString)); + } + + @Test + void ISO_8601_duration_string_returns_correct_duration() { + final String durationString = "PT15M"; + final Duration result = DataPrepperDurationParser.parse(durationString); + assertThat(result, equalTo(Duration.ofMinutes(15))); + } + + @ParameterizedTest + @ValueSource(strings = {"0s", "0ms"}) + void simple_duration_strings_of_0_return_correct_duration(final String durationString) { + final Duration result = DataPrepperDurationParser.parse(durationString); + + assertThat(result, equalTo(Duration.ofSeconds(0))); + } + + @ParameterizedTest + @ValueSource(strings = {"60s", "60000ms", "60 s", "60000 ms", " 60 s "}) + void simple_duration_strings_of_60_seconds_return_correct_duration(final String durationString) { + final Duration result = DataPrepperDurationParser.parse(durationString); + + assertThat(result, equalTo(Duration.ofSeconds(60))); + } + + @ParameterizedTest + @ValueSource(strings = {"5s", "5000ms", "5 s", "5000 ms", " 5 s "}) + void simple_duration_strings_of_5_seconds_return_correct_duration(final String durationString) { + final Duration result = DataPrepperDurationParser.parse(durationString); + + assertThat(result, equalTo(Duration.ofSeconds(5))); + } +} \ No newline at end of file