Skip to content

Commit

Permalink
Mesh filesystem now supports file deletion and watching
Browse files Browse the repository at this point in the history
  • Loading branch information
pajama-coder committed Aug 9, 2024
1 parent bd85b92 commit 99840a0
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 67 deletions.
15 changes: 8 additions & 7 deletions agent/fs.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
//
// Mesh Filesystem
//
// /home
// /users
// /<username>
// /apps
// /<provider>
// /<appname>
// /shared
// /<username>
// /pkg
// /apps
// /<provider>
// /<appname>
// /apps
// /<provider>
// /<appname>
// /users
// /<username>
// /shared
// /<username>
//

export default function(storeDir) {
Expand Down
101 changes: 74 additions & 27 deletions agent/mesh.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export default function (rootDir, config) {
var agentLog = []
var meshErrors = []
var fs = null
var fsWatchers = []
var fsLastChangeTime = Date.now()
var apps = null
var exited = false

Expand Down Expand Up @@ -1066,6 +1068,49 @@ export default function (rootDir, config) {
})
}

function watchFile(prefix) {
var resolve
var promise = new Promise(cb => resolve = cb)
var isWatching = fsWatchers.length > 0
var entry = fsWatchers.find(([k]) => k === prefix)
if (entry) {
entry[1].push(resolve)
} else {
fsWatchers.push([prefix, [resolve]])
}
if (!isWatching) startWatchingFiles()
return promise
}

function startWatchingFiles() {
new Timeout(5).wait().then(
() => discoverFiles(fsLastChangeTime)
).then(files => {
var paths = Object.keys(files)
if (paths.length > 0) {
fsLastChangeTime = Object.values(files).map(f => f.since).reduce(
(max, t) => (t > max ? t : max), fsLastChangeTime
)
fsWatchers.forEach(
([prefix, watchers]) => {
var changes = []
paths.forEach(path => {
if (path.startsWith(prefix)) {
changes.push(path)
}
})
if (changes.length > 0) {
watchers.forEach(resolve => resolve([...changes]))
watchers.length = 0
}
}
)
fsWatchers = fsWatchers.filter(([_, watchers]) => watchers.length > 0)
}
if (fsWatchers.length > 0) startWatchingFiles()
})
}

//
// Mesh API exposed to apps
//
Expand All @@ -1086,32 +1131,22 @@ export default function (rootDir, config) {
}

function makeAppFilesystem(provider, app) {
var prefixHome = `/home/${username}/`
var prefixHomeApp = prefixHome + `apps/${provider}/${app}/`
var prefixApp = `/apps/${provider}/${app}/`
var matchShared = new http.Match(`/shared/{username}/apps/${provider}/${app}/*`)
var matchShared = new http.Match('/shared/{username}' + prefixApp + '*')
var pathApp = `/apps/${provider}/${app}`
var pathUser = `/users/${username}/`
var pathShared = `/shared/`
var pathLocal = `/local/`
var pathAppUser = pathApp + pathUser
var pathAppShared = pathApp + pathShared

function pathToLocal(path) {
if (path.startsWith(prefixHomeApp)) {
return prefixHome + path.substring(prefixHomeApp.length)
}
var params = matchShared(path)
if (params) {
return `/shared/${params.username}/${params['*']}`
if (path.startsWith(pathAppUser) || path.startsWith(pathAppShared)) {
return path.substring(pathApp.length)
}
}

function pathToGlobal(path) {
if (path.startsWith(prefixHome)) {
return prefixHomeApp + path.substring(prefixHome.length)
}
if (path.startsWith('/shared/')) {
path = path.substring(8)
var i = path.indexOf('/')
if (i <= 0 || i + 1 == path.length) return
var username = path.substring(0, i)
return `/shared/${username}` + prefixApp + path.substring(i + 1)
if (path.startsWith(pathUser) || path.startsWith(pathShared)) {
return pathApp + path
}
}

Expand All @@ -1124,11 +1159,13 @@ export default function (rootDir, config) {
var list = []
Object.keys(files).forEach(path => {
var localPath = pathToLocal(path)
if (localPath && localPath.startsWith(prefix)) list.push(localPath)
if (localPath && localPath.startsWith(prefix)) {
list.push(path)
}
})
db.allFiles(meshName, provider, app).forEach(
path => {
var fullPath = '/local' + path
var fullPath = os.path.join(pathLocal, path)
if (fullPath.startsWith(prefix)) list.push(fullPath)
}
)
Expand All @@ -1139,9 +1176,9 @@ export default function (rootDir, config) {

function read(pathname) {
var path = os.path.normalize(pathname)
if (path.startsWith('/local/')) {
if (path.startsWith(pathLocal)) {
return Promise.resolve(
db.getFile(meshName, provider, app, path.substring(6))
db.getFile(meshName, provider, app, path.substring(pathLocal.length))
)
} else {
var globalPath = pathToGlobal(path)
Expand All @@ -1156,8 +1193,8 @@ export default function (rootDir, config) {
function write(pathname, data) {
if (typeof data === 'string') data = new Data(data)
var path = os.path.normalize(pathname)
if (path.startsWith('/local/')) {
db.setFile(meshName, provider, app, path.substring(6), data)
if (path.startsWith(pathLocal)) {
db.setFile(meshName, provider, app, path.substring(pathLocal.length), data)
} else {
var globalPath = pathToGlobal(path)
if (globalPath) {
Expand All @@ -1167,7 +1204,17 @@ export default function (rootDir, config) {
}
}

return { dir, read, write }
function watch(prefix) {
if (!prefix.endsWith('/')) prefix += '/'
var globalPath = pathToGlobal(prefix)
if (globalPath) {
return watchFile(globalPath).then(
paths => paths.map(path => pathToLocal(path)).filter(p=>p)
)
}
}

return { dir, read, write, watch }
}

function remoteQueryLog(ep) {
Expand Down
16 changes: 8 additions & 8 deletions cli/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ function doCommand(meshName, epName, argv, program) {
var type = args['<object type>']
var name = args['<object name>']
switch (type) {
case 'app': return selectMesh(meshName).then(mesh => downloadApp(name, mesh))
case 'app': return selectMeshEndpoint(meshName, epName).then(({ mesh, ep }) => downloadApp(name, mesh, ep))
case 'file': return selectMesh(meshName).then(mesh => downloadFile(name, args['--output'], mesh))
default: return invalidObjectType(type, 'download')
}
Expand All @@ -294,7 +294,7 @@ function doCommand(meshName, epName, argv, program) {
var type = args['<object type>']
var name = args['<object name>']
switch (type) {
case 'app': return selectMesh(meshName).then(mesh => eraseApp(name, mesh))
case 'app': return selectMeshEndpoint(meshName, epName).then(({ mesh, ep }) => eraseApp(name, mesh, ep))
case 'file': return selectMesh(meshName).then(mesh => eraseFile(name, mesh))
default: return invalidObjectType(type, 'erase')
}
Expand All @@ -314,7 +314,7 @@ function doCommand(meshName, epName, argv, program) {
var type = args['<object type>']
var name = args['<object name>']
switch (type) {
case 'app': return selectMesh(meshName).then(mesh => publishApp(name, mesh))
case 'app': return selectMeshEndpoint(meshName, epName).then(({ mesh, ep }) => publishApp(name, mesh, ep))
case 'file': return selectMesh(meshName).then(mesh => publishFile(name, args['--input'], mesh))
default: return invalidObjectType(type, 'publish')
}
Expand All @@ -329,7 +329,7 @@ function doCommand(meshName, epName, argv, program) {
var type = args['<object type>']
var name = args['<object name>']
switch (type) {
case 'app': return selectMesh(meshName).then(mesh => unpublishApp(name, mesh))
case 'app': return selectMeshEndpoint(meshName, epName).then(({ mesh, ep }) => unpublishApp(name, mesh, ep))
case 'file': return selectMesh(meshName).then(mesh => unpublishFile(name, mesh))
default: return invalidObjectType(type, 'unpublish')
}
Expand Down Expand Up @@ -1039,7 +1039,7 @@ function describeApp(name, mesh, ep) {
// Command: download
//

function downloadApp(name) {
function downloadApp(name, mesh, ep) {
var appName = normalizeAppName(name)
if (!appName) throw 'missing app name'
return selectApp(appName, mesh, ep).then(app => {
Expand Down Expand Up @@ -1069,7 +1069,7 @@ function downloadFile(name, output, mesh) {
// Command: erase
//

function eraseApp(name, mesh) {
function eraseApp(name, mesh, ep) {
var appName = normalizeAppName(name)
if (!appName) throw 'missing app name'
return selectApp(appName, mesh, ep).then(app => {
Expand All @@ -1091,7 +1091,7 @@ function eraseFile(name, mesh) {
// Command: publish
//

function publishApp(name) {
function publishApp(name, mesh, ep) {
var appName = normalizeAppName(name)
if (!appName) throw 'missing app name'
return selectApp(appName, mesh, ep).then(app => {
Expand All @@ -1118,7 +1118,7 @@ function publishFile(name, input, mesh) {
// Command: unpublish
//

function unpublishApp(name) {
function unpublishApp(name, mesh, ep) {
var appName = normalizeAppName(name)
if (!appName) throw 'missing app name'
return selectApp(appName, mesh, ep).then(app => {
Expand Down
5 changes: 3 additions & 2 deletions docs/Agent-API.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Agent API

The Agent API is organized into 4 types of resources that can be accessed by the standard HTTP semantics. The 4 types of resources are:
The Agent API is organized into 4 types of resources that can be accessed by standard HTTP semantics. The 4 types of resources are:

- Meshes
- Endpoints
Expand Down Expand Up @@ -163,8 +163,9 @@ Or returns raw binary content for `/file-data` based paths.
Paths and methods:

```
GET /api/meshes/{meshName}/files
GET /api/meshes/{meshName}/files[?since={time}]
GET /api/meshes/{meshName}/files/{pathname}
DELETE /api/meshes/{meshName}/files/{pathname}
GET /api/meshes/{meshName}/file-data/{pathname}
POST /api/meshes/{meshName}/file-data/{pathname}
DELETE /api/meshes/{meshName}/file-data/{pathname}
Expand Down
62 changes: 39 additions & 23 deletions hub/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,19 @@ var postFilesystem = pipeline($=>$
.replaceMessage(
function (req) {
var body = JSON.decode(req.body)
var prefix = `/home/${$endpoint.username}/`
var username = $endpoint.username
var prefixUser = `/users/${username}/`
var prefixShared = `/shared/${username}`
var matchAppUser = new http.Match(`/apps/{provider}/{appname}/users/${username}/*`)
var matchAppShared = new http.Match(`/apps/{provider}/{appname}/shared/${username}/*`)
var canUpdate = (path) => (
path.startsWith(prefixUser) ||
path.startsWith(prefixShared) ||
matchAppUser(path) ||
matchAppShared(path)
)
Object.entries(body).map(
([k, v]) => updateFileInfo(k, v, $endpoint.id, k.startsWith(prefix))
([k, v]) => updateFileInfo(k, v, $endpoint.id, canUpdate(k))
)
return new Message({ status: 201 })
}
Expand Down Expand Up @@ -588,27 +598,33 @@ function makeFileInfo(hash, size, time, since) {
}

function updateFileInfo(pathname, f, ep, update) {
var e = (files[pathname] ??= makeFileInfo('', 0, 0, 0))
var t1 = e['T']
var h1 = e['#']
var t2 = f['T']
var h2 = f['#']
if (h2 === h1) {
var sources = e['@']
if (!sources.includes(ep)) sources.push(ep)
if (update) e['T'] = Math.max(t1, t2)
} else if (t2 > t1 && update) {
e['#'] = h2
e['$'] = f['$']
e['T'] = t2
e['+'] = Date.now()
e['@'] = [ep]
db.setFile(pathname, {
hash: h2,
size: e['$'],
time: t2,
since: e['+'],
})
var e = files[pathname]
if (e || update) {
if (!e) e = files[pathname] = makeFileInfo('', 0, 0, 0)
var t1 = e['T']
var h1 = e['#']
var t2 = f['T']
var h2 = f['#']
if (h2 === h1) {
var sources = e['@']
if (!sources.includes(ep)) sources.push(ep)
if (update && t2 > t1) {
e['T'] = t2
e['+'] = Date.now()
}
} else if (t2 > t1 && update) {
e['#'] = h2
e['$'] = f['$']
e['T'] = t2
e['+'] = Date.now()
e['@'] = [ep]
db.setFile(pathname, {
hash: h2,
size: e['$'],
time: t2,
since: e['+'],
})
}
}
}

Expand Down

0 comments on commit 99840a0

Please sign in to comment.