// Names supplied by the host env rather than declared in this file:
// os, actorsym, init, core_path, shop_path, analyze, run_ast_fn, run_ast_noopt_fn,
// json, use_cache, content_hash, cache_path, ensure_build_dir, compile_to_blob_fn.
// In actor spawn mode the env also supplies: nota, wota
var ACTORDATA = actorsym
var SYSYM = '__SYSTEM__'

var _cell = {}
var need_stop = false

// Dynamic-library suffix, keyed by the value os.platform() returns.
var cases = {
  Windows: '.dll',
  macOS: '.dylib',
  Linux: '.so'
}

var dylib_ext = cases[os.platform()]

var MOD_EXT = '.cm'
var ACTOR_EXT = '.ce'

var load_internal = os.load_internal

// Looks up the embedded module registered as "js_<name>_use".
function use_embed(name) {
  var symbol = "js_" + name + "_use"
  return load_internal(symbol)
}

// Maps the usual spellings of false/true onto a logical; anything else gives null.
function logical(val1) {
  var reads_false = val1 == 0 || val1 == false || val1 == "false" || val1 == null
  var reads_true = null
  if (reads_false) return false
  reads_true = val1 == 1 || val1 == true || val1 == "true"
  return reads_true ? true : null
}

// True when find() locates an element of arr that satisfies pred.
function some(arr, pred) {
  var hit = find(arr, pred)
  return hit != null
}

// True when find() locates no element of arr that fails pred.
function every(arr, pred) {
  var miss = find(arr, function(item) {
    return not(pred(item))
  })
  return miss == null
}

// True when search() reports prefix at position 0 of str.
function starts_with(str, prefix) {
  var at = search(str, prefix)
  return at == 0
}

// True when search(), started length(suffix) from the end, finds suffix.
function ends_with(str, suffix) {
  var from = -length(suffix)
  var at = search(str, suffix, from)
  return at != null
}

var fd = use_embed('fd')
var js = use_embed('js')

// core_path and shop_path come from env (bootstrap.cm passes them through)
// shop_path may be null if --core was used without --shop
var packages_path = shop_path ? shop_path + '/packages' : null

use_cache['core/os'] = os

// Extra env properties added as engine initializes (log, runtime fns, etc.)
var core_extras = {} // Load a core module from the file system function use_core(path) { var cache_key = 'core/' + path var env = null if (use_cache[cache_key]) return use_cache[cache_key] var sym = use_embed(replace(path, '/', '_')) var result = null var script = null var ast = null var c_cache_key = null // If C embed exists, register it so .cm modules can use('internal/_c') if (sym) { c_cache_key = 'core/internal/' + path + '_c' if (!use_cache[c_cache_key]) use_cache[c_cache_key] = sym } // Build env: merge core_extras env = {use: use_core} arrfor(array(core_extras), function(k) { env[k] = core_extras[k] }) // Check for pre-compiled .cm.mach file first var mach_path = core_path + '/' + path + '.cm.mach' if (fd.is_file(mach_path)) { result = mach_load(fd.slurp(mach_path), env) use_cache[cache_key] = result return result } // Check for .cm.mcode JSON IR var mcode_path = core_path + '/' + path + '.cm.mcode' var mcode_blob = null var hash = null var cached_path = null var mach_blob = null var source_blob = null if (fd.is_file(mcode_path)) { mcode_blob = fd.slurp(mcode_path) hash = content_hash(mcode_blob) cached_path = cache_path(hash) if (cached_path && fd.is_file(cached_path)) { result = mach_load(fd.slurp(cached_path), env) } else { mach_blob = mach_compile_mcode_bin('core:' + path, text(mcode_blob)) if (cached_path) { ensure_build_dir() fd.slurpwrite(cached_path, mach_blob) } result = mach_load(mach_blob, env) } use_cache[cache_key] = result return result } // Fall back to source .cm file — compile at runtime var file_path = core_path + '/' + path + MOD_EXT if (fd.is_file(file_path)) { source_blob = fd.slurp(file_path) hash = content_hash(source_blob) cached_path = cache_path(hash) if (cached_path && fd.is_file(cached_path)) { result = mach_load(fd.slurp(cached_path), env) } else { script = text(source_blob) ast = analyze(script, file_path) mach_blob = compile_to_blob_fn('core:' + path, ast) if (cached_path) { ensure_build_dir() fd.slurpwrite(cached_path, 
mach_blob) } result = mach_load(mach_blob, env) } use_cache[cache_key] = result return result } // Embedded C module only use_cache[cache_key] = sym return sym } // Load full modules via use_core (extends C embeds with .cm additions, and caches) fd = use_core('fd') use_core('js') var blob = use_core('blob') function actor() { } var actor_mod = use_core('actor') var wota = use_core('wota') var nota = use_core('nota') function is_actor(value) { return is_object(value) && value[ACTORDATA] } var ENETSERVICE = 0.1 var REPLYTIMEOUT = 60 // seconds before replies are ignored function caller_data(depth) { return {file: "nofile", line: 0} } function console_rec(line, file, msg) { return `[${text(_cell.id, 0, 5)}] [${file}:${line}]: ${msg}\n` // time: [${time.text("mb d yyyy h:nn:ss")}] } function log(name, args) { var caller = caller_data(1) var msg = args[0] if (name == 'console') { os.print(console_rec(caller.line, caller.file, msg)) } else if (name == 'error') { if (msg == null) msg = "error" os.print(console_rec(caller.line, caller.file, msg)) } else if (name == 'system') { msg = "[SYSTEM] " + msg os.print(console_rec(caller.line, caller.file, msg)) } else { log.console(`unknown log type: ${name}`) } } function actor_die(err) { var reason = null var unders = null if (err && is_function(err.toString)) { os.print(err.toString()) os.print("\n") if (err.stack) os.print(err.stack) } if (overling) { if (err) { // with an err, this is a forceful disrupt reason = err report_to_overling({type:'disrupt', reason}) } else report_to_overling({type:'stop'}) } if (underlings) { unders = array(underlings) arrfor(unders, function(id, index) { log.console(`calling on ${id} to disrupt too`) $_.stop(create_actor({id})) }) } if (err) { if (err.message) log.console(err.message) if (err.stack) log.console(err.stack) } actor_mod["disrupt"]() } //actor_mod.on_exception(actor_die) _cell.args = init != null ? init : {} _cell.id = "newguy" function create_actor(desc) { var _desc = desc == null ? 
{id:guid()} : desc var actor = {} actor[ACTORDATA] = _desc return actor } var $_ = {} $_.self = create_actor() use_cache['core/json'] = json // Create runtime_env early (empty) — filled after pronto loads. // Shop accesses it lazily (in inject_env, called at module-use time, not load time) // so it sees the filled version. var runtime_env = {} // Populate core_extras with everything shop (and other core modules) need core_extras.use_cache = use_cache core_extras.shop_path = shop_path core_extras.analyze = analyze core_extras.run_ast_fn = run_ast_fn core_extras.run_ast_noopt_fn = run_ast_noopt_fn core_extras.core_json = json core_extras.actor_api = $_ core_extras.runtime_env = runtime_env core_extras.content_hash = content_hash core_extras.cache_path = cache_path core_extras.ensure_build_dir = ensure_build_dir // NOW load shop — it receives all of the above via env var shop = use_core('internal/shop') var time = use_core('time') var pronto = use_core('pronto') var fallback = pronto.fallback var parallel = pronto.parallel var race = pronto.race var sequence = pronto.sequence // Fill runtime_env (same object reference shop holds) runtime_env.logical = logical runtime_env.some = some runtime_env.every = every runtime_env.starts_with = starts_with runtime_env.ends_with = ends_with runtime_env.actor = actor runtime_env.is_actor = is_actor runtime_env.log = log runtime_env.send = send runtime_env.fallback = fallback runtime_env.parallel = parallel runtime_env.race = race runtime_env.sequence = sequence // Make runtime functions available to modules loaded via use_core arrfor(array(runtime_env), function(k) { core_extras[k] = runtime_env[k] }) $_.time_limit = function(requestor, seconds) { if (!pronto.is_requestor(requestor)) { log.error('time_limit: first argument must be a requestor') disrupt } if (!is_number(seconds) || seconds <= 0) { log.error('time_limit: seconds must be a positive number') disrupt } return function time_limit_requestor(callback, value) { 
pronto.check_callback(callback, 'time_limit') var finished = false var requestor_cancel = null var timer_cancel = null function cancel(reason) { if (finished) return finished = true if (timer_cancel) { timer_cancel() timer_cancel = null } if (requestor_cancel) { requestor_cancel(reason) requestor_cancel = null } } function safe_cancel_requestor(reason) { if (requestor_cancel) { requestor_cancel(reason) requestor_cancel = null } } timer_cancel = $_.delay(function() { if (finished) return def reason = make_reason(factory, 'Timeout.', seconds) safe_cancel_requestor(reason) finished = true callback(null, reason) }, seconds) function do_request() { requestor_cancel = requestor(function(val, reason) { if (finished) return finished = true if (timer_cancel) { timer_cancel() timer_cancel = null } callback(val, reason) }, value) } disruption { cancel('requestor failed') callback(null, 'requestor failed') } do_request() return function(reason) { safe_cancel_requestor(reason) } } } var config = { ar_timer: 60, actor_memory:0, net_service:0.1, reply_timeout:60, main: true } _cell.config = config ENETSERVICE = config.net_service REPLYTIMEOUT = config.reply_timeout /* When handling a message, the message appears like this: { type: type of message - contact: used for contact messages - stop: used to issue stop command - etc reply: ID this message will respond to (callback saved on the actor) replycc: the actor that is waiting for the reply target: ID of the actor that's supposed to receive the message. Only added to non direct sends (out of portals) return: reply ID so the replycc actor can know what callback to send the message to data: the actual content of the message } actors look like { id: the GUID of this actor address: the IP this actor can be found at port: the port of the IP the actor can be found at } */ function guid(bits) { var _bits = bits == null ? 
256 : bits var guid = blob(_bits, os.random) stone(guid) return text(guid,'h') } var HEADER = {} // takes a function input value that will eventually be called with the current time in number form. $_.clock = function(fn) { actor_mod.clock(_ => { fn(time.number()) send_messages() }) } var underlings = {} // this is more like "all actors that are notified when we die" var overling = null var root = null var receive_fn = null var greeters = {} // Router functions for when messages are received for a specific actor function peer_connection(peer) { return { latency: peer.rtt, bandwidth: { incoming: peer.incoming_bandwidth, outgoing: peer.outgoing_bandwidth }, activity: { last_sent: peer.last_send_time, last_received: peer.last_receive_time }, mtu: peer.mtu, data: { incoming_total: peer.incoming_data_total, outgoing_total: peer.outgoing_data_total, reliable_in_transit: peer.reliable_data_in_transit }, latency_variance: peer.rtt_variance, packet_loss: peer.packet_loss, state: peer.state } } // takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information. $_.connection = function(callback, actor, config) { var peer = peers[actor[ACTORDATA].id] if (peer) { callback(peer_connection(peer)) return } if (actor_mod.mailbox_exist(actor[ACTORDATA].id)) { callback({type:"local"}) return } callback() } var peers = {} var id_address = {} var peer_queue = {} var portal = null var portal_fn = null // takes a function input value that will eventually be called with the current time in number form. 
$_.portal = function(fn, port) {
  // Opens the single network host for this actor on `port` and remembers `fn`
  // in portal_fn. Disrupts if a portal already exists or if `port` is missing.
  if (portal) {
    log.error(`Already started a portal listening on ${portal.port}`)
    disrupt
  }
  if (!port) {
    log.error("Requires a valid port.")
    disrupt
  }
  log.system(`starting a portal on port ${port}`)
  portal = enet.create_host({address: "any", port})
  portal_fn = fn
}

// Handles one host event `e` (e.type is "connect", "disconnect" or "receive").
// NOTE(review): peer_queue is declared elsewhere in this file as a plain `{}`
// yet is used here through .get/.delete — confirm it supports those calls.
function handle_host(e) {
  var queue = null
  var data = null
  var reply_info = null

  if (e.type == "connect") {
    log.system(`connected a new peer: ${e.peer.address}:${e.peer.port}`)
    peers[`${e.peer.address}:${e.peer.port}`] = e.peer
    // flush anything that was queued while the connection was being set up
    queue = peer_queue.get(e.peer)
    if (queue) {
      arrfor(queue, (msg, index) => e.peer.send(nota.encode(msg)))
      log.system(`sent queue out of queue`)
      peer_queue.delete(e.peer)
    }
  } else if (e.type == "disconnect") {
    peer_queue.delete(e.peer)
    // drop every peers entry that refers to the departed peer
    arrfor(array(peers), function(id, index) {
      if (peers[id] == e.peer) delete peers[id]
    })
    log.system('portal got disconnect from ' + e.peer.address + ":" + e.peer.port)
  } else if (e.type == "receive") {
    data = nota.decode(e.data)
    // Stamp the sender's address onto the reply actor only when it has none.
    // The address is stored under ACTORDATA (as populate_actor_addresses
    // checks), not on the actor object itself; testing `replycc.address`
    // meant a supplied address was always overwritten.
    reply_info = data.replycc ? data.replycc[ACTORDATA] : null
    if (reply_info && !reply_info.address) {
      reply_info.address = e.peer.address
      reply_info.port = e.peer.port
    }
    if (data.data) populate_actor_addresses(data.data, e)
    // `data` is already decoded by nota, so dispatch it directly. turn()
    // would run wota.decode on it a second time.
    handle_message(data)
    send_messages()
  }
}

// Walks `obj` recursively and, for every actor record found without an address,
// fills in the address and port of the peer that delivered event `e`.
function populate_actor_addresses(obj, e) {
  if (!is_object(obj)) return
  if (obj[ACTORDATA] && !obj[ACTORDATA].address) {
    obj[ACTORDATA].address = e.peer.address
    obj[ACTORDATA].port = e.peer.port
  }
  arrfor(array(obj), function(key, index) {
    if (key in obj)
      populate_actor_addresses(obj[key], e)
  })
}

// takes a callback function and a record. The record is both the description of the actor to reach and the message sent to it; the callback is registered to receive the reply.
$_.contact = function(callback, record) { send(create_actor(record), record, callback) } // registers a function that will receive all messages... $_.receiver = function receiver(fn) { receive_fn = fn } $_.start = function start(cb, program) { if (!program) return var id = guid() var startup = { id, overling: $_.self, root, program, } greeters[id] = cb push(message_queue, { startup }) } // stops an underling or self. $_.stop = function stop(actor) { if (!actor) { need_stop = true return } if (!is_actor(actor)) { log.error('Can only call stop on an actor.') disrupt } if (is_null(underlings[actor[ACTORDATA].id])) { log.error('Can only call stop on an underling or self.') disrupt } sys_msg(actor, {kind:"stop"}) } // schedules the removal of an actor after a specified amount of time. $_.unneeded = function unneeded(fn, seconds) { actor_mod.unneeded(fn, seconds) } // schedules the invocation of a function after a specified amount of time. $_.delay = function delay(fn, seconds) { var _seconds = seconds == null ? 0 : seconds function delay_turn() { fn() send_messages() } var id = actor_mod.delay(delay_turn, _seconds) return function() { actor_mod.removetimer(id) } } var enet = use_core('enet') // causes this actor to stop when another actor stops. var couplings = {} $_.couple = function couple(actor) { if (actor == $_.self) return // can't couple to self couplings[actor[ACTORDATA].id] = true sys_msg(actor, {kind:'couple', from: $_.self}) log.system(`coupled to ${actor}`) } function actor_prep(actor, send) { push(message_queue, {actor,send}); } // Send a message immediately without queuing function actor_send_immediate(actor, send) { actor_send(actor, send) } function actor_send(actor, message) { var wota_blob = null var peer = null if (actor[HEADER] && !actor[HEADER].replycc) // attempting to respond to a message but sender is not expecting; silently drop return if (!is_actor(actor) && !is_actor(actor.replycc)) { log.error(`Must send to an actor object. 
Attempted send to ${actor}`) disrupt } if (!is_object(message)) { log.error('Must send an object record.') disrupt } // message to self if (actor[ACTORDATA].id == _cell.id) { if (receive_fn) receive_fn(message.data) return } // message to actor in same flock if (actor[ACTORDATA].id && actor_mod.mailbox_exist(actor[ACTORDATA].id)) { wota_blob = wota.encode(message) actor_mod.mailbox_push(actor[ACTORDATA].id, wota_blob) return } if (actor[ACTORDATA].address) { if (actor[ACTORDATA].id) message.target = actor[ACTORDATA].id else message.type = "contact" peer = peers[actor[ACTORDATA].address + ":" + actor[ACTORDATA].port] if (!peer) { if (!portal) { log.system(`creating a contactor ...`) portal = enet.create_host({address:"any"}) log.system(`allowing contact to port ${portal.port}`) } log.system(`no peer! connecting to ${actor[ACTORDATA].address}:${actor[ACTORDATA].port}`) peer = portal.connect(actor[ACTORDATA].address, actor[ACTORDATA].port) peer_queue.set(peer, [message]) } else { peer.send(nota.encode(message)) } return } log.system(`Unable to send message to actor ${actor[ACTORDATA]}`) } // Holds all messages queued during the current turn. var message_queue = [] function send_messages() { // if we've been flagged to stop, bail out before doing anything if (need_stop) { actor_die() message_queue = [] return } arrfor(message_queue, function(msg, index) { if (msg.startup) { // now is the time to actually spin up the actor actor_mod.createactor(msg.startup) } else { actor_send(msg.actor, msg.send) } }) message_queue = [] } var replies = {} function send(actor, message, reply) { var send_msg = null var target = null var header = null var id = null if (!is_object(actor)) { log.error(`Must send to an actor object. 
Provided: ${actor}`) disrupt } if (!is_object(message)) { log.error('Message must be an object') disrupt } send_msg = {type:"user", data: message} target = actor if (actor[HEADER] && actor[HEADER].replycc) { header = actor[HEADER] if (!header.replycc || !is_actor(header.replycc)) { log.error(`Supplied actor had a return, but it's not a valid actor! ${actor[HEADER]}`) disrupt } target = header.replycc send_msg.return = header.reply } if (reply) { id = guid() replies[id] = reply $_.delay(_ => { if (replies[id]) { replies[id](null, "timeout") delete replies[id] } }, REPLYTIMEOUT) send_msg.reply = id send_msg.replycc = $_.self } // Instead of sending immediately, queue it actor_prep(target, send_msg); } stone(send) if (!_cell.args.id) _cell.id = guid() else _cell.id = _cell.args.id $_.self[ACTORDATA].id = _cell.id // Actor's timeslice for processing a single message function turn(msg) { var mes = wota.decode(msg) handle_message(mes) send_messages() } //log.console(`FIXME: need to get main from config, not just set to true`) actor_mod.register_actor(_cell.id, turn, true, config.ar_timer) if (config.actor_memory) js.mem_limit(config.actor_memory) if (config.stack_max) js.max_stacksize(config.system.stack_max); overling = _cell.args.overling $_.overling = overling root = _cell.args.root if (root == null) root = $_.self if (overling) { $_.couple(overling) // auto couple to overling report_to_overling({type:'greet', actor: $_.self}) } // sys messages are always dispatched immediately function sys_msg(actor, msg) { actor_send(actor, {[SYSYM]:msg}) } // messages sent to here get put into the cb provided to start function report_to_overling(msg) { if (!overling) return sys_msg(overling, {kind:'underling', message:msg, from: $_.self}) } // Determine the program to run from command line var program = _cell.args.program if (!program) { log.error('No program specified. 
Usage: cell [args...]') os.exit(1) } function handle_actor_disconnect(id) { var greeter = greeters[id] if (greeter) { greeter({type: "stopped", id}) delete greeters[id] } log.system(`actor ${id} disconnected`) if (!is_null(couplings[id])) actor_die("coupled actor died") // couplings now disrupts instead of stop } function handle_sysym(msg) { var from = null var greeter = null var letter2 = null if (msg.kind == 'stop') { actor_die("got stop message") } else if (msg.kind == 'underling') { from = msg.from greeter = greeters[from[ACTORDATA].id] if (greeter) greeter(msg.message) if (msg.message.type == 'disrupt') delete underlings[from[ACTORDATA].id] } else if (msg.kind == 'contact') { if (portal_fn) { letter2 = msg.data letter2[HEADER] = msg delete msg.data portal_fn(letter2) } else { log.error('Got a contact message, but no portal is established.') disrupt } } else if (msg.kind == 'couple') { // from must be notified when we die from = msg.from underlings[from[ACTORDATA].id] = true log.system(`actor ${from} is coupled to me`) } } function handle_message(msg) { var letter = null var fn = null if (msg[SYSYM]) { handle_sysym(msg[SYSYM], msg.from) return } if (msg.type == "user") { letter = msg.data // what the sender really sent _ObjectDefineProperty(letter, HEADER, { value: msg, enumerable: false }) _ObjectDefineProperty(letter, ACTORDATA, { // this is so is_actor == true value: { reply: msg.reply }, enumerable: false }) if (msg.return) { fn = replies[msg.return] if (fn) fn(letter) delete replies[msg.return] return } if (receive_fn) receive_fn(letter) } else if (msg.type == "stopped") { handle_actor_disconnect(msg.id) } } function enet_check() { if (portal) portal.service(handle_host) $_.delay(enet_check, ENETSERVICE); } // enet_check(); // Finally, run the program actor_mod.setname(_cell.args.program) var prog = _cell.args.program if (ends_with(prog, '.cm')) { os.print(`error: ${prog} is a module (.cm), not a program (.ce)\n`) os.exit(1) } if (ends_with(prog, '.ce')) 
prog = text(prog, 0, -3) var package = use_core('package') // Find the .ce file var prog_path = prog + ".ce" var pkg_dir = null var core_dir = null if (!fd.is_file(prog_path)) { pkg_dir = package.find_package_dir(prog_path) if (pkg_dir) prog_path = pkg_dir + '/' + prog + '.ce' } if (!fd.is_file(prog_path)) { // Check core packages core_dir = core_path prog_path = core_dir + '/' + prog + '.ce' } if (!fd.is_file(prog_path)) { os.print(`Main program ${prog} could not be found\n`) os.exit(1) } $_.clock(_ => { var file_info = shop.file_info ? shop.file_info(prog_path) : null var inject = shop.script_inject_for ? shop.script_inject_for(file_info) : [] // Build env with runtime functions + capability injections var env = {} arrfor(array(runtime_env), function(k) { env[k] = runtime_env[k] }) var _ki = 0 var inj = null var key = null while (_ki < length(inject)) { inj = inject[_ki] key = inj if (key && key[0] == '$') key = text(key, 1) if (key == 'fd') env['$fd'] = fd else env['$' + key] = $_[key] _ki = _ki + 1 } var pkg = file_info ? file_info.package : null env.use = function(path) { var ck = 'core/' + path if (use_cache[ck]) return use_cache[ck] var core_mod = use_core(path) if (core_mod) return core_mod return shop.use(path, pkg) } env.args = _cell.args.arg env.log = log var source_blob = fd.slurp(prog_path) var hash = content_hash(source_blob) var cached_path = cache_path(hash) var val = null var script = null var ast = null var mach_blob = null if (cached_path && fd.is_file(cached_path)) { val = mach_load(fd.slurp(cached_path), env) } else { script = text(source_blob) ast = analyze(script, prog_path) mach_blob = compile_to_blob_fn(prog, ast) if (cached_path) { ensure_build_dir() fd.slurpwrite(cached_path, mach_blob) } val = mach_load(mach_blob, env) } if (val) { log.error('Program must not return anything') disrupt } })