Files
cell/scripts/engine.cm

1226 lines
34 KiB
Plaintext

(function engine() {
// Pull the privileged handles out of cell.hidden and hide the os object
// from scripts by clearing cell.os.
var hidden = cell.hidden
var ACTORDATA = hidden.actorsym
var SYSYM = '__SYSTEM__'
var os = hidden.os;
cell.os = null
// Shared-library suffix for the host platform; stays undefined on any
// platform other than the three listed here.
var dylib_ext
var platform_name = os.platform()
if (platform_name === 'Windows') dylib_ext = '.dll'
else if (platform_name === 'macOS') dylib_ext = '.dylib'
else if (platform_name === 'Linux') dylib_ext = '.so'
var load_internal = os.load_internal
// Look up the built-in module registered under the symbol js_<name>_use.
// Returns whatever load_internal returns for that symbol.
function use_embed(name) {
  var symbol_name = 'js_' + name + '_use'
  return load_internal(symbol_name)
}
// Built-in modules resolved through use_embed (symbols js_<name>_use).
var actor_mod = use_embed('actor')
var wota = use_embed('wota')
var nota = use_embed('nota')
var enet = use_embed('enet')
var fd = use_embed('fd')
// Defaults; both are overwritten from config.system once the config is loaded.
var ENETSERVICE = 0.1
var REPLYTIMEOUT = 60 // seconds before replies are ignored
// File extensions appended when resolving modules and actor programs.
var MOD_EXT = '.cm'
var ACTOR_EXT = '.ce'
globalThis.pi = 3.1415926535897932
// Cache of opened dynamic libraries, keyed by package name.
var open_dl = {}
// Returns the text before the first '/' in an import name, or null when
// the name contains no '/'.
function get_import_package(name) {
  var slash = name.indexOf('/')
  return slash === -1 ? null : name.substring(0, slash)
}
// Returns the dynamic-library handle for the package that `name` belongs to,
// opening .cell/modules/<pkg>/<pkg><dylib_ext> on first use and caching the
// handle in open_dl. Returns null when the name has no package part or the
// library cannot be opened.
function get_import_dl(name) {
  var pkg = get_import_package(name)
  if (!pkg) return null
  var cached = open_dl[pkg]
  if (cached) return cached
  var handle = os.dylib_open('.cell/modules/' + pkg + '/' + pkg + dylib_ext)
  if (!handle) return null
  open_dl[pkg] = handle
  return handle
}
// Resolve the native entry point js_<name>_use for an import name, where
// every '/' in the name becomes '_'. The symbol is looked up in the
// package's dynamic library when one can be opened, otherwise among the
// built-in symbols.
// NOTE(review): a second `function get_c_symbol` is declared later in this
// same scope; function hoisting makes that later declaration the effective
// one, so this version is only reached if the later one is removed.
function get_c_symbol(name) {
  var dl = get_import_dl(name)
  // Global replace: a plain string pattern would only rewrite the first '/'.
  var symname = `js_${name.replace(/\//g, '_')}_use`
  if (dl)
    return os.dylib_symbol(dl, symname)
  return load_internal(symname)
}
// Extract {file, line} from one frame of a freshly captured stack trace.
// The frame used is line (1 + depth) of Error().stack. When no such frame
// exists the result is {file: "nofile", line: 0}; when a frame exists but
// carries no "(file:" part the file is reported as "SCRIPT".
function caller_data(depth = 0)
{
  var frame = new Error().stack.split("\n")[1+depth]
  if (!frame) return {file: "nofile", line: 0}
  var file_match = frame.match(/\((.*)\:/)
  var line_match = frame.match(/\:(\d*)\)/)
  var found_file = file_match ? file_match[1] : "SCRIPT"
  var found_line = line_match ? line_match[1] : 0
  // Empty captures fall back to the defaults.
  return {file: found_file || "nofile", line: found_line || 0}
}
// Startup arguments come from cell.hidden.init; fall back to an empty record.
cell.args = cell.hidden.init ?? {}
// Placeholder id used by the logger until the real id is assigned.
if (cell.id == null) cell.id = "newguy"
// Format one log line: "[<first 5 chars of cell.id>] [<file>:<line>]: <msg>\n".
function console_rec(line, file, msg) {
  var short_id = cell.id.slice(0,5)
  return `[${short_id}] [${file}:${line}]: ${msg}\n`
}
// Global logger. Every entry is printed through os.print, prefixed with the
// actor id and the call site reported by caller_data(1).
globalThis.log = {}

// Print a plain message attributed to the caller of log.console.
log.console = function(msg)
{
  var caller = caller_data(1)
  os.print(console_rec(caller.line, caller.file, msg))
}

// Print an error. Error instances are expanded to "name: message" followed
// by their stack; any other value is printed as given. With no argument a
// fresh Error is created so the current stack is logged.
log.error = function(msg = new Error())
{
  var caller = caller_data(1)
  if (msg instanceof Error)
    msg = msg.name + ": " + msg.message + "\n" + msg.stack
  os.print(console_rec(caller.line,caller.file,msg))
}

// Print a message tagged "[SYSTEM] ". The call site is resolved here rather
// than by delegating to log.console: delegation adds a stack frame, which
// makes every system line report log.system itself as its origin.
log.system = function(msg) {
  var caller = caller_data(1)
  os.print(console_rec(caller.line, caller.file, "[SYSTEM] " + msg))
}
// The runtime refuses to start unless a .cell directory exists in the
// current working directory.
var shop_path = '.cell'
var shop_stat = fd.stat('.cell')
if (!shop_stat.isDirectory) {
  log.console("No cell directory found. Make one.\n");
  os.exit(1);
}
// Write `blob` to `path`, opening the file in 'w' mode.
// The handle is closed even when the write throws, so a failed write does
// not leak a descriptor; the write error still propagates to the caller.
function write_file(path, blob) {
  var fd_handle = fd.open(path, 'w')
  try {
    fd.write(fd_handle, blob)
  } finally {
    fd.close(fd_handle)
  }
}
// Create `dir` and any missing parent directories ('/'-separated).
// Best effort: failures from fd.stat and fd.mkdir are deliberately ignored.
function mkdir_p(dir) {
  if (dir == '' || dir == '.') return
  var info = null
  try { info = fd.stat(dir) } catch {}
  if (info && info.isDirectory) return
  // Make the parent first; a path without '/' yields '' and stops the recursion.
  var parent = dir.substring(0, dir.lastIndexOf('/'))
  mkdir_p(parent)
  try { fd.mkdir(dir) } catch {}
}
// Open the archive of core scripts supplied in hidden.core_qop_blob;
// core modules are later read from it with core_qop.read(path).
var qop = use_embed('qop')
var core_qop = qop.open(hidden.core_qop_blob)
var utf8 = use_embed('utf8')
// Tear this actor down.
// With `err` this is a forceful disrupt: the overling is sent a 'disrupt'
// report carrying the reason (the stack for Error instances). Without `err`
// the overling is sent a plain 'stop' report. Every id in `underlings` is
// then asked to stop, the error (if any) is logged, and actor_mod.disrupt()
// is called.
function disrupt(err)
{
  if (overling) {
    var report = {type:'stop'}
    if (err) {
      var reason = (err instanceof Error) ? err.stack : err
      report = {type:'disrupt', reason}
    }
    report_to_overling(report)
  }

  if (underlings) {
    underlings.forEach(function(id) {
      log.console(`calling on ${id} to disrupt too`)
      $_.stop(create_actor({id}))
    })
  }

  if (err) {
    log.console(err);
    if (err.stack) log.console(err.stack)
  }

  actor_mod.disrupt()
}
// Uncaught exceptions are routed through disrupt.
actor_mod.on_exception(disrupt)
var js = use_embed('js')
// Results of use(), keyed by the cache key computed in use().
var use_cache = {}
// Evaluate the core 'base' script once, wrapped in a named function so it
// runs in its own scope.
var BASEPATH = 'base' + MOD_EXT
var script = utf8.decode(core_qop.read(BASEPATH))
var fnname = "base"
script = `(function ${fnname}() { ${script}; })`
js.eval(BASEPATH, script)()
// Module-loading state used by use(): paths currently being loaded, and the
// ordered stack of those paths for cycle reporting.
var inProgress = {}
var loadingStack = []
// Track current package context for nested use() calls
var current_package = null
// Returns the package name for a path of the form
// '.cell/modules/<pkg>/...', or null for any other path (including paths
// with nothing after the package directory name).
function get_package_from_path(path) {
  var MODULES_ROOT = '.cell/modules/'
  if (!path || !path.startsWith(MODULES_ROOT)) return null
  var tail = path.substring(MODULES_ROOT.length)
  var cut = tail.indexOf('/')
  return cut > 0 ? tail.substring(0, cut) : null
}
// Project configuration; stays null until shop.load_config() runs later on.
var config = null
// Scope definitions
// Lower numbers are treated as more specific when use() compares a C symbol
// against a script module.
var SCOPE_LOCAL = 0
var SCOPE_DEPENDENCY = 1
var SCOPE_CORE = 2
// Resolve an import name to a source file.
//   requested   - import name as written by the caller ('name' or 'pkg/name')
//   pkg_context - package the importing code belongs to, or null for project code
//   ext         - extension to append unless `requested` already ends with it
// Returns { path, package_name, isCore, scope } for the first match, or null
// when nothing matches. Search order: the current package (or the project
// directory when there is no package context), then replace directives and
// dependencies, then the core archive.
// Throws when any path segment starts with '_', or when both 'name' and
// '_name' exist in the same local location.
function resolve_path(requested, pkg_context, ext) {
if (requested.endsWith(ext)) ext = ''
// Check for underscore prefix in imports - not allowed except for local access
if (requested.includes('/')) {
var parts = requested.split('/')
if (parts.some(part => part.startsWith('_'))) {
throw new Error(`Cannot import modules with underscore prefix: ${requested}`)
}
} else if (requested.startsWith('_')) {
throw new Error(`Cannot import modules with underscore prefix: ${requested}`)
}
// config is null until it is loaded, so early calls see no dependencies.
var dependencies = (config && config.dependencies) ? config.dependencies : {}
// Helper to check file existence and return result object
function check(path, pkg, isCore, scope) {
if (isCore) {
try {
var blob = core_qop.read(path)
if (blob) {
return { path: path, package_name: null, isCore: true, scope: scope }
}
return null
} catch (e) { return null }
}
// Check for private files if accessing from a different package
if (pkg && current_package && pkg != current_package) {
var filename = path.substring(path.lastIndexOf('/') + 1)
if (filename.startsWith('_')) return null
}
if (fd.is_file(path)) {
return { path: path, package_name: pkg, isCore: false, scope: scope }
}
return null
}
// Step 1: current package (Local)
if (pkg_context) {
var pkg_path = '.cell/modules/' + pkg_context + '/' + requested + ext
var underscore_pkg_path = '.cell/modules/' + pkg_context + '/_' + requested + ext
var res = check(pkg_path, pkg_context, false, SCOPE_LOCAL)
var underscore_res = check(underscore_pkg_path, pkg_context, false, SCOPE_LOCAL)
// If both regular and underscore versions exist, throw error
if (res && underscore_res) {
throw new Error(`Module conflict: both '${requested}' and '_${requested}' exist in package '${pkg_context}'`)
}
// Prefer regular version, but allow underscore version if no regular exists
if (res) return res
if (underscore_res) return underscore_res
// Check if package is locally replaced
if (config && config.replace && config.replace[pkg_context]) {
var replace_path = config.replace[pkg_context]
var full_path = replace_path + '/' + requested + ext
var underscore_full_path = replace_path + '/_' + requested + ext
res = check(full_path, pkg_context, false, SCOPE_LOCAL)
underscore_res = check(underscore_full_path, pkg_context, false, SCOPE_LOCAL)
if (res && underscore_res) {
throw new Error(`Module conflict: both '${requested}' and '_${requested}' exist in replaced package '${pkg_context}'`)
}
if (res) return res
if (underscore_res) return underscore_res
}
} else {
// Top-level local
var project_path = requested + ext
var underscore_project_path = '_' + requested + ext
var res = check(project_path, null, false, SCOPE_LOCAL)
var underscore_res = check(underscore_project_path, null, false, SCOPE_LOCAL)
// If both regular and underscore versions exist, throw error
if (res && underscore_res) {
throw new Error(`Module conflict: both '${requested}' and '_${requested}' exist locally`)
}
// Prefer regular version, but allow underscore version if no regular exists
if (res) return res
if (underscore_res) return underscore_res
}
// Step 2: dependencies (explicit alias first) and replace directives
if (requested.includes('/')) {
var parts = requested.split('/')
var pkg_alias = parts[0]
var sub_path = parts.slice(1).join('/')
// Check for replace directive first
if (config && config.replace && config.replace[pkg_alias]) {
var replace_path = config.replace[pkg_alias]
// An empty sub path (name ending in '/') falls back to the package alias.
var full_path = replace_path + '/' + (sub_path || pkg_alias) + ext
var res = check(full_path, pkg_alias, false, SCOPE_DEPENDENCY)
if (res) return res
} else if (dependencies[pkg_alias]) {
var dep_path = '.cell/modules/' + pkg_alias + '/' + sub_path + ext
var res = check(dep_path, pkg_alias, false, SCOPE_DEPENDENCY)
if (res) return res
}
// Also check if it's just a module in .cell/modules even if not in dependencies (implicit)
var implicit_path = '.cell/modules/' + requested + ext
var res = check(implicit_path, pkg_alias, false, SCOPE_DEPENDENCY)
if (res) return res
} else {
// Check replace directives for simple names
if (config && config.replace && config.replace[requested]) {
var replace_path = config.replace[requested]
var full_path = replace_path + '/' + requested + ext
var res = check(full_path, requested, false, SCOPE_DEPENDENCY)
if (res) return res
}
// Check dependencies for simple names
// A simple name matches a dependency only when the alias equals the name;
// the module is then expected at .cell/modules/<alias>/<alias><ext>.
for (var alias in dependencies) {
if (alias == requested) {
var dep_simple = '.cell/modules/' + alias + '/' + requested + ext
var res = check(dep_simple, alias, false, SCOPE_DEPENDENCY)
if (res) return res
}
}
// Implicit check
var implicit_path = '.cell/modules/' + requested + '/' + requested + ext
var res = check(implicit_path, requested, false, SCOPE_DEPENDENCY)
if (res) return res
}
// Step 3: core
var core_res = check(requested + ext, null, true, SCOPE_CORE)
return core_res
}
// Map a resolve_path() result to the location of its compiled artifact
// under .cell/build/:
//   core modules     -> core/<path>.o
//   package modules  -> modules/<pkg>/<path relative to the package dir>.o
//   project files    -> local/<path>.o
// A package module whose path lies outside .cell/modules/<pkg>/ is logged
// and stored under modules/<pkg>/ by file name only.
function get_compiled_path(resolved) {
  var root = '.cell/build/'
  if (resolved.isCore) return root + 'core/' + resolved.path + '.o'

  var pkg = resolved.package_name
  if (!pkg) return root + 'local/' + resolved.path + '.o'

  var src = resolved.path
  var module_dir = '.cell/modules/' + pkg + '/'
  if (src.startsWith(module_dir))
    return root + 'modules/' + pkg + '/' + src.substring(module_dir.length) + '.o'

  log.console(json.encode(resolved))
  var base_name = src.substring(src.lastIndexOf('/') + 1)
  return root + 'modules/' + pkg + '/' + base_name + '.o'
}
var open_dl = {}

// Look up `symbol_name` in the dynamic library at `dl_path`, caching the
// opened handle in open_dl under `cache_key`. Returns the symbol, or null
// when the file is missing, cannot be opened, or lacks the symbol.
// Open and lookup failures are ignored on purpose (best effort).
function find_dl_symbol(cache_key, dl_path, symbol_name) {
  if (!fd.is_file(dl_path)) return null
  var handle = open_dl[cache_key]
  if (!handle) {
    try {
      handle = os.dylib_open(dl_path)
      open_dl[cache_key] = handle
    } catch(e) {}
  }
  if (!handle) return null
  try {
    return os.dylib_symbol(handle, symbol_name) || null
  } catch(e) {
    return null
  }
}

// Find the native entry point for an import.
// Returns { symbol, scope } from the first place that provides it, or null:
//   1. project library .cell/local/local<ext>, symbol js_local_<name>_use
//      (only when there is no package context)
//   2. package library .cell/modules/<pkg>/<pkg><ext>, symbol js_<name>_use
//   3. built-in symbols, js_<name>_use
// In symbol names every '/' of the requested name becomes '_'.
function get_c_symbol(requested, pkg_context) {
  var flat_name = requested.replace(/\//g, '_')
  var symname = `js_${flat_name}_use`

  // 1. Check Local
  if (!pkg_context) {
    var local_sym = find_dl_symbol('local', '.cell/local/local' + dylib_ext, `js_local_${flat_name}_use`)
    if (local_sym) return { symbol: local_sym, scope: SCOPE_LOCAL }
  }

  // 2. Check Modules: the package is the context, else the first path
  // segment, else the whole name.
  var pkg_name = pkg_context
    ? pkg_context
    : (requested.includes('/') ? requested.split('/')[0] : requested)
  if (pkg_name) {
    var mod_sym = find_dl_symbol(pkg_name, `.cell/modules/${pkg_name}/${pkg_name}${dylib_ext}`, symname)
    if (mod_sym) return { symbol: mod_sym, scope: SCOPE_DEPENDENCY }
  }

  // 3. Check Core (Internal)
  var internal_sym = load_internal(symname)
  return internal_sym ? { symbol: internal_sym, scope: SCOPE_CORE } : null
}
// Resolve a script module (MOD_EXT file) through resolve_path.
function get_module(name, pkg_context) {
  var module_info = resolve_path(name, pkg_context, MOD_EXT)
  return module_info
}
// Resolve an actor program (ACTOR_EXT file) through resolve_path.
function get_actor_script(name, pkg_context) {
  var actor_info = resolve_path(name, pkg_context, ACTOR_EXT)
  return actor_info
}
// Load a module by name and return its value.
//   file - import name; resolved relative to the current package context
//   args - extra arguments, passed to the module script as `arg`
// A module may be a native (C) symbol, a script, or both. When both exist
// the one from the more specific scope wins; at equal scope the script runs
// with the native module as `this`. With neither, the embedded modules are
// tried as a last resort.
// Results are cached per resolved path (or per name for native-only
// modules). Scripts are always compiled from source; the compiled blob is
// written under .cell/build as a side effect.
// Throws when the module cannot be found, when a circular dependency is
// detected, or when neither the script nor a native module yields a value.
globalThis.use = function use(file, ...args) {
  var requested = file

  var c_res = get_c_symbol(requested, current_package)
  var mod_res = get_module(requested, current_package)

  var c_mod = null
  var resolved = null

  // Decision logic
  if (c_res && mod_res) {
    if (c_res.scope < mod_res.scope) {
      // C symbol is more specific
      c_mod = c_res.symbol
    } else if (mod_res.scope < c_res.scope) {
      // Module is more specific
      resolved = mod_res
    } else {
      // Same scope - use both (C symbol as context)
      c_mod = c_res.symbol
      resolved = mod_res
    }
  } else if (c_res) {
    c_mod = c_res.symbol
  } else if (mod_res) {
    resolved = mod_res
  } else {
    // Try embed as fallback for core
    var embed_mod = use_embed(requested)
    if (embed_mod) c_mod = embed_mod
  }

  if (!c_mod && !resolved)
    throw new Error(`Module ${file} could not be found (package context: ${current_package || 'none'})`)

  // Generate cache key
  var cache_key = resolved
    ? (resolved.isCore ? 'core:' + resolved.path : resolved.path)
    : (c_mod ? 'c:' + requested : requested)

  if (use_cache[cache_key]) return use_cache[cache_key]

  // If we have a C module, use it as context
  var context = c_mod ? c_mod : {}

  // If we have a script, run it
  var ret = c_mod
  if (resolved) {
    var path = resolved.path
    var isCore = resolved.isCore

    // Check for circular dependencies
    if (path && loadingStack.includes(path)) {
      let cycleIndex = loadingStack.indexOf(path)
      let cyclePath = loadingStack.slice(cycleIndex).concat(path)
      throw new Error(`Circular dependency: ${cyclePath.join(" -> ")}`)
    }

    var prev_package = current_package
    inProgress[path] = true
    loadingStack.push(path)
    current_package = resolved.package_name

    // The loader state above must be unwound even when compiling or running
    // the module throws; otherwise a caller that catches the error would see
    // a stale package context and bogus "circular dependency" reports.
    try {
      var compiledPath = get_compiled_path(resolved)
      mkdir_p(compiledPath.substring(0, compiledPath.lastIndexOf('/')))

      var source = utf8.decode(isCore ? core_qop.read(path) : fd.slurp(path))

      // The wrapper function is named after the module. Any character that
      // is not valid in an identifier is replaced, so file names such as
      // "my-mod.cm" do not produce a syntax error.
      var label = isCore
        ? requested
        : path.substring(path.lastIndexOf('/') + 1, path.lastIndexOf('.'))
      label = label.replace(/[^a-zA-Z0-9_]/g, '_')

      var mod_script = `(function setup_${label}_module(arg, $_){${source};})`
      var fn = js.compile(path, mod_script)

      // Save compiled version to the build directory
      write_file(compiledPath, js.compile_blob(fn))

      fn = js.eval_compile(fn)
      ret = fn.call(context, args, $_)
    } finally {
      current_package = prev_package
      loadingStack.pop()
      delete inProgress[path]
    }
  }

  if (!ret && c_mod) {
    log.console("use: script returned nothing, using c_mod")
    ret = c_mod
  }
  else if (!ret) throw new Error(`Use must be used with a module, but ${file} doesn't return a value`)

  use_cache[cache_key] = ret
  return ret
}
// Load the modules the rest of the engine depends on, then the project
// configuration.
globalThis.json = use('json')
var time = use('time')
var st_now = time.number()
var shop = use('shop')
config = shop.load_config()
// Fallback values for config.system; they are reached through the prototype
// chain, so keys set in the loaded config take precedence.
var default_config = {
ar_timer: 60,
actor_memory:0,
net_service:0.1,
reply_timeout:60,
main: false,
}
config ??= {}
config.system ??= {}
config.system.__proto__ = default_config
cell.config = config
// Replace the hard-coded defaults declared at the top of the file.
ENETSERVICE = config.system.net_service
REPLYTIMEOUT = config.system.reply_timeout
globalThis.text = use('text')
// Load actor-specific configuration.
// Copies every key of config.actors[program] onto cell.args, overwriting
// existing keys. `program` is used verbatim as the lookup key, e.g.
// "prosperon/_sdl_video" or "extramath/spline".
// Does nothing when no program is given or no entry exists for it; the
// missing-program case matters because this runs before the engine checks
// that a program was supplied at all.
function load_actor_config(program) {
  if (!program) return
  var actor_cfg = config.actors && config.actors[program]
  if (!actor_cfg) return
  for (var key in actor_cfg)
    cell.args[key] = actor_cfg[key]
}
// Capture the blob prototype's stone/stonep methods for use by the global
// stone() helper, then remove them from the prototype so scripts cannot
// call them directly on a blob.
var blob = use('blob')
var blob_stone = blob.prototype.stone
var blob_stonep = blob.prototype.stonep;
delete blob.prototype.stone;
delete blob.prototype.stonep;
// Recursively freeze `object` and everything reachable through its own
// enumerable properties, and return it. Blobs are additionally passed to the
// captured blob `stone` method.
// Primitives (including null and undefined) are returned unchanged.
// Each object is visited once, so cyclic structures terminate.
function deepFreeze(object) {
  var seen = new Set()

  function freeze_graph(node) {
    var kind = typeof node
    if (node === null || (kind != "object" && kind != "function")) return
    if (seen.has(node)) return
    seen.add(node)

    if (node instanceof blob)
      blob_stone.call(node);

    // Freeze properties before freezing self
    for (var name of Object.keys(node))
      freeze_graph(node[name])

    Object.freeze(node)
  }

  freeze_graph(object)
  return object
}
// stone(value) deep-freezes a value; stone.p(value) reports whether a value
// has been stoned: blobs answer through the captured stonep method, every
// other value through Object.isFrozen.
globalThis.stone = deepFreeze
stone.p = function(object)
{
  var is_blob = object instanceof blob
  return is_blob ? blob_stonep.call(object) : Object.isFrozen(object)
}
/*
When handling a message, the message appears like this:
{
type: type of message
- contact: used for contact messages
- stop: used to issue stop command
- etc
reply: ID this message will respond to (callback saved on the actor)
replycc: the actor that is waiting for the reply
target: ID of the actor that's supposed to receive the message. Only added to non direct sends (out of portals)
return: reply ID so the replycc actor can know what callback to send the message to
data: the actual content of the message
}
actors look like
{
id: the GUID of this actor
address: the IP this actor can be found at
port: the port of the IP the actor can be found at
}
*/
// Generate a random identifier: a blob of `bits` bits filled from os.random,
// stoned, and rendered with text(..., 'h').
function guid(bits = 256)
{
  var raw = new blob(bits, os.random)
  stone(raw)
  return text(raw, 'h')
}
// Key under which a received letter carries its original message envelope.
var HEADER = Symbol()

// Build an actor object: a record whose only property, keyed by ACTORDATA,
// is the descriptor. Without a descriptor a fresh {id: guid()} is used.
function create_actor(desc = {id:guid()}) {
  return { [ACTORDATA]: desc }
}
// The actor object for this actor; also the namespace for the $_ API below.
var $_ = create_actor()
// Intended contract: returns a number between 0 and 1. There is a 50% chance that the result is less than 0.5.
// NOTE(review): the body returns the boolean result of `n > 100000`, not a
// number in [0, 1). The range of os.random() is not visible from here, so
// the right scaling cannot be determined - confirm and fix.
$_.random = function() {
var n = os.random()
return n > 100000
}
// Direct alias of os.random.
$_.random_fit = os.random
// Takes a function that will eventually be called with the current time in
// number form. Messages queued by that function are flushed right after it
// returns.
$_.clock = function(fn) {
  function clock_turn() {
    fn(time.number())
    send_messages()
  }
  actor_mod.clock(clock_turn)
}
var underlings = new Set() // this is more like "all actors that are notified when we die"
// The actor that started this one, taken from cell.args.overling during startup.
var overling = null
// Taken from cell.args.root during startup; defaults to this actor.
var root = null
// Handler registered through $_.receiver for incoming user messages.
var receive_fn = null
var greeters = {} // Router functions for when messages are received for a specific actor
// Returns the actor's descriptor (truthy) when `actor` is an actor object,
// otherwise undefined. null and undefined are accepted and yield undefined
// instead of throwing, so callers can probe optional fields directly.
globalThis.is_actor = function is_actor(actor) {
  if (actor == null) return undefined
  return actor[ACTORDATA]
}
// Summarise a network peer as a plain connection-status record, regrouping
// the peer's flat fields into latency, bandwidth, activity, mtu, data,
// latency_variance, packet_loss and state.
function peer_connection(peer) {
  var bandwidth = {
    incoming: peer.incoming_bandwidth,
    outgoing: peer.outgoing_bandwidth
  }
  var activity = {
    last_sent: peer.last_send_time,
    last_received: peer.last_receive_time
  }
  var data = {
    incoming_total: peer.incoming_data_total,
    outgoing_total: peer.outgoing_data_total,
    reliable_in_transit: peer.reliable_data_in_transit
  }
  var status = {}
  status.latency = peer.rtt
  status.bandwidth = bandwidth
  status.activity = activity
  status.mtu = peer.mtu
  status.data = data
  status.latency_variance = peer.rtt_variance
  status.packet_loss = peer.packet_loss
  status.state = peer.state
  return status
}
// Takes a callback function, an actor object, and a configuration record,
// and reports the status of the connection to that actor through the
// callback:
//   - a peer_connection() record when a network peer is connected for the
//     actor's address and port
//   - {type:"local"} when the actor has a mailbox in this process
//   - nothing (callback called with no argument) otherwise
// The configuration record is currently unused.
$_.connection = function(callback, actor, config) {
  var desc = actor[ACTORDATA]
  // Peers are registered under "address:port", never under the actor id,
  // so the lookup has to use the same key.
  var peer = peers[desc.address + ":" + desc.port]
  if (peer) {
    callback(peer_connection(peer))
    return
  }
  if (actor_mod.mailbox_exist(desc.id)) {
    callback({type:"local"})
    return
  }
  callback()
}
// Connected network peers, keyed by "address:port".
var peers = {}
var id_address = {}
// Messages waiting for a peer connection to be established, keyed by the
// peer object itself. This must be a Map: the code uses .get/.set/.delete
// with object keys, which a plain object does not provide.
var peer_queue = new Map()
// Network host for this actor and the handler for incoming contact messages.
var portal = null
var portal_fn = null
// Start listening for network contacts on `port`. `fn` becomes the handler
// for incoming contact messages. Throws when a portal already exists or no
// port is given.
$_.portal = function(fn, port) {
  if (portal) {
    throw new Error(`Already started a portal listening on ${portal.port}`)
  }
  if (!port) {
    throw new Error("Requires a valid port.")
  }
  log.system(`starting a portal on port ${port}`)
  var host_options = {address: "any", port}
  portal = enet.create_host(host_options)
  portal_fn = fn
}
// Handle one event from the network host.
//   connect    - register the peer under "address:port" and flush any
//                messages queued for it while the connection was pending
//   disconnect - drop the peer and its pending queue
//   receive    - decode the message, stamp the sender's address onto actor
//                references that lack one, and process it as a turn
function handle_host(e) {
  // Walk a decoded value and give every actor reference without an address
  // the address and port of the peer the message arrived from.
  function populate_actor_addresses(obj) {
    if (typeof obj != 'object' || obj == null) return
    if (obj[ACTORDATA] && !obj[ACTORDATA].address) {
      obj[ACTORDATA].address = e.peer.address
      obj[ACTORDATA].port = e.peer.port
    }
    for (var key in obj) {
      if (obj.hasOwnProperty(key)) {
        populate_actor_addresses(obj[key])
      }
    }
  }

  switch (e.type) {
    case "connect":
      log.system(`connected a new peer: ${e.peer.address}:${e.peer.port}`)
      peers[`${e.peer.address}:${e.peer.port}`] = e.peer
      var queue = peer_queue.get(e.peer)
      if (queue) {
        // Log each message as it is flushed, not just the last one.
        for (var msg of queue) {
          e.peer.send(nota.encode(msg))
          log.system(`sent ${json.encode(msg)} out of queue`)
        }
        peer_queue.delete(e.peer)
      }
      break
    case "disconnect":
      peer_queue.delete(e.peer)
      for (var id in peers) if (peers[id] == e.peer) delete peers[id]
      log.system('portal got disconnect from ' + e.peer.address + ":" + e.peer.port)
      break
    case "receive":
      var data = nota.decode(e.data)
      // NOTE(review): this tests `.address` on the actor object itself while
      // the address is stored under ACTORDATA, so the test looks like it
      // always passes - confirm which is intended.
      if (data.replycc && !data.replycc.address) {
        data.replycc[ACTORDATA].address = e.peer.address
        data.replycc[ACTORDATA].port = e.peer.port
      }
      if (data.data) populate_actor_addresses(data.data)
      // NOTE(review): turn() runs wota.decode on its argument, but `data`
      // has already been decoded with nota - confirm this is intended.
      turn(data)
      break
  }
}
// Contact a remote actor described by `record`: the record is used both as
// the descriptor of the target actor and as the message body. `callback`
// is registered as the reply handler.
$_.contact = function(callback, record) {
  var target = create_actor(record)
  send(target, record, callback)
}
// Registers `fn` as the handler for incoming user messages, replacing any
// handler registered earlier.
$_.receiver = function receiver(fn) {
  receive_fn = fn
}
// Start a new actor running `program`.
//   cb      - registered in `greeters` under the new actor's id
//   program - actor program name, resolved relative to the current package
//   args    - startup arguments; a single array argument is used as the list
// The actor is not created immediately: a startup record is queued and
// acted on when the message queue is flushed. Does nothing when no program
// is given; throws when the program cannot be resolved.
$_.start = function start(cb, program, ...args) {
  if (!program) return

  // Resolve the actor program path with package awareness
  var found = resolve_path(program, current_package, ACTOR_EXT)
  if (!found) {
    throw new Error(`Actor program ${program} could not be found (package context: ${current_package || 'none'})`)
  }

  var id = guid()
  var spawn_args = (args.length == 1 && Array.isArray(args[0])) ? args[0] : args

  greeters[id] = cb
  message_queue.push({
    startup: {
      id,
      overling: $_,
      root,
      arg: spawn_args,
      program: found.path,
      package_context: found.package_name // Pass package context to new actor
    }
  })
}
// Stops an underling or self. Called without an actor, it flags this actor
// to stop the next time the message queue is flushed. Called with an actor,
// that actor must be one of our underlings; it is sent a "stop" system
// message.
$_.stop = function stop(actor) {
  if (!actor) {
    need_stop = true
    return
  }

  if (!is_actor(actor)) {
    throw new Error('Can only call stop on an actor.')
  }

  var target_id = actor[ACTORDATA].id
  if (!underlings.has(target_id)) {
    throw new Error('Can only call stop on an underling or self.')
  }

  sys_msg(actor, {kind:"stop"})
}
// Forwards `fn` and `seconds` to actor_mod.unneeded.
// presumably fn is invoked once the actor has gone unused for that many
// seconds - TODO confirm against the actor module.
$_.unneeded = function unneeded(fn, seconds) {
  actor_mod.unneeded(fn, seconds)
}
// Schedules the invocation of a function after a specified amount of time.
// With a non-positive delay the function is handed to $_.clock and nothing
// is returned. Otherwise a timer is registered and a function that removes
// that timer is returned. Queued messages are flushed after `fn` runs.
$_.delay = function delay(fn, seconds = 0) {
  if (seconds <= 0) {
    $_.clock(fn)
    return
  }

  var timer_id = actor_mod.delay(function delay_turn() {
    fn()
    send_messages()
  }, seconds)

  return function() {
    actor_mod.removetimer(timer_id)
  }
}
// Causes this actor to stop when another actor stops.
// Ids of the actors this actor is coupled to.
var couplings = new Set()
// Couple this actor to `actor`: remember its id and send it a 'couple'
// system message so it notifies us when it dies. Coupling to self is a no-op.
$_.couple = function couple(actor) {
  if (actor == $_) return // can't couple to self
  var target_id = actor[ACTORDATA].id
  couplings.add(target_id)
  sys_msg(actor, {kind:'couple', from: $_})
  // Log the id; interpolating the actor object prints "[object Object]".
  log.system(`coupled to ${target_id}`)
}
// Queue a message for `actor`; it is delivered when the message queue is
// flushed.
function actor_prep(actor, send) {
  var entry = {actor: actor, send: send}
  message_queue.push(entry);
}
// Send a message immediately without queuing.
// Delivery failures are logged rather than thrown. The failure details are
// folded into the log message itself because log.error takes a single
// argument; passing the error separately would silently drop it.
function actor_send_immediate(actor, send) {
  try {
    actor_send(actor, send);
  } catch (err) {
    var detail = (err instanceof Error)
      ? err.name + ": " + err.message + "\n" + err.stack
      : err
    log.error("Failed to send immediate message: " + detail);
  }
}
// Deliver `message` to `actor` right now, choosing the transport by where
// the actor lives:
//   - this actor itself: message.data is passed straight to the receiver
//   - an actor with a mailbox in this process: wota-encoded and pushed to it
//   - an actor with a network address: nota-encoded and sent to its peer,
//     connecting first (and queueing the message) when no peer exists yet
// Throws when the target is not an actor or the message is not an object.
// Logs and drops the message when no transport applies.
function actor_send(actor, message) {
if (actor[HEADER] && !actor[HEADER].replycc) // attempting to respond to a message but sender is not expecting; silently drop
return
if (!is_actor(actor) && !is_actor(actor.replycc)) throw new Error(`Must send to an actor object. Attempted send to ${json.encode(actor)}`)
if (typeof message != 'object') throw new Error('Must send an object record.')
// message to self
if (actor[ACTORDATA].id == cell.id) {
if (receive_fn) receive_fn(message.data)
return
}
// message to actor in same flock
if (actor[ACTORDATA].id && actor_mod.mailbox_exist(actor[ACTORDATA].id)) {
var wota_blob = wota.encode(message)
// log.console(`sending wota blob of ${wota_blob.length/8} bytes`)
actor_mod.mailbox_push(actor[ACTORDATA].id, wota_blob)
return
}
if (actor[ACTORDATA].address) {
// A target without an id is being contacted for the first time.
if (actor[ACTORDATA].id)
message.target = actor[ACTORDATA].id
else
message.type = "contact"
var peer = peers[actor[ACTORDATA].address + ":" + actor[ACTORDATA].port]
if (!peer) {
if (!portal) {
log.system(`creating a contactor ...`)
portal = enet.create_host({address:"any"})
log.system(`allowing contact to port ${portal.port}`)
}
log.system(`no peer! connecting to ${actor[ACTORDATA].address}:${actor[ACTORDATA].port}`)
peer = portal.connect(actor[ACTORDATA].address, actor[ACTORDATA].port)
// NOTE(review): peer_queue is used with Map methods here; confirm its
// declaration provides them. A second send before the connect event
// arrives takes this branch again and replaces the queued message.
peer_queue.set(peer, [message])
} else {
peer.send(nota.encode(message))
}
return
}
log.system(`Unable to send message to actor ${json.encode(actor[ACTORDATA])}`)
}
// Holds all messages queued during the current turn.
var message_queue = []
// Set by $_.stop() with no argument; acted on at the next flush.
var need_stop = false

// Flush the message queue. When a stop has been requested the actor is
// disrupted and the queue discarded. Otherwise each entry is processed in
// order: startup records create a new actor, everything else is delivered
// with actor_send. The queue is emptied afterwards.
function send_messages() {
  // if we've been flagged to stop, bail out before doing anything
  if (need_stop) {
    disrupt()
    message_queue.length = 0
    return
  }

  for (var i = 0; i < message_queue.length; i++) {
    var entry = message_queue[i]
    if (entry.startup) {
      // now is the time to actually spin up the actor
      actor_mod.createactor(entry.startup)
      continue
    }
    actor_send(entry.actor, entry.send)
  }

  message_queue.length = 0
}
// Pending reply callbacks, keyed by reply id.
var replies = {}

// Queue a user message.
//   actor   - the target actor, or a received letter (in which case the
//             message goes back to that letter's sender as its reply)
//   message - the record to send
//   reply   - optional callback for the answer; it is called with
//             (null, "timeout") when no answer arrives within REPLYTIMEOUT
// Nothing is transmitted here; the message is delivered when the queue is
// flushed.
globalThis.send = function send(actor, message, reply) {
  if (typeof actor != 'object') {
    throw new Error(`Must send to an actor object. Provided: ${json.encode(actor)}`);
  }
  if (typeof message != 'object') {
    throw new Error('Message must be an object')
  }

  var envelope = {type:"user", data: message}

  // Replying to a letter: redirect to its sender and tag the reply id.
  var header = actor[HEADER]
  if (header && header.replycc) {
    if (!is_actor(header.replycc)) {
      throw new Error(`Supplied actor had a return, but it's not a valid actor! ${json.encode(actor[HEADER])}`)
    }
    actor = header.replycc
    envelope.return = header.reply
  }

  if (reply) {
    var reply_id = guid()
    replies[reply_id] = reply
    $_.delay(function expire_reply() {
      if (!replies[reply_id]) return
      replies[reply_id](null, "timeout")
      delete replies[reply_id]
    }, REPLYTIMEOUT)
    envelope.reply = reply_id
    envelope.replycc = $_
  }

  // Instead of sending immediately, queue it
  actor_prep(actor, envelope);
}
stone(send)
// Use the id supplied in the startup arguments, or mint a fresh one, and
// record it on this actor's own descriptor.
cell.id = cell.args.id ? cell.args.id : guid()
$_[ACTORDATA].id = cell.id
// Actor's timeslice for processing a single message: decode the wota blob,
// handle the message, then flush whatever the handler queued.
function turn(msg)
{
  handle_message(wota.decode(msg))
  send_messages()
}
// Apply per-actor configuration, then register this actor with the actor
// module so that `turn` is called for each incoming message.
// NOTE(review): cell.args.program is only checked for presence further
// down, after it has already been used here.
load_actor_config(cell.args.program)
actor_mod.register_actor(cell.id, turn, cell.args.main, config.system.ar_timer)
// Optional resource limits from the system configuration.
if (config.system.actor_memory)
js.mem_limit(config.system.actor_memory)
if (config.system.stack_max)
js.max_stacksize(config.system.stack_max);
overling = cell.args.overling
root = cell.args.root
root ??= $_
// Set package context from parent actor (if spawned from a package)
if (cell.args.package_context) {
current_package = cell.args.package_context
log.console(`Actor initialized with package context: ${current_package}`)
} else {
// Infer package context from program path
current_package = get_package_from_path(cell.args.program)
if (current_package) {
log.console(`Actor inferred package context from path: ${current_package}`)
}
}
if (overling) {
$_.couple(overling) // auto couple to overling
report_to_overling({type:'greet', actor: $_})
}
// sys messages are always dispatched immediately: the payload is wrapped in
// a record keyed by SYSYM and handed straight to actor_send, bypassing the
// message queue.
function sys_msg(actor, msg)
{
  var wrapped = {}
  wrapped[SYSYM] = msg
  actor_send(actor, wrapped)
}
// Messages sent through here are delivered to the callback the overling
// passed to start. Does nothing when this actor has no overling.
function report_to_overling(msg)
{
  if (overling) {
    sys_msg(overling, {kind:'underling', message:msg, from: $_})
  }
}
// An actor cannot run without a program; exit with status 1.
if (!cell.args.program)
os.exit(1)
// React to actor `id` having stopped: tell its greeter (if one is
// registered) and forget it, log the event, and disrupt this actor when it
// was coupled to the stopped one.
function handle_actor_disconnect(id) {
  var on_stop = greeters[id]
  if (on_stop) {
    on_stop({type: "stopped", id})
    delete greeters[id]
  }
  log.system(`actor ${id} disconnected`)
  // couplings now disrupts instead of stop
  if (couplings.has(id)) {
    disrupt("coupled actor died")
  }
}
// Handle a system message (the payload stored under SYSYM), by kind:
//   stop      - disrupt this actor
//   underling - pass the report to the greeter registered for the sender;
//               a 'disrupt' report also removes the sender from underlings
//   contact   - hand the enclosed letter to the portal handler, with the
//               message attached to the letter under HEADER
//   couple    - remember the sender so it is told when this actor dies
// Throws when a contact message arrives and no portal handler is set.
function handle_sysym(msg)
{
  switch(msg.kind) {
    case 'stop':
      disrupt("got stop message")
      break
    case 'underling':
      var from = msg.from
      var greeter = greeters[from[ACTORDATA].id]
      if (greeter) greeter(msg.message)
      if (msg.message.type == 'disrupt')
        underlings.delete(from[ACTORDATA].id)
      break
    case 'contact':
      if (portal_fn) {
        var letter2 = msg.data
        letter2[HEADER] = msg
        delete msg.data
        portal_fn(letter2)
      } else throw new Error('Got a contact message, but no portal is established.')
      break
    case 'couple': // from must be notified when we die
      var from = msg.from
      var from_id = from[ACTORDATA].id
      underlings.add(from_id)
      // Log the id; interpolating the actor object prints "[object Object]".
      log.system(`actor ${from_id} is coupled to me`)
      break
  }
}
// Dispatch one decoded message.
//   system messages (SYSYM present) go to handle_sysym
//   "stopped" notifications go to handle_actor_disconnect
//   "user" messages are delivered as a letter: the payload, with the full
//     message attached under HEADER and a reply descriptor under ACTORDATA
//     (both non-enumerable). A letter that answers an earlier send goes to
//     the stored reply callback; any other letter goes to the receiver.
// Other message types are ignored.
function handle_message(msg) {
  if (msg[SYSYM]) {
    handle_sysym(msg[SYSYM], msg.from)
    return
  }

  if (msg.type === "stopped") {
    handle_actor_disconnect(msg.id)
    return
  }

  if (msg.type !== "user") return

  var letter = msg.data // what the sender really sent
  Object.defineProperty(letter, HEADER, {value: msg, enumerable: false})
  // this is so is_actor == true
  Object.defineProperty(letter, ACTORDATA, {value: { reply: msg.reply }, enumerable: false})

  if (!msg.return) {
    if (receive_fn) receive_fn(letter)
    return
  }

  var fn = replies[msg.return]
  if (fn) fn(letter)
  delete replies[msg.return]
}
// Service the network host (when one exists) and reschedule itself to run
// again after ENETSERVICE seconds.
// The initial call below is commented out, so this loop is currently not
// started from this file.
function enet_check()
{
  var host = portal
  if (host) {
    host.service(handle_host)
  }
  $_.delay(enet_check, ENETSERVICE);
}
// enet_check();
// Timestamps taken at the end of engine initialisation; not read again in
// this part of the file.
var init_end = time.number()
var load_program_start = time.number()
// Finally, run the program
actor_mod.setname(cell.args.program)
var prog = cell.args.program
// Resolve the main program path
var resolved_prog = resolve_path(cell.args.program, current_package, ACTOR_EXT)
if (!resolved_prog) {
throw new Error(`Main program ${cell.args.program} could not be found`)
}
prog = resolved_prog.path
var startfn
var compiledPath = get_compiled_path(resolved_prog)
// Never set to true below, so the precompiled branch is currently unreachable.
var useCompiled = false
// Always compile from source - never use precompiled for main program
// if (resolved_prog.isCore) {
// // For core, we check if we have a compiled version, else we compile it
// if (fd.is_file(compiledPath)) {
// useCompiled = true
// }
// } else {
// // For local/modules, check timestamps
// var srcStat = fd.stat(prog)
// var compiledStat = fd.stat(compiledPath)
// if (srcStat && srcStat.isFile && compiledStat && compiledStat.isFile && compiledStat.mtime > srcStat.mtime) {
// useCompiled = true
// }
// }
if (useCompiled) {
var compiledBlob = fd.slurp(compiledPath)
var fn = js.compile_unblob(compiledBlob)
startfn = js.eval_compile(fn)
log.console("main: using compiled version " + compiledPath)
} else {
var progContent
if (resolved_prog.isCore) {
progContent = utf8.decode(core_qop.read(prog))
} else {
progContent = utf8.decode(fd.slurp(prog))
}
// Wrap the program in a function taking ($_, arg); `args` is an alias of `arg`.
// NOTE(review): relies on a `name()` method being available on strings;
// it is not defined in this part of the file - presumably provided by the
// base script, confirm.
var prog_script = `(function ${cell.args.program.name()}_start($_, arg) { var args = arg; ${progContent} })`
// Compile and save
var fn = js.compile(cell.args.program, prog_script)
mkdir_p(compiledPath.substring(0, compiledPath.lastIndexOf('/')))
var compiled = js.compile_blob(fn)
write_file(compiledPath, compiled)
startfn = js.eval_compile(fn)
}
// Run the program on the next clock tick; a program that returns a truthy
// value is treated as an error.
$_.clock(_ => {
var val = startfn($_, cell.args.arg);
if (val)
throw new Error('Program must not return anything');
})
})()