diff --git a/ulyp-agent-core/src/main/java/com/ulyp/agent/AgentContext.java b/ulyp-agent-core/src/main/java/com/ulyp/agent/AgentContext.java index 5f913eda..d35b998c 100644 --- a/ulyp-agent-core/src/main/java/com/ulyp/agent/AgentContext.java +++ b/ulyp-agent-core/src/main/java/com/ulyp/agent/AgentContext.java @@ -65,8 +65,8 @@ private AgentContext() { .classpath(new Classpath().toList()) .build(); this.typeResolver = ReflectionBasedTypeResolver.getInstance(); - this.recordingEventQueue = new RecordingEventQueue(settings, typeResolver, new AgentDataWriter(recordingDataWriter, methodRepository), metrics); - this.recorder = new Recorder(methodRepository, startRecordingPolicy, recordingEventQueue, metrics); + this.recordingEventQueue = new RecordingEventQueue(typeResolver, new AgentDataWriter(recordingDataWriter, methodRepository), metrics); + this.recorder = new Recorder(settings, typeResolver, methodRepository, startRecordingPolicy, recordingEventQueue, metrics); if (settings.getBindNetworkAddress() != null) { apiServer = AgentApiBootstrap.bootstrap( diff --git a/ulyp-agent-core/src/main/java/com/ulyp/agent/Recorder.java b/ulyp-agent-core/src/main/java/com/ulyp/agent/Recorder.java index 57485127..abf7eac2 100644 --- a/ulyp-agent-core/src/main/java/com/ulyp/agent/Recorder.java +++ b/ulyp-agent-core/src/main/java/com/ulyp/agent/Recorder.java @@ -2,9 +2,11 @@ import com.ulyp.agent.policy.StartRecordingPolicy; import com.ulyp.agent.queue.RecordingEventQueue; +import com.ulyp.agent.util.RecordingStateStore; import com.ulyp.core.*; import com.ulyp.core.metrics.Counter; import com.ulyp.core.metrics.Metrics; +import com.ulyp.core.util.BitUtil; import com.ulyp.core.util.LoggingSettings; import lombok.Getter; @@ -26,7 +28,6 @@ @ThreadSafe public class Recorder { - public static final AtomicInteger recordingIdGenerator = new AtomicInteger(-1); /** * Keeps current active recording session count. Based on the fact that most of the time there is no * recording sessions and this counter is equal to 0, it's possible to make a small performance optimization. @@ -36,18 +37,25 @@ public class Recorder { */ public static final AtomicInteger currentRecordingSessionCount = new AtomicInteger(); + private final Settings settings; + private final TypeResolver typeResolver; private final MethodRepository methodRepository; private final ThreadLocal threadLocalRecordingState = new ThreadLocal<>(); + private final RecordingStateStore recordingStateStore = new RecordingStateStore(); private final StartRecordingPolicy startRecordingPolicy; @Getter private final RecordingEventQueue recordingEventQueue; private final Counter recordingsCounter; public Recorder( - MethodRepository methodRepository, - StartRecordingPolicy startRecordingPolicy, - RecordingEventQueue recordingEventQueue, - Metrics metrics) { + Settings settings, + TypeResolver typeResolver, + MethodRepository methodRepository, + StartRecordingPolicy startRecordingPolicy, + RecordingEventQueue recordingEventQueue, + Metrics metrics) { + this.settings = settings; + this.typeResolver = typeResolver; this.methodRepository = methodRepository; this.recordingEventQueue = recordingEventQueue; this.startRecordingPolicy = startRecordingPolicy; @@ -59,6 +67,15 @@ public boolean recordingIsActiveInCurrentThread() { return recordingState != null && recordingState.isEnabled(); } + public RecordingState getCurrentRecordingState() { + RecordingState recordingState = threadLocalRecordingState.get(); + if (recordingState != null && recordingState.isEnabled()) { + return recordingState; + } else { + return null; + } + } + /** * Allows disabling recording temporary so that no recording is done. Currently, is only used in logging * facilities in order to avoid unneeded recording calls while logging. @@ -78,7 +95,7 @@ public void disableRecording() { public void enableRecording() { RecordingState recordingState = threadLocalRecordingState.get(); if (recordingState != null) { - if (recordingState.getRecordingId() >= 0) { + if (recordingState.getRecordingId() > 0) { recordingState.setEnabled(true); } else { threadLocalRecordingState.set(null); @@ -86,7 +103,7 @@ public void enableRecording() { } } - public int startRecordingOnMethodEnter(int methodId, @Nullable Object callee, Object[] args) { + public long startRecordingOnMethodEnter(int methodId, @Nullable Object callee, Object[] args) { if (startRecordingPolicy.canStartRecording()) { RecordingState recordingState = initializeRecordingState(methodId); @@ -96,25 +113,18 @@ public int startRecordingOnMethodEnter(int methodId, @Nullable Object callee, Ob } } - public int startRecordingOnConstructorEnter(int methodId, Object[] args) { - if (startRecordingPolicy.canStartRecording()) { - RecordingState recordingState = initializeRecordingState(methodId); - - return onConstructorEnter(recordingState, methodId, args); - } else { - return -1; - } - } - @NotNull private RecordingState initializeRecordingState(int methodId) { RecordingState recordingState = threadLocalRecordingState.get(); if (recordingState == null) { recordingState = new RecordingState(); recordingState.setEnabled(false); - RecordingMetadata recordingMetadata = generateRecordingMetadata(); + int recordingId = recordingStateStore.add(recordingState); + RecordingMetadata recordingMetadata = generateRecordingMetadata(recordingId); recordingState.setRecordingMetadata(recordingMetadata); threadLocalRecordingState.set(recordingState); + RecordingEventBuffer recordingEventBuffer = new RecordingEventBuffer(recordingMetadata.getId(), settings, typeResolver); + recordingState.setEventBuffer(recordingEventBuffer); currentRecordingSessionCount.incrementAndGet(); if (LoggingSettings.DEBUG_ENABLED) { @@ -122,24 +132,16 @@ private RecordingState initializeRecordingState(int methodId) { } recordingsCounter.inc(); recordingState.setEnabled(true); - recordingEventQueue.enqueueRecordingStarted(recordingMetadata); + recordingEventBuffer.appendRecordingStartedEvent(recordingMetadata); } return recordingState; } - public int onConstructorEnter(int methodId, Object[] args) { - return onMethodEnter(threadLocalRecordingState.get(), methodId, null, args); - } - - public int onConstructorEnter(RecordingState recordingState, int methodId, Object[] args) { - return onMethodEnter(recordingState, methodId, null, args); - } - - public int onMethodEnter(int methodId, @Nullable Object callee, Object[] args) { - return onMethodEnter(threadLocalRecordingState.get(), methodId, callee, args); - } - - public int onMethodEnter(RecordingState recordingState, int methodId, @Nullable Object callee, Object[] args) { + /** + * @return call token which should be passed back to method {@link Recorder#onMethodEnter} when the corresponding + * method completes + */ + public long onMethodEnter(RecordingState recordingState, int methodId, @Nullable Object callee, Object[] args) { try { if (recordingState == null || !recordingState.isEnabled()) { return -1; @@ -148,12 +150,17 @@ public int onMethodEnter(RecordingState recordingState, int methodId, @Nullable try { recordingState.setEnabled(false); int callId = recordingState.nextCallId(); + RecordingEventBuffer eventBuffer = recordingState.getEventBuffer(); if (Settings.TIMESTAMPS_ENABLED) { - recordingEventQueue.enqueueMethodEnter(recordingState.getRecordingId(), callId, methodId, callee, args, System.nanoTime()); + eventBuffer.appendMethodEnterEvent(callId, methodId, callee, args, System.nanoTime()); } else { - recordingEventQueue.enqueueMethodEnter(recordingState.getRecordingId(), callId, methodId, callee, args); + eventBuffer.appendMethodEnterEvent(callId, methodId, callee, args); } - return callId; + if (eventBuffer.isFull()) { + recordingEventQueue.enqueue(eventBuffer); + eventBuffer.reset(); + } + return BitUtil.longFromInts(recordingState.getRecordingId(), callId); } finally { recordingState.setEnabled(true); } @@ -163,28 +170,27 @@ public int onMethodEnter(RecordingState recordingState, int methodId, @Nullable } } - public void onConstructorExit(int methodId, Object result, int callId) { - onMethodExit(methodId, result, null, callId); - } - - public void onMethodExit(int methodId, Object result, Throwable thrown, int callId) { + public void onMethodExit(int methodId, Object result, Throwable thrown, long callToken) { try { - RecordingState recordingState = threadLocalRecordingState.get(); + int recordingId = recordingId(callToken); + int callId = callId(callToken); + RecordingState recordingState = recordingStateStore.get(recordingId); if (recordingState == null || !recordingState.isEnabled()) return; try { recordingState.setEnabled(false); + RecordingEventBuffer eventBuffer = recordingState.getEventBuffer(); if (Settings.TIMESTAMPS_ENABLED) { - recordingEventQueue.enqueueMethodExit(recordingState.getRecordingId(), callId, thrown != null ? thrown : result, thrown != null, System.nanoTime()); + eventBuffer.appendMethodExitEvent(callId, thrown != null ? thrown : result, thrown != null, System.nanoTime()); } else { - recordingEventQueue.enqueueMethodExit(recordingState.getRecordingId(), callId, thrown != null ? thrown : result, thrown != null); + eventBuffer.appendMethodExitEvent(callId, thrown != null ? thrown : result, thrown != null); } if (callId == RecordingState.ROOT_CALL_RECORDING_ID) { - int recordingId = recordingState.getRecordingId(); - recordingEventQueue.enqueueRecordingFinished(recordingId, System.currentTimeMillis()); - recordingEventQueue.flush(recordingId); + eventBuffer.appendRecordingFinishedEvent(System.currentTimeMillis()); + recordingEventQueue.enqueue(eventBuffer); + recordingStateStore.remove(recordingId); threadLocalRecordingState.set(null); currentRecordingSessionCount.decrementAndGet(); if (LoggingSettings.DEBUG_ENABLED) { @@ -195,6 +201,11 @@ public void onMethodExit(int methodId, Object result, Throwable thrown, int call recordingState.getCallId() ); } + } else { + if (eventBuffer.isFull()) { + recordingEventQueue.enqueue(eventBuffer); + eventBuffer.reset(); + } } } finally { recordingState.setEnabled(true); @@ -204,14 +215,22 @@ public void onMethodExit(int methodId, Object result, Throwable thrown, int call } } - private RecordingMetadata generateRecordingMetadata() { + private int recordingId(long callToken) { + return (int) (callToken >> 32); + } + + private int callId(long callToken) { + return (int) callToken; + } + + private RecordingMetadata generateRecordingMetadata(int recordingId) { List stackTraceElements = Stream.of(new Exception().getStackTrace()) .skip(2) .map(StackTraceElement::toString) .collect(Collectors.toList()); return RecordingMetadata.builder() - .id(recordingIdGenerator.incrementAndGet()) + .id(recordingId) .recordingStartedMillis(System.currentTimeMillis()) .logCreatedEpochMillis(System.currentTimeMillis()) .threadId(Thread.currentThread().getId()) @@ -220,6 +239,15 @@ private RecordingMetadata generateRecordingMetadata() { .build(); } + /** + * @return call token which should be passed back to method {@link Recorder#onMethodEnter} when the corresponding + * method completes + */ + @TestOnly + public long onMethodEnter(int methodId, @Nullable Object callee, Object[] args) { + return onMethodEnter(threadLocalRecordingState.get(), methodId, callee, args); + } + @TestOnly RecordingState getRecordingState() { return threadLocalRecordingState.get(); diff --git a/ulyp-agent-core/src/main/java/com/ulyp/agent/RecordingEventBuffer.java b/ulyp-agent-core/src/main/java/com/ulyp/agent/RecordingEventBuffer.java new file mode 100644 index 00000000..860d7db2 --- /dev/null +++ b/ulyp-agent-core/src/main/java/com/ulyp/agent/RecordingEventBuffer.java @@ -0,0 +1,158 @@ +package com.ulyp.agent; + +import com.ulyp.agent.queue.events.*; +import com.ulyp.core.RecordingMetadata; +import com.ulyp.core.Type; +import com.ulyp.core.TypeResolver; +import com.ulyp.core.bytes.BufferBytesOut; +import com.ulyp.core.recorders.*; +import com.ulyp.core.util.LoggingSettings; +import com.ulyp.core.util.SystemPropertyUtil; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.agrona.concurrent.UnsafeBuffer; +import org.jetbrains.annotations.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** + * Thread-local buffer for recording events. Recording threads gather some number of + * events in into such buffers and post them to the background thread. + */ +@Getter +@Slf4j +public class RecordingEventBuffer { + + private static final int MAX_BUFFER_SIZE = SystemPropertyUtil.getInt("ulyp.recording.max-buffer-size", 256); + private static final int TMP_BUFFER_SIZE = SystemPropertyUtil.getInt("ulyp.recording.tmp-buffer.size", 16 * 1024); + + @Getter + private final int recordingId; + private final boolean performanceMode; + private final TypeResolver typeResolver; + @Getter + private List events; + private byte[] tmpBuffer; + + public RecordingEventBuffer(int recordingId, Settings settings, TypeResolver typeResolver) { + this.recordingId = recordingId; + this.performanceMode = settings.isPerformanceModeEnabled(); + this.typeResolver = typeResolver; + this.events = new ArrayList<>(MAX_BUFFER_SIZE); + } + + public void reset() { + events = new ArrayList<>(MAX_BUFFER_SIZE); + } + + public boolean isEmpty() { + return events.isEmpty(); + } + + public boolean isFull() { + return events.size() >= MAX_BUFFER_SIZE; + } + + public void add(RecordingEvent event) { + this.events.add(event); + } + + public void appendRecordingStartedEvent(RecordingMetadata recordingMetadata) { + add(new RecordingStartedEvent(recordingMetadata)); + } + + public void appendRecordingFinishedEvent(long recordingFinishedTimeMillis) { + add(new RecordingFinishedEvent(recordingFinishedTimeMillis)); + } + + public void appendMethodEnterEvent(int callId, int methodId, @Nullable Object callee, Object[] args) { + Object[] argsPrepared = prepareArgs(args); + + events.add(new EnterMethodRecordingEvent(callId, methodId, callee, argsPrepared)); + } + + public void appendMethodEnterEvent(int callId, int methodId, @Nullable Object callee, Object[] args, long nanoTime) { + Object[] argsPrepared = prepareArgs(args); + + events.add(new TimestampedEnterMethodRecordingEvent(callId, methodId, callee, argsPrepared, nanoTime)); + } + + public void appendMethodExitEvent(int callId, Object returnValue, boolean thrown) { + Object returnValuePrepared = prepareReturnValue(returnValue); + events.add(new ExitMethodRecordingEvent(callId, returnValuePrepared, thrown)); + } + + public void appendMethodExitEvent(int callId, Object returnValue, boolean thrown, long nanoTime) { + Object returnValuePrepared = prepareReturnValue(returnValue); + events.add(new TimestampedExitMethodRecordingEvent(callId, returnValuePrepared, thrown, nanoTime)); + } + + private Object prepareReturnValue(Object returnValue) { + Object returnValuePrepared; + if (performanceMode) { + returnValuePrepared = returnValue; + } else { + returnValuePrepared = convert(returnValue); + } + return returnValuePrepared; + } + + private Object[] prepareArgs(Object[] args) { + Object[] argsPrepared; + if (performanceMode) { + argsPrepared = args; + } else { + argsPrepared = convert(args); + } + return argsPrepared; + } + + private Object[] convert(Object[] args) { + for (int i = 0; i < args.length; i++) { + args[i] = convert(args[i]); + } + return args; + } + + /** + * Resolves type for an object, then checks if it can be recorded asynchronously in a background thread. Most objects + * have only their identity hash code and type id recorded, so it can be safely done concurrently in some other thread. + * Collections have a few of their items recorded (if enabled), so the recording must happen here. + */ + private Object convert(Object value) { + Type type = typeResolver.get(value); + ObjectRecorder recorder = type.getRecorderHint(); + if (value != null && recorder == null) { + recorder = RecorderChooser.getInstance().chooseForType(value.getClass()); + type.setRecorderHint(recorder); + } + if (value == null || recorder.supportsAsyncRecording()) { + if (value != null && recorder instanceof IdentityRecorder) { + return new QueuedIdentityObject(type.getId(), value); + } else { + return value; + } + } else { + BufferBytesOut output = new BufferBytesOut(new UnsafeBuffer(getTmpBuffer())); + try { + recorder.write(value, output, typeResolver); + return new QueuedRecordedObject(type, recorder.getId(), output.copy()); + } catch (Exception e) { + if (LoggingSettings.DEBUG_ENABLED) { + log.debug("Error while recording object", e); + } + return new QueuedIdentityObject(type.getId(), value); + } + } + } + + private byte[] getTmpBuffer() { + if (tmpBuffer != null) { + return tmpBuffer; + } else { + tmpBuffer = new byte[TMP_BUFFER_SIZE]; + return tmpBuffer; + } + } +} diff --git a/ulyp-agent-core/src/main/java/com/ulyp/agent/RecordingState.java b/ulyp-agent-core/src/main/java/com/ulyp/agent/RecordingState.java index 07680ff4..f31b90ea 100644 --- a/ulyp-agent-core/src/main/java/com/ulyp/agent/RecordingState.java +++ b/ulyp-agent-core/src/main/java/com/ulyp/agent/RecordingState.java @@ -5,10 +5,11 @@ import com.ulyp.core.RecordingMetadata; import lombok.Getter; -import lombok.NoArgsConstructor; import lombok.Setter; -@NoArgsConstructor +/** + * Single state for recording session. There is only single {@link ThreadLocal#get()} call for every recorded method + */ @Getter public class RecordingState { @@ -21,6 +22,13 @@ public class RecordingState { private int callId = ROOT_CALL_RECORDING_ID; @Setter private boolean enabled; + @Getter + @Setter + private RecordingEventBuffer eventBuffer; + + public RecordingState() { + + } public int nextCallId() { return callId++; diff --git a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/QueueBatchEventProcessor.java b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/QueueBatchEventProcessor.java index e7a4eda4..2bb316c4 100644 --- a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/QueueBatchEventProcessor.java +++ b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/QueueBatchEventProcessor.java @@ -1,9 +1,7 @@ package com.ulyp.agent.queue; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import com.lmax.disruptor.AlertException; @@ -29,15 +27,15 @@ public final class QueueBatchEventProcessor implements EventProcessor { private final AgentDataWriter agentDataWriter; private final Map recordingQueueProcessors = new HashMap<>(); private final AtomicInteger status = new AtomicInteger(IDLE); - private final DataProvider dataProvider; + private final DataProvider dataProvider; private final SequenceBarrier sequenceBarrier; private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); public QueueBatchEventProcessor( - DataProvider dataProvider, - SequenceBarrier sequenceBarrier, - TypeResolver typeResolver, - AgentDataWriter agentDataWriter) { + DataProvider dataProvider, + SequenceBarrier sequenceBarrier, + TypeResolver typeResolver, + AgentDataWriter agentDataWriter) { this.dataProvider = dataProvider; this.sequenceBarrier = sequenceBarrier; this.typeResolver = typeResolver; @@ -107,7 +105,7 @@ private void processEvents() { } private void processAtSeq(long sequence) { - RecordingEventBatch batch = dataProvider.get(sequence); + RecordingEventDisruptorEntry batch = dataProvider.get(sequence); try { int recordingId = batch.getRecordingId(); RecordingEventProcessor processor = recordingQueueProcessors.get(batch.getRecordingId()); diff --git a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/QueueBatchEventProcessorFactory.java b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/QueueBatchEventProcessorFactory.java index 95960111..85e3b7d1 100644 --- a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/QueueBatchEventProcessorFactory.java +++ b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/QueueBatchEventProcessorFactory.java @@ -9,7 +9,7 @@ import com.ulyp.core.TypeResolver; import lombok.Getter; -public class QueueBatchEventProcessorFactory implements EventProcessorFactory { +public class QueueBatchEventProcessorFactory implements EventProcessorFactory { private final TypeResolver typeResolver; private final AgentDataWriter agentDataWriter; @@ -22,7 +22,7 @@ public QueueBatchEventProcessorFactory(TypeResolver typeResolver, AgentDataWrite } @Override - public EventProcessor createEventProcessor(RingBuffer ringBuffer, Sequence[] barrierSequences) { + public EventProcessor createEventProcessor(RingBuffer ringBuffer, Sequence[] barrierSequences) { if (eventProcessor != null) { return eventProcessor; } diff --git a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/RecordingEventBatch.java b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/RecordingEventBatch.java deleted file mode 100644 index cfbb6e61..00000000 --- a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/RecordingEventBatch.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.ulyp.agent.queue; - -import com.ulyp.agent.queue.events.RecordingEvent; -import com.ulyp.core.util.SystemPropertyUtil; -import lombok.Getter; -import lombok.Setter; - -import java.util.ArrayList; -import java.util.List; - -/** - * A batch of events which inside every cell of the recording queue disruptor. Recording threads gather some number of - * events in thread local batch and add events until the batch is full. - */ -@Getter -public class RecordingEventBatch { - - private static final int BATCH_SIZE = SystemPropertyUtil.getInt("ulyp.recording-queue.batch-size", 256); - - @Setter - private int recordingId; - private List events; - - public RecordingEventBatch() { - this.events = null; - } - - public void reset() { - this.events = null; - } - - public void resetForUpcomingEvents() { - events = new ArrayList<>(BATCH_SIZE); - } - - public boolean isEmpty() { - return events.isEmpty(); - } - - public boolean isFull() { - return events.size() >= BATCH_SIZE; - } - - public void add(RecordingEvent event) { - this.events.add(event); - } - - public void moveFrom(RecordingEventBatch other) { - this.recordingId = other.recordingId; - this.events = other.events; - } -} diff --git a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/RecordingEventDisruptorEntry.java b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/RecordingEventDisruptorEntry.java new file mode 100644 index 00000000..d82d32e6 --- /dev/null +++ b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/RecordingEventDisruptorEntry.java @@ -0,0 +1,29 @@ +package com.ulyp.agent.queue; + +import com.ulyp.agent.RecordingEventBuffer; +import com.ulyp.agent.queue.events.RecordingEvent; +import lombok.Getter; +import lombok.Setter; + +import java.util.List; + +/** + * A batch of events which inside every cell of the recording queue disruptor. Recording threads gather some number of + * events in thread local batch and add events until the batch is full. + */ +@Getter +public class RecordingEventDisruptorEntry { + + @Setter + private int recordingId; + private List events; + + public void reset() { + this.events = null; + } + + public void moveFrom(RecordingEventBuffer batch) { + this.recordingId = batch.getRecordingId(); + this.events = batch.getEvents(); + } +} diff --git a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/RecordingEventQueue.java b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/RecordingEventQueue.java index bfbb84c1..eb6ae360 100644 --- a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/RecordingEventQueue.java +++ b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/RecordingEventQueue.java @@ -8,21 +8,12 @@ import java.util.concurrent.locks.LockSupport; import com.ulyp.agent.AgentDataWriter; -import com.ulyp.agent.Settings; +import com.ulyp.agent.RecordingEventBuffer; import com.ulyp.agent.queue.disruptor.RecordingQueueDisruptor; -import com.ulyp.agent.queue.events.*; import com.ulyp.core.metrics.Metrics; -import com.ulyp.core.recorders.*; -import com.ulyp.core.bytes.BufferBytesOut; -import com.ulyp.core.util.LoggingSettings; -import com.ulyp.core.util.ConcurrentSimpleObjectPool; import com.ulyp.core.util.SystemPropertyUtil; -import org.agrona.concurrent.UnsafeBuffer; -import org.jetbrains.annotations.Nullable; import com.lmax.disruptor.SleepingWaitStrategy; -import com.ulyp.core.RecordingMetadata; -import com.ulyp.core.Type; import com.ulyp.core.TypeResolver; import com.ulyp.core.util.NamedThreadFactory; @@ -38,33 +29,20 @@ public class RecordingEventQueue implements AutoCloseable { private static final int RECORDING_QUEUE_SIZE = SystemPropertyUtil.getInt("ulyp.recording-queue.size", 16 * 1024); - private static final int TMP_BUFFER_SIZE = SystemPropertyUtil.getInt("ulyp.recording-queue.tmp-buffer.size", 16 * 1024); - private static final int TMP_BUFFER_ENTRIES = SystemPropertyUtil.getInt("ulyp.recording-queue.tmp-buffer.entries", 8); - private final boolean performanceMode; - private final TypeResolver typeResolver; private final RecordingQueueDisruptor disruptor; private final ScheduledExecutorService scheduledExecutorService; private final QueueBatchEventProcessorFactory eventProcessorFactory; - private final ConcurrentSimpleObjectPool bufferPool; - private final ThreadLocal batchEventThreadLocal = ThreadLocal.withInitial(() -> { - RecordingEventBatch eventBatch = new RecordingEventBatch(); - eventBatch.resetForUpcomingEvents(); - return eventBatch; - }); - public RecordingEventQueue(Settings settings, TypeResolver typeResolver, AgentDataWriter agentDataWriter, Metrics metrics) { - this.typeResolver = typeResolver; - this.bufferPool = new ConcurrentSimpleObjectPool<>(TMP_BUFFER_ENTRIES, () -> new byte[TMP_BUFFER_SIZE]); + public RecordingEventQueue(TypeResolver typeResolver, AgentDataWriter agentDataWriter, Metrics metrics) { this.disruptor = new RecordingQueueDisruptor( - RecordingEventBatch::new, + RecordingEventDisruptorEntry::new, RECORDING_QUEUE_SIZE, new QueueEventHandlerThreadFactory(), new SleepingWaitStrategy(3, TimeUnit.MILLISECONDS.toNanos(1)), metrics ); this.eventProcessorFactory = new QueueBatchEventProcessorFactory(typeResolver, agentDataWriter); - this.performanceMode = settings.isPerformanceModeEnabled(); this.scheduledExecutorService = Executors.newScheduledThreadPool( 1, NamedThreadFactory.builder().name("ulyp-recorder-queue-stats-reporter").daemon(true).build() @@ -77,114 +55,8 @@ public void start() { this.disruptor.start(); } - public void enqueueRecordingStarted(RecordingMetadata recordingMetadata) { - appendEvent(recordingMetadata.getId(), new RecordingStartedEvent(recordingMetadata)); - } - - public void enqueueRecordingFinished(int recordingId, long recordingFinishedTimeMillis) { - appendEvent(recordingId, new RecordingFinishedEvent(recordingFinishedTimeMillis)); - } - - public void enqueueMethodEnter(int recordingId, int callId, int methodId, @Nullable Object callee, Object[] args) { - Object[] argsPrepared = prepareArgs(args); - - appendEvent(recordingId, new EnterMethodRecordingEvent(callId, methodId, callee, argsPrepared)); - } - - public void enqueueMethodEnter(int recordingId, int callId, int methodId, @Nullable Object callee, Object[] args, long nanoTime) { - Object[] argsPrepared = prepareArgs(args); - - appendEvent(recordingId, new TimestampedEnterMethodRecordingEvent(callId, methodId, callee, argsPrepared, nanoTime)); - } - - public void enqueueMethodExit(int recordingId, int callId, Object returnValue, boolean thrown) { - Object returnValuePrepared = prepareReturnValue(returnValue); - appendEvent(recordingId, new ExitMethodRecordingEvent(callId, returnValuePrepared, thrown)); - } - - public void enqueueMethodExit(int recordingId, int callId, Object returnValue, boolean thrown, long nanoTime) { - Object returnValuePrepared = prepareReturnValue(returnValue); - appendEvent(recordingId, new TimestampedExitMethodRecordingEvent(callId, returnValuePrepared, thrown, nanoTime)); - } - - public void flush(int recordingId) { - RecordingEventBatch eventBatch = batchEventThreadLocal.get(); - if (!eventBatch.isEmpty()) { - eventBatch.setRecordingId(recordingId); - disruptor.publish(eventBatch); - eventBatch.resetForUpcomingEvents(); - } - } - - private void appendEvent(int recordingId, RecordingEvent event) { - RecordingEventBatch eventBatch = batchEventThreadLocal.get(); - eventBatch.add(event); - if (eventBatch.isFull()) { - eventBatch.setRecordingId(recordingId); - disruptor.publish(eventBatch); - eventBatch.resetForUpcomingEvents(); - } - } - - private Object prepareReturnValue(Object returnValue) { - Object returnValuePrepared; - if (performanceMode) { - returnValuePrepared = returnValue; - } else { - returnValuePrepared = convert(returnValue); - } - return returnValuePrepared; - } - - private Object[] prepareArgs(Object[] args) { - Object[] argsPrepared; - if (performanceMode) { - argsPrepared = args; - } else { - argsPrepared = convert(args); - } - return argsPrepared; - } - - private Object[] convert(Object[] args) { - for (int i = 0; i < args.length; i++) { - args[i] = convert(args[i]); - } - return args; - } - - /** - * Resolves type for an object, then checks if it can be recorded asynchronously in a background thread. Most objects - * have only their identity hash code and type id recorded, so it can be safely done concurrently in some other thread. - * Collections have a few of their items recorded (if enabled), so the recording must happen here. - */ - private Object convert(Object value) { - Type type = typeResolver.get(value); - ObjectRecorder recorder = type.getRecorderHint(); - if (value != null && recorder == null) { - recorder = RecorderChooser.getInstance().chooseForType(value.getClass()); - type.setRecorderHint(recorder); - } - if (value == null || recorder.supportsAsyncRecording()) { - if (value != null && recorder instanceof IdentityRecorder) { - return new QueuedIdentityObject(type.getId(), value); - } else { - return value; - } - } else { - try (ConcurrentSimpleObjectPool.ObjectPoolClaim buffer = bufferPool.claim()) { - BufferBytesOut output = new BufferBytesOut(new UnsafeBuffer(buffer.get())); - try { - recorder.write(value, output, typeResolver); - return new QueuedRecordedObject(type, recorder.getId(), output.copy()); - } catch (Exception e) { - if (LoggingSettings.DEBUG_ENABLED) { - log.debug("Error while recording object", e); - } - return new QueuedIdentityObject(type.getId(), value); - } - } - } + public void enqueue(RecordingEventBuffer eventBuffer) { + disruptor.publish(eventBuffer); } public void sync(Duration duration) throws InterruptedException, TimeoutException { diff --git a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/disruptor/EventHandlerGroup.java b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/disruptor/EventHandlerGroup.java index 933d56b2..e6acc44a 100644 --- a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/disruptor/EventHandlerGroup.java +++ b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/disruptor/EventHandlerGroup.java @@ -3,18 +3,18 @@ import com.lmax.disruptor.EventProcessor; import com.lmax.disruptor.Sequence; import com.lmax.disruptor.WorkHandler; -import com.ulyp.agent.queue.RecordingEventBatch; +import com.ulyp.agent.queue.RecordingEventDisruptorEntry; import java.util.Arrays; public class EventHandlerGroup { private final RecordingQueueDisruptor disruptor; - private final ConsumerRepository consumerRepository; + private final ConsumerRepository consumerRepository; private final Sequence[] sequences; EventHandlerGroup( final RecordingQueueDisruptor disruptor, - final ConsumerRepository consumerRepository, + final ConsumerRepository consumerRepository, final Sequence[] sequences) { this.disruptor = disruptor; this.consumerRepository = consumerRepository; @@ -43,7 +43,7 @@ public EventHandlerGroup and(final EventProcessor... processors) { } @SafeVarargs - public final EventHandlerGroup handleEventsWithWorkerPool(final WorkHandler... handlers) { + public final EventHandlerGroup handleEventsWithWorkerPool(final WorkHandler... handlers) { return disruptor.createWorkerPool(sequences, handlers); } } diff --git a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/disruptor/RecordingQueueDisruptor.java b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/disruptor/RecordingQueueDisruptor.java index 68d58c5c..81276d42 100644 --- a/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/disruptor/RecordingQueueDisruptor.java +++ b/ulyp-agent-core/src/main/java/com/ulyp/agent/queue/disruptor/RecordingQueueDisruptor.java @@ -4,7 +4,8 @@ import com.lmax.disruptor.dsl.BasicExecutor; import com.lmax.disruptor.dsl.ExceptionHandlerWrapper; import com.lmax.disruptor.util.Util; -import com.ulyp.agent.queue.RecordingEventBatch; +import com.ulyp.agent.RecordingEventBuffer; +import com.ulyp.agent.queue.RecordingEventDisruptorEntry; import com.ulyp.core.metrics.Metrics; import java.util.concurrent.Executor; @@ -17,14 +18,14 @@ * provide additional methods */ public class RecordingQueueDisruptor { - private final RingBuffer ringBuffer; + private final RingBuffer ringBuffer; private final Executor executor; - private final ConsumerRepository consumerRepository = new ConsumerRepository<>(); + private final ConsumerRepository consumerRepository = new ConsumerRepository<>(); private final AtomicBoolean started = new AtomicBoolean(false); - private final ExceptionHandler exceptionHandler = new ExceptionHandlerWrapper<>(); + private final ExceptionHandler exceptionHandler = new ExceptionHandlerWrapper<>(); public RecordingQueueDisruptor( - final EventFactory eventFactory, + final EventFactory eventFactory, final int ringBufferSize, final ThreadFactory threadFactory, final WaitStrategy waitStrategy, @@ -32,13 +33,13 @@ public RecordingQueueDisruptor( this(RingBuffer.create(eventFactory, ringBufferSize, waitStrategy, metrics), new BasicExecutor(threadFactory)); } - private RecordingQueueDisruptor(final RingBuffer ringBuffer, final Executor executor) { + private RecordingQueueDisruptor(final RingBuffer ringBuffer, final Executor executor) { this.ringBuffer = ringBuffer; this.executor = executor; } @SafeVarargs - public final EventHandlerGroup handleEventsWith(final EventProcessorFactory... eventProcessorFactories) { + public final EventHandlerGroup handleEventsWith(final EventProcessorFactory... eventProcessorFactories) { final Sequence[] barrierSequences = new Sequence[0]; return createEventProcessors(barrierSequences, eventProcessorFactories); } @@ -58,17 +59,17 @@ public EventHandlerGroup handleEventsWith(final EventProcessor... processors) { return new EventHandlerGroup(this, consumerRepository, Util.getSequencesFor(processors)); } - public void publish(RecordingEventBatch event) { + public void publish(RecordingEventBuffer eventBuffer) { long next = ringBuffer.next(1); try { - RecordingEventBatch ringEvent = get(next); - ringEvent.moveFrom(event); + RecordingEventDisruptorEntry ringEntry = get(next); + ringEntry.moveFrom(eventBuffer); } finally { ringBuffer.publish(next); } } - public RingBuffer start() { + public RingBuffer start() { checkOnlyStartedOnce(); for (final ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.start(executor); @@ -106,7 +107,7 @@ public long getCursor() { return ringBuffer.getCursor(); } - public RecordingEventBatch get(final long sequence) { + public RecordingEventDisruptorEntry get(final long sequence) { return ringBuffer.get(sequence); } @@ -122,16 +123,16 @@ private boolean hasBacklog() { EventHandlerGroup createEventProcessors( final Sequence[] barrierSequences, - final EventHandler[] eventHandlers) { + final EventHandler[] eventHandlers) { checkNotStarted(); final Sequence[] processorSequences = new Sequence[eventHandlers.length]; final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { - final EventHandler eventHandler = eventHandlers[i]; + final EventHandler eventHandler = eventHandlers[i]; - final BatchEventProcessor batchEventProcessor = + final BatchEventProcessor batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler); batchEventProcessor.setExceptionHandler(exceptionHandler); @@ -155,7 +156,7 @@ private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequenc } EventHandlerGroup createEventProcessors( - final Sequence[] barrierSequences, final EventProcessorFactory[] processorFactories) { + final Sequence[] barrierSequences, final EventProcessorFactory[] processorFactories) { final EventProcessor[] eventProcessors = new EventProcessor[processorFactories.length]; for (int i = 0; i < processorFactories.length; i++) { eventProcessors[i] = processorFactories[i].createEventProcessor(ringBuffer, barrierSequences); @@ -165,9 +166,9 @@ EventHandlerGroup createEventProcessors( } EventHandlerGroup createWorkerPool( - final Sequence[] barrierSequences, final WorkHandler[] workHandlers) { + final Sequence[] barrierSequences, final WorkHandler[] workHandlers) { final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences); - final WorkerPool workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers); + final WorkerPool workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers); consumerRepository.add(workerPool, sequenceBarrier); diff --git a/ulyp-agent-core/src/main/java/com/ulyp/agent/util/RecordingStateStore.java b/ulyp-agent-core/src/main/java/com/ulyp/agent/util/RecordingStateStore.java new file mode 100644 index 00000000..0925feba --- /dev/null +++ b/ulyp-agent-core/src/main/java/com/ulyp/agent/util/RecordingStateStore.java @@ -0,0 +1,58 @@ +package com.ulyp.agent.util; + +import com.ulyp.agent.RecordingState; + +import javax.annotation.concurrent.NotThreadSafe; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Maintains all recording states {@link RecordingState} in a simple array for fast access + * by recording id. The access to array is not synchronized - JVM guarantees everything is fine (no word tearing) as long as + * different threads access their own exclusive locations. + */ +@NotThreadSafe +public class RecordingStateStore { + + private static final int MAX_RECORDINGS = 64 * 1024; + + private final RecordingState[] recordingStates = new RecordingState[MAX_RECORDINGS]; + private final AtomicInteger idGenerator = new AtomicInteger(0); + + public RecordingStateStore() { + + } + + /** + * Atomically puts recording states and returns an index which might be used as a recording id + */ + public int add(RecordingState recordingState) { + int id = generateRecordingId(); + // no synchronization and volatile writes since different thread access different array locations + recordingStates[id] = recordingState; + return id; + } + + public RecordingState get(int recordingId) { + // no synchronization + return recordingStates[recordingId]; + } + + public void remove(int recordingId) { + recordingStates[recordingId] = null; + } + + private int generateRecordingId() { + int id = idGenerator.incrementAndGet(); + if (id >= MAX_RECORDINGS) { + // this is unlikely that 64k recordings will happen, mut maybe support wrapping around recording id in UI + synchronized (this) { + // relatively unsafe, but ok + if (idGenerator.get() >= MAX_RECORDINGS) { + idGenerator.set(0); + id = idGenerator.incrementAndGet(); + } + } + } + return id; + } +} diff --git a/ulyp-agent-core/src/test/java/com/ulyp/agent/RecorderCurrentSessionCountTest.java b/ulyp-agent-core/src/test/java/com/ulyp/agent/RecorderCurrentSessionCountTest.java index fc4fe479..4738907c 100644 --- a/ulyp-agent-core/src/test/java/com/ulyp/agent/RecorderCurrentSessionCountTest.java +++ b/ulyp-agent-core/src/test/java/com/ulyp/agent/RecorderCurrentSessionCountTest.java @@ -43,8 +43,8 @@ public String foo(Integer s) { private final MethodRepository methodRepository = new MethodRepository(); private final TypeResolver typeResolver = new ReflectionBasedTypeResolver(); private final StatsRecordingDataWriter recordingDataWriter = new StatsRecordingDataWriter(new NullMetrics(), new BlackholeRecordingDataWriter()); - private final RecordingEventQueue recordingEventQueue = new RecordingEventQueue(Settings.builder().build(), typeResolver, new AgentDataWriter(recordingDataWriter, methodRepository), new NullMetrics()); - private final Recorder recorder = new Recorder(methodRepository, new EnabledRecordingPolicy(), recordingEventQueue, new NullMetrics()); + private final RecordingEventQueue recordingEventQueue = new RecordingEventQueue(typeResolver, new AgentDataWriter(recordingDataWriter, methodRepository), new NullMetrics()); + private final Recorder recorder = new Recorder(Settings.builder().build(), typeResolver, methodRepository, new EnabledRecordingPolicy(), recordingEventQueue, new NullMetrics()); private final ReflectionBasedMethodResolver methodResolver = new ReflectionBasedMethodResolver(); private Method method; private int methodIdx; @@ -81,7 +81,7 @@ public void run() { X callee = new X(); for (int i = 0; i < recordingsCount && !Thread.currentThread().isInterrupted(); i++) { - int callId = recorder.startRecordingOnMethodEnter(methodIdx, callee, new Object[5]); + long callId = recorder.startRecordingOnMethodEnter(methodIdx, callee, new Object[5]); Assertions.assertTrue(Recorder.currentRecordingSessionCount.get() > 0, "Since at least one recording session is active, " + "Recorder.currentRecordingSessionCount must be positive"); diff --git a/ulyp-agent-core/src/test/java/com/ulyp/agent/RecorderTest.java b/ulyp-agent-core/src/test/java/com/ulyp/agent/RecorderTest.java index abff00be..9c38adfe 100644 --- a/ulyp-agent-core/src/test/java/com/ulyp/agent/RecorderTest.java +++ b/ulyp-agent-core/src/test/java/com/ulyp/agent/RecorderTest.java @@ -35,12 +35,13 @@ public String foo(Integer s) { private final HeapRecordingDataWrtiter storage = new HeapRecordingDataWrtiter(); private final TypeResolver typeResolver = new ReflectionBasedTypeResolver(); private final RecordingEventQueue callRecordQueue = new RecordingEventQueue( - Settings.builder().build(), typeResolver, new AgentDataWriter(storage, methodRepository), new NullMetrics() ); private final Recorder recorder = new Recorder( + Settings.builder().build(), + typeResolver, methodRepository, new EnabledRecordingPolicy(), callRecordQueue, @@ -65,24 +66,27 @@ public void tearDown() { @Test void shouldRecordDataWhenRecordingIsFinished() throws InterruptedException, TimeoutException { X recorded = new X(); - int callId = recorder.startRecordingOnMethodEnter(methodIdx, recorded, new Object[] {5}); - recorder.onMethodExit(methodIdx, "ABC", null, callId); + long callToken = recorder.startRecordingOnMethodEnter(methodIdx, recorded, new Object[] {5}); + recorder.onMethodExit(methodIdx, "ABC", null, callToken); + callRecordQueue.sync(Duration.ofSeconds(5)); + + long callToken2 = recorder.startRecordingOnMethodEnter(methodIdx, recorded, new Object[] {5}); + recorder.onMethodExit(methodIdx, "ABC", null, callToken2); callRecordQueue.sync(Duration.ofSeconds(5)); assertNull(recorder.getRecordingState()); - assertEquals(2, storage.getCallRecords().size()); + assertEquals(4, storage.getCallRecords().size()); } @Test void testTemporaryRecordingDisableWithOngoingRecording() throws InterruptedException, TimeoutException { - Recorder.recordingIdGenerator.set(0); - X recorded = new X(); - int callId1 = recorder.startRecordingOnMethodEnter(methodIdx, recorded, new Object[] {5}); + long callId1 = recorder.startRecordingOnMethodEnter(methodIdx, recorded, new Object[] {5}); recorder.disableRecording(); - int callId2 = recorder.onMethodEnter(methodIdx, recorded, new Object[]{10}); + long callId2 = recorder.onMethodEnter(methodIdx, recorded, new Object[]{10}); + Assertions.assertEquals(-1, callId2); recorder.onMethodExit(methodIdx, "CDE", null, callId2); recorder.enableRecording(); @@ -93,9 +97,9 @@ void testTemporaryRecordingDisableWithOngoingRecording() throws InterruptedExcep assertEquals(2, storage.getCallRecords().size()); // only the callId1 calls are recorded - assertEquals(new HashSet<>(Collections.singletonList((long) callId1)), storage.getCallRecords() + assertEquals(new HashSet<>(Collections.singletonList((int) callId1)), storage.getCallRecords() .stream() - .map(RecordedMethodCall::getCallId) + .map(call -> (int) call.getCallId()) .collect(Collectors.toSet())); } diff --git a/ulyp-agent/src/main/java/com/ulyp/agent/ConstructorCallRecordingAdvice.java b/ulyp-agent/src/main/java/com/ulyp/agent/ConstructorCallRecordingAdvice.java index d5910479..c275deee 100644 --- a/ulyp-agent/src/main/java/com/ulyp/agent/ConstructorCallRecordingAdvice.java +++ b/ulyp-agent/src/main/java/com/ulyp/agent/ConstructorCallRecordingAdvice.java @@ -17,16 +17,20 @@ public class ConstructorCallRecordingAdvice { @SuppressWarnings("UnusedAssignment") @Advice.OnMethodEnter static void enter( - @Advice.Local("callId") int callId, + @Advice.Local("callToken") long callToken, @MethodId int methodId, @Advice.AllArguments Object[] arguments) { // This if check is ugly, but the code is wired into bytecode, so it's more efficient to check right away instead of calling a method if (methodId >= MethodRepository.RECORD_METHODS_MIN_ID) { - callId = RecorderInstance.instance.startRecordingOnConstructorEnter(methodId, arguments); + callToken = RecorderInstance.instance.startRecordingOnMethodEnter(methodId, null, arguments); } else { - if (Recorder.currentRecordingSessionCount.get() > 0 && RecorderInstance.instance.recordingIsActiveInCurrentThread()) { - callId = RecorderInstance.instance.onConstructorEnter(methodId, arguments); + if (Recorder.currentRecordingSessionCount.get() > 0) { + RecordingState recordingState = RecorderInstance.instance.getCurrentRecordingState(); + if (recordingState != null) { + //noinspection UnusedAssignment + callToken = RecorderInstance.instance.onMethodEnter(recordingState, methodId, null, arguments); + } } } } @@ -38,11 +42,11 @@ static void enter( */ @Advice.OnMethodExit static void exit( - @Advice.Local("callId") int callId, + @Advice.Local("callToken") long callToken, @MethodId int methodId, @Advice.This Object returnValue) { - if (callId > 0) { - RecorderInstance.instance.onConstructorExit(methodId, returnValue, callId); + if (callToken > 0) { + RecorderInstance.instance.onMethodExit(methodId, returnValue, null, callToken); } } } diff --git a/ulyp-agent/src/main/java/com/ulyp/agent/MethodCallRecordingAdvice.java b/ulyp-agent/src/main/java/com/ulyp/agent/MethodCallRecordingAdvice.java index a47ccaa8..93306515 100644 --- a/ulyp-agent/src/main/java/com/ulyp/agent/MethodCallRecordingAdvice.java +++ b/ulyp-agent/src/main/java/com/ulyp/agent/MethodCallRecordingAdvice.java @@ -18,7 +18,7 @@ public class MethodCallRecordingAdvice { @Advice.OnMethodEnter static void enter( @MethodId int methodId, - @Advice.Local("callId") int callId, + @Advice.Local("callToken") long callToken, @Advice.This(optional = true) Object callee, @Advice.AllArguments Object[] arguments) { @@ -26,12 +26,15 @@ static void enter( if (methodId >= MethodRepository.RECORD_METHODS_MIN_ID) { // noinspection UnusedAssignment local variable callId is used by exit() method - callId = RecorderInstance.instance.startRecordingOnMethodEnter(methodId, callee, arguments); + callToken = RecorderInstance.instance.startRecordingOnMethodEnter(methodId, callee, arguments); } else { - if (Recorder.currentRecordingSessionCount.get() > 0 && RecorderInstance.instance.recordingIsActiveInCurrentThread()) { - //noinspection UnusedAssignment - callId = RecorderInstance.instance.onMethodEnter(methodId, callee, arguments); + if (Recorder.currentRecordingSessionCount.get() > 0) { + RecordingState recordingState = RecorderInstance.instance.getCurrentRecordingState(); + if (recordingState != null) { + //noinspection UnusedAssignment + callToken = RecorderInstance.instance.onMethodEnter(recordingState, methodId, callee, arguments); + } } } } @@ -44,11 +47,11 @@ static void enter( @Advice.OnMethodExit(onThrowable = Throwable.class) static void exit( @MethodId int methodId, - @Advice.Local("callId") int callId, + @Advice.Local("callToken") long callToken, @Advice.Thrown Throwable throwable, @Advice.Return(typing = Assigner.Typing.DYNAMIC) Object returnValue) { - if (callId > 0) { - RecorderInstance.instance.onMethodExit(methodId, returnValue, throwable, callId); + if (callToken > 0) { + RecorderInstance.instance.onMethodExit(methodId, returnValue, throwable, callToken); } } }