fix actor working

This commit is contained in:
2026-02-17 08:53:16 -06:00
parent fbdfbc1200
commit 2a38292ff7
8 changed files with 177 additions and 181 deletions

View File

@@ -706,14 +706,18 @@ $_.receiver = function receiver(fn) {
receive_fn = fn
}
// Holds all messages queued during the current turn.
var message_queue = []
$_.start = function start(cb, program) {
if (!program) return
var id = guid()
var oid = $_.self[ACTORDATA].id
var startup = {
id,
overling: $_.self,
root,
overling_id: oid,
root_id: root ? root[ACTORDATA].id : null,
program,
}
greeters[id] = cb
@@ -761,7 +765,7 @@ var couplings = {}
$_.couple = function couple(actor) {
if (actor == $_.self) return // can't couple to self
couplings[actor[ACTORDATA].id] = true
sys_msg(actor, {kind:'couple', from: $_.self})
sys_msg(actor, {kind:'couple', from_id: _cell.id})
log.system(`coupled to ${actor}`)
}
@@ -828,9 +832,6 @@ function actor_send(actor, message) {
log.system(`Unable to send message to actor ${actor[ACTORDATA]}`)
}
// Holds all messages queued during the current turn.
var message_queue = []
function send_messages() {
// if we've been flagged to stop, bail out before doing anything
if (need_stop) {
@@ -839,14 +840,17 @@ function send_messages() {
return
}
arrfor(message_queue, function(msg, index) {
if (msg.startup) {
// now is the time to actually spin up the actor
actor_mod.createactor(msg.startup)
var _qi = 0
var _qm = null
while (_qi < length(message_queue)) {
_qm = message_queue[_qi]
if (_qm.startup) {
actor_mod.createactor(_qm.startup)
} else {
actor_send(msg.actor, msg.send)
actor_send(_qm.actor, _qm.send)
}
})
_qi = _qi + 1
}
message_queue = []
}
@@ -892,7 +896,7 @@ function send(actor, message, reply) {
}
}, REPLYTIMEOUT)
send_msg.reply = id
send_msg.replycc = $_.self
send_msg.replycc_id = _cell.id
}
// Instead of sending immediately, queue it
@@ -923,29 +927,30 @@ if (config.actor_memory)
if (config.stack_max)
js.max_stacksize(config.system.stack_max);
overling = _cell.args.overling
overling = _cell.args.overling_id ? create_actor({id: _cell.args.overling_id}) : null
$_.overling = overling
root = _cell.args.root
root = _cell.args.root_id ? create_actor({id: _cell.args.root_id}) : null
if (root == null) root = $_.self
if (overling) {
$_.couple(overling) // auto couple to overling
report_to_overling({type:'greet', actor: $_.self})
$_.couple(overling)
report_to_overling({type:'greet', actor_id: _cell.id})
}
// sys messages are always dispatched immediately
function sys_msg(actor, msg)
{
actor_send(actor, {[SYSYM]:msg})
var envelope = {}
envelope[SYSYM] = msg
actor_send(actor, envelope)
}
// messages sent to here get put into the cb provided to start
function report_to_overling(msg)
{
if (!overling) return
sys_msg(overling, {kind:'underling', message:msg, from: $_.self})
sys_msg(overling, {kind:'underling', message:msg, from_id: _cell.id})
}
// Determine the program to run from command line
@@ -968,18 +973,26 @@ function handle_actor_disconnect(id) {
function handle_sysym(msg)
{
var from = null
var from_id = null
var from_actor = null
var greeter = null
var letter2 = null
var greet_msg = null
if (msg.kind == 'stop') {
actor_die("got stop message")
} else if (msg.kind == 'underling') {
from = msg.from
greeter = greeters[from[ACTORDATA].id]
if (greeter) greeter(msg.message)
from_id = msg.from_id
greeter = greeters[from_id]
if (greeter) {
greet_msg = msg.message
if (greet_msg.actor_id) {
greet_msg.actor = create_actor({id: greet_msg.actor_id})
}
greeter(greet_msg)
}
if (msg.message.type == 'disrupt')
delete underlings[from[ACTORDATA].id]
delete underlings[from_id]
} else if (msg.kind == 'contact') {
if (portal_fn) {
letter2 = msg.data
@@ -991,10 +1004,9 @@ function handle_sysym(msg)
disrupt
}
} else if (msg.kind == 'couple') {
// from must be notified when we die
from = msg.from
underlings[from[ACTORDATA].id] = true
log.system(`actor ${from} is coupled to me`)
from_id = msg.from_id
underlings[from_id] = true
log.system(`actor ${from_id} is coupled to me`)
}
}
@@ -1009,6 +1021,9 @@ function handle_message(msg) {
if (msg.type == "user") {
letter = msg.data // what the sender really sent
if (msg.replycc_id) {
msg.replycc = create_actor({id: msg.replycc_id})
}
letter[HEADER] = msg
letter[ACTORDATA] = { reply: msg.reply }

View File

@@ -344,14 +344,17 @@ void script_startup(cell_rt *prt)
tmp = js_core_json_use(js);
JS_SetPropertyStr(js, env_ref.val, "json", tmp);
crt->actor_sym_ref.val = JS_NewObject(js);
crt->actor_sym_ref.val = JS_NewString(js, "__ACTOR__");
JS_SetActorSym(js, JS_DupValue(js, crt->actor_sym_ref.val));
JS_SetPropertyStr(js, env_ref.val, "actorsym", JS_DupValue(js, crt->actor_sym_ref.val));
// Always set init (even if null)
if (crt->init_wota) {
tmp = wota2value(js, crt->init_wota);
JS_SetPropertyStr(js, env_ref.val, "init", tmp);
JSGCRef init_ref;
JS_PushGCRef(js, &init_ref);
init_ref.val = wota2value(js, crt->init_wota);
JS_SetPropertyStr(js, env_ref.val, "init", init_ref.val);
JS_PopGCRef(js, &init_ref);
free(crt->init_wota);
crt->init_wota = NULL;
} else {
@@ -556,7 +559,7 @@ int cell_init(int argc, char **argv)
cli_rt->on_exception_ref.val = JS_NULL;
cli_rt->message_handle_ref.val = JS_NULL;
cli_rt->unneeded_ref.val = JS_NULL;
cli_rt->actor_sym_ref.val = JS_NewObject(ctx);
cli_rt->actor_sym_ref.val = JS_NewString(ctx, "__ACTOR__");
JS_SetActorSym(ctx, JS_DupValue(ctx, cli_rt->actor_sym_ref.val));
root_cell = cli_rt;

View File

@@ -10,6 +10,8 @@ JSC_CCALL(os_createactor,
return JS_ThrowInternalError(js, "Can't start a new actor while disrupting.");
void *startup = value2wota(js, argv[0], JS_NULL, NULL);
if (!startup)
return JS_ThrowInternalError(js, "Failed to encode startup data");
create_actor(startup);
)
@@ -21,46 +23,32 @@ JSC_CCALL(os_mailbox_push,
int exist = actor_exists(id);
if (!exist) {
JS_FreeCString(js,id);
JS_FreeCString(js, id);
return JS_ThrowInternalError(js, "No mailbox found for given ID");
}
JS_FreeCString(js,id);
/* JSValue sys = JS_GetPropertyStr(js, argv[1], "__SYSTEM__");
if (!JS_IsNull(sys)) {
const char *k = JS_ToCString(js,sys);
int stop = 0;
if (strcmp(k, "stop") == 0) {
stop = 1;
actor_disrupt(target);
}
JS_FreeValue(js,sys);
JS_FreeCString(js,k);
if (stop) return JS_NULL;
}
*/
size_t size;
void *data = js_get_blob_data(js, &size, argv[1]);
if (data == (void*)-1) {
JS_FreeCString(js, id);
return JS_EXCEPTION;
}
if (size == 0) {
JS_FreeCString(js, id);
return JS_ThrowInternalError(js, "No data present in blob");
}
// Create a new blob and copy the data
blob *msg_blob = blob_new(size * 8); // Convert bytes to bits
blob *msg_blob = blob_new(size * 8);
if (!msg_blob) {
JS_FreeCString(js, id);
return JS_ThrowInternalError(js, "Could not allocate blob");
}
// Copy data to blob
blob_write_bytes(msg_blob, data, size);
blob_make_stone(msg_blob); // Make it immutable
blob_make_stone(msg_blob);
const char *err = send_message(id, msg_blob);
JS_FreeCString(js, id);
if (err) {
blob_destroy(msg_blob);
return JS_ThrowInternalError(js, "Could not send message: %s", err);

View File

@@ -11342,11 +11342,16 @@ static char *js_do_nota_decode (JSContext *js, JSValue *tmp, char *nota, JSValue
nota = nota_read_record (&n, nota);
*tmp = JS_NewObject (js);
for (int i = 0; i < n; i++) {
JSGCRef prop_key_ref, sub_val_ref;
JS_PushGCRef (js, &prop_key_ref);
JS_PushGCRef (js, &sub_val_ref);
nota = nota_read_text (&str, nota);
JSValue prop_key = JS_NewString (js, str);
nota = js_do_nota_decode (js, &ret2, nota, *tmp, prop_key, reviver);
JS_SetPropertyStr (js, *tmp, str, ret2);
JS_FreeValue (js, prop_key);
prop_key_ref.val = JS_NewString (js, str);
sub_val_ref.val = JS_NULL;
nota = js_do_nota_decode (js, &sub_val_ref.val, nota, *tmp, prop_key_ref.val, reviver);
JS_SetPropertyStr (js, *tmp, str, sub_val_ref.val);
JS_PopGCRef (js, &sub_val_ref);
JS_PopGCRef (js, &prop_key_ref);
sys_free (str);
}
break;
@@ -11357,16 +11362,18 @@ static char *js_do_nota_decode (JSContext *js, JSValue *tmp, char *nota, JSValue
case NOTA_SYM:
nota = nota_read_sym (&b, nota);
if (b == NOTA_PRIVATE) {
JSGCRef inner_ref;
JSGCRef inner_ref, obj_ref2;
JS_PushGCRef (js, &inner_ref);
JS_PushGCRef (js, &obj_ref2);
inner_ref.val = JS_NULL;
nota = js_do_nota_decode (js, &inner_ref.val, nota, holder, JS_NULL, reviver);
JSValue obj = JS_NewObject (js);
obj_ref2.val = JS_NewObject (js);
if (!JS_IsNull (js->actor_sym))
JS_SetPropertyKey (js, obj, js->actor_sym, inner_ref.val);
JS_CellStone (js, obj);
JS_SetPropertyKey (js, obj_ref2.val, js->actor_sym, inner_ref.val);
JS_CellStone (js, obj_ref2.val);
*tmp = obj_ref2.val;
JS_PopGCRef (js, &obj_ref2);
JS_PopGCRef (js, &inner_ref);
*tmp = obj;
} else {
switch (b) {
case NOTA_NULL: *tmp = JS_NULL; break;
@@ -11771,15 +11778,21 @@ static void encode_object_properties (WotaEncodeContext *enc, JSValueConst val,
}
for (uint32_t i = 0; i < plen; i++) {
JSValue key = JS_GetPropertyNumber (ctx, keys_ref.val, i);
JSValue prop_val = JS_GetProperty (ctx, val_ref.val, key);
/* Store key into its GCRef slot immediately so it's rooted before
JS_GetProperty can trigger GC and relocate the string. */
key_refs[i].val = JS_GetPropertyNumber (ctx, keys_ref.val, i);
JSValue prop_val = JS_GetProperty (ctx, val_ref.val, key_refs[i].val);
if (!JS_IsFunction (prop_val)) {
key_refs[non_function_count].val = key;
if (i != non_function_count) {
key_refs[non_function_count].val = key_refs[i].val;
key_refs[i].val = JS_NULL;
}
prop_refs[non_function_count].val = prop_val;
non_function_count++;
} else {
JS_FreeValue (ctx, prop_val);
JS_FreeValue (ctx, key);
JS_FreeValue (ctx, key_refs[i].val);
key_refs[i].val = JS_NULL;
}
}
JS_FreeValue (ctx, keys_ref.val);
@@ -11844,6 +11857,13 @@ static void wota_encode_value (WotaEncodeContext *enc, JSValueConst val, JSValue
wota_write_sym (&enc->wb, WOTA_NULL);
break;
case JS_TAG_PTR: {
if (JS_IsText (replaced)) {
size_t plen;
const char *str = JS_ToCStringLen (ctx, &plen, replaced);
wota_write_text_len (&enc->wb, str ? str : "", str ? plen : 0);
JS_FreeCString (ctx, str);
break;
}
if (js_is_blob (ctx, replaced)) {
size_t buf_len;
void *buf_data = js_get_blob_data (ctx, &buf_len, replaced);
@@ -11936,16 +11956,18 @@ static char *decode_wota_value (JSContext *ctx, char *data_ptr, JSValue *out_val
int scode;
data_ptr = wota_read_sym (&scode, data_ptr);
if (scode == WOTA_PRIVATE) {
JSGCRef inner_ref;
JSGCRef inner_ref, obj_ref2;
JS_PushGCRef (ctx, &inner_ref);
JS_PushGCRef (ctx, &obj_ref2);
inner_ref.val = JS_NULL;
data_ptr = decode_wota_value (ctx, data_ptr, &inner_ref.val, holder, JS_NULL, reviver);
JSValue obj = JS_NewObject (ctx);
obj_ref2.val = JS_NewObject (ctx);
if (!JS_IsNull (ctx->actor_sym))
JS_SetPropertyKey (ctx, obj, ctx->actor_sym, inner_ref.val);
JS_CellStone (ctx, obj);
JS_SetPropertyKey (ctx, obj_ref2.val, ctx->actor_sym, inner_ref.val);
JS_CellStone (ctx, obj_ref2.val);
*out_val = obj_ref2.val;
JS_PopGCRef (ctx, &obj_ref2);
JS_PopGCRef (ctx, &inner_ref);
*out_val = obj;
} else if (scode == WOTA_NULL) *out_val = JS_NULL;
else if (scode == WOTA_FALSE) *out_val = JS_NewBool (ctx, 0);
else if (scode == WOTA_TRUE) *out_val = JS_NewBool (ctx, 1);
@@ -12003,8 +12025,7 @@ static char *decode_wota_value (JSContext *ctx, char *data_ptr, JSValue *out_val
prop_key_ref.val = JS_NewStringLen (ctx, tkey, key_len);
sub_val_ref.val = JS_NULL;
data_ptr = decode_wota_value (ctx, data_ptr, &sub_val_ref.val, obj_ref.val, prop_key_ref.val, reviver);
JS_SetProperty (ctx, obj_ref.val, prop_key_ref.val, sub_val_ref.val);
JS_FreeValue (ctx, prop_key_ref.val);
JS_SetPropertyStr (ctx, obj_ref.val, tkey, sub_val_ref.val);
JS_PopGCRef (ctx, &sub_val_ref);
JS_PopGCRef (ctx, &prop_key_ref);
sys_free (tkey);
@@ -12094,12 +12115,18 @@ static JSValue js_wota_decode (JSContext *ctx, JSValueConst this_val, int argc,
if (!buf || len == 0) return JS_ThrowTypeError (ctx, "No blob data present");
JSValue reviver = (argc > 1 && JS_IsFunction (argv[1])) ? argv[1] : JS_NULL;
char *data_ptr = (char *)buf;
JSValue result = JS_NULL;
JSValue holder = JS_NewObject (ctx);
JSValue empty_key = JS_NewString (ctx, "");
decode_wota_value (ctx, data_ptr, &result, holder, empty_key, reviver);
JS_FreeValue (ctx, empty_key);
JS_FreeValue (ctx, holder);
JSGCRef result_ref, holder_ref, empty_key_ref;
JS_PushGCRef (ctx, &result_ref);
JS_PushGCRef (ctx, &holder_ref);
JS_PushGCRef (ctx, &empty_key_ref);
result_ref.val = JS_NULL;
holder_ref.val = JS_NewObject (ctx);
empty_key_ref.val = JS_NewString (ctx, "");
decode_wota_value (ctx, data_ptr, &result_ref.val, holder_ref.val, empty_key_ref.val, reviver);
JSValue result = result_ref.val;
JS_PopGCRef (ctx, &empty_key_ref);
JS_PopGCRef (ctx, &holder_ref);
JS_PopGCRef (ctx, &result_ref);
return result;
}

View File

@@ -377,24 +377,33 @@ int actor_exists(const char *id)
void set_actor_state(cell_rt *actor)
{
if (actor->disrupt) {
#ifdef SCHEDULER_DEBUG
fprintf(stderr, "set_actor_state: %s disrupted, freeing\n", actor->name ? actor->name : actor->id);
#endif
actor_free(actor);
return;
}
pthread_mutex_lock(actor->msg_mutex);
#ifdef SCHEDULER_DEBUG
fprintf(stderr, "set_actor_state: %s state=%d letters=%ld\n", actor->name ? actor->name : actor->id, actor->state, (long)arrlen(actor->letters));
#endif
switch(actor->state) {
case ACTOR_RUNNING:
case ACTOR_READY:
if (actor->ar)
actor->ar = 0;
break;
case ACTOR_IDLE:
if (arrlen(actor->letters)) {
if (actor->is_quiescent) {
actor->is_quiescent = 0;
atomic_fetch_sub(&engine.quiescent_count, 1);
}
#ifdef SCHEDULER_DEBUG
fprintf(stderr, "set_actor_state: %s IDLE->READY, enqueueing (main=%d)\n", actor->name ? actor->name : actor->id, actor->main_thread_only);
#endif
actor->state = ACTOR_READY;
actor->ar = 0;
@@ -635,21 +644,26 @@ void actor_turn(cell_rt *actor)
if (actor->trace_hook)
actor->trace_hook(actor->name, CELL_HOOK_ENTER);
TAKETURN:
pthread_mutex_lock(actor->msg_mutex);
JSValue result;
if (!arrlen(actor->letters)) {
int pending = arrlen(actor->letters);
if (!pending) {
pthread_mutex_unlock(actor->msg_mutex);
goto ENDTURN;
}
#ifdef SCHEDULER_DEBUG
fprintf(stderr, "actor_turn: %s has %d letters, type=%d\n", actor->name ? actor->name : actor->id, pending, actor->letters[0].type);
#endif
letter l = actor->letters[0];
arrdel(actor->letters, 0); // O(N) but we kept array as requested
arrdel(actor->letters, 0);
pthread_mutex_unlock(actor->msg_mutex);
if (l.type == LETTER_BLOB) {
// Create a JS blob from the C blob
size_t size = blob_length(l.blob_data) / 8; // Convert bits to bytes
#ifdef SCHEDULER_DEBUG
fprintf(stderr, "actor_turn BLOB: %s size=%zu, calling message_handle\n", actor->name ? actor->name : actor->id, size);
#endif
JSValue arg = js_new_blob_stoned_copy(actor->context, (void*)blob_data(l.blob_data), size);
blob_destroy(l.blob_data);
result = JS_Call(actor->context, actor->message_handle_ref.val, JS_NULL, 1, &arg);
@@ -674,11 +688,17 @@ void actor_turn(cell_rt *actor)
if (actor->disrupt) {
/* Actor must die. Unlock before freeing so actor_free can
lock/unlock/destroy the mutex without use-after-free. */
#ifdef SCHEDULER_DEBUG
fprintf(stderr, "actor_turn ENDTURN: %s disrupted, freeing\n", actor->name ? actor->name : actor->id);
#endif
pthread_mutex_unlock(actor->mutex);
actor_free(actor);
return;
}
#ifdef SCHEDULER_DEBUG
fprintf(stderr, "actor_turn ENDTURN: %s has %ld letters, calling set_actor_state\n", actor->name ? actor->name : actor->id, (long)arrlen(actor->letters));
#endif
set_actor_state(actor);
pthread_mutex_unlock(actor->mutex);

104
test.ce
View File

@@ -318,7 +318,7 @@ function collect_actor_tests(package_name, specific_test) {
// Spawn an actor test and track it
function spawn_actor_test(test_info) {
var test_name = text(test_info.file, 6, -3) // remove "tests/" and ".ce"
var test_name = text(test_info.file, 6, -3)
log.console(` [ACTOR] ${test_info.file}`)
var entry = {
@@ -331,16 +331,41 @@ function spawn_actor_test(test_info) {
}
var _spawn = function() {
var actor_path = text(test_info.path, 0, -3) // remove .ce
entry.actor = $start(actor_path)
var actor_path = text(test_info.path, 0, -3)
$start(function(event) {
var end_time = time.number()
var duration_ns = round((end_time - entry.start_time) * 1000000000)
if (event.type == 'greet') {
entry.actor = event.actor
return
}
var idx = find(pending_actor_tests, e => e == entry)
if (idx != null) {
pending_actor_tests = array(
array(pending_actor_tests, 0, idx),
array(pending_actor_tests, idx + 1)
)
}
entry.duration_ns = duration_ns
if (event.type == 'stop') {
entry.status = "passed"
log.console(` PASS ${test_name}`)
} else {
entry.status = "failed"
entry.error = { message: event.reason || "Actor disrupted" }
log.console(` FAIL ${test_name}: ${entry.error.message}`)
}
push(actor_test_results, entry)
if (gc_after_each_test) dbg.gc()
check_completion()
}, actor_path)
push(pending_actor_tests, entry)
} disruption {
entry.status = "failed"
entry.error = { message: `Failed to spawn actor` }
entry.error = { message: "Failed to spawn actor" }
entry.duration_ns = 0
push(actor_test_results, entry)
log.console(` FAIL ${test_name}: `)
log.error()
log.console(` FAIL ${test_name}: Failed to spawn`)
}
_spawn()
}
@@ -582,69 +607,6 @@ if (length(all_actor_tests) > 0) {
}
}
// Handle messages from actor tests
function handle_actor_message(msg) {
var sender = msg.$sender
var found_idx = -1
var i = 0
var res = null
var entry = null
for (i = 0; i < length(pending_actor_tests); i++) {
if (pending_actor_tests[i].actor == sender) {
found_idx = i
break
}
}
if (found_idx == -1) return
var base_entry = pending_actor_tests[found_idx]
pending_actor_tests = array(array(pending_actor_tests, 0, found_idx), array(pending_actor_tests, found_idx + 1))
var end_time = time.number()
var duration_ns = round((end_time - base_entry.start_time) * 1000000000)
var results = []
if (is_array(msg)) {
results = msg
} else if (msg && is_array(msg.results)) {
results = msg.results
} else {
results = [msg]
}
for (i = 0; i < length(results); i++) {
res = results[i] || {}
entry = {
package: base_entry.package,
file: base_entry.file,
test: res.test || base_entry.test + (length(results) > 1 ? `#${i+1}` : ""),
status: "failed",
duration_ns: duration_ns
}
if (res.type && res.type != "test_result") {
entry.error = { message: `Unexpected message type: ${res.type}` }
log.console(` FAIL ${entry.test}: unexpected message`)
} else if (res.passed) {
entry.status = "passed"
log.console(` PASS ${entry.test}`)
} else {
entry.error = { message: res.error || "Test failed" }
if (res.stack) entry.error.stack = res.stack
log.console(` FAIL ${entry.test}: ${entry.error.message}`)
}
push(actor_test_results, entry)
}
if (gc_after_each_test) {
dbg.gc()
}
check_completion()
}
// Check for timed out actor tests
function check_timeouts() {
var now = time.number()
@@ -870,8 +832,4 @@ Total: ${totals.total}, Passed: ${totals.passed}, Failed: ${totals.failed}
if (length(all_actor_tests) == 0) {
generate_reports(totals)
$stop()
} else {
$portal(function(msg) {
handle_actor_message(msg)
})
}

View File

@@ -1,7 +1,2 @@
$receiver(e => {
log.console(`Got a message: ${e}`)
send(e, {
message: "Good to go."
})
})
// tests/reply_actor.ce - Simple child that just logs
log.console("reply_actor: alive!")

View File

@@ -1,10 +0,0 @@
return {
test_send: function() {
$start(e => {
send(e.actor, { message: "Hello! Good to go?" }, msg => {
log.console(`Original sender got message back: ${msg}. Stopping!`)
// $stop() // Removed
})
}, "tests/reply_actor")
}
}