Files
prosperon/scripts/engine.cm

739 lines
20 KiB
Plaintext

(function engine() {
// Publish the host-provided `prosperon` object globally under the name `cell`.
globalThis.cell = prosperon
// Key under which documentation strings are attached to functions
// (used by the `[cell.DOC] = "..."` assignments further down).
cell.DOC = cell.hidden.DOCSYM
// Extension appended to a module name when `use` looks for its script.
var MOD_EXT = '.cm'
// Extension appended to the program name when locating the actor script to run.
var ACTOR_EXT = '.ce'
globalThis.pi = 3.1415926535897932
// Capture a stack trace and report the file and line named in one of its lines.
//   depth - selects line `1 + depth` of `new Error().stack`
// Returns {file, line}. Defaults are "nofile" and 0 when the stack line is
// missing or empty. A line without a "(file:" part reports "SCRIPT" as the
// file. A matched line number is returned as the captured digit string.
function caller_data(depth = 0)
{
  var frame = new Error().stack.split("\n")[1+depth]
  var where = {file: "nofile", line: 0}
  if (!frame) return where

  var file_match = frame.match(/\((.*)\:/)
  var found_file = file_match ? file_match[1] : "SCRIPT"
  if (found_file) where.file = found_file

  var line_match = frame.match(/\:(\d*)\)/)
  var found_line = line_match ? line_match[1] : 0
  if (found_line) where.line = found_line

  return where
}
// Provisional id used by log lines until cell.id is assigned again after the
// command line has been processed (further down in this file).
cell.id ??= "newguy"
// Format one log line.
//   line, file - call-site location to report
//   msg        - text to log
// Returns "[<first five characters of cell.id>] [<file>:<line>]: <msg>\n".
// The statements that used to follow the return (a timestamped variant keyed
// on cell.name) could never execute and have been removed; output is unchanged.
function console_rec(line, file, msg) {
  return `[${cell.id.slice(0,5)}] [${file}:${line}]: ${msg}\n`
}
// Host console module; fetched here because cell.hidden is deleted further down.
var console_mod = cell.hidden.console
// Table of log channels. Each handler receives a single message.
var logs = {}
// "console" channel: print msg prefixed with the call site that caller_data
// reports for a depth of 4.
logs.console = function(msg)
{
  var site = caller_data(4)
  var text = console_rec(site.line, site.file, msg)
  console_mod.print(text)
}
// "error" channel: print a message together with a stack trace.
//   msg - an Error (its own stack is printed) or any other value (printed as
//         text, followed by a stack captured inside this handler).
// An Error argument used to be printed under the literal heading "undefined"
// because msg was cleared before formatting; the heading is now the Error's
// string form, so its name and message are no longer lost.
logs.error = function(msg)
{
  var caller = caller_data(4)
  var err
  var text
  if (msg instanceof Error) {
    err = msg
    text = String(msg)
  } else {
    err = new Error()
    text = msg
  }
  console_mod.print(console_rec(caller.line, caller.file, `${text}\n${err.stack}`))
}
// "panic" channel: dump the message and terminate the process.
// The original body printed an identifier `e` that is not defined in this
// scope; it now prints the `msg` parameter.
// NOTE(review): pprint is not defined in this file — presumably a global
// installed by the base script; confirm. os.quit is used here while the rest
// of the file calls os.exit — confirm both exist.
logs.panic = function(msg)
{
  pprint(msg, 5)
  os.quit()
}
// Fallback returned for log channels that have no handler in `logs`.
function noop() {}
// Global `log` object. Reading `log.<channel>` yields a function that calls the
// matching `logs` handler once per argument (so a call with no arguments does
// nothing), or `noop` when `logs` has no such handler.
// NOTE(review): the logs handlers call caller_data(4); that depth presumably
// accounts for the wrapper and forEach frames created here — confirm before
// changing the shape of this call chain.
globalThis.log = new Proxy(logs, {
get(target,prop,receiver) {
if (target[prop])
return (...args) => args.forEach(arg => target[prop](arg))
return noop
}
})
// Get hidden modules from cell.hidden before stripping it
var hidden = cell.hidden
var actor_mod = hidden.actor
var wota = hidden.wota
// Loader for embedded (built-in) modules, looked up by name.
var use_embed = hidden.use_embed
var use_dyn = hidden.use_dyn
var enet = hidden.enet
var nota = hidden.nota
// Strip hidden from cell so nothing else can access it
delete cell.hidden
// Embedded modules the engine itself needs before `use` is defined.
var os = use_embed('os')
var js = use_embed('js')
var io = use_embed('io')
// Refuse to start when no `.cell` entry exists.
if (!io.exists('.cell')) {
console_mod.print("No cell directory found. Make one.\n");
os.exit(1);
}
// NOTE(review): presumably makes files under .cell/modules resolvable without
// a prefix — confirm io.mount semantics.
io.mount(".cell/modules", "")
// Values already returned by `use`, keyed by the requested module name.
var use_cache = {}
// Log the own enumerable properties of obj through log.console: each property
// name, and for function-valued properties an extra line showing the first
// parenthesised group of the function's source text (its parameter list).
function print_api(obj) {
  for (var key in obj) {
    if (obj.hasOwnProperty(key)) {
      var member = obj[key]
      log.console(key)
      if (typeof member !== 'function') continue
      var params = member.toString().match(/\(([^)]*)\)/)
      if (!params) continue
      log.console(' function: ' + key + '(' + params[1].trim() + ')')
    }
  }
}
// NOTE(review): res_cache is not referenced anywhere else in the code shown.
var res_cache = {}
// Read the base script, wrap its text in a function named "base", evaluate the
// wrapper and call the resulting function immediately.
var BASEPATH = 'scripts/base' + MOD_EXT
var script = io.slurp(BASEPATH)
var fnname = "base"
script = `(function ${fnname}() { ${script}; })`
js.eval(BASEPATH, script)()
// Script paths whose load is currently running (path -> true).
var inProgress = {}
// Names passed to `use` whose loads are still running, outermost first.
var loadingStack = []

// Build the error reported when `file` is requested while it is still loading.
function circular_use_error(file) {
  var cycleIndex = loadingStack.indexOf(file)
  var cycleStart = cycleIndex >= 0 ? loadingStack.slice(cycleIndex) : loadingStack.slice()
  var cyclePath = cycleStart.concat(file)
  return new Error(
    `Circular dependency detected while loading "${file}".\n` +
    `Module chain: ${loadingStack.join(" -> ")}\n` +
    `Cycle specifically: ${cyclePath.join(" -> ")}`
  )
}

// use(file, ...args): load a module and return its value.
//   file - module name; "<file>.cm" is loaded when it exists and is not a
//          directory, otherwise the embedded module of that name is returned
//   args - passed to the module script as a single array parameter named `arg`
// A script is compiled from source, or taken from the compiled copy under
// .cell/build when that copy is at least as new as the source. It runs with
// `this` set to a fresh object whose prototype is the embedded module, if any.
// The script's return value is cached by `file`; a script that returns nothing
// yields the embedded module, or throws when there is none.
// Throws when the module cannot be found or a circular load is detected.
//
// Fixes over the previous version:
//  - the second circular-dependency check referenced `cyclePath`, which only
//    existed inside the first check's block, so it raised a ReferenceError
//  - loadingStack / inProgress are now cleared in a `finally`, so a module
//    that throws while loading is not reported as circular on the next attempt
globalThis.use = function use(file, ...args) {
  // Check cache first
  if (use_cache[file]) return use_cache[file]

  // Check for circular dependencies
  if (loadingStack.includes(file)) throw circular_use_error(file)

  var path = null
  if (io.exists(file + MOD_EXT) && !io.is_directory(file + MOD_EXT))
    path = file + MOD_EXT

  // Check if there's an embedded module
  var embed_mod = use_embed(file)

  // If no script and no embedded module, error
  if (!path && !embed_mod)
    throw new Error(`Module ${file} could not be found`)

  // If only embedded module exists, return it
  if (!path) {
    use_cache[file] = embed_mod
    return embed_mod
  }

  // Same script already loading under this path
  if (inProgress[path]) throw circular_use_error(file)

  inProgress[path] = true
  loadingStack.push(file)

  var ret
  try {
    // Determine the compiled file path in .cell directory
    var compiledPath = ".cell/build/" + io.realdir(path) + "/" + path + '.o'
    io.mkdir(compiledPath.dir())

    // Check if compiled version exists and is newer than source
    var useCompiled = false
    if (io.exists(compiledPath)) {
      var srcStat = io.stat(path)
      var compiledStat = io.stat(compiledPath)
      if (compiledStat.modtime >= srcStat.modtime) useCompiled = true
    }

    var fn
    var mod_name = path.name()
    if (useCompiled) {
      var compiledBlob = io.slurpbytes(compiledPath)
      fn = js.compile_unblob(compiledBlob)
      fn = js.eval_compile(fn)
    } else {
      // Compile from source
      var script = io.slurp(path)
      var mod_script = `(function setup_${mod_name}_module(arg){${script};})`
      fn = js.compile(path, mod_script)
      // Save compiled version to .cell directory
      var compiled = js.compile_blob(fn)
      io.slurpwrite(compiledPath, compiled)
      fn = js.eval_compile(fn)
    }

    // Create context - if embedded module exists, script extends it
    var context = {}
    if (embed_mod)
      context.__proto__ = embed_mod

    // Call the script with the context as 'this'
    ret = fn.call(context, args)

    // If script doesn't return anything, fall back to the embedded module
    if (!ret && embed_mod) {
      ret = embed_mod
    } else if (!ret) {
      throw new Error(`Use must be used with a module, but ${path} doesn't return a value`)
    }
  } finally {
    // Always unwind the bookkeeping, including when the script throws
    loadingStack.pop()
    delete inProgress[path]
  }

  // Cache the result
  use_cache[file] = ret
  return ret
}
// Core modules loaded through `use`; json is additionally published as a global.
globalThis.json = use('json')
var time = use('time')
var blob = use('blob')
// Recursively freeze `object` and everything reachable through its own
// enumerable string-keyed properties, then return it.
//  - blob instances have stone() called on them before being frozen
//  - objects and functions are frozen after their properties
//  - null, undefined and other primitives are returned untouched (null and
//    undefined used to raise a TypeError from Object.keys)
//  - every object is visited once, so cyclic structures terminate instead of
//    recursing without end
function deepFreeze(object) {
  var visited = new WeakSet()

  function freeze_node(node) {
    var is_container = (node && typeof node === "object") || typeof node === "function"
    if (!is_container || visited.has(node)) return node
    visited.add(node)

    if (node instanceof blob)
      node.stone()

    // Freeze properties before freezing self
    for (var name of Object.keys(node))
      freeze_node(node[name])

    return Object.freeze(node)
  }

  return freeze_node(object)
}
// Published globally as `stone`.
globalThis.stone = deepFreeze
// stone.p(object): true when object is frozen.
stone.p = function(object)
{
  return Object.isFrozen(object)
}
// Read the doc script and wrap its text in a function named "doc", the same
// way the base script is wrapped earlier in this file.
// NOTE(review): unlike the base script, the wrapped text is not passed to
// js.eval anywhere in the code shown — confirm whether that is intentional.
var DOCPATH = 'scripts/doc' + MOD_EXT
var script = io.slurp(DOCPATH)
var fnname = "doc"
script = `(function ${fnname}() { ${script}; })`
/*
When handling a message, the message appears like this:
{
type: type of message
- contact: used for contact messages
- stop: used to issue stop command
- etc
reply: ID this message will respond to (callback saved on the actor)
replycc: the actor that is waiting for the reply
target: ID of the actor that's supposed to receive the message. Only added to non direct sends (out of portals)
return: reply ID so the replycc actor can know what callback to send the message to
data: the actual content of the message
}
actors look like
{
id: the GUID of this actor
address: the IP this actor can be found at
port: the port of the IP the actor can be found at
}
*/
var util = use('util')
var math = use('math')
var crypto = use('crypto')
// Set to true by destroyself(); $_.start refuses to spawn while it is true.
var dying = false
// Symbol key under which the message envelope is attached to a received letter.
var HEADER = Symbol()
// Wrap an actor-data record in an actor object: { __ACTORDATA__: <record> }.
// When no record is supplied, a new one with a generated id is used.
function create_actor(__ACTORDATA__ = {id:util.guid()}) {
  var actor = {}
  actor.__ACTORDATA__ = __ACTORDATA__
  return actor
}
// This actor's own actor object. It is handed to the program as `$_` when the
// program is run at the end of this file.
var $_ = create_actor()
$_.random = crypto.random
$_.random[cell.DOC] = "returns a number between 0 and 1. There is a 50% chance that the result is less than 0.5."
// NOTE(review): the documentation string below describes a callback that is
// eventually called with the time, but this implementation ignores fn and
// returns os.now() directly — confirm which is intended.
$_.clock = function(fn) { return os.now() }
$_.clock[cell.DOC] = "takes a function input value that will eventually be called with the current time in number form."
// Ids of actors started by this actor through $_.start.
var underlings = new Set()
// Parent actor and root actor; assigned from the command line further down.
var overling = undefined
var root = undefined
// Don't make $_ global - it should only be available to actor scripts
// Handler registered through $_.receiver.
var receive_fn = undefined
// Callbacks passed to $_.start, keyed by the id generated for the new actor.
var greeters = {}
// Return the __ACTORDATA__ record of `actor`: truthy for actor objects,
// undefined for values that carry no such record.
function is_actor(actor) {
  var {__ACTORDATA__: data} = actor
  return data
}
// Also available to scripts as a global.
globalThis.is_actor = is_actor;
// Copy the statistics fields of a network peer into a plain report record
// with the sections latency, bandwidth, activity, mtu, data, latency_variance,
// packet_loss and state.
function peer_connection(peer) {
  var report = {}

  report.latency = peer.rtt

  report.bandwidth = {}
  report.bandwidth.incoming = peer.incoming_bandwidth
  report.bandwidth.outgoing = peer.outgoing_bandwidth

  report.activity = {}
  report.activity.last_sent = peer.last_send_time
  report.activity.last_received = peer.last_receive_time

  report.mtu = peer.mtu

  report.data = {}
  report.data.incoming_total = peer.incoming_data_total
  report.data.outgoing_total = peer.outgoing_data_total
  report.data.reliable_in_transit = peer.reliable_data_in_transit

  report.latency_variance = peer.rtt_variance
  report.packet_loss = peer.packet_loss
  report.state = peer.state

  return report
}
// $_.connection(callback, actor, config): report how this actor reaches `actor`.
//  - connected network peer: callback receives the peer_connection() record
//  - a local mailbox exists for the actor's id: callback receives {type:"local"}
//  - otherwise an Error is thrown
// config is accepted but not consulted by this implementation.
//
// The peers table is filled under "address:port" keys (handle_host on connect,
// actor_send on lookup), so that key is tried first; the previous lookup by
// actor id is kept as a fallback. The error text now encodes the actor with
// json.encode, as the other send errors do, instead of "[object Object]".
$_.connection = function(callback, actor, config) {
  var data = actor.__ACTORDATA__
  var peer = data.address ? peers[data.address + ":" + data.port] : undefined
  if (!peer) peer = peers[data.id]
  if (peer) {
    callback(peer_connection(peer))
    return
  }
  if (actor_mod.mailbox_exist(data.id)) {
    callback({type:"local"})
    return
  }
  throw new Error(`Could not get connection information for ${json.encode(actor)}`)
}
$_.connection[cell.DOC] = "The connection function takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information."
// Connected network peers. handle_host stores them under "address:port" keys.
var peers = {}
// NOTE(review): id_address is not referenced anywhere else in the code shown.
var id_address = {}
// Messages waiting for a peer's connection to complete, keyed by peer object.
var peer_queue = new WeakMap()
// Network host: created by $_.portal, or by actor_send on the first outgoing
// send to an address when no portal exists yet.
var portal = undefined
// Function given to $_.portal; handle_message passes "contact" letters to it.
var portal_fn = undefined
// Delay passed to $_.delay between enet_check polls.
var service_delay = 0.01
// $_.portal(fn, port): open a network host listening on `port` and remember
// `fn` as the function that receives contact requests.
// Throws when a host already exists or when no port is given.
$_.portal = function(fn, port) {
  if (portal) {
    var busy_port = portal.port
    throw new Error(`Already started a portal listening on ${busy_port}`)
  }
  if (!port) {
    throw new Error("Requires a valid port.")
  }
  log.console(`starting a portal on port ${port}`)
  var host_config = {address: "any", port: port}
  portal = enet.create_host(host_config)
  portal_fn = fn
}
$_.portal[cell.DOC] = "A portal is a special actor with a public address that performs introduction services. It listens on a specified port for contacts by external actors that need to acquire an actor object. The function will receive the record containing the request. The record can have a reply sent through it. A portal can respond by beginning a new actor, or finding an existing actor, or by forwarding the contact message to another actor. This is how distributed Misty networks are bootstrapped. The portal function returns null."
// Handle one event reported by the network host.
//   e.type "connect"    - remember the peer under "address:port" and send any
//                         messages queued for it
//   e.type "disconnect" - forget the peer and its queue
//   e.type "receive"    - decode the payload, stamp the sender's address on
//                         actor objects that lack one, and dispatch the message
//
// Fixes over the previous version:
//  - the replycc check tested `replycc.address`, a field actor objects do not
//    carry, so an address already present in replycc.__ACTORDATA__ was always
//    overwritten (and a replycc without __ACTORDATA__ raised a TypeError); it
//    now tests __ACTORDATA__.address like the nested-actor pass does
//  - the "sent ... out of queue" log ran once after the loop and named only
//    the last message; it now runs once per flushed message
function handle_host(e) {
  // Give an actor-data record the sending peer's address when it has none.
  function stamp_address(actor_data) {
    if (actor_data && !actor_data.address) {
      actor_data.address = e.peer.address
      actor_data.port = e.peer.port
    }
  }

  // Also populate address/port for any actor objects in the message data
  function populate_actor_addresses(obj) {
    if (typeof obj !== 'object' || obj === null) return
    stamp_address(obj.__ACTORDATA__)
    for (var key in obj) {
      if (obj.hasOwnProperty(key)) populate_actor_addresses(obj[key])
    }
  }

  switch (e.type) {
    case "connect":
      log.console(`connected a new peer: ${e.peer.address}:${e.peer.port}`)
      peers[`${e.peer.address}:${e.peer.port}`] = e.peer
      var queue = peer_queue.get(e.peer)
      if (queue) {
        for (var msg of queue) {
          e.peer.send(nota.encode(msg))
          log.console(`sent ${json.encode(msg)} out of queue`)
        }
        peer_queue.delete(e.peer)
      }
      break
    case "disconnect":
      peer_queue.delete(e.peer)
      for (var id in peers) if (peers[id] === e.peer) delete peers[id]
      log.console('portal got disconnect from ' + e.peer.address + ":" + e.peer.port)
      break
    case "receive":
      var data = nota.decode(e.data)
      if (data.replycc) stamp_address(data.replycc.__ACTORDATA__)
      if (data.data) populate_actor_addresses(data.data)
      handle_message(data)
      break
  }
}
// $_.contact(callback, record): build an actor object from `record` and send
// `record` to it, with `callback` registered as the reply handler.
$_.contact = function(callback, record) {
  var portal_actor = create_actor(record)
  send(portal_actor, record, callback)
}
$_.contact[cell.DOC] = `The contact function sends a message to a portal on another machine to obtain an actor object.
The callback is a function with a actor input and a reason input. If successful, actor is bound to an actor object. If not successful, actor is null and reason may contain an explanation.`
// $_.receiver(fn): store fn as this actor's message handler, replacing any
// handler stored earlier. handle_message and actor_send call it with letters.
$_.receiver = function receiver(fn) {
receive_fn = fn
}
$_.receiver[cell.DOC] = "registers a function that will receive all messages..."
// $_.start(cb, prg, arg): spawn a new actor.
//   cb  - stored in greeters under the new actor's id
//   prg - optional program name, passed as --program
//   arg - optional extra arguments, encoded with cmd.encode
// The new id is recorded in underlings. Does nothing (beyond a log.warn call)
// while this actor is dying.
$_.start = function start(cb, prg, arg) {
  if (dying) {
    log.warn(`Cannot start an underling in the same turn as we're stopping`)
    return
  }

  var child_id = util.guid()
  greeters[child_id] = cb

  var overling_arg = json.encode($_)
  var root_arg = json.encode(root)
  var argv = ["./cell", "spawn", "--id", child_id, "--overling", overling_arg, "--root", root_arg]

  // Optional parts are collected first and appended in one step.
  var extra = []
  if (prg) extra = extra.concat(['--program', prg])
  if (arg) extra = extra.concat(cmd.encode(arg))
  argv = argv.concat(extra)

  underlings.add(child_id)
  actor_mod.createactor(argv)
}
$_.start[cell.DOC] = "The start function creates a new actor..."
// $_.stop(actor): with no argument, destroy this actor; otherwise queue a
// "stop" message for `actor`, which must be an actor object whose id is in
// underlings. Throws for anything else.
$_.stop = function stop(actor) {
  if (actor) {
    if (!is_actor(actor))
      throw new Error('Can only call stop on an actor.')
    var target_id = actor.__ACTORDATA__.id
    if (!underlings.has(target_id))
      throw new Error('Can only call stop on an underling or self.')
    var order = {type:"stop", id: cell.id}
    actor_prep(actor, order)
  } else {
    destroyself()
  }
}
$_.stop[cell.DOC] = "The stop function stops an underling."
// $_.unneeded(fn, seconds): forward both arguments unchanged to
// actor_mod.unneeded.
$_.unneeded = function unneeded(fn, seconds) {
actor_mod.unneeded(fn, seconds)
}
$_.unneeded[cell.DOC] = "registers a function that is called when the actor..."
// $_.delay(fn, seconds): schedule fn through actor_mod.delay. When the timer
// fires, fn runs and then the messages it queued are flushed.
// Returns a function that removes the timer.
$_.delay = function delay(fn, seconds) {
  var timer = actor_mod.delay(function delay_turn() {
    fn()
    send_messages()
  }, seconds)
  return function() { actor_mod.removetimer(timer) }
}
$_.delay[cell.DOC] = "used to schedule the invocation of a function..."
// Ids of actors this actor is coupled to; handle_actor_disconnect stops this
// actor when one of them disconnects.
var couplings = new Set()
// $_.couple(actor): record the actor's id in couplings.
$_.couple = function couple(actor) {
  var partner_id = actor.__ACTORDATA__.id
  log.console(`coupled to ${partner_id}`)
  couplings.add(partner_id)
}
$_.couple[cell.DOC] = "causes this actor to stop when another actor stops."
// Queue a message for later delivery: append {actor, send} to message_queue.
function actor_prep(actor, send) {
  var entry = {actor: actor, send: send}
  message_queue.push(entry)
}
// Deliver one message to one actor, choosing the route from the actor's data:
//  1. a letter whose header has no replycc: dropped silently
//  2. this actor itself: message.data goes straight to the receiver function
//  3. an id with a local mailbox: pushed onto that mailbox
//  4. an address: sent to the connected peer, or queued on a new connection
//     (creating the network host first when none exists)
// Throws when `actor` is not an actor object, `message` is not an object, or
// none of the routes applies.
function actor_send(actor, message) {
  // attempting to respond to a message but sender is not expecting; silently drop
  var header = actor[HEADER]
  if (header && !header.replycc) return

  if (!is_actor(actor)) throw new Error(`Must send to an actor object. Attempted send to ${json.encode(actor)}`)
  if (typeof message !== 'object') throw new Error('Must send an object record.')

  var dest = actor.__ACTORDATA__

  // message to self
  if (dest.id === cell.id) {
    if (receive_fn) receive_fn(message.data)
    return
  }

  // message to actor in same flock
  if (dest.id && actor_mod.mailbox_exist(dest.id)) {
    actor_mod.mailbox_push(dest.id, message)
    return
  }

  if (!dest.address)
    throw new Error(`Unable to send message to actor ${json.encode(actor)}`)

  // message over the network: known id is the target, otherwise it is a contact
  if (dest.id) message.target = dest.id
  else message.type = "contact"

  var peer = peers[dest.address + ":" + dest.port]
  if (peer) {
    peer.send(nota.encode(message))
    return
  }

  if (!portal) {
    log.console(`creating a contactor ...`)
    portal = enet.create_host({address:"any"})
    log.console(`allowing contact to port ${portal.port}`)
  }
  log.console(`no peer! connecting to ${dest.address}:${dest.port}`)
  peer = portal.connect(dest.address, dest.port)
  peer_queue.set(peer, [message])
}
// Messages queued during the current turn, in send order.
var message_queue = []
// Drain message_queue, handing each entry to actor_send. Entries added while
// draining are sent in the same pass. A failing send does not stop the drain;
// the failures are reported through log.error afterwards, first together and
// then one by one.
function send_messages() {
  var failures = []
  for (;;) {
    if (!(message_queue.length > 0)) break
    var {actor, send} = message_queue.shift()
    try {
      actor_send(actor, send)
    } catch (problem) {
      failures.push(problem)
    }
  }
  if (failures.length === 0) return
  log.error("Some messages failed to send:", failures)
  for (var failure of failures) log.error(failure)
}
// Reply callbacks waiting for an answer, keyed by the generated reply id.
var replies = {}
// send(actor, message, reply): queue `message` for `actor`.
//   actor   - an actor object, or a received letter; a letter whose header
//             carries replycc is answered to that replycc actor
//   message - must be of type object
//   reply   - optional callback; stored in replies under a new id, which is
//             sent along together with this actor as replycc
// Nothing is transmitted here; the envelope is queued via actor_prep.
function _send(actor, message, reply) {
  if (typeof message !== 'object')
    throw new Error('Message must be an object')

  var destination = actor
  var envelope = {type:"user", data: message}

  var header = actor[HEADER]
  if (header && header.replycc) {
    if (!is_actor(header.replycc))
      throw new Error(`Supplied actor had a return, but it's not a valid actor! ${json.encode(header)}`)
    destination = header.replycc
    envelope.return = header.reply
  }

  if (reply) {
    var reply_id = util.guid()
    replies[reply_id] = reply
    envelope.reply = reply_id
    envelope.replycc = $_ // This still references the engine's internal $_
  }

  // Instead of sending immediately, queue it
  actor_prep(destination, envelope)
}
// Publish _send as the global `send`; it cannot be reassigned or redefined.
var send_descriptor = {
  value: _send,
  writable: false,
  configurable: false,
  enumerable: true
}
Object.defineProperty(globalThis, 'send', send_descriptor);
// Process a copy of the command line.
// NOTE(review): cmd.process presumably fills in cell.args, which is read just
// below — confirm against the cmd module.
var cmd = use('cmd')
cmd.process(cell.argv.slice())
// Use the id supplied on the command line, or generate one.
if (!cell.args.id) cell.id = util.guid()
else cell.id = cell.args.id
// Make remaining arguments available as global 'args' variable
globalThis.args = cell.args.remaining || []
// Give this actor's own actor object its final id.
$_.__ACTORDATA__.id = cell.id
// One actor turn: handle the incoming message, then flush the messages queued
// while handling it. If either step throws, the queue is emptied so nothing
// from the failed turn is sent, and the error propagates to the caller.
function turn(msg)
{
  var completed = false
  try {
    handle_message(msg)
    send_messages()
    completed = true
  } finally {
    if (!completed) message_queue = []
  }
}
// Register this actor with the host, passing `turn` as its function.
actor_mod.register_actor(cell.id, turn, cell.args.main)
// Parent and root actors arrive JSON-encoded on the command line ($_.start
// passes them as --overling / --root). Without --root this actor is the root.
if (cell.args.overling) overling = json.decode(cell.args.overling)
if (cell.args.root) root = json.decode(cell.args.root)
else root = $_
// Queue a greeting for the parent; its handle_message "greet" case looks up
// the callback stored under this actor's id.
if (overling) actor_prep(overling, {type:'greet', actor: $_})
// Nothing to run without a program.
if (!cell.args.program)
os.exit(1)
// Shut this actor down: mark it as dying, ask every underling to stop, then
// destroy the actor through actor_mod.
function destroyself() {
  log.spam(`Got the message to destroy self.`)
  dying = true
  underlings.forEach(function(underling_id) {
    $_.stop(create_actor({id: underling_id}))
  })
  actor_mod.destroy()
}
// React to the actor `id` having stopped:
//  - notify and forget the start callback registered for it, if any
//  - log the disconnect
//  - stop this actor too when it was coupled to `id`
//  - drop the peers entry stored under `id`
function handle_actor_disconnect(id) {
  var on_stopped = greeters[id]
  if (on_stopped) {
    var notice = {type: "stopped", id: id}
    on_stopped(notice)
    delete greeters[id]
  }
  log.console(`actor ${id} disconnected`)
  var coupled = couplings.has(id)
  if (coupled) $_.stop()
  delete peers[id]
}
// Dispatch one incoming message envelope.
//  - a message addressed to another id is forwarded to that mailbox
//  - "user":    the data becomes a letter with the envelope attached under
//               HEADER; a letter carrying `return` goes to the stored reply
//               callback, anything else to the receiver function
//  - "stop":    only accepted from the overling; destroys this actor
//  - "contact": handed to the portal function
//  - "stopped": an actor has stopped
//  - "greet":   a started actor announces itself to its start callback
//  - any other type goes to the receiver function unchanged
// Throws for a reply without a stored callback, a stop from anyone but the
// overling, or a contact message when no portal function is set.
//
// Fixes over the previous version:
//  - "stop" dereferenced `overling` even when this actor has none, raising a
//    TypeError instead of the intended error
//  - the reply callback entry is removed before the callback runs, so it is
//    no longer left behind when the callback throws
function handle_message(msg) {
  if (msg.target && msg.target !== cell.id) {
    actor_mod.mailbox_push(msg.target, msg)
    return
  }

  switch (msg.type) {
    case "user":
      var letter = msg.data
      delete msg.data
      letter[HEADER] = msg
      if (msg.return) {
        log.trace(`Received a message for the return id ${msg.return}`)
        var fn = replies[msg.return]
        if (!fn) throw new Error(`Could not find return function for message ${msg.return}`)
        delete replies[msg.return]
        fn(letter)
        return
      }
      if (receive_fn) receive_fn(letter)
      break
    case "stop":
      if (!overling || msg.id !== overling.__ACTORDATA__.id)
        throw new Error(`Got a message from an actor ${msg.id} to stop...`)
      destroyself()
      break
    case "contact":
      if (portal_fn) {
        var letter2 = msg.data
        letter2[HEADER] = msg
        delete msg.data
        portal_fn(letter2)
      } else throw new Error('Got a contact message, but no portal is established.')
      break
    case "stopped":
      handle_actor_disconnect(msg.id)
      break
    case "greet":
      var greeter = greeters[msg.actor.__ACTORDATA__.id]
      if (greeter) greeter(msg)
      break
    default:
      if (receive_fn) receive_fn(msg)
      break
  }
}
// Poll the network host once (when one exists), feeding its events to
// handle_host, then schedule the next poll after service_delay.
function enet_check()
{
  var host = portal
  if (host) host.service(handle_host)
  $_.delay(enet_check, service_delay)
}
//enet_check();
// Finally, run the program
actor_mod.setname(cell.args.program)
// Resolve the program name to a script file: "<program>.ce" first, otherwise
// "<program>/main.ce" when <program> is a directory.
var prog = null
var progPath = cell.args.program
if (io.exists(progPath + ACTOR_EXT) && !io.is_directory(progPath + ACTOR_EXT)) {
prog = progPath + ACTOR_EXT
} else if (io.exists(progPath) && io.is_directory(progPath)) {
var mainPath = progPath + '/main' + ACTOR_EXT
if (io.exists(mainPath) && !io.is_directory(mainPath)) {
prog = mainPath
}
}
if (!prog)
throw new Error(cell.args.program + " not found.");
// Mount the directory containing the program
var progDir = prog.substring(0, prog.lastIndexOf('/'))
if (progDir && progDir !== '.') {
io.mount(progDir, "")
}
// Wrap the program text in a function whose only parameter is $_, evaluate
// the wrapper and call it with this actor's $_ object.
var progContent = io.slurp(prog)
var prog_script = `(function ${cell.args.program.name()}_start($_) { ${progContent} })`
var val = js.eval(cell.args.program, prog_script)($_)
// A program must not produce a value.
if (val)
throw new Error('Program must not return anything');
// Flush the messages the program queued while starting up.
send_messages()
})()