Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: update interfaces (#145)
Browse files Browse the repository at this point in the history
Update to the lastest libp2p interfaces
  • Loading branch information
achingbrain authored Feb 21, 2022
1 parent 034b5da commit 213ebc5
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 60 deletions.
18 changes: 15 additions & 3 deletions .aegir.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,25 @@ module.exports = {
test: {
async before () {
const { Multiaddr } = await import('@multiformats/multiaddr')
const { mockUpgrader } = await import('@libp2p/interface-compliance-tests/transport/utils')
const { mockRegistrar, mockUpgrader } = await import('@libp2p/interface-compliance-tests/mocks')
const { WebSockets } = await import('./dist/src/index.js')
const { pipe } = await import('it-pipe')

const ws = new WebSockets({ upgrader: mockUpgrader() })
const protocol = '/echo/1.0.0'
const registrar = mockRegistrar()
registrar.handle(protocol, (evt) => {
void pipe(
evt.detail.stream,
evt.detail.stream
)
})
const upgrader = mockUpgrader({
registrar
})

const ws = new WebSockets({ upgrader })
const ma = new Multiaddr('/ip4/127.0.0.1/tcp/9095/ws')
const listener = ws.createListener(conn => pipe(conn, conn))
const listener = ws.createListener()
await listener.listen(ma)
listener.addEventListener('error', (evt) => {
console.error(evt.detail)
Expand Down
67 changes: 34 additions & 33 deletions test/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,22 @@
import { expect } from 'aegir/utils/chai.js'
import { Multiaddr } from '@multiformats/multiaddr'
import { pipe } from 'it-pipe'
import { goodbye } from 'it-goodbye'
import take from 'it-take'
import all from 'it-all'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { WebSockets } from '../src/index.js'
import { mockUpgrader } from '@libp2p/interface-compliance-tests/transport/utils'
import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks'
import env from 'wherearewe'
import type { Connection } from '@libp2p/interfaces/connection'

const upgrader = mockUpgrader()
const protocol = '/echo/1.0.0'

describe('libp2p-websockets', () => {
const ma = new Multiaddr('/ip4/127.0.0.1/tcp/9095/ws')
let ws: WebSockets
let conn: Connection

beforeEach(async () => {
ws = new WebSockets({ upgrader })
ws = new WebSockets({ upgrader: mockUpgrader() })
conn = await ws.dial(ma)
})

Expand All @@ -29,12 +27,16 @@ describe('libp2p-websockets', () => {
})

it('echo', async () => {
const message = uint8ArrayFromString('Hello World!')
const s = goodbye({ source: [message], sink: all })
const { stream } = await conn.newStream(['echo'])
const data = uint8ArrayFromString('hey')
const { stream } = await conn.newStream([protocol])

const results = await pipe(s, stream, s)
expect(results).to.eql([message])
const res = await pipe(
[data],
stream,
async (source) => await all(source)
)

expect(res).to.deep.equal([data])
})

it('should filter out no DNS websocket addresses', function () {
Expand All @@ -54,37 +56,36 @@ describe('libp2p-websockets', () => {

describe('stress', () => {
it('one big write', async () => {
const rawMessage = new Uint8Array(1000000).fill(5)
const data = new Uint8Array(1000000).fill(5)
const { stream } = await conn.newStream([protocol])

const s = goodbye({ source: [rawMessage], sink: all })
const { stream } = await conn.newStream(['echo'])
const res = await pipe(
[data],
stream,
async (source) => await all(source)
)

const results = await pipe(s, stream, s)
expect(results).to.eql([rawMessage])
expect(res).to.deep.equal([data])
})

it('many writes', async function () {
this.timeout(10000)
const s = goodbye({
source: pipe(
(function * () {
while (true) {
yield uint8ArrayFromString(Math.random().toString())
}
}()),
(source) => take(source, 20000)
),
sink: all
})

const { stream } = await conn.newStream(['echo'])

const results = await pipe(s, stream, s)
expect(results).to.have.length(20000)
this.timeout(30000)

const count = 20000
const data = Array(count).fill(0).map(() => uint8ArrayFromString(Math.random().toString()))
const { stream } = await conn.newStream([protocol])

const res = await pipe(
data,
stream,
async (source) => await all(source)
)

expect(res).to.deep.equal(data)
})
})

it('.createServer throws in browser', () => {
expect(new WebSockets({ upgrader }).createListener).to.throw()
expect(new WebSockets({ upgrader: mockUpgrader() }).createListener).to.throw()
})
})
55 changes: 31 additions & 24 deletions test/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,29 @@ import { isLoopbackAddr } from 'is-loopback-addr'
import all from 'it-all'
import { pipe } from 'it-pipe'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { mockUpgrader } from '@libp2p/interface-compliance-tests/transport/utils'
import { mockRegistrar, mockUpgrader } from '@libp2p/interface-compliance-tests/mocks'
import defer from 'p-defer'
import waitFor from 'p-wait-for'
import { WebSockets } from '../src/index.js'
import * as filters from '../src/filters.js'
import drain from 'it-drain'
import type { Listener } from '@libp2p/interfaces/transport'

import './compliance.node.js'

const upgrader = mockUpgrader()
const protocol = '/say-hello/1.0.0'
const registrar = mockRegistrar()
void registrar.handle(protocol, (evt) => {
void pipe([
uint8ArrayFromString('hey')
],
evt.detail.stream,
drain
)
})
const upgrader = mockUpgrader({
registrar
})

describe('instantiate the transport', () => {
it('create', () => {
Expand All @@ -35,15 +48,15 @@ describe('listen', () => {
const ws = new WebSockets({ upgrader })
const listener = ws.createListener({
handler: (conn) => {
void conn.newStream(['echo']).then(async ({ stream }) => {
void conn.newStream([protocol]).then(async ({ stream }) => {
return await pipe(stream, stream)
})
}
})
await listener.listen(ma)

const conn = await ws.dial(ma)
const { stream } = await conn.newStream(['echo'])
const { stream } = await conn.newStream([protocol])
void pipe(stream, stream)

await listener.close()
Expand Down Expand Up @@ -212,33 +225,27 @@ describe('dial', () => {

beforeEach(async () => {
ws = new WebSockets({ upgrader })
listener = ws.createListener({
handler: (conn) => {
void conn.newStream(['echo']).then(async ({ stream }) => {
return await pipe(stream, stream)
})
}
})
listener = ws.createListener()
return await listener.listen(ma)
})

afterEach(async () => await listener.close())

it('dial', async () => {
const conn = await ws.dial(ma)
const s = goodbye({ source: [uint8ArrayFromString('hey')], sink: all })
const { stream } = await conn.newStream(['echo'])
const { stream } = await conn.newStream([protocol])

await expect(pipe(s, stream, s)).to.eventually.deep.equal([uint8ArrayFromString('hey')])
await expect(all(stream.source)).to.eventually.deep.equal([uint8ArrayFromString('hey')])
await conn.close()
})

it('dial with p2p Id', async () => {
const ma = new Multiaddr('/ip4/127.0.0.1/tcp/9091/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const conn = await ws.dial(ma)
const s = goodbye({ source: [uint8ArrayFromString('hey')], sink: all })
const { stream } = await conn.newStream(['echo'])
const { stream } = await conn.newStream([protocol])

await expect(pipe(s, stream, s)).to.eventually.deep.equal([uint8ArrayFromString('hey')])
await expect(all(stream.source)).to.eventually.deep.equal([uint8ArrayFromString('hey')])
await conn.close()
})

it('dial should throw on immediate abort', async () => {
Expand Down Expand Up @@ -286,7 +293,7 @@ describe('dial', () => {
ws = new WebSockets({ upgrader })
listener = ws.createListener({
handler: (conn) => {
void conn.newStream(['echo']).then(async ({ stream }) => {
void conn.newStream([protocol]).then(async ({ stream }) => {
return await pipe(stream, stream)
})
}
Expand All @@ -306,7 +313,7 @@ describe('dial', () => {
// Dial first no loopback address
const conn = await ws.dial(addrs[0])
const s = goodbye({ source: [uint8ArrayFromString('hey')], sink: all })
const { stream } = await conn.newStream(['echo'])
const { stream } = await conn.newStream([protocol])

await expect(pipe(s, stream, s)).to.eventually.deep.equal([uint8ArrayFromString('hey')])
})
Expand All @@ -327,7 +334,7 @@ describe('dial', () => {
listener = ws.createListener({
server,
handler: (conn) => {
void conn.newStream(['echo']).then(async ({ stream }) => {
void conn.newStream([protocol]).then(async ({ stream }) => {
return await pipe(stream, stream)
})
}
Expand All @@ -350,7 +357,7 @@ describe('dial', () => {
it('dial ip4', async () => {
const conn = await ws.dial(ma, { websocket: { rejectUnauthorized: false } })
const s = goodbye({ source: [uint8ArrayFromString('hey')], sink: all })
const { stream } = await conn.newStream(['echo'])
const { stream } = await conn.newStream([protocol])

const res = await pipe(s, stream, s)

Expand All @@ -368,7 +375,7 @@ describe('dial', () => {
ws = new WebSockets({ upgrader })
listener = ws.createListener({
handler: (conn) => {
void conn.newStream(['echo']).then(async ({ stream }) => {
void conn.newStream([protocol]).then(async ({ stream }) => {
return await pipe(stream, stream)
})
}
Expand All @@ -381,7 +388,7 @@ describe('dial', () => {
it('dial ip6', async () => {
const conn = await ws.dial(ma)
const s = goodbye({ source: [uint8ArrayFromString('hey')], sink: all })
const { stream } = await conn.newStream(['echo'])
const { stream } = await conn.newStream([protocol])

await expect(pipe(s, stream, s)).to.eventually.deep.equal([uint8ArrayFromString('hey')])
})
Expand All @@ -394,7 +401,7 @@ describe('dial', () => {
source: [uint8ArrayFromString('hey')],
sink: all
})
const { stream } = await conn.newStream(['echo'])
const { stream } = await conn.newStream([protocol])

await expect(pipe(s, stream, s)).to.eventually.deep.equal([uint8ArrayFromString('hey')])
})
Expand Down

0 comments on commit 213ebc5

Please sign in to comment.