Skip to content

Commit

Permalink
Update Shuttle to use new Queue.
Browse files Browse the repository at this point in the history
Update the Shuttle to use the new Queue. Send the version change using
the new `Queue.version()`. Also setting `unref()` on the shuttle socket
so there is no more explicit close of the socket.

Closes #1037.
Clsoes #1018.
See #1023.
  • Loading branch information
flatheadmill committed Sep 10, 2019
1 parent b435d37 commit a36d9fc
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 188 deletions.
5 changes: 4 additions & 1 deletion prolific.shuttle/index.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
const Shuttle = require('./shuttle')
module.exports = new Shuttle
const coalesce = require('extant')
exports.create = function (options) {
return new Shuttle(coalesce(options, {}))
}
8 changes: 6 additions & 2 deletions prolific.shuttle/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@
"foremost": "1.0.x",
"prolific.evaluator": "3.0.x",
"prolific.level": "1.0.x",
"prolific.queue": "18.0.x",
"prolific.queue": "^19.0.0-alpha.3",
"prolific.require": "1.0.x",
"prolific.resolver": "9.0.x",
"prolific.sink": "7.0.x"
},
"devDependencies":
{
"mocha": "6.2.0"
"destructible": "^5.0.0-alpha.0",
"mocha": "6.2.0",
"prolific.collector": "^11.0.0-alpha.1",
"prolific.watcher": "^0.0.2",
"rimraf": "3.0.0"
},
"scripts":
{
Expand Down
117 changes: 51 additions & 66 deletions prolific.shuttle/shuttle.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,107 +6,92 @@ const Queue = require('prolific.queue')

const LEVEL = require('prolific.level')

const createUncaughtExceptionHandler = require('./uncaught')

const assert = require('assert')

const rethrow = require('./uncaught')

class Shuttle {
constructor () {
this.closed = false
this._initialized = false
constructor (options) {
this._queue = null
}

start (options) {
if (this._initialized) {
return Promise.resolve()
}
this._initialized = true
options || (options = {})
if (descendent.process.env.PROLIFIC_SUPERVISOR_PROCESS_ID != null) {
return this._listen(descendent, options)
this.monitored = true
this._initialze(options)
} else {
this.closed = true
return Promise.resolve()
this.monitored = false
}
}

_listen (descendent, options) {
const now = coalesce(options.Date, Date).now()
const monitorProcessId = +descendent.process.env.PROLIFIC_SUPERVISOR_PROCESS_ID

_initialze (options) {
descendent.increment()

const id = [ descendent.process.pid, now ]
const path = descendent.path.splice(descendent.path.indexOf(monitorProcessId))

const queue = new Queue(512, id, descendent.process.stderr, { path: path })
const send = queue.send()
this._queue = queue

// TODO Unhandled rejection.
if (options.uncaughtException != null) {
const handler = createUncaughtExceptionHandler(options.uncaughtException)
descendent.process.on('uncaughtException', (error) => {
this.close()
handler(error)
queue.exit()
throw error
})
const supervisorId = +descendent.process.env.PROLIFIC_SUPERVISOR_PROCESS_ID
const path = descendent.path.splice(descendent.path.indexOf(supervisorId))
const directory = descendent.process.env.PROLIFIC_TMPDIR

const queue = this._queue = new Queue(Date, directory,
path, coalesce(options.interval, 1000))

if (options.uncaughtException == null || options.uncaughtException) {
descendent.process.on('uncaughtException', rethrow('uncaught'))
}

if (options.unhandledRejection == null || options.unhandledRejection) {
descendent.process.on('unhandledRejection', rethrow('unhandled'))
}

if (options.exit == null || options.exit) {
descendent.process.on('exit', () => {
this.close()
queue.exit()
})
descendent.process.on('exit', this.exit.bind(this))
}

// All filtering will be performed by the monitor initially. Until
// we get a configuration we send everything.
const sink = require('prolific.resolver').sink

sink.json = function (level, qualifier, label, body, system) {
queue.push({
when: body.when || this.Date.now(), level, qualifier, label, body, system
})
}

this._handlers = { pipe: null, accept: null }

descendent.once('prolific:pipe', this._handlers.pipe = function (message, handle) {
queue.setPipe(handle)
})
descendent.on('prolific:accept', this._handlers.accept = function (message) {
assert(message.body.source)
const processor = Evaluator.create(message.body.source, message.body.file)
assert(processor.triage)
const triage = processor.triage(require('prolific.require').require)
sink.json = function (level, qualifier, label, body, system) {
if (triage(LEVEL[level], qualifier, label, body, system)) {
queue.push({
when: body.when || this.Date.now(), level, qualifier, label, body, system
})
const handlers = this._handlers = {
'prolific:pipe': function (message, handle) {
delete handlers['prolific:pipe']
handle.unref()
queue.setPipe(handle)
},
'prolific:accept': (message) => {
assert(message.body.source)
const processor = Evaluator.create(message.body.source, message.body.file)
assert(processor.triage)
const triage = processor.triage(require('prolific.require').require)
sink.json = function (level, qualifier, label, body, system) {
if (triage(LEVEL[level], qualifier, label, body, system)) {
queue.push({
when: body.when || this.Date.now(), level, qualifier, label, body, system
})
}
}
queue.version(message.body.version)
}
queue.push([{ method: 'version', version: message.body.version }])
})
}

descendent.up(monitorProcessId, 'prolific:shuttle', id.join('/'))
descendent.once('prolific:pipe', this._handlers['prolific:pipe'])
descendent.on('prolific:accept', this._handlers['prolific:accept'])

return send
}

close () {
if (!this.closed) {
this.closed = true
descendent.removeListener('prolific:pipe', this._handlers.pipe)
descendent.removeListener('prolific:accept', this._handlers.accept)
exit (code) {
if (this.monitored && !this.exited) {
this.exited = true
for (const name in this._handlers) {
descendent.removeListener(name, this._handlers[name])
}
descendent.decrement()
this._queue.close()
this._queue.exit(code)
}
}
}

Shuttle.sink = require('prolific.sink')
Shuttle.sink = require('prolific.resolver').sink

module.exports = Shuttle
Loading

0 comments on commit a36d9fc

Please sign in to comment.