Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rocksdb: apply operates on memory view #191

Open
wants to merge 41 commits into
base: rocksdb
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
3633ee9
add simple append test
chm-diederichs Oct 24, 2024
471f5b1
add memory based store session
chm-diederichs Oct 24, 2024
01a4308
only memory view backed sessions are writable
chm-diederichs Oct 24, 2024
5df4642
apply writes to a memory batch, which is flushed after
chm-diederichs Oct 24, 2024
6155eba
get info from memory views during apply
chm-diederichs Nov 13, 2024
286e428
properly open and close writable core
chm-diederichs Nov 13, 2024
70ac55f
add missing store methods
chm-diederichs Nov 13, 2024
01f1b0e
only open store if needed
chm-diederichs Nov 13, 2024
3443f08
add simple reorg test
chm-diederichs Nov 13, 2024
90660f0
use branch deps
chm-diederichs Nov 13, 2024
f49df2b
pass memview length to system flush
chm-diederichs Nov 13, 2024
04e8ab5
emit truncate events on reorg
chm-diederichs Nov 13, 2024
ad5486e
flush truncations
chm-diederichs Nov 13, 2024
d7b17db
undoAll should truncate cores
chm-diederichs Nov 14, 2024
9af2c9e
truncate events emit length
chm-diederichs Nov 14, 2024
d795cd5
only unset system at end of apply
chm-diederichs Nov 15, 2024
1ce88d6
getWriterByKey should use applySystem by default
chm-diederichs Nov 15, 2024
ed738b7
always fully update system in case of truncations
chm-diederichs Nov 15, 2024
6a38da1
migrate system before making pre ff gets
chm-diederichs Nov 19, 2024
f51976a
skip failing memview tests
chm-diederichs Nov 19, 2024
fa8972c
do not reset writer from apply system
chm-diederichs Nov 19, 2024
a7c9715
missing close on apply store
chm-diederichs Nov 19, 2024
6d2d766
derive key from memview core
chm-diederichs Nov 19, 2024
4e32be3
remove log
chm-diederichs Nov 19, 2024
d154107
reduce the number of appends in ff tests
chm-diederichs Nov 20, 2024
26cb983
pass system to force reset and length to update batch
chm-diederichs Nov 20, 2024
7c50121
fix up fast-forward tests
chm-diederichs Nov 20, 2024
b6916ab
finally closes view store
chm-diederichs Nov 20, 2024
b1edbd6
pass apply system to version check
chm-diederichs Nov 20, 2024
0805034
snapshots should detach before hc state is mutated
chm-diederichs Nov 20, 2024
69fb820
no uncaught errors in upgrade tests
chm-diederichs Nov 20, 2024
229a9ac
fix linearizer tests and ready bee in snapshot open
chm-diederichs Nov 21, 2024
54e871e
always close apply store
chm-diederichs Nov 21, 2024
864dae3
refactor boot record
chm-diederichs Nov 21, 2024
ab18937
close writable core session immediately
chm-diederichs Nov 21, 2024
3e59328
indexed length should be set to batch flushed length
chm-diederichs Nov 21, 2024
a5a3773
bump BootRecord version to v1
chm-diederichs Nov 21, 2024
72ed0f3
make tests standard
chm-diederichs Nov 21, 2024
135e3f1
enable passing test
chm-diederichs Nov 22, 2024
0669fcc
skip failing corestore test
chm-diederichs Nov 22, 2024
491a3b9
wakeup add is now async because of user data api
chm-diederichs Nov 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
442 changes: 228 additions & 214 deletions index.js

Large diffs are not rendered by default.

143 changes: 105 additions & 38 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const { ViewRecord } = require('./messages')

const {
SESSION_CLOSED,
SESSION_NOT_WRITABLE,
REQUEST_CANCELLED,
BLOCK_NOT_AVAILABLE
} = require('hypercore/errors')
Expand Down Expand Up @@ -114,7 +115,7 @@ class Snapshot {
}

class AutocoreSession extends EventEmitter {
constructor (source, snapshot, indexed, valueEncoding) {
constructor (source, core, snapshot, indexed, valueEncoding) {
super()

this.isAutobase = true
Expand All @@ -125,12 +126,15 @@ class AutocoreSession extends EventEmitter {

this.indexed = !snapshot && indexed === true

this.writable = true // TODO: make this configurable
this.writable = core !== null

this.activeRequests = []
this.valueEncoding = valueEncoding || null
this.globalCache = source.base.globalCache

this._core = core
this._sourceLength = core ? core.length : -1

this._source = source
this._index = source.sessions.push(this) - 1
this._snapshot = snapshot
Expand All @@ -142,6 +146,10 @@ class AutocoreSession extends EventEmitter {
return this._source.base
}

get name () {
return this._source.name
}

get id () {
return this._source.id
}
Expand All @@ -159,15 +167,15 @@ class AutocoreSession extends EventEmitter {
}

get fork () {
return this._snapshot === null ? this._source.fork : this._snapshot.fork
return this._core !== null ? this._core.fork : this._snapshot === null ? this._source.fork : this._snapshot.fork
}

get byteLength () {
return this._snapshot === null ? this._source.core ? this._source.core.byteLength : 0 : this._snapshot.byteLength
return this._core !== null ? this._core.byteLength : this._snapshot === null ? this._source.core ? this._source.core.byteLength : 0 : this._snapshot.byteLength
}

get length () {
return this._snapshot === null ? this.indexed ? this.indexedLength : this._source.length : this._snapshot.length
return this._core !== null ? this._core.length : this._snapshot === null ? this.indexed ? this.indexedLength : this._source.length : this._snapshot.length
}

get indexedByteLength () {
Expand All @@ -179,26 +187,35 @@ class AutocoreSession extends EventEmitter {
}

get signedLength () {
return this._snapshot === null ? this._source.core.indexedLength : this._snapshot.getSignedLength()
return this._snapshot === null ? this._source.signedLength : this._snapshot.getSignedLength()
}

get manifest () {
return this._source.core ? this._source.core.manifest : null
}

getBackingCore () {
return this._source.core ? this._source.core : null
return this._core ? this._core : this._source.core ? this._source.core : null
}

async ready () {
if (this.opened) return
await this._source.ready()
if (this._core) await this._core.ready()
if (this._snapshot) await this._snapshot.ready()
if (this.opened) return
this.opened = true
this.emit('ready')
}

async getPrologue (length) {
if (!length) return null

const batch = await this.getBackingCore().restoreBatch(length)

return { hash: batch.hash(), length }
}

async getUserData (name) {
if (this.opened === false) await this.ready()
if (this.closing === true) throw SESSION_CLOSED()
Expand All @@ -216,6 +233,8 @@ class AutocoreSession extends EventEmitter {
snapshot ({ valueEncoding = this.valueEncoding } = {}) {
if (this.closing === true) throw SESSION_CLOSED()

if (this._core) return this._core.snapshot({ valueEncoding })

return this._snapshot === null
? this._source.createSnapshot(valueEncoding)
: this._source._createSession(this._snapshot.clone(), valueEncoding)
Expand Down Expand Up @@ -268,6 +287,8 @@ class AutocoreSession extends EventEmitter {
throw BLOCK_NOT_AVAILABLE()
}

if (this._core) return this._core.get(index, opts)

return this._snapshot !== null ? this._snapshot.get(index, opts) : this._source.get(index, opts)
}

Expand All @@ -279,6 +300,7 @@ class AutocoreSession extends EventEmitter {

async append (block) {
if (this.opened === false) await this.ready()
if (!this.writable) throw SESSION_NOT_WRITABLE()
if (this.closing === true) throw SESSION_CLOSED()

const blocks = Array.isArray(block) ? block : [block]
Expand All @@ -292,11 +314,15 @@ class AutocoreSession extends EventEmitter {
else buffers[i] = b4a.from(blk)
}

return this._source._append(buffers)
this.base._onviewappend(this._source, blocks.length)

await this._core.append(buffers)
}

async close () {
this.closing = true

if (this._core !== null) this._core.close()
if (this.opened === false) await this.ready()

if (this.closed) return
Expand Down Expand Up @@ -390,6 +416,10 @@ module.exports = class Autocore extends ReadyResource {
return this.indexedLength + this.indexing
}

get signedLength () {
return this.core.flushedLength
}

_registerSystemCore () {
this._registerWakeupExtension()
this._registerFastForwardListener()
Expand Down Expand Up @@ -430,7 +460,7 @@ module.exports = class Autocore extends ReadyResource {
if (this.core) await this._updateBatch(core, length)
else this.core = core

this._updateCoreState(length)
this._updateCoreState(this.core.flushedLength)

if (swap) await prevOriginalCore.close()

Expand All @@ -447,7 +477,7 @@ module.exports = class Autocore extends ReadyResource {
await core.ready()
}

await this._updateBatch(core)
await this._updateBatch(core, length)
}

async _updateBatch (core, length) {
Expand Down Expand Up @@ -485,40 +515,43 @@ module.exports = class Autocore extends ReadyResource {
await Promise.resolve() // wait a tick so this doesn't run sync in the constructor...
await this.base._presystem

const sys = this.base._initialSystem

await this.originalCore.ready()

if (this.base.encryptionKey && !this.originalCore.encryption) {
await this.originalCore.setEncryptionKey(this.base._viewStore.getBlockKey(this.name), { isBlockKey: true })
}

if (this.base._initialViews) {
for (let i = 0; i < this.base._initialViews.length; i++) {
const { name, key, length } = this.base._initialViews[i]
if (name !== this.name) continue
for (let i = 0; i < this.base._initialViews.length; i++) {
if (this.base._initialViews[i] !== this.name) continue
this.systemIndex = i
break
}

this.systemIndex = i - 1
await this._ensureCore(key, length, false)
if (this.systemIndex === -1 && !this._isSystem() && sys) {
for (let i = 0; i < sys.views.length; i++) {
if (!b4a.equals(this.key, sys.views[i].key)) continue
this.systemIndex = i
break
}
}

if (this.systemIndex === -1 && !this._isSystem() && this.base._initialSystem) {
for (let i = 0; i < this.base._initialSystem.views.length; i++) {
const { key } = this.base._initialSystem.views[i]

if (!b4a.equals(this.key, key)) continue

this.systemIndex = i
}
if (this.systemIndex !== -1) {
const { key, length } = sys.views[this.systemIndex]
await this._ensureCore(key, length, false)
}

// register handlers if needed
if (this._isSystem()) this._registerSystemCore()
if (this._isSystem()) {
if (sys) await this._ensureCore(sys.core.key, sys.core.length, false)
this._registerSystemCore()
}

if (!this.core) {
this.core = this.originalCore.session({ name: 'batch' })
await this.core.ready()
this._updateCoreState(this.originalCore.flushedLength)
this._updateCoreState(this.originalCore.length)
await this._ensureUserData(this.core, false)
}

Expand Down Expand Up @@ -557,15 +590,46 @@ module.exports = class Autocore extends ReadyResource {
}

createSession (valueEncoding, indexed) {
return this._createSession(null, valueEncoding, indexed)
return this._createSession(null, valueEncoding, { indexed, writable: false })
}

createSnapshot (valueEncoding) {
return this._createSession(new Snapshot(this, this.opened), valueEncoding)
return this._createSession(new Snapshot(this, this.opened), valueEncoding, { indexed: false, writable: false })
}

createWritable (valueEncoding, checkout) {
// BIG HACK: need to have proper way to express pending system index
if (checkout === 0) this.base._viewStore._unindex(this)

return this._createSession(null, valueEncoding, { indexed: false, writable: true, checkout })
}

_createSession (snapshot, valueEncoding, indexed) {
return new AutocoreSession(this, snapshot, indexed, valueEncoding ? c.from(valueEncoding) : null)
_createSession (snapshot, valueEncoding, { indexed, writable, checkout } = {}) {
if (writable) {
const core = this.core.session({
name: 'writable',
draft: true,
checkout,
overwrite: true,
parent: this.core
})

return new AutocoreSession(this, core, null, false, valueEncoding ? c.from(valueEncoding) : null)
}

return new AutocoreSession(this, null, snapshot, indexed, valueEncoding ? c.from(valueEncoding) : null)
}

async flushWriteBatch (batch, treeLength) {
const reorg = treeLength < this.core.length

if (reorg) this._detachSnapshots(treeLength)
if (reorg && batch.length === 0) this.base._viewStore._unindex(this)

await this.core.state.overwrite(batch._core.state, { treeLength })

if (reorg) this._emitTruncate(treeLength)
this._emitAppend()
}

async seek (bytes, opts) {
Expand Down Expand Up @@ -621,12 +685,12 @@ module.exports = class Autocore extends ReadyResource {
}

async truncate (newLength) {
await this._truncateAndDetach(newLength)
this.fork++

for (const session of this.sessions) {
if (session.snapshotted === false) session.emit('truncate', newLength, this.fork)
}
if (newLength === 0) this.base._viewStore._unindex(this)
await this._detachSnapshots(newLength, newLength)
this._emitTruncate(newLength, this.fork)
return this.core.truncate(newLength, this.core.fork)
}

async checkpoint () {
Expand Down Expand Up @@ -697,6 +761,12 @@ module.exports = class Autocore extends ReadyResource {
}
}

_emitTruncate (length, fork) {
for (const session of this.sessions) {
if (session.snapshotted === false && session.indexed === false) session.emit('truncate', length, fork)
}
}

_emitIndexedAppend () {
for (const session of this.sessions) {
if (session.indexed) session.emit('append')
Expand Down Expand Up @@ -828,12 +898,11 @@ module.exports = class Autocore extends ReadyResource {
this._shifted = 0
}

_truncateAndDetach (sharedLength) {
_detachSnapshots (sharedLength) {
assert(this.indexedLength <= sharedLength && sharedLength <= this.length, 'Invalid truncation')

// if same len, nothing to do...
if (sharedLength === this.length) return
if (sharedLength === 0) this.base._viewStore._unindex(this)

let maxSnap = 0
for (const snap of this._pendingSnapshots) {
Expand All @@ -846,8 +915,6 @@ module.exports = class Autocore extends ReadyResource {
const snap = this._pendingSnapshots[i]
if (snap.length > sharedLength) snap.detach(this.core, sharedLength, true)
}

return this.core.truncate(sharedLength, this.core.fork)
}
}

Expand Down
34 changes: 25 additions & 9 deletions lib/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,26 +131,42 @@ const Wakeup = {

const Views = c.array(c.string)

const V0BootRecord = {
preencode () {
throw new Error('version 0 records cannot be encoded')
},
encode () {
throw new Error('version 0 records cannot be encoded')
},
decode (state) {
const indexed = Checkout.decode(state)
const heads = Clock.decode(state)
const views = Views.decode(state)

return { version: 0, indexed, heads, views }
}
}

const BootRecord = {
preencode (state, m) {
c.uint.preencode(state, 0) // version
Checkout.preencode(state, m.indexed)
Clock.preencode(state, m.heads)
c.uint.preencode(state, 1) // version
c.fixed32.preencode(state, m.key)
Views.preencode(state, m.views)
},
encode (state, m) {
c.uint.encode(state, 0) // version
Checkout.encode(state, m.indexed)
Clock.encode(state, m.heads)
c.uint.encode(state, 1) // version
c.fixed32.encode(state, m.key)
Views.encode(state, m.views)
},
decode (state) {
const v = c.uint.decode(state)
assert(v === 0, 'Unsupported version: ' + v)
if (v === 0) return V0BootRecord.decode(state)

assert(v === 1, 'Unsupported version: ' + v)

return {
indexed: Checkout.decode(state),
heads: Clock.decode(state),
version: 1,
key: c.fixed32.decode(state),
views: Views.decode(state)
}
}
Expand Down
Loading