Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions interop/BrowserDockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ WORKDIR /app/interop
ARG BROWSER=chromium
ENV BROWSER=${BROWSER}

# hack to ensure the correct browser version is installed while building the
# container and not during the test run which slows everything down
RUN npx playwright-test --runner mocha --browser $BROWSER --grep do-not-match-anything

ENTRYPOINT npm test -- -t browser -- --browser $BROWSER
11 changes: 7 additions & 4 deletions interop/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ This must be repeated every time you make a change to the js-libp2p source code.

```console
$ npm run build
$ docker build . -f ./interop/Dockerfile -t js-libp2p-node
$ docker build . -f ./interop/Dockerfile -t node-js-libp2p-head
```

#### Browsers

```console
$ npm run build
$ docker build . -f ./interop/BrowserDockerfile -t js-libp2p-browsers
$ docker build . -f ./interop/BrowserDockerfile -t browsers-js-libp2p-head
```

### Build another libp2p implementation
Expand Down Expand Up @@ -93,13 +93,13 @@ $ docker run --name redis --rm -p 6379:6379 redis:7-alpine
#### node.js

```console
$ docker run -e transport=tcp -e muxer=yamux -e security=noise -e is_dialer=true -e redis_addr=redis:6379 --link redis:redis js-libp2p-node
$ docker run -e transport=tcp -e muxer=yamux -e security=noise -e is_dialer=true -e redis_addr=redis:6379 --link redis:redis node-js-libp2p-head
```

#### Browsers

```console
$ docker run -e transport=webtransport -e muxer=yamux -e security=noise -e is_dialer=true -e redis_addr=redis:6379 --link redis:redis js-libp2p-browsers
$ docker run -e transport=webtransport -e muxer=yamux -e security=noise -e is_dialer=true -e redis_addr=redis:6379 --link redis:redis browsers-js-libp2p-head
```

### Start another libp2p implementation
Expand All @@ -110,6 +110,9 @@ $ docker run -e transport=webtransport -e muxer=yamux -e security=noise -e is_di

```console
$ docker run -e transport=tcp -e muxer=yamux -e security=noise -e is_dialer=false -e redis_addr=redis:6379 --link redis:redis go-v0.29


docker run -e transport=webrtc-direct -e is_dialer=false -e redis_addr=redis:6379 --link redis:redis go-v0.42
```

# License
Expand Down
17 changes: 11 additions & 6 deletions packages/interface-compliance-tests/src/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ export default (common: TestSetup<TransportTestFixtures>): void => {

const input = Uint8Array.from([0, 1, 2, 3, 4])
const output = await dialer.services.echo.echo(dialAddrs[0], input, {
signal: AbortSignal.timeout(5000)
signal: AbortSignal.timeout(5_000)
})

expect(output.subarray()).to.equalBytes(input)
Expand All @@ -179,12 +179,12 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
const input = Uint8Array.from([0, 1, 2, 3, 4])

const output1 = await dialer.services.echo.echo(dialAddrs[0], input, {
signal: AbortSignal.timeout(5000)
signal: AbortSignal.timeout(5_000)
})
expect(output1.subarray()).to.equalBytes(input)

const output2 = await dialer.services.echo.echo(dialAddrs[1], input, {
signal: AbortSignal.timeout(5000),
signal: AbortSignal.timeout(5_000),
force: true
})
expect(output2.subarray()).to.equalBytes(input)
Expand All @@ -194,10 +194,13 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
({ dialer, listener, dialAddrs } = await getSetup(common))

const conn = await dialer.dial(dialAddrs[0], {
signal: AbortSignal.timeout(5000)
signal: AbortSignal.timeout(5_000)
})

await conn.close({
signal: AbortSignal.timeout(5_000)
})

await conn.close()
expect(isValidTick(conn.timeline.close)).to.equal(true)
})

Expand Down Expand Up @@ -247,7 +250,9 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
})
}

expect(connection).to.have.property('streams').that.has.lengthOf(5)
expect(
connection.streams.filter(s => s.protocol === '/echo/1.0.0')
).to.have.lengthOf(5)

if (remoteConn != null) {
await pWaitFor(() => remoteConn.streams.filter(s => s.protocol === '/echo/1.0.0').length === 5, {
Expand Down
7 changes: 4 additions & 3 deletions packages/protocol-identify/src/identify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ export class Identify extends AbstractIdentify implements Startable, IdentifyInt
}
}

this.log('run identify on new connection %a', connection.remoteAddr)

try {
stream = await connection.newStream(this.protocol, {
...options,
Expand All @@ -62,10 +64,7 @@ export class Identify extends AbstractIdentify implements Startable, IdentifyInt
maxDataLength: this.maxMessageSize
}).pb(IdentifyMessage)

log('read response')
const message = await pb.read(options)

log('close write')
await pb.unwrap().unwrap().close(options)

return message
Expand Down Expand Up @@ -147,6 +146,8 @@ export class Identify extends AbstractIdentify implements Startable, IdentifyInt
async handleProtocol (stream: Stream, connection: Connection): Promise<void> {
const log = stream.log.newScope('identify')

log('responding to identify')

const signal = AbortSignal.timeout(this.timeout)
setMaxListeners(Infinity, signal)

Expand Down
65 changes: 50 additions & 15 deletions packages/transport-webrtc/src/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,44 @@ export class DataChannelMuxerFactory implements StreamMuxerFactory {
private readonly peerConnection: RTCPeerConnection
private readonly metrics?: CounterGroup
private readonly dataChannelOptions?: DataChannelOptions
private readonly earlyDataChannels: RTCDataChannel[]

constructor (init: DataChannelMuxerFactoryInit) {
this.onEarlyDataChannel = this.onEarlyDataChannel.bind(this)

this.peerConnection = init.peerConnection
this.metrics = init.metrics
this.protocol = init.protocol ?? MUXER_PROTOCOL
this.dataChannelOptions = init.dataChannelOptions ?? {}
this.peerConnection.addEventListener('datachannel', this.onEarlyDataChannel)
this.earlyDataChannels = []
}

private onEarlyDataChannel (evt: RTCDataChannelEvent): void {
this.earlyDataChannels.push(evt.channel)
}

createStreamMuxer (maConn: MultiaddrConnection): StreamMuxer {
this.peerConnection.removeEventListener('datachannel', this.onEarlyDataChannel)

return new DataChannelMuxer(maConn, {
peerConnection: this.peerConnection,
dataChannelOptions: this.dataChannelOptions,
metrics: this.metrics,
protocol: this.protocol
protocol: this.protocol,
earlyDataChannels: this.earlyDataChannels
})
}
}

export interface DataChannelMuxerInit extends DataChannelMuxerFactoryInit {
protocol: string

/**
* Incoming data channels that were opened by the remote before the peer
* connection was established
*/
earlyDataChannels: RTCDataChannel[]
}

export interface DataChannelMuxerComponents {
Expand Down Expand Up @@ -89,27 +107,44 @@ export class DataChannelMuxer extends AbstractStreamMuxer<WebRTCStream> implemen
* {@link https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/datachannel_event}
*/
this.peerConnection.ondatachannel = ({ channel }) => {
this.log.trace('incoming %s datachannel with channel id %d, protocol %s and status %s', channel.protocol, channel.id, channel.protocol, channel.readyState)

// 'init' channel is only used during connection establishment, it is
// closed by the initiator
if (channel.label === 'init') {
this.log.trace('closing init channel %d', channel.id)
channel.close()
this.onDataChannel(channel)
}

queueMicrotask(() => {
if (this.status !== 'open') {
init.earlyDataChannels.forEach(channel => {
channel.close()
})
return
}

const stream = createStream({
...this.streamOptions,
...this.dataChannelOptions,
channel,
direction: 'inbound',
log: this.log
init.earlyDataChannels.forEach(channel => {
this.onDataChannel(channel)
})
})
}

private onDataChannel (channel: RTCDataChannel): void {
this.log('incoming datachannel with channel id %d, protocol %s and status %s', channel.id, channel.protocol, channel.readyState)

// 'init' channel is only used during connection establishment, it is
// closed by the initiator
if (channel.label === 'init') {
this.log.trace('closing init channel %d', channel.id)
channel.close()

this.onRemoteStream(stream)
return
}

const stream = createStream({
...this.streamOptions,
...this.dataChannelOptions,
channel,
direction: 'inbound',
log: this.log
})

this.onRemoteStream(stream)
}

async onCreateStream (options?: CreateStreamOptions): Promise<WebRTCStream> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export interface ConnectOptions extends LoggerOptions, ProgressOptions<WebRTCDia
logger: ComponentLogger
}

export async function initiateConnection ({ rtcConfiguration, dataChannel, signal, metrics, multiaddr: ma, connectionManager, transportManager, log, logger, onProgress }: ConnectOptions): Promise<{ remoteAddress: Multiaddr, peerConnection: RTCPeerConnection, muxerFactory: DataChannelMuxerFactory }> {
export async function initiateConnection ({ rtcConfiguration, dataChannel, signal, metrics, multiaddr: ma, connectionManager, transportManager, log, logger, onProgress }: ConnectOptions): Promise<{ remoteAddress: Multiaddr, peerConnection: globalThis.RTCPeerConnection, muxerFactory: DataChannelMuxerFactory }> {
const { circuitAddress, targetPeer } = splitAddr(ma)

metrics?.dialerEvents.increment({ open: true })
Expand Down Expand Up @@ -209,6 +209,7 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa

return {
remoteAddress: ma,
// @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370
peerConnection,
muxerFactory
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ export class WebRTCTransport implements Transport<WebRTCDialEvents>, Startable {
})

const webRTCConn = toMultiaddrConnection({
// @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370
peerConnection,
remoteAddr: remoteAddress,
metrics: this.metrics?.listenerEvents,
Expand All @@ -245,6 +246,7 @@ export class WebRTCTransport implements Transport<WebRTCDialEvents>, Startable {
})

// close the connection on shut down
// @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370
this._closeOnShutdown(peerConnection, webRTCConn)
} catch (err: any) {
this.log.error('incoming signaling error', err)
Expand All @@ -255,7 +257,7 @@ export class WebRTCTransport implements Transport<WebRTCDialEvents>, Startable {
}
}

private _closeOnShutdown (pc: RTCPeerConnection, webRTCConn: MultiaddrConnection): void {
private _closeOnShutdown (pc: globalThis.RTCPeerConnection, webRTCConn: MultiaddrConnection): void {
// close the connection on shut down
const shutDownListener = (): void => {
webRTCConn.close()
Expand Down
12 changes: 9 additions & 3 deletions packages/transport-webrtc/src/private-to-public/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export interface WebRTCDirectListenerComponents {
keychain?: Keychain
datastore: Datastore
metrics?: Metrics
events?: CounterGroup
}

export interface WebRTCDirectListenerInit {
Expand Down Expand Up @@ -184,7 +185,13 @@ export class WebRTCDirectListener extends TypedEventEmitter<ListenerEvents> impl
signal.throwIfAborted()

// https://github.com/libp2p/specs/blob/master/webrtc/webrtc-direct.md#browser-to-public-server
peerConnection = await createDialerRTCPeerConnection('server', ufrag, this.init.rtcConfiguration, this.certificate)
const results = await createDialerRTCPeerConnection('server', ufrag, {
rtcConfiguration: this.init.rtcConfiguration,
certificate: this.certificate,
events: this.metrics?.listenerEvents,
dataChannel: this.init.dataChannel
})
peerConnection = results.peerConnection

this.connections.set(key, peerConnection)

Expand All @@ -201,11 +208,10 @@ export class WebRTCDirectListener extends TypedEventEmitter<ListenerEvents> impl
})

try {
await connect(peerConnection, ufrag, {
await connect(peerConnection, results.muxerFactory, ufrag, {
role: 'server',
log: this.log,
logger: this.components.logger,
metrics: this.components.metrics,
events: this.metrics?.listenerEvents,
signal,
remoteAddr: multiaddr(`/ip${isIPv4(remoteHost) ? 4 : 6}/${remoteHost}/udp/${remotePort}/webrtc-direct`),
Expand Down
11 changes: 8 additions & 3 deletions packages/transport-webrtc/src/private-to-public/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,19 @@ export class WebRTCDirectTransport implements Transport, Startable {
const ufrag = genUfrag()

// https://github.com/libp2p/specs/blob/master/webrtc/webrtc-direct.md#browser-to-public-server
const peerConnection = await createDialerRTCPeerConnection('client', ufrag, typeof this.init.rtcConfiguration === 'function' ? await this.init.rtcConfiguration() : this.init.rtcConfiguration ?? {})
const {
peerConnection,
muxerFactory
} = await createDialerRTCPeerConnection('client', ufrag, {
rtcConfiguration: typeof this.init.rtcConfiguration === 'function' ? await this.init.rtcConfiguration() : this.init.rtcConfiguration ?? {},
dataChannel: this.init.dataChannel
})

try {
return await connect(peerConnection, ufrag, {
return await connect(peerConnection, muxerFactory, ufrag, {
role: 'client',
log: this.log,
logger: this.components.logger,
metrics: this.components.metrics,
events: this.metrics?.dialerEvents,
signal: options.signal,
remoteAddr: ma,
Expand Down
Loading
Loading