Skip to content

Commit

Permalink
Have only single thread local .get() call on recording hot path (#142)
Browse files Browse the repository at this point in the history
* Unite thread local calls WIP
  • Loading branch information
0xaa4eb committed Jul 9, 2024
1 parent 74efc44 commit 50a4532
Show file tree
Hide file tree
Showing 16 changed files with 408 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ private AgentContext() {
.classpath(new Classpath().toList())
.build();
this.typeResolver = ReflectionBasedTypeResolver.getInstance();
this.recordingEventQueue = new RecordingEventQueue(settings, typeResolver, new AgentDataWriter(recordingDataWriter, methodRepository), metrics);
this.recorder = new Recorder(methodRepository, startRecordingPolicy, recordingEventQueue, metrics);
this.recordingEventQueue = new RecordingEventQueue(typeResolver, new AgentDataWriter(recordingDataWriter, methodRepository), metrics);
this.recorder = new Recorder(settings, typeResolver, methodRepository, startRecordingPolicy, recordingEventQueue, metrics);

if (settings.getBindNetworkAddress() != null) {
apiServer = AgentApiBootstrap.bootstrap(
Expand Down
124 changes: 76 additions & 48 deletions ulyp-agent-core/src/main/java/com/ulyp/agent/Recorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import com.ulyp.agent.policy.StartRecordingPolicy;
import com.ulyp.agent.queue.RecordingEventQueue;
import com.ulyp.agent.util.RecordingStateStore;
import com.ulyp.core.*;
import com.ulyp.core.metrics.Counter;
import com.ulyp.core.metrics.Metrics;
import com.ulyp.core.util.BitUtil;
import com.ulyp.core.util.LoggingSettings;

import lombok.Getter;
Expand All @@ -26,7 +28,6 @@
@ThreadSafe
public class Recorder {

public static final AtomicInteger recordingIdGenerator = new AtomicInteger(-1);
/**
* Keeps current active recording session count. Based on the fact that most of the time there is no
* recording sessions and this counter is equal to 0, it's possible to make a small performance optimization.
Expand All @@ -36,18 +37,25 @@ public class Recorder {
*/
public static final AtomicInteger currentRecordingSessionCount = new AtomicInteger();

private final Settings settings;
private final TypeResolver typeResolver;
private final MethodRepository methodRepository;
private final ThreadLocal<RecordingState> threadLocalRecordingState = new ThreadLocal<>();
private final RecordingStateStore recordingStateStore = new RecordingStateStore();
private final StartRecordingPolicy startRecordingPolicy;
@Getter
private final RecordingEventQueue recordingEventQueue;
private final Counter recordingsCounter;

public Recorder(
MethodRepository methodRepository,
StartRecordingPolicy startRecordingPolicy,
RecordingEventQueue recordingEventQueue,
Metrics metrics) {
Settings settings,
TypeResolver typeResolver,
MethodRepository methodRepository,
StartRecordingPolicy startRecordingPolicy,
RecordingEventQueue recordingEventQueue,
Metrics metrics) {
this.settings = settings;
this.typeResolver = typeResolver;
this.methodRepository = methodRepository;
this.recordingEventQueue = recordingEventQueue;
this.startRecordingPolicy = startRecordingPolicy;
Expand All @@ -59,6 +67,15 @@ public boolean recordingIsActiveInCurrentThread() {
return recordingState != null && recordingState.isEnabled();
}

public RecordingState getCurrentRecordingState() {
RecordingState recordingState = threadLocalRecordingState.get();
if (recordingState != null && recordingState.isEnabled()) {
return recordingState;
} else {
return null;
}
}

/**
* Allows disabling recording temporary so that no recording is done. Currently, is only used in logging
* facilities in order to avoid unneeded recording calls while logging.
Expand All @@ -78,15 +95,15 @@ public void disableRecording() {
public void enableRecording() {
RecordingState recordingState = threadLocalRecordingState.get();
if (recordingState != null) {
if (recordingState.getRecordingId() >= 0) {
if (recordingState.getRecordingId() > 0) {
recordingState.setEnabled(true);
} else {
threadLocalRecordingState.set(null);
}
}
}

public int startRecordingOnMethodEnter(int methodId, @Nullable Object callee, Object[] args) {
public long startRecordingOnMethodEnter(int methodId, @Nullable Object callee, Object[] args) {
if (startRecordingPolicy.canStartRecording()) {
RecordingState recordingState = initializeRecordingState(methodId);

Expand All @@ -96,50 +113,35 @@ public int startRecordingOnMethodEnter(int methodId, @Nullable Object callee, Ob
}
}

public int startRecordingOnConstructorEnter(int methodId, Object[] args) {
if (startRecordingPolicy.canStartRecording()) {
RecordingState recordingState = initializeRecordingState(methodId);

return onConstructorEnter(recordingState, methodId, args);
} else {
return -1;
}
}

@NotNull
private RecordingState initializeRecordingState(int methodId) {
RecordingState recordingState = threadLocalRecordingState.get();
if (recordingState == null) {
recordingState = new RecordingState();
recordingState.setEnabled(false);
RecordingMetadata recordingMetadata = generateRecordingMetadata();
int recordingId = recordingStateStore.add(recordingState);
RecordingMetadata recordingMetadata = generateRecordingMetadata(recordingId);
recordingState.setRecordingMetadata(recordingMetadata);
threadLocalRecordingState.set(recordingState);
RecordingEventBuffer recordingEventBuffer = new RecordingEventBuffer(recordingMetadata.getId(), settings, typeResolver);
recordingState.setEventBuffer(recordingEventBuffer);

currentRecordingSessionCount.incrementAndGet();
if (LoggingSettings.DEBUG_ENABLED) {
log.debug("Started recording {} at method {}", recordingMetadata.getId(), methodRepository.get(methodId));
}
recordingsCounter.inc();
recordingState.setEnabled(true);
recordingEventQueue.enqueueRecordingStarted(recordingMetadata);
recordingEventBuffer.appendRecordingStartedEvent(recordingMetadata);
}
return recordingState;
}

public int onConstructorEnter(int methodId, Object[] args) {
return onMethodEnter(threadLocalRecordingState.get(), methodId, null, args);
}

public int onConstructorEnter(RecordingState recordingState, int methodId, Object[] args) {
return onMethodEnter(recordingState, methodId, null, args);
}

public int onMethodEnter(int methodId, @Nullable Object callee, Object[] args) {
return onMethodEnter(threadLocalRecordingState.get(), methodId, callee, args);
}

public int onMethodEnter(RecordingState recordingState, int methodId, @Nullable Object callee, Object[] args) {
/**
* @return call token which should be passed back to method {@link Recorder#onMethodEnter} when the corresponding
* method completes
*/
public long onMethodEnter(RecordingState recordingState, int methodId, @Nullable Object callee, Object[] args) {
try {
if (recordingState == null || !recordingState.isEnabled()) {
return -1;
Expand All @@ -148,12 +150,17 @@ public int onMethodEnter(RecordingState recordingState, int methodId, @Nullable
try {
recordingState.setEnabled(false);
int callId = recordingState.nextCallId();
RecordingEventBuffer eventBuffer = recordingState.getEventBuffer();
if (Settings.TIMESTAMPS_ENABLED) {
recordingEventQueue.enqueueMethodEnter(recordingState.getRecordingId(), callId, methodId, callee, args, System.nanoTime());
eventBuffer.appendMethodEnterEvent(callId, methodId, callee, args, System.nanoTime());
} else {
recordingEventQueue.enqueueMethodEnter(recordingState.getRecordingId(), callId, methodId, callee, args);
eventBuffer.appendMethodEnterEvent(callId, methodId, callee, args);
}
return callId;
if (eventBuffer.isFull()) {
recordingEventQueue.enqueue(eventBuffer);
eventBuffer.reset();
}
return BitUtil.longFromInts(recordingState.getRecordingId(), callId);
} finally {
recordingState.setEnabled(true);
}
Expand All @@ -163,28 +170,27 @@ public int onMethodEnter(RecordingState recordingState, int methodId, @Nullable
}
}

public void onConstructorExit(int methodId, Object result, int callId) {
onMethodExit(methodId, result, null, callId);
}

public void onMethodExit(int methodId, Object result, Throwable thrown, int callId) {
public void onMethodExit(int methodId, Object result, Throwable thrown, long callToken) {
try {
RecordingState recordingState = threadLocalRecordingState.get();
int recordingId = recordingId(callToken);
int callId = callId(callToken);
RecordingState recordingState = recordingStateStore.get(recordingId);
if (recordingState == null || !recordingState.isEnabled()) return;

try {
recordingState.setEnabled(false);

RecordingEventBuffer eventBuffer = recordingState.getEventBuffer();
if (Settings.TIMESTAMPS_ENABLED) {
recordingEventQueue.enqueueMethodExit(recordingState.getRecordingId(), callId, thrown != null ? thrown : result, thrown != null, System.nanoTime());
eventBuffer.appendMethodExitEvent(callId, thrown != null ? thrown : result, thrown != null, System.nanoTime());
} else {
recordingEventQueue.enqueueMethodExit(recordingState.getRecordingId(), callId, thrown != null ? thrown : result, thrown != null);
eventBuffer.appendMethodExitEvent(callId, thrown != null ? thrown : result, thrown != null);
}

if (callId == RecordingState.ROOT_CALL_RECORDING_ID) {
int recordingId = recordingState.getRecordingId();
recordingEventQueue.enqueueRecordingFinished(recordingId, System.currentTimeMillis());
recordingEventQueue.flush(recordingId);
eventBuffer.appendRecordingFinishedEvent(System.currentTimeMillis());
recordingEventQueue.enqueue(eventBuffer);
recordingStateStore.remove(recordingId);
threadLocalRecordingState.set(null);
currentRecordingSessionCount.decrementAndGet();
if (LoggingSettings.DEBUG_ENABLED) {
Expand All @@ -195,6 +201,11 @@ public void onMethodExit(int methodId, Object result, Throwable thrown, int call
recordingState.getCallId()
);
}
} else {
if (eventBuffer.isFull()) {
recordingEventQueue.enqueue(eventBuffer);
eventBuffer.reset();
}
}
} finally {
recordingState.setEnabled(true);
Expand All @@ -204,14 +215,22 @@ public void onMethodExit(int methodId, Object result, Throwable thrown, int call
}
}

private RecordingMetadata generateRecordingMetadata() {
private int recordingId(long callToken) {
return (int) (callToken >> 32);
}

private int callId(long callToken) {
return (int) callToken;
}

private RecordingMetadata generateRecordingMetadata(int recordingId) {
List<String> stackTraceElements = Stream.of(new Exception().getStackTrace())
.skip(2)
.map(StackTraceElement::toString)
.collect(Collectors.toList());

return RecordingMetadata.builder()
.id(recordingIdGenerator.incrementAndGet())
.id(recordingId)
.recordingStartedMillis(System.currentTimeMillis())
.logCreatedEpochMillis(System.currentTimeMillis())
.threadId(Thread.currentThread().getId())
Expand All @@ -220,6 +239,15 @@ private RecordingMetadata generateRecordingMetadata() {
.build();
}

/**
* @return call token which should be passed back to method {@link Recorder#onMethodEnter} when the corresponding
* method completes
*/
@TestOnly
public long onMethodEnter(int methodId, @Nullable Object callee, Object[] args) {
return onMethodEnter(threadLocalRecordingState.get(), methodId, callee, args);
}

@TestOnly
RecordingState getRecordingState() {
return threadLocalRecordingState.get();
Expand Down
Loading

0 comments on commit 50a4532

Please sign in to comment.