Skip to content

Commit

Permalink
feat: restore streaming support
Browse files Browse the repository at this point in the history
  • Loading branch information
ricokahler authored and binoy14 committed Aug 1, 2024
1 parent e9e2bf7 commit 9fd927c
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 22 deletions.
1 change: 1 addition & 0 deletions packages/sanity/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
"esbuild-register": "^3.5.0",
"execa": "^2.0.0",
"exif-component": "^1.0.1",
"form-data": "^4.0.0",
"framer-motion": "11.0.8",
"get-it": "^8.6.3",
"get-random-values-esm": "1.0.2",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {type Stats} from 'node:fs'
import fs from 'node:fs/promises'
import {Readable} from 'node:stream'
import {type Gzip} from 'node:zlib'

import {beforeEach, describe, expect, it, jest} from '@jest/globals'
Expand Down Expand Up @@ -43,11 +44,11 @@ global.fetch = mockFetch

// Mock the Gzip stream
class MockGzip {
constructor(private chunks: Buffer[]) {}
constructor(private chunks: AsyncIterable<Buffer> | Iterable<Buffer>) {}
[Symbol.asyncIterator]() {
const chunks = this.chunks
return (async function* thing() {
for (const chunk of chunks) yield chunk
for await (const chunk of chunks) yield chunk
})()
}
}
Expand Down Expand Up @@ -148,16 +149,16 @@ describe('createDeployment', () => {
})

it('sends the correct request to create a deployment and includes authorization header if token is present', async () => {
const chunks = [Buffer.from('first chunk'), Buffer.from('second chunk')]
const tarball = new MockGzip(chunks) as unknown as Gzip
const tarball = Readable.from([Buffer.from('example chunk', 'utf-8')]) as Gzip
const applicationId = 'test-app-id'
const version = '1.0.0'

mockFetch.mockResolvedValueOnce(new Response())

await createDeployment({
client: mockClient,
applicationId,
version: '1.0.0',
version,
isAutoUpdating: true,
tarball,
})
Expand All @@ -174,15 +175,90 @@ describe('createDeployment', () => {

// Extract and validate form data
const mockFetchCalls = mockFetch.mock.calls as Parameters<typeof fetch>[]
const formData = mockFetchCalls[0][1]?.body as FormData
expect(formData.get('version')).toBe('1.0.0')
expect(formData.get('isAutoUpdating')).toBe('true')
expect(formData.get('tarball')).toBeInstanceOf(Blob)
const formData = mockFetchCalls[0][1]?.body as unknown as Readable

// dump the raw content of the form data into a string
let content = ''
for await (const chunk of formData) {
content += chunk
}

expect(content).toContain('isAutoUpdating')
expect(content).toContain('true')
expect(content).toContain('version')
expect(content).toContain(version)
expect(content).toContain('example chunk')

// Check Authorization header
const headers = mockFetchCalls[0][1]?.headers as Headers
expect(headers.get('Authorization')).toBe('Bearer fake-token')
})

it('streams the tarball contents', async () => {
const firstEmission = 'first emission\n'
const secondEmission = 'second emission\n'

async function* createMockStream() {
await new Promise((resolve) => setTimeout(resolve, 0))
yield Buffer.from(firstEmission, 'utf-8')
await new Promise((resolve) => setTimeout(resolve, 0))
yield Buffer.from(secondEmission, 'utf-8')
}

const mockTarball = Readable.from(createMockStream()) as Gzip
const applicationId = 'test-app-id'

mockFetch.mockResolvedValueOnce(new Response())

const version = '1.0.0'

await createDeployment({
client: mockClient,
applicationId,
version,
isAutoUpdating: true,
tarball: mockTarball,
})

// Check URL and method
expect(mockClient.getUrl).toHaveBeenCalledWith(
`/user-applications/${applicationId}/deployments`,
)
expect(mockFetch).toHaveBeenCalledTimes(1)
const url = mockFetch.mock.calls[0][0] as URL
expect(url.toString()).toBe(
'http://example.api.sanity.io/user-applications/test-app-id/deployments',
)

// Extract and validate form data
const mockFetchCalls = mockFetch.mock.calls as Parameters<typeof fetch>[]
const requestInit = mockFetchCalls[0][1] as any

// this is required to enable streaming with the native fetch API
// https://github.com/nodejs/node/issues/46221
expect(requestInit.duplex).toBe('half')

const formData = requestInit.body as Readable

let emissions = 0
let content = ''
for await (const chunk of formData) {
content += chunk
emissions++
}

expect(emissions).toBeGreaterThan(1)

expect(content).toContain('isAutoUpdating')
expect(content).toContain('true')

expect(content).toContain('version')
expect(content).toContain(version)

expect(content).toContain(firstEmission)
expect(content).toContain(secondEmission)
expect(content).toContain('application/gzip')
})
})

describe('deleteUserApplication', () => {
Expand Down
33 changes: 20 additions & 13 deletions packages/sanity/src/_internal/cli/actions/deploy/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import fs from 'node:fs/promises'
import path from 'node:path'
import {PassThrough} from 'node:stream'
import {type Gzip} from 'node:zlib'

import {type CliCommandContext, type CliConfig} from '@sanity/cli'
import {type SanityClient} from '@sanity/client'
import FormData from 'form-data'
import readPkgUp from 'read-pkg-up'

// TODO: replace with `Promise.withResolvers()` once it lands in node
Expand Down Expand Up @@ -135,29 +137,34 @@ export async function createDeployment({
client,
tarball,
applicationId,
...options
isAutoUpdating,
version,
}: CreateDeploymentOptions): Promise<void> {
const config = client.config()

const formData = new FormData()
for (const [key, value] of Object.entries(options)) {
formData.set(key, value.toString())
}

// convert the tarball into a blob and add it to the form data
const chunks: Buffer[] = []
for await (const chunk of tarball) {
chunks.push(chunk)
}
const blob = new Blob([Buffer.concat(chunks)], {type: 'application/gzip'})
formData.set('tarball', blob)
formData.append('isAutoUpdating', isAutoUpdating.toString())
formData.append('version', version)
formData.append('tarball', tarball, {contentType: 'application/gzip'})

const url = new URL(client.getUrl(`/user-applications/${applicationId}/deployments`))
const headers = new Headers({
...(config.token && {Authorization: `Bearer ${config.token}`}),
})

await fetch(url, {method: 'POST', headers, body: formData})
await fetch(url, {
method: 'POST',
headers,
// NOTE:
// - the fetch API in node.js supports streams but it's not in the types
// - the PassThrough is required because `form-data` does not fully conform
// to the node.js stream API
// eslint-disable-next-line @typescript-eslint/no-explicit-any
body: formData.pipe(new PassThrough()) as any,
// @ts-expect-error the `duplex` param is required in order to send a stream
// https://github.com/nodejs/node/issues/46221#issuecomment-1383246036
duplex: 'half',
})
}

export interface DeleteUserApplicationOptions {
Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 9fd927c

Please sign in to comment.