Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent duplicate trustless-gateway reqs #503

Merged
merged 6 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/block-brokers/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ const options = {
})
res.end(Uint8Array.from([0, 1, 2, 0]))
})
server.all('/ipfs/bafkqabtimvwgy3yk', async (req, res) => {
  // Hold the response back for 500ms so that several requests for this CID can
  // be in flight at the same time.
  await new Promise((resolve) => setTimeout(resolve, 500))

  // `bafkqabtimvwgy3yk` is a raw CID with an identity multihash, i.e. the block
  // bytes are embedded in the CID itself: "hello\n" (6 bytes, including the
  // trailing newline). Serve exactly those bytes so the response is a valid
  // block for the requested CID.
  const body = Uint8Array.from([104, 101, 108, 108, 111, 10])

  res.writeHead(200, {
    'content-type': 'application/octet-stream',
    // derived from the payload so the header can never drift from the body
    'content-length': body.byteLength
  })
  res.end(body)
})

await server.listen()
const { port } = server.server.address()
Expand Down
1 change: 1 addition & 0 deletions packages/block-brokers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"@libp2p/logger": "^4.0.7",
"@libp2p/peer-id-factory": "^4.0.7",
"@multiformats/uri-to-multiaddr": "^8.0.0",
"@types/polka": "^0.5.7",
"@types/sinon": "^17.0.3",
"aegir": "^42.2.5",
"cors": "^2.8.5",
Expand Down
81 changes: 63 additions & 18 deletions packages/block-brokers/src/trustless-gateway/trustless-gateway.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import { base64 } from 'multiformats/bases/base64'
import type { ComponentLogger, Logger } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

/**
 * A snapshot of the counters a `TrustlessGateway` keeps about its own
 * behaviour, as returned by `TrustlessGateway.getStats()`.
 */
export interface TrustlessGatewayStats {
/** Number of network requests that have been started against the gateway. */
attempts: number
/** Number of failures recorded (non-OK responses or errors thrown while fetching). */
errors: number
/** Number of invalid blocks recorded via `incrementInvalidBlocks()`. */
invalidBlocks: number
/** Number of requests that received an OK response. */
successes: number
/** Number of requests that are currently in flight, when reported. */
pendingResponses?: number
}

/**
* A `TrustlessGateway` keeps track of the number of attempts, errors, and
* successes for a given gateway url so that we can prioritize gateways that
Expand Down Expand Up @@ -37,13 +46,34 @@
*/
#successes = 0

/**
* In-flight requests for this gateway, keyed by the block id derived from the
* CID's multihash (see `#uniqueBlockId`). While an entry exists, further
* requests for the same block reuse the stored promise instead of starting a
* new network request; the entry is removed once the request settles.
*/
#pendingResponses = new Map<string, Promise<Uint8Array>>()
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved

private readonly log: Logger

constructor (url: URL | string, logger: ComponentLogger) {
this.url = url instanceof URL ? url : new URL(url)
this.log = logger.forComponent(`helia:trustless-gateway-block-broker:${this.url.hostname}`)
}

/**
 * Derives the key under which an in-flight request for `cid` is tracked: the
 * base64 encoding of the CID's multihash bytes.
 *
 * Only the multihash is considered, so CIDs that differ in version or codec
 * but share the same multihash produce the same id.
 *
 * Background on why the multihash is used rather than the CID string:
 *
 * - https://github.com/ipfs/helia/pull/503#discussion_r1572451331
 * - https://github.com/ipfs/kubo/issues/6815
 * - https://www.notion.so/pl-strflt/Handling-ambiguity-around-CIDs-9d5e14f6516f438980b01ef188efe15d#d9d45cd1ed8b4d349b96285de4aed5ab
 */
#uniqueBlockId (cid: CID): string {
  return base64.encode(cid.multihash.bytes)
}

/**
* Fetch a raw block from `this.url` following the specification defined at
* https://specs.ipfs.tech/http-gateways/trustless-gateway/
Expand All @@ -60,26 +90,29 @@
throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${this.url} was aborted prior to fetch`)
}

const blockId = this.#uniqueBlockId(cid)
try {
this.#attempts++
const res = await fetch(gwUrl.toString(), {
signal,
headers: {
// also set header, just in case ?format= is filtered out by some
// reverse proxy
Accept: 'application/vnd.ipld.raw'
},
cache: 'force-cache'
})

this.log('GET %s %d', gwUrl, res.status)

if (!res.ok) {
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`)
let pendingResponse: Promise<Uint8Array> | undefined = this.#pendingResponses.get(blockId)
if (pendingResponse == null) {
this.#attempts++
pendingResponse = fetch(gwUrl.toString(), {
signal,
headers: {
Accept: 'application/vnd.ipld.raw'
},
cache: 'force-cache'
}).then(async (res) => {
this.log('GET %s %d', gwUrl, res.status)
if (!res.ok) {
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`)
}

Check warning on line 109 in packages/block-brokers/src/trustless-gateway/trustless-gateway.ts

View check run for this annotation

Codecov / codecov/patch

packages/block-brokers/src/trustless-gateway/trustless-gateway.ts#L107-L109

Added lines #L107 - L109 were not covered by tests
this.#successes++
return new Uint8Array(await res.arrayBuffer())
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
})
this.#pendingResponses.set(blockId, pendingResponse)
}
this.#successes++
return new Uint8Array(await res.arrayBuffer())
return await pendingResponse
} catch (cause) {
// @ts-expect-error - TS thinks signal?.aborted can only be false now
// because it was checked for true above.
Expand All @@ -88,6 +121,8 @@
}
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid}`)
} finally {

Check warning on line 124 in packages/block-brokers/src/trustless-gateway/trustless-gateway.ts

View check run for this annotation

Codecov / codecov/patch

packages/block-brokers/src/trustless-gateway/trustless-gateway.ts#L124

Added line #L124 was not covered by tests
this.#pendingResponses.delete(blockId)
}
}

Expand Down Expand Up @@ -130,4 +165,14 @@
incrementInvalidBlocks (): void {
  // record one more invalid block for this gateway
  this.#invalidBlocks += 1
}

/**
 * Returns a snapshot of this gateway's counters together with the number of
 * requests that are currently in flight.
 */
getStats (): TrustlessGatewayStats {
  const snapshot: TrustlessGatewayStats = {
    attempts: this.#attempts,
    errors: this.#errors,
    invalidBlocks: this.#invalidBlocks,
    successes: this.#successes,
    pendingResponses: this.#pendingResponses.size
  }

  return snapshot
}
}
31 changes: 30 additions & 1 deletion packages/block-brokers/test/trustless-gateway.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
import { multiaddr } from '@multiformats/multiaddr'
import { uriToMultiaddr } from '@multiformats/uri-to-multiaddr'
import { expect } from 'aegir/chai'
import { CID } from 'multiformats/cid'
import * as raw from 'multiformats/codecs/raw'
import Sinon from 'sinon'
import { type StubbedInstance, stubConstructor, stubInterface } from 'sinon-ts'
import { TrustlessGatewayBlockBroker } from '../src/trustless-gateway/broker.js'
import { TrustlessGateway } from '../src/trustless-gateway/trustless-gateway.js'
import { createBlock } from './fixtures/create-block.js'
import type { Routing } from '@helia/interface'
import type { CID } from 'multiformats/cid'

describe('trustless-gateway-block-broker', () => {
let blocks: Array<{ cid: CID, block: Uint8Array }>
Expand Down Expand Up @@ -190,4 +190,33 @@

await expect(sessionBlockstore?.retrieve?.(blocks[0].cid)).to.eventually.deep.equal(blocks[0].block)
})

it('does not trigger new network requests if the same cid request is in-flight', async function () {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was renamed in the last iteration from "[...] same block [...]" to "[...] same cid [...]" because it does not actually exercise the uniqueBlock functionality, i.e. preventing duplicate requests for separate CIDs that reference the same bytes.

// from .aegir.js polka server
const cid = CID.parse('bafkqabtimvwgy3yk')
if (process.env.TRUSTLESS_GATEWAY == null) {
return this.skip()
}

Check warning on line 199 in packages/block-brokers/test/trustless-gateway.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/block-brokers/test/trustless-gateway.spec.ts#L198-L199

Added lines #L198 - L199 were not covered by tests
const trustlessGateway = new TrustlessGateway(process.env.TRUSTLESS_GATEWAY, defaultLogger())

// Call getRawBlock multiple times with the same CID
const promises = Array.from({ length: 10 }, async () => trustlessGateway.getRawBlock(cid))

// Wait for all of the promises to resolve
const [block1, ...blocks] = await Promise.all(promises)

// Assert that all calls to getRawBlock returned the same block
for (const block of blocks) {
expect(block).to.deep.equal(block1)
}

expect(trustlessGateway.getStats()).to.deep.equal({
// attempt is only incremented when a new request is made
attempts: 1,
errors: 0,
invalidBlocks: 0,
successes: 1,
pendingResponses: 0 // the queue is empty
})
})
})
Loading