Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
belopash committed May 27, 2024
1 parent 5403ef5 commit b44249d
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 148 deletions.
2 changes: 1 addition & 1 deletion typeorm/typeorm-store/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"dependencies": {
"@subsquid/typeorm-config": "^4.1.1",
"@subsquid/util-internal": "^3.2.0",
"@subsquid/logger": "~1.3.3"
"@subsquid/logger": "^1.3.3"
},
"peerDependencies": {
"typeorm": "^0.3.17",
Expand Down
2 changes: 1 addition & 1 deletion typeorm/typeorm-store/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ export class TypeormDatabase {
await store.flush()
if (this.resetMode === 'BATCH') store.reset()
} finally {
store._close()
store['isClosed'] = true
}
}

Expand Down
167 changes: 65 additions & 102 deletions typeorm/typeorm-store/src/store.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import {EntityManager, EntityMetadata, FindOptionsOrder, FindOptionsRelations, FindOptionsWhere} from 'typeorm'
import {
EntityManager,
EntityMetadata,
EntityNotFoundError,
FindOptionsOrder,
FindOptionsRelations,
FindOptionsWhere,
} from 'typeorm'
import {EntityTarget} from 'typeorm/common/EntityTarget'
import {ChangeWriter} from './utils/changeWriter'
import {StateManager} from './utils/stateManager'
Expand Down Expand Up @@ -96,16 +103,10 @@ export class Store {
this.cacheMode = cacheMode
}

/**
* @internal
*/
get _em() {
return this.em
}

/**
* @internal
*/
get _state() {
return this.state
}
Expand Down Expand Up @@ -237,7 +238,7 @@ export class Store {
private async _delete(metadata: EntityMetadata, ids: string[]) {
this.logger?.debug(`delete ${metadata.name} ${ids.length} entities`)
await this.changes?.writeDelete(metadata, ids)
await this.em.delete(metadata.target, ids) // TODO: should be split by chunks too?
await this.em.delete(metadata.target, ids) // NOTE: should be split by chunks too?
}

async count<E extends EntityLiteral>(target: EntityTarget<E>, options?: FindManyOptions<E>): Promise<number> {
Expand All @@ -250,16 +251,19 @@ export class Store {
target: EntityTarget<E>,
where: FindOptionsWhere<E> | FindOptionsWhere<E>[]
): Promise<number> {
return await this.performRead(async () => {
return await this.em.countBy(target, where)
})
return await this.count(target, {where})
}

async find<E extends EntityLiteral>(target: EntityTarget<E>, options: FindManyOptions<E>): Promise<E[]> {
return await this.performRead(async () => {
const {cache, ...opts} = options
const res = await this.em.find(target, opts)
if (cache ?? this.cacheMode === 'ALL') this.persistEntities(target, res, options?.relations)
if (cache ?? this.cacheMode === 'ALL') {
const metadata = this.getEntityMetadata(target)
for (const e of res) {
this.cacheEntity(metadata, e)
}
}
return res
})
}
Expand All @@ -269,11 +273,7 @@ export class Store {
where: FindOptionsWhere<E> | FindOptionsWhere<E>[],
cache?: boolean
): Promise<E[]> {
return await this.performRead(async () => {
const res = await this.em.findBy(target, where)
if (cache ?? this.cacheMode === 'ALL') this.persistEntities(target, res)
return res
})
return await this.find(target, {where, cache})
}

async findOne<E extends EntityLiteral>(
Expand All @@ -283,8 +283,11 @@ export class Store {
return await this.performRead(async () => {
const {cache, ...opts} = options
const res = await this.em.findOne(target, opts).then(noNull)
if (res != null && (cache ?? this.cacheMode === 'ALL'))
this.persistEntities(target, res, options?.relations)
if (cache ?? this.cacheMode === 'ALL') {
const metadata = this.getEntityMetadata(target)
const idOrEntity = res || getIdFromWhere(options.where)
this.cacheEntity(metadata, idOrEntity)
}
return res
})
}
Expand All @@ -294,34 +297,23 @@ export class Store {
where: FindOptionsWhere<E> | FindOptionsWhere<E>[],
cache?: boolean
): Promise<E | undefined> {
return await this.performRead(async () => {
const res = await this.em.findOneBy(target, where).then(noNull)
if (res != null && (cache ?? this.cacheMode === 'ALL')) this.persistEntities(target, res)

return res
})
return await this.findOne(target, {where, cache})
}

async findOneOrFail<E extends EntityLiteral>(target: EntityTarget<E>, options: FindOneOptions<E>): Promise<E> {
return await this.performRead(async () => {
const {cache, ...opts} = options
const res = await this.em.findOneOrFail(target, opts)
if (cache ?? this.cacheMode === 'ALL') this.persistEntities(target, res, options?.relations)

return res
})
const res = await this.findOne(target, options)
if (res == null) throw new EntityNotFoundError(target, options.where)
return res
}

async findOneByOrFail<E extends EntityLiteral>(
target: EntityTarget<E>,
where: FindOptionsWhere<E> | FindOptionsWhere<E>[],
cache?: boolean
): Promise<E> {
return await this.performRead(async () => {
const res = await this.em.findOneByOrFail(target, where)
if (cache || this.cacheMode === 'ALL') this.persistEntities(target, res)
return res
})
const res = await this.findOneBy(target, where, cache)
if (res == null) throw new EntityNotFoundError(target, where)
return res
}

async get<E extends EntityLiteral>(target: EntityTarget<E>, id: string): Promise<E | undefined>
Expand All @@ -331,28 +323,18 @@ export class Store {
idOrOptions: string | GetOptions<E>
): Promise<E | undefined> {
const {id, relations} = parseGetOptions(idOrOptions)

const metadata = this.getEntityMetadata(target)
let entity = this.state.get<E>(metadata, id, relations)
if (entity !== undefined) return noNull(entity)

return await this.findOne(target, {where: {id} as any, relations, cache: true})
}

async getOrFail<E extends EntityLiteral>(entityClass: EntityTarget<E>, id: string): Promise<E>
async getOrFail<E extends EntityLiteral>(entityClass: EntityTarget<E>, options: GetOptions<E>): Promise<E>
async getOrFail<E extends EntityLiteral>(
entityClass: EntityTarget<E>,
idOrOptions: string | GetOptions<E>
): Promise<E> {
async getOrFail<E extends EntityLiteral>(target: EntityTarget<E>, id: string): Promise<E>
async getOrFail<E extends EntityLiteral>(target: EntityTarget<E>, options: GetOptions<E>): Promise<E>
async getOrFail<E extends EntityLiteral>(target: EntityTarget<E>, idOrOptions: string | GetOptions<E>): Promise<E> {
const options = parseGetOptions(idOrOptions)
let e = await this.get(entityClass, options)

if (e == null) {
const metadata = this.getEntityMetadata(entityClass)
throw new Error(`Missing entity ${metadata.name} with id "${options.id}"`)
}

let e = await this.get(target, options)
if (e == null) throw new EntityNotFoundError(target, options.id)
return e
}

Expand All @@ -365,89 +347,66 @@ export class Store {

this.pendingCommit = createFuture()
try {
const {upserts, inserts, deletes, extraUpserts} = this.state.computeChangeSets()
await this.state.performUpdate(async ({upserts, inserts, deletes, extraUpserts}) => {
for (const {metadata, entities} of upserts) {
await this._upsert(metadata, entities)
}

for (const {metadata, entities} of upserts) {
await this._upsert(metadata, entities)
}
for (const {metadata, entities} of inserts) {
await this._insert(metadata, entities)
}

for (const {metadata, entities} of inserts) {
await this._insert(metadata, entities)
}
for (const {metadata, ids} of deletes) {
await this._delete(metadata, ids)
}

for (const {metadata, ids} of deletes) {
await this._delete(metadata, ids)
}
for (const {metadata, entities} of extraUpserts) {
await this._upsert(metadata, entities)
}
})

for (const {metadata, entities} of extraUpserts) {
await this._upsert(metadata, entities)
if (this.resetMode === 'FLUSH' || reset) {
this.reset()
}

this.state.clear()
} finally {
this.pendingCommit.resolve()
this.pendingCommit = undefined
}

if (this.resetMode === 'FLUSH' || reset) {
this.reset()
}
}

/**
* @internal
*/
_close() {
this.isClosed = true
}

private async performRead<T>(cb: () => Promise<T>): Promise<T> {
this.assetNotClosed()

this.assertNotClosed()
if (this.flushMode === 'AUTO' || this.flushMode === 'ALWAYS') {
await this.flush()
}

return await cb()
}

private async performWrite(cb: () => Promise<void>): Promise<void> {
this.assertNotClosed()
await this.pendingCommit?.promise()

this.assetNotClosed()

await cb()

if (this.flushMode === 'ALWAYS') {
await this.flush()
}
}

private assetNotClosed() {
private assertNotClosed() {
assert(!this.isClosed, `too late to perform db updates, make sure you haven't forgot to await on db query`)
}

private persistEntities<E extends EntityLiteral>(
target: EntityTarget<E>,
e: E | E[],
relationMask?: FindOptionsRelations<any>
) {
const metadata = this.getEntityMetadata(target)

e = Array.isArray(e) ? e : [e]
for (const entity of e) {
traverseEntity({
metadata,
entity,
relationMask: relationMask || null,
cb: (e, md) => this.state?.persist(md, e),
})
private cacheEntity<E extends EntityLiteral>(metadata: EntityMetadata, entityOrId?: E | string) {
if (entityOrId == null) {
return
} else if (typeof entityOrId === 'string') {
this.state.settle(metadata, entityOrId)
} else {
traverseEntity(metadata, entityOrId, (e, md) => this.state.persist(md, e))
}
}

private getEntityMetadata(target: EntityTarget<any>) {
const em = this.em
return em.connection.getMetadata(target)
return this.em.connection.getMetadata(target)
}
}

Expand All @@ -458,3 +417,7 @@ function parseGetOptions<E>(idOrOptions: string | GetOptions<E>): GetOptions<E>
return idOrOptions
}
}

function getIdFromWhere(where?: FindOptionsWhere<EntityLiteral>) {
return typeof where?.id === 'string' ? where.id : undefined
}
4 changes: 2 additions & 2 deletions typeorm/typeorm-store/src/utils/cacheMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ export class CacheMap {
return this.getEntityCache(metadata)?.get(id)
}

exist(metadata: EntityMetadata, id: string): boolean {
has(metadata: EntityMetadata, id: string): boolean {
const cacheMap = this.getEntityCache(metadata)
const cachedEntity = cacheMap.get(id)
return !!cachedEntity?.value
}

ensure(metadata: EntityMetadata, id: string): void {
settle(metadata: EntityMetadata, id: string): void {
const cacheMap = this.getEntityCache(metadata)

if (cacheMap.has(id)) return
Expand Down
48 changes: 13 additions & 35 deletions typeorm/typeorm-store/src/utils/misc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,43 +89,21 @@ export function mergeRelations<E extends ObjectLiteral>(
return mergedObject
}

export function traverseEntity({
metadata,
entity,
relationMask,
cb,
}: {
metadata: EntityMetadata
entity: EntityLiteral
relationMask: FindOptionsRelations<any> | null
export function traverseEntity(
metadata: EntityMetadata,
entity: EntityLiteral,
cb: (e: EntityLiteral, metadata: EntityMetadata) => void
}) {
if (relationMask != null) {
for (const relation of metadata.relations) {
const inverseRelationMask = relationMask[relation.propertyName]
if (!inverseRelationMask) continue

const inverseEntity = relation.getEntityValue(entity)
if (inverseEntity == null) continue

if (relation.isOneToMany || relation.isManyToMany) {
if (!Array.isArray(inverseEntity)) continue
for (const ie of inverseEntity) {
traverseEntity({
metadata: relation.inverseEntityMetadata,
entity: ie,
relationMask: inverseRelationMask === true ? null : inverseRelationMask,
cb,
})
}
} else {
traverseEntity({
metadata: relation.inverseEntityMetadata,
entity: inverseEntity,
relationMask: inverseRelationMask === true ? null : inverseRelationMask,
cb,
})
) {
for (const relation of metadata.relations) {
const inverseEntity = relation.getEntityValue(entity)
if (inverseEntity == null) continue

if (relation.isOneToMany || relation.isManyToMany) {
for (const ie of inverseEntity) {
traverseEntity(relation.inverseEntityMetadata, ie, cb)
}
} else {
traverseEntity(relation.inverseEntityMetadata, inverseEntity, cb)
}
}

Expand Down
Loading

0 comments on commit b44249d

Please sign in to comment.