From 2f7bbbe5c3947460eb6336e1153851de69a38ee8 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Fri, 23 Aug 2024 17:36:54 -0400 Subject: [PATCH] [WIP] Records Subscriptions in `@web5/api` (#522) Add API for subscribing to records. --- .changeset/lovely-rules-fold.md | 5 + packages/api/README.md | 60 ++++++++- packages/api/src/dwn-api.ts | 89 +++++++++++-- packages/api/src/record.ts | 29 ++++- packages/api/src/subscription-util.ts | 33 +++++ packages/api/tests/dwn-api.spec.ts | 172 +++++++++++++++++++++++++- 6 files changed, 374 insertions(+), 14 deletions(-) create mode 100644 .changeset/lovely-rules-fold.md create mode 100644 packages/api/src/subscription-util.ts diff --git a/.changeset/lovely-rules-fold.md b/.changeset/lovely-rules-fold.md new file mode 100644 index 000000000..15d163211 --- /dev/null +++ b/.changeset/lovely-rules-fold.md @@ -0,0 +1,5 @@ +--- +"@web5/api": patch +--- + +Add `records.subscribe()` functionality to the DwnApi diff --git a/packages/api/README.md b/packages/api/README.md index f78d22721..d94157399 100644 --- a/packages/api/README.md +++ b/packages/api/README.md @@ -20,6 +20,7 @@ The SDK is currently still under active development, but having entered the Tech - [API Documentation](#api-documentation) - [Web5.connect](#web5connectoptions) - [web5.dwn.records.query](#web5dwnrecordsqueryrequest) + - [web5.dwn.records.subscribe](#web5dwnrecordssubscriberequest) - [web5.dwn.records.create](#web5dwnrecordscreaterequest) - [web5.dwn.records.write](#web5dwnrecordswriterequest) - [web5.dwn.records.read](#web5dwnrecordsreadrequest) @@ -233,6 +234,59 @@ The query `response` contains the following properties: - **`records`** - _`Records array`_ (_optional_): the array of `Records` returned if the request was successful. - **`cursor`** - _`messageCid string`_ (_optional_): the `messageCid` of the last message returned in the results if there are exist additional records beyond the specified `limit` in the `query`. +### **`web5.dwn.records.subscribe(request)`** + +Method for subscribing to either the locally connected DWeb Node or any remote DWeb Node specified in the `from` property. + +```javascript +// This invocation will subscribe the user's own DWeb Nodes + +const subscriptionHandler = (record) => { + console.log("received", record); +}; + +const { status } = await web5.dwn.records.subscribe({ + message: { + filter: { + protocol: "https://schema.org/protocols/social", + }, + }, + subscriptionHandler, +}); + +console.log(status.code === 200); // successful subscription + +// This invocation will query Bob's DWeb Nodes +const { status } = await web5.dwn.records.query({ + from: "did:example:bob", + message: { + filter: { + protocol: "https://schema.org/protocols/social", + }, + }, + subscriptionHandler, +}); + +console.log(status.code === 200); // successful subscription +``` + +#### **Request** + +The query `request` contains the following properties: + +- **`from`** - _`DID string`_ (_optional_): the decentralized identifier of the DWeb Node the subscribe will receive results from. +- **`message`** - _`object`_: the properties of the DWeb Node Message Descriptor that will be used to construct a valid record subscription: + - **`filter`** - _`object`_: properties against which results of the subscription will be filtered: + - **`recordId`** - _`string`_ (_optional_): the record ID string that identifies the record data you are fetching. + - **`protocol`** - _`URI string`_ (_optional_): the URI of the protocol bucket in which to subscribe to. + - **`protocolPath`** - _`string`_ (_optional_): the path to the record in the protocol configuration. + - **`contextId`** _`string`_ (_optional_): the `recordId` of a root record of a protocol. + - **`parentId`** _`string`_ (_optional_): the `recordId` of a the parent of a protocol record. + - **`recipient`** - _`string`_ (_optional_): the DID in the `recipient` field of the record. + - **`schema`** - _`URI string`_ (_optional_): the URI of the schema bucket in which to subscribe to. + - **`dataFormat`** - _`Media Type string`_ (_optional_): the IANA string corresponding with the format of the data to filter for. See IANA's Media Type list here: https://www.iana.org/assignments/media-types/media-types.xhtml + - **`subscriptionHandler`** - _`function`_: The handler function which emits a `Record` object when any matching records arrive. + ### **`web5.dwn.records.create(request)`** Method for creating a new record and storing it in the user's local DWeb Node, remote DWeb Nodes, or another party's DWeb Nodes (if permitted). @@ -497,7 +551,9 @@ metadata associated with a DID. #### **Usage** ```javascript -const { didDocument } = await web5.did.resolve('did:dht:qftx7z968xcpfy1a1diu75pg5meap3gdtg6ezagaw849wdh6oubo'); +const { didDocument } = await web5.did.resolve( + "did:dht:qftx7z968xcpfy1a1diu75pg5meap3gdtg6ezagaw849wdh6oubo" +); ``` #### **Parameters** @@ -514,7 +570,7 @@ The method returns a DID resolution result as a JavaScript object. The structure #### **Notes** -- The resolution process for some DID methods like DID DHT involve network requests to the relevant DID verifiable +- The resolution process for some DID methods like DID DHT involve network requests to the relevant DID verifiable data registry or a resolver endpoint, which may introduce latency based on the network conditions and the specific DID method utilized. diff --git a/packages/api/src/dwn-api.ts b/packages/api/src/dwn-api.ts index 7b007e93d..556225592 100644 --- a/packages/api/src/dwn-api.ts +++ b/packages/api/src/dwn-api.ts @@ -16,12 +16,12 @@ import { DwnMessage, DwnResponse, DwnMessageParams, + DwnMessageSubscription, DwnResponseStatus, CachedPermissions, ProcessDwnRequest, DwnPaginationCursor, - DwnDataEncodedRecordsWriteMessage, - AgentPermissionsApi + AgentPermissionsApi, } from '@web5/agent'; import { isEmptyObject } from '@web5/common'; @@ -32,8 +32,7 @@ import { dataToBlob } from './utils.js'; import { Protocol } from './protocol.js'; import { PermissionGrant } from './permission-grant.js'; import { PermissionRequest } from './permission-request.js'; -import { DwnMessagesPermissionScope } from '@web5/agent'; -import { DwnRecordsPermissionScope } from '@web5/agent'; +import { SubscriptionUtil } from './subscription-util.js'; /** * Represents the request payload for fetching permission requests from a Decentralized Web Node (DWN). @@ -179,7 +178,7 @@ export type RecordsQueryResponse = DwnResponseStatus & { /** If there are additional results, the messageCid of the last record will be returned as a pagination cursor. */ cursor?: DwnPaginationCursor; -}; +} /** * Represents a request to read a specific record from a Decentralized Web Node (DWN). @@ -206,7 +205,36 @@ export type RecordsReadRequest = { export type RecordsReadResponse = DwnResponseStatus & { /** The record retrieved by the read operation. */ record: Record; -}; +} + +/** Subscription handler for Records */ +export type RecordsSubscriptionHandler = (record: Record) => void; + +/** + * Represents a request to subscribe to records from a Decentralized Web Node (DWN). + * + * This request type is used to specify the target DWN from which records matching the subscription + * criteria should be emitted. It's useful for being notified in real time when records are written, deleted or modified. + */ +export type RecordsSubscribeRequest = { + /** Optional DID specifying the remote target DWN tenant to subscribe from. */ + from?: string; + + /** The parameters for the subscription operation, detailing the criteria for the subscription filter */ + message: Omit; + + /** The handler to process the subscription events */ + subscriptionHandler: RecordsSubscriptionHandler; +} + +/** Encapsulates the response from a DWN RecordsSubscriptionRequest */ +export type RecordsSubscribeResponse = DwnResponseStatus & { + /** + * Represents the subscription that was created. Includes an ID and the close method to stop the subscription. + * + * */ + subscription?: DwnMessageSubscription; +} /** * Defines a request to write (create) a record to a Decentralized Web Node (DWN). @@ -252,7 +280,7 @@ export type RecordsWriteResponse = DwnResponseStatus & { * DWN as a result of the write operation. */ record?: Record -}; +} /** * Interface to interact with DWN Records and Protocols @@ -582,7 +610,6 @@ export class DwnApi { return { status }; }, - /** * Query a single or multiple records based on the given filter */ @@ -733,6 +760,52 @@ export class DwnApi { return { record, status }; }, + /** + * Subscribes to records based on the given filter and emits events to the `subscriptionHandler`. + * + * @param request must include the `message` with the subscription filter and the `subscriptionHandler` to process the events. + * @returns the subscription status and the subscription object used to close the subscription. + */ + subscribe: async (request: RecordsSubscribeRequest): Promise => { + const agentRequest: ProcessDwnRequest = { + /** + * The `author` is the DID that will sign the message and must be the DID the Web5 app is + * connected with and is authorized to access the signing private key of. + */ + author : this.connectedDid, + messageParams : request.message, + messageType : DwnInterface.RecordsSubscribe, + /** + * The `target` is the DID of the DWN tenant under which the subscribe operation will be executed. + * If `from` is provided, the subscribe operation will be executed on a remote DWN. + * Otherwise, the local DWN will execute the subscribe operation. + */ + target : request.from || this.connectedDid, + + /** + * The handler to process the subscription events. + */ + subscriptionHandler: SubscriptionUtil.recordSubscriptionHandler({ + agent : this.agent, + connectedDid : this.connectedDid, + request + }) + }; + + let agentResponse: DwnResponse; + + if (request.from) { + agentResponse = await this.agent.sendDwnRequest(agentRequest); + } else { + agentResponse = await this.agent.processDwnRequest(agentRequest); + } + + const reply = agentResponse.reply; + const { status, subscription } = reply; + + return { status, subscription }; + }, + /** * Writes a record to the DWN * diff --git a/packages/api/src/record.ts b/packages/api/src/record.ts index bbaa3a356..3c8f045ff 100644 --- a/packages/api/src/record.ts +++ b/packages/api/src/record.ts @@ -18,7 +18,8 @@ import { DwnDateSort, DwnPaginationCursor, isDwnMessage, - SendDwnRequest + SendDwnRequest, + isRecordsWrite } from '@web5/agent'; import { Convert, isEmptyObject, NodeStream, removeUndefinedProperties, Stream } from '@web5/common'; @@ -72,10 +73,22 @@ export type RecordModel = ImmutableRecordProperties & OptionalRecordProperties & * * @beta */ -export type RecordOptions = DwnMessage[DwnInterface.RecordsWrite] & { +export type RecordOptions = DwnMessage[DwnInterface.RecordsWrite | DwnInterface.RecordsDelete] & { /** The DID that signed the record. */ author: string; + /** The attestation signature(s) for the record. */ + attestation?: DwnMessage[DwnInterface.RecordsWrite]['attestation']; + + /** The encryption information for the record. */ + encryption?: DwnMessage[DwnInterface.RecordsWrite]['encryption']; + + /** The contextId associated with the record. */ + contextId?: string; + + /** The unique identifier of the record */ + recordId?: string; + /** The DID of the DWN tenant under which record operations are being performed. */ connectedDid: string; @@ -360,7 +373,7 @@ export class Record implements RecordModel { this._descriptor = options.descriptor; this._encryption = options.encryption; this._initialWrite = options.initialWrite; - this._recordId = options.recordId; + this._recordId = this.isRecordsDeleteDescriptor(options.descriptor) ? options.descriptor.recordId : options.recordId; this._protocolRole = options.protocolRole; if (options.encodedData) { @@ -539,6 +552,7 @@ export class Record implements RecordModel { /** * Send the current record to a remote DWN by specifying their DID * If no DID is specified, the target is assumed to be the owner (connectedDID). + * * If an initial write is present and the Record class send cache has no awareness of it, the initial write is sent first * (vs waiting for the regular DWN sync) * @@ -949,4 +963,13 @@ export class Record implements RecordModel { } } } + + /** + * Checks if the descriptor is a RecordsDelete descriptor. + * + * @param descriptor a RecordsWrite or RecordsDelete descriptor + */ + private isRecordsDeleteDescriptor(descriptor: DwnMessageDescriptor[DwnInterface.RecordsWrite | DwnInterface.RecordsDelete]): descriptor is DwnMessageDescriptor[DwnInterface.RecordsDelete] { + return descriptor.interface + descriptor.method === DwnInterface.RecordsDelete; + } } \ No newline at end of file diff --git a/packages/api/src/subscription-util.ts b/packages/api/src/subscription-util.ts new file mode 100644 index 000000000..733647aee --- /dev/null +++ b/packages/api/src/subscription-util.ts @@ -0,0 +1,33 @@ +import { DwnRecordSubscriptionHandler, getRecordAuthor, Web5Agent } from '@web5/agent'; +import { RecordsSubscribeRequest } from './dwn-api.js'; +import { Record } from './record.js'; + +/** + * Utility class for dealing with subscriptions. + */ +export class SubscriptionUtil { + /** + * Creates a record subscription handler that can be used to process incoming {Record} messages. + */ + static recordSubscriptionHandler({ agent, connectedDid, request }:{ + agent: Web5Agent; + connectedDid: string; + request: RecordsSubscribeRequest; + }): DwnRecordSubscriptionHandler { + const { subscriptionHandler, from: remoteOrigin } = request; + + return async (event) => { + const { message, initialWrite } = event; + const author = getRecordAuthor(message); + const recordOptions = { + author, + connectedDid, + remoteOrigin, + initialWrite + }; + + const record = new Record(agent, { ...message, ...recordOptions }); + subscriptionHandler(record); + }; + } +} \ No newline at end of file diff --git a/packages/api/tests/dwn-api.spec.ts b/packages/api/tests/dwn-api.spec.ts index 984b92bbc..d98e399e0 100644 --- a/packages/api/tests/dwn-api.spec.ts +++ b/packages/api/tests/dwn-api.spec.ts @@ -9,9 +9,10 @@ import { DwnApi } from '../src/dwn-api.js'; import { testDwnUrl } from './utils/test-config.js'; import emailProtocolDefinition from './fixtures/protocol-definitions/email.json' assert { type: 'json' }; import photosProtocolDefinition from './fixtures/protocol-definitions/photos.json' assert { type: 'json' }; -import { DwnInterfaceName, DwnMethodName, PermissionsProtocol, Time } from '@tbd54566975/dwn-sdk-js'; +import { DwnInterfaceName, DwnMethodName, PermissionsProtocol, Poller, RecordsWrite, Time } from '@tbd54566975/dwn-sdk-js'; import { PermissionGrant } from '../src/permission-grant.js'; import { Web5 } from '../src/web5.js'; +import { Record } from '../src/record.js'; let testDwnUrls: string[] = [testDwnUrl]; @@ -1248,6 +1249,7 @@ describe('DwnApi', () => { expect(writeResult.status.detail).to.equal('Accepted'); expect(writeResult.record).to.exist; + // Delete the record await writeResult.record!.delete(); @@ -1372,6 +1374,174 @@ describe('DwnApi', () => { }); }); + describe('records.subscribe()', () => { + describe('agent', () => { + it('subscribes to records', async () => { + // configure a protocol + const protocolConfigure = await dwnAlice.protocols.configure({ + message: { definition: { ...emailProtocolDefinition, published: true } } + }); + expect(protocolConfigure.status.code).to.equal(202); + + // subscribe to all messages from the protocol + const records: Map = new Map(); + const subscriptionHandler = async (record: Record) => { + records.set(record.id, record); + }; + + const subscribeResult = await dwnAlice.records.subscribe({ + message: { + filter: { + protocol: emailProtocolDefinition.protocol + } + }, + subscriptionHandler + }); + expect(subscribeResult.status.code).to.equal(200); + + // write a record + const writeResult = await dwnAlice.records.write({ + data : 'Hello, world!', + message : { + recipient : bobDid.uri, + protocol : emailProtocolDefinition.protocol, + protocolPath : 'thread', + schema : emailProtocolDefinition.types.thread.schema, + dataFormat : 'text/plain' + } + }); + expect(writeResult.status.code).to.equal(202); + + // wait for the record to be received + await Poller.pollUntilSuccessOrTimeout(async () => { + expect(records.size).to.equal(1); + const record = records.get(writeResult.record.id); + expect(record.toJSON()).to.deep.equal(writeResult.record.toJSON()); + expect(record.deleted).to.be.false; + }); + + // delete the record using the original writeResult instance of it + const deleteResult = await writeResult.record.delete(); + expect(deleteResult.status.code).to.equal(202); + + // wait for the record state to be reflected as deleted + await Poller.pollUntilSuccessOrTimeout(async () => { + const record = records.get(writeResult.record.id); + expect(record).to.exist; + expect(record.deleted).to.be.true; + }); + + // write another record and delete the previous one, the state should be updated + const writeResult2 = await dwnAlice.records.write({ + data : 'Hello, world!', + message : { + recipient : bobDid.uri, + protocol : emailProtocolDefinition.protocol, + protocolPath : 'thread', + schema : emailProtocolDefinition.types.thread.schema, + dataFormat : 'text/plain' + } + }); + expect(writeResult2.status.code).to.equal(202); + + // wait for the record to be received + await Poller.pollUntilSuccessOrTimeout(async () => { + expect(records.size).to.equal(2); + const record = records.get(writeResult2.record.id); + expect(record.toJSON()).to.deep.equal(writeResult2.record.toJSON()); + expect(record.deleted).to.be.false; + }); + }); + }); + + describe('from: did', () => { + it('subscribes to records from remote', async () => { + // configure a protocol + const protocolConfigure = await dwnAlice.protocols.configure({ + message: { definition: { ...emailProtocolDefinition, published: true } } + }); + expect(protocolConfigure.status.code).to.equal(202); + const protocolSend = await protocolConfigure.protocol.send(aliceDid.uri); + expect(protocolSend.status.code).to.equal(202); + + // subscribe to all messages from the protocol + const records: Map = new Map(); + const subscriptionHandler = async (record: Record) => { + records.set(record.id, record); + }; + + const subscribeResult = await dwnAlice.records.subscribe({ + from : aliceDid.uri, + message : { + filter: { + protocol: emailProtocolDefinition.protocol + } + }, + subscriptionHandler + }); + expect(subscribeResult.status.code).to.equal(200); + + // write a record + const writeResult = await dwnAlice.records.write({ + data : 'Hello, world!', + message : { + recipient : bobDid.uri, + protocol : emailProtocolDefinition.protocol, + protocolPath : 'thread', + schema : emailProtocolDefinition.types.thread.schema, + dataFormat : 'text/plain' + } + }); + expect(writeResult.status.code).to.equal(202); + const writeResultSend = await writeResult.record.send(); + expect(writeResultSend.status.code).to.equal(202); + + // wait for the record to be received + await Poller.pollUntilSuccessOrTimeout(async () => { + expect(records.size).to.equal(1); + const record = records.get(writeResult.record.id); + expect(record.toJSON()).to.deep.equal(writeResult.record.toJSON()); + expect(record.deleted).to.be.false; + }); + + // delete the record using the original writeResult instance of it + const deleteResult = await writeResult.record.delete(); + expect(deleteResult.status.code).to.equal(202); + const deleteResultSend = await writeResult.record.send(); + expect(deleteResultSend.status.code).to.equal(202); + + // wait for the record state to be reflected as deleted + await Poller.pollUntilSuccessOrTimeout(async () => { + const record = records.get(writeResult.record.id); + expect(record).to.exist; + expect(record.deleted).to.be.true; + }); + + // write another record and delete the previous one, the state should be updated + const writeResult2 = await dwnAlice.records.write({ + data : 'Hello, world!', + message : { + recipient : bobDid.uri, + protocol : emailProtocolDefinition.protocol, + protocolPath : 'thread', + schema : emailProtocolDefinition.types.thread.schema, + dataFormat : 'text/plain' + } + }); + const writeResult2Send = await writeResult2.record.send(); + expect(writeResult2Send.status.code).to.equal(202); + + // wait for the record to be received + await Poller.pollUntilSuccessOrTimeout(async () => { + expect(records.size).to.equal(2); + const record = records.get(writeResult2.record.id); + expect(record.toJSON()).to.deep.equal(writeResult2.record.toJSON()); + expect(record.deleted).to.be.false; + }); + }); + }); + }); + describe('connected.findPermissionGrantForRequest', () => { it('caches result', async () => { // create a grant for bob