Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
fix: use labels to differentiate interfaces for metrics
Browse files Browse the repository at this point in the history
Instead of inserting the interface address into the metric name,
use the interface address as a label prefix for the value being reported.

This allows our metric names to be stable even if you don't
know the ip/port combo that will be used ahead of time.

The tradeoff is the label names may change between restarts if
the port number changes, but we have to apply a disambiguator somewhere.

Depends on:

- [ ] libp2p/js-libp2p-prometheus-metrics#6
  • Loading branch information
achingbrain committed Nov 20, 2022
1 parent de04ef4 commit 2d161e3
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 36 deletions.
9 changes: 5 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ class TCP implements Transport {

if (components.metrics != null) {
this.metrics = {
dialerEvents: components.metrics.registerCounterGroup('libp2p_tcp_dialer_errors_total', {
dialerEvents: components.metrics.registerCounterGroup('libp2p_tcp_dialer_events_total', {
label: 'event',
help: 'Total count of TCP dialer errors by error type'
}),
listenerEvents: components.metrics.registerCounterGroup('libp2p_tcp_listener_errors_total', {
listenerEvents: components.metrics.registerCounterGroup('libp2p_tcp_listener_events_total', {
label: 'event',
help: 'Total count of TCP listener errors by error type'
})
Expand Down Expand Up @@ -111,11 +111,12 @@ class TCP implements Transport {
signal: options.signal,
socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout,
metrics: this.metrics?.dialerEvents
metrics: this.metrics?.dialerEvents,
metricPrefix: ''
})
log('new outbound connection %s', maConn.remoteAddr)
const conn = await options.upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
log('outbound connection upgraded %s', maConn.remoteAddr)
return conn
}

Expand Down
52 changes: 31 additions & 21 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { MultiaddrConnection, Connection } from '@libp2p/interface-connecti
import type { Upgrader, Listener, ListenerEvents } from '@libp2p/interface-transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { TCPCreateListenerOptions } from './index.js'
import type { CounterGroup, Metric, Metrics } from '@libp2p/interface-metrics'
import type { CounterGroup, MetricGroup, Metrics } from '@libp2p/interface-metrics'

const log = logger('libp2p:tcp:listener')

Expand Down Expand Up @@ -39,7 +39,7 @@ const SERVER_STATUS_UP = 1
const SERVER_STATUS_DOWN = 0

export interface TCPListenerMetrics {
status: Metric
status: MetricGroup
errors: CounterGroup
events: CounterGroup
}
Expand All @@ -52,12 +52,14 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
private readonly connections = new Set<MultiaddrConnection>()
private status: Status = { started: false }
private metrics?: TCPListenerMetrics
private addr: string

constructor (private readonly context: Context) {
super()

context.keepAlive = context.keepAlive ?? true

this.addr = 'unknown'
this.server = net.createServer(context, this.onSocket.bind(this))

// https://nodejs.org/api/net.html#servermaxconnections
Expand All @@ -72,39 +74,44 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
if (context.metrics != null) {
// we are listening, register metrics for our port
const address = this.server.address()
let addr: string

if (address == null) {
addr = 'unknown'
this.addr = 'unknown'
} else if (typeof address === 'string') {
// unix socket
addr = address
this.addr = address
} else {
addr = `${address.address}:${address.port}`
this.addr = `${address.address}:${address.port}`
}

context.metrics?.registerMetric(`libp2p_tcp_connections_${addr}_total`, {
context.metrics?.registerMetricGroup('libp2p_tcp_inbound_connections_total', {
label: 'address',
help: 'Current active connections in TCP listener',
calculate: () => {
return this.connections.size
return {
[this.addr]: this.connections.size
}
}
})

this.metrics = {
status: context.metrics.registerMetric(`libp2p_tcp_${addr}_server_status_info`, {
status: context.metrics.registerMetricGroup('libp2p_tcp_server_status_info', {
label: 'address',
help: 'Current status of the TCP server'
}),
errors: context.metrics.registerCounterGroup(`libp2p_tcp_${addr}_server_errors_total`, {
label: 'error',
errors: context.metrics.registerMetricGroup('libp2p_tcp_server_errors_total', {
label: 'address',
help: 'Total count of TCP listener errors by error type'
}),
events: context.metrics.registerCounterGroup(`libp2p_tcp_${addr}_socket_events_total`, {
label: 'event',
events: context.metrics.registerMetricGroup('libp2p_tcp_socket_events_total', {
label: 'address',
help: 'Total count of TCP socket events by event'
})
}

this.metrics?.status.update(SERVER_STATUS_UP)
this.metrics?.status.update({
[this.addr]: SERVER_STATUS_UP
})
}

this.dispatchEvent(new CustomEvent('listening'))
Expand All @@ -114,7 +121,9 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
this.dispatchEvent(new CustomEvent<Error>('error', { detail: err }))
})
.on('close', () => {
this.metrics?.status.update(SERVER_STATUS_DOWN)
this.metrics?.status.update({
[this.addr]: SERVER_STATUS_DOWN
})
this.dispatchEvent(new CustomEvent('close'))
})
}
Expand All @@ -123,7 +132,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
this.metrics?.events.increment({ error: true })
this.metrics?.events.increment({ [`${this.addr} error`]: true })
})

let maConn: MultiaddrConnection
Expand All @@ -132,19 +141,20 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
socketInactivityTimeout: this.context.socketInactivityTimeout,
socketCloseTimeout: this.context.socketCloseTimeout,
metrics: this.metrics?.events
metrics: this.metrics?.events,
metricPrefix: `${this.addr} `
})
} catch (err) {
log.error('inbound connection failed', err)
this.metrics?.errors.increment({ inbound_to_connection: true })
this.metrics?.errors.increment({ [`${this.addr} inbound_to_connection`]: true })
return
}

log('new inbound connection %s', maConn.remoteAddr)
try {
this.context.upgrader.upgradeInbound(maConn)
.then((conn) => {
log('inbound connection %s upgraded', maConn.remoteAddr)
log('inbound connection upgraded %s', maConn.remoteAddr)
this.connections.add(maConn)

socket.once('close', () => {
Expand All @@ -159,7 +169,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
})
.catch(async err => {
log.error('inbound connection failed', err)
this.metrics?.errors.increment({ inbound_upgrade: true })
this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true })

await attemptClose(maConn)
})
Expand All @@ -172,7 +182,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
attemptClose(maConn)
.catch(err => {
log.error('closing inbound connection failed', err)
this.metrics?.errors.increment({ inbound_closing_failed: true })
this.metrics?.errors.increment({ [`${this.addr} inbound_closing_failed`]: true })
})
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ interface ToConnectionOptions {
socketInactivityTimeout?: number
socketCloseTimeout?: number
metrics?: CounterGroup
metricPrefix: string
}

/**
Expand Down Expand Up @@ -63,7 +64,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s socket read timeout', lOptsStr)
metrics?.increment({ timeout: true })
metrics?.increment({ [`${options.metricPrefix}timeout`]: true })

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
Expand All @@ -78,7 +79,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio

socket.once('close', () => {
log('%s socket read timeout', lOptsStr)
metrics?.increment({ close: true })
metrics?.increment({ [`${options.metricPrefix}close`]: true })

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
Expand All @@ -92,7 +93,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
// the remote sent a FIN packet which means no more data will be sent
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
log('socket ended', maConn.remoteAddr.toString())
metrics?.increment({ end: true })
metrics?.increment({ [`${options.metricPrefix}end`]: true })
})

const maConn: MultiaddrConnection = {
Expand Down
24 changes: 16 additions & 8 deletions test/socket-to-conn.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ describe('socket-to-conn', () => {
const serverErrored = defer<Error>()

const inboundMaConn = toMultiaddrConnection(serverSocket, {
socketInactivityTimeout: 100
socketInactivityTimeout: 100,
metricPrefix: 'test'
})
expect(inboundMaConn.timeline.open).to.be.ok()
expect(inboundMaConn.timeline.close).to.not.be.ok()
Expand Down Expand Up @@ -122,7 +123,8 @@ describe('socket-to-conn', () => {
const serverErrored = defer<Error>()

const inboundMaConn = toMultiaddrConnection(serverSocket, {
socketInactivityTimeout: 100
socketInactivityTimeout: 100,
metricPrefix: 'test'
})
expect(inboundMaConn.timeline.open).to.be.ok()
expect(inboundMaConn.timeline.close).to.not.be.ok()
Expand Down Expand Up @@ -172,7 +174,8 @@ describe('socket-to-conn', () => {
const serverErrored = defer<Error>()

const inboundMaConn = toMultiaddrConnection(serverSocket, {
socketInactivityTimeout: 100
socketInactivityTimeout: 100,
metricPrefix: 'test'
})
expect(inboundMaConn.timeline.open).to.be.ok()
expect(inboundMaConn.timeline.close).to.not.be.ok()
Expand Down Expand Up @@ -214,7 +217,8 @@ describe('socket-to-conn', () => {
const serverErrored = defer<Error>()

const inboundMaConn = toMultiaddrConnection(serverSocket, {
socketInactivityTimeout: 100
socketInactivityTimeout: 100,
metricPrefix: 'test'
})
expect(inboundMaConn.timeline.open).to.be.ok()
expect(inboundMaConn.timeline.close).to.not.be.ok()
Expand Down Expand Up @@ -255,7 +259,8 @@ describe('socket-to-conn', () => {

const inboundMaConn = toMultiaddrConnection(serverSocket, {
socketInactivityTimeout: 100,
socketCloseTimeout: 10
socketCloseTimeout: 10,
metricPrefix: 'test'
})
expect(inboundMaConn.timeline.open).to.be.ok()
expect(inboundMaConn.timeline.close).to.not.be.ok()
Expand Down Expand Up @@ -294,7 +299,8 @@ describe('socket-to-conn', () => {

const inboundMaConn = toMultiaddrConnection(serverSocket, {
socketInactivityTimeout: 100,
socketCloseTimeout: 10
socketCloseTimeout: 10,
metricPrefix: 'test'
})
expect(inboundMaConn.timeline.open).to.be.ok()
expect(inboundMaConn.timeline.close).to.not.be.ok()
Expand Down Expand Up @@ -333,7 +339,8 @@ describe('socket-to-conn', () => {

const inboundMaConn = toMultiaddrConnection(serverSocket, {
socketInactivityTimeout: 500,
socketCloseTimeout: 100
socketCloseTimeout: 100,
metricPrefix: 'test'
})
expect(inboundMaConn.timeline.open).to.be.ok()
expect(inboundMaConn.timeline.close).to.not.be.ok()
Expand Down Expand Up @@ -377,7 +384,8 @@ describe('socket-to-conn', () => {

const inboundMaConn = toMultiaddrConnection(serverSocket, {
socketInactivityTimeout: 100,
socketCloseTimeout: 100
socketCloseTimeout: 100,
metricPrefix: 'test'
})
expect(inboundMaConn.timeline.open).to.be.ok()
expect(inboundMaConn.timeline.close).to.not.be.ok()
Expand Down

0 comments on commit 2d161e3

Please sign in to comment.