Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deps!: update stream types #61

Merged
merged 1 commit into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
node_modules
.nyc_output
build
dist
coverage
package-lock.json
.docs
.coverage
node_modules
package-lock.json
yarn.lock
.vscode
15 changes: 12 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# it-ws <!-- omit in toc -->

[![codecov](https://img.shields.io/codecov/c/github/alanshaw/it-ws.svg?style=flat-square)](https://codecov.io/gh/alanshaw/it-ws)
[![CI](https://img.shields.io/github/workflow/status/alanshaw/it-ws/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/alanshaw/it-ws/actions/workflows/js-test-and-release.yml)
[![CI](https://img.shields.io/github/actions/workflow/status/alanshaw/it-ws/js-test-and-release.yml?branch=master\&style=flat-square)](https://github.com/alanshaw/it-ws/actions/workflows/js-test-and-release.yml?query=branch%3Amaster)

> Simple async iterables for websocket client connections

## Table of contents <!-- omit in toc -->

- [Install](#install)
- [Browser `<script>` tag](#browser-script-tag)
- [Usage](#usage)
- [Example - client](#example---client)
- [Example - server](#example---server)
Expand All @@ -21,14 +22,22 @@
- [`import sink from 'it-ws/sink'`](#import-sink-from-it-wssink)
- [`import source from 'it-ws/source'`](#import-source-from-it-wssource)
- [License](#license)
- [Contribute](#contribute)
- [Contribution](#contribution)

## Install

```console
$ npm i it-ws
```

### Browser `<script>` tag

Loading this module through a script tag will make it's exports available as `ItWs` in the global namespace.

```html
<script src="https://unpkg.com/it-ws/dist/index.min.js"></script>
```

## Usage

### Example - client
Expand Down Expand Up @@ -259,6 +268,6 @@ Licensed under either of
- Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT ([LICENSE-MIT](LICENSE-MIT) / <http://opensource.org/licenses/MIT>)

## Contribute
## Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
19 changes: 10 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
},
"files": [
"src",
"dist/src",
"dist",
"!dist/test",
"!**/*.tsbuildinfo"
],
Expand Down Expand Up @@ -176,26 +176,27 @@
"test:firefox-webworker": "aegir test -t webworker -- --browser firefox",
"test:node": "aegir test -t node --cov",
"test:electron-main": "aegir test -t electron-main",
"release": "aegir release"
"release": "aegir release",
"docs": "aegir docs"
},
"dependencies": {
"event-iterator": "^2.0.0",
"iso-url": "^1.1.2",
"it-stream-types": "^1.0.2",
"it-stream-types": "^2.0.1",
"uint8arrays": "^4.0.2",
"ws": "^8.4.0"
},
"devDependencies": {
"@types/ws": "^8.2.2",
"aegir": "^37.0.15",
"aegir": "^38.1.8",
"delay": "^5.0.0",
"it-all": "^2.0.0",
"it-drain": "^2.0.0",
"it-foreach": "^1.0.0",
"it-all": "^3.0.1",
"it-drain": "^3.0.1",
"it-foreach": "^2.0.2",
"it-goodbye": "^4.0.0",
"it-map": "^2.0.0",
"it-map": "^3.0.2",
"it-ndjson": "^1.0.0",
"it-pipe": "^2.0.3",
"it-pipe": "^3.0.1",
"p-defer": "^4.0.0",
"wherearewe": "^2.0.1",
"wsurl": "^1.0.0"
Expand Down
6 changes: 3 additions & 3 deletions src/duplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import source from './source.js'
import sink from './sink.js'
import type WebSocket from './web-socket.js'
import type { SinkOptions } from './sink.js'
import type { Duplex } from 'it-stream-types'
import type { Duplex, Source } from 'it-stream-types'

export interface DuplexWebSocket extends Duplex<Uint8Array, Uint8Array, Promise<void>> {
export interface DuplexWebSocket extends Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>> {
connected: () => Promise<void>
localAddress?: string
localPort?: number
Expand Down Expand Up @@ -43,7 +43,7 @@ export default (socket: WebSocket, options?: DuplexWebSocketOptions): DuplexWebS
const duplex: DuplexWebSocket = {
sink: sink(socket, options),
source: connectedSource,
connected: async () => await connectedSource.connected(),
connected: async () => { await connectedSource.connected() },
close: async () => {
if (socket.readyState === socket.CONNECTING || socket.readyState === socket.OPEN) {
await new Promise<void>((resolve) => {
Expand Down
10 changes: 5 additions & 5 deletions src/ready.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ErrorEvent, WebSocket } from 'ws'

export default (socket: WebSocket) => {
export default async (socket: WebSocket): Promise<void> => {
// if the socket is closing or closed, return end
if (socket.readyState >= 2) {
throw new Error('socket closed')
Expand All @@ -11,18 +11,18 @@ export default (socket: WebSocket) => {
return
}

return new Promise<void>((resolve, reject) => {
function cleanup () {
await new Promise<void>((resolve, reject) => {
function cleanup (): void {
socket.removeEventListener('open', handleOpen)
socket.removeEventListener('error', handleErr)
}

function handleOpen () {
function handleOpen (): void {
cleanup()
resolve()
}

function handleErr (event: ErrorEvent) {
function handleErr (event: ErrorEvent): void {
cleanup()
reject(event.error ?? new Error(`connect ECONNREFUSED ${socket.url}`))
}
Expand Down
20 changes: 10 additions & 10 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,38 +29,38 @@ class Server extends EventEmitter {
opts = opts ?? {}
this.server = server
this.wsServer = new WSServer({
server: server,
server,
perMessageDeflate: false,
verifyClient: opts.verifyClient
})
this.wsServer.on('connection', this.onWsServerConnection.bind(this))
}

async listen (addrInfo: { port: number } | number) {
async listen (addrInfo: { port: number } | number): Promise<WebSocketServer> {
return await new Promise<WebSocketServer>((resolve, reject) => {
this.wsServer.once('error', (e) => reject(e))
this.wsServer.once('listening', () => resolve(this))
this.wsServer.once('error', (e) => { reject(e) })
this.wsServer.once('listening', () => { resolve(this) })
this.server.listen(typeof addrInfo === 'number' ? addrInfo : addrInfo.port)
})
}

async close () {
return await new Promise<void>((resolve, reject) => {
async close (): Promise<void> {
await new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err != null) {
return reject(err)
reject(err); return
}

resolve()
})
})
}

address () {
address (): string | AddressInfo | null {
return this.server.address()
}

onWsServerConnection (socket: WebSocket, req: http.IncomingMessage) {
onWsServerConnection (socket: WebSocket, req: http.IncomingMessage): void {
const addr = this.wsServer.address()

if (typeof addr === 'string') {
Expand Down Expand Up @@ -96,7 +96,7 @@ export function createServer (opts?: ServerOptions): WebSocketServer {
wss.on('connection', opts.onConnection)
}

function proxy (server: http.Server, event: string) {
function proxy (server: http.Server, event: string): http.Server {
return server.on(event, (...args: any[]) => {
wss.emit(event, ...args)
})
Expand Down
10 changes: 5 additions & 5 deletions src/sink.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import ready from './ready.js'
import type { WebSocket } from 'ws'
import type { Sink } from 'it-stream-types'
import type { Sink, Source } from 'it-stream-types'

export interface SinkOptions {
closeOnEnd?: boolean
}

export default (socket: WebSocket, options: SinkOptions) => {
export default (socket: WebSocket, options: SinkOptions): Sink<Source<Uint8Array>, Promise<void>> => {
options = options ?? {}
options.closeOnEnd = options.closeOnEnd !== false

const sink: Sink<Uint8Array, Promise<void>> = async source => {
const sink: Sink<Source<Uint8Array>, Promise<void>> = async source => {
for await (const data of source) {
try {
await ready(socket)
Expand All @@ -23,7 +23,7 @@ export default (socket: WebSocket, options: SinkOptions) => {
}

if (options.closeOnEnd != null && socket.readyState <= 1) {
return await new Promise((resolve, reject) => {
await new Promise<void>((resolve, reject) => {
socket.addEventListener('close', event => {
if (event.wasClean || event.code === 1006) {
resolve()
Expand All @@ -33,7 +33,7 @@ export default (socket: WebSocket, options: SinkOptions) => {
}
})

setTimeout(() => socket.close())
setTimeout(() => { socket.close() })
})
}
}
Expand Down
52 changes: 27 additions & 25 deletions src/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,42 @@ function isArrayBuffer (obj: any): obj is ArrayBuffer {
(obj?.constructor?.name === 'ArrayBuffer' && typeof obj?.byteLength === 'number')
}

export interface ConnectedSource extends AsyncIterable<Uint8Array> {
export interface ConnectedSource extends AsyncGenerator<Uint8Array> {
connected: () => Promise<void>
}

export default (socket: WebSocket): ConnectedSource => {
socket.binaryType = 'arraybuffer'

const connected = async () => await new Promise<void>((resolve, reject) => {
if (isConnected) {
return resolve()
}
if (connError != null) {
return reject(connError)
}

const cleanUp = (cont: () => void) => {
socket.removeEventListener('open', onOpen)
socket.removeEventListener('error', onError)
cont()
}

const onOpen = () => cleanUp(resolve)
const onError = (event: ErrorEvent) => {
cleanUp(() => reject(event.error ?? new Error(`connect ECONNREFUSED ${socket.url}`)))
}

socket.addEventListener('open', onOpen)
socket.addEventListener('error', onError)
})
const connected = async (): Promise<void> => {
await new Promise<void>((resolve, reject) => {
if (isConnected) {
resolve(); return
}
if (connError != null) {
reject(connError); return
}

const cleanUp = (cont: () => void): void => {
socket.removeEventListener('open', onOpen)
socket.removeEventListener('error', onError)
cont()
}

const onOpen = (): void => { cleanUp(resolve) }
const onError = (event: ErrorEvent): void => {
cleanUp(() => { reject(event.error ?? new Error(`connect ECONNREFUSED ${socket.url}`)) })
}

socket.addEventListener('open', onOpen)
socket.addEventListener('error', onError)
})
}

const source = (async function * () {
const messages = new EventIterator<Uint8Array>(
({ push, stop, fail }) => {
const onMessage = (event: MessageEvent) => {
const onMessage = (event: MessageEvent): void => {
let data: Uint8Array | null = null

if (typeof event.data === 'string') {
Expand All @@ -64,7 +66,7 @@ export default (socket: WebSocket): ConnectedSource => {

push(data)
}
const onError = (event: ErrorEvent) => fail(event.error ?? new Error('Socket error'))
const onError = (event: ErrorEvent): void => { fail(event.error ?? new Error('Socket error')) }

socket.addEventListener('message', onMessage)
socket.addEventListener('error', onError)
Expand Down
2 changes: 1 addition & 1 deletion src/ws-url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ import { relative } from 'iso-url'
const map = { http: 'ws', https: 'wss' }
const def = 'ws'

export default (url: string, location: string | Partial<Location>) => relative(url, location, map, def)
export default (url: string, location: string | Partial<Location>): string => relative(url, location, map, def)
2 changes: 1 addition & 1 deletion test/echo-inline.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('simple echo server', () => {
[1, 2, 3],
// need a delay, because otherwise ws hangs up wrong.
// otherwise use pull-goodbye.
(source) => map(source, async val => await new Promise(resolve => setTimeout(() => resolve(val), 10))),
(source) => map(source, async val => await new Promise(resolve => setTimeout(() => { resolve(val) }, 10))),
(source) => map(ndjson.stringify(source), str => uint8ArrayFromString(str)),
WS.connect('ws://localhost:5678'),
ndjson.parse,
Expand Down
16 changes: 9 additions & 7 deletions test/echo.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ describe('echo', () => {
pws,
goodbye({
source: expected,
sink: async source => await pipe(
source,
(source) => each(source, (value) => {
expect(value).to.equalBytes(expected.shift())
}),
drain
)
sink: async source => {
await pipe(
source,
(source) => each(source, (value) => {
expect(value).to.equalBytes(expected.shift())
}),
drain
)
}
}),
pws
)
Expand Down
4 changes: 2 additions & 2 deletions test/helpers/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { WebSocket } from 'ws'

const port = parseInt(process.env.PORT ?? '3000', 10)

export function createTestServer () {
export function createTestServer (): WebSocketServer {
const routes: Record<string, (ws: WebSocket) => void> = {
'/read': function (ws: WebSocket) {
const values = ['a', 'b', 'c', 'd']
Expand All @@ -24,7 +24,7 @@ export function createTestServer () {
})
}
}
const wss = new WebSocketServer({ port: port })
const wss = new WebSocketServer({ port })

wss.on('connection', function (ws, req) {
if (req.url == null) {
Expand Down
2 changes: 1 addition & 1 deletion test/pass-in-server.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('simple echo server', () => {
[1, 2, 3],
// need a delay, because otherwise ws hangs up wrong.
// otherwise use pull-goodbye.
(source) => map(source, async val => await new Promise(resolve => setTimeout(() => resolve(val), 10))),
(source) => map(source, async val => await new Promise(resolve => setTimeout(() => { resolve(val) }, 10))),
(source) => map(ndjson.stringify(source), str => uint8ArrayFromString(str)),
stream,
ndjson.parse,
Expand Down
Loading