Skip to content

Commit

Permalink
Add observability support for JMS
Browse files Browse the repository at this point in the history
This commit adds observability support for Jakarta JMS support in
spring-jms support. This feature leverages the `JmsInstrumentation`
infrastructure in `io.micrometer:micrometer-core` library.

This instruments the `JmsTemplate` and the `@JmsListener` support to
record observations:

* "jms.message.publish" when the `JmsTemplate` sends a message
* "jms.message.process" when a message is processed by a `@JmsListener`
  annotated method

The observation `Convention` and `Context` implementations are shipped
with "micrometer-core".

Closes spring-projectsgh-30335
  • Loading branch information
bclozel committed Aug 2, 2023
1 parent eed1421 commit bc5b06a
Show file tree
Hide file tree
Showing 12 changed files with 441 additions and 3 deletions.
2 changes: 2 additions & 0 deletions framework-docs/framework-docs.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ repositories {

dependencies {
api(project(":spring-context"))
api(project(":spring-jms"))
api(project(":spring-web"))
api("jakarta.jms:jakarta.jms-api")
api("jakarta.servlet:jakarta.servlet-api")

implementation(project(":spring-core-test"))
Expand Down
71 changes: 71 additions & 0 deletions framework-docs/modules/ROOT/pages/integration/observability.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ As outlined xref:integration/observability.adoc[at the beginning of this section
|xref:integration/observability.adoc#observability.http-server[`"http.server.requests"`]
|Processing time for HTTP server exchanges at the Framework level

|xref:integration/observability.adoc#observability.jms.publish[`"jms.message.publish"`]
|Time spent sending a JMS message to a destination by a message producer.

|xref:integration/observability.adoc#observability.jms.process[`"jms.message.process"`]
|Processing time for a JMS message that was previously received by a message consumer.

|xref:integration/observability.adoc#observability.tasks-scheduled[`"tasks.scheduled.execution"`]
|Processing time for an execution of a `@Scheduled` task
|===
Expand Down Expand Up @@ -108,6 +114,71 @@ By default, the following `KeyValues` are created:
|===


[[observability.jms]]
== JMS messaging instrumentation

Spring Framework uses the Jakarta JMS instrumentation provided by Micrometer if the `io.micrometer:micrometer-core` dependency is on the classpath.
The `io.micrometer.core.instrument.binder.jms.JmsInstrumentation` instruments `jakarta.jms.Session` and records the relevant observations.

This instrumentation will create 2 types of observations:

* `"jms.message.publish"` when a JMS message is sent to the broker, typically with `JmsTemplate`.
* `"jms.message.process"` when a JMS message is processed by the application, typically with a `MessageListener` or a `@JmsListener` annotated method.

NOTE: currently there is no instrumentation for `"jms.message.receive"` observations as there is little value in measuring the time spent waiting for the reception of a message.
Such an integration would typically instrument `MessageConsumer#receive` method calls. But once those return, the processing time is not measured and the trace scope cannot be propagated to the application.

By default, both observations share the same set of possible `KeyValues`:

.Low cardinality Keys
[cols="a,a"]
|===
|Name | Description
|`messaging.destination.temporary` _(required)_|Whether the destination is a `TemporaryQueue` or `TemporaryTopic` (values: `"true"` or `"false"`).
|`messaging.operation` _(required)_|Name of JMS operation being performed (values: `"publish"` or `"process"`).
|===

.High cardinality Keys
[cols="a,a"]
|===
|Name | Description
|`messaging.message.conversation_id` |The correlation ID of the JMS message.
|`messaging.destination.name` |The name of destination the current message was sent to.
|`messaging.message.id` |Value used by the messaging system as an identifier for the message.
|===

[[observability.jms.publish]]
=== JMS message Publication instrumentation

`"jms.message.publish"` observations are recorded when a JMS message is sent to the broker.
They measure the time spent sending the message and propagate the tracing information with outgoing JMS message headers.

You will need to configure the `ObservationRegistry` on the `JmsTemplate` to enable observations:

include-code::./JmsTemplatePublish[]

It uses the `io.micrometer.core.instrument.binder.jms.DefaultJmsPublishObservationConvention` by default, backed by the `io.micrometer.core.instrument.binder.jms.JmsPublishObservationContext`.

[[observability.jms.process]]
=== JMS message Processing instrumentation

`"jms.message.process"` observations are recorded when a JMS message is processed by the application.
They measure the time spent processing the message and propagate the tracing context with incoming JMS message headers.

Most applications will use the xref:integration/jms/annotated.adoc#jms-annotated[`@JmsListener` annotated methods] mechanism to process incoming messages.
You will need to ensure that the `ObservationRegistry` is configured on the dedicated `JmsListenerContainerFactory`:

include-code::./JmsConfiguration[]

A xref:integration/jms/annotated.adoc#jms-annotated-support[default container factory is required to enable the annotation support],
but note that `@JmsListener` annotations can refer to specific container factory beans for specific purposes.
In all cases, Observations are only recorded if the observation registry is configured on the container factory.

Similar observations are recorded with `JmsTemplate` when messages are processed by a `MessageListener`.
Such listeners are set on a `MessageConsumer` within a session callback (see `JmsTemplate.execute(SessionCallback<T>)`).

This observation uses the `io.micrometer.core.instrument.binder.jms.DefaultJmsProcessObservationConvention` by default, backed by the `io.micrometer.core.instrument.binder.jms.JmsProcessObservationContext`.

[[observability.http-server]]
== HTTP Server instrumentation

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.docs.integration.observability.jms.process;

import io.micrometer.observation.ObservationRegistry;
import jakarta.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class JmsConfiguration {

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory, ObservationRegistry observationRegistry) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setObservationRegistry(observationRegistry);
return factory;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.docs.integration.observability.jms.publish;

import io.micrometer.observation.ObservationRegistry;
import jakarta.jms.ConnectionFactory;

import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;

public class JmsTemplatePublish {

private final JmsTemplate jmsTemplate;

private final JmsMessagingTemplate jmsMessagingTemplate;

public JmsTemplatePublish(ObservationRegistry observationRegistry, ConnectionFactory connectionFactory) {
this.jmsTemplate = new JmsTemplate(connectionFactory);
// configure the observation registry
this.jmsTemplate.setObservationRegistry(observationRegistry);

// For JmsMessagingTemplate, instantiate it with a JMS template that has a configured registry
this.jmsMessagingTemplate = new JmsMessagingTemplate(this.jmsTemplate);
}

public void sendMessages() {
this.jmsTemplate.convertAndSend("spring.observation.test", "test message");
}

}
4 changes: 3 additions & 1 deletion framework-platform/framework-platform.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ javaPlatform {

dependencies {
api(platform("com.fasterxml.jackson:jackson-bom:2.15.2"))
api(platform("io.micrometer:micrometer-bom:1.12.0-M1"))
api(platform("io.micrometer:micrometer-bom:1.12.0-SNAPSHOT"))
api(platform("io.netty:netty-bom:4.1.96.Final"))
api(platform("io.netty:netty5-bom:5.0.0.Alpha5"))
api(platform("io.projectreactor:reactor-bom:2023.0.0-M1"))
Expand Down Expand Up @@ -92,6 +92,8 @@ dependencies {
api("org.apache.activemq:activemq-broker:5.17.4")
api("org.apache.activemq:activemq-kahadb-store:5.17.4")
api("org.apache.activemq:activemq-stomp:5.17.4")
api("org.apache.activemq:artemis-junit-5:2.29.0")
api("org.apache.activemq:artemis-jakarta-client:2.29.0")
api("org.apache.commons:commons-pool2:2.9.0")
api("org.apache.derby:derby:10.16.1.1")
api("org.apache.derby:derbyclient:10.16.1.1")
Expand Down
6 changes: 6 additions & 0 deletions spring-jms/spring-jms.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,20 @@ dependencies {
api(project(":spring-core"))
api(project(":spring-messaging"))
api(project(":spring-tx"))
api("io.micrometer:micrometer-observation")
compileOnly("jakarta.jms:jakarta.jms-api")
optional(project(":spring-aop"))
optional(project(":spring-context"))
optional(project(":spring-oxm"))
optional("com.fasterxml.jackson.core:jackson-databind")
optional("io.micrometer:micrometer-core")
optional("jakarta.resource:jakarta.resource-api")
optional("jakarta.transaction:jakarta.transaction-api")
testImplementation(testFixtures(project(":spring-beans")))
testImplementation(testFixtures(project(":spring-tx")))
testImplementation("jakarta.jms:jakarta.jms-api")
testImplementation('io.micrometer:context-propagation')
testImplementation("io.micrometer:micrometer-observation-test")
testImplementation("org.apache.activemq:artemis-junit-5")
testImplementation("org.apache.activemq:artemis-jakarta-client")
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.jms.config;

import io.micrometer.observation.ObservationRegistry;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.ExceptionListener;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -86,6 +87,9 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
@Nullable
private Boolean autoStartup;

@Nullable
private ObservationRegistry observationRegistry;


/**
* @see AbstractMessageListenerContainer#setConnectionFactory(ConnectionFactory)
Expand Down Expand Up @@ -193,6 +197,12 @@ public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}

/**
* @see AbstractMessageListenerContainer#setObservationRegistry(ObservationRegistry)
*/
public void setObservationRegistry(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

@Override
public C createListenerContainer(JmsListenerEndpoint endpoint) {
Expand Down Expand Up @@ -243,6 +253,9 @@ public C createListenerContainer(JmsListenerEndpoint endpoint) {
if (this.autoStartup != null) {
instance.setAutoStartup(this.autoStartup);
}
if (this.observationRegistry != null) {
instance.setObservationRegistry(this.observationRegistry);
}

initializeContainer(instance);
endpoint.setupListenerContainer(instance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.jms.core;

import io.micrometer.core.instrument.binder.jms.JmsInstrumentation;
import io.micrometer.observation.ObservationRegistry;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.DeliveryMode;
Expand All @@ -40,6 +42,7 @@
import org.springframework.lang.Nullable;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
* Helper class that simplifies synchronous JMS access code.
Expand Down Expand Up @@ -78,6 +81,7 @@
* @author Mark Pollack
* @author Juergen Hoeller
* @author Stephane Nicoll
* @author Brian Clozel
* @since 1.1
* @see #setConnectionFactory
* @see #setPubSubDomain
Expand All @@ -88,6 +92,9 @@
*/
public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations {

private static final boolean micrometerCorePresent = ClassUtils.isPresent(
"io.micrometer.core.instrument.binder.jms.JmsInstrumentation", JmsTemplate.class.getClassLoader());

/** Internal ResourceFactory adapter for interacting with ConnectionFactoryUtils. */
private final JmsTemplateResourceFactory transactionalResourceFactory = new JmsTemplateResourceFactory();

Expand Down Expand Up @@ -118,6 +125,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations

private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;

@Nullable
private ObservationRegistry observationRegistry;


/**
* Create a new JmsTemplate for bean-style usage.
Expand Down Expand Up @@ -460,6 +470,15 @@ public long getTimeToLive() {
return this.timeToLive;
}

/**
* Configure the {@link ObservationRegistry} to use for recording JMS observations.
* @param observationRegistry the observation registry to use.
* @since 6.1
* @see io.micrometer.core.instrument.binder.jms.JmsObservationDocumentation
*/
public void setObservationRegistry(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

//---------------------------------------------------------------------------------------
// JmsOperations execute methods
Expand Down Expand Up @@ -504,6 +523,9 @@ public <T> T execute(SessionCallback<T> action, boolean startConnection) throws
if (logger.isDebugEnabled()) {
logger.debug("Executing callback on JMS Session: " + sessionToUse);
}
if (micrometerCorePresent && this.observationRegistry != null) {
sessionToUse = MicrometerInstrumentation.instrumentSession(sessionToUse, this.observationRegistry);
}
return action.doInJms(sessionToUse);
}
catch (JMSException ex) {
Expand Down Expand Up @@ -1194,4 +1216,12 @@ public boolean isSynchedLocalTransactionAllowed() {
}
}

private static abstract class MicrometerInstrumentation {

static Session instrumentSession(Session session, ObservationRegistry registry) {
return JmsInstrumentation.instrumentSession(session, registry);
}

}

}
Loading

0 comments on commit bc5b06a

Please sign in to comment.