From ecc1777b24cf636eb7c08fd1eee04f80c7ba2b33 Mon Sep 17 00:00:00 2001 From: John Alanbrook Date: Wed, 25 Feb 2026 17:43:01 -0600 Subject: [PATCH] async cellfs --- cellfs.cm | 4 +- internal/engine.cm | 430 ++++++++++++++++++++++--------------------- tests/cellfs_test.ce | 251 +++++++++++++++++++++++++ tests/pronto.cm | 50 +---- 4 files changed, 483 insertions(+), 252 deletions(-) create mode 100644 tests/cellfs_test.ce diff --git a/cellfs.cm b/cellfs.cm index 7c95b95b..63cd5d24 100644 --- a/cellfs.cm +++ b/cellfs.cm @@ -142,7 +142,7 @@ function mount(source, name) { base_url: source, get: function(path, callback) { var url = source + '/' + path - $clock(function() { + $clock(function(_t) { var resp = http.request('GET', url, null, null) if (resp && resp.status == 200) { callback(resp.body) @@ -523,7 +523,7 @@ function get(path) { f = fd.open(full, 'r') acc = blob() - function next() { + function next(_t) { var chunk = null if (cancelled) return chunk = fd.read(f, 65536) diff --git a/internal/engine.cm b/internal/engine.cm index 7b441c6a..062b48d8 100644 --- a/internal/engine.cm +++ b/internal/engine.cm @@ -820,6 +820,25 @@ function create_actor(desc) { var $_ = {} +// Forward-declare actor system variables so closures in $_ can capture them. +// Initialized here; values used at runtime when fully set up. +var time = null +var enet = null +var HEADER = {} +var underlings = {} +var overling = null +var root = null +var receive_fn = null +var greeters = {} +var message_queue = [] +var couplings = {} +var peers = {} +var id_address = {} +var peer_queue = {} +var portal = null +var portal_fn = null +var replies = {} + use_cache['core/json'] = json // Runtime env: passed to package modules via shop's inject_env. @@ -859,6 +878,205 @@ core_extras.parallel = parallel core_extras.race = race core_extras.sequence = sequence +// Set actor identity before shop loads so $self is available +if (!_cell.args.id) _cell.id = guid() +else _cell.id = _cell.args.id + +$_.self = create_actor({id: _cell.id}) + +overling = _cell.args.overling_id ? create_actor({id: _cell.args.overling_id}) : null +$_.overling = overling + +root = _cell.args.root_id ? create_actor({id: _cell.args.root_id}) : null +if (root == null) root = $_.self + +// Define all actor intrinsic functions ($clock, $delay, etc.) before shop loads. +// Closures here capture module-level variables by reference; those variables +// are fully initialized before these functions are ever called at runtime. + +$_.clock = function(fn) { + actor_mod.clock(_ => { + fn(time.number()) + send_messages() + }) +} + +$_.delay = function delay(fn, seconds) { + var _seconds = seconds == null ? 0 : seconds + function delay_turn() { + fn() + send_messages() + } + var id = actor_mod.delay(delay_turn, _seconds) + return function() { actor_mod.removetimer(id) } +} + +$_.stop = function stop(actor) { + if (!actor) { + need_stop = true + return + } + if (!is_actor(actor)) { + log.error('Can only call stop on an actor.') + disrupt + } + if (is_null(underlings[actor[ACTORDATA].id])) { + log.error('Can only call stop on an underling or self.') + disrupt + } + sys_msg(actor, {kind:"stop"}) +} + +$_.start = function start(cb, program) { + if (!program) return + var id = guid() + var oid = $_.self[ACTORDATA].id + var startup = { + id, + overling_id: oid, + root_id: root ? root[ACTORDATA].id : null, + program, + native_mode: native_mode, + no_warn: _no_warn, + } + greeters[id] = cb + push(message_queue, { startup }) +} + +$_.receiver = function receiver(fn) { + receive_fn = fn +} + +$_.unneeded = function unneeded(fn, seconds) { + actor_mod.unneeded(fn, seconds) +} + +$_.couple = function couple(actor) { + if (actor == $_.self) return + couplings[actor[ACTORDATA].id] = true + sys_msg(actor, {kind:'couple', from_id: _cell.id}) + log.system(`coupled to ${actor[ACTORDATA].id}`) +} + +$_.contact = function(callback, record) { + send(create_actor(record), record, callback) +} + +$_.portal = function(fn, port) { + if (portal) { + log.error(`Already started a portal listening on ${portal.port()}`) + disrupt + } + if (!port) { + log.error("Requires a valid port.") + disrupt + } + log.system(`starting a portal on port ${port}`) + portal = enet.create_host({address: "any", port}) + portal_fn = fn + enet_check() +} + +$_.connection = function(callback, actor, config) { + var peer = peers[actor[ACTORDATA].id] + if (peer) { + callback(peer_connection(peer)) + return + } + if (actor_mod.mailbox_exist(actor[ACTORDATA].id)) { + callback({type:"local"}) + return + } + callback() +} + +$_.time_limit = function(requestor, seconds) { + if (!pronto.is_requestor(requestor)) { + log.error('time_limit: first argument must be a requestor') + disrupt + } + if (!is_number(seconds) || seconds <= 0) { + log.error('time_limit: seconds must be a positive number') + disrupt + } + + return function time_limit_requestor(callback, value) { + pronto.check_callback(callback, 'time_limit') + var finished = false + var requestor_cancel = null + var timer_cancel = null + + function cancel(reason) { + if (finished) return + finished = true + if (timer_cancel) { + timer_cancel() + timer_cancel = null + } + if (requestor_cancel) { + requestor_cancel(reason) + requestor_cancel = null + } + } + + function safe_cancel_requestor(reason) { + if (requestor_cancel) { + requestor_cancel(reason) + requestor_cancel = null + } + } + + timer_cancel = $_.delay(function() { + if (finished) return + def reason = { + factory: time_limit_requestor, + excuse: 'Timeout.', + evidence: seconds, + message: 'Timeout. ' + text(seconds) + } + safe_cancel_requestor(reason) + finished = true + callback(null, reason) + }, seconds) + + function do_request() { + requestor_cancel = requestor(function(val, reason) { + if (finished) return + finished = true + if (timer_cancel) { + timer_cancel() + timer_cancel = null + } + callback(val, reason) + }, value) + } disruption { + cancel('requestor failed') + callback(null, 'requestor failed') + } + do_request() + + return function(reason) { + safe_cancel_requestor(reason) + } + } +} + +// Make actor intrinsics available to core modules loaded via use_core +core_extras['$self'] = $_.self +core_extras['$overling'] = $_.overling +core_extras['$clock'] = $_.clock +core_extras['$delay'] = $_.delay +core_extras['$start'] = $_.start +core_extras['$stop'] = $_.stop +core_extras['$receiver'] = $_.receiver +core_extras['$contact'] = $_.contact +core_extras['$portal'] = $_.portal +core_extras['$time_limit'] = $_.time_limit +core_extras['$couple'] = $_.couple +core_extras['$unneeded'] = $_.unneeded +core_extras['$connection'] = $_.connection +core_extras['$fd'] = fd + // NOW load shop -- it receives all of the above via env var shop = use_core('internal/shop') use_core('build') @@ -878,7 +1096,7 @@ _summary_resolver = function(path, ctx) { return summary_fn() } -var time = use_core('time') +time = use_core('time') var toml = use_core('toml') // --- Logging system (full version) --- @@ -1097,73 +1315,6 @@ runtime_env.sequence = sequence // Make runtime functions available to modules loaded via use_core arrfor(array(runtime_env), function(k) { core_extras[k] = runtime_env[k] }) -$_.time_limit = function(requestor, seconds) -{ - if (!pronto.is_requestor(requestor)) { - log.error('time_limit: first argument must be a requestor') - disrupt - } - if (!is_number(seconds) || seconds <= 0) { - log.error('time_limit: seconds must be a positive number') - disrupt - } - - return function time_limit_requestor(callback, value) { - pronto.check_callback(callback, 'time_limit') - var finished = false - var requestor_cancel = null - var timer_cancel = null - - function cancel(reason) { - if (finished) return - finished = true - if (timer_cancel) { - timer_cancel() - timer_cancel = null - } - if (requestor_cancel) { - requestor_cancel(reason) - requestor_cancel = null - } - } - - function safe_cancel_requestor(reason) { - if (requestor_cancel) { - requestor_cancel(reason) - requestor_cancel = null - } - } - - timer_cancel = $_.delay(function() { - if (finished) return - def reason = make_reason(factory, 'Timeout.', seconds) - safe_cancel_requestor(reason) - finished = true - callback(null, reason) - }, seconds) - - function do_request() { - requestor_cancel = requestor(function(val, reason) { - if (finished) return - finished = true - if (timer_cancel) { - timer_cancel() - timer_cancel = null - } - callback(val, reason) - }, value) - } disruption { - cancel('requestor failed') - callback(null, 'requestor failed') - } - do_request() - - return function(reason) { - safe_cancel_requestor(reason) - } - } -} - var config = { ar_timer: 60, actor_memory:0, @@ -1210,31 +1361,8 @@ function guid(bits) return text(guid,'h') } -var HEADER = {} - -// takes a function input value that will eventually be called with the current time in number form. -$_.clock = function(fn) { - actor_mod.clock(_ => { - fn(time.number()) - send_messages() - }) -} - -var underlings = {} // this is more like "all actors that are notified when we die" -var overling = null -var root = null - -var receive_fn = null -var greeters = {} // Router functions for when messages are received for a specific actor - -var enet = use_core('internal/enet') - -var peers = {} -var id_address = {} -var peer_queue = {} -var portal = null -var portal_fn = null -var enet = use_core('enet') +enet = use_core('internal/enet') +enet = use_core('enet') function peer_connection(peer) { return { @@ -1259,37 +1387,6 @@ function peer_connection(peer) { } } -// takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information. -$_.connection = function(callback, actor, config) { - var peer = peers[actor[ACTORDATA].id] - if (peer) { - callback(peer_connection(peer)) - return - } - if (actor_mod.mailbox_exist(actor[ACTORDATA].id)) { - callback({type:"local"}) - return - } - - callback() -} - -// takes a function input value that will eventually be called with the current time in number form. -$_.portal = function(fn, port) { - if (portal) { - log.error(`Already started a portal listening on ${portal.port()}`) - disrupt - } - if (!port) { - log.error("Requires a valid port.") - disrupt - } - log.system(`starting a portal on port ${port}`) - portal = enet.create_host({address: "any", port}) - portal_fn = fn - enet_check() -} - function handle_host(e) { var queue = null var data = null @@ -1337,78 +1434,6 @@ function populate_actor_addresses(obj, e) { }) } -// takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information. -$_.contact = function(callback, record) { - send(create_actor(record), record, callback) -} - -// registers a function that will receive all messages... -$_.receiver = function receiver(fn) { - receive_fn = fn -} - -// Holds all messages queued during the current turn. -var message_queue = [] - -$_.start = function start(cb, program) { - if (!program) return - - var id = guid() - var oid = $_.self[ACTORDATA].id - var startup = { - id, - overling_id: oid, - root_id: root ? root[ACTORDATA].id : null, - program, - native_mode: native_mode, - no_warn: _no_warn, - } - greeters[id] = cb - push(message_queue, { startup }) -} - -// stops an underling or self. -$_.stop = function stop(actor) { - if (!actor) { - need_stop = true - return - } - if (!is_actor(actor)) { - log.error('Can only call stop on an actor.') - disrupt - } - if (is_null(underlings[actor[ACTORDATA].id])) { - log.error('Can only call stop on an underling or self.') - disrupt - } - - sys_msg(actor, {kind:"stop"}) -} - -// schedules the removal of an actor after a specified amount of time. -$_.unneeded = function unneeded(fn, seconds) { - actor_mod.unneeded(fn, seconds) -} - -// schedules the invocation of a function after a specified amount of time. -$_.delay = function delay(fn, seconds) { - var _seconds = seconds == null ? 0 : seconds - function delay_turn() { - fn() - send_messages() - } - var id = actor_mod.delay(delay_turn, _seconds) - return function() { actor_mod.removetimer(id) } -} - -// causes this actor to stop when another actor stops. -var couplings = {} -$_.couple = function couple(actor) { - if (actor == $_.self) return // can't couple to self - couplings[actor[ACTORDATA].id] = true - sys_msg(actor, {kind:'couple', from_id: _cell.id}) - log.system(`coupled to ${actor[ACTORDATA].id}`) -} function actor_prep(actor, send) { push(message_queue, {actor,send}); @@ -1497,8 +1522,6 @@ function send_messages() { message_queue = [] } -var replies = {} - function send(actor, message, reply) { var send_msg = null var target = null @@ -1547,11 +1570,6 @@ function send(actor, message, reply) { stone(send) -if (!_cell.args.id) _cell.id = guid() -else _cell.id = _cell.args.id - -$_.self = create_actor({id: _cell.id}) - // Actor's timeslice for processing a single message function turn(msg) { @@ -1569,12 +1587,6 @@ if (config.actor_memory) if (config.stack_max) js.max_stacksize(config.system.stack_max); -overling = _cell.args.overling_id ? create_actor({id: _cell.args.overling_id}) : null -$_.overling = overling - -root = _cell.args.root_id ? create_actor({id: _cell.args.root_id}) : null -if (root == null) root = $_.self - if (overling) { $_.couple(overling) report_to_overling({type:'greet', actor_id: _cell.id}) diff --git a/tests/cellfs_test.ce b/tests/cellfs_test.ce new file mode 100644 index 00000000..3c3e4f9f --- /dev/null +++ b/tests/cellfs_test.ce @@ -0,0 +1,251 @@ +// Test: cellfs mounting, sync access, and async requestors +// +// Known limitation: +// - is_directory() uses raw path instead of res.path for fs mounts, +// so @-prefixed paths fail (e.g. '@mount/dir'). Works via stat(). +// - cellfs.slurp() has no http code path; use cellfs.get() for http mounts. + +var cellfs = use('cellfs') +var fd = use('fd') + +var pkg_dir = '.cell/packages/gitea.pockle.world/john/prosperon' + +// Mount the prosperon package directory as 'prosperon' +cellfs.mount(pkg_dir, 'prosperon') + +// --- exists --- +var found = cellfs.exists('@prosperon/color.cm') +if (!found) { + log.error("exists('@prosperon/color.cm') returned false") + disrupt +} +log.console("exists: ok") + +// exists returns false for missing files +var missing = cellfs.exists('@prosperon/no_such_file.cm') +if (missing) { + log.error("exists returned true for missing file") + disrupt +} +log.console("exists (missing): ok") + +// --- slurp --- +var data = cellfs.slurp('@prosperon/color.cm') +if (!is_blob(data)) { + log.error("slurp did not return a blob") + disrupt +} +if (length(data) == 0) { + log.error("slurp returned empty blob") + disrupt +} +log.console(`slurp: ok (${length(data)} bits)`) + +// --- enumerate --- +var files = cellfs.enumerate('@prosperon', false) +if (!is_array(files)) { + log.error("enumerate did not return an array") + disrupt +} +if (length(files) == 0) { + log.error("enumerate returned empty array") + disrupt +} +// color.cm should be in the listing +var found_color = false +arrfor(files, function(f) { + if (f == 'color.cm') { + found_color = true + return true + } +}, false, true) +if (!found_color) { + log.error("enumerate did not include color.cm") + disrupt +} +log.console(`enumerate: ok (${length(files)} entries, found color.cm)`) + +// enumerate recursive +var rfiles = cellfs.enumerate('@prosperon', true) +if (length(rfiles) <= length(files)) { + log.error("recursive enumerate should return more entries") + disrupt +} +log.console(`enumerate recursive: ok (${length(rfiles)} entries)`) + +// --- stat --- +var st = cellfs.stat('@prosperon/color.cm') +if (!is_object(st)) { + log.error("stat did not return an object") + disrupt +} +if (st.filesize == null || st.filesize == 0) { + log.error("stat filesize missing or zero") + disrupt +} +log.console(`stat: ok (size=${st.filesize}, mtime=${st.modtime})`) + +// stat on a directory +var dir_st = cellfs.stat('@prosperon/docs') +if (!dir_st.isDirectory) { + log.error("stat('@prosperon/docs').isDirectory returned false") + disrupt +} +log.console("stat (directory): ok") + +// --- searchpath --- +var sp = cellfs.searchpath() +if (!is_array(sp)) { + log.error("searchpath did not return an array") + disrupt +} +if (length(sp) == 0) { + log.error("searchpath returned empty array") + disrupt +} +log.console(`searchpath: ok (${length(sp)} mounts)`) + +// --- resolve --- +var res = cellfs.resolve('@prosperon/color.cm') +if (!is_object(res)) { + log.error("resolve did not return an object") + disrupt +} +if (res.mount.name != 'prosperon') { + log.error("resolve returned wrong mount name") + disrupt +} +if (res.path != 'color.cm') { + log.error("resolve returned wrong path") + disrupt +} +log.console("resolve: ok") + +// --- realdir --- +var rd = cellfs.realdir('@prosperon/color.cm') +if (!is_text(rd)) { + log.error("realdir did not return text") + disrupt +} +if (!ends_with(rd, 'color.cm')) { + log.error("realdir does not end with color.cm") + disrupt +} +log.console(`realdir: ok (${rd})`) + +// --- unmount and re-mount --- +cellfs.unmount('prosperon') +var after_unmount = cellfs.searchpath() +var unmount_ok = true +arrfor(after_unmount, function(m) { + if (m.name == 'prosperon') { + unmount_ok = false + return true + } +}, false, true) +if (!unmount_ok) { + log.error("unmount failed, mount still present") + disrupt +} +log.console("unmount: ok") + +// re-mount for further tests +cellfs.mount(pkg_dir, 'prosperon') + +// --- match (wildstar) --- +var m1 = cellfs.match('color.cm', '*.cm') +if (!m1) { + log.error("match('color.cm', '*.cm') returned false") + disrupt +} +var m2 = cellfs.match('color.cm', '*.ce') +if (m2) { + log.error("match('color.cm', '*.ce') returned true") + disrupt +} +log.console("match: ok") + +// --- globfs --- +var cm_files = cellfs.globfs(['*.cm'], '@prosperon') +if (!is_array(cm_files)) { + log.error("globfs did not return an array") + disrupt +} +if (length(cm_files) == 0) { + log.error("globfs returned empty array") + disrupt +} +// all results should end in .cm +var all_cm = true +arrfor(cm_files, function(f) { + if (!ends_with(f, '.cm')) { + all_cm = false + return true + } +}, false, true) +if (!all_cm) { + log.error("globfs returned non-.cm files") + disrupt +} +log.console(`globfs: ok (${length(cm_files)} .cm files)`) + +log.console("--- sync tests passed ---") + +// --- Requestor tests --- + +// get requestor for a local fs mount +var get_color = cellfs.get('@prosperon/color.cm') + +get_color(function(result, reason) { + if (reason != null) { + log.error(`get color.cm failed: ${reason}`) + disrupt + } + if (!is_blob(result)) { + log.error("get did not return a blob") + disrupt + } + if (length(result) == 0) { + log.error("get returned empty blob") + disrupt + } + log.console(`get (fs): ok (${length(result)} bits)`) + + // parallel requestor test - fetch multiple files at once + var get_core = cellfs.get('@prosperon/core.cm') + var get_ease = cellfs.get('@prosperon/ease.cm') + + parallel([get_color, get_core, get_ease])(function(results, reason) { + if (reason != null) { + log.error(`parallel get failed: ${reason}`) + disrupt + } + if (length(results) != 3) { + log.error(`parallel expected 3 results, got ${length(results)}`) + disrupt + } + log.console(`parallel get: ok (${length(results)} files fetched)`) + + // HTTP mount test — network may not be available in test env + cellfs.mount('http://example.com', 'web') + var web_res = cellfs.resolve('@web/') + if (web_res.mount.type != 'http') { + log.error("http mount type is not 'http'") + disrupt + } + log.console("http mount: ok (type=http)") + + var get_web = cellfs.get('@web/') + get_web(function(body, reason) { + if (reason != null) { + log.console(`get (http): skipped (${reason})`) + } else { + log.console(`get (http): ok (${length(body)} bits)`) + } + + log.console("--- requestor tests passed ---") + log.console("all cellfs tests passed") + $stop() + }) + }) +}) diff --git a/tests/pronto.cm b/tests/pronto.cm index 8cff0e82..3c69794f 100644 --- a/tests/pronto.cm +++ b/tests/pronto.cm @@ -1,5 +1,5 @@ // Test pronto functions -// Tests for fallback, parallel, race, sequence, time_limit, requestorize, objectify +// Tests for fallback, parallel, race, sequence var test_count = 0 @@ -89,49 +89,17 @@ return { }, 1000) }, - test_time_limit: function() { - log.console("Testing time_limit...") - var slow_req = make_requestor("slow_req", 0.5, true) // takes 0.5s - var timed_req = time_limit(slow_req, 0.2) // 0.2s limit + test_immediate_requestors: function() { + log.console("Testing immediate requestors...") + var req1 = make_requestor("imm1", 0, true) + var req2 = make_requestor("imm2", 0, true) - timed_req(function(result, reason) { + sequence([req1, req2])(function(result, reason) { if (result != null) { - log.console(`Time limit succeeded: ${result}`) + log.console(`Immediate sequence result: ${result}`) } else { - log.console(`Time limit failed: ${reason}`) + log.console(`Immediate sequence failed: ${reason}`) } - }, 100) - }, - - test_requestorize: function() { - log.console("Testing requestorize...") - var add_one = function(x) { return x + 1 } - var req = requestorize(add_one) - - req(function(result, reason) { - if (result != null) { - log.console(`Requestorize result: ${result}`) - } else { - log.console(`Requestorize failed: ${reason}`) - } - }, 42) - }, - - test_objectify: function() { - log.console("Testing objectify...") - var req_a = make_requestor("obj_req_a", 0.1, true) - var req_b = make_requestor("obj_req_b", 0.1, true) - var req_c = make_requestor("obj_req_c", 0.1, true) - - var parallel_obj = objectify(parallel) - var req = parallel_obj({a: req_a, b: req_b, c: req_c}) - - req(function(result, reason) { - if (result != null) { - log.console(`Objectify result: ${result}`) - } else { - log.console(`Objectify failed: ${reason}`) - } - }, 1000) + }, 0) } } \ No newline at end of file