Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send config to transports on init #1930

Merged
merged 3 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lib/multistream.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ function multistream (streamsArray, opts) {
const res = {
write,
add,
emit,
flushSync,
end,
minLevel: 0,
Expand Down Expand Up @@ -79,6 +80,14 @@ function multistream (streamsArray, opts) {
}
}

function emit (...args) {
for (const { stream } of this.streams) {
if (typeof stream.emit === 'function') {
stream.emit(...args)
}
}
}

function flushSync () {
for (const { stream } of this.streams) {
if (typeof stream.flushSync === 'function') {
Expand Down Expand Up @@ -153,6 +162,7 @@ function multistream (streamsArray, opts) {
minLevel: level,
streams,
clone,
emit,
flushSync,
[metadata]: true
}
Expand Down
2 changes: 2 additions & 0 deletions lib/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ function transport (fullOptions) {
options.dedupe = dedupe
}

options.pinoWillSendConfig = true

return buildStream(fixTarget(target), options, worker)

function fixTarget (origin) {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@
"atomic-sleep": "^1.0.0",
"fast-redact": "^3.1.1",
"on-exit-leak-free": "^2.1.0",
"pino-abstract-transport": "^1.1.0",
"pino-abstract-transport": "^1.2.0",
"pino-std-serializers": "^6.0.0",
"process-warning": "^3.0.0",
"quick-format-unescaped": "^4.0.3",
"real-require": "^0.2.0",
"safe-stable-stringify": "^2.3.1",
"sonic-boom": "^3.7.0",
"thread-stream": "^2.0.0"
"thread-stream": "^2.6.0"
},
"tsd": {
"directory": "test/types"
Expand Down
4 changes: 4 additions & 0 deletions pino.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ function pino (...args) {
assertDefaultLevelFound(level, customLevels, useOnlyCustomLevels)
const levels = mappings(customLevels, useOnlyCustomLevels)

if (typeof stream.emit === 'function') {
stream.emit('message', { code: 'PINO_CONFIG', config: { levels, messageKey, errorKey } })
}

assertLevelComparison(levelComparison)
const levelCompFunc = genLevelComparison(levelComparison)

Expand Down
33 changes: 33 additions & 0 deletions test/basic.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,39 @@ test('serializers can return undefined to strip field', async ({ equal }) => {
equal('test' in result, false)
})

test('streams receive a message event with PINO_CONFIG', ({ match, end }) => {
const stream = sink()
stream.once('message', (message) => {
match(message, {
code: 'PINO_CONFIG',
config: {
errorKey: 'err',
levels: {
labels: {
10: 'trace',
20: 'debug',
30: 'info',
40: 'warn',
50: 'error',
60: 'fatal'
},
values: {
debug: 20,
error: 50,
fatal: 60,
info: 30,
trace: 10,
warn: 40
}
},
messageKey: 'msg'
}
})
end()
})
pino(stream)
})

test('does not explode with a circular ref', async ({ doesNotThrow }) => {
const stream = sink()
const instance = pino(stream)
Expand Down
33 changes: 33 additions & 0 deletions test/fixtures/transport-uses-pino-config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

const build = require('pino-abstract-transport')
const { pipeline, Transform } = require('stream')
module.exports = () => {
return build(function (source) {
const myTransportStream = new Transform({
autoDestroy: true,
objectMode: true,
transform (chunk, enc, cb) {
const {
time,
level,
[source.messageKey]: body,
[source.errorKey]: error,
...attributes
} = chunk
this.push(JSON.stringify({
severityText: source.levels.labels[level],
body,
attributes,
...(error && { error })
}))
cb()
}
})
pipeline(source, myTransportStream, () => {})
return myTransportStream
}, {
enablePipelining: true,
expectPinoConfig: true
})
}
19 changes: 19 additions & 0 deletions test/fixtures/transport-worker-data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict'

const { parentPort, workerData } = require('worker_threads')
const { Writable } = require('stream')

module.exports = (options) => {
const myTransportStream = new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
parentPort.postMessage({
code: 'EVENT',
name: 'workerData',
args: [workerData]
})
cb()
}
})
return myTransportStream
}
15 changes: 14 additions & 1 deletion test/multistream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const pino = require('../')
const multistream = pino.multistream
const proxyquire = require('proxyquire')
const strip = require('strip-ansi')
const { file } = require('./helper')
const { file, sink } = require('./helper')

test('sends to multiple streams using string levels', function (t) {
let messageCount = 0
Expand Down Expand Up @@ -246,6 +246,19 @@ test('supports pretty print', function (t) {
log.info('pretty print')
})

test('emit propagates events to each stream', function (t) {
t.plan(3)
const handler = function (data) {
t.equal(data.msg, 'world')
}
const streams = [sink(), sink(), sink()]
streams.forEach(function (s) {
s.once('hello', handler)
})
const stream = multistream(streams)
stream.emit('hello', { msg: 'world' })
})

test('children support custom levels', function (t) {
const stream = writeStream(function (data, enc, cb) {
t.equal(JSON.parse(data).msg, 'bar')
Expand Down
13 changes: 13 additions & 0 deletions test/transport/core.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,19 @@ test('pino.transport with target pino-pretty', async ({ match, teardown }) => {
match(strip(actual), /\[.*\] INFO.*hello/)
})

test('sets worker data informing the transport that pino will send its config', ({ match, plan, teardown }) => {
plan(1)
const transport = pino.transport({
target: join(__dirname, '..', 'fixtures', 'transport-worker-data.js')
})
teardown(transport.end.bind(transport))
const instance = pino(transport)
transport.once('workerData', (workerData) => {
match(workerData.workerData, { pinoWillSendConfig: true })
})
instance.info('hello')
})

test('stdout in worker', async ({ not }) => {
let actual = ''
const child = execa(process.argv[0], [join(__dirname, '..', 'fixtures', 'transport-main.js')])
Expand Down
167 changes: 167 additions & 0 deletions test/transport/uses-pino-config.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
'use strict'

const os = require('os')
const { join } = require('path')
const { readFile } = require('fs').promises
const writeStream = require('flush-write-stream')
const { watchFileCreated, file } = require('../helper')
const { test } = require('tap')
const pino = require('../../')

const { pid } = process
const hostname = os.hostname()

function serializeError (error) {
return {
type: error.name,
message: error.message,
stack: error.stack
}
}

function parseLogs (buffer) {
return JSON.parse(`[${buffer.toString().replace(/}{/g, '},{')}]`)
}

test('transport uses pino config', async ({ same, teardown, plan }) => {
plan(1)
const destination = file()
const transport = pino.transport({
pipeline: [{
target: join(__dirname, '..', 'fixtures', 'transport-uses-pino-config.js')
}, {
target: 'pino/file',
options: { destination }
}]
})
teardown(transport.end.bind(transport))
const instance = pino({
messageKey: 'customMessageKey',
errorKey: 'customErrorKey',
customLevels: { custom: 35 }
}, transport)

const error = new Error('bar')
instance.custom('foo')
instance.error(error)
await watchFileCreated(destination)
const result = parseLogs(await readFile(destination))

same(result, [{
severityText: 'custom',
body: 'foo',
attributes: {
pid,
hostname
}
}, {
severityText: 'error',
body: 'bar',
attributes: {
pid,
hostname
},
error: serializeError(error)
}])
})

test('transport uses pino config without customizations', async ({ same, teardown, plan }) => {
plan(1)
const destination = file()
const transport = pino.transport({
pipeline: [{
target: join(__dirname, '..', 'fixtures', 'transport-uses-pino-config.js')
}, {
target: 'pino/file',
options: { destination }
}]
})
teardown(transport.end.bind(transport))
const instance = pino(transport)

const error = new Error('qux')
instance.info('baz')
instance.error(error)
await watchFileCreated(destination)
const result = parseLogs(await readFile(destination))

same(result, [{
severityText: 'info',
body: 'baz',
attributes: {
pid,
hostname
}
}, {
severityText: 'error',
body: 'qux',
attributes: {
pid,
hostname
},
error: serializeError(error)
}])
})

test('transport uses pino config with multistream', async ({ same, teardown, plan }) => {
plan(2)
const destination = file()
const messages = []
const stream = writeStream(function (data, enc, cb) {
const message = JSON.parse(data)
delete message.time
messages.push(message)
cb()
})
const transport = pino.transport({
pipeline: [{
target: join(__dirname, '..', 'fixtures', 'transport-uses-pino-config.js')
}, {
target: 'pino/file',
options: { destination }
}]
})
teardown(transport.end.bind(transport))
const instance = pino({
messageKey: 'customMessageKey',
errorKey: 'customErrorKey',
customLevels: { custom: 35 }
}, pino.multistream([transport, { stream }]))

const error = new Error('buzz')
const serializedError = serializeError(error)
instance.custom('fizz')
instance.error(error)
await watchFileCreated(destination)
const result = parseLogs(await readFile(destination))

same(result, [{
severityText: 'custom',
body: 'fizz',
attributes: {
pid,
hostname
}
}, {
severityText: 'error',
body: 'buzz',
attributes: {
pid,
hostname
},
error: serializedError
}])

same(messages, [{
level: 35,
pid,
hostname,
customMessageKey: 'fizz'
}, {
level: 50,
pid,
hostname,
customErrorKey: serializedError,
customMessageKey: 'buzz'
}])
})
Loading