forked from pull-stream/pull-ws
-
Notifications
You must be signed in to change notification settings - Fork 11
/
server.ts
120 lines (99 loc) · 3.31 KB
/
server.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import { EventEmitter } from 'events'
import http from 'http'
import https from 'https'
import { WebSocketServer as WSServer } from 'ws'
import duplex, { type DuplexWebSocket } from './duplex.js'
import type WebSocket from './web-socket.js'
import type { VerifyClientCallbackSync, VerifyClientCallbackAsync, AddressInfo } from 'ws'
export interface ServerOptions {
key?: string
cert?: string
server?: http.Server | https.Server
verifyClient?: VerifyClientCallbackAsync | VerifyClientCallbackSync
onConnection?(connection: DuplexWebSocket): void
}
export interface WebSocketServer extends EventEmitter {
listen(addrInfo: { port: number } | number): Promise<WebSocketServer>
close(): Promise<void>
address(): string | AddressInfo | null
}
class Server extends EventEmitter {
private readonly server: http.Server | https.Server
private readonly wsServer: WSServer
constructor (server: http.Server | https.Server, opts?: ServerOptions) {
super()
opts = opts ?? {}
this.server = server
this.wsServer = new WSServer({
server,
perMessageDeflate: false,
verifyClient: opts.verifyClient
})
this.wsServer.on('connection', this.onWsServerConnection.bind(this))
}
async listen (addrInfo: { port: number } | number): Promise<WebSocketServer> {
return new Promise<WebSocketServer>((resolve, reject) => {
this.wsServer.once('error', (e) => { reject(e) })
this.wsServer.once('listening', () => { resolve(this) })
this.server.listen(typeof addrInfo === 'number' ? addrInfo : addrInfo.port)
})
}
async close (): Promise<void> {
await new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err != null) {
reject(err); return
}
resolve()
})
})
}
address (): string | AddressInfo | null {
return this.server.address()
}
onWsServerConnection (socket: WebSocket, req: http.IncomingMessage): void {
let addr: string | AddressInfo | null
try {
if (req.socket.remoteAddress == null || req.socket.remotePort == null) {
throw new Error('Remote connection did not have address and/or port')
}
addr = this.wsServer.address()
if (typeof addr === 'string') {
throw new Error('Cannot listen on unix sockets')
}
if (addr == null) {
throw new Error('Server was closing or not running')
}
} catch (err: any) {
req.destroy(err)
this.emit('error', err)
return
}
const stream: DuplexWebSocket = {
...duplex(socket, {
remoteAddress: req.socket.remoteAddress,
remotePort: req.socket.remotePort
}),
localAddress: addr.address,
localPort: addr.port
}
this.emit('connection', stream, req)
}
}
export function createServer (opts?: ServerOptions): WebSocketServer {
opts = opts ?? {}
const server = opts.server ?? (opts.key != null && opts.cert != null ? https.createServer(opts) : http.createServer())
const wss = new Server(server)
if (opts.onConnection != null) {
wss.on('connection', opts.onConnection)
}
function proxy (server: http.Server, event: string): http.Server {
return server.on(event, (...args: any[]) => {
wss.emit(event, ...args)
})
}
proxy(server, 'listening')
proxy(server, 'request')
proxy(server, 'close')
return wss
}