Skip to content
This repository has been archived by the owner on Oct 26, 2022. It is now read-only.

Commit

Permalink
Metrics (#280)
Browse files Browse the repository at this point in the history
  • Loading branch information
mirpo committed Jun 20, 2019
1 parent b1d9b2d commit 88a4f98
Show file tree
Hide file tree
Showing 14 changed files with 298 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .idea/runConfigurations/publisher__30308_.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/runConfigurations/publisher__30309_.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion bin/publisher.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env node

const util = require('util')
const { startNetworkNode } = require('../src/composition')

const port = process.argv[2] || 30302
Expand All @@ -21,11 +22,15 @@ startNetworkNode(host, port, id)
const timestamp = Date.now()
const msg = 'Hello world, ' + new Date().toLocaleString()

publisher.publish(streamId, 0, timestamp, 0, publisher.id, messageChainId, lastTimestamp, 0, {
publisher.publish(streamId, 0, timestamp, 0, publisher.opts.id, messageChainId, lastTimestamp, 0, {
msg
})
lastTimestamp = timestamp
}, intervalInMs)

setInterval(() => {
console.log(util.inspect(publisher.getMetrics(), false, null))
}, 5000)
})
.catch((err) => {
console.error(err)
Expand Down
4 changes: 4 additions & 0 deletions bin/subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ startNetworkNode(host, port, id).then((subscriber) => {
subscriber.protocols.nodeToNode.on(NodeToNode.events.DATA_RECEIVED, (dataMessage) => {
console.log('received %s, data %j', dataMessage.getMessageId(), dataMessage.getData())
})

setInterval(() => {
console.log(subscriber.getMetrics())
}, 5000)
}).catch((err) => {
throw err
})
Expand Down
7 changes: 6 additions & 1 deletion bin/tracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ const maxNeighborsPerNode = process.argv[4] || 4
const id = `tracker-${port}`

startTracker(ip, port, id, maxNeighborsPerNode)
.then(() => {})
.then((tracker) => {
setInterval(() => {
console.log(tracker.getMetrics())
}, 5000)
})
.catch((err) => {
console.error(err)
process.exit(1)
Expand All @@ -17,3 +21,4 @@ startTracker(ip, port, id, maxNeighborsPerNode)
if (process.env.checkUncaughtException === 'true') {
process.on('uncaughtException', (err) => console.error((err && err.stack) ? err.stack : err))
}

18 changes: 14 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
"tracker": "node $NODE_DEBUG_OPTION bin/tracker.js",
"node": "node $NODE_DEBUG_OPTION bin/node.js",
"pub": "node $NODE_DEBUG_OPTION bin/publisher.js",
"pub-1": "node $NODE_DEBUG_OPTION bin/publisher.js 30323 127.0.0.1",
"pub-2": "node $NODE_DEBUG_OPTION bin/publisher.js 30333 127.0.0.1",
"pub-1": "node $NODE_DEBUG_OPTION bin/publisher.js 30323",
"pub-2": "node $NODE_DEBUG_OPTION bin/publisher.js 30333",
"sub": "node $NODE_DEBUG_OPTION bin/subscriber.js",
"sub-1": "node $NODE_DEBUG_OPTION bin/subscriber.js 30335 127.0.0.1",
"sub-2": "node $NODE_DEBUG_OPTION bin/subscriber.js 30345 127.0.0.1",
"sub-1": "node $NODE_DEBUG_OPTION bin/subscriber.js 30335",
"sub-2": "node $NODE_DEBUG_OPTION bin/subscriber.js 30345",
"storage": "node $NODE_DEBUG_OPTION bin/storage.js",
"test": "jest --detectOpenHandles",
"test-unit": "jest test/unit --detectOpenHandles",
Expand All @@ -50,14 +50,16 @@
"dependencies": {
"debug": "4.1.1",
"events": "3.0.0",
"prettysize": "^2.0.0",
"speedometer": "^1.1.0",
"uuid": "3.3.2",
"ws": "^7.0.0"
},
"devDependencies": {
"eslint": "^5.16.0",
"eslint-config-airbnb-base": "^13.1.0",
"eslint-config-streamr-nodejs": "^1.0.0",
"eslint-plugin-import": "^2.17.2",
"eslint-plugin-import": "^2.17.3",
"eslint-plugin-import-order": "2.1.4",
"into-stream": "^5.1.0",
"jest": "^24.7.1",
Expand Down
43 changes: 43 additions & 0 deletions src/connection/WsEndpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const url = require('url')
const debug = require('debug')('streamr:connection:ws-endpoint')
const WebSocket = require('ws')
const { disconnectionReasons } = require('../messages/messageTypes')
const Metrics = require('../metrics')
const Endpoint = require('./Endpoint')

class ReadyStateError extends Error {
Expand Down Expand Up @@ -62,6 +63,12 @@ class WsEndpoint extends EventEmitter {
this.endpoint = new Endpoint()
this.endpoint.implement(this)

this.metrics = new Metrics('WsEndpoint')

this.metrics.createSpeedometer('_inSpeed')
this.metrics.createSpeedometer('_outSpeed')
this.metrics.createSpeedometer('_msgSpeed')

this.connections = new Map()
this.pendingConnections = new Map()

Expand Down Expand Up @@ -93,6 +100,7 @@ class WsEndpoint extends EventEmitter {
// eslint-disable-next-line no-restricted-syntax
for (const [address, ws] of this.connections) {
if (ws.readyState !== 1) {
this.metrics.inc(`_checkConnections:readyState=${ws.readyState}`)
console.error(address + '\t\t\t' + ws.readyState)
}
}
Expand All @@ -101,25 +109,32 @@ class WsEndpoint extends EventEmitter {
send(recipientAddress, message) {
return new Promise((resolve, reject) => {
if (!this.isConnected(recipientAddress)) {
this.metrics.inc('send:failed:not-connected')
debug('cannot send to %s because not connected', recipientAddress)
reject(new Error(`cannot send to ${recipientAddress} because not connected`))
} else {
try {
const ws = this.connections.get(recipientAddress)
if (ws.readyState === ws.OPEN) {
this.metrics.speed('_outSpeed')(message.length)
this.metrics.speed('_msgSpeed')(1)

ws.send(message, (err) => {
if (err) {
reject(err)
} else {
this.metrics.inc('send:success')
debug('sent to %s message "%s"', recipientAddress, message)
resolve()
}
})
} else {
this.metrics.inc(`send:failed:readyState=${ws.readyState}`)
debug('sent failed because readyState of socket is %d', ws.readyState)
reject(new ReadyStateError(ws.readyState))
}
} catch (e) {
this.metrics.inc('send:failed')
console.error('sending to %s failed because of %s', recipientAddress, e)
reject(e)
}
Expand All @@ -128,6 +143,7 @@ class WsEndpoint extends EventEmitter {
}

onReceive(sender, message) {
this.metrics.inc('onReceive')
debug('received from %s message "%s"', sender, message)
this.emit(Endpoint.events.MESSAGE_RECEIVED, {
sender,
Expand All @@ -136,8 +152,10 @@ class WsEndpoint extends EventEmitter {
}

close(recipientAddress, reason) {
this.metrics.inc('close')
return new Promise((resolve, reject) => {
if (!this.isConnected(recipientAddress)) {
this.metrics.inc('close:error:not-connected')
debug('cannot close connection to %s because not connected', recipientAddress)
reject(new Error(`cannot close connection to ${recipientAddress} because not connected`))
} else {
Expand All @@ -146,6 +164,7 @@ class WsEndpoint extends EventEmitter {
const ws = this.connections.get(recipientAddress)
ws.close(1000, reason)
} catch (e) {
this.metrics.inc('close:error:failed')
console.error('closing connection to %s failed because of %s', recipientAddress, e)
reject(e)
}
Expand All @@ -154,11 +173,14 @@ class WsEndpoint extends EventEmitter {
}

connect(peerAddress) {
this.metrics.inc('connect')
if (this.isConnected(peerAddress)) {
this.metrics.inc('connect:already-connected')
debug('already connected to %s', peerAddress)
return Promise.resolve()
}
if (this.pendingConnections.has(peerAddress)) {
this.metrics.inc('connect:pending-connection')
debug('pending connection to %s', peerAddress)
return this.pendingConnections.get(peerAddress)
}
Expand All @@ -177,6 +199,7 @@ class WsEndpoint extends EventEmitter {
ws.on('open', () => {
if (!customHeadersOfServer) {
ws.terminate()
this.metrics.inc('connect:dropping-upgrade-never-received')
reject(new Error('dropping outgoing connection because upgrade event never received'))
} else {
this._onNewConnection(ws, peerAddress, customHeadersOfServer)
Expand All @@ -185,10 +208,12 @@ class WsEndpoint extends EventEmitter {
})

ws.on('error', (err) => {
this.metrics.inc('connect:failed-to-connect')
debug('failed to connect to %s, error: %o', peerAddress, err)
reject(new Error(err))
})
} catch (err) {
this.metrics.inc('connect:failed-to-connect')
debug('failed to connect to %s, error: %o', peerAddress, err)
reject(new Error(err))
}
Expand Down Expand Up @@ -227,6 +252,7 @@ class WsEndpoint extends EventEmitter {
const { address } = parameters.query

if (!address) {
this.metrics.inc('_onIncomingConnection:closed:no-address')
ws.terminate()
debug('dropped incoming connection from %s because address parameter missing',
req.connection.remoteAddress)
Expand All @@ -242,21 +268,28 @@ class WsEndpoint extends EventEmitter {
// Second condition is a tiebreaker to avoid both peers of simultaneously disconnecting their socket,
// thereby leaving no connection behind.
if (this.isConnected(address) && this.getAddress().localeCompare(address) === 1) {
this.metrics.inc('_onNewConnection:closed:dublicate')
debug('dropped new connection with %s because an existing connection already exists', address)
ws.close(1000, disconnectionReasons.DUPLICATE_SOCKET)
return
}

ws.on('message', (message) => {
// TODO check message.type [utf8|binary]
this.metrics.speed('_inSpeed')(message.length)
this.metrics.speed('_msgSpeed')(1)

this.onReceive(address, message)
})

ws.on('close', (code, reason) => {
if (reason === disconnectionReasons.DUPLICATE_SOCKET) {
this.metrics.inc('_onNewConnection:closed:dublicate')
debug('socket %s dropped from other side because existing connection already exists')
return
}

this.metrics.inc(`_onNewConnection:closed:code=${code}`)
debug('socket to %s closed (code %d, reason %s)', address, code, reason)
this.connections.delete(address)
debug('removed %s from connection list', address)
Expand All @@ -266,9 +299,19 @@ class WsEndpoint extends EventEmitter {
})

this.connections.set(address, ws)
this.metrics.set('connections', this.connections.size)
debug('added %s to connection list (headers %o)', address, customHeaders)
this.emit(Endpoint.events.PEER_CONNECTED, address, customHeaders)
}

getMetrics() {
return {
msg: this.metrics.speed('_msgSpeed')(),
inSpeed: this.metrics.speed('_inSpeed')(),
outSpeed: this.metrics.speed('_outSpeed')(),
metrics: this.metrics.report()
}
}
}

async function startWebSocketServer(host, port) {
Expand Down
Loading

0 comments on commit 88a4f98

Please sign in to comment.