Merge branch 'async_fd'

This commit is contained in:
2026-02-25 17:43:12 -06:00
4 changed files with 483 additions and 252 deletions

View File

@@ -142,7 +142,7 @@ function mount(source, name) {
base_url: source,
get: function(path, callback) {
var url = source + '/' + path
$clock(function() {
$clock(function(_t) {
var resp = http.request('GET', url, null, null)
if (resp && resp.status == 200) {
callback(resp.body)
@@ -523,7 +523,7 @@ function get(path) {
f = fd.open(full, 'r')
acc = blob()
function next() {
function next(_t) {
var chunk = null
if (cancelled) return
chunk = fd.read(f, 65536)

View File

@@ -820,6 +820,25 @@ function create_actor(desc) {
var $_ = {}
// Forward-declare actor system variables so closures in $_ can capture them.
// Initialized here; values used at runtime when fully set up.
var time = null
var enet = null
var HEADER = {}
var underlings = {}
var overling = null
var root = null
var receive_fn = null
var greeters = {}
var message_queue = []
var couplings = {}
var peers = {}
var id_address = {}
var peer_queue = {}
var portal = null
var portal_fn = null
var replies = {}
use_cache['core/json'] = json
// Runtime env: passed to package modules via shop's inject_env.
@@ -859,6 +878,205 @@ core_extras.parallel = parallel
core_extras.race = race
core_extras.sequence = sequence
// Set actor identity before shop loads so $self is available
if (!_cell.args.id) _cell.id = guid()
else _cell.id = _cell.args.id
$_.self = create_actor({id: _cell.id})
overling = _cell.args.overling_id ? create_actor({id: _cell.args.overling_id}) : null
$_.overling = overling
root = _cell.args.root_id ? create_actor({id: _cell.args.root_id}) : null
if (root == null) root = $_.self
// Define all actor intrinsic functions ($clock, $delay, etc.) before shop loads.
// Closures here capture module-level variables by reference; those variables
// are fully initialized before these functions are ever called at runtime.
$_.clock = function(fn) {
actor_mod.clock(_ => {
fn(time.number())
send_messages()
})
}
$_.delay = function delay(fn, seconds) {
var _seconds = seconds == null ? 0 : seconds
function delay_turn() {
fn()
send_messages()
}
var id = actor_mod.delay(delay_turn, _seconds)
return function() { actor_mod.removetimer(id) }
}
$_.stop = function stop(actor) {
if (!actor) {
need_stop = true
return
}
if (!is_actor(actor)) {
log.error('Can only call stop on an actor.')
disrupt
}
if (is_null(underlings[actor[ACTORDATA].id])) {
log.error('Can only call stop on an underling or self.')
disrupt
}
sys_msg(actor, {kind:"stop"})
}
$_.start = function start(cb, program) {
if (!program) return
var id = guid()
var oid = $_.self[ACTORDATA].id
var startup = {
id,
overling_id: oid,
root_id: root ? root[ACTORDATA].id : null,
program,
native_mode: native_mode,
no_warn: _no_warn,
}
greeters[id] = cb
push(message_queue, { startup })
}
$_.receiver = function receiver(fn) {
receive_fn = fn
}
$_.unneeded = function unneeded(fn, seconds) {
actor_mod.unneeded(fn, seconds)
}
$_.couple = function couple(actor) {
if (actor == $_.self) return
couplings[actor[ACTORDATA].id] = true
sys_msg(actor, {kind:'couple', from_id: _cell.id})
log.system(`coupled to ${actor[ACTORDATA].id}`)
}
$_.contact = function(callback, record) {
send(create_actor(record), record, callback)
}
$_.portal = function(fn, port) {
if (portal) {
log.error(`Already started a portal listening on ${portal.port()}`)
disrupt
}
if (!port) {
log.error("Requires a valid port.")
disrupt
}
log.system(`starting a portal on port ${port}`)
portal = enet.create_host({address: "any", port})
portal_fn = fn
enet_check()
}
$_.connection = function(callback, actor, config) {
var peer = peers[actor[ACTORDATA].id]
if (peer) {
callback(peer_connection(peer))
return
}
if (actor_mod.mailbox_exist(actor[ACTORDATA].id)) {
callback({type:"local"})
return
}
callback()
}
$_.time_limit = function(requestor, seconds) {
if (!pronto.is_requestor(requestor)) {
log.error('time_limit: first argument must be a requestor')
disrupt
}
if (!is_number(seconds) || seconds <= 0) {
log.error('time_limit: seconds must be a positive number')
disrupt
}
return function time_limit_requestor(callback, value) {
pronto.check_callback(callback, 'time_limit')
var finished = false
var requestor_cancel = null
var timer_cancel = null
function cancel(reason) {
if (finished) return
finished = true
if (timer_cancel) {
timer_cancel()
timer_cancel = null
}
if (requestor_cancel) {
requestor_cancel(reason)
requestor_cancel = null
}
}
function safe_cancel_requestor(reason) {
if (requestor_cancel) {
requestor_cancel(reason)
requestor_cancel = null
}
}
timer_cancel = $_.delay(function() {
if (finished) return
def reason = {
factory: time_limit_requestor,
excuse: 'Timeout.',
evidence: seconds,
message: 'Timeout. ' + text(seconds)
}
safe_cancel_requestor(reason)
finished = true
callback(null, reason)
}, seconds)
function do_request() {
requestor_cancel = requestor(function(val, reason) {
if (finished) return
finished = true
if (timer_cancel) {
timer_cancel()
timer_cancel = null
}
callback(val, reason)
}, value)
} disruption {
cancel('requestor failed')
callback(null, 'requestor failed')
}
do_request()
return function(reason) {
safe_cancel_requestor(reason)
}
}
}
// Make actor intrinsics available to core modules loaded via use_core
core_extras['$self'] = $_.self
core_extras['$overling'] = $_.overling
core_extras['$clock'] = $_.clock
core_extras['$delay'] = $_.delay
core_extras['$start'] = $_.start
core_extras['$stop'] = $_.stop
core_extras['$receiver'] = $_.receiver
core_extras['$contact'] = $_.contact
core_extras['$portal'] = $_.portal
core_extras['$time_limit'] = $_.time_limit
core_extras['$couple'] = $_.couple
core_extras['$unneeded'] = $_.unneeded
core_extras['$connection'] = $_.connection
core_extras['$fd'] = fd
// NOW load shop -- it receives all of the above via env
var shop = use_core('internal/shop')
use_core('build')
@@ -878,7 +1096,7 @@ _summary_resolver = function(path, ctx) {
return summary_fn()
}
var time = use_core('time')
time = use_core('time')
var toml = use_core('toml')
// --- Logging system (full version) ---
@@ -1097,73 +1315,6 @@ runtime_env.sequence = sequence
// Make runtime functions available to modules loaded via use_core
arrfor(array(runtime_env), function(k) { core_extras[k] = runtime_env[k] })
$_.time_limit = function(requestor, seconds)
{
if (!pronto.is_requestor(requestor)) {
log.error('time_limit: first argument must be a requestor')
disrupt
}
if (!is_number(seconds) || seconds <= 0) {
log.error('time_limit: seconds must be a positive number')
disrupt
}
return function time_limit_requestor(callback, value) {
pronto.check_callback(callback, 'time_limit')
var finished = false
var requestor_cancel = null
var timer_cancel = null
function cancel(reason) {
if (finished) return
finished = true
if (timer_cancel) {
timer_cancel()
timer_cancel = null
}
if (requestor_cancel) {
requestor_cancel(reason)
requestor_cancel = null
}
}
function safe_cancel_requestor(reason) {
if (requestor_cancel) {
requestor_cancel(reason)
requestor_cancel = null
}
}
timer_cancel = $_.delay(function() {
if (finished) return
def reason = make_reason(factory, 'Timeout.', seconds)
safe_cancel_requestor(reason)
finished = true
callback(null, reason)
}, seconds)
function do_request() {
requestor_cancel = requestor(function(val, reason) {
if (finished) return
finished = true
if (timer_cancel) {
timer_cancel()
timer_cancel = null
}
callback(val, reason)
}, value)
} disruption {
cancel('requestor failed')
callback(null, 'requestor failed')
}
do_request()
return function(reason) {
safe_cancel_requestor(reason)
}
}
}
var config = {
ar_timer: 60,
actor_memory:0,
@@ -1210,31 +1361,8 @@ function guid(bits)
return text(guid,'h')
}
var HEADER = {}
// takes a function input value that will eventually be called with the current time in number form.
$_.clock = function(fn) {
actor_mod.clock(_ => {
fn(time.number())
send_messages()
})
}
var underlings = {} // this is more like "all actors that are notified when we die"
var overling = null
var root = null
var receive_fn = null
var greeters = {} // Router functions for when messages are received for a specific actor
var enet = use_core('internal/enet')
var peers = {}
var id_address = {}
var peer_queue = {}
var portal = null
var portal_fn = null
var enet = use_core('enet')
enet = use_core('internal/enet')
enet = use_core('enet')
function peer_connection(peer) {
return {
@@ -1259,37 +1387,6 @@ function peer_connection(peer) {
}
}
// takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information.
$_.connection = function(callback, actor, config) {
var peer = peers[actor[ACTORDATA].id]
if (peer) {
callback(peer_connection(peer))
return
}
if (actor_mod.mailbox_exist(actor[ACTORDATA].id)) {
callback({type:"local"})
return
}
callback()
}
// takes a function input value that will eventually be called with the current time in number form.
$_.portal = function(fn, port) {
if (portal) {
log.error(`Already started a portal listening on ${portal.port()}`)
disrupt
}
if (!port) {
log.error("Requires a valid port.")
disrupt
}
log.system(`starting a portal on port ${port}`)
portal = enet.create_host({address: "any", port})
portal_fn = fn
enet_check()
}
function handle_host(e) {
var queue = null
var data = null
@@ -1337,78 +1434,6 @@ function populate_actor_addresses(obj, e) {
})
}
// takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information.
$_.contact = function(callback, record) {
send(create_actor(record), record, callback)
}
// registers a function that will receive all messages...
$_.receiver = function receiver(fn) {
receive_fn = fn
}
// Holds all messages queued during the current turn.
var message_queue = []
$_.start = function start(cb, program) {
if (!program) return
var id = guid()
var oid = $_.self[ACTORDATA].id
var startup = {
id,
overling_id: oid,
root_id: root ? root[ACTORDATA].id : null,
program,
native_mode: native_mode,
no_warn: _no_warn,
}
greeters[id] = cb
push(message_queue, { startup })
}
// stops an underling or self.
$_.stop = function stop(actor) {
if (!actor) {
need_stop = true
return
}
if (!is_actor(actor)) {
log.error('Can only call stop on an actor.')
disrupt
}
if (is_null(underlings[actor[ACTORDATA].id])) {
log.error('Can only call stop on an underling or self.')
disrupt
}
sys_msg(actor, {kind:"stop"})
}
// schedules the removal of an actor after a specified amount of time.
$_.unneeded = function unneeded(fn, seconds) {
actor_mod.unneeded(fn, seconds)
}
// schedules the invocation of a function after a specified amount of time.
$_.delay = function delay(fn, seconds) {
var _seconds = seconds == null ? 0 : seconds
function delay_turn() {
fn()
send_messages()
}
var id = actor_mod.delay(delay_turn, _seconds)
return function() { actor_mod.removetimer(id) }
}
// causes this actor to stop when another actor stops.
var couplings = {}
$_.couple = function couple(actor) {
if (actor == $_.self) return // can't couple to self
couplings[actor[ACTORDATA].id] = true
sys_msg(actor, {kind:'couple', from_id: _cell.id})
log.system(`coupled to ${actor[ACTORDATA].id}`)
}
function actor_prep(actor, send) {
push(message_queue, {actor,send});
@@ -1497,8 +1522,6 @@ function send_messages() {
message_queue = []
}
var replies = {}
function send(actor, message, reply) {
var send_msg = null
var target = null
@@ -1547,11 +1570,6 @@ function send(actor, message, reply) {
stone(send)
if (!_cell.args.id) _cell.id = guid()
else _cell.id = _cell.args.id
$_.self = create_actor({id: _cell.id})
// Actor's timeslice for processing a single message
function turn(msg)
{
@@ -1569,12 +1587,6 @@ if (config.actor_memory)
if (config.stack_max)
js.max_stacksize(config.system.stack_max);
overling = _cell.args.overling_id ? create_actor({id: _cell.args.overling_id}) : null
$_.overling = overling
root = _cell.args.root_id ? create_actor({id: _cell.args.root_id}) : null
if (root == null) root = $_.self
if (overling) {
$_.couple(overling)
report_to_overling({type:'greet', actor_id: _cell.id})

251
tests/cellfs_test.ce Normal file
View File

@@ -0,0 +1,251 @@
// Test: cellfs mounting, sync access, and async requestors
//
// Known limitation:
// - is_directory() uses raw path instead of res.path for fs mounts,
// so @-prefixed paths fail (e.g. '@mount/dir'). Works via stat().
// - cellfs.slurp() has no http code path; use cellfs.get() for http mounts.
var cellfs = use('cellfs')
var fd = use('fd')
var pkg_dir = '.cell/packages/gitea.pockle.world/john/prosperon'
// Mount the prosperon package directory as 'prosperon'
cellfs.mount(pkg_dir, 'prosperon')
// --- exists ---
var found = cellfs.exists('@prosperon/color.cm')
if (!found) {
log.error("exists('@prosperon/color.cm') returned false")
disrupt
}
log.console("exists: ok")
// exists returns false for missing files
var missing = cellfs.exists('@prosperon/no_such_file.cm')
if (missing) {
log.error("exists returned true for missing file")
disrupt
}
log.console("exists (missing): ok")
// --- slurp ---
var data = cellfs.slurp('@prosperon/color.cm')
if (!is_blob(data)) {
log.error("slurp did not return a blob")
disrupt
}
if (length(data) == 0) {
log.error("slurp returned empty blob")
disrupt
}
log.console(`slurp: ok (${length(data)} bits)`)
// --- enumerate ---
var files = cellfs.enumerate('@prosperon', false)
if (!is_array(files)) {
log.error("enumerate did not return an array")
disrupt
}
if (length(files) == 0) {
log.error("enumerate returned empty array")
disrupt
}
// color.cm should be in the listing
var found_color = false
arrfor(files, function(f) {
if (f == 'color.cm') {
found_color = true
return true
}
}, false, true)
if (!found_color) {
log.error("enumerate did not include color.cm")
disrupt
}
log.console(`enumerate: ok (${length(files)} entries, found color.cm)`)
// enumerate recursive
var rfiles = cellfs.enumerate('@prosperon', true)
if (length(rfiles) <= length(files)) {
log.error("recursive enumerate should return more entries")
disrupt
}
log.console(`enumerate recursive: ok (${length(rfiles)} entries)`)
// --- stat ---
var st = cellfs.stat('@prosperon/color.cm')
if (!is_object(st)) {
log.error("stat did not return an object")
disrupt
}
if (st.filesize == null || st.filesize == 0) {
log.error("stat filesize missing or zero")
disrupt
}
log.console(`stat: ok (size=${st.filesize}, mtime=${st.modtime})`)
// stat on a directory
var dir_st = cellfs.stat('@prosperon/docs')
if (!dir_st.isDirectory) {
log.error("stat('@prosperon/docs').isDirectory returned false")
disrupt
}
log.console("stat (directory): ok")
// --- searchpath ---
var sp = cellfs.searchpath()
if (!is_array(sp)) {
log.error("searchpath did not return an array")
disrupt
}
if (length(sp) == 0) {
log.error("searchpath returned empty array")
disrupt
}
log.console(`searchpath: ok (${length(sp)} mounts)`)
// --- resolve ---
var res = cellfs.resolve('@prosperon/color.cm')
if (!is_object(res)) {
log.error("resolve did not return an object")
disrupt
}
if (res.mount.name != 'prosperon') {
log.error("resolve returned wrong mount name")
disrupt
}
if (res.path != 'color.cm') {
log.error("resolve returned wrong path")
disrupt
}
log.console("resolve: ok")
// --- realdir ---
var rd = cellfs.realdir('@prosperon/color.cm')
if (!is_text(rd)) {
log.error("realdir did not return text")
disrupt
}
if (!ends_with(rd, 'color.cm')) {
log.error("realdir does not end with color.cm")
disrupt
}
log.console(`realdir: ok (${rd})`)
// --- unmount and re-mount ---
cellfs.unmount('prosperon')
var after_unmount = cellfs.searchpath()
var unmount_ok = true
arrfor(after_unmount, function(m) {
if (m.name == 'prosperon') {
unmount_ok = false
return true
}
}, false, true)
if (!unmount_ok) {
log.error("unmount failed, mount still present")
disrupt
}
log.console("unmount: ok")
// re-mount for further tests
cellfs.mount(pkg_dir, 'prosperon')
// --- match (wildstar) ---
var m1 = cellfs.match('color.cm', '*.cm')
if (!m1) {
log.error("match('color.cm', '*.cm') returned false")
disrupt
}
var m2 = cellfs.match('color.cm', '*.ce')
if (m2) {
log.error("match('color.cm', '*.ce') returned true")
disrupt
}
log.console("match: ok")
// --- globfs ---
var cm_files = cellfs.globfs(['*.cm'], '@prosperon')
if (!is_array(cm_files)) {
log.error("globfs did not return an array")
disrupt
}
if (length(cm_files) == 0) {
log.error("globfs returned empty array")
disrupt
}
// all results should end in .cm
var all_cm = true
arrfor(cm_files, function(f) {
if (!ends_with(f, '.cm')) {
all_cm = false
return true
}
}, false, true)
if (!all_cm) {
log.error("globfs returned non-.cm files")
disrupt
}
log.console(`globfs: ok (${length(cm_files)} .cm files)`)
log.console("--- sync tests passed ---")
// --- Requestor tests ---
// get requestor for a local fs mount
var get_color = cellfs.get('@prosperon/color.cm')
get_color(function(result, reason) {
if (reason != null) {
log.error(`get color.cm failed: ${reason}`)
disrupt
}
if (!is_blob(result)) {
log.error("get did not return a blob")
disrupt
}
if (length(result) == 0) {
log.error("get returned empty blob")
disrupt
}
log.console(`get (fs): ok (${length(result)} bits)`)
// parallel requestor test - fetch multiple files at once
var get_core = cellfs.get('@prosperon/core.cm')
var get_ease = cellfs.get('@prosperon/ease.cm')
parallel([get_color, get_core, get_ease])(function(results, reason) {
if (reason != null) {
log.error(`parallel get failed: ${reason}`)
disrupt
}
if (length(results) != 3) {
log.error(`parallel expected 3 results, got ${length(results)}`)
disrupt
}
log.console(`parallel get: ok (${length(results)} files fetched)`)
// HTTP mount test — network may not be available in test env
cellfs.mount('http://example.com', 'web')
var web_res = cellfs.resolve('@web/')
if (web_res.mount.type != 'http') {
log.error("http mount type is not 'http'")
disrupt
}
log.console("http mount: ok (type=http)")
var get_web = cellfs.get('@web/')
get_web(function(body, reason) {
if (reason != null) {
log.console(`get (http): skipped (${reason})`)
} else {
log.console(`get (http): ok (${length(body)} bits)`)
}
log.console("--- requestor tests passed ---")
log.console("all cellfs tests passed")
$stop()
})
})
})

View File

@@ -1,5 +1,5 @@
// Test pronto functions
// Tests for fallback, parallel, race, sequence, time_limit, requestorize, objectify
// Tests for fallback, parallel, race, sequence
var test_count = 0
@@ -89,49 +89,17 @@ return {
}, 1000)
},
test_time_limit: function() {
log.console("Testing time_limit...")
var slow_req = make_requestor("slow_req", 0.5, true) // takes 0.5s
var timed_req = time_limit(slow_req, 0.2) // 0.2s limit
test_immediate_requestors: function() {
log.console("Testing immediate requestors...")
var req1 = make_requestor("imm1", 0, true)
var req2 = make_requestor("imm2", 0, true)
timed_req(function(result, reason) {
sequence([req1, req2])(function(result, reason) {
if (result != null) {
log.console(`Time limit succeeded: ${result}`)
log.console(`Immediate sequence result: ${result}`)
} else {
log.console(`Time limit failed: ${reason}`)
log.console(`Immediate sequence failed: ${reason}`)
}
}, 100)
},
test_requestorize: function() {
log.console("Testing requestorize...")
var add_one = function(x) { return x + 1 }
var req = requestorize(add_one)
req(function(result, reason) {
if (result != null) {
log.console(`Requestorize result: ${result}`)
} else {
log.console(`Requestorize failed: ${reason}`)
}
}, 42)
},
test_objectify: function() {
log.console("Testing objectify...")
var req_a = make_requestor("obj_req_a", 0.1, true)
var req_b = make_requestor("obj_req_b", 0.1, true)
var req_c = make_requestor("obj_req_c", 0.1, true)
var parallel_obj = objectify(parallel)
var req = parallel_obj({a: req_a, b: req_b, c: req_c})
req(function(result, reason) {
if (result != null) {
log.console(`Objectify result: ${result}`)
} else {
log.console(`Objectify failed: ${reason}`)
}
}, 1000)
}, 0)
}
}