(function engine() { var ACTORDATA = cell.hidden.actorsym var SYSYM = '__SYSTEM__' var hidden = cell.hidden var os = hidden.os; cell.os = null var dylib_ext switch(os.platform()) { case 'Windows': dylib_ext = '.dll'; break; case 'macOS': dylib_ext = '.dylib'; break; case 'Linux': dylib_ext = '.so'; break; } var MOD_EXT = '.cm' var ACTOR_EXT = '.ce' var load_internal = os.load_internal function use_embed(name) { return load_internal(`js_${name}_use`) } globalThis.use = use_embed globalThis.meme = function(obj) { return { __proto__: obj } } globalThis.clone = function(obj) { return { __proto__: obj.__proto__, ...obj } } var qop = use_embed('qop') var core_qop = qop.open(hidden.core_qop_blob) var utf8 = use_embed('utf8') var js = use_embed('js') var use_cache = {} function use_core(path) { var cache_path = `2::${path}`; if (use_cache[cache_path]) return use_cache[cache_path]; var sym = use_embed(path) var script = core_qop.read(path + MOD_EXT); if (script) { script = utf8.decode(script) var mod = `(function setup_${path}_module($_){${script}})` var fn = js.eval(path, mod) var result = fn.call(sym); use_cache[cache_path] = result; return result; } use_cache[cache_path] = sym; return sym; } globalThis.use = use_core var blob = use('blob') var blob_stone = blob.prototype.stone var blob_stonep = blob.prototype.stonep; delete blob.prototype.stone; delete blob.prototype.stonep; function deepFreeze(object) { if (object instanceof blob) blob_stone.call(object); var propNames = Object.keys(object); for (var name of propNames) { var value = object[name]; if ((value && typeof value == "object") || typeof value == "function") deepFreeze(value); } return Object.freeze(object); } globalThis.stone = deepFreeze stone.p = function(object) { if (object instanceof blob) return blob_stonep.call(object) return Object.isFrozen(object) } var actor_mod = use('actor') var wota = use('wota') var nota = use('nota') globalThis.text = use('text') var ENETSERVICE = 0.1 var REPLYTIMEOUT = 60 // seconds before replies are ignored globalThis.pi = 3.1415926535897932 function caller_data(depth = 0) { var file = "nofile" var line = 0 var caller = new Error().stack.split("\n")[1+depth] if (caller) { var md = caller.match(/\((.*)\:/) var m = md ? md[1] : "SCRIPT" if (m) file = m md = caller.match(/\:(\d*)\)/) m = md ? md[1] : 0 if (m) line = m } return {file,line} } function console_rec(line, file, msg) { return `[${cell.id.slice(0,5)}] [${file}:${line}]: ${msg}\n` // time: [${time.text("mb d yyyy h:nn:ss")}] } globalThis.log = {} log.console = function(msg) { var caller = caller_data(1) os.print(console_rec(caller.line, caller.file, msg)) } log.error = function(msg = new Error()) { var caller = caller_data(1) if (msg instanceof Error) msg = msg.name + ": " + msg.message + "\n" + msg.stack os.print(console_rec(caller.line,caller.file,msg)) } log.system = function(msg) { msg = "[SYSTEM] " + msg log.console(msg) } function disrupt(err) { if (overling) { if (err) { // with an err, this is a forceful disrupt var reason = (err instanceof Error) ? err.stack : err report_to_overling({type:'disrupt', reason}) } else report_to_overling({type:'stop'}) } if (underlings) { for (var id of underlings) { log.console(`calling on ${id} to disrupt too`) $_.stop(create_actor({id})) } } if (err) { log.console(err); if (err.stack) log.console(err.stack) } actor_mod.disrupt() } actor_mod.on_exception(disrupt) cell.args = cell.hidden.init cell.args ??= {} cell.id ??= "newguy" function create_actor(desc = {id:guid()}) { var actor = {} actor[ACTORDATA] = desc return actor } var $_ = create_actor() var shop = use('shop') os.core_qop = core_qop os.use_cache = use_cache shop.set_os(os, $_) globalThis.use = shop.use var config = shop.load_config() // Get package name from a resolved path function get_package_from_path(path) { if (!path) return null var modules_prefix = '.cell/modules/' if (path.startsWith(modules_prefix)) { var rest = path.substring(modules_prefix.length) var slash_idx = rest.indexOf('/') if (slash_idx > 0) { return rest.substring(0, slash_idx) } } return null } globalThis.json = use('json') var time = use('time') var st_now = time.number() var default_config = { ar_timer: 60, actor_memory:0, net_service:0.1, reply_timeout:60, main: false, } config ??= {} config.system ??= {} config.system.__proto__ = default_config cell.config = config ENETSERVICE = config.system.net_service REPLYTIMEOUT = config.system.reply_timeout /* When handling a message, the message appears like this: { type: type of message - contact: used for contact messages - stop: used to issue stop command - etc reply: ID this message will respond to (callback saved on the actor) replycc: the actor that is waiting for the reply target: ID of the actor that's supposed to receive the message. Only added to non direct sends (out of portals) return: reply ID so the replycc actor can know what callback to send the message to data: the actual content of the message } actors look like { id: the GUID of this actor address: the IP this actor can be found at port: the port of the IP the actor can be found at } */ function guid(bits = 256) { var guid = new blob(bits, os.random) stone(guid) return text(guid,'h') } var HEADER = Symbol() // returns a number between 0 and 1. There is a 50% chance that the result is less than 0.5. $_.random = function() { var n = os.random() return n > 100000 } $_.random_fit = os.random // takes a function input value that will eventually be called with the current time in number form. $_.clock = function(fn) { actor_mod.clock(_ => { fn(time.number()) send_messages() }) } var underlings = new Set() // this is more like "all actors that are notified when we die" var overling = null var root = null var receive_fn = null var greeters = {} // Router functions for when messages are received for a specific actor globalThis.is_actor = function is_actor(actor) { return actor[ACTORDATA] } function peer_connection(peer) { return { latency: peer.rtt, bandwidth: { incoming: peer.incoming_bandwidth, outgoing: peer.outgoing_bandwidth }, activity: { last_sent: peer.last_send_time, last_received: peer.last_receive_time }, mtu: peer.mtu, data: { incoming_total: peer.incoming_data_total, outgoing_total: peer.outgoing_data_total, reliable_in_transit: peer.reliable_data_in_transit }, latency_variance: peer.rtt_variance, packet_loss: peer.packet_loss, state: peer.state } } // takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information. $_.connection = function(callback, actor, config) { var peer = peers[actor[ACTORDATA].id] if (peer) { callback(peer_connection(peer)) return } if (actor_mod.mailbox_exist(actor[ACTORDATA].id)) { callback({type:"local"}) return } callback() } var peers = {} var id_address = {} var peer_queue = {} var portal = null var portal_fn = null // takes a function input value that will eventually be called with the current time in number form. $_.portal = function(fn, port) { if (portal) throw new Error(`Already started a portal listening on ${portal.port}`) if (!port) throw new Error("Requires a valid port.") log.system(`starting a portal on port ${port}`) portal = enet.create_host({address: "any", port}) portal_fn = fn } function handle_host(e) { switch (e.type) { case "connect": log.system(`connected a new peer: ${e.peer.address}:${e.peer.port}`) peers[`${e.peer.address}:${e.peer.port}`] = e.peer var queue = peer_queue.get(e.peer) if (queue) { for (var msg of queue) e.peer.send(nota.encode(msg)) log.system(`sent ${json.encode(msg)} out of queue`) peer_queue.delete(e.peer) } break case "disconnect": peer_queue.delete(e.peer) for (var id in peers) if (peers[id] == e.peer) delete peers[id] log.system('portal got disconnect from ' + e.peer.address + ":" + e.peer.port) break case "receive": var data = nota.decode(e.data) if (data.replycc && !data.replycc.address) { data.replycc[ACTORDATA].address = e.peer.address data.replycc[ACTORDATA].port = e.peer.port } function populate_actor_addresses(obj) { if (typeof obj != 'object' || obj == null) return if (obj[ACTORDATA] && !obj[ACTORDATA].address) { obj[ACTORDATA].address = e.peer.address obj[ACTORDATA].port = e.peer.port } for (var key in obj) { if (obj.hasOwnProperty(key)) { populate_actor_addresses(obj[key]) } } } if (data.data) populate_actor_addresses(data.data) turn(data) break } } // takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information. $_.contact = function(callback, record) { send(create_actor(record), record, callback) } // registers a function that will receive all messages... $_.receiver = function receiver(fn) { receive_fn = fn } $_.start = function start(cb, program, ...args) { if (!program) return var id = guid() if (args.length == 1 && Array.isArray(args[0])) args = args[0] var startup = { id, overling: $_, root, arg: args, program, } greeters[id] = cb message_queue.push({ startup }) } // stops an underling or self. $_.stop = function stop(actor) { if (!actor) { need_stop = true return } if (!is_actor(actor)) throw new Error('Can only call stop on an actor.') if (!underlings.has(actor[ACTORDATA].id)) throw new Error('Can only call stop on an underling or self.') sys_msg(actor, {kind:"stop"}) } // schedules the removal of an actor after a specified amount of time. $_.unneeded = function unneeded(fn, seconds) { actor_mod.unneeded(fn, seconds) } // schedules the invocation of a function after a specified amount of time. $_.delay = function delay(fn, seconds = 0) { if (seconds <= 0) { $_.clock(fn) return } function delay_turn() { fn() send_messages() } var id = actor_mod.delay(delay_turn, seconds) return function() { actor_mod.removetimer(id) } } var enet = use('enet') // causes this actor to stop when another actor stops. var couplings = new Set() $_.couple = function couple(actor) { if (actor == $_) return // can't couple to self couplings.add(actor[ACTORDATA].id) sys_msg(actor, {kind:'couple', from: $_}) log.system(`coupled to ${actor}`) } function actor_prep(actor, send) { message_queue.push({actor,send}); } // Send a message immediately without queuing function actor_send_immediate(actor, send) { try { actor_send(actor, send); } catch (err) { log.error("Failed to send immediate message:", err); } } function actor_send(actor, message) { if (actor[HEADER] && !actor[HEADER].replycc) // attempting to respond to a message but sender is not expecting; silently drop return if (!is_actor(actor) && !is_actor(actor.replycc)) throw new Error(`Must send to an actor object. Attempted send to ${json.encode(actor)}`) if (typeof message != 'object') throw new Error('Must send an object record.') // message to self if (actor[ACTORDATA].id == cell.id) { if (receive_fn) receive_fn(message.data) return } // message to actor in same flock if (actor[ACTORDATA].id && actor_mod.mailbox_exist(actor[ACTORDATA].id)) { var wota_blob = wota.encode(message) // log.console(`sending wota blob of ${wota_blob.length/8} bytes`) actor_mod.mailbox_push(actor[ACTORDATA].id, wota_blob) return } if (actor[ACTORDATA].address) { if (actor[ACTORDATA].id) message.target = actor[ACTORDATA].id else message.type = "contact" var peer = peers[actor[ACTORDATA].address + ":" + actor[ACTORDATA].port] if (!peer) { if (!portal) { log.system(`creating a contactor ...`) portal = enet.create_host({address:"any"}) log.system(`allowing contact to port ${portal.port}`) } log.system(`no peer! connecting to ${actor[ACTORDATA].address}:${actor[ACTORDATA].port}`) peer = portal.connect(actor[ACTORDATA].address, actor[ACTORDATA].port) peer_queue.set(peer, [message]) } else { peer.send(nota.encode(message)) } return } log.system(`Unable to send message to actor ${json.encode(actor[ACTORDATA])}`) } // Holds all messages queued during the current turn. var message_queue = [] var need_stop = false function send_messages() { // if we've been flagged to stop, bail out before doing anything if (need_stop) { disrupt() message_queue.length = 0 return } for (var msg of message_queue) { if (msg.startup) { // now is the time to actually spin up the actor actor_mod.createactor(msg.startup) } else { actor_send(msg.actor, msg.send) } } message_queue.length = 0 } var replies = {} globalThis.send = function send(actor, message, reply) { if (typeof actor != 'object') throw new Error(`Must send to an actor object. Provided: ${json.encode(actor)}`); if (typeof message != 'object') throw new Error('Message must be an object') var send = {type:"user", data: message} if (actor[HEADER] && actor[HEADER].replycc) { var header = actor[HEADER] if (!header.replycc || !is_actor(header.replycc)) throw new Error(`Supplied actor had a return, but it's not a valid actor! ${json.encode(actor[HEADER])}`) actor = header.replycc send.return = header.reply } if (reply) { var id = guid() replies[id] = reply $_.delay(_ => { if (replies[id]) { replies[id](null, "timeout") delete replies[id] } }, REPLYTIMEOUT) send.reply = id send.replycc = $_ } // Instead of sending immediately, queue it actor_prep(actor,send); } stone(send) if (!cell.args.id) cell.id = guid() else cell.id = cell.args.id $_[ACTORDATA].id = cell.id // Actor's timeslice for processing a single message function turn(msg) { var mes = wota.decode(msg) handle_message(mes) send_messages() } //log.console(`FIXME: need to get main from config, not just set to true`) //log.console(`FIXME: actors need the truncated use function as well`) //log.console(`FIXME: remove global access (ie globalThis.use)`) //log.console(`FIXME: add freeze/unfreeze at this level, so we can do it (but scripts cannot)`) actor_mod.register_actor(cell.id, turn, true, config.system.ar_timer) if (config.system.actor_memory) js.mem_limit(config.system.actor_memory) if (config.system.stack_max) js.max_stacksize(config.system.stack_max); overling = cell.args.overling root = cell.args.root root ??= $_ if (overling) { $_.couple(overling) // auto couple to overling report_to_overling({type:'greet', actor: $_}) } // sys messages are always dispatched immediately function sys_msg(actor, msg) { actor_send(actor, {[SYSYM]:msg}) } // messages sent to here get put into the cb provided to start function report_to_overling(msg) { if (!overling) return sys_msg(overling, {kind:'underling', message:msg, from: $_}) } // Check for embedded entry point configuration var entry_config = null var entry_data = core_qop.read('__entry__.json') if (entry_data) { try { entry_config = json.decode(text(utf8.decode(entry_data))) } catch(e) {} } // Determine the program to run // Priority: 1. command line arg, 2. embedded entry point, 3. error var program = cell.args.program if (!program && entry_config && entry_config.entry) { program = entry_config.entry cell.args.program = program } // Store static_only mode for shop.cm to use if (entry_config && entry_config.static_only) { cell.static_only = true } if (!program) os.exit(1) function handle_actor_disconnect(id) { var greeter = greeters[id] if (greeter) { greeter({type: "stopped", id}) delete greeters[id] } log.system(`actor ${id} disconnected`) if (couplings.has(id)) disrupt("coupled actor died") // couplings now disrupts instead of stop } function handle_sysym(msg) { switch(msg.kind) { case 'stop': disrupt("got stop message") break case 'underling': var from = msg.from var greeter = greeters[from[ACTORDATA].id] if (greeter) greeter(msg.message) if (msg.message.type == 'disrupt') underlings.delete(from[ACTORDATA].id) break case 'contact': if (portal_fn) { var letter2 = msg.data letter2[HEADER] = msg delete msg.data portal_fn(letter2) } else throw new Error('Got a contact message, but no portal is established.') break case 'couple': // from must be notified when we die var from = msg.from underlings.add(from[ACTORDATA].id) log.system(`actor ${from} is coupled to me`) break } } function handle_message(msg) { if (msg[SYSYM]) { handle_sysym(msg[SYSYM], msg.from) return } switch (msg.type) { case "user": var letter = msg.data // what the sender really sent Object.defineProperty(letter, HEADER, { value: msg, enumerable: false }) Object.defineProperty(letter, ACTORDATA, { // this is so is_actor == true value: { reply: msg.reply }, enumerable: false }) if (msg.return) { var fn = replies[msg.return] if (fn) fn(letter) delete replies[msg.return] return } if (receive_fn) receive_fn(letter) return case "stopped": handle_actor_disconnect(msg.id) break } }; function enet_check() { if (portal) portal.service(handle_host) $_.delay(enet_check, ENETSERVICE); } // enet_check(); var init_end = time.number() var load_program_start = time.number() // Finally, run the program actor_mod.setname(cell.args.program) var prog = cell.args.program // Resolve the main program path var locator = shop.resolve_locator(cell.args.program, ACTOR_EXT, null) if (!locator) throw new Error(`Main program ${cell.args.program} could not be found`) $_.clock(_ => { var val = locator.symbol.call(null, $_, cell.args.arg); if (val) throw new Error('Program must not return anything'); }) })()