diff --git a/core/local/chokidar/event_buffer.js b/core/local/chokidar/event_buffer.js index ab5a3a7a3..7adacc0bd 100644 --- a/core/local/chokidar/event_buffer.js +++ b/core/local/chokidar/event_buffer.js @@ -49,25 +49,29 @@ class EventBuffer /*:: */ { push(event /*: EventType */) /*: void */ { this.events.push(event) - this.shiftTimeout() + this.resetTimeout() } unflush(events /*: Array */) /*: void */ { this.events = events.concat(this.events) - this.shiftTimeout() + this.resetTimeout() } - shiftTimeout() /*: void */ { - if (this.mode === 'timeout') { - this.clearTimeout() + setTimeout() /*: void */ { + if (this.mode === 'timeout' && this.timeout == null) { this.timeout = setTimeout(this.flush, this.timeoutInMs) } } clearTimeout() /*: void */ { - if (this.timeout != null) { - clearTimeout(this.timeout) - delete this.timeout + clearTimeout(this.timeout) + delete this.timeout + } + + resetTimeout() /*: void */ { + if (this.mode === 'timeout') { + this.clearTimeout() + this.setTimeout() } } @@ -81,8 +85,11 @@ class EventBuffer /*:: */ { } switchMode(mode /*: EventBufferMode */) /*: void */ { - this.flush() this.mode = mode + this.clearTimeout() + if (this.events.length > 0) { + this.setTimeout() + } } clear() /*: void */ { diff --git a/core/local/chokidar/watcher.js b/core/local/chokidar/watcher.js index 82b7ec0e7..070df4f81 100644 --- a/core/local/chokidar/watcher.js +++ b/core/local/chokidar/watcher.js @@ -177,7 +177,7 @@ class LocalWatcher { } this.watcher - .on('ready', () => this.buffer.switchMode('timeout')) + .on('ready', () => this.buffer.flush()) .on('raw', async (event, path, details) => { log.chokidar.debug({ event, path, details }, 'raw') @@ -210,10 +210,12 @@ class LocalWatcher { } // TODO: Start checksuming as soon as an add/change event is buffered - // TODO: Put flushed event batches in a queue async onFlush(rawEvents /*: ChokidarEvent[] */) { log.debug(`Flushed ${rawEvents.length} events`) + // Keep buffering all events while `rawEvents` are processed. + this.buffer.switchMode('idle') + this.events.emit('buffering-end') syncDir.ensureExistsSync(this) this.events.emit('local-start') @@ -233,6 +235,7 @@ class LocalWatcher { if (events.length === 0) { this.events.emit('local-end') if (this.initialScanParams != null) this.initialScanParams.resolve() + this.buffer.switchMode('timeout') return } @@ -267,6 +270,8 @@ class LocalWatcher { this.initialScanParams.resolve() this.initialScanParams = null } + + this.buffer.switchMode('timeout') } async stop(force /*: ?bool */ = false) { @@ -275,7 +280,7 @@ class LocalWatcher { if (!this.watcher) return if (force) { - // Drop buffered events + this.buffer.switchMode('idle') this.buffer.clear() } else { // XXX manually fire events for added file, because chokidar will cancel @@ -289,20 +294,17 @@ class LocalWatcher { log.warn({ err }, 'Could not fire remaining add events') } } + + // Trigger buffered events processing + this.buffer.flush() + + // Give some time for awaitWriteFinish events to be managed + await Promise.delay(1000) } // Stop underlying Chokidar watcher await this.watcher.close() this.watcher = null - // Stop accepting new events - this.buffer.switchMode('idle') - - if (!force) { - // Give some time for awaitWriteFinish events to be managed - return new Promise(resolve => { - setTimeout(resolve, 1000) - }) - } } /* Helpers */ diff --git a/test/unit/local/chokidar/event_buffer.js b/test/unit/local/chokidar/event_buffer.js index fb91bcc6c..9ecd58c17 100644 --- a/test/unit/local/chokidar/event_buffer.js +++ b/test/unit/local/chokidar/event_buffer.js @@ -30,6 +30,88 @@ onPlatform('darwin', () => { const event1 = { path: 'path/1' } const event2 = { path: 'path/2' } + describe('switchMode', () => { + context('with buffered events', () => { + beforeEach(() => { + buffer.push(event1) + buffer.push(event2) + }) + + context('from idle to timeout', () => { + beforeEach(() => { + buffer.mode = 'idle' + }) + + it('sets timeout', () => { + buffer.switchMode('timeout') + should(buffer).have.property('timeout') + }) + }) + + context('from timeout to idle', () => { + beforeEach(() => { + buffer.mode = 'timeout' + buffer.setTimeout() + }) + + it('clears existing timeout', () => { + clock.tick(TIMEOUT_IN_MS - 1) + buffer.switchMode('idle') + clock.tick(1) + should(flushed).not.have.been.called() + }) + + it('does not set new timeout', () => { + buffer.switchMode('idle') + should(buffer).not.have.property('timeout') + }) + }) + + context('from timeout to timeout', () => { + beforeEach(() => { + buffer.mode = 'timeout' + buffer.setTimeout() + }) + + it('clears existing timeout', () => { + clock.tick(TIMEOUT_IN_MS - 1) + buffer.switchMode('timeout') + clock.tick(1) + should(flushed).not.have.been.called() + }) + + it('sets new timeout', () => { + buffer.switchMode('timeout') + should(buffer).have.property('timeout') + }) + }) + }) + + context('without buffered events', () => { + context('from idle to timeout', () => { + beforeEach(() => { + buffer.mode = 'idle' + }) + + it('does not set timeout', () => { + buffer.switchMode('timeout') + should(buffer).not.have.property('timeout') + }) + }) + + context('from timeout to timeout', () => { + beforeEach(() => { + buffer.mode = 'timeout' + }) + + it('does not set timeout', () => { + buffer.switchMode('timeout') + should(buffer).not.have.property('timeout') + }) + }) + }) + }) + context('in idle mode (default)', () => { beforeEach(() => { clock.tick(TIMEOUT_IN_MS) @@ -51,11 +133,6 @@ onPlatform('darwin', () => { buffer.flush() should(flushed).have.been.calledWith([event1, event2]) }) - - it('can be switched to timeout mode', () => { - buffer.switchMode('timeout') - should(flushed).have.been.calledWith([event1, event2]) - }) }) context('in timeout mode', () => { @@ -63,35 +140,6 @@ onPlatform('darwin', () => { buffer.switchMode('timeout') }) - it('can be switched back to idle mode, canceling timeout if any', () => { - buffer.push(event1) - buffer.switchMode('idle') - clock.tick(TIMEOUT_IN_MS) - should(flushed).have.been.calledWith([event1]) - }) - - it('does not flush without events', () => { - clock.tick(TIMEOUT_IN_MS) - should(flushed).not.have.been.called() - }) - - context('when last event occured less than TIMEOUT_IN_MS ago', () => { - beforeEach(() => { - buffer.push(event1) - clock.tick(TIMEOUT_IN_MS - 1) - buffer.push(event2) - clock.tick(TIMEOUT_IN_MS - 1) - }) - - it('does not flush', () => { - should(flushed).not.have.been.called() - }) - - it('stores new events', () => { - should(buffer.events).deepEqual([event1, event2]) - }) - }) - context( 'when last event occured since or more than TIMEOUT_IN_MS ago', () => { diff --git a/test/unit/local/chokidar/watcher.js b/test/unit/local/chokidar/watcher.js index 960167beb..7ae691f45 100644 --- a/test/unit/local/chokidar/watcher.js +++ b/test/unit/local/chokidar/watcher.js @@ -18,6 +18,7 @@ const { ContextDir } = require('../../../support/helpers/context_dir') const { onPlatform } = require('../../../support/helpers/platform') const pouchHelpers = require('../../../support/helpers/pouch') +// TODO: run on darwin platform instead? onPlatform('linux', () => { describe('ChokidarWatcher Tests', function () { let builders @@ -27,8 +28,12 @@ onPlatform('linux', () => { beforeEach('instanciate local watcher', function () { builders = new Builders({ pouch: this.pouch }) this.prep = {} - const events = { emit: sinon.stub() } - this.watcher = new Watcher(this.syncPath, this.prep, this.pouch, events) + this.watcher = new Watcher( + this.syncPath, + this.prep, + this.pouch, + sinon.createStubInstance(EventEmitter) + ) }) afterEach('stop watcher and clean path', function (done) { this.watcher.stop(true) @@ -151,6 +156,29 @@ onPlatform('linux', () => { delete this.prep.putFolderAsync }) + it('switches buffer mode to idle then back to timeout', async function () { + const bufferSpy = sinon.spy(this.watcher.buffer, 'switchMode') + + try { + // Not an initial scan flush + this.watcher.initialScanParams = null + + this.watcher.buffer.push({ + type: 'addDir', + path: __dirname, + stats: builders.stats().build() + }) + await this.watcher.buffer.flush() + + should(bufferSpy).have.been.calledTwice() + should(bufferSpy.firstCall.calledWith('idle')).be.true() + should(bufferSpy.secondCall.calledWith('timeout')).be.true() + should(this.watcher.buffer.mode).equal('timeout') + } finally { + bufferSpy.restore() + } + }) + context( 'when processing the initial events of an empty sync directory', () => { @@ -178,9 +206,38 @@ onPlatform('linux', () => { this.watcher.pouch.initialScanDocs.restore() } }) + + it('switches buffer mode to idle then back to timeout', async function () { + const bufferSpy = sinon.spy(this.watcher.buffer, 'switchMode') + + try { + // Make sure we're in initial scan mode + this.watcher.initialScanParams = { + paths: [], + emptyDirRetryCount: 3, + resolve: Promise.resolve, + flushed: false + } + + this.watcher.buffer.push({ + type: 'addDir', + path: '' // XXX: events on the sync directory have an empty path + }) + await this.watcher.buffer.flush() + + should(bufferSpy).have.been.calledTwice() + should(bufferSpy.firstCall.calledWith('idle')).be.true() + should(bufferSpy.secondCall.calledWith('timeout')).be.true() + should(this.watcher.buffer.mode).equal('timeout') + } finally { + bufferSpy.restore() + } + }) } ) + // TODO: refactor to test that buffer is not flushed while another batch + // is being processed. context('while an initial scan is being processed', () => { const trigger = new EventEmitter() const SECOND_FLUSH_TRIGGER = 'second-flush' @@ -237,11 +294,6 @@ onPlatform('linux', () => { }) describe('onAddFile', () => { - if (process.env.APPVEYOR) { - it('is unstable on AppVeyor') - return - } - it('detects when a file is created', function () { return this.watcher.start().then(() => { this.prep.addFileAsync = function (side, doc) { @@ -282,11 +334,6 @@ onPlatform('linux', () => { }) describe('onAddDir', function () { - if (process.env.APPVEYOR) { - it('is unstable on AppVeyor') - return - } - it('detects when a folder is created', function () { return this.watcher.start().then(() => { this.prep.putFolderAsync = function (side, doc) { @@ -326,12 +373,7 @@ onPlatform('linux', () => { }) describe('onUnlinkFile', () => { - if (process.env.APPVEYOR) { - it('is unstable on AppVeyor') - return - } - - it.skip('detects when a file is deleted', function () { + it('detects when a file is deleted', function () { // This test does not create the file in pouchdb. // the watcher will not find a inode number for the unlink // and therefore discard it. @@ -353,12 +395,7 @@ onPlatform('linux', () => { }) describe('onUnlinkDir', () => { - if (process.env.APPVEYOR) { - it('is unstable on AppVeyor') - return - } - - it.skip('detects when a folder is deleted', function () { + it('detects when a folder is deleted', function () { // This test does not create the file in pouchdb. // the watcher will not find a inode number for the unlink // and therefore discard it. @@ -405,10 +442,7 @@ onPlatform('linux', () => { })) describe('when a file is moved', function () { - beforeEach('instanciate pouch', pouchHelpers.createDatabase) - afterEach('clean pouch', pouchHelpers.cleanDatabase) - - it.skip('deletes the source and adds the destination', function () { + it('deletes the source and adds the destination', function () { // This test does not create the file in pouchdb. // the watcher will not find a inode number for the unlink // and therefore discard it.