diff --git a/typeorm/typeorm-store/package.json b/typeorm/typeorm-store/package.json index 4424dce6b..f467708cd 100644 --- a/typeorm/typeorm-store/package.json +++ b/typeorm/typeorm-store/package.json @@ -20,7 +20,7 @@ "dependencies": { "@subsquid/typeorm-config": "^4.1.1", "@subsquid/util-internal": "^3.2.0", - "@subsquid/logger": "~1.3.3" + "@subsquid/logger": "^1.3.3" }, "peerDependencies": { "typeorm": "^0.3.17", diff --git a/typeorm/typeorm-store/src/database.ts b/typeorm/typeorm-store/src/database.ts index cfeb813eb..183a261cf 100644 --- a/typeorm/typeorm-store/src/database.ts +++ b/typeorm/typeorm-store/src/database.ts @@ -249,7 +249,7 @@ export class TypeormDatabase { await store.flush() if (this.resetMode === 'BATCH') store.reset() } finally { - store._close() + store['isClosed'] = true } } diff --git a/typeorm/typeorm-store/src/store.ts b/typeorm/typeorm-store/src/store.ts index ccf22ba61..73fb6a745 100644 --- a/typeorm/typeorm-store/src/store.ts +++ b/typeorm/typeorm-store/src/store.ts @@ -1,4 +1,11 @@ -import {EntityManager, EntityMetadata, FindOptionsOrder, FindOptionsRelations, FindOptionsWhere} from 'typeorm' +import { + EntityManager, + EntityMetadata, + EntityNotFoundError, + FindOptionsOrder, + FindOptionsRelations, + FindOptionsWhere, +} from 'typeorm' import {EntityTarget} from 'typeorm/common/EntityTarget' import {ChangeWriter} from './utils/changeWriter' import {StateManager} from './utils/stateManager' @@ -96,16 +103,10 @@ export class Store { this.cacheMode = cacheMode } - /** - * @internal - */ get _em() { return this.em } - /** - * @internal - */ get _state() { return this.state } @@ -237,7 +238,7 @@ export class Store { private async _delete(metadata: EntityMetadata, ids: string[]) { this.logger?.debug(`delete ${metadata.name} ${ids.length} entities`) await this.changes?.writeDelete(metadata, ids) - await this.em.delete(metadata.target, ids) // TODO: should be split by chunks too? + await this.em.delete(metadata.target, ids) // NOTE: should be split by chunks too? } async count(target: EntityTarget, options?: FindManyOptions): Promise { @@ -250,16 +251,19 @@ export class Store { target: EntityTarget, where: FindOptionsWhere | FindOptionsWhere[] ): Promise { - return await this.performRead(async () => { - return await this.em.countBy(target, where) - }) + return await this.count(target, {where}) } async find(target: EntityTarget, options: FindManyOptions): Promise { return await this.performRead(async () => { const {cache, ...opts} = options const res = await this.em.find(target, opts) - if (cache ?? this.cacheMode === 'ALL') this.persistEntities(target, res, options?.relations) + if (cache ?? this.cacheMode === 'ALL') { + const metadata = this.getEntityMetadata(target) + for (const e of res) { + this.cacheEntity(metadata, e) + } + } return res }) } @@ -269,11 +273,7 @@ export class Store { where: FindOptionsWhere | FindOptionsWhere[], cache?: boolean ): Promise { - return await this.performRead(async () => { - const res = await this.em.findBy(target, where) - if (cache ?? this.cacheMode === 'ALL') this.persistEntities(target, res) - return res - }) + return await this.find(target, {where, cache}) } async findOne( @@ -283,8 +283,11 @@ export class Store { return await this.performRead(async () => { const {cache, ...opts} = options const res = await this.em.findOne(target, opts).then(noNull) - if (res != null && (cache ?? this.cacheMode === 'ALL')) - this.persistEntities(target, res, options?.relations) + if (cache ?? this.cacheMode === 'ALL') { + const metadata = this.getEntityMetadata(target) + const idOrEntity = res || getIdFromWhere(options.where) + this.cacheEntity(metadata, idOrEntity) + } return res }) } @@ -294,22 +297,13 @@ export class Store { where: FindOptionsWhere | FindOptionsWhere[], cache?: boolean ): Promise { - return await this.performRead(async () => { - const res = await this.em.findOneBy(target, where).then(noNull) - if (res != null && (cache ?? this.cacheMode === 'ALL')) this.persistEntities(target, res) - - return res - }) + return await this.findOne(target, {where, cache}) } async findOneOrFail(target: EntityTarget, options: FindOneOptions): Promise { - return await this.performRead(async () => { - const {cache, ...opts} = options - const res = await this.em.findOneOrFail(target, opts) - if (cache ?? this.cacheMode === 'ALL') this.persistEntities(target, res, options?.relations) - - return res - }) + const res = await this.findOne(target, options) + if (res == null) throw new EntityNotFoundError(target, options.where) + return res } async findOneByOrFail( @@ -317,11 +311,9 @@ export class Store { where: FindOptionsWhere | FindOptionsWhere[], cache?: boolean ): Promise { - return await this.performRead(async () => { - const res = await this.em.findOneByOrFail(target, where) - if (cache || this.cacheMode === 'ALL') this.persistEntities(target, res) - return res - }) + const res = await this.findOneBy(target, where, cache) + if (res == null) throw new EntityNotFoundError(target, where) + return res } async get(target: EntityTarget, id: string): Promise @@ -331,28 +323,18 @@ export class Store { idOrOptions: string | GetOptions ): Promise { const {id, relations} = parseGetOptions(idOrOptions) - const metadata = this.getEntityMetadata(target) let entity = this.state.get(metadata, id, relations) if (entity !== undefined) return noNull(entity) - return await this.findOne(target, {where: {id} as any, relations, cache: true}) } - async getOrFail(entityClass: EntityTarget, id: string): Promise - async getOrFail(entityClass: EntityTarget, options: GetOptions): Promise - async getOrFail( - entityClass: EntityTarget, - idOrOptions: string | GetOptions - ): Promise { + async getOrFail(target: EntityTarget, id: string): Promise + async getOrFail(target: EntityTarget, options: GetOptions): Promise + async getOrFail(target: EntityTarget, idOrOptions: string | GetOptions): Promise { const options = parseGetOptions(idOrOptions) - let e = await this.get(entityClass, options) - - if (e == null) { - const metadata = this.getEntityMetadata(entityClass) - throw new Error(`Missing entity ${metadata.name} with id "${options.id}"`) - } - + let e = await this.get(target, options) + if (e == null) throw new EntityNotFoundError(target, options.id) return e } @@ -365,89 +347,66 @@ export class Store { this.pendingCommit = createFuture() try { - const {upserts, inserts, deletes, extraUpserts} = this.state.computeChangeSets() + await this.state.performUpdate(async ({upserts, inserts, deletes, extraUpserts}) => { + for (const {metadata, entities} of upserts) { + await this._upsert(metadata, entities) + } - for (const {metadata, entities} of upserts) { - await this._upsert(metadata, entities) - } + for (const {metadata, entities} of inserts) { + await this._insert(metadata, entities) + } - for (const {metadata, entities} of inserts) { - await this._insert(metadata, entities) - } + for (const {metadata, ids} of deletes) { + await this._delete(metadata, ids) + } - for (const {metadata, ids} of deletes) { - await this._delete(metadata, ids) - } + for (const {metadata, entities} of extraUpserts) { + await this._upsert(metadata, entities) + } + }) - for (const {metadata, entities} of extraUpserts) { - await this._upsert(metadata, entities) + if (this.resetMode === 'FLUSH' || reset) { + this.reset() } - - this.state.clear() } finally { this.pendingCommit.resolve() this.pendingCommit = undefined } - - if (this.resetMode === 'FLUSH' || reset) { - this.reset() - } - } - - /** - * @internal - */ - _close() { - this.isClosed = true } private async performRead(cb: () => Promise): Promise { - this.assetNotClosed() - + this.assertNotClosed() if (this.flushMode === 'AUTO' || this.flushMode === 'ALWAYS') { await this.flush() } - return await cb() } private async performWrite(cb: () => Promise): Promise { + this.assertNotClosed() await this.pendingCommit?.promise() - - this.assetNotClosed() - await cb() - if (this.flushMode === 'ALWAYS') { await this.flush() } } - private assetNotClosed() { + private assertNotClosed() { assert(!this.isClosed, `too late to perform db updates, make sure you haven't forgot to await on db query`) } - private persistEntities( - target: EntityTarget, - e: E | E[], - relationMask?: FindOptionsRelations - ) { - const metadata = this.getEntityMetadata(target) - - e = Array.isArray(e) ? e : [e] - for (const entity of e) { - traverseEntity({ - metadata, - entity, - relationMask: relationMask || null, - cb: (e, md) => this.state?.persist(md, e), - }) + private cacheEntity(metadata: EntityMetadata, entityOrId?: E | string) { + if (entityOrId == null) { + return + } else if (typeof entityOrId === 'string') { + this.state.settle(metadata, entityOrId) + } else { + traverseEntity(metadata, entityOrId, (e, md) => this.state.persist(md, e)) } } private getEntityMetadata(target: EntityTarget) { - const em = this.em - return em.connection.getMetadata(target) + return this.em.connection.getMetadata(target) } } @@ -458,3 +417,7 @@ function parseGetOptions(idOrOptions: string | GetOptions): GetOptions return idOrOptions } } + +function getIdFromWhere(where?: FindOptionsWhere) { + return typeof where?.id === 'string' ? where.id : undefined +} diff --git a/typeorm/typeorm-store/src/utils/cacheMap.ts b/typeorm/typeorm-store/src/utils/cacheMap.ts index 0d7e7e9a9..7a9af0233 100644 --- a/typeorm/typeorm-store/src/utils/cacheMap.ts +++ b/typeorm/typeorm-store/src/utils/cacheMap.ts @@ -18,13 +18,13 @@ export class CacheMap { return this.getEntityCache(metadata)?.get(id) } - exist(metadata: EntityMetadata, id: string): boolean { + has(metadata: EntityMetadata, id: string): boolean { const cacheMap = this.getEntityCache(metadata) const cachedEntity = cacheMap.get(id) return !!cachedEntity?.value } - ensure(metadata: EntityMetadata, id: string): void { + settle(metadata: EntityMetadata, id: string): void { const cacheMap = this.getEntityCache(metadata) if (cacheMap.has(id)) return diff --git a/typeorm/typeorm-store/src/utils/misc.ts b/typeorm/typeorm-store/src/utils/misc.ts index 37b7a0956..c293f9baa 100644 --- a/typeorm/typeorm-store/src/utils/misc.ts +++ b/typeorm/typeorm-store/src/utils/misc.ts @@ -89,43 +89,21 @@ export function mergeRelations( return mergedObject } -export function traverseEntity({ - metadata, - entity, - relationMask, - cb, -}: { - metadata: EntityMetadata - entity: EntityLiteral - relationMask: FindOptionsRelations | null +export function traverseEntity( + metadata: EntityMetadata, + entity: EntityLiteral, cb: (e: EntityLiteral, metadata: EntityMetadata) => void -}) { - if (relationMask != null) { - for (const relation of metadata.relations) { - const inverseRelationMask = relationMask[relation.propertyName] - if (!inverseRelationMask) continue - - const inverseEntity = relation.getEntityValue(entity) - if (inverseEntity == null) continue - - if (relation.isOneToMany || relation.isManyToMany) { - if (!Array.isArray(inverseEntity)) continue - for (const ie of inverseEntity) { - traverseEntity({ - metadata: relation.inverseEntityMetadata, - entity: ie, - relationMask: inverseRelationMask === true ? null : inverseRelationMask, - cb, - }) - } - } else { - traverseEntity({ - metadata: relation.inverseEntityMetadata, - entity: inverseEntity, - relationMask: inverseRelationMask === true ? null : inverseRelationMask, - cb, - }) +) { + for (const relation of metadata.relations) { + const inverseEntity = relation.getEntityValue(entity) + if (inverseEntity == null) continue + + if (relation.isOneToMany || relation.isManyToMany) { + for (const ie of inverseEntity) { + traverseEntity(relation.inverseEntityMetadata, ie, cb) } + } else { + traverseEntity(relation.inverseEntityMetadata, inverseEntity, cb) } } diff --git a/typeorm/typeorm-store/src/utils/stateManager.ts b/typeorm/typeorm-store/src/utils/stateManager.ts index 28d067436..53ca38d4c 100644 --- a/typeorm/typeorm-store/src/utils/stateManager.ts +++ b/typeorm/typeorm-store/src/utils/stateManager.ts @@ -147,6 +147,10 @@ export class StateManager { this.cacheMap.add(metadata, entity) } + settle(metadata: EntityMetadata, id: string) { + this.cacheMap.settle(metadata, id) + } + isInserted(metadata: EntityMetadata, id: string) { return this.getState(metadata, id) === ChangeType.Insert } @@ -159,18 +163,17 @@ export class StateManager { return this.getState(metadata, id) === ChangeType.Delete } - clear(): void { - this.logger?.debug(`clear states`) - this.stateMap.clear() + isExists(metadata: EntityMetadata, id: string) { + return this.cacheMap.has(metadata, id) } reset(): void { - this.clear() - this.logger?.debug(`reset cache`) + this.logger?.debug(`reset`) + this.stateMap.clear() this.cacheMap.clear() } - computeChangeSets() { + async performUpdate(cb: (cs: ChangeSets) => Promise) { const changeSets: ChangeSets = { inserts: [], upserts: [], @@ -239,7 +242,9 @@ export class StateManager { } } - return changeSets + await cb(changeSets) + + this.stateMap.clear() } private extractExtraUpsert(metadata: EntityMetadata, entity: E) {