Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added SpanProcessor OnEnding callback #6367

Merged
merged 18 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
Comparing source compatibility of opentelemetry-sdk-trace-1.41.0-SNAPSHOT.jar against opentelemetry-sdk-trace-1.40.0.jar
No changes.
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.trace.SpanProcessor (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) boolean isOnEndingRequired()
+++ NEW METHOD: PUBLIC(+) void onEnding(io.opentelemetry.sdk.trace.ReadWriteSpan)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
final class MultiSpanProcessor implements SpanProcessor {
private final List<SpanProcessor> spanProcessorsStart;
private final List<SpanProcessor> spanProcessorsBeforeEnd;
private final List<SpanProcessor> spanProcessorsEnd;
private final List<SpanProcessor> spanProcessorsAll;
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
Expand Down Expand Up @@ -58,6 +59,18 @@ public boolean isEndRequired() {
return !spanProcessorsEnd.isEmpty();
}

@Override
public void onEnding(ReadWriteSpan span) {
for (SpanProcessor spanProcessor : spanProcessorsBeforeEnd) {
spanProcessor.onEnding(span);
}
}

@Override
public boolean isOnEndingRequired() {
return !spanProcessorsBeforeEnd.isEmpty();
}

@Override
public CompletableResultCode shutdown() {
if (isShutdown.getAndSet(true)) {
Expand All @@ -83,10 +96,14 @@ private MultiSpanProcessor(List<SpanProcessor> spanProcessors) {
this.spanProcessorsAll = spanProcessors;
this.spanProcessorsStart = new ArrayList<>(spanProcessorsAll.size());
this.spanProcessorsEnd = new ArrayList<>(spanProcessorsAll.size());
this.spanProcessorsBeforeEnd = new ArrayList<>(spanProcessorsAll.size());
for (SpanProcessor spanProcessor : spanProcessorsAll) {
if (spanProcessor.isStartRequired()) {
spanProcessorsStart.add(spanProcessor);
}
if (spanProcessor.isOnEndingRequired()) {
spanProcessorsBeforeEnd.add(spanProcessor);
}
if (spanProcessor.isEndRequired()) {
spanProcessorsEnd.add(spanProcessor);
}
Expand Down
43 changes: 26 additions & 17 deletions sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,14 @@ final class SdkSpan implements ReadWriteSpan {
@GuardedBy("lock")
private long endEpochNanos;

// True if the span is ended.
private enum EndState {
NOT_ENDED,
ENDING,
ENDED
}

@GuardedBy("lock")
private boolean hasEnded;
private EndState hasEnded;

private SdkSpan(
SpanContext context,
Expand All @@ -122,7 +127,7 @@ private SdkSpan(
this.kind = kind;
this.spanProcessor = spanProcessor;
this.resource = resource;
this.hasEnded = false;
this.hasEnded = EndState.NOT_ENDED;
this.clock = clock;
this.startEpochNanos = startEpochNanos;
this.attributes = attributes;
Expand Down Expand Up @@ -220,7 +225,7 @@ public SpanData toSpanData() {
status,
name,
endEpochNanos,
hasEnded);
hasEnded == EndState.ENDED);
}
}

Expand All @@ -242,7 +247,7 @@ public Attributes getAttributes() {
@Override
public boolean hasEnded() {
synchronized (lock) {
return hasEnded;
return hasEnded == EndState.ENDED;
}
}

Expand Down Expand Up @@ -288,7 +293,7 @@ public InstrumentationScopeInfo getInstrumentationScopeInfo() {
@Override
public long getLatencyNanos() {
synchronized (lock) {
return (hasEnded ? endEpochNanos : clock.now()) - startEpochNanos;
return (hasEnded == EndState.NOT_ENDED ? clock.now() : endEpochNanos) - startEpochNanos;
}
}

Expand All @@ -303,7 +308,7 @@ public <T> ReadWriteSpan setAttribute(AttributeKey<T> key, T value) {
return this;
}
synchronized (lock) {
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
logger.log(Level.FINE, "Calling setAttribute() on an ended Span.");
return this;
}
Expand Down Expand Up @@ -380,7 +385,7 @@ public ReadWriteSpan addEvent(String name, Attributes attributes, long timestamp

private void addTimedEvent(EventData timedEvent) {
synchronized (lock) {
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
logger.log(Level.FINE, "Calling addEvent() on an ended Span.");
return;
}
Expand All @@ -400,7 +405,7 @@ public ReadWriteSpan setStatus(StatusCode statusCode, @Nullable String descripti
return this;
}
synchronized (lock) {
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
logger.log(Level.FINE, "Calling setStatus() on an ended Span.");
return this;
} else if (this.status.getStatusCode() == StatusCode.OK) {
Expand Down Expand Up @@ -438,7 +443,7 @@ public ReadWriteSpan updateName(String name) {
return this;
}
synchronized (lock) {
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
logger.log(Level.FINE, "Calling updateName() on an ended Span.");
return this;
}
Expand All @@ -463,7 +468,7 @@ public Span addLink(SpanContext spanContext, Attributes attributes) {
spanLimits.getMaxNumberOfAttributesPerLink(),
spanLimits.getMaxAttributeValueLength()));
synchronized (lock) {
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
logger.log(Level.FINE, "Calling addLink() on an ended Span.");
return this;
}
Expand Down Expand Up @@ -493,12 +498,16 @@ public void end(long timestamp, TimeUnit unit) {

private void endInternal(long endEpochNanos) {
synchronized (lock) {
if (hasEnded) {
logger.log(Level.FINE, "Calling end() on an ended Span.");
if (hasEnded != EndState.NOT_ENDED) {
logger.log(Level.FINE, "Calling end() on an ended or ending Span.");
return;
}
hasEnded = EndState.ENDING;
this.endEpochNanos = endEpochNanos;
hasEnded = true;
if (spanProcessor.isOnEndingRequired()) {
spanProcessor.onEnding(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's probably a mistake to be calling this under the lock. I believe we only want to do internal state management while the lock is being held, not call not-under-our-control methods provided by users.

Copy link
Contributor Author

@JonasKunz JonasKunz Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock was actually intentionally hold to cover this part of the spec:

The SDK MUST guarantee that the span can no longer be modified by any other thread
before invoking OnEnding of the first SpanProcessor. From that point on, modifications
are only allowed synchronously from within the invoked OnEnding callbacks.

This part was added to the spec so that SpanProcessors can be sure that the state they see the span in is actually consistent and not prone to race conditions due to span modifications from the application code.

However, I now switched to a different way of implementing this, because as you said exposing the lock is probably not the best way of achieving the desired safety here.

}
hasEnded = EndState.ENDED;
}
if (spanProcessor.isEndRequired()) {
spanProcessor.onEnd(this);
Expand All @@ -508,7 +517,7 @@ private void endInternal(long endEpochNanos) {
@Override
public boolean isRecording() {
synchronized (lock) {
return !hasEnded;
return hasEnded != EndState.ENDED;
}
}

Expand All @@ -533,7 +542,7 @@ private List<EventData> getImmutableTimedEvents() {

// if the span has ended, then the events are unmodifiable
// so we can return them directly and save copying all the data.
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
return Collections.unmodifiableList(events);
}

Expand All @@ -547,7 +556,7 @@ private Attributes getImmutableAttributes() {
}
// if the span has ended, then the attributes are unmodifiable,
// so we can return them directly and save copying all the data.
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
return attributes;
}
// otherwise, make a copy of the data into an immutable container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,32 @@
*/
boolean isEndRequired();

/**
* Called just before a {@link io.opentelemetry.api.trace.Span} is ended, if the {@link
* Span#isRecording()} returns true. This means that the span will still be mutable. Note that the
* span will only be modifiable synchronously from this callback, concurrent modifications from
* other threads will be prevented.
*
* <p>This method is called synchronously on the execution thread, should not throw or block the
* execution thread.
*
* <p>Note: This method is experimental and might be subject to future changes.
*
* @param span the {@code Span} that is just about to be ended.
*/
default void onEnding(ReadWriteSpan span) {}

Check warning on line 101 in sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java

View check run for this annotation

Codecov / codecov/patch

sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java#L101

Added line #L101 was not covered by tests
JonasKunz marked this conversation as resolved.
Show resolved Hide resolved

/**
* Returns {@code true} if this {@link SpanProcessor} requires onEnding events.
*
* <p>Note: This method is experimental and might be subject to future changes.
*
* @return {@code true} if this {@link SpanProcessor} requires onEnding events.
*/
default boolean isOnEndingRequired() {
return false;
}

/**
* Processes all span events that have not yet been processed and closes used resources.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ class MultiSpanProcessorTest {
@BeforeEach
void setUp() {
when(spanProcessor1.isStartRequired()).thenReturn(true);
when(spanProcessor1.isOnEndingRequired()).thenReturn(true);
when(spanProcessor1.isEndRequired()).thenReturn(true);
when(spanProcessor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor2.isStartRequired()).thenReturn(true);
when(spanProcessor2.isOnEndingRequired()).thenReturn(true);
when(spanProcessor2.isEndRequired()).thenReturn(true);
when(spanProcessor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
Expand Down Expand Up @@ -67,6 +69,10 @@ void twoSpanProcessor() {
verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan));
verify(spanProcessor2).onStart(same(Context.root()), same(readWriteSpan));

multiSpanProcessor.onEnding(readWriteSpan);
verify(spanProcessor1).onEnding(same(readWriteSpan));
verify(spanProcessor2).onEnding(same(readWriteSpan));

multiSpanProcessor.onEnd(readableSpan);
verify(spanProcessor1).onEnd(same(readableSpan));
verify(spanProcessor2).onEnd(same(readableSpan));
Expand All @@ -83,6 +89,7 @@ void twoSpanProcessor() {
@Test
void twoSpanProcessor_DifferentRequirements() {
when(spanProcessor1.isEndRequired()).thenReturn(false);
when(spanProcessor2.isOnEndingRequired()).thenReturn(false);
when(spanProcessor2.isStartRequired()).thenReturn(false);
SpanProcessor multiSpanProcessor =
SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2));
Expand All @@ -94,6 +101,10 @@ void twoSpanProcessor_DifferentRequirements() {
verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan));
verify(spanProcessor2, times(0)).onStart(any(Context.class), any(ReadWriteSpan.class));

multiSpanProcessor.onEnding(readWriteSpan);
verify(spanProcessor1).onEnding(same(readWriteSpan));
verify(spanProcessor2, times(0)).onEnding(any(ReadWriteSpan.class));

multiSpanProcessor.onEnd(readableSpan);
verify(spanProcessor1, times(0)).onEnd(any(ReadableSpan.class));
verify(spanProcessor2).onEnd(same(readableSpan));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -58,6 +60,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -107,6 +111,7 @@ void setUp() {
expectedAttributes = builder.build();
testClock = TestClock.create(Instant.ofEpochSecond(0, START_EPOCH_NANOS));
when(spanProcessor.isStartRequired()).thenReturn(true);
when(spanProcessor.isOnEndingRequired()).thenReturn(true);
when(spanProcessor.isEndRequired()).thenReturn(true);
}

Expand Down Expand Up @@ -140,6 +145,60 @@ void endSpanTwice_DoNotCrash() {
assertThat(span.hasEnded()).isTrue();
}

@Test
void beforeEnd_spanStillMutable() {
SdkSpan span = createTestSpan(SpanKind.INTERNAL);

AttributeKey<String> dummyAttrib = AttributeKey.stringKey("processor_foo");

AtomicBoolean endedStateInProcessor = new AtomicBoolean();
doAnswer(
invocation -> {
ReadWriteSpan sp = invocation.getArgument(0, ReadWriteSpan.class);
assertThat(sp.hasEnded()).isFalse();
sp.end(); // should have no effect, nested end should be detected
endedStateInProcessor.set(sp.hasEnded());
sp.setAttribute(dummyAttrib, "bar");
return null;
})
.when(spanProcessor)
.onEnding(any());

span.end();
verify(spanProcessor).onEnding(same(span));
assertThat(span.hasEnded()).isTrue();
assertThat(endedStateInProcessor.get()).isFalse();
assertThat(span.getAttribute(dummyAttrib)).isEqualTo("bar");
}

@Test
void beforeEnd_latencyPinned() {
SdkSpan span = createTestSpan(SpanKind.INTERNAL);

AtomicLong spanLatencyInProcessor = new AtomicLong();
doAnswer(
invocation -> {
ReadWriteSpan sp = invocation.getArgument(0, ReadWriteSpan.class);

testClock.advance(Duration.ofSeconds(100));
spanLatencyInProcessor.set(sp.getLatencyNanos());
return null;
})
.when(spanProcessor)
.onEnding(any());

testClock.advance(Duration.ofSeconds(1));
long expectedDuration = testClock.now() - START_EPOCH_NANOS;

assertThat(span.getLatencyNanos()).isEqualTo(expectedDuration);

span.end();
verify(spanProcessor).onEnding(same(span));
assertThat(span.hasEnded()).isTrue();
assertThat(span.getLatencyNanos()).isEqualTo(expectedDuration);
assertThat(spanLatencyInProcessor.get()).isEqualTo(expectedDuration);
}

@Test
void toSpanData_ActiveSpan() {
SdkSpan span = createTestSpan(SpanKind.INTERNAL);
Expand Down
Loading