Skip to content

Commit

Permalink
fix: create RTCPeerConnection after dialing remote peer (#2593)
Browse files Browse the repository at this point in the history
Chrome limits how many RTCPeerConnections a given tab can instantiated
during it's lifetime - https://issues.chromium.org/issues/41378764

To delay hitting this limit, only create the dial-end RTCPeerConnection
once a relayed connection has successfully been opened to the dial
target, this prevents needlessly creating RTCPeerConnections when the
dial fails before they are actually used.

Fixes #2591
  • Loading branch information
achingbrain authored Jun 18, 2024
1 parent a970b53 commit 8e4fdcd
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { CodeError } from '@libp2p/interface'
import { peerIdFromString } from '@libp2p/peer-id'
import { pbStream } from 'it-protobuf-stream'
import { type RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js'
import { DataChannelMuxerFactory } from '../muxer.js'
import { RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js'
import { Message } from './pb/message.js'
import { SIGNALING_PROTO_ID, splitAddr, type WebRTCTransportMetrics } from './transport.js'
import { readCandidatesUntilConnected } from './util.js'
import type { DataChannelOptions } from '../index.js'
import type { LoggerOptions, Connection } from '@libp2p/interface'
import type { LoggerOptions, Connection, ComponentLogger } from '@libp2p/interface'
import type { ConnectionManager, IncomingStreamData, TransportManager } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'

Expand All @@ -17,16 +18,18 @@ export interface IncomingStreamOpts extends IncomingStreamData {
}

export interface ConnectOptions extends LoggerOptions {
peerConnection: RTCPeerConnection
rtcConfiguration?: RTCConfiguration
dataChannel?: DataChannelOptions
multiaddr: Multiaddr
connectionManager: ConnectionManager
transportManager: TransportManager
dataChannelOptions?: Partial<DataChannelOptions>
signal?: AbortSignal
metrics?: WebRTCTransportMetrics
logger: ComponentLogger
}

export async function initiateConnection ({ peerConnection, signal, metrics, multiaddr: ma, connectionManager, transportManager, log }: ConnectOptions): Promise<{ remoteAddress: Multiaddr }> {
export async function initiateConnection ({ rtcConfiguration, dataChannel, signal, metrics, multiaddr: ma, connectionManager, transportManager, log, logger }: ConnectOptions): Promise<{ remoteAddress: Multiaddr, peerConnection: RTCPeerConnection, muxerFactory: DataChannelMuxerFactory }> {
const { baseAddr } = splitAddr(ma)

metrics?.dialerEvents.increment({ open: true })
Expand Down Expand Up @@ -64,6 +67,13 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul
})

const messageStream = pbStream(stream).pb(Message)
const peerConnection = new RTCPeerConnection(rtcConfiguration)
const muxerFactory = new DataChannelMuxerFactory({
logger
}, {
peerConnection,
dataChannelOptions: dataChannel
})

try {
// we create the channel so that the RTCPeerConnection has a component for
Expand All @@ -79,7 +89,7 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul
// see - https://www.w3.org/TR/webrtc/#rtcpeerconnectioniceevent
const data = JSON.stringify(candidate?.toJSON() ?? null)

log.trace('initiator sending ICE candidate %s', data)
log.trace('initiator sending ICE candidate %o', candidate)

void messageStream.write({
type: Message.Type.ICE_CANDIDATE,
Expand Down Expand Up @@ -142,17 +152,21 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul
log.trace('initiator connected, closing init channel')
channel.close()

log.trace('initiator closing signalling stream')
await messageStream.unwrap().unwrap().close({
log.trace('closing signalling channel')
await stream.close({
signal
})

log.trace('initiator connected to remote address %s', ma)

return {
remoteAddress: ma
remoteAddress: ma,
peerConnection,
muxerFactory
}
} catch (err: any) {
log.error('outgoing signalling error', err)

peerConnection.close()
stream.abort(err)
throw err
Expand Down
31 changes: 15 additions & 16 deletions packages/transport-webrtc/src/private-to-private/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,16 @@ export class WebRTCTransport implements Transport, Startable {
async dial (ma: Multiaddr, options: DialOptions): Promise<Connection> {
this.log.trace('dialing address: %a', ma)

const peerConnection = new RTCPeerConnection(this.init.rtcConfiguration)
const muxerFactory = new DataChannelMuxerFactory(this.components, {
peerConnection,
dataChannelOptions: this.init.dataChannel
})

const { remoteAddress } = await initiateConnection({
peerConnection,
const { remoteAddress, peerConnection, muxerFactory } = await initiateConnection({
rtcConfiguration: this.init.rtcConfiguration,
dataChannel: this.init.dataChannel,
multiaddr: ma,
dataChannelOptions: this.init.dataChannel,
signal: options.signal,
connectionManager: this.components.connectionManager,
transportManager: this.components.transportManager,
log: this.log
log: this.log,
logger: this.components.logger
})

const webRTCConn = new WebRTCMultiaddrConnection(this.components, {
Expand Down Expand Up @@ -185,27 +181,30 @@ export class WebRTCTransport implements Transport, Startable {
log: this.log
})

// close the stream if SDP messages have been exchanged successfully
await stream.close({
signal
})

const webRTCConn = new WebRTCMultiaddrConnection(this.components, {
peerConnection,
timeline: { open: (new Date()).getTime() },
remoteAddr: remoteAddress,
metrics: this.metrics?.listenerEvents
})

// close the connection on shut down
this._closeOnShutdown(peerConnection, webRTCConn)

await this.components.upgrader.upgradeInbound(webRTCConn, {
skipEncryption: true,
skipProtection: true,
muxerFactory
})

// close the stream if SDP messages have been exchanged successfully
await stream.close({
signal
})
// close the connection on shut down
this._closeOnShutdown(peerConnection, webRTCConn)
} catch (err: any) {
this.log.error('incoming signalling error', err)

peerConnection.close()
stream.abort(err)
throw err
}
Expand Down
7 changes: 5 additions & 2 deletions packages/transport-webrtc/src/private-to-private/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream
connectedPromise.promise,
stream.read({
signal: options.signal
})
}).catch(() => {})
])

// stream ended or we became connected
if (message == null) {
// throw if we timed out
options.signal?.throwIfAborted()

break
}

Expand All @@ -48,7 +51,7 @@ export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream

const candidate = new RTCIceCandidate(candidateInit)

options.log.trace('%s received new ICE candidate', options.direction, candidate)
options.log.trace('%s received new ICE candidate %o', options.direction, candidateInit)

try {
await pc.addIceCandidate(candidate)
Expand Down
42 changes: 19 additions & 23 deletions packages/transport-webrtc/test/peer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ import { Message } from '../src/private-to-private/pb/message.js'
import { handleIncomingStream } from '../src/private-to-private/signaling-stream-handler.js'
import { SIGNALING_PROTO_ID, WebRTCTransport, splitAddr } from '../src/private-to-private/transport.js'
import { RTCPeerConnection, RTCSessionDescription } from '../src/webrtc/index.js'
import type { Logger, Connection, Stream } from '@libp2p/interface'
import type { Logger, Connection, Stream, ComponentLogger } from '@libp2p/interface'
import type { ConnectionManager, TransportManager } from '@libp2p/interface-internal'

const browser = detect()

interface Initiator {
multiaddr: Multiaddr
peerConnection: RTCPeerConnection
connectionManager: StubbedInstance<ConnectionManager>
transportManager: StubbedInstance<TransportManager>
connection: StubbedInstance<Connection>
stream: Stream
log: Logger
logger: ComponentLogger
}

interface Recipient {
Expand Down Expand Up @@ -67,12 +67,12 @@ async function getComponents (): Promise<PrivateToPrivateComponents> {
return {
initiator: {
multiaddr: receiverMultiaddr,
peerConnection: new RTCPeerConnection(),
connectionManager: stubInterface<ConnectionManager>(),
transportManager: stubInterface<TransportManager>(),
connection: stubInterface<Connection>(),
stream: initiatorStream,
log: logger('test')
log: logger('test'),
logger: defaultLogger()
},
recipient: {
peerConnection: new RTCPeerConnection(),
Expand All @@ -91,9 +91,10 @@ describe('webrtc basic', () => {
const isFirefox = ((browser != null) && browser.name === 'firefox')
let initiator: Initiator
let recipient: Recipient
let initiatorPeerConnection: RTCPeerConnection

afterEach(() => {
initiator?.peerConnection?.close()
initiatorPeerConnection?.close()
recipient?.peerConnection?.close()
})

Expand All @@ -109,7 +110,7 @@ describe('webrtc basic', () => {
// signalling stream opens successfully
initiator.connection.newStream.withArgs(SIGNALING_PROTO_ID).resolves(initiator.stream)

await expect(
;[{ peerConnection: initiatorPeerConnection }] = await expect(
Promise.all([
initiateConnection(initiator),
handleIncomingStream(recipient)
Expand All @@ -118,11 +119,11 @@ describe('webrtc basic', () => {

await pRetry(async () => {
if (isFirefox) {
expect(initiator.peerConnection.iceConnectionState).eq('connected')
expect(initiatorPeerConnection.iceConnectionState).eq('connected')
expect(recipient.peerConnection.iceConnectionState).eq('connected')
return
}
expect(initiator.peerConnection.connectionState).eq('connected')
expect(initiatorPeerConnection.connectionState).eq('connected')
expect(recipient.peerConnection.connectionState).eq('connected')
})
})
Expand All @@ -137,18 +138,14 @@ describe('webrtc basic', () => {
// transport manager dials recipient
initiator.transportManager.dial.resolves(initiator.connection)

const createOffer = initiator.peerConnection.setRemoteDescription.bind(initiator.peerConnection)

initiator.peerConnection.setRemoteDescription = async (name) => {
// the dial is aborted
initiator.connection.newStream.callsFake(async () => {
// the operation is aborted
abortController.abort(new Error('Oh noes!'))
// setting the description takes some time
// opening the stream takes some time
await delay(100)
return createOffer(name)
}

// signalling stream opens successfully
initiator.connection.newStream.withArgs(SIGNALING_PROTO_ID).resolves(initiator.stream)
// signalling stream opens successfully
return initiator.stream
})

await expect(Promise.all([
initiateConnection({
Expand All @@ -164,9 +161,10 @@ describe('webrtc basic', () => {
describe('webrtc receiver', () => {
let initiator: Initiator
let recipient: Recipient
let initiatorPeerConnection: RTCPeerConnection

afterEach(() => {
initiator?.peerConnection?.close()
initiatorPeerConnection?.close()
recipient?.peerConnection?.close()
})

Expand All @@ -177,18 +175,16 @@ describe('webrtc receiver', () => {

await stream.write({ type: Message.Type.SDP_OFFER, data: 'bad' })
await expect(receiverPeerConnectionPromise).to.be.rejectedWith(/Failed to set remoteDescription/)

initiator.peerConnection.close()
recipient.peerConnection.close()
})
})

describe('webrtc dialer', () => {
let initiator: Initiator
let recipient: Recipient
let initiatorPeerConnection: RTCPeerConnection

afterEach(() => {
initiator?.peerConnection?.close()
initiatorPeerConnection?.close()
recipient?.peerConnection?.close()
})

Expand Down
2 changes: 1 addition & 1 deletion packages/transport-webrtc/test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ describe('Max message size', () => {
await expect(webrtcStream.sink([new Uint8Array(1)])).to.eventually.be.rejected
.with.property('code', 'ERR_BUFFER_CLEAR_TIMEOUT')
const t1 = Date.now()
expect(t1 - t0).greaterThan(timeout)
expect(t1 - t0).greaterThanOrEqual(timeout)
expect(t1 - t0).lessThan(timeout + 1000) // Some upper bound
await closed.promise
expect(webrtcStream.timeline.close).to.be.greaterThan(webrtcStream.timeline.open)
Expand Down

0 comments on commit 8e4fdcd

Please sign in to comment.