Merge branch 'async_fd'

This commit is contained in:
2026-02-25 16:03:55 -06:00
2 changed files with 158 additions and 24 deletions

164
cellfs.cm
View File

@@ -4,10 +4,11 @@ var fd = use('fd')
var miniz = use('miniz')
var qop = use('internal/qop')
var wildstar = use('internal/wildstar')
var blib = use('blob')
var mounts = []
var writepath = "."
var write_mount = null
function normalize_path(path) {
if (!path) return ""
@@ -30,7 +31,7 @@ function mount_exists(mount, path) {
result = mount.handle.stat(path) != null
} disruption {}
_check()
} else {
} else if (mount.type == 'fs') {
full_path = fd.join_paths(mount.source, path)
_check = function() {
st = fd.stat(full_path)
@@ -119,12 +120,12 @@ function resolve(path, must_exist) {
}
function mount(source, name) {
var st = fd.stat(source)
var st = null
var blob = null
var qop_archive = null
var zip = null
var _try_qop = null
var http = null
var mount_info = {
source: source,
@@ -134,6 +135,29 @@ function mount(source, name) {
zip_blob: null
}
if (starts_with(source, 'http://') || starts_with(source, 'https://')) {
http = use('http')
mount_info.type = 'http'
mount_info.handle = {
base_url: source,
get: function(path, callback) {
var url = source + '/' + path
$clock(function() {
var resp = http.request('GET', url, null, null)
if (resp && resp.status == 200) {
callback(resp.body)
} else {
callback(null, "HTTP " + text(resp ? resp.status : 0) + ": " + url)
}
})
}
}
push(mounts, mount_info)
return
}
st = fd.stat(source)
if (st.isDirectory) {
mount_info.type = 'fs'
} else if (st.isFile) {
@@ -146,18 +170,18 @@ function mount(source, name) {
_try_qop()
if (qop_archive) {
mount_info.type = 'qop'
mount_info.handle = qop_archive
mount_info.zip_blob = blob
mount_info.type = 'qop'
mount_info.handle = qop_archive
mount_info.zip_blob = blob
} else {
zip = miniz.read(blob)
if (!is_object(zip) || !is_function(zip.count)) {
log.error("Invalid archive file (not zip or qop): " + source); disrupt
}
zip = miniz.read(blob)
if (!is_object(zip) || !is_function(zip.count)) {
log.error("Invalid archive file (not zip or qop): " + source); disrupt
}
mount_info.type = 'zip'
mount_info.handle = zip
mount_info.zip_blob = blob
mount_info.type = 'zip'
mount_info.handle = zip
mount_info.zip_blob = blob
}
} else {
log.error("Unsupported mount source type: " + source); disrupt
@@ -191,11 +215,13 @@ function slurp(path) {
}
function slurpwrite(path, data) {
var full_path = writepath + "/" + path
var f = fd.open(full_path, 'w')
fd.write(f, data)
fd.close(f)
var full_path = null
if (write_mount) {
full_path = fd.join_paths(write_mount.source, path)
} else {
full_path = fd.join_paths(".", path)
}
fd.slurpwrite(full_path, data)
}
function exists(path) {
@@ -276,12 +302,25 @@ function rm(path) {
}
function mkdir(path) {
var full = fd.join_paths(writepath, path)
var full = null
if (write_mount) {
full = fd.join_paths(write_mount.source, path)
} else {
full = fd.join_paths(".", path)
}
fd.mkdir(full)
}
function set_writepath(path) {
writepath = path
function set_writepath(mount_name) {
var found = null
if (mount_name == null) { write_mount = null; return }
arrfor(mounts, function(m) {
if (m.name == mount_name) { found = m; return true }
}, false, true)
if (!found || found.type != 'fs') {
log.error("writepath: must be an fs mount"); disrupt
}
write_mount = found
}
function basedir() {
@@ -449,6 +488,82 @@ function globfs(globs, _dir) {
return results
}
// Requestor factory: returns a requestor for reading a file at path
function get(path) {
return function get_requestor(callback, value) {
var res = resolve(path, false)
var full = null
var f = null
var acc = null
var cancelled = false
var data = null
var _close = null
if (!res) { callback(null, "not found: " + path); return }
if (res.mount.type == 'zip') {
callback(res.mount.handle.slurp(res.path))
return
}
if (res.mount.type == 'qop') {
data = res.mount.handle.read(res.path)
if (data) {
callback(data)
} else {
callback(null, "not found in qop: " + path)
}
return
}
if (res.mount.type == 'http') {
res.mount.handle.get(res.path, callback)
return
}
full = fd.join_paths(res.mount.source, res.path)
f = fd.open(full, 'r')
acc = blob()
function next() {
var chunk = null
if (cancelled) return
chunk = fd.read(f, 65536)
if (length(chunk) == 0) {
fd.close(f)
stone(acc)
callback(acc)
return
}
acc.write_blob(chunk)
$clock(next)
}
next()
return function cancel() {
cancelled = true
_close = function() { fd.close(f) } disruption {}
_close()
}
}
}
// Requestor factory: returns a requestor for writing data to path
function put(path, data) {
return function put_requestor(callback, value) {
var _data = data != null ? data : value
var full = null
var _do = null
if (!write_mount) { callback(null, "no write mount set"); return }
full = fd.join_paths(write_mount.source, path)
_do = function() {
fd.slurpwrite(full, _data)
callback(true)
} disruption {
callback(null, "write failed: " + path)
}
_do()
}
}
cellfs.mount = mount
cellfs.mount_package = mount_package
cellfs.unmount = unmount
@@ -467,7 +582,8 @@ cellfs.writepath = set_writepath
cellfs.basedir = basedir
cellfs.prefdir = prefdir
cellfs.realdir = realdir
cellfs.mount('.')
cellfs.get = get
cellfs.put = put
cellfs.resolve = resolve
return cellfs

View File

@@ -761,6 +761,22 @@ JSC_CCALL(fd_readlink,
#endif
)
JSC_CCALL(fd_on_readable,
int fd = js2fd(js, argv[0]);
if (fd < 0) return JS_EXCEPTION;
if (!JS_IsFunction(argv[1]))
return JS_RaiseDisrupt(js, "on_readable: callback must be a function");
actor_watch_readable(js, fd, argv[1]);
return JS_NULL;
)
JSC_CCALL(fd_unwatch,
int fd = js2fd(js, argv[0]);
if (fd < 0) return JS_EXCEPTION;
actor_unwatch(js, fd);
return JS_NULL;
)
static const JSCFunctionListEntry js_fd_funcs[] = {
MIST_FUNC_DEF(fd, open, 2),
MIST_FUNC_DEF(fd, write, 2),
@@ -787,6 +803,8 @@ static const JSCFunctionListEntry js_fd_funcs[] = {
MIST_FUNC_DEF(fd, symlink, 2),
MIST_FUNC_DEF(fd, realpath, 1),
MIST_FUNC_DEF(fd, readlink, 1),
MIST_FUNC_DEF(fd, on_readable, 2),
MIST_FUNC_DEF(fd, unwatch, 1),
};
JSValue js_core_internal_fd_use(JSContext *js) {