Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spans auto app name #566

Merged
merged 10 commits into from
Feb 4, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public HarvestResult attemptToSendReservoir(String appName, EventSender<T> event
}

@Override
public SamplingPriorityQueue<T> getOrCreateReservoir() {
public SamplingPriorityQueue<T> getOrCreateReservoir(String appName) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
public interface ReservoirManager<T extends PriorityAware> {

SamplingPriorityQueue<T> getOrCreateReservoir();
SamplingPriorityQueue<T> getOrCreateReservoir(String appName);

void clearReservoir();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public void externalTest() throws Exception {
doCall();

SpanEventsService spanEventsService = ServiceFactory.getServiceManager().getSpanEventsService();
SamplingPriorityQueue<SpanEvent> spanEventsPool = spanEventsService.getOrCreateDistributedSamplingReservoir();
String appName = ServiceFactory.getConfigService().getDefaultAgentConfig().getApplicationName();
SamplingPriorityQueue<SpanEvent> spanEventsPool = spanEventsService.getOrCreateDistributedSamplingReservoir(appName);
assertNotNull(spanEventsPool);

List<SpanEvent> spanEvents = spanEventsPool.asList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public void testLateAcceptPayload() throws Exception {
deepTransaction(payload);

SpanEventsService spanEventsService = ServiceFactory.getServiceManager().getSpanEventsService();
SamplingPriorityQueue<SpanEvent> spanEventsPool = spanEventsService.getOrCreateDistributedSamplingReservoir();
String appName = ServiceFactory.getConfigService().getDefaultAgentConfig().getApplicationName();
SamplingPriorityQueue<SpanEvent> spanEventsPool = spanEventsService.getOrCreateDistributedSamplingReservoir(appName);
assertNotNull(spanEventsPool);
List<SpanEvent> spanEvents = spanEventsPool.asList();
assertNotNull(spanEvents);
Expand Down Expand Up @@ -94,7 +95,8 @@ public void dispatcherTransactionStatsFinished(TransactionData transactionData,
latch.await(30, TimeUnit.SECONDS);

SpanEventsService spanEventsService = ServiceFactory.getServiceManager().getSpanEventsService();
SamplingPriorityQueue<SpanEvent> spanEventsPool = spanEventsService.getOrCreateDistributedSamplingReservoir();
String appName = ServiceFactory.getConfigService().getDefaultAgentConfig().getApplicationName();
SamplingPriorityQueue<SpanEvent> spanEventsPool = spanEventsService.getOrCreateDistributedSamplingReservoir(appName);
assertNotNull(spanEventsPool);
List<SpanEvent> spanEvents = spanEventsPool.asList();
assertNotNull(spanEvents);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.junit.Assert.assertTrue;

public class RootSpanStatusErrorsTest {
private String APP_NAME;
private static final String CONFIG_FILE = "configs/span_events_test.yml";
private static final ClassLoader CLASS_LOADER = RootSpanStatusErrorsTest.class.getClassLoader();

Expand All @@ -33,9 +34,10 @@ public class RootSpanStatusErrorsTest {
public void before() throws Exception {
holder = new EnvironmentHolder(new EnvironmentHolderSettingsGenerator(CONFIG_FILE, "all_enabled_test", CLASS_LOADER));
holder.setupEnvironment();
APP_NAME = ServiceFactory.getConfigService().getDefaultAgentConfig().getApplicationName();

ServiceFactory.getSpanEventService()
.getOrCreateDistributedSamplingReservoir()
.getOrCreateDistributedSamplingReservoir(APP_NAME)
.clear();
}

Expand All @@ -44,7 +46,7 @@ public void after() {
holder.close();

ServiceFactory.getSpanEventService()
.getOrCreateDistributedSamplingReservoir()
.getOrCreateDistributedSamplingReservoir(APP_NAME)
.clear();
}

Expand All @@ -64,7 +66,7 @@ public void statusCodeShouldOnlyBeStringOnErrorClassOnRootSpan() {

private void assertRootSpanAttributes(Integer expectedStatus, String errorClass) {
SamplingPriorityQueue<SpanEvent> reservoir = ServiceFactory.getSpanEventService()
.getOrCreateDistributedSamplingReservoir();
.getOrCreateDistributedSamplingReservoir(APP_NAME);
assertTrue(reservoir.asList().size() > 0);
List<String> seenNames = new LinkedList<>();
for(SpanEvent event : reservoir.asList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.junit.Assert.*;

public class SpanErrorsTest {
private String APP_NAME;
private static final String CONFIG_FILE = "configs/span_events_test.yml";
private static final ClassLoader CLASS_LOADER = SpanErrorsTest.class.getClassLoader();

Expand All @@ -34,9 +35,10 @@ public class SpanErrorsTest {
public void before() throws Exception {
holder = new EnvironmentHolder(new EnvironmentHolderSettingsGenerator(CONFIG_FILE, "all_enabled_test", CLASS_LOADER));
holder.setupEnvironment();
APP_NAME = ServiceFactory.getConfigService().getDefaultAgentConfig().getApplicationName();

ServiceFactory.getSpanEventService()
.getOrCreateDistributedSamplingReservoir()
.getOrCreateDistributedSamplingReservoir(APP_NAME)
.clear();
}

Expand All @@ -45,7 +47,7 @@ public void after() {
holder.close();

ServiceFactory.getSpanEventService()
.getOrCreateDistributedSamplingReservoir()
.getOrCreateDistributedSamplingReservoir(APP_NAME)
.clear();
}

Expand Down Expand Up @@ -130,7 +132,7 @@ private void checkSpansNoOtherAttributes(Map<String, List<String>> spanChecks) {

private void checkSpans(Map<String, List<String>> spanChecks, List<String> defaultChecks) {
SamplingPriorityQueue<SpanEvent> reservoir = ServiceFactory.getSpanEventService()
.getOrCreateDistributedSamplingReservoir();
.getOrCreateDistributedSamplingReservoir(APP_NAME);

assertTrue(reservoir.asList().size() > 0);
for (SpanEvent event : reservoir.asList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.junit.Assert.assertTrue;

public class SpanIdOnErrorsTest {
private String APP_NAME;
private static final String CONFIG_FILE = "configs/span_events_test.yml";
private static final ClassLoader CLASS_LOADER = SpanParentTest.class.getClassLoader();

Expand All @@ -38,6 +39,8 @@ public class SpanIdOnErrorsTest {
public void before() throws Exception {
holder = new EnvironmentHolder(new EnvironmentHolderSettingsGenerator(CONFIG_FILE, "all_enabled_test", CLASS_LOADER));
holder.setupEnvironment();
APP_NAME = ServiceFactory.getConfigService().getDefaultAgentConfig().getApplicationName();

cleanup();
}

Expand All @@ -52,7 +55,7 @@ private void cleanup() {
ServiceFactory.getRPMService().getErrorService().clearReservoir();
Transaction.clearTransaction();
ServiceFactory.getSpanEventService()
.getOrCreateDistributedSamplingReservoir()
.getOrCreateDistributedSamplingReservoir(APP_NAME)
.clear();
ServiceFactory.getTransactionEventsService()
.getOrCreateDistributedSamplingReservoir(ServiceFactory.getRPMService().getApplicationName())
Expand Down Expand Up @@ -111,7 +114,7 @@ private void matchFirstErrorToOriginatingSpan(String expectedSpanName) {

private void assertMethodWhereErrorOriginatedHasThisSpanId(Object spanId, String expectedSpanName) {
List<String> seenSpanNames = new LinkedList<>();
for(SpanEvent spanEvent : ServiceFactory.getSpanEventService().getOrCreateDistributedSamplingReservoir().asList()) {
for(SpanEvent spanEvent : ServiceFactory.getSpanEventService().getOrCreateDistributedSamplingReservoir(APP_NAME).asList()) {
if (spanEvent.getGuid().equals(spanId)) {
assertEquals(expectedSpanName, spanEvent.getName());
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public void testCrossProcessOnly() throws Exception {
assertEquals(4, tracers2.size()); // 1 "rootTracer" (not in this list) + 2 non-external/datastore tracers + 2 external datastore tracers

SpanEventsService spanEventsService = ServiceFactory.getServiceManager().getSpanEventsService();
SamplingPriorityQueue<SpanEvent> spanEventsPool = spanEventsService.getOrCreateDistributedSamplingReservoir();
String appName = ServiceFactory.getConfigService().getDefaultAgentConfig().getApplicationName();
SamplingPriorityQueue<SpanEvent> spanEventsPool = spanEventsService.getOrCreateDistributedSamplingReservoir(appName);
assertNotNull(spanEventsPool);
List<SpanEvent> spanEvents = spanEventsPool.asList();
spanEventsPool.clear();
Expand Down Expand Up @@ -167,9 +168,9 @@ public void testSpanAndTransactionParenting() throws Exception {
Collection<Tracer> tracers2 = tx2.getTracers();
assertEquals(4, tracers2.size()); // 1 "rootTracer" (not in this list) + 2 non-external/datastore tracers + 2 external datastore tracers

String appName = ServiceFactory.getConfigService().getDefaultAgentConfig().getApplicationName();
SpanEventsService spanEventsService = ServiceFactory.getServiceManager().getSpanEventsService();
SamplingPriorityQueue<SpanEvent> spanEventsPool = spanEventsService.getOrCreateDistributedSamplingReservoir();
String appName = ServiceFactory.getConfigService().getDefaultAgentConfig().getApplicationName();
SamplingPriorityQueue<SpanEvent> spanEventsPool = spanEventsService.getOrCreateDistributedSamplingReservoir(appName);
assertNotNull(spanEventsPool);
List<SpanEvent> spanEvents = spanEventsPool.asList();
spanEventsPool.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public static Collection<Object[]> data() {
public void before() throws Exception {
holder = new EnvironmentHolder(new EnvironmentHolderSettingsGenerator(CONFIG_FILE, ymlEnvironmentName, CLASS_LOADER));
holder.setupEnvironment();
spanEventPool = ServiceFactory.getSpanEventService().getOrCreateDistributedSamplingReservoir();
String appName = ServiceFactory.getConfigService().getDefaultAgentConfig().getApplicationName();
spanEventPool = ServiceFactory.getSpanEventService().getOrCreateDistributedSamplingReservoir(appName);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void dispatcherTransactionFinished(TransactionData transactionData, Trans
}

@Override
public DistributedSamplingPriorityQueue<SpanEvent> getOrCreateDistributedSamplingReservoir() {
public DistributedSamplingPriorityQueue<SpanEvent> getOrCreateDistributedSamplingReservoir(String appName) {
return new DistributedSamplingPriorityQueue<>(10);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public ReservoirAddingSpanEventConsumer(ReservoirManager<SpanEvent> reservoirMan
@Override
public void accept(SpanEvent spanEvent) {
if (isSpanEventsEnabled()) {
SamplingPriorityQueue<SpanEvent> reservoir = reservoirManager.getOrCreateReservoir();
String appName = spanEvent.getAppName();
SamplingPriorityQueue<SpanEvent> reservoir = reservoirManager.getOrCreateReservoir(appName);
reservoir.add(spanEvent);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

import java.util.Collections;
import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;

public class CollectorSpanEventReservoirManager implements ReservoirManager<SpanEvent> {

private final ConfigService configService;
private SamplingPriorityQueue<SpanEvent> reservoir;
private ConcurrentHashMap<String, SamplingPriorityQueue<SpanEvent>> spanReservoirsForApp = new ConcurrentHashMap<>();
private volatile int maxSamplesStored;

public CollectorSpanEventReservoirManager(ConfigService configService) {
Expand All @@ -32,23 +33,26 @@ public CollectorSpanEventReservoirManager(ConfigService configService) {
}

@Override
public SamplingPriorityQueue<SpanEvent> getOrCreateReservoir() {
public SamplingPriorityQueue<SpanEvent> getOrCreateReservoir(String appName) {
SamplingPriorityQueue<SpanEvent> reservoir = spanReservoirsForApp.get(appName);
if (reservoir == null) {
reservoir = createDistributedSamplingReservoir(0);
reservoir = spanReservoirsForApp.putIfAbsent(appName, createDistributedSamplingReservoir(appName, 0));
if (reservoir == null) {
reservoir = spanReservoirsForApp.get(appName);
}
}
return reservoir;
}

private SamplingPriorityQueue<SpanEvent> createDistributedSamplingReservoir(int decidedLast) {
String appName = configService.getDefaultAgentConfig().getApplicationName();
private SamplingPriorityQueue<SpanEvent> createDistributedSamplingReservoir(String appName, int decidedLast) {
SpanEventsConfig spanEventsConfig = configService.getDefaultAgentConfig().getSpanEventsConfig();
int target = spanEventsConfig.getTargetSamplesStored();
return new DistributedSamplingPriorityQueue<>(appName, "Span Event Service", maxSamplesStored, decidedLast, target, SPAN_EVENT_COMPARATOR);
}

@Override
public void clearReservoir() {
getOrCreateReservoir().clear();
spanReservoirsForApp.clear();
}

@Override
Expand All @@ -59,11 +63,11 @@ public HarvestResult attemptToSendReservoir(final String appName, EventSender<Sp
}

SpanEventsConfig config = configService.getAgentConfig(appName).getSpanEventsConfig();
int decidedLast = AdaptiveSampling.decidedLast(reservoir, config.getTargetSamplesStored());
int decidedLast = AdaptiveSampling.decidedLast(spanReservoirsForApp.get(appName), config.getTargetSamplesStored());

// save a reference to the old reservoir to finish harvesting, and create a new one
final SamplingPriorityQueue<SpanEvent> toSend = reservoir;
reservoir = createDistributedSamplingReservoir(decidedLast);
final SamplingPriorityQueue<SpanEvent> toSend = spanReservoirsForApp.get(appName);
spanReservoirsForApp.put(appName, createDistributedSamplingReservoir(appName, decidedLast));

if (toSend == null || toSend.size() <= 0) {
return null;
Expand All @@ -80,7 +84,7 @@ public HarvestResult attemptToSendReservoir(final String appName, EventSender<Sp
if (!e.discardHarvestData()) {
logger.log(Level.FINE, "Unable to send span events. Unsent events will be included in the next harvest.", e);
// Save unsent data by merging it with toSend data using reservoir algorithm
reservoir.retryAll(toSend);
spanReservoirsForApp.get(appName).retryAll(toSend);
} else {
// discard harvest data
toSend.clear();
Expand All @@ -102,17 +106,14 @@ public int getMaxSamplesStored() {
@Override
public void setMaxSamplesStored(int newMax) {
maxSamplesStored = newMax;
reservoir = createDistributedSamplingReservoir(0);
ConcurrentHashMap<String, SamplingPriorityQueue<SpanEvent>> newMaxSpanReservoirs = new ConcurrentHashMap<>();
spanReservoirsForApp.forEach((appName,reservoir ) -> newMaxSpanReservoirs.putIfAbsent(appName, createDistributedSamplingReservoir(appName, 0)));
spanReservoirsForApp = newMaxSpanReservoirs;
}

// This is where you can add secondary sorting for Span Events
private static final Comparator<SpanEvent> SPAN_EVENT_COMPARATOR = new Comparator<SpanEvent>() {
@Override
public int compare(SpanEvent left, SpanEvent right) {
return ComparisonChain.start()
.compare(right.getPriority(), left.getPriority()) // Take highest priority first
.result();
}
};
private static final Comparator<SpanEvent> SPAN_EVENT_COMPARATOR = (left, right) -> ComparisonChain.start()
.compare(right.getPriority(), left.getPriority()) // Take highest priority first
.result();

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ public interface SpanEventsService extends EventService {

void addHarvestableToService(String appName);

SamplingPriorityQueue<SpanEvent> getOrCreateDistributedSamplingReservoir();
SamplingPriorityQueue<SpanEvent> getOrCreateDistributedSamplingReservoir(String appName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ private void createAndStoreSpanEvent(Tracer tracer, TransactionData transactionD
return;
}

SamplingPriorityQueue<SpanEvent> reservoir = getOrCreateDistributedSamplingReservoir();
String appName = transactionData.getApplicationName();
SamplingPriorityQueue<SpanEvent> reservoir = getOrCreateDistributedSamplingReservoir(appName);
if (reservoir.isFull() && reservoir.getMinPriority() >= transactionData.getPriority()) {
// The reservoir is full and this event wouldn't make it in, so lets prevent some object allocations
reservoir.incrementNumberOfTries();
Expand Down Expand Up @@ -217,17 +218,11 @@ public void clearReservoir() {
reservoirManager.clearReservoir();
}

/**
* There is only one event reservoir for Distributed Tracing so there only needs to be one harvestable for it.
*/
@Override
public void addHarvestableToService(String appName) {
final String primaryApplication = ServiceFactory.getConfigService().getDefaultAgentConfig().getApplicationName();
if (primaryApplication.equals(appName)) {
Harvestable harvestable = new SpanEventHarvestableImpl(this, appName);
ServiceFactory.getHarvestService().addHarvestable(harvestable);
harvestables.add(harvestable);
}
}

@Override
Expand All @@ -243,8 +238,8 @@ public void configChanged(String appName, AgentConfig agentConfig) {
}

@Override
public SamplingPriorityQueue<SpanEvent> getOrCreateDistributedSamplingReservoir() {
return reservoirManager.getOrCreateReservoir();
public SamplingPriorityQueue<SpanEvent> getOrCreateDistributedSamplingReservoir(String appName) {
return reservoirManager.getOrCreateReservoir(appName);
}

public static Builder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,4 @@ private DistributedTraceUtil() {
public static boolean isSampledPriority(float priority) {
return priority >= 1.0f;
}

public static AtomicReference<Float> getSpanEventPriority() {
SamplingPriorityQueue<SpanEvent> spanReservoir = ServiceFactory.getServiceManager()
.getSpanEventsService().getOrCreateDistributedSamplingReservoir();
return new AtomicReference<>(ServiceFactory.getDistributedTraceService().calculatePriority(null, spanReservoir));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private void createAndVerifySpanEvents(Long expectedCount, Long expectedEndpoint
}

// Verify that the correct number of events were stored in the reservoir
SamplingPriorityQueue<SpanEvent> eventQueue = spanEventsService.getOrCreateDistributedSamplingReservoir();
SamplingPriorityQueue<SpanEvent> eventQueue = spanEventsService.getOrCreateDistributedSamplingReservoir(APP_NAME);
assertNotNull(eventQueue);
assertEquals(expectedCount.intValue(), eventQueue.size());

Expand Down
Loading