Skip to content

Commit

Permalink
Add the ability to use span links in AMQP plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
McSick committed Feb 28, 2024
1 parent 7d5cdb0 commit 21bf544
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 6 deletions.
8 changes: 8 additions & 0 deletions plugins/node/instrumentation-amqplib/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ registerInstrumentations({
// publishConfirmHook: (span: Span, publishConfirmedInto: PublishConfirmedInfo) => { },
// consumeHook: (span: Span, consumeInfo: ConsumeInfo) => { },
// consumeEndHook: (span: Span, consumeEndInfo: ConsumeEndInfo) => { },
// useLinksForConsume: boolean,
}),
],
})
Expand All @@ -56,6 +57,7 @@ amqplib instrumentation has few options available to choose from. You can set th
| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. |
| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. |
| `consumeTimeoutMs` | `number` | read [Consume Timeout](#ConsumeTimeout) below |
| `useLinksForConsume` | `boolean` | read [Links for Consume](#LinksforConsume) below |

### Consume Timeout

Expand All @@ -69,6 +71,12 @@ If timeout is not big enough, span might be closed with 'InstrumentationTimeout'

Default is 1 minute

### Links for Consume

By default, consume spans continue the trace where a message was produced. However, per the [spec](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#consumer-spans), consume spans should be linked to the message's creation context. Setting to true, this will enable the behavior to follow the spec.

Default is false

## Migration From opentelemetry-instrumentation-amqplib

This instrumentation was originally published under the name `"opentelemetry-instrumentation-amqplib"` in [this repo](https://github.com/aspecto-io/opentelemetry-ext-js). Few breaking changes were made during porting to the contrib repo to align with conventions:
Expand Down
47 changes: 41 additions & 6 deletions plugins/node/instrumentation-amqplib/src/amqplib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
SpanKind,
SpanStatusCode,
ROOT_CONTEXT,
Link,
} from '@opentelemetry/api';
import {
hrTime,
Expand Down Expand Up @@ -409,9 +410,21 @@ export class AmqplibInstrumentation extends InstrumentationBase {
const headers = msg.properties.headers ?? {};
const parentContext = propagation.extract(ROOT_CONTEXT, headers);
const exchange = msg.fields?.exchange;
const span = self.tracer.startSpan(
`${queue} process`,
{
let span: Span;
if (self._config.useLinksForConsume) {
const parentSpanContext = trace.getSpan(parentContext)?.spanContext();
console.log('parentContext', parentContext);
console.log('parentSpanContext', parentSpanContext);
let links: Link[] | undefined;
if (parentSpanContext) {
links = [
{
context: parentSpanContext,
},
];
console.log('links', links);
}
span = self.tracer.startSpan(`${queue} process`, {
kind: SpanKind.CONSUMER,
attributes: {
...channel?.connection?.[CONNECTION_ATTRIBUTES],
Expand All @@ -427,9 +440,31 @@ export class AmqplibInstrumentation extends InstrumentationBase {
[SemanticAttributes.MESSAGING_CONVERSATION_ID]:
msg?.properties.correlationId,
},
},
parentContext
);
links: links,
});
} else {
span = self.tracer.startSpan(
`${queue} process`,
{
kind: SpanKind.CONSUMER,
attributes: {
...channel?.connection?.[CONNECTION_ATTRIBUTES],
[SemanticAttributes.MESSAGING_DESTINATION]: exchange,
[SemanticAttributes.MESSAGING_DESTINATION_KIND]:
MessagingDestinationKindValues.TOPIC,
[SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY]:
msg.fields?.routingKey,
[SemanticAttributes.MESSAGING_OPERATION]:
MessagingOperationValues.PROCESS,
[SemanticAttributes.MESSAGING_MESSAGE_ID]:
msg?.properties.messageId,
[SemanticAttributes.MESSAGING_CONVERSATION_ID]:
msg?.properties.correlationId,
},
},
parentContext
);
}

if (self._config.consumeHook) {
safeExecuteInTheMiddle(
Expand Down
3 changes: 3 additions & 0 deletions plugins/node/instrumentation-amqplib/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ export interface AmqplibInstrumentationConfig extends InstrumentationConfig {
* Default is 1 minute
*/
consumeTimeoutMs?: number;

/** Used to use a span link for the consume message instead of continueing a trace */
useLinksForConsume?: boolean;
}

export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = {
Expand Down

0 comments on commit 21bf544

Please sign in to comment.