Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

fix: peer disconnect event and improve logging performance #309

Merged
merged 3 commits into from
Mar 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/connection/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class BaseConnection extends EventEmitter {
*/
close (err) {
if (this._state._state === 'DISCONNECTING') return
this.log(`closing connection to ${this.theirB58Id}`)
this.log('closing connection to %s', this.theirB58Id)
if (err && this._events.error) {
this.emit('error', err)
}
Expand Down Expand Up @@ -80,7 +80,7 @@ class BaseConnection extends EventEmitter {
* @returns {void}
*/
_onDisconnected () {
this.log(`disconnected from ${this.theirB58Id}`)
this.log('disconnected from %s', this.theirB58Id)
this.emit('close')
this.removeAllListeners()
}
Expand All @@ -92,7 +92,7 @@ class BaseConnection extends EventEmitter {
* @returns {void}
*/
_onPrivatized () {
this.log(`successfully privatized incoming connection`)
this.log('successfully privatized incoming connection')
this.emit('private', this.conn)
}

Expand All @@ -113,7 +113,7 @@ class BaseConnection extends EventEmitter {
return this.close(err)
}

this.log(`successfully privatized conn to ${this.theirB58Id}`)
this.log('successfully privatized conn to %s', this.theirB58Id)
this.conn.setPeerInfo(this.theirPeerInfo)
this._state('done')
})
Expand Down
6 changes: 3 additions & 3 deletions src/connection/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ class IncomingConnectionFSM extends BaseConnection {
this._state.on('PRIVATIZED', () => this._onPrivatized())
this._state.on('ENCRYPTING', () => this._onEncrypting())
this._state.on('ENCRYPTED', () => {
this.log(`successfully encrypted connection to ${this.theirB58Id || 'unknown peer'}`)
this.log('successfully encrypted connection to %s', this.theirB58Id || 'unknown peer')
this.emit('encrypted', this.conn)
})
this._state.on('UPGRADING', () => this._onUpgrading())
this._state.on('MUXED', () => {
this.log(`successfully muxed connection to ${this.theirB58Id || 'unknown peer'}`)
this.log('successfully muxed connection to %s', this.theirB58Id || 'unknown peer')
this.emit('muxed', this.conn)
})
this._state.on('DISCONNECTING', () => {
Expand All @@ -81,7 +81,7 @@ class IncomingConnectionFSM extends BaseConnection {
* @returns {void}
*/
_onEncrypting () {
this.log(`encrypting connection via ${this.switch.crypto.tag}`)
this.log('encrypting connection via %s', this.switch.crypto.tag)

this.msListener.addHandler(this.switch.crypto.tag, (protocol, _conn) => {
this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, _conn, undefined, (err) => {
Expand Down
37 changes: 18 additions & 19 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,17 @@ class ConnectionFSM extends BaseConnection {
this._state.on('PRIVATIZED', () => this._onPrivatized())
this._state.on('ENCRYPTING', () => this._onEncrypting())
this._state.on('ENCRYPTED', () => {
this.log(`successfully encrypted connection to ${this.theirB58Id}`)
this.log('successfully encrypted connection to %s', this.theirB58Id)
this.emit('encrypted', this.conn)
})
this._state.on('UPGRADING', () => this._onUpgrading())
this._state.on('MUXED', () => {
this.log(`successfully muxed connection to ${this.theirB58Id}`)
this.log('successfully muxed connection to %s', this.theirB58Id)
delete this.switch.conns[this.theirB58Id]
this.emit('muxed', this.muxer)
})
this._state.on('CONNECTED', () => {
this.log(`unmuxed connection opened to ${this.theirB58Id}`)
this.log('unmuxed connection opened to %s', this.theirB58Id)
this.emit('unmuxed', this.conn)
})
this._state.on('DISCONNECTING', () => this._onDisconnecting())
Expand Down Expand Up @@ -169,7 +169,7 @@ class ConnectionFSM extends BaseConnection {
return callback(err, null)
}

this.log(`created new stream to ${this.theirB58Id}`)
this.log('created new stream to %s', this.theirB58Id)
this._protocolHandshake(protocol, stream, callback)
})
}
Expand All @@ -194,7 +194,7 @@ class ConnectionFSM extends BaseConnection {
* @returns {void}
*/
_onDialing () {
this.log(`dialing ${this.theirB58Id}`)
this.log('dialing %s', this.theirB58Id)

if (!this.switch.hasTransports()) {
return this.close(NO_TRANSPORTS_REGISTERED())
Expand Down Expand Up @@ -226,7 +226,7 @@ class ConnectionFSM extends BaseConnection {
this.theirPeerInfo.multiaddrs.add(`/p2p-circuit/p2p/${this.theirB58Id}`)
}

this.log(`dialing transport ${transport}`)
this.log('dialing transport %s', transport)
this.switch.transport.dial(transport, this.theirPeerInfo, (errors, _conn) => {
if (errors) {
this.emit('error:connection_attempt_failed', errors)
Expand All @@ -250,7 +250,7 @@ class ConnectionFSM extends BaseConnection {
* @returns {void}
*/
_onDialed () {
this.log(`successfully dialed ${this.theirB58Id}`)
this.log('successfully dialed %s', this.theirB58Id)

this.emit('connected', this.conn)
}
Expand All @@ -261,33 +261,32 @@ class ConnectionFSM extends BaseConnection {
* @returns {void}
*/
_onDisconnecting () {
this.log(`disconnecting from ${this.theirB58Id}`)
this.log('disconnecting from %s', this.theirB58Id)

// Issue disconnects on both Peers
if (this.theirPeerInfo) {
this.theirPeerInfo.disconnect()
}

this.switch.connection.remove(this)

delete this.switch.conns[this.theirB58Id]

// Clean up stored connections
if (this.muxer) {
this.muxer.end()
delete this.muxer
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
}

this.switch.connection.remove(this)

delete this.switch.conns[this.theirB58Id]
delete this.muxer

// If we have the base connection, abort it
if (this.conn) {
this.conn.source(true, () => {
this._state('done')
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
delete this.conn
})
} else {
this._state('done')
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
}
}

Expand Down Expand Up @@ -336,7 +335,7 @@ class ConnectionFSM extends BaseConnection {
*/
_onUpgrading () {
const muxers = Object.keys(this.switch.muxers)
this.log(`upgrading connection to ${this.theirB58Id}`)
this.log('upgrading connection to %s', this.theirB58Id)

if (muxers.length === 0) {
return this._state('stop')
Expand Down Expand Up @@ -376,7 +375,7 @@ class ConnectionFSM extends BaseConnection {

// For incoming streams, in case identify is on
this.muxer.on('stream', (conn) => {
this.log(`new stream created via muxer to ${this.theirB58Id}`)
this.log('new stream created via muxer to %s', this.theirB58Id)
conn.setPeerInfo(this.theirPeerInfo)
this.switch.protocolMuxer(null)(conn)
})
Expand Down Expand Up @@ -431,12 +430,12 @@ class ConnectionFSM extends BaseConnection {

msDialer.select(protocol, (err, _conn) => {
if (err) {
this.log(`could not perform protocol handshake: `, err)
this.log('could not perform protocol handshake:', err)
return callback(err, null)
}

const conn = observeConnection(null, protocol, _conn, this.switch.observer)
this.log(`successfully performed handshake of ${protocol} to ${this.theirB58Id}`)
this.log('successfully performed handshake of %s to %s', protocol, this.theirB58Id)
this.emit('connection', conn)
callback(null, conn)
})
Expand Down
8 changes: 4 additions & 4 deletions src/dialer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const Connection = require('interface-connection').Connection
const ConnectionFSM = require('./connection')
const getPeerInfo = require('./get-peer-info')
const once = require('once')
const setImmediate = require('async/setImmediate')
const nextTick = require('async/nextTick')

const debug = require('debug')
const log = debug('libp2p:switch:dial')
Expand All @@ -22,7 +22,7 @@ function maybePerformHandshake ({ protocol, proxyConnection, connection, callbac
})
}

callback()
nextTick(callback)
}

/**
Expand Down Expand Up @@ -53,7 +53,7 @@ function dial (_switch, returnFSM) {
const peerInfo = getPeerInfo(peer, _switch._peerBook)
const b58Id = peerInfo.id.toB58String()

log(`dialing to ${b58Id.slice(0, 8)} with protocol ${protocol || 'unknown'}`)
log('dialing to %s with protocol %s', b58Id, protocol || 'unknown')

let connection = _switch.connection.getOne(b58Id)

Expand Down Expand Up @@ -89,7 +89,7 @@ function dial (_switch, returnFSM) {
const proxyConnection = new Connection()
proxyConnection.setPeerInfo(peerInfo)

setImmediate(() => {
nextTick(() => {
// If we have a muxed connection, attempt the protocol handshake
if (connection.getState() === 'MUXED') {
maybePerformHandshake({
Expand Down
6 changes: 3 additions & 3 deletions src/limit-dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ class DialQueue {
*/
_doWork (transport, addr, token, callback) {
callback = once(callback)
log(`${transport.constructor.name}:work:start`)
log('work:start')
this._dialWithTimeout(transport, addr, (err, conn) => {
if (err) {
log.error(`${transport.constructor.name}:work`, err)
return callback(err)
}

if (token.cancel) {
log(`${transport.constructor.name}:work:cancel`)
log('work:cancel')
// clean up already done dials
pull(empty(), conn)
// If we can close the connection, do it
Expand All @@ -62,7 +62,7 @@ class DialQueue {
// one is enough
token.cancel = true

log(`${transport.constructor.name}:work:success`)
log('work:success')

const proxyConn = new Connection()
proxyConn.setInnerConn(conn)
Expand Down
2 changes: 1 addition & 1 deletion src/protocol-muxer.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module.exports = function protocolMuxer (protocols, observer) {
}

const handler = (protocolName, _conn) => {
log(`registering handler with protocol ${protocolName}`)
log('registering handler with protocol %s', protocolName)
const protocol = protocols[protocolName]
if (protocol) {
const handlerFunc = protocol && protocol.handlerFunc
Expand Down