Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add the ability to use span links when consuming a message amqp plugin #1972

Merged
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
21bf544
Add the ability to use span links in AMQP plugin
McSick Feb 28, 2024
875e987
Remove console messages
McSick Feb 28, 2024
5720402
Spelling fix
McSick Feb 28, 2024
7e90336
add default behavior
McSick Feb 28, 2024
8bbc88d
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Feb 28, 2024
adcac3c
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 4, 2024
e8dece9
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 5, 2024
16908aa
Add in tests for useLinks option
McSick Mar 5, 2024
f8ced07
Lint fix
McSick Mar 5, 2024
0bd1f7f
Adding in extra info for future testers
McSick Mar 5, 2024
96db167
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 6, 2024
468f1dd
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 6, 2024
bce0bee
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 7, 2024
19e123d
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 14, 2024
94b6e2a
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Mar 18, 2024
3504de9
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Apr 3, 2024
e388cf2
better code so not duplicate
McSick May 29, 2024
b6470a3
Merge remote-tracking branch 'upstream/main' into feature/config-link…
McSick May 29, 2024
80dbd23
Updating tests to correct Semantic attributes
McSick May 29, 2024
69ef4f6
Merge branch 'main' into feature/config-links-amqp-plugin
McSick May 30, 2024
e2e0160
Merge remote-tracking branch 'upstream/main' into feature/config-link…
McSick Jun 21, 2024
aeac819
Update based on suggestion
McSick Jun 21, 2024
e5e7d93
update suggestion
McSick Jun 21, 2024
b33b4bc
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Jun 25, 2024
39068db
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Jul 1, 2024
eaf88f8
Merge branch 'main' into feature/config-links-amqp-plugin
McSick Jul 1, 2024
fc5139a
Update plugins/node/instrumentation-amqplib/README.md
JamieDanielson Jul 2, 2024
1fca93a
appease linter
JamieDanielson Jul 2, 2024
c5300b7
Merge branch 'main' into feature/config-links-amqp-plugin
JamieDanielson Jul 9, 2024
9184cc3
Merge branch 'main' into feature/config-links-amqp-plugin
JamieDanielson Jul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions plugins/node/instrumentation-amqplib/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ registerInstrumentations({
// publishConfirmHook: (span: Span, publishConfirmedInto: PublishConfirmedInfo) => { },
// consumeHook: (span: Span, consumeInfo: ConsumeInfo) => { },
// consumeEndHook: (span: Span, consumeEndInfo: ConsumeEndInfo) => { },
// useLinksForConsume: boolean,
}),
],
})
Expand All @@ -56,6 +57,7 @@ amqplib instrumentation has few options available to choose from. You can set th
| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. |
| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. |
| `consumeTimeoutMs` | `number` | read [Consume Timeout](#ConsumeTimeout) below |
| `useLinksForConsume` | `boolean` | read [Links for Consume](#LinksforConsume) below |

### Consume Timeout

Expand All @@ -69,6 +71,12 @@ If timeout is not big enough, span might be closed with 'InstrumentationTimeout'

Default is 1 minute

### Links for Consume

By default, consume spans continue the trace where a message was produced. However, per the [spec](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#consumer-spans), consume spans should be linked to the message's creation context. Setting to true, this will enable the behavior to follow the spec.

Default is false

## Migration From opentelemetry-instrumentation-amqplib

This instrumentation was originally published under the name `"opentelemetry-instrumentation-amqplib"` in [this repo](https://github.com/aspecto-io/opentelemetry-ext-js). Few breaking changes were made during porting to the contrib repo to align with conventions:
Expand All @@ -81,6 +89,16 @@ The instrumentation's config `publishHook`, `publishConfirmHook`, `consumeHook`

The `moduleVersionAttributeName` config option is removed. To add the amqplib package version to spans, use the `moduleVersion` attribute in hook info for `publishHook` and `consumeHook` functions.

## Running Tests Locally

To run the tests locally, you need to have a RabbitMQ server running. You can use the following command to start a RabbitMQ server using Docker:

```bash
npm run test:docker:run
```

By default, the tests that connect to RabbitMQ are skipped. To make sure these tests are run, you can set the `RUN_RABBIT_TESTS` environment variable to `true`

## Semantic Conventions

This package uses `@opentelemetry/semantic-conventions` version `1.22+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md)
Expand Down
28 changes: 25 additions & 3 deletions plugins/node/instrumentation-amqplib/src/amqplib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import {
SpanKind,
SpanStatusCode,
ROOT_CONTEXT,
Link,
Context,
} from '@opentelemetry/api';
import {
hrTime,
Expand Down Expand Up @@ -412,8 +414,25 @@ export class AmqplibInstrumentation extends InstrumentationBase {
}

const headers = msg.properties.headers ?? {};
const parentContext = propagation.extract(ROOT_CONTEXT, headers);
let parentContext: Context | undefined = propagation.extract(
ROOT_CONTEXT,
headers
);
const exchange = msg.fields?.exchange;
let links: Link[] | undefined;
if (self._config.useLinksForConsume) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the 2 cases in the if-else contains some duplication.

WDYT about preparing the links array only once (with 0 or 1 elements) and the parent based on the instrumentation configuration, then just invoking self.tracer.startSpan once with these values? or at least calculate the attributes once, outside the if-else

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a null parent be fine in this instance? Because with the config option, the parentcontext should not exists. If that's fine, then I can make those changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undefined is fine. It's the default anyway

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd do something like this

        const links: Link[] = [];
        if (self._config.useLinksForConsume) {
          const parentSpanContext = trace.getSpan(parentContext)?.spanContext();
          if (parentSpanContext) {
            links.push({ context: parentSpanContext });
          }
        }

        const span = self.tracer.startSpan(`${queue} process`, {
          kind: SpanKind.CONSUMER,
          attributes: {
            ...channel?.connection?.[CONNECTION_ATTRIBUTES],
            [SemanticAttributes.MESSAGING_DESTINATION]: exchange,
            [SemanticAttributes.MESSAGING_DESTINATION_KIND]:
              MessagingDestinationKindValues.TOPIC,
            [SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY]:
              msg.fields?.routingKey,
            [SemanticAttributes.MESSAGING_OPERATION]:
              MessagingOperationValues.PROCESS,
            [SemanticAttributes.MESSAGING_MESSAGE_ID]:
              msg?.properties.messageId,
            [SemanticAttributes.MESSAGING_CONVERSATION_ID]:
              msg?.properties.correlationId,
          },
          links,
        },
        self._config.useLinksForConsume ? undefined : parentContext,
        );

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@McSick looks like you ended up making the change for this, albeit a bit more verbose. I think it achieves the same thing though. As a nit I tend to lean toward the more succinct version but non blocking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to change it a little as parentContext was used deeper in the code so I wanted to explicitly make it undefined in this instance so the trace was not continued.

const parentSpanContext = parentContext
? trace.getSpan(parentContext)?.spanContext()
: undefined;
parentContext = undefined;
if (parentSpanContext) {
links = [
{
context: parentSpanContext,
},
];
}
}
const span = self.tracer.startSpan(
`${queue} process`,
{
Expand All @@ -429,6 +448,7 @@ export class AmqplibInstrumentation extends InstrumentationBase {
[SEMATTRS_MESSAGING_CONVERSATION_ID]:
msg?.properties.correlationId,
},
links: links,
McSick marked this conversation as resolved.
Show resolved Hide resolved
},
parentContext
);
Expand All @@ -455,8 +475,10 @@ export class AmqplibInstrumentation extends InstrumentationBase {
// store the span on the message, so we can end it when user call 'ack' on it
msg[MESSAGE_STORED_SPAN] = span;
}

context.with(trace.setSpan(parentContext, span), () => {
const setContext: Context = parentContext
? parentContext
: ROOT_CONTEXT;
context.with(trace.setSpan(setContext, span), () => {
onMessage.call(this, msg);
});

Expand Down
4 changes: 4 additions & 0 deletions plugins/node/instrumentation-amqplib/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,14 @@ export interface AmqplibInstrumentationConfig extends InstrumentationConfig {
* Default is 1 minute
*/
consumeTimeoutMs?: number;

/** Used to use a span link for the consume message instead of continuing a trace */
McSick marked this conversation as resolved.
Show resolved Hide resolved
useLinksForConsume?: boolean;
}

export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = {
consumeTimeoutMs: 1000 * 60, // 1 minute
useLinksForConsume: false,
};

// The following types are vendored from `@types/amqplib@0.10.1` - commit SHA: 4205e03127692a40b4871709a7134fe4e2ed5510
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import {
registerInstrumentationTesting,
} from '@opentelemetry/contrib-test-utils';

registerInstrumentationTesting(new AmqplibInstrumentation());
const instrumentation = registerInstrumentationTesting(
new AmqplibInstrumentation()
);

import * as amqpCallback from 'amqplib/callback_api';
import {
Expand Down Expand Up @@ -401,4 +403,266 @@ describe('amqplib instrumentation callback model', () => {
});
});
});

describe('channel with links config', () => {
let channel: amqpCallback.Channel;
beforeEach(done => {
instrumentation.setConfig({
useLinksForConsume: true,
});
conn.createChannel(
context.bind(context.active(), (err, c) => {
channel = c;
// install an error handler, otherwise when we have tests that create error on the channel,
// it throws and crash process
channel.on('error', () => {});
channel.assertQueue(
queueName,
{ durable: false },
context.bind(context.active(), (err, ok) => {
channel.purgeQueue(
queueName,
context.bind(context.active(), (err, ok) => {
done();
})
);
})
);
})
);
});

afterEach(done => {
try {
channel.close(err => {
done();
});
} catch {}
});

it('simple publish and consume from queue callback', done => {
const hadSpaceInBuffer = channel.sendToQueue(
queueName,
Buffer.from(msgPayload)
);
expect(hadSpaceInBuffer).toBeTruthy();

asyncConsume(
channel,
queueName,
[msg => expect(msg.content.toString()).toEqual(msgPayload)],
{
noAck: true,
}
).then(() => {
const [publishSpan, consumeSpan] = getTestSpans();

// assert publish span
expect(publishSpan.kind).toEqual(SpanKind.PRODUCER);
expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual(
'rabbitmq'
);
expect(publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual(
''
); // according to spec: "This will be an empty string if the default exchange is used"
expect(
publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND]
).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC);
expect(
publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]
).toEqual(queueName);
expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual(
'AMQP'
);
expect(
publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION]
).toEqual('0.9.1');
expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual(
censoredUrl
);
expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual(
TEST_RABBITMQ_HOST
);
expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual(
TEST_RABBITMQ_PORT
);

// assert consume span
expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER);
expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual(
'rabbitmq'
);
expect(consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]).toEqual(
''
); // according to spec: "This will be an empty string if the default exchange is used"
expect(
consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND]
).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC);
expect(
consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]
).toEqual(queueName);
expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual(
'AMQP'
);
expect(
consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION]
).toEqual('0.9.1');
expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual(
censoredUrl
);
expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual(
TEST_RABBITMQ_HOST
);
expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual(
TEST_RABBITMQ_PORT
);

// new trace should be created
expect(consumeSpan.spanContext().traceId).not.toEqual(
publishSpan.spanContext().traceId
);
expect(consumeSpan.parentSpanId).toBeUndefined();

// link back to publish span
expect(consumeSpan.links.length).toBe(1);
expect(consumeSpan.links[0].context.traceId).toEqual(
publishSpan.spanContext().traceId
);
expect(consumeSpan.links[0].context.spanId).toEqual(
publishSpan.spanContext().spanId
);

done();
});
});
});

describe('confirm channel with links config', () => {
let confirmChannel: amqpCallback.ConfirmChannel;
beforeEach(done => {
instrumentation.setConfig({
useLinksForConsume: true,
});
conn.createConfirmChannel(
context.bind(context.active(), (err, c) => {
confirmChannel = c;
// install an error handler, otherwise when we have tests that create error on the channel,
// it throws and crash process
confirmChannel.on('error', () => {});
confirmChannel.assertQueue(
queueName,
{ durable: false },
context.bind(context.active(), (err, ok) => {
confirmChannel.purgeQueue(
queueName,
context.bind(context.active(), (err, ok) => {
done();
})
);
})
);
})
);
});

afterEach(done => {
try {
confirmChannel.close(err => {
done();
});
} catch {}
});

it('simple publish and consume from queue callback', done => {
asyncConfirmSend(confirmChannel, queueName, msgPayload).then(() => {
asyncConsume(
confirmChannel,
queueName,
[msg => expect(msg.content.toString()).toEqual(msgPayload)],
{
noAck: true,
}
).then(() => {
const [publishSpan, consumeSpan] = getTestSpans();

// assert publish span
expect(publishSpan.kind).toEqual(SpanKind.PRODUCER);
expect(publishSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual(
'rabbitmq'
);
expect(
publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]
).toEqual(''); // according to spec: "This will be an empty string if the default exchange is used"
expect(
publishSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND]
).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC);
expect(
publishSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]
).toEqual(queueName);
expect(publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual(
'AMQP'
);
expect(
publishSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION]
).toEqual('0.9.1');
expect(publishSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual(
censoredUrl
);
expect(publishSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual(
TEST_RABBITMQ_HOST
);
expect(publishSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual(
TEST_RABBITMQ_PORT
);

// assert consume span
expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER);
expect(consumeSpan.attributes[SEMATTRS_MESSAGING_SYSTEM]).toEqual(
'rabbitmq'
);
expect(
consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION]
).toEqual(''); // according to spec: "This will be an empty string if the default exchange is used"
expect(
consumeSpan.attributes[SEMATTRS_MESSAGING_DESTINATION_KIND]
).toEqual(MESSAGINGDESTINATIONKINDVALUES_TOPIC);
expect(
consumeSpan.attributes[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]
).toEqual(queueName);
expect(consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL]).toEqual(
'AMQP'
);
expect(
consumeSpan.attributes[SEMATTRS_MESSAGING_PROTOCOL_VERSION]
).toEqual('0.9.1');
expect(consumeSpan.attributes[SEMATTRS_MESSAGING_URL]).toEqual(
censoredUrl
);
expect(consumeSpan.attributes[SEMATTRS_NET_PEER_NAME]).toEqual(
TEST_RABBITMQ_HOST
);
expect(consumeSpan.attributes[SEMATTRS_NET_PEER_PORT]).toEqual(
TEST_RABBITMQ_PORT
);

// new trace should be created
expect(consumeSpan.spanContext().traceId).not.toEqual(
publishSpan.spanContext().traceId
);
expect(consumeSpan.parentSpanId).toBeUndefined();

// link back to publish span
expect(consumeSpan.links.length).toBe(1);
expect(consumeSpan.links[0].context.traceId).toEqual(
publishSpan.spanContext().traceId
);
expect(consumeSpan.links[0].context.spanId).toEqual(
publishSpan.spanContext().spanId
);

done();
});
});
});
});
});
Loading
Loading