Skip to content

Commit

Permalink
Add in tests for useLinks option
Browse files Browse the repository at this point in the history
  • Loading branch information
McSick committed Mar 5, 2024
1 parent e8dece9 commit 16908aa
Show file tree
Hide file tree
Showing 2 changed files with 682 additions and 1 deletion.
276 changes: 275 additions & 1 deletion plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import {
registerInstrumentationTesting,
} from '@opentelemetry/contrib-test-utils';

registerInstrumentationTesting(new AmqplibInstrumentation());
const instrumentation = registerInstrumentationTesting(
new AmqplibInstrumentation()
);


import * as amqpCallback from 'amqplib/callback_api';
import {
Expand Down Expand Up @@ -409,4 +412,275 @@ describe('amqplib instrumentation callback model', () => {
});
});
});

describe('channel with links config', () => {
let channel: amqpCallback.Channel;
beforeEach(done => {
instrumentation.setConfig({
useLinksForConsume: true,
});
conn.createChannel(
context.bind(context.active(), (err, c) => {
channel = c;
// install an error handler, otherwise when we have tests that create error on the channel,
// it throws and crash process
channel.on('error', () => {});
channel.assertQueue(
queueName,
{ durable: false },
context.bind(context.active(), (err, ok) => {
channel.purgeQueue(
queueName,
context.bind(context.active(), (err, ok) => {
done();
})
);
})
);
})
);
});

afterEach(done => {
try {
channel.close(err => {
done();
});
} catch {}
});

it('simple publish and consume from queue callback', done => {
const hadSpaceInBuffer = channel.sendToQueue(
queueName,
Buffer.from(msgPayload)
);
expect(hadSpaceInBuffer).toBeTruthy();

asyncConsume(
channel,
queueName,
[msg => expect(msg.content.toString()).toEqual(msgPayload)],
{
noAck: true,
}
).then(() => {
const [publishSpan, consumeSpan] = getTestSpans();

// assert publish span
expect(publishSpan.kind).toEqual(SpanKind.PRODUCER);
expect(
publishSpan.attributes[SemanticAttributes.MESSAGING_SYSTEM]
).toEqual('rabbitmq');
expect(
publishSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION]
).toEqual(''); // according to spec: "This will be an empty string if the default exchange is used"
expect(
publishSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION_KIND]
).toEqual(MessagingDestinationKindValues.TOPIC);
expect(
publishSpan.attributes[
SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY
]
).toEqual(queueName);
expect(
publishSpan.attributes[SemanticAttributes.MESSAGING_PROTOCOL]
).toEqual('AMQP');
expect(
publishSpan.attributes[SemanticAttributes.MESSAGING_PROTOCOL_VERSION]
).toEqual('0.9.1');
expect(
publishSpan.attributes[SemanticAttributes.MESSAGING_URL]
).toEqual(censoredUrl);
expect(
publishSpan.attributes[SemanticAttributes.NET_PEER_NAME]
).toEqual(TEST_RABBITMQ_HOST);
expect(
publishSpan.attributes[SemanticAttributes.NET_PEER_PORT]
).toEqual(TEST_RABBITMQ_PORT);

// assert consume span
expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER);
expect(
consumeSpan.attributes[SemanticAttributes.MESSAGING_SYSTEM]
).toEqual('rabbitmq');
expect(
consumeSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION]
).toEqual(''); // according to spec: "This will be an empty string if the default exchange is used"
expect(
consumeSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION_KIND]
).toEqual(MessagingDestinationKindValues.TOPIC);
expect(
consumeSpan.attributes[
SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY
]
).toEqual(queueName);
expect(
consumeSpan.attributes[SemanticAttributes.MESSAGING_PROTOCOL]
).toEqual('AMQP');
expect(
consumeSpan.attributes[SemanticAttributes.MESSAGING_PROTOCOL_VERSION]
).toEqual('0.9.1');
expect(
consumeSpan.attributes[SemanticAttributes.MESSAGING_URL]
).toEqual(censoredUrl);
expect(
consumeSpan.attributes[SemanticAttributes.NET_PEER_NAME]
).toEqual(TEST_RABBITMQ_HOST);
expect(
consumeSpan.attributes[SemanticAttributes.NET_PEER_PORT]
).toEqual(TEST_RABBITMQ_PORT);

// new trace should be created
expect(consumeSpan.spanContext().traceId).not.toEqual(
publishSpan.spanContext().traceId
);
expect(consumeSpan.parentSpanId).toBeUndefined();

// link back to publish span
expect(consumeSpan.links.length).toBe(1);
expect(consumeSpan.links[0].context.traceId).toEqual(publishSpan.spanContext().traceId);
expect(consumeSpan.links[0].context.spanId).toEqual(publishSpan.spanContext().spanId);

done();
});
});
});

describe('confirm channel with links config', () => {
let confirmChannel: amqpCallback.ConfirmChannel;
beforeEach(done => {
instrumentation.setConfig({
useLinksForConsume: true,
});
conn.createConfirmChannel(
context.bind(context.active(), (err, c) => {
confirmChannel = c;
// install an error handler, otherwise when we have tests that create error on the channel,
// it throws and crash process
confirmChannel.on('error', () => {});
confirmChannel.assertQueue(
queueName,
{ durable: false },
context.bind(context.active(), (err, ok) => {
confirmChannel.purgeQueue(
queueName,
context.bind(context.active(), (err, ok) => {
done();
})
);
})
);
})
);
});

afterEach(done => {
try {
confirmChannel.close(err => {
done();
});
} catch {}
});

it('simple publish and consume from queue callback', done => {
asyncConfirmSend(confirmChannel, queueName, msgPayload).then(() => {
asyncConsume(
confirmChannel,
queueName,
[msg => expect(msg.content.toString()).toEqual(msgPayload)],
{
noAck: true,
}
).then(() => {
const [publishSpan, consumeSpan] = getTestSpans();

// assert publish span
expect(publishSpan.kind).toEqual(SpanKind.PRODUCER);
expect(
publishSpan.attributes[SemanticAttributes.MESSAGING_SYSTEM]
).toEqual('rabbitmq');
expect(
publishSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION]
).toEqual(''); // according to spec: "This will be an empty string if the default exchange is used"
expect(
publishSpan.attributes[
SemanticAttributes.MESSAGING_DESTINATION_KIND
]
).toEqual(MessagingDestinationKindValues.TOPIC);
expect(
publishSpan.attributes[
SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY
]
).toEqual(queueName);
expect(
publishSpan.attributes[SemanticAttributes.MESSAGING_PROTOCOL]
).toEqual('AMQP');
expect(
publishSpan.attributes[
SemanticAttributes.MESSAGING_PROTOCOL_VERSION
]
).toEqual('0.9.1');
expect(
publishSpan.attributes[SemanticAttributes.MESSAGING_URL]
).toEqual(censoredUrl);
expect(
publishSpan.attributes[SemanticAttributes.NET_PEER_NAME]
).toEqual(TEST_RABBITMQ_HOST);
expect(
publishSpan.attributes[SemanticAttributes.NET_PEER_PORT]
).toEqual(TEST_RABBITMQ_PORT);

// assert consume span
expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER);
expect(
consumeSpan.attributes[SemanticAttributes.MESSAGING_SYSTEM]
).toEqual('rabbitmq');
expect(
consumeSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION]
).toEqual(''); // according to spec: "This will be an empty string if the default exchange is used"
expect(
consumeSpan.attributes[
SemanticAttributes.MESSAGING_DESTINATION_KIND
]
).toEqual(MessagingDestinationKindValues.TOPIC);
expect(
consumeSpan.attributes[
SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY
]
).toEqual(queueName);
expect(
consumeSpan.attributes[SemanticAttributes.MESSAGING_PROTOCOL]
).toEqual('AMQP');
expect(
consumeSpan.attributes[
SemanticAttributes.MESSAGING_PROTOCOL_VERSION
]
).toEqual('0.9.1');
expect(
consumeSpan.attributes[SemanticAttributes.MESSAGING_URL]
).toEqual(censoredUrl);
expect(
consumeSpan.attributes[SemanticAttributes.NET_PEER_NAME]
).toEqual(TEST_RABBITMQ_HOST);
expect(
consumeSpan.attributes[SemanticAttributes.NET_PEER_PORT]
).toEqual(TEST_RABBITMQ_PORT);

// new trace should be created
expect(consumeSpan.spanContext().traceId).not.toEqual(
publishSpan.spanContext().traceId
);
expect(consumeSpan.parentSpanId).toBeUndefined();

// link back to publish span
expect(consumeSpan.links.length).toBe(1);
expect(consumeSpan.links[0].context.traceId).toEqual(publishSpan.spanContext().traceId);
expect(consumeSpan.links[0].context.spanId).toEqual(publishSpan.spanContext().spanId);

done();
});
});
});

});
});
Loading

0 comments on commit 16908aa

Please sign in to comment.