Skip to content

Commit

Permalink
Use StubServiceEmitter in tests (apache#15426)
Browse files Browse the repository at this point in the history
* Use StubServiceEmitter in tests
* Remove unthrown exception from declaration
  • Loading branch information
kfaraz authored and ythorat2 committed Dec 1, 2023
1 parent 05ef6c5 commit e2b4cb3
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,14 @@
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

Expand All @@ -59,22 +56,13 @@ public class KubernetesPeonClientTest
private KubernetesMockServer server;
private KubernetesClientApi clientApi;
private KubernetesPeonClient instance;
private ServiceEmitter serviceEmitter;
private Collection<Event> events;
private StubServiceEmitter serviceEmitter;

@BeforeEach
public void setup()
{
clientApi = new TestKubernetesClient(this.client);
events = new ArrayList<>();
serviceEmitter = new ServiceEmitter("service", "host", null)
{
@Override
public void emit(Event event)
{
events.add(event);
}
};
serviceEmitter = new StubServiceEmitter("service", "host");
instance = new KubernetesPeonClient(clientApi, NAMESPACE, false, serviceEmitter);
}

Expand Down Expand Up @@ -102,7 +90,7 @@ void test_launchPeonJobAndWaitForStart()
Pod peonPod = instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS);

Assertions.assertNotNull(peonPod);
Assertions.assertEquals(1, events.size());
Assertions.assertEquals(1, serviceEmitter.getEvents().size());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.JobResponse;
Expand All @@ -47,6 +45,7 @@
import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -92,14 +91,7 @@ public void setup()
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
k8sClient = new DruidKubernetesClient();
ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", null)
{
@Override
public void emit(Event event)
{
}
};
peonClient = new KubernetesPeonClient(k8sClient, "default", false, serviceEmitter);
peonClient = new KubernetesPeonClient(k8sClient, "default", false, new NoopServiceEmitter());
druidNode = new DruidNode(
"test",
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@
import org.apache.druid.java.util.common.guava.Accumulators;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
Expand Down Expand Up @@ -374,13 +372,7 @@ public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback c
);

ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(
new ServiceEmitter("", "", null)
{
@Override
public void emit(Event event)
{
}
},
new NoopServiceEmitter(),
baseClient,
null /* local client; unused in this test, so pass in null */,
warehouse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,12 @@
package org.apache.druid.query;

import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.List;

@SuppressWarnings("DoNotMock")
public class MetricsEmittingQueryProcessingPoolTest
{
Expand All @@ -41,24 +36,14 @@ public void testPrioritizedExecutorDelegate()
Mockito.when(service.getQueueSize()).thenReturn(10);
Mockito.when(service.getActiveTasks()).thenReturn(2);
ExecutorServiceMonitor monitor = new ExecutorServiceMonitor();
List<Event> events = new ArrayList<>();
MetricsEmittingQueryProcessingPool processingPool = new MetricsEmittingQueryProcessingPool(service, monitor);
Assert.assertSame(service, processingPool.delegate());

ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", Mockito.mock(Emitter.class))
{
@Override
public void emit(Event event)
{
events.add(event);
}
};
final StubServiceEmitter serviceEmitter = new StubServiceEmitter("service", "host");
monitor.doMonitor(serviceEmitter);
Assert.assertEquals(2, events.size());
Assert.assertEquals(((ServiceMetricEvent) (events.get(0))).getMetric(), "segment/scan/pending");
Assert.assertEquals(((ServiceMetricEvent) (events.get(0))).getValue(), 10);
Assert.assertEquals(((ServiceMetricEvent) (events.get(1))).getMetric(), "segment/scan/active");
Assert.assertEquals(((ServiceMetricEvent) (events.get(1))).getValue(), 2);

serviceEmitter.verifyValue("segment/scan/pending", 10);
serviceEmitter.verifyValue("segment/scan/active", 2);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.easymock.EasyMock;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -67,7 +66,6 @@
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -206,25 +204,16 @@ public void configure(Binder binder)
public void testMonitor() throws Exception
{
final MemcachedCache cache = MemcachedCache.create(memcachedCacheConfig);
final Emitter emitter = EasyMock.createNiceMock(Emitter.class);
final Collection<Event> events = new ArrayList<>();
final ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", emitter)
{
@Override
public void emit(Event event)
{
events.add(event);
}
};
final StubServiceEmitter serviceEmitter = new StubServiceEmitter("service", "host");

while (events.isEmpty()) {
while (serviceEmitter.getEvents().isEmpty()) {
Thread.sleep(memcachedCacheConfig.getTimeout());
cache.doMonitor(serviceEmitter);
}

Assert.assertFalse(events.isEmpty());
Assert.assertFalse(serviceEmitter.getEvents().isEmpty());
ObjectMapper mapper = new DefaultObjectMapper();
for (Event event : events) {
for (Event event : serviceEmitter.getEvents()) {
log.debug("Found event `%s`", mapper.writeValueAsString(event.toMap()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,22 @@

package org.apache.druid.curator;

import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DruidConnectionStateListenerTest
{
private TestEmitter emitter;
private StubServiceEmitter emitter;
private DruidConnectionStateListener listener;

@Before
public void setUp()
{
emitter = new TestEmitter();
emitter = new StubServiceEmitter("DruidConnectionStateListenerTest", "localhost");
listener = new DruidConnectionStateListener(emitter);
}

Expand All @@ -70,10 +61,7 @@ public void test_doMonitor_init()
{
listener.doMonitor(emitter);
Assert.assertEquals(1, emitter.getEvents().size());

final Map<String, Object> eventMap = emitter.getEvents().get(0).toMap();
Assert.assertEquals("zk/connected", eventMap.get("metric"));
Assert.assertEquals(0, eventMap.get("value"));
emitter.verifyValue("zk/connected", 0);
}

@Test
Expand All @@ -83,9 +71,7 @@ public void test_doMonitor_connected()
listener.doMonitor(emitter);
Assert.assertEquals(1, emitter.getEvents().size());

final Map<String, Object> eventMap = emitter.getEvents().get(0).toMap();
Assert.assertEquals("zk/connected", eventMap.get("metric"));
Assert.assertEquals(1, eventMap.get("value"));
emitter.verifyValue("zk/connected", 1);
}

@Test
Expand All @@ -95,9 +81,7 @@ public void test_doMonitor_notConnected()
listener.doMonitor(emitter);
Assert.assertEquals(2, emitter.getEvents().size()); // 2 because stateChanged emitted an alert

final Map<String, Object> eventMap = emitter.getEvents().get(1).toMap();
Assert.assertEquals("zk/connected", eventMap.get("metric"));
Assert.assertEquals(0, eventMap.get("value"));
emitter.verifyValue("zk/connected", 0);
}

@Test
Expand All @@ -106,9 +90,9 @@ public void test_suspendedAlert()
listener.stateChanged(null, ConnectionState.SUSPENDED);
Assert.assertEquals(1, emitter.getEvents().size());

final Map<String, Object> alertMap = emitter.getEvents().get(0).toMap();
Assert.assertEquals("alerts", alertMap.get("feed"));
Assert.assertEquals("ZooKeeper connection[SUSPENDED]", alertMap.get("description"));
final AlertEvent alert = emitter.getAlerts().get(0);
Assert.assertEquals("alerts", alert.getFeed());
Assert.assertEquals("ZooKeeper connection[SUSPENDED]", alert.getDescription());
}

@Test
Expand All @@ -120,31 +104,8 @@ public void test_reconnectedMetric()
listener.stateChanged(null, ConnectionState.RECONNECTED);
Assert.assertEquals(2, emitter.getEvents().size()); // the second stateChanged emits a metric

final Map<String, Object> eventMap = emitter.getEvents().get(1).toMap();
Assert.assertEquals("metrics", eventMap.get("feed"));
Assert.assertEquals("zk/reconnect/time", eventMap.get("metric"));
MatcherAssert.assertThat(eventMap.get("value"), CoreMatchers.instanceOf(Long.class));
MatcherAssert.assertThat(((Number) eventMap.get("value")).longValue(), Matchers.greaterThanOrEqualTo(0L));
long observedReconnectTime = emitter.getValue("zk/reconnect/time", null).longValue();
Assert.assertTrue(observedReconnectTime >= 0);
}

private static class TestEmitter extends NoopServiceEmitter
{
@GuardedBy("events")
private final List<Event> events = new ArrayList<>();

@Override
public void emit(Event event)
{
synchronized (events) {
events.add(event);
}
}

public List<Event> getEvents()
{
synchronized (events) {
return ImmutableList.copyOf(events);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,16 @@

package org.apache.druid.server.initialization.jetty;

import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JettyServerModuleTest
{
@Test
public void testJettyServerModule()
{
List<Event> events = new ArrayList<>();
ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", Mockito.mock(Emitter.class))
{
@Override
public void emit(Event event)
{
events.add(event);
}
};
QueuedThreadPool jettyServerThreadPool = Mockito.mock(QueuedThreadPool.class);
JettyServerModule.setJettyServerThreadPool(jettyServerThreadPool);
Mockito.when(jettyServerThreadPool.getThreads()).thenReturn(100);
Expand All @@ -58,25 +40,17 @@ public void emit(Event event)
Mockito.when(jettyServerThreadPool.getBusyThreads()).thenReturn(60);

JettyServerModule.JettyMonitor jettyMonitor = new JettyServerModule.JettyMonitor("ds", "t0");
jettyMonitor.doMonitor(serviceEmitter);

Assert.assertEquals(8, events.size());
List<Pair<String, Number>> expectedEvents = Arrays.asList(
new Pair<>("jetty/numOpenConnections", 0),
new Pair<>("jetty/threadPool/total", 100),
new Pair<>("jetty/threadPool/idle", 40),
new Pair<>("jetty/threadPool/isLowOnThreads", 1),
new Pair<>("jetty/threadPool/min", 30),
new Pair<>("jetty/threadPool/max", 100),
new Pair<>("jetty/threadPool/queueSize", 50),
new Pair<>("jetty/threadPool/busy", 60)
);
final StubServiceEmitter serviceEmitter = new StubServiceEmitter("service", "host");
jettyMonitor.doMonitor(serviceEmitter);

for (int i = 0; i < expectedEvents.size(); i++) {
Pair<String, Number> expected = expectedEvents.get(i);
ServiceMetricEvent actual = (ServiceMetricEvent) (events.get(i));
Assert.assertEquals(expected.lhs, actual.getMetric());
Assert.assertEquals(expected.rhs, actual.getValue());
}
serviceEmitter.verifyValue("jetty/numOpenConnections", 0);
serviceEmitter.verifyValue("jetty/threadPool/total", 100);
serviceEmitter.verifyValue("jetty/threadPool/idle", 40);
serviceEmitter.verifyValue("jetty/threadPool/isLowOnThreads", 1);
serviceEmitter.verifyValue("jetty/threadPool/min", 30);
serviceEmitter.verifyValue("jetty/threadPool/max", 100);
serviceEmitter.verifyValue("jetty/threadPool/queueSize", 50);
serviceEmitter.verifyValue("jetty/threadPool/busy", 60);
}
}
Loading

0 comments on commit e2b4cb3

Please sign in to comment.