Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add the ability to use span links when consuming a message amqp plugin #1972

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
21bf544
Add the ability to use span links in AMQP plugin
McSick Feb 28, 2024
875e987
Remove console messages
McSick Feb 28, 2024
5720402
Spelling fix
McSick Feb 28, 2024
7e90336
add default behavior
McSick Feb 28, 2024
8bbc88d
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Feb 28, 2024
adcac3c
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 4, 2024
e8dece9
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 5, 2024
16908aa
Add in tests for useLinks option
McSick Mar 5, 2024
f8ced07
Lint fix
McSick Mar 5, 2024
0bd1f7f
Adding in extra info for future testers
McSick Mar 5, 2024
96db167
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 6, 2024
468f1dd
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 6, 2024
bce0bee
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 7, 2024
19e123d
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 14, 2024
94b6e2a
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 18, 2024
3504de9
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Apr 3, 2024
e388cf2
better code so not duplicate
McSick May 29, 2024
b6470a3
Merge remote-tracking branch 'upstream/main' into feature/config-link…
McSick May 29, 2024
80dbd23
Updating tests to correct Semantic attributes
McSick May 29, 2024
69ef4f6
Merge branch 'main' into feature/config-links-amqp-plugin
McSick May 30, 2024
e2e0160
Merge remote-tracking branch 'upstream/main' into feature/config-link…
McSick Jun 21, 2024
aeac819
Update based on suggestion
McSick Jun 21, 2024
e5e7d93
update suggestion
McSick Jun 21, 2024
b33b4bc
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Jun 25, 2024
39068db
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Jul 1, 2024
eaf88f8
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Jul 1, 2024
fc5139a
Update plugins/node/instrumentation-amqplib/README.md
JamieDanielson Jul 2, 2024
1fca93a
appease linter
JamieDanielson Jul 2, 2024
c5300b7
Merge branch 'main' into feature/config-links-amqp-plugin
JamieDanielson Jul 9, 2024
9184cc3
Merge branch 'main' into feature/config-links-amqp-plugin
JamieDanielson Jul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions plugins/node/instrumentation-amqplib/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ registerInstrumentations({
// publishConfirmHook: (span: Span, publishConfirmedInto: PublishConfirmedInfo) => { },
// consumeHook: (span: Span, consumeInfo: ConsumeInfo) => { },
// consumeEndHook: (span: Span, consumeEndInfo: ConsumeEndInfo) => { },
// useLinksForConsume: boolean,
}),
],
})
Expand All @@ -56,6 +57,7 @@ amqplib instrumentation has few options available to choose from. You can set th
| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. |
| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. |
| `consumeTimeoutMs` | `number` | read [Consume Timeout](#ConsumeTimeout) below |
| `useLinksForConsume` | `boolean` | read [Links for Consume](#LinksforConsume) below |

### Consume Timeout

Expand All @@ -69,6 +71,12 @@ If timeout is not big enough, span might be closed with 'InstrumentationTimeout'

Default is 1 minute

### Links for Consume

By default, consume spans continue the trace where a message was produced. However, per the [spec](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#consumer-spans), consume spans should be linked to the message's creation context. Setting to true, this will enable the behavior to follow the spec.

Default is false

## Migration From opentelemetry-instrumentation-amqplib

This instrumentation was originally published under the name `"opentelemetry-instrumentation-amqplib"` in [this repo](https://github.com/aspecto-io/opentelemetry-ext-js). Few breaking changes were made during porting to the contrib repo to align with conventions:
Expand Down
44 changes: 38 additions & 6 deletions plugins/node/instrumentation-amqplib/src/amqplib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
SpanKind,
SpanStatusCode,
ROOT_CONTEXT,
Link,
} from '@opentelemetry/api';
import {
hrTime,
Expand Down Expand Up @@ -409,9 +410,18 @@
const headers = msg.properties.headers ?? {};
const parentContext = propagation.extract(ROOT_CONTEXT, headers);
const exchange = msg.fields?.exchange;
const span = self.tracer.startSpan(
`${queue} process`,
{
let span: Span;
if (self._config.useLinksForConsume) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the 2 cases in the if-else contains some duplication.

WDYT about preparing the links array only once (with 0 or 1 elements) and the parent based on the instrumentation configuration, then just invoking self.tracer.startSpan once with these values? or at least calculate the attributes once, outside the if-else

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a null parent be fine in this instance? Because with the config option, the parentcontext should not exists. If that's fine, then I can make those changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undefined is fine. It's the default anyway

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd do something like this

        const links: Link[] = [];
        if (self._config.useLinksForConsume) {
          const parentSpanContext = trace.getSpan(parentContext)?.spanContext();
          if (parentSpanContext) {
            links.push({ context: parentSpanContext });
          }
        }

        const span = self.tracer.startSpan(`${queue} process`, {
          kind: SpanKind.CONSUMER,
          attributes: {
            ...channel?.connection?.[CONNECTION_ATTRIBUTES],
            [SemanticAttributes.MESSAGING_DESTINATION]: exchange,
            [SemanticAttributes.MESSAGING_DESTINATION_KIND]:
              MessagingDestinationKindValues.TOPIC,
            [SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY]:
              msg.fields?.routingKey,
            [SemanticAttributes.MESSAGING_OPERATION]:
              MessagingOperationValues.PROCESS,
            [SemanticAttributes.MESSAGING_MESSAGE_ID]:
              msg?.properties.messageId,
            [SemanticAttributes.MESSAGING_CONVERSATION_ID]:
              msg?.properties.correlationId,
          },
          links,
        },
        self._config.useLinksForConsume ? undefined : parentContext,
        );

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@McSick looks like you ended up making the change for this, albeit a bit more verbose. I think it achieves the same thing though. As a nit I tend to lean toward the more succinct version but non blocking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to change it a little as parentContext was used deeper in the code so I wanted to explicitly make it undefined in this instance so the trace was not continued.

const parentSpanContext = trace.getSpan(parentContext)?.spanContext();

Check warning on line 415 in plugins/node/instrumentation-amqplib/src/amqplib.ts

View check run for this annotation

Codecov / codecov/patch

plugins/node/instrumentation-amqplib/src/amqplib.ts#L415

Added line #L415 was not covered by tests
let links: Link[] | undefined;
if (parentSpanContext) {
links = [

Check warning on line 418 in plugins/node/instrumentation-amqplib/src/amqplib.ts

View check run for this annotation

Codecov / codecov/patch

plugins/node/instrumentation-amqplib/src/amqplib.ts#L417-L418

Added lines #L417 - L418 were not covered by tests
{
context: parentSpanContext,
},
];
}
span = self.tracer.startSpan(`${queue} process`, {

Check warning on line 424 in plugins/node/instrumentation-amqplib/src/amqplib.ts

View check run for this annotation

Codecov / codecov/patch

plugins/node/instrumentation-amqplib/src/amqplib.ts#L424

Added line #L424 was not covered by tests
kind: SpanKind.CONSUMER,
attributes: {
...channel?.connection?.[CONNECTION_ATTRIBUTES],
Expand All @@ -427,9 +437,31 @@
[SemanticAttributes.MESSAGING_CONVERSATION_ID]:
msg?.properties.correlationId,
},
},
parentContext
);
links: links,
McSick marked this conversation as resolved.
Show resolved Hide resolved
});
} else {
span = self.tracer.startSpan(
`${queue} process`,
{
kind: SpanKind.CONSUMER,
attributes: {
...channel?.connection?.[CONNECTION_ATTRIBUTES],
[SemanticAttributes.MESSAGING_DESTINATION]: exchange,
[SemanticAttributes.MESSAGING_DESTINATION_KIND]:
MessagingDestinationKindValues.TOPIC,
[SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY]:
msg.fields?.routingKey,
[SemanticAttributes.MESSAGING_OPERATION]:
MessagingOperationValues.PROCESS,
[SemanticAttributes.MESSAGING_MESSAGE_ID]:
msg?.properties.messageId,
[SemanticAttributes.MESSAGING_CONVERSATION_ID]:
msg?.properties.correlationId,
},
},
parentContext
);
}

if (self._config.consumeHook) {
safeExecuteInTheMiddle(
Expand Down
4 changes: 4 additions & 0 deletions plugins/node/instrumentation-amqplib/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,14 @@ export interface AmqplibInstrumentationConfig extends InstrumentationConfig {
* Default is 1 minute
*/
consumeTimeoutMs?: number;

/** Used to use a span link for the consume message instead of continuing a trace */
McSick marked this conversation as resolved.
Show resolved Hide resolved
useLinksForConsume?: boolean;
}

export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = {
consumeTimeoutMs: 1000 * 60, // 1 minute
useLinksForConsume: false,
};

// The following types are vendored from `@types/amqplib@0.10.1` - commit SHA: 4205e03127692a40b4871709a7134fe4e2ed5510
Expand Down
Loading