Skip to content

Commit

Permalink
fix: Process Chokidar events batches sequentially
Browse files Browse the repository at this point in the history
  We buffer events emitted by Chokidar as the underlying FSEvents API,
  on macOS, can add delay between events relating to the same document
  and processing the first events without the latter ones can lead to
  wrong asumptions about what changes were actually made.

  Buffered events are then flushed 10 seconds after the last event was
  buffered (i.e. each buffered event resets the 10 seconds delay).

  Since this process is asynchronous and is only time-based, we can end
  up flushing buffered events while some previous batch is still being
  processed, creating situations where unexpected behavior can happen.

  To prevent this, we now prevent any buffer flushes while a batch is
  being processed and restore the flushing timeout once finished.
  This can add a small delay to the next batch processing but since we
  don't know if already buffered events could be flushed at that point
  it seems safer.
  • Loading branch information
taratatach committed Oct 3, 2023
1 parent 753ca3c commit e0532e9
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 83 deletions.
25 changes: 16 additions & 9 deletions core/local/chokidar/event_buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,29 @@ class EventBuffer /*:: <EventType> */ {

push(event /*: EventType */) /*: void */ {
this.events.push(event)
this.shiftTimeout()
this.resetTimeout()
}

unflush(events /*: Array<EventType> */) /*: void */ {
this.events = events.concat(this.events)
this.shiftTimeout()
this.resetTimeout()
}

shiftTimeout() /*: void */ {
if (this.mode === 'timeout') {
this.clearTimeout()
setTimeout() /*: void */ {
if (this.mode === 'timeout' && this.timeout == null) {
this.timeout = setTimeout(this.flush, this.timeoutInMs)
}
}

clearTimeout() /*: void */ {
if (this.timeout != null) {
clearTimeout(this.timeout)
delete this.timeout
clearTimeout(this.timeout)
delete this.timeout
}

resetTimeout() /*: void */ {
if (this.mode === 'timeout') {
this.clearTimeout()
this.setTimeout()
}
}

Expand All @@ -81,8 +85,11 @@ class EventBuffer /*:: <EventType> */ {
}

switchMode(mode /*: EventBufferMode */) /*: void */ {
this.flush()
this.mode = mode
this.clearTimeout()
if (this.events.length > 0) {
this.setTimeout()
}
}

clear() /*: void */ {
Expand Down
26 changes: 14 additions & 12 deletions core/local/chokidar/watcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class LocalWatcher {
}

this.watcher
.on('ready', () => this.buffer.switchMode('timeout'))
.on('ready', () => this.buffer.flush())
.on('raw', async (event, path, details) => {
log.chokidar.debug({ event, path, details }, 'raw')

Expand Down Expand Up @@ -210,10 +210,12 @@ class LocalWatcher {
}

// TODO: Start checksuming as soon as an add/change event is buffered
// TODO: Put flushed event batches in a queue
async onFlush(rawEvents /*: ChokidarEvent[] */) {
log.debug(`Flushed ${rawEvents.length} events`)

// Keep buffering all events while `rawEvents` are processed.
this.buffer.switchMode('idle')

this.events.emit('buffering-end')
syncDir.ensureExistsSync(this)
this.events.emit('local-start')
Expand All @@ -233,6 +235,7 @@ class LocalWatcher {
if (events.length === 0) {
this.events.emit('local-end')
if (this.initialScanParams != null) this.initialScanParams.resolve()
this.buffer.switchMode('timeout')
return
}

Expand Down Expand Up @@ -267,6 +270,8 @@ class LocalWatcher {
this.initialScanParams.resolve()
this.initialScanParams = null
}

this.buffer.switchMode('timeout')
}

async stop(force /*: ?bool */ = false) {
Expand All @@ -275,7 +280,7 @@ class LocalWatcher {
if (!this.watcher) return

if (force) {
// Drop buffered events
this.buffer.switchMode('idle')
this.buffer.clear()
} else {
// XXX manually fire events for added file, because chokidar will cancel
Expand All @@ -289,20 +294,17 @@ class LocalWatcher {
log.warn({ err }, 'Could not fire remaining add events')
}
}

// Trigger buffered events processing
this.buffer.flush()

// Give some time for awaitWriteFinish events to be managed
await Promise.delay(1000)
}

// Stop underlying Chokidar watcher
await this.watcher.close()
this.watcher = null
// Stop accepting new events
this.buffer.switchMode('idle')

if (!force) {
// Give some time for awaitWriteFinish events to be managed
return new Promise(resolve => {
setTimeout(resolve, 1000)
})
}
}

/* Helpers */
Expand Down
116 changes: 82 additions & 34 deletions test/unit/local/chokidar/event_buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,88 @@ onPlatform('darwin', () => {
const event1 = { path: 'path/1' }
const event2 = { path: 'path/2' }

describe('switchMode', () => {
context('with buffered events', () => {
beforeEach(() => {
buffer.push(event1)
buffer.push(event2)
})

context('from idle to timeout', () => {
beforeEach(() => {
buffer.mode = 'idle'
})

it('sets timeout', () => {
buffer.switchMode('timeout')
should(buffer).have.property('timeout')
})
})

context('from timeout to idle', () => {
beforeEach(() => {
buffer.mode = 'timeout'
buffer.setTimeout()
})

it('clears existing timeout', () => {
clock.tick(TIMEOUT_IN_MS - 1)
buffer.switchMode('idle')
clock.tick(1)
should(flushed).not.have.been.called()
})

it('does not set new timeout', () => {
buffer.switchMode('idle')
should(buffer).not.have.property('timeout')
})
})

context('from timeout to timeout', () => {
beforeEach(() => {
buffer.mode = 'timeout'
buffer.setTimeout()
})

it('clears existing timeout', () => {
clock.tick(TIMEOUT_IN_MS - 1)
buffer.switchMode('timeout')
clock.tick(1)
should(flushed).not.have.been.called()
})

it('sets new timeout', () => {
buffer.switchMode('timeout')
should(buffer).have.property('timeout')
})
})
})

context('without buffered events', () => {
context('from idle to timeout', () => {
beforeEach(() => {
buffer.mode = 'idle'
})

it('does not set timeout', () => {
buffer.switchMode('timeout')
should(buffer).not.have.property('timeout')
})
})

context('from timeout to timeout', () => {
beforeEach(() => {
buffer.mode = 'timeout'
})

it('does not set timeout', () => {
buffer.switchMode('timeout')
should(buffer).not.have.property('timeout')
})
})
})
})

context('in idle mode (default)', () => {
beforeEach(() => {
clock.tick(TIMEOUT_IN_MS)
Expand All @@ -51,47 +133,13 @@ onPlatform('darwin', () => {
buffer.flush()
should(flushed).have.been.calledWith([event1, event2])
})

it('can be switched to timeout mode', () => {
buffer.switchMode('timeout')
should(flushed).have.been.calledWith([event1, event2])
})
})

context('in timeout mode', () => {
beforeEach(() => {
buffer.switchMode('timeout')
})

it('can be switched back to idle mode, canceling timeout if any', () => {
buffer.push(event1)
buffer.switchMode('idle')
clock.tick(TIMEOUT_IN_MS)
should(flushed).have.been.calledWith([event1])
})

it('does not flush without events', () => {
clock.tick(TIMEOUT_IN_MS)
should(flushed).not.have.been.called()
})

context('when last event occured less than TIMEOUT_IN_MS ago', () => {
beforeEach(() => {
buffer.push(event1)
clock.tick(TIMEOUT_IN_MS - 1)
buffer.push(event2)
clock.tick(TIMEOUT_IN_MS - 1)
})

it('does not flush', () => {
should(flushed).not.have.been.called()
})

it('stores new events', () => {
should(buffer.events).deepEqual([event1, event2])
})
})

context(
'when last event occured since or more than TIMEOUT_IN_MS ago',
() => {
Expand Down
Loading

0 comments on commit e0532e9

Please sign in to comment.