Files
cell/scripts/engine.cm
2025-11-24 23:08:40 -06:00

1113 lines
32 KiB
Plaintext

// Engine bootstrap. Everything below runs inside one immediately-invoked
// function so that its locals stay private to this file.
(function engine() {
// Publish the host-provided `prosperon` object under the global name `cell`.
globalThis.cell = prosperon
// Unique symbol used as the property key for documentation strings attached to functions.
cell.DOC = Symbol()
// Property key under which an actor object stores its descriptor (see create_actor).
var ACTORDATA = cell.hidden.actorsym
// Property key that marks a message as a system message (see handle_message).
var SYSYM = '__SYSTEM__'
// Delay, in seconds, between enet_check polls; overwritten from config.system.net_service later.
var ENETSERVICE = 0.1
var REPLYTIMEOUT = 60 // seconds before replies are ignored
// File extensions for modules loaded with use() and for actor programs.
var MOD_EXT = '.cm'
var ACTOR_EXT = '.ce'
globalThis.pi = 3.1415926535897932
// Describe the stack frame found `depth` entries below this function's own
// entry in a freshly captured stack trace.
// Returns {file, line}. When the frame does not exist the result is
// {file: "nofile", line: 0}; when it exists but has no "(file:" part the file
// is reported as "SCRIPT".
function caller_data(depth = 0)
{
  // The Error must be created here: the frame index is relative to this function.
  var frame = new Error().stack.split("\n")[1+depth]
  var where = {file: "nofile", line: 0}
  if (!frame) return where

  var file_hit = frame.match(/\((.*)\:/)
  var file_text = file_hit ? file_hit[1] : "SCRIPT"
  if (file_text) where.file = file_text

  var line_hit = frame.match(/\:(\d*)\)/)
  var line_text = line_hit ? line_hit[1] : 0
  if (line_text) where.line = line_text

  return where
}
// Startup arguments come from the host; fall back to an empty record and a
// placeholder id when none are supplied.
cell.args = cell.hidden.init
cell.args ??= {}
cell.id ??= "newguy"
// Format one log line: the first five characters of this actor's id, the
// source location, then the message, terminated with a newline.
function console_rec(line, file, msg) {
return `[${cell.id.slice(0,5)}] [${file}:${line}]: ${msg}\n`
// time: [${time.text("mb d yyyy h:nn:ss")}]
}
var console_mod = cell.hidden.console
// Global logging facade.
globalThis.log = {}
// Print `msg` tagged with the location reported by caller_data(1).
log.console = function(msg)
{
var caller = caller_data(1)
console_mod.print(console_rec(caller.line, caller.file, msg))
}
// Print an error. An Error instance is expanded to its text plus stack trace;
// calling with no argument logs a freshly created Error.
log.error = function(msg = new Error())
{
var caller = caller_data(1)
if (msg instanceof Error)
msg = msg + "\n" + msg.stack
console_mod.print(console_rec(caller.line,caller.file,msg))
}
// Print `msg` with a "[SYSTEM] " prefix.
// NOTE(review): this goes through log.console, so the reported location is
// presumably this function rather than the original caller — confirm.
log.system = function(msg) {
msg = "[SYSTEM] " + msg
log.console(msg)
}
// Get hidden modules from cell.hidden before stripping it
var hidden = cell.hidden
var actor_mod = hidden.actor
var wota = hidden.wota
var use_embed = hidden.use_embed
var use_dyn = hidden.use_dyn
var enet = hidden.enet
var nota = hidden.nota
var fd = use_embed('fd')
log.console(fd.getcwd())
log.console(cell.args.program)
var shop_path = '.cell'
var mod_path = '.cell/modules/'
// Refuse to start unless the working directory contains a `.cell` directory.
// NOTE(review): `os` is not defined in the visible part of this file —
// presumably a host-provided global; confirm.
if (!fd.stat('.cell').isDirectory) {
log.console("No cell directory found. Make one.\n");
os.exit(1);
}
// Report whether `path` names a regular file.
// Returns the `isFile` field of fd.stat(path); any exception raised while
// stat-ing (or while reading the field) is treated as "not a file".
function is_file(path) {
  var answer
  try {
    answer = fd.stat(path).isFile
  } catch {
    answer = false
  }
  return answer
}
// Write `blob` to `path`, opening the file in 'w' mode.
// The handle is closed even when the write throws, so a failed write no
// longer leaks the open descriptor; the write error still propagates.
function write_file(path, blob) {
  var fd_handle = fd.open(path, 'w')
  try {
    fd.write(fd_handle, blob)
  } finally {
    fd.close(fd_handle)
  }
}
// Create `dir` and any missing ancestors.
// '' and '.' are treated as already present. A directory counts as missing
// when fd.stat(dir) throws; ancestors are created before the directory itself.
function mkdir_p(dir) {
  if (dir == '' || dir == '.') return

  var present = true
  try {
    fd.stat(dir)
  } catch {
    present = false
  }
  if (present) return

  var parent = dir.substring(0, dir.lastIndexOf('/'))
  mkdir_p(parent)
  fd.mkdir(dir)
}
// Wota decode timing tracking
var wota_decode_times = []
var last_wota_flush = 0
// Open the core QOP package from the blob supplied by the host, then strip
// `hidden` from cell so nothing else can reach it through `cell`. The local
// `hidden` variable above keeps its reference and is still used later in this file.
var qop = use_embed('qop')
var core_qop = qop.open(hidden.core_qop_blob)
var utf8 = use_embed('utf8')
delete cell.hidden
// Shut this actor down.
//   err - optional reason. When present this is a forceful disrupt and the
//         overling is sent {type:'disrupt', reason}; otherwise it is sent
//         {type:'stop'}.
// Every id in `underlings` is asked to stop as well, the reason (and its stack,
// if it has one) is logged, and finally the actor module's disrupt() is called.
function disrupt(err)
{
  if (overling) {
    var notice
    if (err)
      notice = {type:'disrupt', reason: (err instanceof Error) ? err.stack : err}
    else
      notice = {type:'stop'}
    report_to_overling(notice)
  }

  if (underlings) {
    for (var id of underlings) {
      log.console(`calling on ${id} to disrupt too`)
      var underling = create_actor({id})
      $_.stop(underling)
    }
  }

  if (err) {
    log.console(err);
    if (err.stack) log.console(err.stack)
  }

  actor_mod.disrupt()
}
// Register disrupt() as the actor module's exception callback
// (presumably invoked for uncaught errors — confirm against actor_mod).
actor_mod.on_exception(disrupt)
var js = use_embed('js')
var io = use_embed('io')
// Cache of already loaded modules, keyed by the cache key computed in use().
var use_cache = {}
// Read base.cm from the core package, wrap it in a named function and run it
// before anything else is loaded.
var BASEPATH = 'base' + MOD_EXT
var script = utf8.decode(core_qop.read(BASEPATH))
var fnname = "base"
script = `(function ${fnname}() { ${script}; })`
js.eval(BASEPATH, script)()
// Loader bookkeeping used by use(): paths currently being loaded, in order.
var inProgress = {}
var loadingStack = []
// Track current package context for nested use() calls
var current_package = null
// Extract the package name from a resolved path.
// For '.cell/modules/<pkg>/...' this returns '<pkg>'; for anything else
// (falsy path, different prefix, or no non-empty directory component after
// the prefix) it returns null.
function get_package_from_path(path) {
  var modules_prefix = '.cell/modules/'
  if (!path || !path.startsWith(modules_prefix)) return null

  var tail = path.substring(modules_prefix.length)
  var first_slash = tail.indexOf('/')
  return first_slash > 0 ? tail.substring(0, first_slash) : null
}
// Config is loaded later, but the resolvers below need to read it.
// It is assigned once shop.load_config() has been called.
var config = null

// Shared lookup behind resolve_actor_path and resolve_module_path.
//   requested   - name as written by the caller, optionally "alias/sub/path"
//   pkg_context - package the caller lives in, or null for the root project
//   ext         - file extension to append (ACTOR_EXT or MOD_EXT)
// Resolution order:
//   1. Current package (root project when pkg_context is null)
//   2. Declared dependencies and replace directives (from config)
//   3. core_qop (standard library)
// Returns { path, package_name, isCore } or null when nothing matches.
function resolve_with_extension(requested, pkg_context, ext) {
  var dependencies = (config && config.dependencies) ? config.dependencies : {}
  var replacements = (config && config.replace) ? config.replace : null

  function found(path, package_name) {
    return { path: path, package_name: package_name, isCore: false }
  }
  function replacement_for(alias) {
    return replacements ? replacements[alias] : null
  }

  var candidate

  // Step 1: current package
  if (pkg_context) {
    candidate = '.cell/modules/' + pkg_context + '/' + requested + ext
    if (is_file(candidate)) return found(candidate, pkg_context)

    // The package itself may be locally replaced
    var own_dir = replacement_for(pkg_context)
    if (own_dir) {
      candidate = own_dir + '/' + requested + ext
      if (is_file(candidate)) return found(candidate, pkg_context)
    }
  } else {
    candidate = requested + ext
    if (is_file(candidate)) return found(candidate, null)
  }

  // Step 2: dependencies (explicit alias first) and replace directives
  if (requested.includes('/')) {
    var [alias, ...rest] = requested.split('/')
    var sub_path = rest.join('/')
    var alias_dir = replacement_for(alias)
    if (alias_dir) {
      // A replace directive takes precedence over the installed dependency
      candidate = alias_dir + '/' + (sub_path || alias) + ext
      if (is_file(candidate)) return found(candidate, alias)
    } else if (dependencies[alias]) {
      candidate = '.cell/modules/' + alias + '/' + sub_path + ext
      if (is_file(candidate)) return found(candidate, alias)
    }
  } else {
    // Simple name: a replace directive of the same name first ...
    var simple_dir = replacement_for(requested)
    if (simple_dir) {
      candidate = simple_dir + '/' + requested + ext
      if (is_file(candidate)) return found(candidate, requested)
    }
    // ... then every declared dependency in turn
    for (var dep_alias in dependencies) {
      candidate = '.cell/modules/' + dep_alias + '/' + requested + ext
      if (is_file(candidate)) return found(candidate, dep_alias)
    }
  }

  // Step 3: core. A read that throws means the entry is not in the core package.
  try {
    core_qop.read(requested + ext)
    return { path: requested + ext, package_name: null, isCore: true }
  } catch (e) {
    // Not in core
  }
  return null
}

// Resolve actor program path with package awareness (ACTOR_EXT files).
function resolve_actor_path(requested, pkg_context) {
  return resolve_with_extension(requested, pkg_context, ACTOR_EXT)
}

// Resolve module path with package awareness (MOD_EXT files).
function resolve_module_path(requested, pkg_context) {
  return resolve_with_extension(requested, pkg_context, MOD_EXT)
}
/* use(file, ...args) - load a module and return the value it produces.

   Package-aware module resolution:
   1. If in a package context, check within that package first
   2. Check local project files
   3. Check declared dependencies (from cell.toml [dependencies])
   4. Check core_qop (standard library)

   An embedded module of the same name, when one exists, becomes `this` for
   the module script and is the fallback result when the script returns
   nothing. Results are cached per resolved path.

   Throws when the module cannot be found, when loading it would form a
   dependency cycle, or when neither the script nor an embedded module yields
   a value. Errors thrown by the module script propagate to the caller. */
globalThis.use = function use(file, ...args) {
  var requested = file
  // Embedded modules are always available
  var embed_mod = use_embed(requested)
  var resolved = resolve_module_path(requested, current_package)
  var cache_key = resolved
    ? (resolved.isCore ? 'core:' + resolved.path : resolved.path)
    : requested
  if (use_cache[cache_key]) return use_cache[cache_key]

  if (!resolved && !embed_mod)
    throw new Error(`Module ${file} could not be found (package context: ${current_package || 'none'})`)

  // Only an embedded module exists: that is the result
  if (!resolved && embed_mod) {
    use_cache[cache_key] = embed_mod
    return embed_mod
  }

  var path = resolved.path
  var module_package = resolved.package_name
  var context = embed_mod ? embed_mod : {}
  var ret = null

  if (resolved.isCore) {
    var core_error = null
    try {
      var core_source = utf8.decode(core_qop.read(path))
      var core_wrapped = `(function setup_${requested.replace(/[^a-zA-Z0-9_]/g, '_')}_module(arg, $_){${core_source};})`
      var core_fn = js.eval_compile(js.compile(path, core_wrapped))
      ret = core_fn.call(context, args, $_)
    } catch (e) {
      // Best effort: fall back to the embedded module below, but remember the
      // failure so it is not lost when there is nothing to fall back to.
      core_error = e
    }
    if (!ret && embed_mod) {
      ret = embed_mod
    } else if (!ret) {
      if (core_error) throw core_error
      throw new Error(`Use must be used with a module, but ${path} doesn't return a value`)
    }
    use_cache[cache_key] = ret
    return ret
  }

  // Check for circular dependencies using the resolved path
  if (path && loadingStack.includes(path)) {
    let cycleIndex = loadingStack.indexOf(path)
    let cyclePath = loadingStack.slice(cycleIndex).concat(path)
    throw new Error(
      `Circular dependency detected while loading "${file}".\n` +
      `Module chain: ${loadingStack.join(" -> ")}\n` +
      `Cycle specifically: ${cyclePath.join(" -> ")}`
    )
  }

  log.console("use: loading file " + path + " (package: " + (module_package || 'local') + ")")
  inProgress[path] = true
  loadingStack.push(path)
  // Nested use() calls made by the module resolve relative to its package
  var prev_package = current_package
  current_package = module_package

  try {
    // Location a compiled copy would live at inside the .cell directory
    var cleanPath = path.replace(/[:\\]/g, '/').replace(/\/+/g, '/')
    var compiledPath = ".cell/build/" + cleanPath + '.o'
    mkdir_p(compiledPath.substring(0, compiledPath.lastIndexOf('/')))

    // The compiled cache is currently disabled; the stats are kept so the
    // freshness check can be re-enabled without touching the control flow.
    var useCompiled = false
    var srcStat = fd.stat(path)
    var compiledStat = fd.stat(compiledPath)
    // if (srcStat && compiledStat && compiledStat.mtime > srcStat.mtime) {
    //   useCompiled = true
    // }

    var fn
    if (useCompiled) {
      fn = js.eval_compile(js.compile_unblob(fd.slurp(compiledPath)))
    } else {
      // Sanitize the file stem so names such as "foo-bar" still form a valid
      // function identifier, matching what the core branch does.
      var mod_name = path
        .substring(path.lastIndexOf('/') + 1, path.lastIndexOf('.'))
        .replace(/[^a-zA-Z0-9_]/g, '_')
      var source = utf8.decode(fd.slurp(path))
      var wrapped = `(function setup_${mod_name}_module(arg, $_){${source};})`
      fn = js.compile(path, wrapped)
      // Save compiled version to .cell directory
      // var compiled = js.compile_blob(fn)
      // write_file(compiledPath, compiled)
      fn = js.eval_compile(fn)
    }

    // Run the script with the embedded module (if any) as `this`
    ret = fn.call(context, args, $_)
  } finally {
    // Always unwind loader state, even when the module throws; otherwise a
    // later use() of the same path is misreported as a circular dependency
    // and nested lookups keep using the wrong package.
    current_package = prev_package
    loadingStack.pop()
    delete inProgress[path]
  }

  if (!ret && embed_mod) {
    ret = embed_mod
  } else if (!ret) {
    throw new Error(`Use must be used with a module, but ${path} doesn't return a value`)
  }

  use_cache[cache_key] = ret
  return ret
}
// Load the first modules through use() and time the early startup phases.
globalThis.json = use('json')
log.console(json.encode(cell))
var time = use('time')
var st_now = time.number()
var shop = use('shop')
log.console(`use shop in ${time.number() - st_now} seconds`)
// From here on the path resolvers can see dependencies and replace directives.
config = shop.load_config()
// Defaults for the `system` section of the configuration.
var default_config = {
ar_timer: 60,
actor_memory:0,
net_service:0.1,
reply_timeout:60,
main: false,
}
config ??= {}
config.system ??= {}
// Defaults are reached through the prototype chain, so keys present in
// config.system take precedence over default_config.
config.system.__proto__ = default_config
ENETSERVICE = config.system.net_service
REPLYTIMEOUT = config.system.reply_timeout
log.console(`config loaded in ${time.number()-st_now} seconds`)
globalThis.text = use('text')
// Load actor-specific configuration.
// Copies every key of config.actors[program] onto cell.args, overwriting
// existing keys. `program` is used verbatim as the lookup key, e.g.
// "prosperon/_sdl_video" or "extramath/spline".
// A missing program name or a missing config entry is a no-op; this function
// runs before the engine has verified that a program was supplied.
function load_actor_config(program) {
  if (!program) return
  var overrides = config.actors ? config.actors[program] : null
  if (!overrides) return
  for (var key in overrides)
    cell.args[key] = overrides[key]
}
// Capture blob's stone/stonep methods privately, then remove them from the
// prototype so they are only reachable through stone() and stone.p().
var blob = use('blob')
var blob_stone = blob.prototype.stone
var blob_stonep = blob.prototype.stonep;
delete blob.prototype.stone;
delete blob.prototype.stonep;
// Recursively freeze `object` and everything reachable through its own
// enumerable properties, then return it.
// Blobs are additionally passed to the captured blob stone method.
// Each object is visited at most once, so cyclic or shared structures
// terminate instead of recursing until the stack overflows.
function deepFreeze(object) {
  var visited = new Set()

  function freeze_node(node) {
    visited.add(node)
    if (node instanceof blob)
      blob_stone.call(node)
    // Freeze properties before freezing self
    for (var name of Object.keys(node)) {
      var value = node[name]
      var freezable = (value && typeof value == "object") || typeof value == "function"
      if (freezable && !visited.has(value))
        freeze_node(value)
    }
    return Object.freeze(node)
  }

  return freeze_node(object)
}
log.console(`stone initialized in ${time.number()-st_now} seconds`)
// stone(x) deep-freezes x and returns it.
globalThis.stone = deepFreeze
// stone.p(x): for blobs, the result of the captured stonep method; for
// everything else, whether the value is frozen.
stone.p = function(object)
{
if (object instanceof blob)
return blob_stonep.call(object)
return Object.isFrozen(object)
}
/*
When handling a message, the message appears like this:
{
type: type of message
- contact: used for contact messages
- stop: used to issue stop command
- etc
reply: ID this message will respond to (callback saved on the actor)
replycc: the actor that is waiting for the reply
target: ID of the actor that's supposed to receive the message. Only added to non direct sends (out of portals)
return: reply ID so the replycc actor can know what callback to send the message to
data: the actual content of the message
}
actors look like
{
id: the GUID of this actor
address: the IP this actor can be found at
port: the port of the IP the actor can be found at
}
*/
// Produce a fresh identifier as text.
// A blob of `bits` bits (256 by default) is built from hidden.randi, stoned,
// and rendered with text(..., 'h').
function guid(bits = 256)
{
  var id_blob = new blob(bits, hidden.randi)
  stone(id_blob)
  var rendered = text(id_blob,'h')
  return rendered
}
// Property key under which a received letter carries its message envelope.
var HEADER = Symbol()
// Build an actor object: an empty record whose descriptor is stored under
// ACTORDATA. Without an argument the descriptor is {id: <fresh guid>}.
function create_actor(desc = {id:guid()}) {
var actor = {}
actor[ACTORDATA] = desc
return actor
}
// `$_` is this actor's own actor object and also carries the actor API.
var $_ = create_actor()
$_.random = hidden.rand
$_.random[cell.DOC] = "returns a number between 0 and 1. There is a 50% chance that the result is less than 0.5."
$_.random_fit = hidden.randi
// Schedule `fn` through actor_mod.clock; it receives time.number() and any
// messages queued while it ran are flushed afterwards.
$_.clock = function(fn) {
actor_mod.clock(_ => {
fn(time.number())
send_messages()
})
}
$_.clock[cell.DOC] = "takes a function input value that will eventually be called with the current time in number form."
var underlings = new Set() // this is more like "all actors that are notified when we die"
var overling = null
var root = null
// Callback installed by $_.receiver.
var receive_fn = null
var greeters = {} // Router functions for when messages are received for a specific actor
// Return the actor descriptor stored on `actor` (truthy for actor objects),
// or undefined when `actor` is null/undefined or carries no descriptor.
// Tolerating null/undefined lets callers probe optional fields such as
// `message.replycc` without a TypeError.
globalThis.is_actor = function is_actor(actor) {
  if (actor == null) return undefined
  return actor[ACTORDATA]
}
// Summarize a peer's statistics as a plain record, grouping the peer's flat
// fields into latency, bandwidth, activity, mtu, data, latency_variance,
// packet_loss and state.
function peer_connection(peer) {
  var bandwidth = {
    incoming: peer.incoming_bandwidth,
    outgoing: peer.outgoing_bandwidth
  }
  var activity = {
    last_sent: peer.last_send_time,
    last_received: peer.last_receive_time
  }
  var data = {
    incoming_total: peer.incoming_data_total,
    outgoing_total: peer.outgoing_data_total,
    reliable_in_transit: peer.reliable_data_in_transit
  }

  var summary = {}
  summary.latency = peer.rtt
  summary.bandwidth = bandwidth
  summary.activity = activity
  summary.mtu = peer.mtu
  summary.data = data
  summary.latency_variance = peer.rtt_variance
  summary.packet_loss = peer.packet_loss
  summary.state = peer.state
  return summary
}
// Report how `actor` is reachable.
//   callback - receives a peer statistics record for a connected remote peer,
//              {type:"local"} when the actor has a mailbox in this process,
//              or nothing when the actor is not reachable.
//   actor    - the actor object to look up
//   config   - currently unused
// The peer table is keyed by "address:port" everywhere it is written or read,
// so the lookup uses the same key rather than the actor id, which could never
// match an entry.
$_.connection = function(callback, actor, config) {
  var desc = actor[ACTORDATA]
  var peer = peers[desc.address + ":" + desc.port]
  if (peer) {
    callback(peer_connection(peer))
    return
  }
  if (actor_mod.mailbox_exist(desc.id)) {
    callback({type:"local"})
    return
  }
  callback()
}
$_.connection[cell.DOC] = "The connection function takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information."
// Connected peers, keyed by "address:port".
var peers = {}
var id_address = {}
// Messages waiting for a peer connection to complete, keyed by the peer
// object itself. This must be a Map: every user calls get/set/delete on it
// and the keys are objects, neither of which a plain record supports.
var peer_queue = new Map()
// The enet host used for networking, and the function handling contacts.
var portal = null
var portal_fn = null
// Start listening for contacts on `port` and remember `fn` as the contact
// handler. Throws if a host already exists or if no port is given.
$_.portal = function(fn, port) {
if (portal) throw new Error(`Already started a portal listening on ${portal.port}`)
if (!port) throw new Error("Requires a valid port.")
log.system(`starting a portal on port ${port}`)
portal = enet.create_host({address: "any", port})
portal_fn = fn
}
$_.portal[cell.DOC] = "A portal is a special actor with a public address that performs introduction services. It listens on a specified port for contacts by external actors that need to acquire an actor object. The function will receive the record containing the request. The record can have a reply sent through it. A portal can respond by beginning a new actor, or finding an existing actor, or by forwarding the contact message to another actor. This is how distributed Misty networks are bootstrapped. The portal function returns null."
// Handle one event from the enet host.
//   "connect"    - remember the peer under "address:port" and flush any
//                  messages that were queued while it was connecting
//   "disconnect" - forget the peer and its queue
//   "receive"    - decode the packet, stamp the sender's address onto actor
//                  descriptors that lack one, and hand the message to turn()
function handle_host(e) {
  // Walk `obj` and give every actor descriptor without an address the
  // address/port of the peer this event came from.
  function populate_actor_addresses(obj) {
    if (typeof obj != 'object' || obj == null) return
    if (obj[ACTORDATA] && !obj[ACTORDATA].address) {
      obj[ACTORDATA].address = e.peer.address
      obj[ACTORDATA].port = e.peer.port
    }
    for (var key in obj) {
      // Decoded network data may shadow or lack hasOwnProperty, so go
      // through Object.prototype instead of the object itself.
      if (Object.prototype.hasOwnProperty.call(obj, key)) {
        populate_actor_addresses(obj[key])
      }
    }
  }

  switch (e.type) {
    case "connect":
      log.system(`connected a new peer: ${e.peer.address}:${e.peer.port}`)
      peers[`${e.peer.address}:${e.peer.port}`] = e.peer
      var queue = peer_queue.get(e.peer)
      if (queue) {
        for (var msg of queue) {
          e.peer.send(nota.encode(msg))
          // Log every flushed message, not just the last one
          log.system(`sent ${json.encode(msg)} out of queue`)
        }
        peer_queue.delete(e.peer)
      }
      break
    case "disconnect":
      peer_queue.delete(e.peer)
      for (var id in peers) if (peers[id] == e.peer) delete peers[id]
      log.system('portal got disconnect from ' + e.peer.address + ":" + e.peer.port)
      break
    case "receive":
      var data = nota.decode(e.data)
      // NOTE(review): this tests `replycc.address` while the address is
      // stored under replycc[ACTORDATA]; left unchanged pending confirmation
      // of the intended overwrite behaviour.
      if (data.replycc && !data.replycc.address) {
        data.replycc[ACTORDATA].address = e.peer.address
        data.replycc[ACTORDATA].port = e.peer.port
      }
      if (data.data) populate_actor_addresses(data.data)
      turn(data)
      break
  }
}
// Send `record` to the actor described by `record` itself, with `callback`
// registered as the reply handler.
$_.contact = function(callback, record) {
send(create_actor(record), record, callback)
}
$_.contact[cell.DOC] = `The contact function sends a message to a portal on another machine to obtain an actor object.
The callback is a function with a actor input and a reason input. If successful, actor is bound to an actor object. If not successful, actor is null and reason may contain an explanation.`
// Install the function that receives incoming user messages.
$_.receiver = function receiver(fn) {
receive_fn = fn
}
$_.receiver[cell.DOC] = "registers a function that will receive all messages..."
// Queue the creation of a new actor running `program`.
//   cb      - stored in `greeters` under the new actor's id
//   program - actor program name, resolved relative to the current package;
//             a falsy value makes the call a no-op
//   args    - passed to the new actor as `arg`; a single array argument is
//             used as the argument list itself
// Throws when the program cannot be resolved. The actor is actually created
// when the message queue is flushed (see send_messages).
$_.start = function start(cb, program, ...args) {
if (!program) return
// Resolve the actor program path with package awareness
var resolved_program = resolve_actor_path(program, current_package)
if (!resolved_program) {
throw new Error(`Actor program ${program} could not be found (package context: ${current_package || 'none'})`)
}
var id = guid()
if (args.length == 1 && Array.isArray(args[0])) args = args[0]
var startup = {
id,
overling: $_,
root,
arg: args,
program: resolved_program.path,
package_context: resolved_program.package_name // Pass package context to new actor
}
greeters[id] = cb
message_queue.push({ startup })
}
// Stop an actor. With no argument this actor flags itself to stop at the next
// message flush. Otherwise `actor` must be an actor object whose id is in
// `underlings`; it is sent a system 'stop' message.
$_.stop = function stop(actor) {
if (!actor) {
need_stop = true
return
}
if (!is_actor(actor))
throw new Error('Can only call stop on an actor.')
if (!underlings.has(actor[ACTORDATA].id))
throw new Error('Can only call stop on an underling or self.')
sys_msg(actor, {kind:"stop"})
}
$_.stop[cell.DOC] = "The stop function stops an underling."
// Thin wrapper: forwards `fn` and `seconds` to actor_mod.unneeded.
$_.unneeded = function unneeded(fn, seconds) {
actor_mod.unneeded(fn, seconds)
}
// Run `fn` after `seconds`, then flush queued messages.
// A non-positive delay is handed to $_.clock instead and returns nothing;
// otherwise the return value is a function that removes the timer.
$_.delay = function delay(fn, seconds = 0) {
if (seconds <= 0) {
$_.clock(fn)
return
}
function delay_turn() {
fn()
send_messages()
}
var id = actor_mod.delay(delay_turn, seconds)
return function() { actor_mod.removetimer(id) }
}
$_.delay[cell.DOC] = "used to schedule the invocation of a function..."
// Ids of actors this actor is coupled to (see handle_actor_disconnect).
var couplings = new Set()
// Couple to `actor`: record its id and tell it, via a system 'couple'
// message, that this actor is coupled to it.
$_.couple = function couple(actor) {
if (actor == $_) return // can't couple to self
couplings.add(actor[ACTORDATA].id)
sys_msg(actor, {kind:'couple', from: $_})
log.system(`coupled to ${actor}`)
}
$_.couple[cell.DOC] = "causes this actor to stop when another actor stops."
// Queue a message for delivery when send_messages() next runs.
function actor_prep(actor, send) {
message_queue.push({actor,send});
}
// Send a message immediately without queuing.
// Failures are logged rather than thrown. log.error accepts a single
// argument, so the caught error is folded into the message text instead of
// being passed as a second argument that would be dropped.
function actor_send_immediate(actor, send) {
  try {
    actor_send(actor, send);
  } catch (err) {
    var detail = (err instanceof Error) ? err + "\n" + err.stack : err
    log.error("Failed to send immediate message: " + detail);
  }
}
// Deliver `message` to `actor` right now, choosing the transport from the
// actor's descriptor:
//   1. same id as this actor  -> call receive_fn with message.data
//   2. a local mailbox exists -> wota-encode and push to that mailbox
//   3. descriptor has address -> send over enet, connecting (and queuing the
//                                message) when no peer exists yet
// Anything else is logged as undeliverable.
// Throws when the target is not an actor or the message is not an object.
function actor_send(actor, message) {
if (actor[HEADER] && !actor[HEADER].replycc) // attempting to respond to a message but sender is not expecting; silently drop
return
// NOTE(review): this guard accepts a non-actor that has an actor in
// `replycc`, but the lines below read actor[ACTORDATA] unconditionally — confirm intent.
if (!is_actor(actor) && !is_actor(actor.replycc)) throw new Error(`Must send to an actor object. Attempted send to ${json.encode(actor)}`)
if (typeof message != 'object') throw new Error('Must send an object record.')
// message to self
if (actor[ACTORDATA].id == cell.id) {
if (receive_fn) receive_fn(message.data)
return
}
// message to actor in same flock
if (actor[ACTORDATA].id && actor_mod.mailbox_exist(actor[ACTORDATA].id)) {
var wota_blob = wota.encode(message)
// log.console(`sending wota blob of ${wota_blob.length/8} bytes`)
actor_mod.mailbox_push(actor[ACTORDATA].id, wota_blob)
return
}
// remote actor: an id becomes the message target, no id means a contact request
if (actor[ACTORDATA].address) {
if (actor[ACTORDATA].id)
message.target = actor[ACTORDATA].id
else
message.type = "contact"
var peer = peers[actor[ACTORDATA].address + ":" + actor[ACTORDATA].port]
if (!peer) {
// no host yet: create one without a fixed port for outgoing connections
if (!portal) {
log.system(`creating a contactor ...`)
portal = enet.create_host({address:"any"})
log.system(`allowing contact to port ${portal.port}`)
}
log.system(`no peer! connecting to ${actor[ACTORDATA].address}:${actor[ACTORDATA].port}`)
peer = portal.connect(actor[ACTORDATA].address, actor[ACTORDATA].port)
// held until handle_host sees the "connect" event for this peer
peer_queue.set(peer, [message])
} else {
peer.send(nota.encode(message))
}
return
}
log.system(`Unable to send message to actor ${json.encode(actor[ACTORDATA])}`)
}
// Everything queued during the current turn: either {startup} records for
// actors to create or {actor, send} records for messages to deliver.
var message_queue = []
// Set by $_.stop() with no argument; checked at the next flush.
var need_stop = false

// Flush the queue at the end of a turn.
// When a stop has been requested the actor is disrupted and the queue is
// discarded. Otherwise entries are processed in order, including entries
// appended while the flush is running, and the queue is emptied afterwards.
function send_messages() {
  if (need_stop) {
    disrupt()
    message_queue.length = 0
    return
  }

  // Index loop: the length is re-read every pass so late additions are seen
  for (var i = 0; i < message_queue.length; i++) {
    var entry = message_queue[i]
    if (entry.startup)
      actor_mod.createactor(entry.startup) // now is the time to actually spin up the actor
    else
      actor_send(entry.actor, entry.send)
  }
  message_queue.length = 0
}
// Pending reply callbacks, keyed by reply id.
var replies = {}

// Queue a user message for delivery at the end of the current turn.
//   actor   - an actor object, or a received letter (then the message goes
//             back to the letter's sender as a reply)
//   message - the record to send
//   reply   - optional callback for the answer; it is called with
//             (null, "timeout") if nothing arrives within REPLYTIMEOUT seconds
// Throws when `actor` is not an object (null included), when `message` is
// not of type object, or when a letter's return address is not an actor.
globalThis.send = function send(actor, message, reply) {
  // typeof null is 'object', so null has to be rejected explicitly
  if (typeof actor != 'object' || actor === null)
    throw new Error(`Must send to an actor object. Provided: ${json.encode(actor)}`);
  if (typeof message != 'object')
    throw new Error('Message must be an object')

  var send = {type:"user", data: message}

  // Replying to a letter: redirect to whoever is waiting for the answer
  if (actor[HEADER] && actor[HEADER].replycc) {
    var header = actor[HEADER]
    if (!header.replycc || !is_actor(header.replycc))
      throw new Error(`Supplied actor had a return, but it's not a valid actor! ${json.encode(actor[HEADER])}`)
    actor = header.replycc
    send.return = header.reply
  }

  if (reply) {
    var id = guid()
    replies[id] = reply
    $_.delay(_ => {
      if (replies[id]) {
        replies[id](null, "timeout")
        delete replies[id]
      }
    }, REPLYTIMEOUT)
    send.reply = id
    send.replycc = $_
  }

  // Instead of sending immediately, queue it
  actor_prep(actor,send);
}
stone(send)
// Adopt the id passed in the startup arguments, or mint a new one, and
// store it on this actor's own descriptor.
if (!cell.args.id) cell.id = guid()
else cell.id = cell.args.id
$_[ACTORDATA].id = cell.id
// Actor's timeslice for processing a single message
// Decodes the wota payload, dispatches it, then flushes queued messages.
// NOTE(review): handle_host passes an already nota-decoded record to this
// function, which then goes through wota.decode again — confirm that is intended.
function turn(msg)
{
var mes = wota.decode(msg)
handle_message(mes)
send_messages()
}
// NOTE(review): this runs before the `!cell.args.program` check further down.
load_actor_config(cell.args.program)
actor_mod.register_actor(cell.id, turn, cell.args.main, config.system.ar_timer)
log.console(`actor registered in ${time.number()-st_now} seconds`)
// Optional resource limits from the system configuration.
if (config.system.actor_memory)
js.mem_limit(config.system.actor_memory)
if (config.system.stack_max)
js.max_stacksize(config.system.stack_max);
// The overling and root come from the startup arguments; with no root
// supplied this actor is its own root.
overling = cell.args.overling
root = cell.args.root
root ??= $_
// Set package context from parent actor (if spawned from a package)
if (cell.args.package_context) {
current_package = cell.args.package_context
log.console(`Actor initialized with package context: ${current_package}`)
} else {
// Infer package context from program path
current_package = get_package_from_path(cell.args.program)
if (current_package) {
log.console(`Actor inferred package context from path: ${current_package}`)
}
}
if (overling) {
$_.couple(overling) // auto couple to overling
report_to_overling({type:'greet', actor: $_})
}
// sys messages are always dispatched immediately
// The payload is wrapped under the SYSYM key and sent with actor_send,
// bypassing the message queue.
function sys_msg(actor, msg)
{
actor_send(actor, {[SYSYM]:msg})
}
// messages sent to here get put into the cb provided to start
// Does nothing when this actor has no overling.
function report_to_overling(msg)
{
if (!overling) return
sys_msg(overling, {kind:'underling', message:msg, from: $_})
}
// An actor without a program has nothing to run.
// NOTE(review): `os` is not defined in the visible part of this file —
// presumably a host-provided global; confirm.
if (!cell.args.program)
os.exit(1)
// React to the actor `id` having stopped.
// Its greeter, if one is registered, is told {type: "stopped", id} and then
// forgotten. If this actor is coupled to `id`, this actor disrupts too.
function handle_actor_disconnect(id) {
  var notify = greeters[id]
  if (notify) {
    var notice = {type: "stopped", id}
    notify(notice)
    delete greeters[id]
  }

  log.system(`actor ${id} disconnected`)

  // couplings now disrupts instead of stop
  var coupled = couplings.has(id)
  if (coupled) disrupt("coupled actor died")
}
// Dispatch a system message (the payload stored under the SYSYM key) on its `kind`:
//   'stop'      - disrupt this actor
//   'underling' - pass msg.message to the greeter registered for the sender;
//                 a 'disrupt' report also removes the sender from `underlings`
//   'contact'   - hand msg.data to the portal function, with the message
//                 attached under HEADER; throws when no portal function is set
//   'couple'    - add the sender's id to `underlings`
// Other kinds are ignored.
function handle_sysym(msg)
{
switch(msg.kind) {
case 'stop':
disrupt("got stop message")
break
case 'underling':
var from = msg.from
var greeter = greeters[from[ACTORDATA].id]
if (greeter) greeter(msg.message)
if (msg.message.type == 'disrupt')
underlings.delete(from[ACTORDATA].id)
break
case 'contact':
if (portal_fn) {
var letter2 = msg.data
letter2[HEADER] = msg
delete msg.data
portal_fn(letter2)
} else throw new Error('Got a contact message, but no portal is established.')
break
case 'couple': // from must be notified when we die
var from = msg.from
underlings.add(from[ACTORDATA].id)
log.system(`actor ${from} is coupled to me`)
break
}
}
// Dispatch one decoded message.
// System messages (those carrying the SYSYM key) go to handle_sysym.
// "user" messages become a letter: the sender's data with the message
// attached under HEADER and a descriptor under ACTORDATA, both
// non-enumerable. A letter that answers an earlier send goes to the stored
// reply callback; any other letter goes to the receiver function.
// "stopped" messages are forwarded to handle_actor_disconnect.
function handle_message(msg) {
if (msg[SYSYM]) {
// NOTE(review): handle_sysym declares a single parameter, so the second
// argument here is ignored.
handle_sysym(msg[SYSYM], msg.from)
return
}
switch (msg.type) {
case "user":
var letter = msg.data // what the sender really sent
Object.defineProperty(letter, HEADER, {
value: msg, enumerable: false
})
Object.defineProperty(letter, ACTORDATA, { // this is so is_actor == true
value: { reply: msg.reply }, enumerable: false
})
if (msg.return) {
// reply to an earlier send: run its callback once, then forget it
var fn = replies[msg.return]
if (fn) fn(letter)
delete replies[msg.return]
return
}
if (receive_fn) receive_fn(letter)
return
case "stopped":
handle_actor_disconnect(msg.id)
break
}
};
// Service the enet host (if one exists) with handle_host, then reschedule
// itself after ENETSERVICE seconds. The initial call below is commented out,
// so this polling loop is not started by this file as shown.
function enet_check()
{
if (portal) portal.service(handle_host)
$_.delay(enet_check, ENETSERVICE);
}
// enet_check();
var init_end = time.number()
log.console(`initialization completed in ${init_end-st_now} seconds`)
var load_program_start = time.number()
// Finally, run the program
actor_mod.setname(cell.args.program)
var prog = cell.args.program
// Resolve the main program path
var resolved_prog = resolve_actor_path(cell.args.program, current_package)
if (!resolved_prog) {
throw new Error(`Main program ${cell.args.program} could not be found`)
}
prog = resolved_prog.path
// Core programs are read from the core package, everything else from disk.
var progContent
if (resolved_prog.isCore) {
progContent = utf8.decode(core_qop.read(prog))
} else {
progContent = utf8.decode(fd.slurp(prog))
}
// Wrap the program source in a function taking ($_, arg); `args` is an alias for `arg`.
// NOTE(review): name() is not a standard String method — presumably added by
// base.cm; confirm.
var prog_script = `(function ${cell.args.program.name()}_start($_, arg) { var args = arg; ${progContent} })`
// queue up its first turn instead of run immediately
var startfn = js.eval(cell.args.program, prog_script);
log.console(`program compiled in ${time.number()-load_program_start} seconds`)
var exec_start = time.number()
// The program runs on the next clock callback; a truthy return value is an error.
$_.clock(_ => {
var val = startfn($_, cell.args.arg);
if (val)
throw new Error('Program must not return anything');
})
log.console(`program queued in ${time.number()-exec_start} seconds`)
log.console(`program executed in ${time.number()-st_now} seconds`)
})()