Files
cell/internal/engine.cm
2026-02-08 22:26:13 -06:00

872 lines
22 KiB
Plaintext

// Hidden vars (os, actorsym, init, core_path) come from env
var ACTORDATA = actorsym
var SYSYM = '__SYSTEM__'
var _cell = {}
var need_stop = false
var dylib_ext
var cases = {
Windows: '.dll',
macOS: '.dylib',
Linux: '.so'
}
print(os.platform())
dylib_ext = cases[os.platform()]
var MOD_EXT = '.cm'
var ACTOR_EXT = '.ce'
var load_internal = os.load_internal
function use_embed(name) {
return load_internal("js_" + name + "_use")
}
function logical(val1) {
if (val1 == 0 || val1 == false || val1 == "false" || val1 == null)
return false;
if (val1 == 1 || val1 == true || val1 == "true")
return true;
return null;
}
function some(arr, pred) {
return find(arr, pred) != null
}
function every(arr, pred) {
return find(arr, x => not(pred(x))) == null
}
function starts_with(str, prefix) {
return search(str, prefix) == 0
}
function ends_with(str, suffix) {
return search(str, suffix, -length(suffix)) != null
}
print("engine: loading js")
var js = use_embed('js')
print("engine: loading fd")
var fd = use_embed('fd')
print("engine: loaded fd")
print("engine: getting home")
// Get the shop path from HOME environment
var home = os.getenv('HOME') || os.getenv('USERPROFILE')
if (!home) {
os.print('Could not determine home directory\n')
os.exit(1)
}
var shop_path = home + '/.cell'
var packages_path = shop_path + '/packages'
var core_path = packages_path + '/core'
if (!fd.is_dir(core_path)) {
os.print('Cell shop not found at ' + shop_path + '. Run "cell install" to set up.\n')
os.exit(1)
}
var use_cache = {}
use_cache['core/os'] = os
// Load a core module from the file system
function use_core(path) {
var cache_key = 'core/' + path
if (use_cache[cache_key])
return use_cache[cache_key];
print(`engine: use_core('${path}') - embed`)
var sym = use_embed(replace(path, '/', '_'))
// Core scripts are in packages/core/
var file_path = core_path + '/' + path + MOD_EXT
if (fd.is_file(file_path)) {
print(`engine: use_core('${path}') - loading script`)
var script_blob = fd.slurp(file_path)
var script = text(script_blob)
var mod = `(function setup_module(use){${script}})`
var fn = mach_eval('core:' + path, mod)
if (fn == null) {
print(`engine: use_core('${path}') - mach_eval returned null!`)
return null
}
print(`engine: use_core('${path}') - calling (fn type: ${typeof(fn)})`)
var result = call(fn,sym, [use_core])
print(`engine: use_core('${path}') - done`)
use_cache[cache_key] = result;
return result;
}
print(`engine: use_core('${path}') - using embed directly`)
use_cache[cache_key] = sym;
return sym;
}
print("engine: loading blob")
var blob = use_core('blob')
print("engine: loaded blob")
function actor() {
}
print("engine: loading actor")
var actor_mod = use_core('actor')
print("engine: loading wota")
var wota = use_core('wota')
print("engine: loading nota")
var nota = use_core('nota')
print("engine: loaded nota")
function is_actor(value) {
return is_object(value) && value[ACTORDATA]
}
var ENETSERVICE = 0.1
var REPLYTIMEOUT = 60 // seconds before replies are ignored
function caller_data(depth = 0)
{
var file = "nofile"
var line = 0
var caller = array(Error().stack, "\n")[1+depth]
if (caller) {
var md = extract(caller, /\((.*)\:/)
var m = md ? md[1] : "SCRIPT"
if (m) file = m
md = extract(caller, /\:(\d*)\)/)
m = md ? md[1] : 0
if (m) line = m
}
return {file,line}
}
function console_rec(line, file, msg) {
return `[${text(_cell.id, 0, 5)}] [${file}:${line}]: ${msg}\n`
// time: [${time.text("mb d yyyy h:nn:ss")}]
}
function log(name, args) {
var caller = caller_data(1)
var msg = args[0]
if (name == 'console') {
os.print(console_rec(caller.line, caller.file, msg))
} else if (name == 'error') {
if (msg == null) msg = Error()
if (is_proto(msg, Error))
msg = msg.name + ": " + msg.message + "\n" + msg.stack
os.print(console_rec(caller.line, caller.file, msg))
} else if (name == 'system') {
msg = "[SYSTEM] " + msg
os.print(console_rec(caller.line, caller.file, msg))
} else {
log.console(`unknown log type: ${name}`)
}
}
function actor_die(err)
{
if (err && is_function(err.toString)) {
os.print(err.toString())
os.print("\n")
if (err.stack) os.print(err.stack)
}
if (overling) {
if (err) {
// with an err, this is a forceful disrupt
var reason = (is_proto(err, Error)) ? err.stack : err
report_to_overling({type:'disrupt', reason})
} else
report_to_overling({type:'stop'})
}
if (underlings) {
var unders = array(underlings)
arrfor(unders, function(id, index) {
log.console(`calling on ${id} to disrupt too`)
$_.stop(create_actor({id}))
})
}
if (err) {
if (err.message)
log.console(err.message)
if (err.stack)
log.console(err.stack)
}
actor_mod["disrupt"]()
}
print("engine: setting up actor_die")
actor_mod.on_exception(actor_die)
print("engine: setting cell args")
print(`engine: init is ${init}`)
_cell.args = init != null ? init : {}
print("engine: args set")
_cell.id = "newguy"
print("engine: id set")
function create_actor(desc = {id:guid()}) {
var actor = {}
actor[ACTORDATA] = desc
return actor
}
print("engine: creating $_")
var $_ = {}
print("engine: creating self actor")
$_.self = create_actor()
print("engine: setting os props")
os.use_cache = use_cache
os.global_shop_path = shop_path
os.$_ = $_
print("engine: os props set")
print("engine: loading shop")
var shop = use_core('internal/shop')
print("engine: loaded shop, loading json")
var json = use_core('json')
var time = use_core('time')
print("engine: loading pronto")
var pronto = use_core('pronto')
print("engine: loaded pronto")
var fallback = pronto.fallback
var parallel = pronto.parallel
var race = pronto.race
var sequence = pronto.sequence
// Create runtime environment for modules
var runtime_env = {
logical: logical,
some: some,
every: every,
starts_with: starts_with,
ends_with: ends_with,
actor: actor,
is_actor: is_actor,
log: log,
send: send,
fallback: fallback,
parallel: parallel,
race: race,
sequence: sequence
}
// Pass to os for shop to access
os.runtime_env = runtime_env
$_.time_limit = function(requestor, seconds)
{
if (!pronto.is_requestor(requestor)) {
log.error('time_limit: first argument must be a requestor')
disrupt
}
if (!is_number(seconds) || seconds <= 0) {
log.error('time_limit: seconds must be a positive number')
disrupt
}
return function time_limit_requestor(callback, value) {
pronto.check_callback(callback, 'time_limit')
var finished = false
var requestor_cancel = null
var timer_cancel = null
function cancel(reason) {
if (finished) return
finished = true
if (timer_cancel) {
timer_cancel()
timer_cancel = null
}
if (requestor_cancel) {
requestor_cancel(reason)
requestor_cancel = null
}
}
function safe_cancel_requestor(reason) {
if (requestor_cancel) {
requestor_cancel(reason)
requestor_cancel = null
}
}
timer_cancel = $_.delay(function() {
if (finished) return
def reason = make_reason(factory, 'Timeout.', seconds)
safe_cancel_requestor(reason)
finished = true
callback(null, reason)
}, seconds)
function do_request() {
requestor_cancel = requestor(function(val, reason) {
if (finished) return
finished = true
if (timer_cancel) {
timer_cancel()
timer_cancel = null
}
callback(val, reason)
}, value)
} disruption {
cancel(Error('requestor failed'))
callback(null, Error('requestor failed'))
}
do_request()
return function(reason) {
safe_cancel_requestor(reason)
}
}
}
var config = {
ar_timer: 60,
actor_memory:0,
net_service:0.1,
reply_timeout:60,
main: true
}
_cell.config = config
ENETSERVICE = config.net_service
REPLYTIMEOUT = config.reply_timeout
/*
When handling a message, the message appears like this:
{
type: type of message
- contact: used for contact messages
- stop: used to issue stop command
- etc
reply: ID this message will respond to (callback saved on the actor)
replycc: the actor that is waiting for the reply
target: ID of the actor that's supposed to receive the message. Only added to non direct sends (out of portals)
return: reply ID so the replycc actor can know what callback to send the message to
data: the actual content of the message
}
actors look like
{
id: the GUID of this actor
address: the IP this actor can be found at
port: the port of the IP the actor can be found at
}
*/
function guid(bits = 256)
{
var guid = blob(bits, os.random)
stone(guid)
return text(guid,'h')
}
var HEADER = {}
// takes a function input value that will eventually be called with the current time in number form.
$_.clock = function(fn) {
actor_mod.clock(_ => {
fn(time.number())
send_messages()
})
}
var underlings = {} // this is more like "all actors that are notified when we die"
var overling = null
var root = null
var receive_fn = null
var greeters = {} // Router functions for when messages are received for a specific actor
function peer_connection(peer) {
return {
latency: peer.rtt,
bandwidth: {
incoming: peer.incoming_bandwidth,
outgoing: peer.outgoing_bandwidth
},
activity: {
last_sent: peer.last_send_time,
last_received: peer.last_receive_time
},
mtu: peer.mtu,
data: {
incoming_total: peer.incoming_data_total,
outgoing_total: peer.outgoing_data_total,
reliable_in_transit: peer.reliable_data_in_transit
},
latency_variance: peer.rtt_variance,
packet_loss: peer.packet_loss,
state: peer.state
}
}
// takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information.
$_.connection = function(callback, actor, config) {
var peer = peers[actor[ACTORDATA].id]
if (peer) {
callback(peer_connection(peer))
return
}
if (actor_mod.mailbox_exist(actor[ACTORDATA].id)) {
callback({type:"local"})
return
}
callback()
}
var peers = {}
var id_address = {}
var peer_queue = {}
var portal = null
var portal_fn = null
// takes a function input value that will eventually be called with the current time in number form.
$_.portal = function(fn, port) {
if (portal) {
log.error(`Already started a portal listening on ${portal.port}`)
disrupt
}
if (!port) {
log.error("Requires a valid port.")
disrupt
}
log.system(`starting a portal on port ${port}`)
portal = enet.create_host({address: "any", port})
portal_fn = fn
}
function handle_host(e) {
if (e.type == "connect") {
log.system(`connected a new peer: ${e.peer.address}:${e.peer.port}`)
peers[`${e.peer.address}:${e.peer.port}`] = e.peer
var queue = peer_queue.get(e.peer)
if (queue) {
arrfor(queue, (msg, index) => e.peer.send(nota.encode(msg)))
log.system(`sent ${msg} out of queue`)
peer_queue.delete(e.peer)
}
} else if (e.type == "disconnect") {
peer_queue.delete(e.peer)
arrfor(array(peers), function(id, index) {
if (peers[id] == e.peer) delete peers[id]
})
log.system('portal got disconnect from ' + e.peer.address + ":" + e.peer.port)
} else if (e.type == "receive") {
var data = nota.decode(e.data)
if (data.replycc && !data.replycc.address) {
data.replycc[ACTORDATA].address = e.peer.address
data.replycc[ACTORDATA].port = e.peer.port
}
function populate_actor_addresses(obj) {
if (!is_object(obj)) return
if (obj[ACTORDATA] && !obj[ACTORDATA].address) {
obj[ACTORDATA].address = e.peer.address
obj[ACTORDATA].port = e.peer.port
}
arrfor(array(obj), function(key, index) {
if (key in obj)
populate_actor_addresses(obj[key])
})
}
if (data.data) populate_actor_addresses(data.data)
turn(data)
}
}
// takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information.
$_.contact = function(callback, record) {
send(create_actor(record), record, callback)
}
// registers a function that will receive all messages...
$_.receiver = function receiver(fn) {
receive_fn = fn
}
$_.start = function start(cb, program) {
if (!program) return
var id = guid()
var startup = {
id,
overling: $_.self,
root,
program,
}
greeters[id] = cb
push(message_queue, { startup })
}
// stops an underling or self.
$_.stop = function stop(actor) {
if (!actor) {
need_stop = true
return
}
if (!is_actor(actor)) {
log.error('Can only call stop on an actor.')
disrupt
}
if (is_null(underlings[actor[ACTORDATA].id])) {
log.error('Can only call stop on an underling or self.')
disrupt
}
sys_msg(actor, {kind:"stop"})
}
// schedules the removal of an actor after a specified amount of time.
$_.unneeded = function unneeded(fn, seconds) {
actor_mod.unneeded(fn, seconds)
}
// schedules the invocation of a function after a specified amount of time.
$_.delay = function delay(fn, seconds = 0) {
function delay_turn() {
fn()
send_messages()
}
var id = actor_mod.delay(delay_turn, seconds)
return function() { actor_mod.removetimer(id) }
}
var enet = use_core('enet')
// causes this actor to stop when another actor stops.
var couplings = {}
$_.couple = function couple(actor) {
if (actor == $_.self) return // can't couple to self
couplings[actor[ACTORDATA].id] = true
sys_msg(actor, {kind:'couple', from: $_.self})
log.system(`coupled to ${actor}`)
}
function actor_prep(actor, send) {
push(message_queue, {actor,send});
}
// Send a message immediately without queuing
function actor_send_immediate(actor, send) {
actor_send(actor, send)
}
function actor_send(actor, message) {
if (actor[HEADER] && !actor[HEADER].replycc) // attempting to respond to a message but sender is not expecting; silently drop
return
if (!is_actor(actor) && !is_actor(actor.replycc)) {
log.error(`Must send to an actor object. Attempted send to ${actor}`)
disrupt
}
if (!is_object(message)) {
log.error('Must send an object record.')
disrupt
}
// message to self
if (actor[ACTORDATA].id == _cell.id) {
if (receive_fn) receive_fn(message.data)
return
}
// message to actor in same flock
if (actor[ACTORDATA].id && actor_mod.mailbox_exist(actor[ACTORDATA].id)) {
var wota_blob = wota.encode(message)
// log.console(`sending wota blob of ${length(wota_blob)/8} bytes`)
actor_mod.mailbox_push(actor[ACTORDATA].id, wota_blob)
return
}
if (actor[ACTORDATA].address) {
if (actor[ACTORDATA].id)
message.target = actor[ACTORDATA].id
else
message.type = "contact"
var peer = peers[actor[ACTORDATA].address + ":" + actor[ACTORDATA].port]
if (!peer) {
if (!portal) {
log.system(`creating a contactor ...`)
portal = enet.create_host({address:"any"})
log.system(`allowing contact to port ${portal.port}`)
}
log.system(`no peer! connecting to ${actor[ACTORDATA].address}:${actor[ACTORDATA].port}`)
peer = portal.connect(actor[ACTORDATA].address, actor[ACTORDATA].port)
peer_queue.set(peer, [message])
} else {
peer.send(nota.encode(message))
}
return
}
log.system(`Unable to send message to actor ${actor[ACTORDATA]}`)
}
// Holds all messages queued during the current turn.
var message_queue = []
function send_messages() {
// if we've been flagged to stop, bail out before doing anything
if (need_stop) {
actor_die()
message_queue = []
return
}
arrfor(message_queue, function(msg, index) {
if (msg.startup) {
// now is the time to actually spin up the actor
actor_mod.createactor(msg.startup)
} else {
actor_send(msg.actor, msg.send)
}
})
message_queue = []
}
var replies = {}
function send(actor, message, reply) {
if (!is_object(actor)) {
log.error(`Must send to an actor object. Provided: ${actor}`)
disrupt
}
if (!is_object(message)) {
log.error('Message must be an object')
disrupt
}
var send_msg = {type:"user", data: message}
var target = actor
if (actor[HEADER] && actor[HEADER].replycc) {
var header = actor[HEADER]
if (!header.replycc || !is_actor(header.replycc)) {
log.error(`Supplied actor had a return, but it's not a valid actor! ${actor[HEADER]}`)
disrupt
}
target = header.replycc
send_msg.return = header.reply
}
if (reply) {
var id = guid()
replies[id] = reply
$_.delay(_ => {
if (replies[id]) {
replies[id](null, "timeout")
delete replies[id]
}
}, REPLYTIMEOUT)
send_msg.reply = id
send_msg.replycc = $_.self
}
// Instead of sending immediately, queue it
actor_prep(target, send_msg);
}
stone(send)
if (!_cell.args.id) _cell.id = guid()
else _cell.id = _cell.args.id
$_.self[ACTORDATA].id = _cell.id
// Actor's timeslice for processing a single message
function turn(msg)
{
var mes = wota.decode(msg)
handle_message(mes)
send_messages()
}
//log.console(`FIXME: need to get main from config, not just set to true`)
actor_mod.register_actor(_cell.id, turn, true, config.ar_timer)
if (config.actor_memory)
js.mem_limit(config.actor_memory)
if (config.stack_max)
js.max_stacksize(config.system.stack_max);
overling = _cell.args.overling
$_.overling = overling
root = _cell.args.root
if (root == null) root = $_.self
if (overling) {
$_.couple(overling) // auto couple to overling
report_to_overling({type:'greet', actor: $_.self})
}
// sys messages are always dispatched immediately
function sys_msg(actor, msg)
{
actor_send(actor, {[SYSYM]:msg})
}
// messages sent to here get put into the cb provided to start
function report_to_overling(msg)
{
if (!overling) return
sys_msg(overling, {kind:'underling', message:msg, from: $_.self})
}
// Determine the program to run from command line
var program = _cell.args.program
if (!program) {
log.error('No program specified. Usage: cell <program.ce> [args...]')
os.exit(1)
}
function handle_actor_disconnect(id) {
var greeter = greeters[id]
if (greeter) {
greeter({type: "stopped", id})
delete greeters[id]
}
log.system(`actor ${id} disconnected`)
if (!is_null(couplings[id])) actor_die("coupled actor died") // couplings now disrupts instead of stop
}
function handle_sysym(msg)
{
var from
if (msg.kind == 'stop') {
actor_die("got stop message")
} else if (msg.kind == 'underling') {
from = msg.from
var greeter = greeters[from[ACTORDATA].id]
if (greeter) greeter(msg.message)
if (msg.message.type == 'disrupt')
delete underlings[from[ACTORDATA].id]
} else if (msg.kind == 'contact') {
if (portal_fn) {
var letter2 = msg.data
letter2[HEADER] = msg
delete msg.data
portal_fn(letter2)
} else {
log.error('Got a contact message, but no portal is established.')
disrupt
}
} else if (msg.kind == 'couple') {
// from must be notified when we die
from = msg.from
underlings[from[ACTORDATA].id] = true
log.system(`actor ${from} is coupled to me`)
}
}
function handle_message(msg) {
if (msg[SYSYM]) {
handle_sysym(msg[SYSYM], msg.from)
return
}
if (msg.type == "user") {
var letter = msg.data // what the sender really sent
_ObjectDefineProperty(letter, HEADER, {
value: msg, enumerable: false
})
_ObjectDefineProperty(letter, ACTORDATA, { // this is so is_actor == true
value: { reply: msg.reply }, enumerable: false
})
if (msg.return) {
var fn = replies[msg.return]
if (fn) fn(letter)
delete replies[msg.return]
return
}
if (receive_fn) receive_fn(letter)
} else if (msg.type == "stopped") {
handle_actor_disconnect(msg.id)
}
}
function enet_check()
{
if (portal) portal.service(handle_host)
$_.delay(enet_check, ENETSERVICE);
}
// enet_check();
// Finally, run the program
actor_mod.setname(_cell.args.program)
var prog = _cell.args.program
var package = use_core('package')
var locator = shop.resolve_locator(_cell.args.program + ".ce", null)
if (!locator) {
var pkg = package.find_package_dir(_cell.args.program + ".ce")
locator = shop.resolve_locator(_cell.args.program + ".ce", pkg)
}
if (!locator) {
os.print(`Main program ${_cell.args.program} could not be found\n`)
os.exit(1)
}
$_.clock(_ => {
// Get capabilities for the main program
var file_info = shop.file_info ? shop.file_info(locator.path) : null
var inject = shop.script_inject_for ? shop.script_inject_for(file_info) : []
// Build env object for injection
var env = {}
for (var i = 0; i < length(inject); i++) {
var key = inject[i]
if (key && key[0] == '$') key = text(key, 1)
if (key == 'fd') env[key] = fd
else env[key] = $_[key]
}
// Create use function bound to the program's package
var pkg = file_info ? file_info.package : null
var use_fn = function(path) { return shop.use(path, pkg) }
// Call with signature: setup_module(args, use, env)
// The script wrapper binds $delay, $start, etc. from env
var val = call(locator.symbol, null, [_cell.args.arg, use_fn, env])
if (val)
log.error('Program must not return anything')
disrupt
})