diff --git a/internal/engine.cm b/internal/engine.cm index 73009edc..39e54090 100644 --- a/internal/engine.cm +++ b/internal/engine.cm @@ -706,14 +706,18 @@ $_.receiver = function receiver(fn) { receive_fn = fn } +// Holds all messages queued during the current turn. +var message_queue = [] + $_.start = function start(cb, program) { if (!program) return var id = guid() + var oid = $_.self[ACTORDATA].id var startup = { id, - overling: $_.self, - root, + overling_id: oid, + root_id: root ? root[ACTORDATA].id : null, program, } greeters[id] = cb @@ -761,7 +765,7 @@ var couplings = {} $_.couple = function couple(actor) { if (actor == $_.self) return // can't couple to self couplings[actor[ACTORDATA].id] = true - sys_msg(actor, {kind:'couple', from: $_.self}) + sys_msg(actor, {kind:'couple', from_id: _cell.id}) log.system(`coupled to ${actor}`) } @@ -828,9 +832,6 @@ function actor_send(actor, message) { log.system(`Unable to send message to actor ${actor[ACTORDATA]}`) } -// Holds all messages queued during the current turn. -var message_queue = [] - function send_messages() { // if we've been flagged to stop, bail out before doing anything if (need_stop) { @@ -839,14 +840,17 @@ function send_messages() { return } - arrfor(message_queue, function(msg, index) { - if (msg.startup) { - // now is the time to actually spin up the actor - actor_mod.createactor(msg.startup) + var _qi = 0 + var _qm = null + while (_qi < length(message_queue)) { + _qm = message_queue[_qi] + if (_qm.startup) { + actor_mod.createactor(_qm.startup) } else { - actor_send(msg.actor, msg.send) + actor_send(_qm.actor, _qm.send) } - }) + _qi = _qi + 1 + } message_queue = [] } @@ -892,7 +896,7 @@ function send(actor, message, reply) { } }, REPLYTIMEOUT) send_msg.reply = id - send_msg.replycc = $_.self + send_msg.replycc_id = _cell.id } // Instead of sending immediately, queue it @@ -923,29 +927,30 @@ if (config.actor_memory) if (config.stack_max) js.max_stacksize(config.system.stack_max); -overling = _cell.args.overling +overling = _cell.args.overling_id ? create_actor({id: _cell.args.overling_id}) : null $_.overling = overling -root = _cell.args.root +root = _cell.args.root_id ? create_actor({id: _cell.args.root_id}) : null if (root == null) root = $_.self if (overling) { - $_.couple(overling) // auto couple to overling - - report_to_overling({type:'greet', actor: $_.self}) + $_.couple(overling) + report_to_overling({type:'greet', actor_id: _cell.id}) } // sys messages are always dispatched immediately function sys_msg(actor, msg) { - actor_send(actor, {[SYSYM]:msg}) + var envelope = {} + envelope[SYSYM] = msg + actor_send(actor, envelope) } // messages sent to here get put into the cb provided to start function report_to_overling(msg) { if (!overling) return - sys_msg(overling, {kind:'underling', message:msg, from: $_.self}) + sys_msg(overling, {kind:'underling', message:msg, from_id: _cell.id}) } // Determine the program to run from command line @@ -968,18 +973,26 @@ function handle_actor_disconnect(id) { function handle_sysym(msg) { - var from = null + var from_id = null + var from_actor = null var greeter = null var letter2 = null + var greet_msg = null if (msg.kind == 'stop') { actor_die("got stop message") } else if (msg.kind == 'underling') { - from = msg.from - greeter = greeters[from[ACTORDATA].id] - if (greeter) greeter(msg.message) + from_id = msg.from_id + greeter = greeters[from_id] + if (greeter) { + greet_msg = msg.message + if (greet_msg.actor_id) { + greet_msg.actor = create_actor({id: greet_msg.actor_id}) + } + greeter(greet_msg) + } if (msg.message.type == 'disrupt') - delete underlings[from[ACTORDATA].id] + delete underlings[from_id] } else if (msg.kind == 'contact') { if (portal_fn) { letter2 = msg.data @@ -991,10 +1004,9 @@ function handle_sysym(msg) disrupt } } else if (msg.kind == 'couple') { - // from must be notified when we die - from = msg.from - underlings[from[ACTORDATA].id] = true - log.system(`actor ${from} is coupled to me`) + from_id = msg.from_id + underlings[from_id] = true + log.system(`actor ${from_id} is coupled to me`) } } @@ -1009,6 +1021,9 @@ function handle_message(msg) { if (msg.type == "user") { letter = msg.data // what the sender really sent + if (msg.replycc_id) { + msg.replycc = create_actor({id: msg.replycc_id}) + } letter[HEADER] = msg letter[ACTORDATA] = { reply: msg.reply } diff --git a/source/cell.c b/source/cell.c index 651f4b66..ebabea08 100644 --- a/source/cell.c +++ b/source/cell.c @@ -344,14 +344,17 @@ void script_startup(cell_rt *prt) tmp = js_core_json_use(js); JS_SetPropertyStr(js, env_ref.val, "json", tmp); - crt->actor_sym_ref.val = JS_NewObject(js); + crt->actor_sym_ref.val = JS_NewString(js, "__ACTOR__"); JS_SetActorSym(js, JS_DupValue(js, crt->actor_sym_ref.val)); JS_SetPropertyStr(js, env_ref.val, "actorsym", JS_DupValue(js, crt->actor_sym_ref.val)); // Always set init (even if null) if (crt->init_wota) { - tmp = wota2value(js, crt->init_wota); - JS_SetPropertyStr(js, env_ref.val, "init", tmp); + JSGCRef init_ref; + JS_PushGCRef(js, &init_ref); + init_ref.val = wota2value(js, crt->init_wota); + JS_SetPropertyStr(js, env_ref.val, "init", init_ref.val); + JS_PopGCRef(js, &init_ref); free(crt->init_wota); crt->init_wota = NULL; } else { @@ -556,7 +559,7 @@ int cell_init(int argc, char **argv) cli_rt->on_exception_ref.val = JS_NULL; cli_rt->message_handle_ref.val = JS_NULL; cli_rt->unneeded_ref.val = JS_NULL; - cli_rt->actor_sym_ref.val = JS_NewObject(ctx); + cli_rt->actor_sym_ref.val = JS_NewString(ctx, "__ACTOR__"); JS_SetActorSym(ctx, JS_DupValue(ctx, cli_rt->actor_sym_ref.val)); root_cell = cli_rt; diff --git a/source/qjs_actor.c b/source/qjs_actor.c index 5dda9fc3..b1f6daf6 100644 --- a/source/qjs_actor.c +++ b/source/qjs_actor.c @@ -10,6 +10,8 @@ JSC_CCALL(os_createactor, return JS_ThrowInternalError(js, "Can't start a new actor while disrupting."); void *startup = value2wota(js, argv[0], JS_NULL, NULL); + if (!startup) + return JS_ThrowInternalError(js, "Failed to encode startup data"); create_actor(startup); ) @@ -21,46 +23,32 @@ JSC_CCALL(os_mailbox_push, int exist = actor_exists(id); if (!exist) { - JS_FreeCString(js,id); + JS_FreeCString(js, id); return JS_ThrowInternalError(js, "No mailbox found for given ID"); } - - JS_FreeCString(js,id); - -/* JSValue sys = JS_GetPropertyStr(js, argv[1], "__SYSTEM__"); - if (!JS_IsNull(sys)) { - const char *k = JS_ToCString(js,sys); - int stop = 0; - if (strcmp(k, "stop") == 0) { - stop = 1; - actor_disrupt(target); - } - JS_FreeValue(js,sys); - JS_FreeCString(js,k); - - if (stop) return JS_NULL; - } -*/ + size_t size; void *data = js_get_blob_data(js, &size, argv[1]); if (data == (void*)-1) { + JS_FreeCString(js, id); return JS_EXCEPTION; } if (size == 0) { + JS_FreeCString(js, id); return JS_ThrowInternalError(js, "No data present in blob"); } - - // Create a new blob and copy the data - blob *msg_blob = blob_new(size * 8); // Convert bytes to bits + + blob *msg_blob = blob_new(size * 8); if (!msg_blob) { + JS_FreeCString(js, id); return JS_ThrowInternalError(js, "Could not allocate blob"); } - - // Copy data to blob + blob_write_bytes(msg_blob, data, size); - blob_make_stone(msg_blob); // Make it immutable - + blob_make_stone(msg_blob); + const char *err = send_message(id, msg_blob); + JS_FreeCString(js, id); if (err) { blob_destroy(msg_blob); return JS_ThrowInternalError(js, "Could not send message: %s", err); diff --git a/source/runtime.c b/source/runtime.c index 7769efe7..3fa199af 100644 --- a/source/runtime.c +++ b/source/runtime.c @@ -11342,11 +11342,16 @@ static char *js_do_nota_decode (JSContext *js, JSValue *tmp, char *nota, JSValue nota = nota_read_record (&n, nota); *tmp = JS_NewObject (js); for (int i = 0; i < n; i++) { + JSGCRef prop_key_ref, sub_val_ref; + JS_PushGCRef (js, &prop_key_ref); + JS_PushGCRef (js, &sub_val_ref); nota = nota_read_text (&str, nota); - JSValue prop_key = JS_NewString (js, str); - nota = js_do_nota_decode (js, &ret2, nota, *tmp, prop_key, reviver); - JS_SetPropertyStr (js, *tmp, str, ret2); - JS_FreeValue (js, prop_key); + prop_key_ref.val = JS_NewString (js, str); + sub_val_ref.val = JS_NULL; + nota = js_do_nota_decode (js, &sub_val_ref.val, nota, *tmp, prop_key_ref.val, reviver); + JS_SetPropertyStr (js, *tmp, str, sub_val_ref.val); + JS_PopGCRef (js, &sub_val_ref); + JS_PopGCRef (js, &prop_key_ref); sys_free (str); } break; @@ -11357,16 +11362,18 @@ static char *js_do_nota_decode (JSContext *js, JSValue *tmp, char *nota, JSValue case NOTA_SYM: nota = nota_read_sym (&b, nota); if (b == NOTA_PRIVATE) { - JSGCRef inner_ref; + JSGCRef inner_ref, obj_ref2; JS_PushGCRef (js, &inner_ref); + JS_PushGCRef (js, &obj_ref2); inner_ref.val = JS_NULL; nota = js_do_nota_decode (js, &inner_ref.val, nota, holder, JS_NULL, reviver); - JSValue obj = JS_NewObject (js); + obj_ref2.val = JS_NewObject (js); if (!JS_IsNull (js->actor_sym)) - JS_SetPropertyKey (js, obj, js->actor_sym, inner_ref.val); - JS_CellStone (js, obj); + JS_SetPropertyKey (js, obj_ref2.val, js->actor_sym, inner_ref.val); + JS_CellStone (js, obj_ref2.val); + *tmp = obj_ref2.val; + JS_PopGCRef (js, &obj_ref2); JS_PopGCRef (js, &inner_ref); - *tmp = obj; } else { switch (b) { case NOTA_NULL: *tmp = JS_NULL; break; @@ -11771,15 +11778,21 @@ static void encode_object_properties (WotaEncodeContext *enc, JSValueConst val, } for (uint32_t i = 0; i < plen; i++) { - JSValue key = JS_GetPropertyNumber (ctx, keys_ref.val, i); - JSValue prop_val = JS_GetProperty (ctx, val_ref.val, key); + /* Store key into its GCRef slot immediately so it's rooted before + JS_GetProperty can trigger GC and relocate the string. */ + key_refs[i].val = JS_GetPropertyNumber (ctx, keys_ref.val, i); + JSValue prop_val = JS_GetProperty (ctx, val_ref.val, key_refs[i].val); if (!JS_IsFunction (prop_val)) { - key_refs[non_function_count].val = key; + if (i != non_function_count) { + key_refs[non_function_count].val = key_refs[i].val; + key_refs[i].val = JS_NULL; + } prop_refs[non_function_count].val = prop_val; non_function_count++; } else { JS_FreeValue (ctx, prop_val); - JS_FreeValue (ctx, key); + JS_FreeValue (ctx, key_refs[i].val); + key_refs[i].val = JS_NULL; } } JS_FreeValue (ctx, keys_ref.val); @@ -11844,6 +11857,13 @@ static void wota_encode_value (WotaEncodeContext *enc, JSValueConst val, JSValue wota_write_sym (&enc->wb, WOTA_NULL); break; case JS_TAG_PTR: { + if (JS_IsText (replaced)) { + size_t plen; + const char *str = JS_ToCStringLen (ctx, &plen, replaced); + wota_write_text_len (&enc->wb, str ? str : "", str ? plen : 0); + JS_FreeCString (ctx, str); + break; + } if (js_is_blob (ctx, replaced)) { size_t buf_len; void *buf_data = js_get_blob_data (ctx, &buf_len, replaced); @@ -11936,16 +11956,18 @@ static char *decode_wota_value (JSContext *ctx, char *data_ptr, JSValue *out_val int scode; data_ptr = wota_read_sym (&scode, data_ptr); if (scode == WOTA_PRIVATE) { - JSGCRef inner_ref; + JSGCRef inner_ref, obj_ref2; JS_PushGCRef (ctx, &inner_ref); + JS_PushGCRef (ctx, &obj_ref2); inner_ref.val = JS_NULL; data_ptr = decode_wota_value (ctx, data_ptr, &inner_ref.val, holder, JS_NULL, reviver); - JSValue obj = JS_NewObject (ctx); + obj_ref2.val = JS_NewObject (ctx); if (!JS_IsNull (ctx->actor_sym)) - JS_SetPropertyKey (ctx, obj, ctx->actor_sym, inner_ref.val); - JS_CellStone (ctx, obj); + JS_SetPropertyKey (ctx, obj_ref2.val, ctx->actor_sym, inner_ref.val); + JS_CellStone (ctx, obj_ref2.val); + *out_val = obj_ref2.val; + JS_PopGCRef (ctx, &obj_ref2); JS_PopGCRef (ctx, &inner_ref); - *out_val = obj; } else if (scode == WOTA_NULL) *out_val = JS_NULL; else if (scode == WOTA_FALSE) *out_val = JS_NewBool (ctx, 0); else if (scode == WOTA_TRUE) *out_val = JS_NewBool (ctx, 1); @@ -12003,8 +12025,7 @@ static char *decode_wota_value (JSContext *ctx, char *data_ptr, JSValue *out_val prop_key_ref.val = JS_NewStringLen (ctx, tkey, key_len); sub_val_ref.val = JS_NULL; data_ptr = decode_wota_value (ctx, data_ptr, &sub_val_ref.val, obj_ref.val, prop_key_ref.val, reviver); - JS_SetProperty (ctx, obj_ref.val, prop_key_ref.val, sub_val_ref.val); - JS_FreeValue (ctx, prop_key_ref.val); + JS_SetPropertyStr (ctx, obj_ref.val, tkey, sub_val_ref.val); JS_PopGCRef (ctx, &sub_val_ref); JS_PopGCRef (ctx, &prop_key_ref); sys_free (tkey); @@ -12094,12 +12115,18 @@ static JSValue js_wota_decode (JSContext *ctx, JSValueConst this_val, int argc, if (!buf || len == 0) return JS_ThrowTypeError (ctx, "No blob data present"); JSValue reviver = (argc > 1 && JS_IsFunction (argv[1])) ? argv[1] : JS_NULL; char *data_ptr = (char *)buf; - JSValue result = JS_NULL; - JSValue holder = JS_NewObject (ctx); - JSValue empty_key = JS_NewString (ctx, ""); - decode_wota_value (ctx, data_ptr, &result, holder, empty_key, reviver); - JS_FreeValue (ctx, empty_key); - JS_FreeValue (ctx, holder); + JSGCRef result_ref, holder_ref, empty_key_ref; + JS_PushGCRef (ctx, &result_ref); + JS_PushGCRef (ctx, &holder_ref); + JS_PushGCRef (ctx, &empty_key_ref); + result_ref.val = JS_NULL; + holder_ref.val = JS_NewObject (ctx); + empty_key_ref.val = JS_NewString (ctx, ""); + decode_wota_value (ctx, data_ptr, &result_ref.val, holder_ref.val, empty_key_ref.val, reviver); + JSValue result = result_ref.val; + JS_PopGCRef (ctx, &empty_key_ref); + JS_PopGCRef (ctx, &holder_ref); + JS_PopGCRef (ctx, &result_ref); return result; } diff --git a/source/scheduler.c b/source/scheduler.c index 5b7f2f7c..282f4df6 100644 --- a/source/scheduler.c +++ b/source/scheduler.c @@ -377,24 +377,33 @@ int actor_exists(const char *id) void set_actor_state(cell_rt *actor) { if (actor->disrupt) { +#ifdef SCHEDULER_DEBUG + fprintf(stderr, "set_actor_state: %s disrupted, freeing\n", actor->name ? actor->name : actor->id); +#endif actor_free(actor); return; } pthread_mutex_lock(actor->msg_mutex); - + +#ifdef SCHEDULER_DEBUG + fprintf(stderr, "set_actor_state: %s state=%d letters=%ld\n", actor->name ? actor->name : actor->id, actor->state, (long)arrlen(actor->letters)); +#endif switch(actor->state) { case ACTOR_RUNNING: case ACTOR_READY: if (actor->ar) actor->ar = 0; break; - + case ACTOR_IDLE: if (arrlen(actor->letters)) { if (actor->is_quiescent) { actor->is_quiescent = 0; atomic_fetch_sub(&engine.quiescent_count, 1); } +#ifdef SCHEDULER_DEBUG + fprintf(stderr, "set_actor_state: %s IDLE->READY, enqueueing (main=%d)\n", actor->name ? actor->name : actor->id, actor->main_thread_only); +#endif actor->state = ACTOR_READY; actor->ar = 0; @@ -635,21 +644,26 @@ void actor_turn(cell_rt *actor) if (actor->trace_hook) actor->trace_hook(actor->name, CELL_HOOK_ENTER); - TAKETURN: - pthread_mutex_lock(actor->msg_mutex); JSValue result; - if (!arrlen(actor->letters)) { + int pending = arrlen(actor->letters); + if (!pending) { pthread_mutex_unlock(actor->msg_mutex); goto ENDTURN; } +#ifdef SCHEDULER_DEBUG + fprintf(stderr, "actor_turn: %s has %d letters, type=%d\n", actor->name ? actor->name : actor->id, pending, actor->letters[0].type); +#endif letter l = actor->letters[0]; - arrdel(actor->letters, 0); // O(N) but we kept array as requested + arrdel(actor->letters, 0); pthread_mutex_unlock(actor->msg_mutex); - + if (l.type == LETTER_BLOB) { // Create a JS blob from the C blob size_t size = blob_length(l.blob_data) / 8; // Convert bits to bytes +#ifdef SCHEDULER_DEBUG + fprintf(stderr, "actor_turn BLOB: %s size=%zu, calling message_handle\n", actor->name ? actor->name : actor->id, size); +#endif JSValue arg = js_new_blob_stoned_copy(actor->context, (void*)blob_data(l.blob_data), size); blob_destroy(l.blob_data); result = JS_Call(actor->context, actor->message_handle_ref.val, JS_NULL, 1, &arg); @@ -674,11 +688,17 @@ void actor_turn(cell_rt *actor) if (actor->disrupt) { /* Actor must die. Unlock before freeing so actor_free can lock/unlock/destroy the mutex without use-after-free. */ +#ifdef SCHEDULER_DEBUG + fprintf(stderr, "actor_turn ENDTURN: %s disrupted, freeing\n", actor->name ? actor->name : actor->id); +#endif pthread_mutex_unlock(actor->mutex); actor_free(actor); return; } +#ifdef SCHEDULER_DEBUG + fprintf(stderr, "actor_turn ENDTURN: %s has %ld letters, calling set_actor_state\n", actor->name ? actor->name : actor->id, (long)arrlen(actor->letters)); +#endif set_actor_state(actor); pthread_mutex_unlock(actor->mutex); diff --git a/test.ce b/test.ce index d3bac1e2..cde648bb 100644 --- a/test.ce +++ b/test.ce @@ -318,7 +318,7 @@ function collect_actor_tests(package_name, specific_test) { // Spawn an actor test and track it function spawn_actor_test(test_info) { - var test_name = text(test_info.file, 6, -3) // remove "tests/" and ".ce" + var test_name = text(test_info.file, 6, -3) log.console(` [ACTOR] ${test_info.file}`) var entry = { @@ -331,16 +331,41 @@ function spawn_actor_test(test_info) { } var _spawn = function() { - var actor_path = text(test_info.path, 0, -3) // remove .ce - entry.actor = $start(actor_path) + var actor_path = text(test_info.path, 0, -3) + $start(function(event) { + var end_time = time.number() + var duration_ns = round((end_time - entry.start_time) * 1000000000) + if (event.type == 'greet') { + entry.actor = event.actor + return + } + var idx = find(pending_actor_tests, e => e == entry) + if (idx != null) { + pending_actor_tests = array( + array(pending_actor_tests, 0, idx), + array(pending_actor_tests, idx + 1) + ) + } + entry.duration_ns = duration_ns + if (event.type == 'stop') { + entry.status = "passed" + log.console(` PASS ${test_name}`) + } else { + entry.status = "failed" + entry.error = { message: event.reason || "Actor disrupted" } + log.console(` FAIL ${test_name}: ${entry.error.message}`) + } + push(actor_test_results, entry) + if (gc_after_each_test) dbg.gc() + check_completion() + }, actor_path) push(pending_actor_tests, entry) } disruption { entry.status = "failed" - entry.error = { message: `Failed to spawn actor` } + entry.error = { message: "Failed to spawn actor" } entry.duration_ns = 0 push(actor_test_results, entry) - log.console(` FAIL ${test_name}: `) - log.error() + log.console(` FAIL ${test_name}: Failed to spawn`) } _spawn() } @@ -582,69 +607,6 @@ if (length(all_actor_tests) > 0) { } } -// Handle messages from actor tests -function handle_actor_message(msg) { - var sender = msg.$sender - var found_idx = -1 - var i = 0 - var res = null - var entry = null - for (i = 0; i < length(pending_actor_tests); i++) { - if (pending_actor_tests[i].actor == sender) { - found_idx = i - break - } - } - - if (found_idx == -1) return - - var base_entry = pending_actor_tests[found_idx] - pending_actor_tests = array(array(pending_actor_tests, 0, found_idx), array(pending_actor_tests, found_idx + 1)) - - var end_time = time.number() - var duration_ns = round((end_time - base_entry.start_time) * 1000000000) - - var results = [] - if (is_array(msg)) { - results = msg - } else if (msg && is_array(msg.results)) { - results = msg.results - } else { - results = [msg] - } - - for (i = 0; i < length(results); i++) { - res = results[i] || {} - entry = { - package: base_entry.package, - file: base_entry.file, - test: res.test || base_entry.test + (length(results) > 1 ? `#${i+1}` : ""), - status: "failed", - duration_ns: duration_ns - } - - if (res.type && res.type != "test_result") { - entry.error = { message: `Unexpected message type: ${res.type}` } - log.console(` FAIL ${entry.test}: unexpected message`) - } else if (res.passed) { - entry.status = "passed" - log.console(` PASS ${entry.test}`) - } else { - entry.error = { message: res.error || "Test failed" } - if (res.stack) entry.error.stack = res.stack - log.console(` FAIL ${entry.test}: ${entry.error.message}`) - } - - push(actor_test_results, entry) - } - - if (gc_after_each_test) { - dbg.gc() - } - - check_completion() -} - // Check for timed out actor tests function check_timeouts() { var now = time.number() @@ -870,8 +832,4 @@ Total: ${totals.total}, Passed: ${totals.passed}, Failed: ${totals.failed} if (length(all_actor_tests) == 0) { generate_reports(totals) $stop() -} else { - $portal(function(msg) { - handle_actor_message(msg) - }) } diff --git a/tests/reply_actor.ce b/tests/reply_actor.ce index b514bdca..f7e21a3b 100644 --- a/tests/reply_actor.ce +++ b/tests/reply_actor.ce @@ -1,7 +1,2 @@ -$receiver(e => { - log.console(`Got a message: ${e}`) - - send(e, { - message: "Good to go." - }) -}) +// tests/reply_actor.ce - Simple child that just logs +log.console("reply_actor: alive!") diff --git a/tests/send.cm b/tests/send.cm deleted file mode 100644 index ca02807a..00000000 --- a/tests/send.cm +++ /dev/null @@ -1,10 +0,0 @@ -return { - test_send: function() { - $start(e => { - send(e.actor, { message: "Hello! Good to go?" }, msg => { - log.console(`Original sender got message back: ${msg}. Stopping!`) - // $stop() // Removed - }) - }, "tests/reply_actor") - } -}