Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
deepaksahu562 committed Apr 17, 2023
2 parents a2a1882 + 829afa9 commit 8e79289
Show file tree
Hide file tree
Showing 62 changed files with 1,760 additions and 707 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,12 @@
package org.opensearch.dataprepper.model.event;

public interface EventHandle {
/**
* releases event handle
*
* @param result result to be used while releasing. This indicates if
* the operation on the event handle is success or not
* @since 2.2
*/
void release(boolean result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public String formatString(final String format) {
while ((position = format.indexOf("${", fromIndex)) != -1) {
int endPosition = format.indexOf("}", position + 1);
if (endPosition == -1) {
throw new RuntimeException("index name not properly formed");
throw new RuntimeException("Format string is not properly formed");
}
result += format.substring(fromIndex, position);
String name = format.substring(position + 2, endPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.model.failures;

import org.opensearch.dataprepper.model.event.EventHandle;
import org.apache.commons.lang3.StringUtils;

import java.time.Instant;
Expand Down Expand Up @@ -36,8 +37,10 @@ public class DlqObject {

private final String timestamp;

private final EventHandle eventHandle;

private DlqObject(final String pluginId, final String pluginName, final String pipelineName,
final String timestamp, final Object failedData) {
final String timestamp, final Object failedData, final EventHandle eventHandle) {

checkNotNull(pluginId, "pluginId cannot be null");
checkArgument(!pluginId.isEmpty(), "pluginId cannot be an empty string");
Expand All @@ -51,6 +54,7 @@ private DlqObject(final String pluginId, final String pluginName, final String p
this.pluginName = pluginName;
this.pipelineName = pipelineName;
this.failedData = failedData;
this.eventHandle = eventHandle;

this.timestamp = StringUtils.isEmpty(timestamp) ? FORMATTER.format(Instant.now()) : timestamp;
}
Expand All @@ -75,6 +79,16 @@ public String getTimestamp() {
return timestamp;
}

public EventHandle getEventHandle() {
return eventHandle;
}

public void releaseEventHandle(boolean result) {
if (eventHandle != null) {
eventHandle.release(result);
}
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
Expand All @@ -84,6 +98,7 @@ public boolean equals(final Object o) {
&& Objects.equals(pluginId, that.pluginId)
&& Objects.equals(pluginName, that.pluginName)
&& Objects.equals(pipelineName, that.pipelineName)
&& Objects.equals(eventHandle, that.eventHandle)
&& Objects.equals(timestamp, that.getTimestamp());
}

Expand Down Expand Up @@ -113,6 +128,7 @@ public static class Builder {
private String pluginName;
private String pipelineName;
private Object failedData;
private EventHandle eventHandle;

private String timestamp;

Expand Down Expand Up @@ -141,13 +157,18 @@ public Builder withTimestamp(final String timestamp) {
return this;
}

public Builder withEventHandle(final EventHandle eventHandle) {
this.eventHandle = eventHandle;
return this;
}

public Builder withTimestamp(final Instant instant) {
this.timestamp = FORMATTER.format(instant);
return this;
}

public DlqObject build() {
return new DlqObject(this.pluginId, this.pluginName, this.pipelineName, this.timestamp, this.failedData);
return new DlqObject(this.pluginId, this.pluginName, this.pipelineName, this.timestamp, this.failedData, this.eventHandle);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
public class JacksonEventTest {

class TestEventHandle implements EventHandle {
@Override
public void release(boolean result) {}
};

private Event event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.model.failures;

import org.opensearch.dataprepper.model.event.EventHandle;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
Expand All @@ -27,6 +28,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.ArgumentMatchers.any;

public class DlqObjectTest {

Expand All @@ -35,13 +40,15 @@ public class DlqObjectTest {
private String pluginName;
private String pipelineName;
private Object failedData;
private EventHandle eventHandle;

@BeforeEach
public void setUp() {
pluginId = randomUUID().toString();
pluginName = randomUUID().toString();
pipelineName = randomUUID().toString();
failedData = randomUUID();
eventHandle = mock(EventHandle.class);
}

@Test
Expand All @@ -52,6 +59,7 @@ public void test_build_with_timestamp() {
.withPluginName(pluginName)
.withPipelineName(pipelineName)
.withFailedData(failedData)
.withEventHandle(eventHandle)
.withTimestamp(randomUUID().toString())
.build();

Expand Down Expand Up @@ -126,6 +134,7 @@ public void setup() {
.withPluginName(pluginName)
.withPipelineName(pipelineName)
.withFailedData(failedData)
.withEventHandle(eventHandle)
.build();
}

Expand Down Expand Up @@ -157,6 +166,16 @@ public void test_get_failedData() {
assertThat(actualFailedData, is(failedData));
}

@Test
public void test_get_release_eventHandle() {
doAnswer(a -> { return null; }).when(eventHandle).release(any(Boolean.class));
final Object actualEventHandle = testObject.getEventHandle();
assertThat(actualEventHandle, is(notNullValue()));
assertThat(actualEventHandle, is(eventHandle));
testObject.releaseEventHandle(true);
verify(eventHandle).release(any(Boolean.class));
}

@Test
public void test_get_timestamp() {
final String string = testObject.getTimestamp();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

@Configuration
class AcknowledgementAppConfig {
private static final int MAX_THREADS = 12;

@Bean
CallbackTheadFactory callbackTheadFactory() {
final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
return new CallbackTheadFactory(defaultThreadFactory);
}

@Bean(name = "acknowledgementCallbackExecutor")
ExecutorService acknowledgementCallbackExecutor(final CallbackTheadFactory callbackTheadFactory) {
return Executors.newFixedThreadPool(MAX_THREADS, callbackTheadFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@
import org.slf4j.LoggerFactory;
/**
* AcknowledgementSetMonitor - monitors the acknowledgement sets for completion/expiration
*
* <p>
* Every acknowledgement set must complete (ie get acknowledgements from all the events in it)
* by a specified time. If it is not completed, then it is considered 'expired' and it is
* cleaned up. The 'run' method is invoked periodically to cleanup the acknowledgement sets
* that are either completed or expired.
*/
public class AcknowledgementSetMonitor implements Runnable {
class AcknowledgementSetMonitor implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(AcknowledgementSetMonitor.class);
private final Set<AcknowledgementSet> acknowledgementSets;
private final ReentrantLock lock;
private AtomicInteger numInvalidAcquires;
private AtomicInteger numInvalidReleases;
private AtomicInteger numNullHandles;
private final AtomicInteger numInvalidAcquires;
private final AtomicInteger numInvalidReleases;
private final AtomicInteger numNullHandles;

private DefaultAcknowledgementSet getAcknowledgementSet(final EventHandle eventHandle) {
return (DefaultAcknowledgementSet)((DefaultEventHandle)eventHandle).getAcknowledgementSet();
Expand Down Expand Up @@ -110,7 +110,10 @@ public void release(final EventHandle eventHandle, final boolean success) {
}
}

// For testing
/**
* for testing
* @return the size
*/
int getSize() {
return acknowledgementSets.size();
}
Expand All @@ -120,7 +123,7 @@ public void run() {
lock.lock();
try {
if (acknowledgementSets.size() > 0) {
acknowledgementSets.removeIf((ackSet) -> ((DefaultAcknowledgementSet)ackSet).isDone());
acknowledgementSets.removeIf((ackSet) -> ((DefaultAcknowledgementSet) ackSet).isDone());
}
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;

import java.time.Duration;

class AcknowledgementSetMonitorThread {
private final Thread monitorThread;
private final AcknowledgementSetMonitor acknowledgementSetMonitor;
private final Duration delayTime;
private volatile boolean isStopped = false;

public AcknowledgementSetMonitorThread(
final AcknowledgementSetMonitor acknowledgementSetMonitor,
final Duration delayTime) {
this.acknowledgementSetMonitor = acknowledgementSetMonitor;
this.delayTime = delayTime;
monitorThread = new Thread(new Monitor());
monitorThread.setDaemon(true);
monitorThread.setName("acknowledgement-monitor");
}

public void start() {
monitorThread.start();
}

public void stop() {
isStopped = true;
}

private class Monitor implements Runnable {
@Override
public void run() {
while (!isStopped) {
acknowledgementSetMonitor.run();
try {
Thread.sleep(delayTime.toMillis());
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;

import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class CallbackTheadFactory implements ThreadFactory {
private final ThreadFactory delegateFactory;
private final AtomicInteger threadNumber = new AtomicInteger(1);

public CallbackTheadFactory(final ThreadFactory delegateFactory) {
this.delegateFactory = Objects.requireNonNull(delegateFactory);
}

@Override
public Thread newThread(final Runnable runnable) {
final Thread thread = delegateFactory.newThread(runnable);
thread.setName("acknowledgement-callback-" + threadNumber.getAndIncrement());
return thread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,39 @@

package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.Map;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.time.Instant;
import java.time.Duration;
import java.util.function.Consumer;

public class DefaultAcknowledgementSet implements AcknowledgementSet {
private static final Logger LOG = LoggerFactory.getLogger(DefaultAcknowledgementSet.class);
private final Consumer<Boolean> callback;
private final Instant expiryTime;
private final ScheduledExecutorService executor;
private final ExecutorService executor;
// This lock protects all the non-final members
private final ReentrantLock lock;
private boolean result;
private Map<EventHandle, AtomicInteger> pendingAcknowledgments;
private ScheduledFuture callbackFuture;
private final Map<EventHandle, AtomicInteger> pendingAcknowledgments;
private Future<?> callbackFuture;

private AtomicInteger numInvalidAcquires;
private AtomicInteger numInvalidReleases;
private final AtomicInteger numInvalidAcquires;
private final AtomicInteger numInvalidReleases;

public DefaultAcknowledgementSet(final ScheduledExecutorService executor, final Consumer<Boolean> callback, final Duration expiryTime) {
public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer<Boolean> callback, final Duration expiryTime) {
this.callback = callback;
this.result = true;
this.executor = executor;
Expand Down Expand Up @@ -127,7 +124,7 @@ public boolean release(final EventHandle eventHandle, final boolean result) {
if (pendingAcknowledgments.get(eventHandle).decrementAndGet() == 0) {
pendingAcknowledgments.remove(eventHandle);
if (pendingAcknowledgments.size() == 0) {
callbackFuture = executor.schedule(() -> {callback.accept(this.result);}, 0, TimeUnit.SECONDS);
callbackFuture = executor.submit(() -> callback.accept(this.result));
return true;
}
}
Expand Down
Loading

0 comments on commit 8e79289

Please sign in to comment.