diff --git a/prolific.shuttle/index.js b/prolific.shuttle/index.js index eabc7125..8c820443 100644 --- a/prolific.shuttle/index.js +++ b/prolific.shuttle/index.js @@ -1,2 +1,5 @@ const Shuttle = require('./shuttle') -module.exports = new Shuttle +const coalesce = require('extant') +exports.create = function (options) { + return new Shuttle(coalesce(options, {})) +} diff --git a/prolific.shuttle/package.json b/prolific.shuttle/package.json index b2c654f1..e9c62f1b 100644 --- a/prolific.shuttle/package.json +++ b/prolific.shuttle/package.json @@ -25,14 +25,18 @@ "foremost": "1.0.x", "prolific.evaluator": "3.0.x", "prolific.level": "1.0.x", - "prolific.queue": "18.0.x", + "prolific.queue": "^19.0.0-alpha.3", "prolific.require": "1.0.x", "prolific.resolver": "9.0.x", "prolific.sink": "7.0.x" }, "devDependencies": { - "mocha": "6.2.0" + "destructible": "^5.0.0-alpha.0", + "mocha": "6.2.0", + "prolific.collector": "^11.0.0-alpha.1", + "prolific.watcher": "^0.0.2", + "rimraf": "3.0.0" }, "scripts": { diff --git a/prolific.shuttle/shuttle.js b/prolific.shuttle/shuttle.js index b22bee8f..86cc5136 100644 --- a/prolific.shuttle/shuttle.js +++ b/prolific.shuttle/shuttle.js @@ -6,107 +6,92 @@ const Queue = require('prolific.queue') const LEVEL = require('prolific.level') -const createUncaughtExceptionHandler = require('./uncaught') - const assert = require('assert') +const rethrow = require('./uncaught') + class Shuttle { - constructor () { - this.closed = false - this._initialized = false + constructor (options) { this._queue = null - } - - start (options) { - if (this._initialized) { - return Promise.resolve() - } - this._initialized = true - options || (options = {}) if (descendent.process.env.PROLIFIC_SUPERVISOR_PROCESS_ID != null) { - return this._listen(descendent, options) + this.monitored = true + this._initialze(options) } else { - this.closed = true - return Promise.resolve() + this.monitored = false } } - _listen (descendent, options) { - const now = coalesce(options.Date, Date).now() - const monitorProcessId = +descendent.process.env.PROLIFIC_SUPERVISOR_PROCESS_ID - + _initialze (options) { descendent.increment() - const id = [ descendent.process.pid, now ] - const path = descendent.path.splice(descendent.path.indexOf(monitorProcessId)) - - const queue = new Queue(512, id, descendent.process.stderr, { path: path }) - const send = queue.send() - this._queue = queue - - // TODO Unhandled rejection. - if (options.uncaughtException != null) { - const handler = createUncaughtExceptionHandler(options.uncaughtException) - descendent.process.on('uncaughtException', (error) => { - this.close() - handler(error) - queue.exit() - throw error - }) + const supervisorId = +descendent.process.env.PROLIFIC_SUPERVISOR_PROCESS_ID + const path = descendent.path.splice(descendent.path.indexOf(supervisorId)) + const directory = descendent.process.env.PROLIFIC_TMPDIR + + const queue = this._queue = new Queue(Date, directory, + path, coalesce(options.interval, 1000)) + + if (options.uncaughtException == null || options.uncaughtException) { + descendent.process.on('uncaughtException', rethrow('uncaught')) + } + + if (options.unhandledRejection == null || options.unhandledRejection) { + descendent.process.on('unhandledRejection', rethrow('unhandled')) } if (options.exit == null || options.exit) { - descendent.process.on('exit', () => { - this.close() - queue.exit() - }) + descendent.process.on('exit', this.exit.bind(this)) } // All filtering will be performed by the monitor initially. Until // we get a configuration we send everything. const sink = require('prolific.resolver').sink + sink.json = function (level, qualifier, label, body, system) { queue.push({ when: body.when || this.Date.now(), level, qualifier, label, body, system }) } - this._handlers = { pipe: null, accept: null } - - descendent.once('prolific:pipe', this._handlers.pipe = function (message, handle) { - queue.setPipe(handle) - }) - descendent.on('prolific:accept', this._handlers.accept = function (message) { - assert(message.body.source) - const processor = Evaluator.create(message.body.source, message.body.file) - assert(processor.triage) - const triage = processor.triage(require('prolific.require').require) - sink.json = function (level, qualifier, label, body, system) { - if (triage(LEVEL[level], qualifier, label, body, system)) { - queue.push({ - when: body.when || this.Date.now(), level, qualifier, label, body, system - }) + const handlers = this._handlers = { + 'prolific:pipe': function (message, handle) { + delete handlers['prolific:pipe'] + handle.unref() + queue.setPipe(handle) + }, + 'prolific:accept': (message) => { + assert(message.body.source) + const processor = Evaluator.create(message.body.source, message.body.file) + assert(processor.triage) + const triage = processor.triage(require('prolific.require').require) + sink.json = function (level, qualifier, label, body, system) { + if (triage(LEVEL[level], qualifier, label, body, system)) { + queue.push({ + when: body.when || this.Date.now(), level, qualifier, label, body, system + }) + } } + queue.version(message.body.version) } - queue.push([{ method: 'version', version: message.body.version }]) - }) + } - descendent.up(monitorProcessId, 'prolific:shuttle', id.join('/')) + descendent.once('prolific:pipe', this._handlers['prolific:pipe']) + descendent.on('prolific:accept', this._handlers['prolific:accept']) - return send } - close () { - if (!this.closed) { - this.closed = true - descendent.removeListener('prolific:pipe', this._handlers.pipe) - descendent.removeListener('prolific:accept', this._handlers.accept) + exit (code) { + if (this.monitored && !this.exited) { + this.exited = true + for (const name in this._handlers) { + descendent.removeListener(name, this._handlers[name]) + } descendent.decrement() - this._queue.close() + this._queue.exit(code) } } } -Shuttle.sink = require('prolific.sink') +Shuttle.sink = require('prolific.resolver').sink module.exports = Shuttle diff --git a/prolific.shuttle/test/shuttle.test.js b/prolific.shuttle/test/shuttle.test.js index 33b0e441..35d934de 100644 --- a/prolific.shuttle/test/shuttle.test.js +++ b/prolific.shuttle/test/shuttle.test.js @@ -3,17 +3,67 @@ describe('shuttle', () => { const events = require('events') const stream = require('stream') const Shuttle = require('../shuttle') + const callback = require('prospective/callback') + const rimraf = require('rimraf') const descendent = require('descendent') const sink = require('prolific.sink') const path = require('path') - const fs = require('fs') - it('can set a pipe', () => { + const Watcher = require('prolific.watcher') + const Collector = require('prolific.collector') + const Destructible = require('destructible') + const fs = require('fs').promises + const TMPDIR = path.join(__dirname, 'tmp') + const dir = { + stage: path.resolve(TMPDIR, 'stage'), + publish: path.resolve(TMPDIR, 'publish') + } + process.on('unhandledRejection', error => { throw error }) + class Gatherer { + constructor (collector, method, count = 1) { + this._method = method + this._count = count + this.promise = new Promise(resolve => { + let count = 0 + const gathered = [] + collector.on('data', data => { + gathered.push(data) + if (data.body.method == this._method && ++count == this._count) { + resolve(gathered) + } + }) + }) + } + } + async function reset () { + await callback(callback => rimraf(TMPDIR, callback)) + await fs.mkdir(dir.publish, { recursive: true }) + await fs.mkdir(dir.stage, { recursive: true }) + // For some reason we need to wait a bit for the above directories to + // actually take effect on OS X, otherwise files from previous run are + // extant and the first events are reporting a missing file. + await new Promise(resolve => setTimeout(resolve, 50)) + const destructible = new Destructible(__filename) + const watcher = new Watcher(destructible, () => 0, path.join(TMPDIR, 'publish')) + const collector = new Collector + watcher.on('data', data => collector.data(data)) + return { destructible, watcher, collector } + } + it('will not initialize when not run under prolific', async () => { + descendent.process = new events.EventEmitter + descendent.process.env = {} + const shuttle = new Shuttle() + assert(!shuttle.monitored, 'not monitored') + shuttle.exit(0) + }) + it('can set a pipe', async () => { + const { destructible, collector } = await reset() + const gatherer = new Gatherer(collector, 'exit') const test = [] - const shuttle = new Shuttle descendent.process = new events.EventEmitter descendent.process.env = { PROLIFIC_SUPERVISOR_PROCESS_ID: '1', - DESCENDENT_PROCESS_PATH: '1' + DESCENDENT_PROCESS_PATH: '1', + PROLIFIC_TMPDIR: path.resolve(__dirname, 'tmp') } descendent.process.pid = 2 descendent.process.stderr = new stream.PassThrough @@ -21,116 +71,65 @@ describe('shuttle', () => { descendent.process.send = message => test.push(message) sink.properties.pid = 0 sink.Date = { now: function () { return 0 } } - shuttle.start({ exit: false, Date: { now: function () { return 0 } } }) - shuttle.start() - + const shuttle = new Shuttle({ + Date: { now: function () { return 0 } }, + interval: 1, + exit: false, + uncaughtException: false, + unhandledRejection: false + }) sink.json('error', 'example', 'message', { key: 'value' }, { pid: 0 }) - assert.deepStrictEqual(descendent.process.stderr.read().toString().split('\n'), [ - '% 2/0 71c17733 aaaaaaaa 1 %', - '{"method":"announce","body":{"path":[1,2]}}', - '' - ], 'stderr start') - - const pipe = new stream.PassThrough - descendent.emit('prolific:pipe', {}, pipe) + const input = new stream.PassThrough + const output = new stream.PassThrough + descendent.emit('prolific:pipe', {}, { input, output, unref: () => {} }) descendent.emit('prolific:accept', { body: { - source: fs.readFileSync(path.join(__dirname, 'processor.js')), + source: await fs.readFile(path.join(__dirname, 'processor.js')), file: path.join(__dirname, 'processor.js'), version: 1 } }) sink.json('error', 'example', 'droppable', { key: 'value' }, { pid: 0 }) sink.json('error', 'example', 'acceptible', { key: 'value' }, { pid: 0 }) - shuttle.close() - shuttle.close() - assert.deepStrictEqual(test, [{ - module: 'descendent', - method: 'route', - name: 'prolific:shuttle', - to: [ 1 ], - path: [ 2 ], - body: '2/0' - }], 'test') - }) - it('will not initialize when not run under prolific', async () => { - descendent.process = new events.EventEmitter - descendent.process.env = {} - const shuttle = new Shuttle - await shuttle.start() - assert(shuttle.closed, 'closed') - }) - it('will propagate an exception', async () => { - const test = [] - descendent.process = new events.EventEmitter - descendent.process.env = { - PROLIFIC_SUPERVISOR_PROCESS_ID: '1', - DESCENDENT_PROCESS_PATH: '1' - } - descendent.process.pid = 2 - descendent.process.stderr = new stream.PassThrough - descendent.process.connected = true - descendent.process.send = message => test.push(message) - const shuttle = new Shuttle - const start = shuttle.start({ - uncaughtException: error => test.push(error.message), - Date: { now: () => 0 } - }) - assert(!shuttle.closed, 'closed') - try { - descendent.process.emit('uncaughtException', new Error('uncaught')) - } catch (error) { - test.push(error.message) - } - assert(shuttle.closed, 'closed') - assert.deepStrictEqual(test, [{ - module: 'descendent', - method: 'route', - name: 'prolific:shuttle', - to: [ 1 ], - path: [ 2 ], - body: '2/0' - }, 'uncaught', 'uncaught' ], 'test') - await start - assert.deepStrictEqual(descendent.process.stderr.read().toString().split('\n'), [ - '% 2/0 71c17733 aaaaaaaa 1 %', - '{"method":"announce","body":{"path":[1,2]}}', - '% 2/0 b798da34 71c17733 1 %', - '{\"method\":\"exit\"}', - '' - ], 'stderr') + await new Promise(resolve => setTimeout(resolve, 100)) + shuttle.exit(0) + const gathered = await gatherer.promise + destructible.destroy() + await destructible.promise + assert.deepStrictEqual(gathered.map(entry => entry.body.method), [ + 'start', 'log', 'log', 'log', 'exit' + ], 'synchronous') + const asynchronous = output.read() + .toString() + .split('\n') + .filter(line => line) + .map(JSON.parse) + .map(entry => entry.method) + assert.deepStrictEqual(asynchronous, [ + 'entries', 'version', 'entries' + ], 'asynchronous') }) - it('will respond to an exit', async () => { + it('will set default handlers', async () => { + const { destructible, collector } = await reset() + const gatherer = new Gatherer(collector, 'exit') const test = [] descendent.process = new events.EventEmitter descendent.process.env = { PROLIFIC_SUPERVISOR_PROCESS_ID: '1', - DESCENDENT_PROCESS_PATH: '1' + DESCENDENT_PROCESS_PATH: '1', + PROLIFIC_TMPDIR: path.resolve(__dirname, 'tmp') } descendent.process.pid = 2 descendent.process.stderr = new stream.PassThrough descendent.process.connected = true descendent.process.send = message => test.push(message) - const shuttle = new Shuttle - const start = shuttle.start({ exit: true, Date: { now: () => 0 } }) - assert(!shuttle.closed, 'closed') - descendent.process.emit('exit') - assert(shuttle.closed, 'closed') - assert.deepStrictEqual(test, [{ - module: 'descendent', - method: 'route', - name: 'prolific:shuttle', - to: [ 1 ], - path: [ 2 ], - body: '2/0' - } ], 'test') - await start - assert.deepStrictEqual(descendent.process.stderr.read().toString().split('\n'), [ - '% 2/0 71c17733 aaaaaaaa 1 %', - '{"method":"announce","body":{"path":[1,2]}}', - '% 2/0 b798da34 71c17733 1 %', - '{\"method\":\"exit\"}', - '' - ], 'stderr') + const shuttle = require('..').create({}) + shuttle.exit(0) + const gathered = await gatherer.promise + destructible.destroy() + await destructible.promise + assert.deepStrictEqual(gathered.map(entry => entry.body.method), [ + 'start', 'exit' + ], 'synchronous') }) }) diff --git a/prolific.shuttle/test/uncaught.test.js b/prolific.shuttle/test/uncaught.test.js index dd972aad..51f7d871 100644 --- a/prolific.shuttle/test/uncaught.test.js +++ b/prolific.shuttle/test/uncaught.test.js @@ -1,20 +1,12 @@ describe('shuttle uncaught', () => { const assert = require('assert') - const createUncaughtExceptionHandler = require('../uncaught') - it('can handle an uncaught exception via a user function', () => { - const test = [] - createUncaughtExceptionHandler(function (error) { - test.push(error.message) - })(new Error('function')) - assert.deepStrictEqual(test, [ 'function' ], 'function handler') - }) - it('can log an uncaught exception', () => { - const test = [] - createUncaughtExceptionHandler({ - panic: function (message, properties) { - test.push(message, !! properties.stack) - } - })(new Error('handled')) - assert.deepStrictEqual(test, [ 'uncaught', true ], 'logger handler') + const uncaught = require('../uncaught') + it('can rethrow an uncaught exception', () => { + try { + require('../uncaught')('uncaught')(new Error('thrown')) + throw new Error + } catch (error) { + assert.equal(error.message, 'thrown', 'rethrown') + } }) }) diff --git a/prolific.shuttle/uncaught.js b/prolific.shuttle/uncaught.js index 6e73479a..48befea5 100644 --- a/prolific.shuttle/uncaught.js +++ b/prolific.shuttle/uncaught.js @@ -1,9 +1,8 @@ -module.exports = function (finale) { - if (typeof finale == 'function') { - return finale - } else { - return function (error) { - finale.panic('uncaught', { stack: error.stack }) - } +const logger = require('prolific.logger').createLogger('prolific.shuttle') + +module.exports = function (label) { + return function (error) { + logger.panic(label, { stack: error.stack }) + throw error } }