Files
cell/internal/engine.cm

1582 lines
42 KiB
Plaintext

// Hidden vars (os, actorsym, init, core_path, shop_path, json, args) come from env
// Engine is self-sufficient: defines its own compilation pipeline
var ACTORDATA = actorsym
var native_mode = false
var SYSYM = '__SYSTEM__'
var _cell = {}
var need_stop = false
var cases = {
Windows: '.dll',
macOS: '.dylib',
Linux: '.so'
}
var dylib_ext = cases[os.platform()]
var MOD_EXT = '.cm'
var ACTOR_EXT = '.ce'
var load_internal = os.load_internal
function use_embed(name) {
return load_internal("js_core_" + name + "_use")
}
var fd = use_embed('internal_fd')
var js = use_embed('js')
var crypto = use_embed('internal_crypto')
// core_path and shop_path come from env (C runtime passes them through)
// shop_path may be null if --core was used without --shop
var packages_path = shop_path ? shop_path + '/packages' : null
// Self-sufficient initialization: content-addressed cache
var use_cache = {}
function content_hash(content) {
var data = content
if (!is_blob(data)) data = stone(blob(text(data)))
return text(crypto.blake2(data), 'h')
}
function cache_path(hash) {
if (!shop_path) return null
return shop_path + '/build/' + hash
}
function ensure_build_dir() {
if (!shop_path) return null
var dir = shop_path + '/build'
if (!fd.is_dir(dir)) fd.mkdir(dir)
return dir
}
// Load a boot seed module (for compiling pipeline modules on cache miss)
function boot_load(name) {
var mcode_path = core_path + '/boot/' + name + '.cm.mcode'
var mcode_blob = null
var mach_blob = null
if (!fd.is_file(mcode_path)) {
os.print("error: missing boot seed: " + name + "\n")
disrupt
}
mcode_blob = fd.slurp(mcode_path)
mach_blob = mach_compile_mcode_bin(name, text(mcode_blob))
return mach_load(mach_blob, stone({use: use_embed}))
}
// Load a pipeline module from cache; on miss compile from source via boot chain
function load_pipeline_module(name, env) {
var source_path = core_path + '/' + name + '.cm'
var source_blob = null
var hash = null
var cached = null
var mcode_path = null
var mcode_blob = null
var mach_blob = null
var src = null
var boot_tok = null
var boot_par = null
var boot_fld = null
var boot_mc = null
var boot_sl = null
var tok_result = null
var ast = null
var compiled = null
var mcode_json = null
if (fd.is_file(source_path)) {
source_blob = fd.slurp(source_path)
hash = content_hash(source_blob)
cached = cache_path(hash)
if (cached && fd.is_file(cached))
return mach_load(fd.slurp(cached), env)
// Cache miss: compile from source using boot seed pipeline
mcode_path = core_path + '/boot/' + name + '.cm.mcode'
if (fd.is_file(mcode_path)) {
boot_tok = boot_load("tokenize")
boot_par = boot_load("parse")
boot_fld = boot_load("fold")
boot_mc = boot_load("mcode")
src = text(source_blob)
tok_result = boot_tok(src, source_path)
ast = boot_par(tok_result.tokens, src, source_path, boot_tok)
if (ast.errors != null && length(ast.errors) > 0) {
os.print("error: failed to compile pipeline module: " + name + "\n")
disrupt
}
ast = boot_fld(ast)
compiled = boot_mc(ast)
boot_sl = boot_load("streamline")
compiled = boot_sl(compiled)
mcode_json = json.encode(compiled)
mach_blob = mach_compile_mcode_bin(name, mcode_json)
if (cached) {
ensure_build_dir()
fd.slurpwrite(cached, mach_blob)
}
return mach_load(mach_blob, env)
}
}
// Last resort: boot seed as runtime (no source file found)
mcode_path = core_path + '/boot/' + name + '.cm.mcode'
if (fd.is_file(mcode_path)) {
mcode_blob = fd.slurp(mcode_path)
mach_blob = mach_compile_mcode_bin(name, text(mcode_blob))
return mach_load(mach_blob, env)
}
os.print("error: cannot load pipeline module: " + name + "\n")
disrupt
}
// Load compilation pipeline
var pipeline_env = stone({use: use_embed})
var tokenize_mod = load_pipeline_module('tokenize', pipeline_env)
var parse_mod = load_pipeline_module('parse', pipeline_env)
var fold_mod = load_pipeline_module('fold', pipeline_env)
var mcode_mod = load_pipeline_module('mcode', pipeline_env)
var streamline_mod = load_pipeline_module('streamline', pipeline_env)
use_cache['tokenize'] = tokenize_mod
use_cache['parse'] = parse_mod
use_cache['fold'] = fold_mod
use_cache['mcode'] = mcode_mod
use_cache['core/mcode'] = mcode_mod
use_cache['streamline'] = streamline_mod
use_cache['core/streamline'] = streamline_mod
// analyze: tokenize + parse + fold, check for errors
function analyze(src, filename) {
var tok_result = tokenize_mod(src, filename)
var _ast = parse_mod(tok_result.tokens, src, filename, tokenize_mod)
var _i = 0
var prev_line = -1
var prev_msg = null
var e = null
var msg = null
var line = null
var col = null
var has_errors = _ast.errors != null && length(_ast.errors) > 0
var folded = null
if (has_errors) {
while (_i < length(_ast.errors)) {
e = _ast.errors[_i]
msg = e.message
line = e.line
col = e.column
if (msg != prev_msg || line != prev_line) {
if (line != null && col != null)
os.print(`${filename}:${text(line)}:${text(col)}: error: ${msg}\n`)
else
os.print(`${filename}: error: ${msg}\n`)
}
prev_line = line
prev_msg = msg
_i = _i + 1
}
disrupt
}
folded = fold_mod(_ast)
if (!_no_warn && folded._diagnostics != null && length(folded._diagnostics) > 0) {
_i = 0
while (_i < length(folded._diagnostics)) {
e = folded._diagnostics[_i]
os.print(`${filename}:${text(e.line)}:${text(e.col)}: ${e.severity}: ${e.message}\n`)
_i = _i + 1
}
}
folded._diagnostics = null
return folded
}
// Lazy-loaded verify_ir module (loaded on first use)
var _verify_ir_mod = null
// Module summary extraction for cross-program analysis.
// Scans mcode IR for use() call patterns and attaches summaries.
// _summary_resolver is set after shop loads (null during bootstrap).
var _summary_resolver = null
function extract_module_summaries(compiled, ctx) {
if (_summary_resolver == null) return null
var instrs = null
var summaries = []
var unresolved = []
var i = 0
var j = 0
var n = 0
var instr = null
var prev = null
var op = null
var use_slots = {}
var frame_map = {}
var arg_map = {}
var val_slot = 0
var f_slot = 0
var path = null
var result_slot = 0
var summary = null
var inv_n = 0
if (compiled.main == null) return null
instrs = compiled.main.instructions
if (instrs == null) return null
n = length(instrs)
// Pass 1: find access(slot, {make:"intrinsic", name:"use"})
i = 0
while (i < n) {
instr = instrs[i]
if (is_array(instr) && instr[0] == "access") {
if (is_object(instr[2]) && instr[2].make == "intrinsic" && instr[2].name == "use") {
use_slots[text(instr[1])] = true
}
}
i = i + 1
}
// Pass 2: find frame(frame_slot, use_slot), setarg with string, invoke
i = 0
while (i < n) {
instr = instrs[i]
if (is_array(instr)) {
op = instr[0]
if (op == "frame" || op == "goframe") {
if (use_slots[text(instr[2])] == true) {
frame_map[text(instr[1])] = true
}
} else if (op == "setarg") {
if (frame_map[text(instr[1])] == true) {
val_slot = instr[3]
j = i - 1
while (j >= 0) {
prev = instrs[j]
if (is_array(prev) && prev[0] == "access" && prev[1] == val_slot && is_text(prev[2])) {
arg_map[text(instr[1])] = prev[2]
break
}
j = j - 1
}
}
} else if (op == "invoke" || op == "tail_invoke") {
f_slot = instr[1]
path = arg_map[text(f_slot)]
if (path != null) {
result_slot = instr[2]
summary = _summary_resolver(path, ctx)
if (summary != null) {
if (summary._native != true) {
summaries[] = {slot: result_slot, summary: summary}
}
} else {
inv_n = length(instr)
unresolved[] = {path: path, line: instr[inv_n - 2], col: instr[inv_n - 1]}
}
}
}
}
i = i + 1
}
if (length(summaries) > 0 || length(unresolved) > 0) {
return {summaries: summaries, unresolved: unresolved}
}
return null
}
// Run AST through mcode pipeline -> register VM
function run_ast_fn(name, ast, env, pkg) {
var compiled = mcode_mod(ast)
var ms = null
var _ui = 0
var _ur = null
var optimized = null
var _di = 0
var _diag = null
var _has_errors = false
var mcode_json = null
var mach_blob = null
if (os._verify_ir) {
if (_verify_ir_mod == null) {
_verify_ir_mod = load_pipeline_module('verify_ir', pipeline_env)
}
compiled._verify = true
compiled._verify_mod = _verify_ir_mod
}
if (!_no_warn) {
compiled._warn = true
ms = extract_module_summaries(compiled, pkg)
if (ms != null) {
if (length(ms.summaries) > 0) {
compiled._module_summaries = ms.summaries
}
if (length(ms.unresolved) > 0) {
compiled._unresolved_imports = ms.unresolved
}
}
}
if (compiled._unresolved_imports != null) {
_ui = 0
while (_ui < length(compiled._unresolved_imports)) {
_ur = compiled._unresolved_imports[_ui]
os.print(`${name}:${text(_ur.line)}:${text(_ur.col)}: error: cannot resolve module '${_ur.path}'\n`)
_ui = _ui + 1
}
disrupt
}
optimized = streamline_mod(compiled)
if (optimized._verify) {
delete optimized._verify
delete optimized._verify_mod
}
if (optimized._diagnostics != null && length(optimized._diagnostics) > 0) {
_di = 0
_has_errors = false
while (_di < length(optimized._diagnostics)) {
_diag = optimized._diagnostics[_di]
os.print(`${_diag.file}:${text(_diag.line)}:${text(_diag.col)}: ${_diag.severity}: ${_diag.message}\n`)
if (_diag.severity == "error") _has_errors = true
_di = _di + 1
}
if (_has_errors) disrupt
}
mcode_json = json.encode(optimized)
mach_blob = mach_compile_mcode_bin(name, mcode_json)
return mach_load(mach_blob, env)
}
// Run AST through mcode pipeline WITHOUT optimization -> register VM
function run_ast_noopt_fn(name, ast, env) {
var compiled = mcode_mod(ast)
var mcode_json = json.encode(compiled)
var mach_blob = mach_compile_mcode_bin(name, mcode_json)
return mach_load(mach_blob, env)
}
// Compile AST to blob without loading (for caching)
function compile_to_blob(name, ast) {
var compiled = mcode_mod(ast)
var optimized = streamline_mod(compiled)
return mach_compile_mcode_bin(name, json.encode(optimized))
}
// Compile user program AST to blob with diagnostics
function compile_user_blob(name, ast, pkg) {
var compiled = mcode_mod(ast)
var ms = null
var _ui = 0
var _ur = null
var optimized = null
var _di = 0
var _diag = null
var _has_errors = false
if (!_no_warn) {
compiled._warn = true
ms = extract_module_summaries(compiled, pkg)
if (ms != null) {
if (length(ms.summaries) > 0) {
compiled._module_summaries = ms.summaries
}
if (length(ms.unresolved) > 0) {
compiled._unresolved_imports = ms.unresolved
}
}
}
if (compiled._unresolved_imports != null) {
_ui = 0
while (_ui < length(compiled._unresolved_imports)) {
_ur = compiled._unresolved_imports[_ui]
os.print(`${name}:${text(_ur.line)}:${text(_ur.col)}: error: cannot resolve module '${_ur.path}'\n`)
_ui = _ui + 1
}
disrupt
}
optimized = streamline_mod(compiled)
if (optimized._diagnostics != null && length(optimized._diagnostics) > 0) {
_di = 0
_has_errors = false
while (_di < length(optimized._diagnostics)) {
_diag = optimized._diagnostics[_di]
os.print(`${_diag.file}:${text(_diag.line)}:${text(_diag.col)}: ${_diag.severity}: ${_diag.message}\n`)
if (_diag.severity == "error") _has_errors = true
_di = _di + 1
}
if (_has_errors) disrupt
}
return mach_compile_mcode_bin(name, json.encode(optimized))
}
// If loaded directly by C runtime (not via bootstrap), convert args -> init
var _program = null
var _user_args = []
var _j = 1
var _init = init
// Inherit native_mode from init (set by C for --native, or by parent actor)
if (_init != null && _init.native_mode)
native_mode = true
// Inherit warn mode from init (set by C for --no-warn)
var _no_warn = (_init != null && _init.no_warn) ? true : false
// CLI path: convert args to init record
if (args != null && (_init == null || !_init.program)) {
_program = args[0]
while (_j < length(args)) {
push(_user_args, args[_j])
_j = _j + 1
}
if (_init == null) {
_init = {program: _program, arg: _user_args}
} else {
_init.program = _program
_init.arg = _user_args
}
}
use_cache['core/internal/os'] = os
// Extra env properties added as engine initializes (log, runtime fns, etc.)
var core_extras = {}
// Load a core module from the file system
function use_core(path) {
var cache_key = 'core/' + path
var env = null
if (use_cache[cache_key])
return use_cache[cache_key]
var sym = use_embed(replace(path, '/', '_'))
var result = null
var script = null
var ast = null
var _load_mod = null
// Build env: merge core_extras
env = {use: use_core}
arrfor(array(core_extras), function(k) { env[k] = core_extras[k] })
env = stone(env)
var hash = null
var cached_path = null
var mach_blob = null
var source_blob = null
var file_path = null
// Compile from source .cm file
file_path = core_path + '/' + path + MOD_EXT
if (fd.is_file(file_path)) {
_load_mod = function() {
source_blob = fd.slurp(file_path)
hash = content_hash(source_blob)
cached_path = cache_path(hash)
if (cached_path && fd.is_file(cached_path)) {
result = mach_load(fd.slurp(cached_path), env)
} else {
script = text(source_blob)
ast = analyze(script, file_path)
mach_blob = compile_to_blob('core:' + path, ast)
if (cached_path) {
ensure_build_dir()
fd.slurpwrite(cached_path, mach_blob)
}
result = mach_load(mach_blob, env)
}
} disruption {
os.print("use('" + path + "'): failed to compile or load " + file_path + "\n")
disrupt
}
_load_mod()
use_cache[cache_key] = result
return result
}
// Embedded C module only
use_cache[cache_key] = sym
return sym
}
// Load full modules via use_core (extends C embeds with .cm additions, and caches)
fd = use_core('fd')
use_core('js')
var blob = use_core('blob')
function actor() {
}
var actor_mod = use_core('actor')
var wota = use_core('internal/wota')
var nota = use_core('internal/nota')
var ENETSERVICE = 0.1
var REPLYTIMEOUT = 60 // seconds before replies are ignored
// --- Logging system (bootstrap phase) ---
// Early log: prints to console before toml/time/json are loaded.
// Upgraded to full sink-based system after config loads (see load_log_config below).
// The bootstrap log forwards to _log_full once the full system is ready, so that
// modules loaded early (like shop.cm) get full logging even though they captured
// the bootstrap function reference.
var log_config = null
var channel_sinks = {}
var wildcard_sinks = []
var warned_channels = {}
var stack_channels = {}
var _log_full = null
var log_quiet_channels = { shop: true }
function log(name, args) {
if (_log_full) return _log_full(name, args)
if (log_quiet_channels[name]) return
var msg = args[0]
var stk = null
var i = 0
var fr = null
if (msg == null) msg = ""
os.print(`[${text(_cell.id, 0, 5)}] [${name}]: ${msg}\n`)
if (name == "error") {
stk = os.stack(2)
if (stk && length(stk) > 0) {
for (i = 0; i < length(stk); i = i + 1) {
fr = stk[i]
os.print(` at ${fr.fn} (${fr.file}:${text(fr.line)}:${text(fr.col)})\n`)
}
}
}
}
function actor_die(err)
{
var reason = null
var unders = null
if (err && is_function(err.toString)) {
os.print(err.toString())
os.print("\n")
if (err.stack) os.print(err.stack)
}
if (overling) {
if (err) {
// with an err, this is a forceful disrupt
reason = err
report_to_overling({type:'disrupt', reason})
} else
report_to_overling({type:'stop'})
}
if (underlings) {
unders = array(underlings)
arrfor(unders, function(id, index) {
log.console(`calling on ${id} to disrupt too`)
$_.stop(create_actor({id}))
})
}
if (err) {
if (err.message)
log.console(err.message)
if (err.stack)
log.console(err.stack)
}
actor_mod["disrupt"]()
}
actor_mod.on_exception(actor_die)
_cell.args = _init != null ? _init : {}
function create_actor(desc) {
var _desc = desc == null ? {id:guid()} : desc
var actor = {}
actor[ACTORDATA] = _desc
stone(actor)
return actor
}
var $_ = {}
use_cache['core/json'] = json
// Create runtime_env early (empty) -- filled after pronto loads.
// Shop accesses it lazily (in inject_env, called at module-use time, not load time)
// so it sees the filled version.
var runtime_env = {}
// Populate core_extras with everything shop (and other core modules) need
core_extras.use_cache = use_cache
core_extras.core_path = core_path
core_extras.shop_path = shop_path
core_extras.analyze = analyze
core_extras.run_ast_fn = run_ast_fn
core_extras.run_ast_noopt_fn = run_ast_noopt_fn
os.analyze = analyze
os.run_ast_fn = run_ast_fn
os.run_ast_noopt_fn = run_ast_noopt_fn
core_extras.core_json = json
core_extras.actor_api = $_
core_extras.log = log
core_extras.runtime_env = runtime_env
core_extras.content_hash = content_hash
core_extras.cache_path = cache_path
core_extras.ensure_build_dir = ensure_build_dir
core_extras.compile_to_blob = compile_to_blob
core_extras.native_mode = native_mode
// NOW load shop -- it receives all of the above via env
var shop = use_core('internal/shop')
use_core('build')
// Wire up module summary resolver now that shop is available
_summary_resolver = function(path, ctx) {
var info = shop.resolve_import_info(path, ctx)
if (info == null) return null
if (info.type == 'native') return {_native: true}
var resolved = info.resolved_path
if (resolved == null) return null
var summary_fn = function() {
return shop.summary_file(resolved)
} disruption {
return null
}
return summary_fn()
}
var time = use_core('time')
var toml = use_core('toml')
// --- Logging system (full version) ---
// Now that toml, time, fd, and json are available, upgrade the log function
// from the bootstrap version to a configurable sink-based system.
function ensure_log_dir(path) {
var parts = array(path, '/')
var current = starts_with(path, '/') ? '/' : ''
var i = 0
// ensure parent dir (skip last element which is the filename)
for (i = 0; i < length(parts) - 1; i++) {
if (parts[i] == '') continue
current = current + parts[i] + '/'
if (!fd.is_dir(current)) fd.mkdir(current)
}
}
function build_sink_routing() {
channel_sinks = {}
wildcard_sinks = []
stack_channels = {}
var names = array(log_config.sink)
arrfor(names, function(name) {
var sink = log_config.sink[name]
if (!sink || !is_object(sink)) return
sink._name = name
if (!is_array(sink.channels)) sink.channels = []
if (is_text(sink.exclude)) sink.exclude = [sink.exclude]
if (!is_array(sink.exclude)) sink.exclude = []
if (is_text(sink.stack)) sink.stack = [sink.stack]
if (!is_array(sink.stack)) sink.stack = []
if (sink.type == "file" && sink.path) ensure_log_dir(sink.path)
arrfor(sink.stack, function(ch) {
stack_channels[ch] = true
})
arrfor(sink.channels, function(ch) {
if (ch == "*") {
wildcard_sinks[] = sink
return
}
if (!channel_sinks[ch]) channel_sinks[ch] = []
channel_sinks[ch][] = sink
})
})
}
function load_log_config() {
var log_path = null
if (shop_path) {
log_path = shop_path + '/log.toml'
if (fd.is_file(log_path)) {
log_config = toml.decode(text(fd.slurp(log_path)))
}
}
if (!log_config || !log_config.sink || length(array(log_config.sink)) == 0) {
log_config = {
sink: {
terminal: {
type: "console",
format: "pretty",
channels: ["*"],
stack: ["error"]
}
}
}
}
build_sink_routing()
}
function pretty_format(rec) {
var aid = text(rec.actor_id, 0, 5)
var src = ""
var ev = null
var out = null
var i = 0
var fr = null
ev = is_text(rec.event) ? rec.event : json.encode(rec.event, false)
out = `[${aid}] [${rec.channel}] ${ev}\n`
if (rec.stack && length(rec.stack) > 0) {
for (i = 0; i < length(rec.stack); i = i + 1) {
fr = rec.stack[i]
out = out + ` at ${fr.fn} (${fr.file}:${text(fr.line)}:${text(fr.col)})\n`
}
}
return out
}
function bare_format(rec) {
var aid = text(rec.actor_id, 0, 5)
var ev = is_text(rec.event) ? rec.event : json.encode(rec.event, false)
var out = `[${aid}] ${ev}\n`
var i = 0
var fr = null
if (rec.stack && length(rec.stack) > 0) {
for (i = 0; i < length(rec.stack); i = i + 1) {
fr = rec.stack[i]
out = out + ` at ${fr.fn} (${fr.file}:${text(fr.line)}:${text(fr.col)})\n`
}
}
return out
}
function sink_excluded(sink, channel) {
var excluded = false
if (!sink.exclude || length(sink.exclude) == 0) return false
arrfor(sink.exclude, function(ex) {
if (ex == channel) excluded = true
})
return excluded
}
function dispatch_to_sink(sink, rec) {
var line = null
if (sink_excluded(sink, rec.channel)) return
if (sink.type == "console") {
if (sink.format == "json")
os.print(json.encode(rec, false) + "\n")
else if (sink.format == "bare")
os.print(bare_format(rec))
else
os.print(pretty_format(rec))
} else if (sink.type == "file") {
line = json.encode(rec, false) + "\n"
fd.slurpappend(sink.path, stone(blob(line)))
}
}
load_log_config()
log = function(name, args) {
var sinks = channel_sinks[name]
var event = args[0]
var c_stack = args[1]
var caller = null
var stack = null
var rec = null
if (!sinks && length(wildcard_sinks) == 0) return
// C-provided stack (from JS_Log callback) overrides caller_info/os.stack
if (c_stack && length(c_stack) > 0) {
caller = {file: c_stack[0].file, line: c_stack[0].line}
if (stack_channels[name]) stack = c_stack
} else {
caller = caller_info(0)
if (stack_channels[name]) stack = os.stack(1)
}
rec = {
actor_id: _cell.id,
timestamp: time.number(),
channel: name,
event: event,
source: caller
}
if (stack) rec.stack = stack
if (sinks) arrfor(sinks, function(sink) { dispatch_to_sink(sink, rec) })
arrfor(wildcard_sinks, function(sink) { dispatch_to_sink(sink, rec) })
}
// Wire C-level JS_Log through the ƿit log system
actor_mod.set_log(log)
// Let the bootstrap log forward to the full system — modules loaded early
// (before the full log was ready) captured the bootstrap function reference.
_log_full = log
var pronto = use_core('pronto')
var fallback = pronto.fallback
var parallel = pronto.parallel
var race = pronto.race
var sequence = pronto.sequence
runtime_env.actor = actor
runtime_env.log = log
runtime_env.send = send
runtime_env.shop_path = shop_path
runtime_env.fallback = fallback
runtime_env.parallel = parallel
runtime_env.race = race
runtime_env.sequence = sequence
// Make runtime functions available to modules loaded via use_core
arrfor(array(runtime_env), function(k) { core_extras[k] = runtime_env[k] })
$_.time_limit = function(requestor, seconds)
{
if (!pronto.is_requestor(requestor)) {
log.error('time_limit: first argument must be a requestor')
disrupt
}
if (!is_number(seconds) || seconds <= 0) {
log.error('time_limit: seconds must be a positive number')
disrupt
}
return function time_limit_requestor(callback, value) {
pronto.check_callback(callback, 'time_limit')
var finished = false
var requestor_cancel = null
var timer_cancel = null
function cancel(reason) {
if (finished) return
finished = true
if (timer_cancel) {
timer_cancel()
timer_cancel = null
}
if (requestor_cancel) {
requestor_cancel(reason)
requestor_cancel = null
}
}
function safe_cancel_requestor(reason) {
if (requestor_cancel) {
requestor_cancel(reason)
requestor_cancel = null
}
}
timer_cancel = $_.delay(function() {
if (finished) return
def reason = make_reason(factory, 'Timeout.', seconds)
safe_cancel_requestor(reason)
finished = true
callback(null, reason)
}, seconds)
function do_request() {
requestor_cancel = requestor(function(val, reason) {
if (finished) return
finished = true
if (timer_cancel) {
timer_cancel()
timer_cancel = null
}
callback(val, reason)
}, value)
} disruption {
cancel('requestor failed')
callback(null, 'requestor failed')
}
do_request()
return function(reason) {
safe_cancel_requestor(reason)
}
}
}
var config = {
ar_timer: 60,
actor_memory:0,
net_service:0.1,
reply_timeout:60,
main: true
}
_cell.config = config
ENETSERVICE = config.net_service
REPLYTIMEOUT = config.reply_timeout
/*
When handling a message, the message appears like this:
{
type: type of message
- contact: used for contact messages
- stop: used to issue stop command
- etc
reply: ID this message will respond to (callback saved on the actor)
replycc: the actor that is waiting for the reply
target: ID of the actor that's supposed to receive the message. Only added to non direct sends (out of portals)
return: reply ID so the replycc actor can know what callback to send the message to
data: the actual content of the message
}
actors look like
{
id: the GUID of this actor
address: the IP this actor can be found at
port: the port of the IP the actor can be found at
}
*/
function guid(bits)
{
var _bits = bits == null ? 256 : bits
var guid = blob(_bits, os.random)
stone(guid)
return text(guid,'h')
}
var HEADER = {}
// takes a function input value that will eventually be called with the current time in number form.
$_.clock = function(fn) {
actor_mod.clock(_ => {
fn(time.number())
send_messages()
})
}
var underlings = {} // this is more like "all actors that are notified when we die"
var overling = null
var root = null
var receive_fn = null
var greeters = {} // Router functions for when messages are received for a specific actor
var peers = {}
var id_address = {}
var peer_queue = {}
var portal = null
var portal_fn = null
function peer_connection(peer) {
return {
latency: peer.rtt,
bandwidth: {
incoming: peer.incoming_bandwidth,
outgoing: peer.outgoing_bandwidth
},
activity: {
last_sent: peer.last_send_time,
last_received: peer.last_receive_time
},
mtu: peer.mtu,
data: {
incoming_total: peer.incoming_data_total,
outgoing_total: peer.outgoing_data_total,
reliable_in_transit: peer.reliable_data_in_transit
},
latency_variance: peer.rtt_variance,
packet_loss: peer.packet_loss,
state: peer.state
}
}
// takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information.
$_.connection = function(callback, actor, config) {
var peer = peers[actor[ACTORDATA].id]
if (peer) {
callback(peer_connection(peer))
return
}
if (actor_mod.mailbox_exist(actor[ACTORDATA].id)) {
callback({type:"local"})
return
}
callback()
}
// takes a function input value that will eventually be called with the current time in number form.
$_.portal = function(fn, port) {
if (portal) {
log.error(`Already started a portal listening on ${portal.port}`)
disrupt
}
if (!port) {
log.error("Requires a valid port.")
disrupt
}
log.system(`starting a portal on port ${port}`)
portal = enet.create_host({address: "any", port})
portal_fn = fn
}
function handle_host(e) {
var queue = null
var data = null
if (e.type == "connect") {
log.system(`connected a new peer: ${e.peer.address}:${e.peer.port}`)
peers[`${e.peer.address}:${e.peer.port}`] = e.peer
queue = peer_queue.get(e.peer)
if (queue) {
arrfor(queue, (msg, index) => e.peer.send(nota.encode(msg)))
log.system(`sent queue out of queue`)
peer_queue.delete(e.peer)
}
} else if (e.type == "disconnect") {
peer_queue.delete(e.peer)
arrfor(array(peers), function(id, index) {
if (peers[id] == e.peer) delete peers[id]
})
log.system('portal got disconnect from ' + e.peer.address + ":" + e.peer.port)
} else if (e.type == "receive") {
data = nota.decode(e.data)
if (data.replycc && !data.replycc.address) {
data.replycc[ACTORDATA].address = e.peer.address
data.replycc[ACTORDATA].port = e.peer.port
}
if (data.data) populate_actor_addresses(data.data, e)
turn(data)
}
}
function populate_actor_addresses(obj, e) {
if (!is_object(obj)) return
if (obj[ACTORDATA] && !obj[ACTORDATA].address) {
obj[ACTORDATA].address = e.peer.address
obj[ACTORDATA].port = e.peer.port
}
arrfor(array(obj), function(key, index) {
if (key in obj)
populate_actor_addresses(obj[key], e)
})
}
// takes a callback function, an actor object, and a configuration record for getting information about the status of a connection to the actor. The configuration record is used to request the sort of information that needs to be communicated. This can include latency, bandwidth, activity, congestion, cost, partitions. The callback is given a record containing the requested information.
$_.contact = function(callback, record) {
send(create_actor(record), record, callback)
}
// registers a function that will receive all messages...
$_.receiver = function receiver(fn) {
receive_fn = fn
}
// Holds all messages queued during the current turn.
var message_queue = []
$_.start = function start(cb, program) {
if (!program) return
var id = guid()
var oid = $_.self[ACTORDATA].id
var startup = {
id,
overling_id: oid,
root_id: root ? root[ACTORDATA].id : null,
program,
native_mode: native_mode,
}
greeters[id] = cb
push(message_queue, { startup })
}
// stops an underling or self.
$_.stop = function stop(actor) {
if (!actor) {
need_stop = true
return
}
if (!is_actor(actor)) {
log.error('Can only call stop on an actor.')
disrupt
}
if (is_null(underlings[actor[ACTORDATA].id])) {
log.error('Can only call stop on an underling or self.')
disrupt
}
sys_msg(actor, {kind:"stop"})
}
// schedules the removal of an actor after a specified amount of time.
$_.unneeded = function unneeded(fn, seconds) {
actor_mod.unneeded(fn, seconds)
}
// schedules the invocation of a function after a specified amount of time.
$_.delay = function delay(fn, seconds) {
var _seconds = seconds == null ? 0 : seconds
function delay_turn() {
fn()
send_messages()
}
var id = actor_mod.delay(delay_turn, _seconds)
return function() { actor_mod.removetimer(id) }
}
var enet = use_core('enet')
// causes this actor to stop when another actor stops.
var couplings = {}
$_.couple = function couple(actor) {
if (actor == $_.self) return // can't couple to self
couplings[actor[ACTORDATA].id] = true
sys_msg(actor, {kind:'couple', from_id: _cell.id})
log.system(`coupled to ${actor}`)
}
function actor_prep(actor, send) {
push(message_queue, {actor,send});
}
// Send a message immediately without queuing
function actor_send_immediate(actor, send) {
actor_send(actor, send)
}
function actor_send(actor, message) {
var wota_blob = null
var peer = null
if (actor[HEADER] && !actor[HEADER].replycc) // attempting to respond to a message but sender is not expecting; silently drop
return
if (!is_actor(actor) && !is_actor(actor.replycc)) {
log.error(`Must send to an actor object. Attempted send to ${actor}`)
disrupt
}
if (!is_object(message)) {
log.error('Must send an object record.')
disrupt
}
// message to self
if (actor[ACTORDATA].id == _cell.id) {
if (receive_fn) receive_fn(message.data)
return
}
// message to actor in same flock
if (actor[ACTORDATA].id && actor_mod.mailbox_exist(actor[ACTORDATA].id)) {
wota_blob = wota.encode(message)
actor_mod.mailbox_push(actor[ACTORDATA].id, wota_blob)
return
}
if (actor[ACTORDATA].address) {
if (actor[ACTORDATA].id)
message.target = actor[ACTORDATA].id
else
message.type = "contact"
peer = peers[actor[ACTORDATA].address + ":" + actor[ACTORDATA].port]
if (!peer) {
if (!portal) {
log.system(`creating a contactor ...`)
portal = enet.create_host({address:"any"})
log.system(`allowing contact to port ${portal.port}`)
}
log.system(`no peer! connecting to ${actor[ACTORDATA].address}:${actor[ACTORDATA].port}`)
peer = portal.connect(actor[ACTORDATA].address, actor[ACTORDATA].port)
peer_queue.set(peer, [message])
} else {
peer.send(nota.encode(message))
}
return
}
log.system(`Unable to send message to actor ${actor[ACTORDATA]}`)
}
function send_messages() {
// if we've been flagged to stop, bail out before doing anything
if (need_stop) {
actor_die()
message_queue = []
return
}
var _qi = 0
var _qm = null
while (_qi < length(message_queue)) {
_qm = message_queue[_qi]
if (_qm.startup) {
actor_mod.createactor(_qm.startup)
} else {
actor_send(_qm.actor, _qm.send)
}
_qi = _qi + 1
}
message_queue = []
}
var replies = {}
function send(actor, message, reply) {
var send_msg = null
var target = null
var header = null
var id = null
if (!is_object(actor)) {
log.error(`Must send to an actor object. Provided: ${actor}`)
disrupt
}
if (!is_object(message)) {
log.error('Message must be an object')
disrupt
}
send_msg = {type:"user", data: message}
target = actor
if (actor[HEADER] && actor[HEADER].replycc) {
header = actor[HEADER]
if (!header.replycc || !is_actor(header.replycc)) {
log.error(`Supplied actor had a return, but it's not a valid actor! ${actor[HEADER]}`)
disrupt
}
target = header.replycc
send_msg.return = header.reply
}
if (reply) {
id = guid()
replies[id] = reply
$_.delay(_ => {
if (replies[id]) {
replies[id](null, "timeout")
delete replies[id]
}
}, REPLYTIMEOUT)
send_msg.reply = id
send_msg.replycc_id = _cell.id
}
// Instead of sending immediately, queue it
actor_prep(target, send_msg);
}
stone(send)
if (!_cell.args.id) _cell.id = guid()
else _cell.id = _cell.args.id
$_.self = create_actor({id: _cell.id})
// Actor's timeslice for processing a single message
function turn(msg)
{
var mes = wota.decode(msg)
handle_message(mes)
send_messages()
}
//log.console(`FIXME: need to get main from config, not just set to true`)
actor_mod.register_actor(_cell.id, turn, true, config.ar_timer)
if (config.actor_memory)
js.mem_limit(config.actor_memory)
if (config.stack_max)
js.max_stacksize(config.system.stack_max);
overling = _cell.args.overling_id ? create_actor({id: _cell.args.overling_id}) : null
$_.overling = overling
root = _cell.args.root_id ? create_actor({id: _cell.args.root_id}) : null
if (root == null) root = $_.self
if (overling) {
$_.couple(overling)
report_to_overling({type:'greet', actor_id: _cell.id})
}
// sys messages are always dispatched immediately
function sys_msg(actor, msg)
{
var envelope = {}
envelope[SYSYM] = msg
actor_send(actor, envelope)
}
// messages sent to here get put into the cb provided to start
function report_to_overling(msg)
{
if (!overling) return
sys_msg(overling, {kind:'underling', message:msg, from_id: _cell.id})
}
// Determine the program to run from command line
var program = _cell.args.program
if (!program) {
log.error('No program specified. Usage: cell <program> [args...]')
os.exit(1)
}
function handle_actor_disconnect(id) {
var greeter = greeters[id]
if (greeter) {
greeter({type: "stopped", id})
delete greeters[id]
}
log.system(`actor ${id} disconnected`)
if (!is_null(couplings[id])) actor_die("coupled actor died") // couplings now disrupts instead of stop
}
function handle_sysym(msg)
{
var from_id = null
var from_actor = null
var greeter = null
var letter2 = null
var greet_msg = null
if (msg.kind == 'stop') {
actor_die("got stop message")
} else if (msg.kind == 'underling') {
from_id = msg.from_id
greeter = greeters[from_id]
if (greeter) {
greet_msg = msg.message
if (greet_msg.actor_id) {
greet_msg.actor = create_actor({id: greet_msg.actor_id})
}
greeter(greet_msg)
}
if (msg.message.type == 'disrupt')
delete underlings[from_id]
} else if (msg.kind == 'contact') {
if (portal_fn) {
letter2 = msg.data
letter2[HEADER] = msg
delete msg.data
portal_fn(letter2)
} else {
log.error('Got a contact message, but no portal is established.')
disrupt
}
} else if (msg.kind == 'couple') {
from_id = msg.from_id
underlings[from_id] = true
log.system(`actor ${from_id} is coupled to me`)
}
}
function handle_message(msg) {
var letter = null
var fn = null
if (msg[SYSYM]) {
handle_sysym(msg[SYSYM], msg.from)
return
}
if (msg.type == "user") {
letter = msg.data // what the sender really sent
if (msg.replycc_id) {
msg.replycc = create_actor({id: msg.replycc_id})
}
letter[HEADER] = msg
letter[ACTORDATA] = { reply: msg.reply }
if (msg.return) {
fn = replies[msg.return]
if (fn) fn(letter)
delete replies[msg.return]
return
}
if (receive_fn) receive_fn(letter)
} else if (msg.type == "stopped") {
handle_actor_disconnect(msg.id)
}
}
function enet_check()
{
if (portal) portal.service(handle_host)
$_.delay(enet_check, ENETSERVICE);
}
// enet_check();
// Finally, run the program
actor_mod.setname(_cell.args.program)
var prog = _cell.args.program
if (ends_with(prog, '.cm')) {
os.print(`error: ${prog} is a module (.cm), not a program (.ce)\n`)
os.exit(1)
}
if (ends_with(prog, '.ce')) prog = text(prog, 0, -3)
var package = use_core('package')
// Find the .ce file using unified resolver
var cwd_package = package.find_package_dir(".")
var prog_info = shop.resolve_program ? shop.resolve_program(prog, cwd_package) : null
var prog_path = null
if (prog_info) {
prog_path = prog_info.path
} else {
// Fallback: check CWD, package dir, and core
prog_path = prog + ".ce"
if (!fd.is_file(prog_path) && cwd_package)
prog_path = cwd_package + '/' + prog + '.ce'
if (!fd.is_file(prog_path))
prog_path = core_path + '/' + prog + '.ce'
if (!fd.is_file(prog_path)) {
os.print(`Main program ${prog} could not be found\n`)
os.exit(1)
}
}
$_.clock(_ => {
var _file_info_ok = false
var file_info = null
var _try_fi = function() {
file_info = shop.file_info ? shop.file_info(prog_path) : null
_file_info_ok = true
} disruption {}
_try_fi()
if (!_file_info_ok || !file_info)
file_info = {path: prog_path, is_module: false, is_actor: true, package: null, name: prog}
// If the unified resolver found the package, use that as the authoritative source
if (prog_info && prog_info.pkg)
file_info.package = prog_info.pkg
var inject = shop.script_inject_for ? shop.script_inject_for(file_info) : []
// Build env with runtime functions + capability injections
var env = {}
arrfor(array(runtime_env), function(k) { env[k] = runtime_env[k] })
var _ki = 0
var inj = null
var key = null
while (_ki < length(inject)) {
inj = inject[_ki]
key = inj
if (key && key[0] == '$') key = text(key, 1)
if (key == 'fd') env['$fd'] = fd
else env['$' + key] = $_[key]
_ki = _ki + 1
}
var pkg = file_info ? file_info.package : null
// Verify all transitive dependency packages are present, auto-install if missing
var _deps = null
var _di = 0
var _dep_dir = null
var _auto_install = null
if (pkg) {
_deps = package.gather_dependencies(pkg)
_di = 0
while (_di < length(_deps)) {
_dep_dir = package.get_dir(_deps[_di])
if (!fd.is_dir(_dep_dir)) {
log.console('installing missing dependency: ' + _deps[_di])
_auto_install = function() {
shop.sync(_deps[_di])
} disruption {
log.error('failed to install dependency: ' + _deps[_di])
disrupt
}
_auto_install()
}
_di = _di + 1
}
}
env.use = function(path) {
var ck = 'core/' + path
var _use_core_result = null
var _use_core_ok = false
if (use_cache[ck]) return use_cache[ck]
var _try_core = function() {
_use_core_result = use_core(path)
_use_core_ok = true
} disruption {}
_try_core()
if (_use_core_ok && _use_core_result) return _use_core_result
var _shop_use = function() {
return shop.use(path, pkg)
} disruption {
log.error(`use('${path}') failed (package: ${pkg})`)
disrupt
}
return _shop_use()
}
env.args = _cell.args.arg
env.log = log
env = stone(env)
var native_build = null
var native_dylib_path = null
var native_handle = null
var native_parts = null
var native_basename = null
var native_sym = null
// Native execution path: compile to dylib and run
if (native_mode) {
native_build = use_core('build')
native_dylib_path = native_build.compile_native(prog_path, null, null, pkg)
native_handle = os.dylib_open(native_dylib_path)
native_parts = array(prog_path, '/')
native_basename = native_parts[length(native_parts) - 1]
native_sym = pkg ? shop.c_symbol_for_file(pkg, native_basename) : null
if (native_sym)
os.native_module_load_named(native_handle, native_sym, env)
else
os.native_module_load(native_handle, env)
return
}
var source_blob = fd.slurp(prog_path)
var hash = content_hash(source_blob)
var cached_path = cache_path(hash)
var val = null
var script = null
var ast = null
var mach_blob = null
if (cached_path && fd.is_file(cached_path)) {
val = mach_load(fd.slurp(cached_path), env)
} else {
script = text(source_blob)
ast = analyze(script, prog_path)
mach_blob = compile_user_blob(prog, ast, pkg)
if (cached_path) {
ensure_build_dir()
fd.slurpwrite(cached_path, mach_blob)
}
val = mach_load(mach_blob, env)
}
if (val) {
log.error('Program must not return anything')
disrupt
}
})