From e0532e9cf01712758d6cad73c9c8c2874fe1962d Mon Sep 17 00:00:00 2001 From: Erwan Guyader Date: Tue, 3 Oct 2023 11:40:30 +0200 Subject: [PATCH] fix: Process Chokidar events batches sequentially We buffer events emitted by Chokidar as the underlying FSEvents API, on macOS, can add delay between events relating to the same document and processing the first events without the latter ones can lead to wrong assumptions about what changes were actually made. Buffered events are then flushed 10 seconds after the last event was buffered (i.e. each buffered event resets the 10-second delay). Since this process is asynchronous and is only time-based, we can end up flushing buffered events while some previous batch is still being processed, creating situations where unexpected behavior can happen. To avoid this, we now prevent any buffer flushes while a batch is being processed and restore the flushing timeout once finished. This can add a small delay to the next batch processing but since we don't know if already buffered events could be flushed at that point it seems safer. 
--- core/local/chokidar/event_buffer.js | 25 +++-- core/local/chokidar/watcher.js | 26 ++--- test/unit/local/chokidar/event_buffer.js | 116 ++++++++++++++++------- test/unit/local/chokidar/watcher.js | 90 ++++++++++++------ 4 files changed, 174 insertions(+), 83 deletions(-) diff --git a/core/local/chokidar/event_buffer.js b/core/local/chokidar/event_buffer.js index ab5a3a7a3..7adacc0bd 100644 --- a/core/local/chokidar/event_buffer.js +++ b/core/local/chokidar/event_buffer.js @@ -49,25 +49,29 @@ class EventBuffer /*:: */ { push(event /*: EventType */) /*: void */ { this.events.push(event) - this.shiftTimeout() + this.resetTimeout() } unflush(events /*: Array */) /*: void */ { this.events = events.concat(this.events) - this.shiftTimeout() + this.resetTimeout() } - shiftTimeout() /*: void */ { - if (this.mode === 'timeout') { - this.clearTimeout() + setTimeout() /*: void */ { + if (this.mode === 'timeout' && this.timeout == null) { this.timeout = setTimeout(this.flush, this.timeoutInMs) } } clearTimeout() /*: void */ { - if (this.timeout != null) { - clearTimeout(this.timeout) - delete this.timeout + clearTimeout(this.timeout) + delete this.timeout + } + + resetTimeout() /*: void */ { + if (this.mode === 'timeout') { + this.clearTimeout() + this.setTimeout() } } @@ -81,8 +85,11 @@ class EventBuffer /*:: */ { } switchMode(mode /*: EventBufferMode */) /*: void */ { - this.flush() this.mode = mode + this.clearTimeout() + if (this.events.length > 0) { + this.setTimeout() + } } clear() /*: void */ { diff --git a/core/local/chokidar/watcher.js b/core/local/chokidar/watcher.js index 82b7ec0e7..070df4f81 100644 --- a/core/local/chokidar/watcher.js +++ b/core/local/chokidar/watcher.js @@ -177,7 +177,7 @@ class LocalWatcher { } this.watcher - .on('ready', () => this.buffer.switchMode('timeout')) + .on('ready', () => this.buffer.flush()) .on('raw', async (event, path, details) => { log.chokidar.debug({ event, path, details }, 'raw') @@ -210,10 +210,12 @@ class LocalWatcher { } 
// TODO: Start checksuming as soon as an add/change event is buffered - // TODO: Put flushed event batches in a queue async onFlush(rawEvents /*: ChokidarEvent[] */) { log.debug(`Flushed ${rawEvents.length} events`) + // Keep buffering all events while `rawEvents` are processed. + this.buffer.switchMode('idle') + this.events.emit('buffering-end') syncDir.ensureExistsSync(this) this.events.emit('local-start') @@ -233,6 +235,7 @@ class LocalWatcher { if (events.length === 0) { this.events.emit('local-end') if (this.initialScanParams != null) this.initialScanParams.resolve() + this.buffer.switchMode('timeout') return } @@ -267,6 +270,8 @@ class LocalWatcher { this.initialScanParams.resolve() this.initialScanParams = null } + + this.buffer.switchMode('timeout') } async stop(force /*: ?bool */ = false) { @@ -275,7 +280,7 @@ class LocalWatcher { if (!this.watcher) return if (force) { - // Drop buffered events + this.buffer.switchMode('idle') this.buffer.clear() } else { // XXX manually fire events for added file, because chokidar will cancel @@ -289,20 +294,17 @@ class LocalWatcher { log.warn({ err }, 'Could not fire remaining add events') } } + + // Trigger buffered events processing + this.buffer.flush() + + // Give some time for awaitWriteFinish events to be managed + await Promise.delay(1000) } // Stop underlying Chokidar watcher await this.watcher.close() this.watcher = null - // Stop accepting new events - this.buffer.switchMode('idle') - - if (!force) { - // Give some time for awaitWriteFinish events to be managed - return new Promise(resolve => { - setTimeout(resolve, 1000) - }) - } } /* Helpers */ diff --git a/test/unit/local/chokidar/event_buffer.js b/test/unit/local/chokidar/event_buffer.js index fb91bcc6c..9ecd58c17 100644 --- a/test/unit/local/chokidar/event_buffer.js +++ b/test/unit/local/chokidar/event_buffer.js @@ -30,6 +30,88 @@ onPlatform('darwin', () => { const event1 = { path: 'path/1' } const event2 = { path: 'path/2' } + describe('switchMode', () => 
{ + context('with buffered events', () => { + beforeEach(() => { + buffer.push(event1) + buffer.push(event2) + }) + + context('from idle to timeout', () => { + beforeEach(() => { + buffer.mode = 'idle' + }) + + it('sets timeout', () => { + buffer.switchMode('timeout') + should(buffer).have.property('timeout') + }) + }) + + context('from timeout to idle', () => { + beforeEach(() => { + buffer.mode = 'timeout' + buffer.setTimeout() + }) + + it('clears existing timeout', () => { + clock.tick(TIMEOUT_IN_MS - 1) + buffer.switchMode('idle') + clock.tick(1) + should(flushed).not.have.been.called() + }) + + it('does not set new timeout', () => { + buffer.switchMode('idle') + should(buffer).not.have.property('timeout') + }) + }) + + context('from timeout to timeout', () => { + beforeEach(() => { + buffer.mode = 'timeout' + buffer.setTimeout() + }) + + it('clears existing timeout', () => { + clock.tick(TIMEOUT_IN_MS - 1) + buffer.switchMode('timeout') + clock.tick(1) + should(flushed).not.have.been.called() + }) + + it('sets new timeout', () => { + buffer.switchMode('timeout') + should(buffer).have.property('timeout') + }) + }) + }) + + context('without buffered events', () => { + context('from idle to timeout', () => { + beforeEach(() => { + buffer.mode = 'idle' + }) + + it('does not set timeout', () => { + buffer.switchMode('timeout') + should(buffer).not.have.property('timeout') + }) + }) + + context('from timeout to timeout', () => { + beforeEach(() => { + buffer.mode = 'timeout' + }) + + it('does not set timeout', () => { + buffer.switchMode('timeout') + should(buffer).not.have.property('timeout') + }) + }) + }) + }) + context('in idle mode (default)', () => { beforeEach(() => { clock.tick(TIMEOUT_IN_MS) @@ -51,11 +133,6 @@ onPlatform('darwin', () => { buffer.flush() should(flushed).have.been.calledWith([event1, event2]) }) - - it('can be switched to timeout mode', () => { - buffer.switchMode('timeout') - should(flushed).have.been.calledWith([event1, event2]) - }) }) 
context('in timeout mode', () => { @@ -63,35 +140,6 @@ onPlatform('darwin', () => { buffer.switchMode('timeout') }) - it('can be switched back to idle mode, canceling timeout if any', () => { - buffer.push(event1) - buffer.switchMode('idle') - clock.tick(TIMEOUT_IN_MS) - should(flushed).have.been.calledWith([event1]) - }) - - it('does not flush without events', () => { - clock.tick(TIMEOUT_IN_MS) - should(flushed).not.have.been.called() - }) - - context('when last event occured less than TIMEOUT_IN_MS ago', () => { - beforeEach(() => { - buffer.push(event1) - clock.tick(TIMEOUT_IN_MS - 1) - buffer.push(event2) - clock.tick(TIMEOUT_IN_MS - 1) - }) - - it('does not flush', () => { - should(flushed).not.have.been.called() - }) - - it('stores new events', () => { - should(buffer.events).deepEqual([event1, event2]) - }) - }) - context( 'when last event occured since or more than TIMEOUT_IN_MS ago', () => { diff --git a/test/unit/local/chokidar/watcher.js b/test/unit/local/chokidar/watcher.js index 960167beb..7ae691f45 100644 --- a/test/unit/local/chokidar/watcher.js +++ b/test/unit/local/chokidar/watcher.js @@ -18,6 +18,7 @@ const { ContextDir } = require('../../../support/helpers/context_dir') const { onPlatform } = require('../../../support/helpers/platform') const pouchHelpers = require('../../../support/helpers/pouch') +// TODO: run on darwin platform instead? 
onPlatform('linux', () => { describe('ChokidarWatcher Tests', function () { let builders @@ -27,8 +28,12 @@ onPlatform('linux', () => { beforeEach('instanciate local watcher', function () { builders = new Builders({ pouch: this.pouch }) this.prep = {} - const events = { emit: sinon.stub() } - this.watcher = new Watcher(this.syncPath, this.prep, this.pouch, events) + this.watcher = new Watcher( + this.syncPath, + this.prep, + this.pouch, + sinon.createStubInstance(EventEmitter) + ) }) afterEach('stop watcher and clean path', function (done) { this.watcher.stop(true) @@ -151,6 +156,29 @@ onPlatform('linux', () => { delete this.prep.putFolderAsync }) + it('switches buffer mode to idle then back to timeout', async function () { + const bufferSpy = sinon.spy(this.watcher.buffer, 'switchMode') + + try { + // Not an initial scan flush + this.watcher.initialScanParams = null + + this.watcher.buffer.push({ + type: 'addDir', + path: __dirname, + stats: builders.stats().build() + }) + await this.watcher.buffer.flush() + + should(bufferSpy).have.been.calledTwice() + should(bufferSpy.firstCall.calledWith('idle')).be.true() + should(bufferSpy.secondCall.calledWith('timeout')).be.true() + should(this.watcher.buffer.mode).equal('timeout') + } finally { + bufferSpy.restore() + } + }) + context( 'when processing the initial events of an empty sync directory', () => { @@ -178,9 +206,38 @@ onPlatform('linux', () => { this.watcher.pouch.initialScanDocs.restore() } }) + + it('switches buffer mode to idle then back to timeout', async function () { + const bufferSpy = sinon.spy(this.watcher.buffer, 'switchMode') + + try { + // Make sure we're in initial scan mode + this.watcher.initialScanParams = { + paths: [], + emptyDirRetryCount: 3, + resolve: Promise.resolve, + flushed: false + } + + this.watcher.buffer.push({ + type: 'addDir', + path: '' // XXX: events on the sync directory have an empty path + }) + await this.watcher.buffer.flush() + + should(bufferSpy).have.been.calledTwice() + 
should(bufferSpy.firstCall.calledWith('idle')).be.true() + should(bufferSpy.secondCall.calledWith('timeout')).be.true() + should(this.watcher.buffer.mode).equal('timeout') + } finally { + bufferSpy.restore() + } + }) } ) + // TODO: refactor to test that buffer is not flushed while another batch + // is being processed. context('while an initial scan is being processed', () => { const trigger = new EventEmitter() const SECOND_FLUSH_TRIGGER = 'second-flush' @@ -237,11 +294,6 @@ onPlatform('linux', () => { }) describe('onAddFile', () => { - if (process.env.APPVEYOR) { - it('is unstable on AppVeyor') - return - } - it('detects when a file is created', function () { return this.watcher.start().then(() => { this.prep.addFileAsync = function (side, doc) { @@ -282,11 +334,6 @@ onPlatform('linux', () => { }) describe('onAddDir', function () { - if (process.env.APPVEYOR) { - it('is unstable on AppVeyor') - return - } - it('detects when a folder is created', function () { return this.watcher.start().then(() => { this.prep.putFolderAsync = function (side, doc) { @@ -326,12 +373,7 @@ onPlatform('linux', () => { }) describe('onUnlinkFile', () => { - if (process.env.APPVEYOR) { - it('is unstable on AppVeyor') - return - } - - it.skip('detects when a file is deleted', function () { + it('detects when a file is deleted', function () { // This test does not create the file in pouchdb. // the watcher will not find a inode number for the unlink // and therefore discard it. @@ -353,12 +395,7 @@ onPlatform('linux', () => { }) describe('onUnlinkDir', () => { - if (process.env.APPVEYOR) { - it('is unstable on AppVeyor') - return - } - - it.skip('detects when a folder is deleted', function () { + it('detects when a folder is deleted', function () { // This test does not create the file in pouchdb. // the watcher will not find a inode number for the unlink // and therefore discard it. 
@@ -405,10 +442,7 @@ onPlatform('linux', () => { })) describe('when a file is moved', function () { - beforeEach('instanciate pouch', pouchHelpers.createDatabase) - afterEach('clean pouch', pouchHelpers.cleanDatabase) - - it.skip('deletes the source and adds the destination', function () { + it('deletes the source and adds the destination', function () { // This test does not create the file in pouchdb. // the watcher will not find a inode number for the unlink // and therefore discard it.