Skip to content

Commit

Permalink
Merge branch 'main' into hyperdb-store-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreiRegiani authored Nov 26, 2024
2 parents e9ceaeb + 9cce3f4 commit e0e29a6
Show file tree
Hide file tree
Showing 25 changed files with 274 additions and 9 deletions.
13 changes: 13 additions & 0 deletions docs/hyperdb-schema-diagram.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Hyperdb Schema Diagram

A visual representation of [`schema.json`](../hyperdb/schema/schema.json) using [mermaid.js](https://github.com/mermaid-js/mermaid).

```mermaid
erDiagram
node {
string host "Required"
unit port "Required"
}
dht {}
dht ||--o{ node : nodes
```
10 changes: 8 additions & 2 deletions gui/gui.js
Original file line number Diff line number Diff line change
Expand Up @@ -1498,15 +1498,21 @@ class PearGUI extends ReadyResource {
evt.reply('workerPipeClose')
})
pipe.on('data', (data) => { evt.reply('workerPipeData', data) })
pipe.on('end', () => { evt.reply('workerPipeData', null) })
pipe.on('error', (err) => { evt.reply('pipeError', err.stack) })
pipe.on('end', () => { evt.reply('workerPipeEnd') })
pipe.on('error', (err) => { evt.reply('workerPipeError', err.stack) })
})

electron.ipcMain.on('workerPipeId', (evt) => {
evt.returnValue = this.pipes.nextId()
return evt.returnValue
})

electron.ipcMain.on('workerPipeEnd', (evt, id) => {
const pipe = this.pipes.from(id)
if (!pipe) return
pipe.end()
})

electron.ipcMain.on('workerPipeClose', (evt, id) => {
const pipe = this.pipes.from(id)
if (!pipe) return
Expand Down
11 changes: 8 additions & 3 deletions gui/preload.js
Original file line number Diff line number Diff line change
Expand Up @@ -295,19 +295,24 @@ class IPC {
return stream
}

workerRun (link) {
workerRun (link, args) {
const id = electron.ipcRenderer.sendSync('workerPipeId')
electron.ipcRenderer.send('workerRun', link)
electron.ipcRenderer.send('workerRun', link, args)
const stream = new streamx.Duplex({
write (data, cb) {
electron.ipcRenderer.send('workerPipeWrite', id, data)
cb()
},
final (cb) {
electron.ipcRenderer.send('workerPipeEnd', id)
cb()
}
})
electron.ipcRenderer.on('workerPipeError', (e, stack) => {
stream.emit('error', new Error('Worker PipeError (from electron-main): ' + stack))
})
electron.ipcRenderer.on('workerClose', () => { stream.destroy() })
electron.ipcRenderer.on('workerPipeClose', () => { stream.destroy() })
electron.ipcRenderer.on('workerPipeEnd', () => { stream.end() })
stream.once('close', () => {
electron.ipcRenderer.send('workerPipeClose', id)
})
Expand Down
10 changes: 7 additions & 3 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,13 @@ class Worker {
this.#unref()
})
const pipe = sp.stdio[3]
pipe.on('end', () => {
if (pipe.ended === false) pipe.end()
})
pipe.pid = sp.pid
const pipeEmit = pipe.emit
pipe.emit = function (event, ...args) {
if (event === 'error' && args[0]?.code === 'ENOTCONN') return false
return pipeEmit.apply(this, [event, ...args])
}
pipe.on('end', () => pipe.end())
return pipe
}

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
"brittle": "^3.5.2",
"graceful-goodbye": "^1.3.0",
"hyperschema": "^1.0.3",
"random-access-memory": "^6.2.1",
"standard": "^17.0.0"
},
"repository": {
Expand Down
8 changes: 7 additions & 1 deletion subsystems/sidecar/ops/stage.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,13 @@ module.exports = class Stage extends Opstream {
const opts = { ignore, dryRun, batch: true }
const builtins = isTerminal ? sidecar.gunk.bareBuiltins : sidecar.gunk.builtins
const linker = new ScriptLinker(src, { builtins })
const entrypoints = [...(state.manifest.main ? [state.main] : []), ...(state.manifest.pear?.stage?.entrypoints || [])].map((entry) => unixPathResolve('/', entry))

const mainExists = await src.entry(unixPathResolve('/', state.main)) !== null
const entrypoints = [
...(mainExists ? [state.main] : []),
...(state.manifest.pear?.stage?.entrypoints || [])
]
.map(entrypoint => unixPathResolve('/', entrypoint))

for (const entrypoint of entrypoints) {
const entry = await src.entry(entrypoint)
Expand Down
31 changes: 31 additions & 0 deletions test/03-worker.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ const helloWorld = path.join(Helper.localDir, 'test', 'fixtures', 'hello-world')
const printArgs = path.join(Helper.localDir, 'test', 'fixtures', 'print-args')
const workerRunner = path.join(Helper.localDir, 'test', 'fixtures', 'worker-runner')

const workerParent = path.join(Helper.localDir, 'test', 'fixtures', 'worker-parent')
const workerChild = path.join(Helper.localDir, 'test', 'fixtures', 'worker-child')
const workerEndFromChild = path.join(Helper.localDir, 'test', 'fixtures', 'worker-end-from-child')
const workerDestroyFromChild = path.join(Helper.localDir, 'test', 'fixtures', 'worker-destroy-from-child')
const workerEndFromParent = path.join(Helper.localDir, 'test', 'fixtures', 'worker-end-from-parent')
const workerDestroyFromParent = path.join(Helper.localDir, 'test', 'fixtures', 'worker-destroy-from-parent')

test('worker pipe', async function ({ is, plan, teardown }) {
plan(1)
const helper = new Helper()
Expand Down Expand Up @@ -99,3 +106,27 @@ test('worker should run as a link in a terminal app', async function ({ is, plan

await Helper.untilClose(pipe)
})

//
// test worker exit gracefully for terminal app
//

test('[terminal] worker exit when child calls pipe.end()', async function () {
const { pipe } = await Helper.run({ link: workerParent, args: [workerEndFromChild] })
await Helper.untilWorkerExit(pipe)
})

test('[terminal] worker exit when child calls pipe.destroy()', async function () {
const { pipe } = await Helper.run({ link: workerParent, args: [workerDestroyFromChild] })
await Helper.untilWorkerExit(pipe)
})

test('[terminal] worker exit when parent calls pipe.end()', async function () {
const { pipe } = await Helper.run({ link: workerEndFromParent, args: [workerChild] })
await Helper.untilWorkerExit(pipe)
})

test('[terminal] worker exit when parent calls pipe.destroy()', async function () {
const { pipe } = await Helper.run({ link: workerDestroyFromParent, args: [workerChild] })
await Helper.untilWorkerExit(pipe)
})
56 changes: 56 additions & 0 deletions test/07-warmup.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ const test = require('brittle')
const path = require('bare-path')
const Helper = require('./helper')

const Hyperswarm = require('hyperswarm')
const Corestore = require('corestore')
const RAM = require('random-access-memory')
const Hyperdrive = require('hyperdrive')

const warmup = path.join(Helper.localDir, 'test', 'fixtures', 'warmup')
const desktop = path.join(Helper.localDir, 'test', 'fixtures', 'desktop-warmup')
const prefetch = path.join(Helper.localDir, 'test', 'fixtures', 'warmup-with-prefetch')
const appWithoutMain = path.join(Helper.localDir, 'test', 'fixtures', 'app-without-main')

test('stage warmup with entrypoints', async function ({ ok, is, plan, comment, teardown, timeout }) {
timeout(180000)
Expand Down Expand Up @@ -81,3 +87,53 @@ test('stage warmup with prefetch', async function ({ ok, is, plan, comment, tear
ok(warming.total > 0, 'Warmup total is correct')
is(warming.success, true, 'Warmup completed')
})

test('staged bundle contains entries metadata', async function ({ ok, is, plan, comment, teardown, timeout }) {
plan(2)

const dir = appWithoutMain

const helper = new Helper()
teardown(() => helper.close(), { order: Infinity })
await helper.ready()

const id = Math.floor(Math.random() * 10000)

comment('staging')
const staging = helper.stage({ channel: `test-${id}`, name: `test-${id}`, dir, dryRun: false, bare: true })
teardown(() => Helper.teardownStream(staging))

const staged = await Helper.pick(staging, [{ tag: 'warming' }, { tag: 'final' }])
await staged.final

comment('seeding')
const seeding = helper.seed({ channel: `test-${id}`, name: `test-${id}`, dir, key: null, cmdArgs: [] })
teardown(() => Helper.teardownStream(seeding))
const until = await Helper.pick(seeding, [{ tag: 'key' }, { tag: 'announced' }])
const key = await until.key
await until.announced

const swarm = new Hyperswarm({ bootstrap: Pear.config.dht.bootstrap })
const store = new Corestore(RAM)
await store.ready()
const drive = new Hyperdrive(store, key)
await drive.ready()

teardown(() => swarm.destroy())

swarm.on('connection', (conn) => {
drive.corestore.replicate(conn)
})

swarm.join(drive.discoveryKey)

await new Promise((resolve) => setTimeout(resolve, 500))

comment('bundle entries should contain metadata')
for await (const file of drive.list()) {
if (file.key === '/app.js' || file.key === '/dep.js') {
const entry = await drive.entry(file.key)
ok(entry.value.metadata)
}
}
})
1 change: 1 addition & 0 deletions test/fixtures/app-without-main/app.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import dep from './dep.js'
1 change: 1 addition & 0 deletions test/fixtures/app-without-main/dep.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
console.log('-')
8 changes: 8 additions & 0 deletions test/fixtures/app-without-main/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<!DOCTYPE html>
<html>
<head>
<script type='module' src='./app.js'></script>
</head>
<body>
</body>
</html>
7 changes: 7 additions & 0 deletions test/fixtures/app-without-main/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name": "app-without-main",
"pear": {
"name": "app-without-main",
"type": "desktop"
}
}
2 changes: 2 additions & 0 deletions test/fixtures/worker-child/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
const pipe = Pear.worker.pipe()
pipe.resume()
6 changes: 6 additions & 0 deletions test/fixtures/worker-child/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "worker-child",
"main": "index.js",
"type": "module",
"pear": {}
}
4 changes: 4 additions & 0 deletions test/fixtures/worker-destroy-from-child/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
const pipe = Pear.worker.pipe()
pipe.resume()
await new Promise((resolve) => setTimeout(resolve, 1000))
pipe.destroy()
6 changes: 6 additions & 0 deletions test/fixtures/worker-destroy-from-child/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "worker-destroy-from-child",
"main": "index.js",
"type": "module",
"pear": {}
}
22 changes: 22 additions & 0 deletions test/fixtures/worker-destroy-from-parent/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
const link = Bare.argv[Bare.argv.length - 1]
const pipe = Pear.worker.run(link)
pipe.resume()
await new Promise((resolve) => setTimeout(resolve, 1000))
pipe.destroy()
await untilExit(pipe)

async function untilExit (pipe, timeout = 5000) {
const start = Date.now()
while (isRunning(pipe)) {
if (Date.now() - start > timeout) throw new Error('timed out')
await new Promise((resolve) => setTimeout(resolve, 100))
}
}

function isRunning (pipe) {
try {
return process.kill(pipe.pid, 0)
} catch (err) {
return err.code === 'EPERM'
}
}
6 changes: 6 additions & 0 deletions test/fixtures/worker-destroy-from-parent/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "worker-destroy-from-parent",
"main": "index.js",
"type": "module",
"pear": {}
}
4 changes: 4 additions & 0 deletions test/fixtures/worker-end-from-child/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
const pipe = Pear.worker.pipe()
pipe.resume()
await new Promise((resolve) => setTimeout(resolve, 1000))
pipe.end()
6 changes: 6 additions & 0 deletions test/fixtures/worker-end-from-child/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "worker-end-from-child",
"main": "index.js",
"type": "module",
"pear": {}
}
22 changes: 22 additions & 0 deletions test/fixtures/worker-end-from-parent/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
const link = Bare.argv[Bare.argv.length - 1]
const pipe = Pear.worker.run(link)
pipe.resume()
await new Promise((resolve) => setTimeout(resolve, 1000))
pipe.end()
await untilExit(pipe)

async function untilExit (pipe, timeout = 5000) {
const start = Date.now()
while (isRunning(pipe)) {
if (Date.now() - start > timeout) throw new Error('timed out')
await new Promise((resolve) => setTimeout(resolve, 100))
}
}

function isRunning (pipe) {
try {
return process.kill(pipe.pid, 0)
} catch (err) {
return err.code === 'EPERM'
}
}
6 changes: 6 additions & 0 deletions test/fixtures/worker-end-from-parent/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "worker-end-from-parent",
"main": "index.js",
"type": "module",
"pear": {}
}
20 changes: 20 additions & 0 deletions test/fixtures/worker-parent/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
const link = Bare.argv[Bare.argv.length - 1]
const pipe = Pear.worker.run(link)
pipe.resume()
await untilExit(pipe)

async function untilExit (pipe, timeout = 5000) {
const start = Date.now()
while (isRunning(pipe)) {
if (Date.now() - start > timeout) throw new Error('timed out')
await new Promise((resolve) => setTimeout(resolve, 100))
}
}

function isRunning (pipe) {
try {
return process.kill(pipe.pid, 0)
} catch (err) {
return err.code === 'EPERM'
}
}
6 changes: 6 additions & 0 deletions test/fixtures/worker-parent/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "worker-parent",
"main": "index.js",
"type": "module",
"pear": {}
}
16 changes: 16 additions & 0 deletions test/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,22 @@ class Helper extends IPC.Client {
return res
}

static async isRunning (pipe) {
try {
return process.kill(pipe.pid, 0)
} catch (err) {
return err.code === 'EPERM'
}
}

static async untilWorkerExit (pipe, timeout = 5000) {
const start = Date.now()
while (await this.isRunning(pipe)) {
if (Date.now() - start > timeout) throw new Error('timed out')
await new Promise((resolve) => setTimeout(resolve, 100))
}
}

static async pick (stream, ptn = {}, by = 'tag') {
if (Array.isArray(ptn)) return this.#untils(stream, ptn, by)
for await (const output of stream) {
Expand Down

0 comments on commit e0e29a6

Please sign in to comment.