Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

typeorm: typeorm-store rework #293

Open
wants to merge 11 commits into
base: beta
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion typeorm/typeorm-store/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
},
"dependencies": {
"@subsquid/typeorm-config": "^4.1.1",
"@subsquid/util-internal": "^3.2.0"
"@subsquid/util-internal": "^3.2.0",
"@subsquid/logger": "^1.3.3"
},
"peerDependencies": {
"typeorm": "^0.3.17",
Expand Down
158 changes: 92 additions & 66 deletions typeorm/typeorm-store/src/database.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,39 @@
import {createOrmConfig} from '@subsquid/typeorm-config'
import {assertNotNull, last, maybeLast} from '@subsquid/util-internal'
import {assertNotNull, def, last, maybeLast} from '@subsquid/util-internal'
import assert from 'assert'
import {DataSource, EntityManager} from 'typeorm'
import {ChangeTracker, rollbackBlock} from './hot'
import {ChangeWriter, rollbackBlock} from './utils/changeWriter'
import {DatabaseState, FinalTxInfo, HashAndHeight, HotTxInfo} from './interfaces'
import {Store} from './store'
import {CacheMode, FlushMode, ResetMode, Store} from './store'
import {createLogger} from '@subsquid/logger'
import {StateManager} from './utils/stateManager'
import {sortMetadatasInCommitOrder} from './utils/commitOrder'
import {IsolationLevel} from './utils/tx'


export type IsolationLevel = 'SERIALIZABLE' | 'READ COMMITTED' | 'REPEATABLE READ'
export {IsolationLevel}


export interface TypeormDatabaseOptions {
supportHotBlocks?: boolean
isolationLevel?: IsolationLevel
flushMode?: FlushMode
resetMode?: ResetMode
cacheMode?: CacheMode
stateSchema?: string
projectDir?: string
}


const STATE_MANAGERS: WeakMap<DataSource, StateManager> = new WeakMap()


export class TypeormDatabase {
private statusSchema: string
private isolationLevel: IsolationLevel
private flushMode: FlushMode
private resetMode: ResetMode
private cacheMode: CacheMode
private con?: DataSource
private projectDir: string

Expand All @@ -29,6 +42,9 @@ export class TypeormDatabase {
constructor(options?: TypeormDatabaseOptions) {
this.statusSchema = options?.stateSchema || 'squid_processor'
this.isolationLevel = options?.isolationLevel || 'SERIALIZABLE'
this.resetMode = options?.resetMode || 'BATCH'
this.flushMode = options?.flushMode || 'AUTO'
this.cacheMode = options?.cacheMode || 'ALL'
this.supportsHotBlocks = options?.supportHotBlocks !== false
this.projectDir = options?.projectDir || process.cwd()
}
Expand All @@ -42,48 +58,46 @@ export class TypeormDatabase {
await this.con.initialize()

try {
return await this.con.transaction('SERIALIZABLE', em => this.initTransaction(em))
} catch(e: any) {
return await this.con.transaction('SERIALIZABLE', (em) => this.initTransaction(em))
} catch (e: any) {
await this.con.destroy().catch(() => {}) // ignore error
this.con = undefined
throw e
}
}

async disconnect(): Promise<void> {
await this.con?.destroy().finally(() => this.con = undefined)
await this.con?.destroy().finally(() => (this.con = undefined))
}

private async initTransaction(em: EntityManager): Promise<DatabaseState> {
let schema = this.escapedSchema()

await em.query(
`CREATE SCHEMA IF NOT EXISTS ${schema}`
)
await em.query(`CREATE SCHEMA IF NOT EXISTS ${schema}`)
await em.query(
`CREATE TABLE IF NOT EXISTS ${schema}.status (` +
`id int4 primary key, ` +
`height int4 not null, ` +
`hash text DEFAULT '0x', ` +
`nonce int4 DEFAULT 0`+
`)`
`id int4 primary key, ` +
`height int4 not null, ` +
`hash text DEFAULT '0x', ` +
`nonce int4 DEFAULT 0` +
`)`
)
await em.query( // for databases created by prev version of typeorm store
await em.query(
// for databases created by prev version of typeorm store
`ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS hash text DEFAULT '0x'`
)
await em.query( // for databases created by prev version of typeorm store
`ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS nonce int DEFAULT 0`
)
await em.query(
`CREATE TABLE IF NOT EXISTS ${schema}.hot_block (height int4 primary key, hash text not null)`
// for databases created by prev version of typeorm store
`ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS nonce int DEFAULT 0`
)
await em.query(`CREATE TABLE IF NOT EXISTS ${schema}.hot_block (height int4 primary key, hash text not null)`)
await em.query(
`CREATE TABLE IF NOT EXISTS ${schema}.hot_change_log (` +
`block_height int4 not null references ${schema}.hot_block on delete cascade, ` +
`index int4 not null, ` +
`change jsonb not null, ` +
`PRIMARY KEY (block_height, index)` +
`)`
`block_height int4 not null references ${schema}.hot_block on delete cascade, ` +
`index int4 not null, ` +
`change jsonb not null, ` +
`PRIMARY KEY (block_height, index)` +
`)`
)

let status: (HashAndHeight & {nonce: number})[] = await em.query(
Expand All @@ -94,9 +108,7 @@ export class TypeormDatabase {
status.push({height: -1, hash: '0x', nonce: 0})
}

let top: HashAndHeight[] = await em.query(
`SELECT height, hash FROM ${schema}.hot_block ORDER BY height`
)
let top: HashAndHeight[] = await em.query(`SELECT height, hash FROM ${schema}.hot_block ORDER BY height`)

return assertStateInvariants({...status[0], top})
}
Expand All @@ -110,15 +122,13 @@ export class TypeormDatabase {

assert(status.length == 1)

let top: HashAndHeight[] = await em.query(
`SELECT hash, height FROM ${schema}.hot_block ORDER BY height`
)
let top: HashAndHeight[] = await em.query(`SELECT hash, height FROM ${schema}.hot_block ORDER BY height`)

return assertStateInvariants({...status[0], top})
}

transact(info: FinalTxInfo, cb: (store: Store) => Promise<void>): Promise<void> {
return this.submit(async em => {
return this.submit(async (em) => {
belopash marked this conversation as resolved.
Show resolved Hide resolved
let state = await this.getState(em)
let {prevHead: prev, nextHead: next} = info

Expand Down Expand Up @@ -146,15 +156,21 @@ export class TypeormDatabase {
})
}

transactHot2(info: HotTxInfo, cb: (store: Store, sliceBeg: number, sliceEnd: number) => Promise<void>): Promise<void> {
return this.submit(async em => {
transactHot2(
info: HotTxInfo,
cb: (store: Store, sliceBeg: number, sliceEnd: number) => Promise<void>
): Promise<void> {
return this.submit(async (em) => {
let state = await this.getState(em)
let chain = [state, ...state.top]

assertChainContinuity(info.baseHead, info.newBlocks)
assert(info.finalizedHead.height <= (maybeLast(info.newBlocks) ?? info.baseHead).height)

assert(chain.find(b => b.hash === info.baseHead.hash), RACE_MSG)
assert(
chain.find((b) => b.hash === info.baseHead.hash),
RACE_MSG
)
if (info.newBlocks.length == 0) {
assert(last(chain).hash === info.baseHead.hash, RACE_MSG)
}
Expand All @@ -169,18 +185,14 @@ export class TypeormDatabase {
if (info.newBlocks.length) {
let finalizedEnd = info.finalizedHead.height - info.newBlocks[0].height + 1
if (finalizedEnd > 0) {
await this.performUpdates(store => cb(store, 0, finalizedEnd), em)
await this.performUpdates((store) => cb(store, 0, finalizedEnd), em)
} else {
finalizedEnd = 0
}
for (let i = finalizedEnd; i < info.newBlocks.length; i++) {
let b = info.newBlocks[i]
await this.insertHotBlock(em, b)
await this.performUpdates(
store => cb(store, i, i + 1),
em,
new ChangeTracker(em, this.statusSchema, b.height)
)
await this.performUpdates((store) => cb(store, i, i + 1), em, new ChangeWriter(em, this.statusSchema, b.height))
}
}

Expand All @@ -195,17 +207,14 @@ export class TypeormDatabase {
}

private deleteHotBlocks(em: EntityManager, finalizedHeight: number): Promise<void> {
return em.query(
`DELETE FROM ${this.escapedSchema()}.hot_block WHERE height <= $1`,
[finalizedHeight]
)
return em.query(`DELETE FROM ${this.escapedSchema()}.hot_block WHERE height <= $1`, [finalizedHeight])
}

private insertHotBlock(em: EntityManager, block: HashAndHeight): Promise<void> {
return em.query(
`INSERT INTO ${this.escapedSchema()}.hot_block (height, hash) VALUES ($1, $2)`,
[block.height, block.hash]
)
return em.query(`INSERT INTO ${this.escapedSchema()}.hot_block (height, hash) VALUES ($1, $2)`, [
block.height,
block.hash,
])
}

private async updateStatus(em: EntityManager, nonce: number, next: HashAndHeight): Promise<void> {
Expand All @@ -220,32 +229,30 @@ export class TypeormDatabase {

// Will never happen if isolation level is SERIALIZABLE or REPEATABLE_READ,
// but occasionally people use multiprocessor setups and READ_COMMITTED.
assert.strictEqual(
rowsChanged,
1,
RACE_MSG
)
assert.strictEqual(rowsChanged, 1, RACE_MSG)
}

private async performUpdates(
cb: (store: Store) => Promise<void>,
em: EntityManager,
changeTracker?: ChangeTracker
changeWriter?: ChangeWriter
): Promise<void> {
let running = true

let store = new Store(
() => {
assert(running, `too late to perform db updates, make sure you haven't forgot to await on db query`)
return em
},
changeTracker
)
let store = new Store({
em,
state: this.getStateManager(),
logger: this.getLogger(),
changes: changeWriter,
cacheMode: this.cacheMode,
flushMode: this.flushMode,
resetMode: this.resetMode,
})

try {
await cb(store)
await store.flush()
if (this.resetMode === 'BATCH') store.reset()
} finally {
running = false
store['isClosed'] = true
}
}

Expand All @@ -256,7 +263,7 @@ export class TypeormDatabase {
let con = this.con
assert(con != null, 'not connected')
return await con.transaction(this.isolationLevel, tx)
} catch(e: any) {
} catch (e: any) {
if (e.code == '40001' && retries) {
retries -= 1
} else {
Expand All @@ -270,6 +277,25 @@ export class TypeormDatabase {
let con = assertNotNull(this.con)
return con.driver.escape(this.statusSchema)
}

@def
private getLogger() {
return createLogger('sqd:typeorm-db')
}

private getStateManager() {
let con = assertNotNull(this.con)
let stateManager = STATE_MANAGERS.get(con)
if (stateManager != null) return stateManager

stateManager = new StateManager({
commitOrder: sortMetadatasInCommitOrder(con),
logger: this.getLogger(),
})
STATE_MANAGERS.set(con, stateManager)

return stateManager
}
}


Expand Down
4 changes: 2 additions & 2 deletions typeorm/typeorm-store/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export * from './database'
export {EntityClass, FindManyOptions, FindOneOptions, Store} from './store'
export * from './store'
export * from './decorators'
export * from './transformers'
export * from './transformers'
Loading
Loading