Skip to content

Commit

Permalink
fix: prevent listener leak on unsubscribe before eventsource module l…
Browse files Browse the repository at this point in the history
…oad (#783)
  • Loading branch information
rexxars authored May 3, 2024
1 parent 7297ead commit f38b64e
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
29 changes: 19 additions & 10 deletions src/data/listen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,13 @@ export function _listen<R extends Record<string, Any> = Record<string, Any>>(

return new Observable((observer) => {
let es: InstanceType<typeof import('@sanity/eventsource')>
getEventSource()
.then((eventSource) => {
es = eventSource
})
.catch((reason) => {
observer.error(reason)
stop()
})
let reconnectTimer: NodeJS.Timeout
let stopped = false
// Unsubscribe differs from stopped in that we will never reopen.
// Once it is`true`, it will never be `false` again.
let unsubscribed = false

open()

function onError() {
if (stopped) {
Expand Down Expand Up @@ -150,8 +147,17 @@ export function _listen<R extends Record<string, Any> = Record<string, Any>>(
}
}

async function getEventSource(): Promise<InstanceType<typeof import('@sanity/eventsource')>> {
async function getEventSource(): Promise<InstanceType<
typeof import('@sanity/eventsource')
> | void> {
const {default: EventSource} = await import('@sanity/eventsource')

// If the listener has been unsubscribed from before we managed to load the module,
// do not set up the EventSource.
if (unsubscribed) {
return
}

const evs = new EventSource(uri, esOptions)
evs.addEventListener('error', onError)
evs.addEventListener('channelError', onChannelError)
Expand All @@ -163,7 +169,9 @@ export function _listen<R extends Record<string, Any> = Record<string, Any>>(
function open() {
getEventSource()
.then((eventSource) => {
es = eventSource
if (eventSource) {
es = eventSource
}
})
.catch((reason) => {
observer.error(reason)
Expand All @@ -174,6 +182,7 @@ export function _listen<R extends Record<string, Any> = Record<string, Any>>(
function stop() {
stopped = true
unsubscribe()
unsubscribed = true
}

return stop
Expand Down
31 changes: 30 additions & 1 deletion test/listen.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type {AddressInfo} from 'node:net'

import {type ClientConfig, createClient} from '@sanity/client'
import {describe, expect, test} from 'vitest'
import {describe, expect, test, vitest} from 'vitest'

import {createSseServer, type OnRequest} from './helpers/sseServer'

Expand Down Expand Up @@ -212,5 +212,34 @@ describe.skipIf(typeof EdgeRuntime === 'string' || typeof document !== 'undefine
server.close()
}
})

test('can immediately unsubscribe, does not connect to server', async () => {
const onMessage = vitest.fn()
const onError = vitest.fn()
const onRequest = vitest.fn(({channel}) => {
channel!.send({event: 'mutation', data: {foo: 'bar'}})
process.nextTick(() => channel!.close())
})

const {server, client} = await testSse(onRequest)

const query = '*[_type == "beer" && title == $beerName]'
const params = {beerName: 'Headroom Double IPA'}

client
.listen(query, params)
.subscribe({
next: onMessage,
error: onError,
})
.unsubscribe()

await new Promise((resolve) => setTimeout(resolve, 100))

expect(onMessage).not.toHaveBeenCalled()
expect(onError).not.toHaveBeenCalled()
expect(onRequest).not.toHaveBeenCalled()
server.close()
})
},
)

0 comments on commit f38b64e

Please sign in to comment.