Skip to content

Commit

Permalink
chore: Implemented DT header injection for message specs (#2250)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsumners-nr committed Jun 6, 2024
1 parent 26003d6 commit 3e6bc6d
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 29 deletions.
28 changes: 20 additions & 8 deletions lib/instrumentation/kafkajs/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@ module.exports = function instrumentProducer({ shim, kafkajs, recordMethodMetric
shim.recordProduce(producer, 'send', function nrSend(shim, fn, name, args) {
recordMethodMetric({ agent, name })
const data = args[0]
const firstMessage = getByPath(data, 'messages[0]')

if (firstMessage) {
firstMessage.headers = firstMessage.headers ?? {}
}

return new MessageSpec({
promise: true,
destinationName: data.topic,
destinationType: shim.TOPIC,
headers: firstMessage.headers
messageHeaders: (inject) => {
return data.messages.map((msg) => {
if (msg.headers) {
return inject(msg.headers)
}
msg.headers = {}
return inject(msg.headers)
})
}
})
})

Expand All @@ -52,7 +54,17 @@ module.exports = function instrumentProducer({ shim, kafkajs, recordMethodMetric
promise: true,
destinationName: data.topicMessages[0].topic,
destinationType: shim.TOPIC,
headers: firstMessage.headers
messageHeaders: (inject) => {
return data.topicMessages.map((tm) => {
return tm.messages.map((m) => {
if (m.headers) {
return inject(m.headers)
}
m.headers = {}
return inject(m.headers)
})
})
}
})
})

Expand Down
10 changes: 10 additions & 0 deletions lib/shim/message-shim/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@ function recordProduce(nodule, properties, recordNamer) {
if (msgDesc.headers) {
shim.insertCATRequestHeaders(msgDesc.headers, true)
}

if (msgDesc.messageHeaders) {
// Some message broker clients, like kafkajs, allow for sending multiple
// messages in one send. Clients are likely to pick up such messages
// individually. Thus, we need to propagate any distributed trace
// headers to every message in the payload.
msgDesc.messageHeaders((headers, altNames = true) => {
shim.insertCATRequestHeaders(headers, altNames)
})
}
}
msgDesc.recorder = genericRecorder
return msgDesc
Expand Down
23 changes: 23 additions & 0 deletions lib/shim/specs/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,23 @@ const OperationSpec = require('./operation')
* @property {number|string} [destinationName]
* @property {string|null} [destinationType]
* @property {Object<string, string>|null} [headers]
* @property {MessageBrokerHeadersFn|null} [messageHeaders]
* @property {MessageHandlerFunction|null} [messageHandler]
* @property {number|string|null} [queue]
* @property {string|null} [routingKey]
*/

/**
* @typedef {Function} MessageBrokerHeadersFn
* @param {Function} inject A function with the signature
* `function(headers, useAlternateHeaderNames)`. The passed in headers object
* will be updated with distributed trace headers. When the second parameter
* is `true` (the default), alternate style (not HTTP style) header names will
* be used, i.e. names that are safe for non-HTTP transports.
* @returns {object[]} An array of objects, wherein each object will be updated
* with distributed trace headers.
*/

/**
* Spec that describes how to instrument a message broker.
*/
Expand Down Expand Up @@ -46,6 +58,16 @@ class MessageSpec extends OperationSpec {
*/
headers

/**
* Function that returns an iterable set of message header objects. The
* header objects will be modified to include distributed tracing headers so
* that they will be included in the payloads delivered, and read from, the
* message broker.
*
* @type {MessageBrokerHeadersFn}
*/
messageHeaders

/**
* A function to handle the result of the instrumented message broker
* function.
Expand Down Expand Up @@ -81,6 +103,7 @@ class MessageSpec extends OperationSpec {
this.destinationName = params.destinationName ?? null
this.destinationType = params.destinationType ?? null
this.headers = params.headers ?? null
this.messageHeaders = params.messageHeaders ?? null
this.messageHandler = params.messageHandler ?? null
this.queue = params.queue ?? null
this.routingKey = params.routingKey ?? null
Expand Down
54 changes: 54 additions & 0 deletions test/unit/shim/message-shim.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ tap.test('MessageShim', function (t) {
getActiveSegment: function () {
return agent.tracer.getSegment()
},
sendMessages: function () {},
withNested: function () {
const segment = agent.tracer.getSegment()
segment.add('ChildSegment')
Expand Down Expand Up @@ -386,6 +387,59 @@ tap.test('MessageShim', function (t) {
})
})

t.test('should insert distributed trace headers in all messages', function (t) {
t.plan(1)
const messages = [{}, { headers: { foo: 'foo' } }, {}]

shim.recordProduce(
wrappable,
'sendMessages',
() =>
new MessageSpec({
messageHeaders(inject) {
for (const msg of messages) {
if (msg.headers) {
inject(msg.headers)
continue
}
msg.headers = {}
inject(msg.headers)
}
}
})
)

agent.on('transactionFinished', () => {
t.match(messages, [
{
headers: {
newrelic: '',
traceparent: /^00-/
}
},
{
headers: {
newrelic: '',
traceparent: /^00-/,
foo: 'foo'
}
},
{
headers: {
newrelic: '',
traceparent: /^00-/
}
}
])
t.end()
})

helper.runInTransaction(agent, (tx) => {
wrappable.sendMessages()
tx.end()
})
})

t.test('should create message broker metrics', function (t) {
let transaction = null

Expand Down
32 changes: 21 additions & 11 deletions test/versioned/kafkajs/kafka.tap.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ tap.afterEach(async (t) => {
})

tap.test('send records correctly', (t) => {
t.plan(5)
t.plan(7)

const { agent, consumer, producer, topic } = t.context
const message = 'test message'
Expand Down Expand Up @@ -87,7 +87,9 @@ tap.test('send records correctly', (t) => {
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
t.match(actualMessage.headers['x-foo'].toString(), 'foo')
t.equal(actualMessage.headers['x-foo'].toString(), 'foo')
t.equal(actualMessage.headers.newrelic.toString(), '')
t.equal(actualMessage.headers.traceparent.toString().startsWith('00-'), true)
resolve()
}
})
Expand Down Expand Up @@ -123,7 +125,7 @@ tap.test('send passes along DT headers', (t) => {
agent.config.primary_application_id = 'app_1'
agent.config.trusted_account_key = 42
let produceTx = null
let consumeTx = null
const consumeTxs = []
let txCount = 0

agent.on('transactionFinished', (tx) => {
Expand All @@ -132,11 +134,11 @@ tap.test('send passes along DT headers', (t) => {
if (tx.name === expectedName) {
produceTx = tx
} else {
consumeTx = tx
consumeTxs.push(tx)
}

if (txCount === 2) {
utils.verifyDistributedTrace({ t, consumeTx, produceTx })
if (txCount === 3) {
utils.verifyDistributedTrace({ t, consumeTxs, produceTx })
t.end()
}
})
Expand All @@ -146,10 +148,13 @@ tap.test('send passes along DT headers', (t) => {
await consumer.subscribe({ topic, fromBeginning: true })

const promise = new Promise((resolve) => {
let msgCount = 0
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), 'one')
resolve()
eachMessage: async () => {
++msgCount
if (msgCount === 2) {
resolve()
}
}
})
})
Expand All @@ -158,7 +163,10 @@ tap.test('send passes along DT headers', (t) => {
await producer.send({
acks: 1,
topic,
messages: [{ key: 'key', value: 'one' }]
messages: [
{ key: 'key', value: 'one' },
{ key: 'key2', value: 'two' }
]
})

await promise
Expand All @@ -168,7 +176,7 @@ tap.test('send passes along DT headers', (t) => {
})

tap.test('sendBatch records correctly', (t) => {
t.plan(6)
t.plan(8)

const { agent, consumer, producer, topic } = t.context
const message = 'test message'
Expand Down Expand Up @@ -203,6 +211,8 @@ tap.test('sendBatch records correctly', (t) => {
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
t.match(actualMessage.headers['x-foo'].toString(), 'foo')
t.equal(actualMessage.headers.newrelic.toString(), '')
t.equal(actualMessage.headers.traceparent.toString().startsWith('00-'), true)
resolve()
}
})
Expand Down
20 changes: 10 additions & 10 deletions test/versioned/kafkajs/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,18 @@ utils.verifyConsumeTransaction = ({ t, tx, topic, clientId }) => {
* Asserts the properties on both the produce and consume transactions
* @param {object} params function params
* @param {object} params.t test instance
* @param {object} params.consumeTx consumer transaction
* @param {object} params.consumeTxs consumer transactions
* @param {object} params.produceTx produce transaction
*/
utils.verifyDistributedTrace = ({ t, consumeTx, produceTx }) => {
utils.verifyDistributedTrace = ({ t, consumeTxs, produceTx }) => {
t.ok(produceTx.isDistributedTrace, 'should mark producer as distributed')
t.ok(consumeTx.isDistributedTrace, 'should mark consumer as distributed')

t.equal(consumeTx.incomingCatId, null, 'should not set old CAT properties')

t.equal(produceTx.id, consumeTx.parentId, 'should have proper parent id')
t.equal(produceTx.traceId, consumeTx.traceId, 'should have proper trace id')
const produceSegment = produceTx.trace.root.children[3]
t.equal(produceSegment.id, consumeTx.parentSpanId, 'should have proper parentSpanId')
t.equal(consumeTx.parentTransportType, 'Kafka', 'should have correct transport type')
consumeTxs.forEach((consumeTx) => {
t.ok(consumeTx.isDistributedTrace, 'should mark consumer as distributed')
t.equal(consumeTx.incomingCatId, null, 'should not set old CAT properties')
t.equal(produceTx.id, consumeTx.parentId, 'should have proper parent id')
t.equal(produceTx.traceId, consumeTx.traceId, 'should have proper trace id')
t.equal(produceSegment.id, consumeTx.parentSpanId, 'should have proper parentSpanId')
t.equal(consumeTx.parentTransportType, 'Kafka', 'should have correct transport type')
})
}

0 comments on commit 3e6bc6d

Please sign in to comment.