From 5ee51198a7494a68bb65e923c46131aa9c832ce4 Mon Sep 17 00:00:00 2001 From: John Alanbrook Date: Tue, 17 Feb 2026 17:34:25 -0600 Subject: [PATCH] kill actor when abusive --- internal/engine.cm | 2 +- source/cell.c | 22 ++- source/cell_internal.h | 17 +++ source/mach.c | 94 +++++++++--- source/quickjs-internal.h | 10 +- source/quickjs.h | 12 ++ source/runtime.c | 29 ++++ source/scheduler.c | 290 +++++++++++++++++++++++++----------- tests/actor_memory_abuse.ce | 12 ++ tests/actor_slow_killed.ce | 13 ++ tests/hang_actor_memory.ce | 5 + 11 files changed, 396 insertions(+), 110 deletions(-) create mode 100644 tests/actor_memory_abuse.ce create mode 100644 tests/actor_slow_killed.ce create mode 100644 tests/hang_actor_memory.ce diff --git a/internal/engine.cm b/internal/engine.cm index efb901f2..28c53e0d 100644 --- a/internal/engine.cm +++ b/internal/engine.cm @@ -383,7 +383,7 @@ function actor_die(err) -//actor_mod.on_exception(actor_die) +actor_mod.on_exception(actor_die) _cell.args = _init != null ? _init : {} diff --git a/source/cell.c b/source/cell.c index a0145e34..4cc02b65 100644 --- a/source/cell.c +++ b/source/cell.c @@ -270,6 +270,9 @@ void script_startup(cell_rt *prt) JS_SetContextOpaque(js, prt); prt->context = js; + /* Set per-actor heap memory limit */ + JS_SetHeapMemoryLimit(js, ACTOR_MEMORY_LIMIT); + /* Register all GCRef fields so the Cheney GC can relocate them. */ JS_AddGCRef(js, &prt->idx_buffer_ref); JS_AddGCRef(js, &prt->on_exception_ref); @@ -759,11 +762,24 @@ void cell_trace_sethook(cell_hook) int uncaught_exception(JSContext *js, JSValue v) { - (void)v; - if (!JS_HasException(js)) + int has_exc = JS_HasException(js); + int is_exc = JS_IsException(v); + if (!has_exc && !is_exc) return 1; /* Error message and backtrace were already printed to stderr by JS_ThrowError2 / print_backtrace. Just clear the flag. */ - JS_GetException(js); + if (has_exc) + JS_GetException(js); + cell_rt *crt = JS_GetContextOpaque(js); + if (crt && !JS_IsNull(crt->on_exception_ref.val)) { + /* Disable interrupt handler so actor_die can send messages + without being re-interrupted. */ + JS_SetInterruptHandler(js, NULL, NULL); + JSValue err = JS_NewString(js, "interrupted"); + JS_Call(js, crt->on_exception_ref.val, JS_NULL, 1, &err); + /* Clear any secondary exception from the callback. */ + if (JS_HasException(js)) + JS_GetException(js); + } return 0; } diff --git a/source/cell_internal.h b/source/cell_internal.h index fa767920..29757aa5 100644 --- a/source/cell_internal.h +++ b/source/cell_internal.h @@ -21,6 +21,14 @@ typedef struct letter { #define ACTOR_EXHAUSTED 3 // Actor waiting for GC #define ACTOR_RECLAIMING 4 // Actor running GC #define ACTOR_SLOW 5 // Actor going slowly; deprioritize +#define ACTOR_REFRESHED 6 // GC finished, ready to resume + +// #define ACTOR_TRACE + +#define ACTOR_FAST_TIMER_NS (10ULL * 1000000) // 10ms per turn +#define ACTOR_SLOW_TIMER_NS (5000ULL * 1000000) // 5s for slow actors +#define ACTOR_SLOW_STRIKES_MAX 3 // consecutive slow turns -> kill +#define ACTOR_MEMORY_LIMIT (16ULL * 1024 * 1024) // 16MB heap cap typedef struct cell_rt { JSContext *context; @@ -58,6 +66,12 @@ typedef struct cell_rt { int main_thread_only; int affinity; + uint64_t turn_start_ns; // cell_ns() when turn began + uint64_t turn_deadline_ns; // turn_start_ns + fast or slow timer + int is_slow_turn; // 1 if running under slow timer + int slow_strikes; // consecutive slow-completed turns + int vm_suspended; // 1 if VM is paused mid-turn + const char *name; // human friendly name cell_hook trace_hook; } cell_rt; @@ -74,6 +88,7 @@ int uncaught_exception(JSContext *js, JSValue v); int actor_exists(const char *id); void set_actor_state(cell_rt *actor); +void enqueue_actor_priority(cell_rt *actor); void actor_clock(cell_rt *actor, JSValue fn); uint32_t actor_delay(cell_rt *actor, JSValue fn, double seconds); JSValue actor_remove_timer(cell_rt *actor, uint32_t timer_id); @@ -85,6 +100,8 @@ void actor_free(cell_rt *actor); int scheduler_actor_count(void); void scheduler_enable_quiescence(void); +JSValue JS_ResumeRegisterVM(JSContext *ctx); + uint64_t cell_ns(); void cell_sleep(double seconds); int randombytes(void *buf, size_t n); diff --git a/source/mach.c b/source/mach.c index 7dbf48b2..ae9f1e9a 100644 --- a/source/mach.c +++ b/source/mach.c @@ -678,14 +678,14 @@ static JSValue reg_vm_binop(JSContext *ctx, int op, JSValue a, JSValue b) { return JS_ThrowTypeError(ctx, "type mismatch in binary operation"); } -/* Check for interrupt */ +/* Check for interrupt — returns: 0 = continue, -1 = hard kill, 1 = suspend */ int reg_vm_check_interrupt(JSContext *ctx) { if (--ctx->interrupt_counter <= 0) { ctx->interrupt_counter = JS_INTERRUPT_COUNTER_INIT; if (ctx->interrupt_handler) { - if (ctx->interrupt_handler(ctx->rt, ctx->interrupt_opaque)) { - return -1; - } + int r = ctx->interrupt_handler(ctx->rt, ctx->interrupt_opaque); + if (r < 0) return -1; /* hard kill */ + if (r > 0) return 1; /* suspend request */ } } return 0; @@ -731,6 +731,31 @@ void __asan_on_error(void) { JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code, JSValue this_obj, int argc, JSValue *argv, JSValue env, JSValue outer_frame) { + JSGCRef frame_ref; + JSFrameRegister *frame; + uint32_t pc; + JSValue result; + + /* Resume path: if VM was suspended, restore state and jump into dispatch */ + if (ctx->suspended) { + ctx->suspended = 0; + JS_AddGCRef(ctx, &frame_ref); + frame_ref.val = ctx->suspended_frame_ref.val; + ctx->suspended_frame_ref.val = JS_NULL; + frame = (JSFrameRegister *)JS_VALUE_GET_PTR(frame_ref.val); + JSFunction *fn = JS_VALUE_GET_FUNCTION(frame->function); + code = fn->u.reg.code; + env = fn->u.reg.env_record; + pc = ctx->suspended_pc; + result = JS_NULL; +#ifdef HAVE_ASAN + __asan_js_ctx = ctx; +#endif + goto vm_dispatch; + } + + { + /* Normal path: set up a new call */ /* Protect env and outer_frame from GC — alloc_frame_register can trigger collection which moves heap objects, invalidating stack-local copies */ JSGCRef env_gc, of_gc; @@ -752,7 +777,7 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code, } /* Allocate initial frame */ - JSFrameRegister *frame = alloc_frame_register(ctx, code->nr_slots); + frame = alloc_frame_register(ctx, code->nr_slots); if (!frame) { for (int i = nargs_copy - 1; i >= 0; i--) JS_PopGCRef(ctx, &arg_gcs[i]); JS_PopGCRef(ctx, &this_gc); @@ -762,7 +787,6 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code, } /* Protect frame from GC */ - JSGCRef frame_ref; JS_AddGCRef(ctx, &frame_ref); frame_ref.val = JS_MKPTR(frame); #ifdef HAVE_ASAN @@ -786,9 +810,11 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code, JS_PopGCRef(ctx, &of_gc); JS_PopGCRef(ctx, &env_gc); - uint32_t pc = code->entry_point; - JSValue result = JS_NULL; + pc = code->entry_point; + result = JS_NULL; + } /* end normal path block */ +vm_dispatch: /* Execution loop — 32-bit instruction dispatch */ for (;;) { #ifndef NDEBUG @@ -1379,9 +1405,13 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code, VM_CASE(MACH_JMP): { int offset = MACH_GET_sJ(instr); pc = (uint32_t)((int32_t)pc + offset); - if (offset < 0 && reg_vm_check_interrupt(ctx)) { - result = JS_ThrowInternalError(ctx, "interrupted"); - goto done; + if (offset < 0) { + int irc = reg_vm_check_interrupt(ctx); + if (irc < 0) { + result = JS_ThrowInternalError(ctx, "interrupted"); + goto done; + } + if (irc > 0) goto suspend; } VM_BREAK(); } @@ -1395,9 +1425,13 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code, if (cond) { int offset = MACH_GET_sBx(instr); pc = (uint32_t)((int32_t)pc + offset); - if (offset < 0 && reg_vm_check_interrupt(ctx)) { - result = JS_ThrowInternalError(ctx, "interrupted"); - goto done; + if (offset < 0) { + int irc = reg_vm_check_interrupt(ctx); + if (irc < 0) { + result = JS_ThrowInternalError(ctx, "interrupted"); + goto done; + } + if (irc > 0) goto suspend; } } VM_BREAK(); @@ -1412,9 +1446,13 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code, if (!cond) { int offset = MACH_GET_sBx(instr); pc = (uint32_t)((int32_t)pc + offset); - if (offset < 0 && reg_vm_check_interrupt(ctx)) { - result = JS_ThrowInternalError(ctx, "interrupted"); - goto done; + if (offset < 0) { + int irc = reg_vm_check_interrupt(ctx); + if (irc < 0) { + result = JS_ThrowInternalError(ctx, "interrupted"); + goto done; + } + if (irc > 0) goto suspend; } } VM_BREAK(); @@ -1927,11 +1965,13 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code, /* C or bytecode function: args already in fr->slots (GC-protected via frame chain) */ ctx->reg_current_frame = frame_ref.val; ctx->current_register_pc = pc > 0 ? pc - 1 : 0; + ctx->vm_call_depth++; JSValue ret; if (fn->kind == JS_FUNC_KIND_C) ret = js_call_c_function(ctx, fn_val, fr->slots[0], c_argc, &fr->slots[1]); else ret = JS_CallInternal(ctx, fn_val, fr->slots[0], c_argc, &fr->slots[1], 0); + ctx->vm_call_depth--; frame = (JSFrameRegister *)JS_VALUE_GET_PTR(frame_ref.val); ctx->reg_current_frame = JS_NULL; if (JS_IsException(ret)) goto disrupt; @@ -2010,11 +2050,13 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code, /* C/bytecode function: call it, then return result to our caller */ ctx->reg_current_frame = frame_ref.val; ctx->current_register_pc = pc > 0 ? pc - 1 : 0; + ctx->vm_call_depth++; JSValue ret; if (fn->kind == JS_FUNC_KIND_C) ret = js_call_c_function(ctx, fn_val, fr->slots[0], c_argc, &fr->slots[1]); else ret = JS_CallInternal(ctx, fn_val, fr->slots[0], c_argc, &fr->slots[1], 0); + ctx->vm_call_depth--; frame = (JSFrameRegister *)JS_VALUE_GET_PTR(frame_ref.val); ctx->reg_current_frame = JS_NULL; if (JS_IsException(ret)) goto disrupt; @@ -2144,6 +2186,17 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code, } } +suspend: + ctx->suspended = 1; + ctx->suspended_pc = pc; + ctx->suspended_frame_ref.val = frame_ref.val; + result = JS_SUSPENDED; + JS_DeleteGCRef(ctx, &frame_ref); +#ifdef HAVE_ASAN + __asan_js_ctx = NULL; +#endif + return result; + done: #ifdef HAVE_ASAN __asan_js_ctx = NULL; @@ -2157,6 +2210,13 @@ done: return result; } +JSValue JS_ResumeRegisterVM(JSContext *ctx) { + if (!ctx->suspended) + return JS_ThrowInternalError(ctx, "no suspended VM to resume"); + /* ctx->suspended is set; JS_CallRegisterVM will take the resume path */ + return JS_CallRegisterVM(ctx, NULL, JS_NULL, 0, NULL, JS_NULL, JS_NULL); +} + /* ============================================================ MCODE Lowering — mcode JSON IR → MachInstr32 ============================================================ */ diff --git a/source/quickjs-internal.h b/source/quickjs-internal.h index a4abe12a..b32415de 100644 --- a/source/quickjs-internal.h +++ b/source/quickjs-internal.h @@ -1138,6 +1138,13 @@ struct JSContext { JSValue reg_current_frame; /* current JSFrameRegister being executed */ uint32_t current_register_pc; /* PC at exception time */ + /* VM suspend/resume state */ + int suspended; /* 1 = VM was suspended (not exception) */ + JSGCRef suspended_frame_ref; /* GC-rooted saved frame for resume */ + uint32_t suspended_pc; /* saved PC for resume */ + int vm_call_depth; /* 0 = pure bytecode, >0 = C frames on stack */ + size_t heap_memory_limit; /* 0 = no limit, else max heap bytes */ + JSInterruptHandler *interrupt_handler; void *interrupt_opaque; @@ -1540,7 +1547,8 @@ void JS_ThrowInterrupted (JSContext *ctx); static no_inline __exception int __js_poll_interrupts (JSContext *ctx) { ctx->interrupt_counter = JS_INTERRUPT_COUNTER_INIT; if (ctx->interrupt_handler) { - if (ctx->interrupt_handler (ctx->rt, ctx->interrupt_opaque)) { + int r = ctx->interrupt_handler (ctx->rt, ctx->interrupt_opaque); + if (r < 0) { JS_ThrowInterrupted (ctx); return -1; } diff --git a/source/quickjs.h b/source/quickjs.h index ec049139..abcdfc76 100644 --- a/source/quickjs.h +++ b/source/quickjs.h @@ -294,6 +294,12 @@ JS_IsShortFloat (JSValue v) { #define JS_FALSE ((JSValue)JS_TAG_BOOL) #define JS_TRUE ((JSValue)(JS_TAG_BOOL | (1 << 5))) #define JS_EXCEPTION ((JSValue)JS_TAG_EXCEPTION) +#define JS_TAG_SUSPENDED 0x13 /* 10011 - distinct special tag */ +#define JS_SUSPENDED ((JSValue)JS_TAG_SUSPENDED) + +static inline JS_BOOL JS_IsSuspended(JSValue v) { + return JS_VALUE_GET_TAG(v) == JS_TAG_SUSPENDED; +} #ifndef JS_DEFAULT_STACK_SIZE #define JS_DEFAULT_STACK_SIZE (1024 * 1024) @@ -333,6 +339,12 @@ void JS_SetMaxStackSize (JSContext *ctx, size_t stack_size); used to check stack overflow. */ void JS_UpdateStackTop (JSContext *ctx); +/* Returns the current VM call depth (0 = pure bytecode, >0 = C frames) */ +int JS_GetVMCallDepth(JSContext *ctx); + +/* Set per-context heap memory limit (0 = no limit) */ +void JS_SetHeapMemoryLimit(JSContext *ctx, size_t limit); + /* return != 0 if the JS code needs to be interrupted */ typedef int JSInterruptHandler (JSRuntime *rt, void *opaque); void JS_SetInterruptHandler (JSContext *ctx, JSInterruptHandler *cb, diff --git a/source/runtime.c b/source/runtime.c index 824b3489..81e52812 100644 --- a/source/runtime.c +++ b/source/runtime.c @@ -1836,6 +1836,17 @@ int ctx_gc (JSContext *ctx, int allow_grow, size_t alloc_size) { } #endif + /* Check memory limit — kill actor if heap exceeds cap */ + if (ctx->heap_memory_limit > 0 && ctx->current_block_size > ctx->heap_memory_limit) { +#ifdef ACTOR_TRACE + void *crt = ctx->user_opaque; + if (crt) + fprintf(stderr, "[ACTOR_TRACE] heap %zu > limit %zu, OOM\n", + ctx->current_block_size, ctx->heap_memory_limit); +#endif + return -1; + } + return 0; } @@ -1873,6 +1884,14 @@ void JS_SetInterruptHandler (JSContext *ctx, JSInterruptHandler *cb, void *opaqu ctx->interrupt_opaque = opaque; } +int JS_GetVMCallDepth(JSContext *ctx) { + return ctx->vm_call_depth; +} + +void JS_SetHeapMemoryLimit(JSContext *ctx, size_t limit) { + ctx->heap_memory_limit = limit; +} + /* Allocate a string using bump allocation from context heap. Note: the string contents are uninitialized */ JSText *js_alloc_string (JSContext *ctx, int max_len) { @@ -1943,6 +1962,14 @@ JSContext *JS_NewContextRawWithHeapSize (JSRuntime *rt, size_t heap_size) { ctx->reg_current_frame = JS_NULL; ctx->c_call_root = NULL; + /* Initialize VM suspend/resume state */ + ctx->suspended = 0; + ctx->suspended_pc = 0; + ctx->vm_call_depth = 0; + ctx->heap_memory_limit = 0; + JS_AddGCRef(ctx, &ctx->suspended_frame_ref); + ctx->suspended_frame_ref.val = JS_NULL; + /* Initialize per-context execution state (moved from JSRuntime) */ ctx->current_exception = JS_NULL; ctx->actor_sym = JS_NULL; @@ -2051,6 +2078,8 @@ void JS_FreeContext (JSContext *ctx) { JSRuntime *rt = ctx->rt; int i; + JS_DeleteGCRef(ctx, &ctx->suspended_frame_ref); + for (i = 0; i < JS_NATIVE_ERROR_COUNT; i++) { } for (i = 0; i < ctx->class_count; i++) { diff --git a/source/scheduler.c b/source/scheduler.c index 282f4df6..73ad613c 100644 --- a/source/scheduler.c +++ b/source/scheduler.c @@ -34,19 +34,23 @@ typedef struct { static timer_node *timer_heap = NULL; +// Priority queue indices +#define PQ_READY 0 +#define PQ_REFRESHED 1 +#define PQ_EXHAUSTED 2 +#define PQ_SLOW 3 +#define PQ_COUNT 4 + // --- 3. The Global Engine State --- static struct { pthread_mutex_t lock; // Protects queue, shutdown flag, and timers pthread_cond_t wake_cond; // Wakes up workers pthread_cond_t timer_cond; // Wakes up the timer thread pthread_cond_t main_cond; // Wakes up the main thread - - actor_node *head; // Ready Queue Head - actor_node *tail; // Ready Queue Tail - - actor_node *main_head; // Main Thread Queue Head - actor_node *main_tail; // Main Thread Queue Tail - + + actor_node *q_head[PQ_COUNT], *q_tail[PQ_COUNT]; // worker queues + actor_node *mq_head[PQ_COUNT], *mq_tail[PQ_COUNT]; // main-thread queues + int shutting_down; int quiescence_enabled; // set after bootstrap, before actor_loop _Atomic int quiescent_count; // actors idle with no messages and no timers @@ -56,6 +60,33 @@ static struct { pthread_t timer_thread; } engine; +static int state_to_pq(int state) { + switch (state) { + case ACTOR_REFRESHED: return PQ_REFRESHED; + case ACTOR_EXHAUSTED: return PQ_EXHAUSTED; + case ACTOR_SLOW: return PQ_SLOW; + default: return PQ_READY; + } +} + +static actor_node *dequeue_priority(actor_node *heads[], actor_node *tails[]) { + for (int i = 0; i < PQ_COUNT; i++) { + if (heads[i]) { + actor_node *n = heads[i]; + heads[i] = n->next; + if (!heads[i]) tails[i] = NULL; + return n; + } + } + return NULL; +} + +static int has_any_work(actor_node *heads[]) { + for (int i = 0; i < PQ_COUNT; i++) + if (heads[i]) return 1; + return 0; +} + static pthread_mutex_t *actors_mutex; static struct { char *key; cell_rt *value; } *actors = NULL; @@ -196,33 +227,48 @@ void *timer_thread_func(void *arg) { return NULL; } +void enqueue_actor_priority(cell_rt *actor) { + actor_node *n = malloc(sizeof(actor_node)); + n->actor = actor; + n->next = NULL; + int pq = state_to_pq(actor->state); + pthread_mutex_lock(&engine.lock); + if (actor->main_thread_only) { + if (engine.mq_tail[pq]) engine.mq_tail[pq]->next = n; + else engine.mq_head[pq] = n; + engine.mq_tail[pq] = n; + pthread_cond_signal(&engine.main_cond); + } else { + if (engine.q_tail[pq]) engine.q_tail[pq]->next = n; + else engine.q_head[pq] = n; + engine.q_tail[pq] = n; + pthread_cond_signal(&engine.wake_cond); + } + pthread_mutex_unlock(&engine.lock); +} + void *actor_runner(void *arg) { - while (1) { - pthread_mutex_lock(&engine.lock); - - // Wait while queue is empty AND not shutting down - while (engine.head == NULL && !engine.shutting_down) { - pthread_cond_wait(&engine.wake_cond, &engine.lock); - } + while (1) { + pthread_mutex_lock(&engine.lock); - if (engine.shutting_down && engine.head == NULL) { - pthread_mutex_unlock(&engine.lock); - break; // Exit thread - } - - // Pop from Linked List - actor_node *node = engine.head; - engine.head = node->next; - if (engine.head == NULL) engine.tail = NULL; - - pthread_mutex_unlock(&engine.lock); - - if (node) { - actor_turn(node->actor); - free(node); - } + while (!has_any_work(engine.q_head) && !engine.shutting_down) { + pthread_cond_wait(&engine.wake_cond, &engine.lock); } - return NULL; + + if (engine.shutting_down && !has_any_work(engine.q_head)) { + pthread_mutex_unlock(&engine.lock); + break; + } + + actor_node *node = dequeue_priority(engine.q_head, engine.q_tail); + pthread_mutex_unlock(&engine.lock); + + if (node) { + actor_turn(node->actor); + free(node); + } + } + return NULL; } void actor_initialize(void) { @@ -230,12 +276,14 @@ void actor_initialize(void) { pthread_cond_init(&engine.wake_cond, NULL); pthread_cond_init(&engine.timer_cond, NULL); pthread_cond_init(&engine.main_cond, NULL); - + engine.shutting_down = 0; - engine.head = NULL; - engine.tail = NULL; - engine.main_head = NULL; - engine.main_tail = NULL; + for (int i = 0; i < PQ_COUNT; i++) { + engine.q_head[i] = NULL; + engine.q_tail[i] = NULL; + engine.mq_head[i] = NULL; + engine.mq_tail[i] = NULL; + } actors_mutex = malloc(sizeof(pthread_mutex_t)); pthread_mutex_init(actors_mutex, NULL); @@ -406,30 +454,7 @@ void set_actor_state(cell_rt *actor) #endif actor->state = ACTOR_READY; actor->ar = 0; - - actor_node *n = malloc(sizeof(actor_node)); - n->actor = actor; - n->next = NULL; - - pthread_mutex_lock(&engine.lock); - if (actor->main_thread_only) { - if (engine.main_tail) { - engine.main_tail->next = n; - } else { - engine.main_head = n; - } - engine.main_tail = n; - pthread_cond_signal(&engine.main_cond); - } else { - if (engine.tail) { - engine.tail->next = n; - } else { - engine.head = n; - } - engine.tail = n; - pthread_cond_signal(&engine.wake_cond); - } - pthread_mutex_unlock(&engine.lock); + enqueue_actor_priority(actor); } else if (!hmlen(actor->timers)) { // No messages AND no timers @@ -531,26 +556,23 @@ cell_rt *get_actor(char *id) void actor_loop() { - while (!engine.shutting_down) { // Direct read safe enough here or use lock + while (!engine.shutting_down) { pthread_mutex_lock(&engine.lock); - while (engine.main_head == NULL && !engine.shutting_down) { + while (!has_any_work(engine.mq_head) && !engine.shutting_down) { pthread_cond_wait(&engine.main_cond, &engine.lock); } - - if (engine.shutting_down && engine.main_head == NULL) { - pthread_mutex_unlock(&engine.lock); - break; + + if (engine.shutting_down && !has_any_work(engine.mq_head)) { + pthread_mutex_unlock(&engine.lock); + break; } - actor_node *node = engine.main_head; - engine.main_head = node->next; - if (engine.main_head == NULL) engine.main_tail = NULL; - + actor_node *node = dequeue_priority(engine.mq_head, engine.mq_tail); pthread_mutex_unlock(&engine.lock); if (node) { - actor_turn(node->actor); - free(node); + actor_turn(node->actor); + free(node); } } } @@ -608,7 +630,33 @@ const char *register_actor(const char *id, cell_rt *actor, int mainthread, doubl int actor_interrupt_cb(JSRuntime *rt, cell_rt *crt) { - return engine.shutting_down || crt->disrupt; + if (engine.shutting_down || crt->disrupt) + return -1; + if (crt->turn_deadline_ns == 0) + return 0; /* no timer set (e.g. during startup) */ + uint64_t now = cell_ns(); + if (now < crt->turn_deadline_ns) + return 0; + if (crt->is_slow_turn) { +#ifdef ACTOR_TRACE + fprintf(stderr, "[ACTOR_TRACE] %s: slow timer expired, killing\n", + crt->name ? crt->name : crt->id); +#endif + crt->disrupt = 1; + return -1; + } + /* Fast timer expired — check if we can suspend */ + if (JS_GetVMCallDepth(crt->context) > 0) { + /* Can't suspend with C frames on stack, switch to slow timer */ + crt->is_slow_turn = 1; + crt->turn_deadline_ns = now + ACTOR_SLOW_TIMER_NS; + return 0; + } +#ifdef ACTOR_TRACE + fprintf(stderr, "[ACTOR_TRACE] %s: fast timer expired, requesting suspend\n", + crt->name ? crt->name : crt->id); +#endif + return 1; } const char *send_message(const char *id, void *msg) @@ -638,58 +686,113 @@ const char *send_message(const char *id, void *msg) void actor_turn(cell_rt *actor) { pthread_mutex_lock(actor->mutex); - + int prev_state = actor->state; actor->state = ACTOR_RUNNING; +#ifdef ACTOR_TRACE + fprintf(stderr, "[ACTOR_TRACE] %s: %d -> RUNNING\n", + actor->name ? actor->name : actor->id, prev_state); +#endif if (actor->trace_hook) actor->trace_hook(actor->name, CELL_HOOK_ENTER); - pthread_mutex_lock(actor->msg_mutex); JSValue result; + + if (actor->vm_suspended) { + /* RESUME path: continue suspended turn under slow timer */ + actor->turn_start_ns = cell_ns(); + actor->turn_deadline_ns = actor->turn_start_ns + ACTOR_SLOW_TIMER_NS; + actor->is_slow_turn = 1; + + result = JS_ResumeRegisterVM(actor->context); + actor->vm_suspended = 0; + + if (JS_IsSuspended(result)) { + /* Still suspended after slow timer — shouldn't happen, handler returns -1 */ + actor->disrupt = 1; + goto ENDTURN; + } + if (JS_IsException(result)) { + if (!uncaught_exception(actor->context, result)) + actor->disrupt = 1; + } + actor->slow_strikes++; +#ifdef ACTOR_TRACE + fprintf(stderr, "[ACTOR_TRACE] %s: slow strike %d/%d\n", + actor->name ? actor->name : actor->id, + actor->slow_strikes, ACTOR_SLOW_STRIKES_MAX); +#endif + if (actor->slow_strikes >= ACTOR_SLOW_STRIKES_MAX) { +#ifdef ACTOR_TRACE + fprintf(stderr, "[ACTOR_TRACE] %s: %d slow strikes, killing\n", + actor->name ? actor->name : actor->id, actor->slow_strikes); +#endif + actor->disrupt = 1; + } + goto ENDTURN; + } + + /* NORMAL path: pop a message and execute */ + pthread_mutex_lock(actor->msg_mutex); int pending = arrlen(actor->letters); if (!pending) { pthread_mutex_unlock(actor->msg_mutex); goto ENDTURN; } #ifdef SCHEDULER_DEBUG - fprintf(stderr, "actor_turn: %s has %d letters, type=%d\n", actor->name ? actor->name : actor->id, pending, actor->letters[0].type); + fprintf(stderr, "actor_turn: %s has %d letters, type=%d\n", + actor->name ? actor->name : actor->id, pending, actor->letters[0].type); #endif letter l = actor->letters[0]; arrdel(actor->letters, 0); pthread_mutex_unlock(actor->msg_mutex); + actor->turn_start_ns = cell_ns(); + actor->turn_deadline_ns = actor->turn_start_ns + ACTOR_FAST_TIMER_NS; + actor->is_slow_turn = 0; + if (l.type == LETTER_BLOB) { - // Create a JS blob from the C blob - size_t size = blob_length(l.blob_data) / 8; // Convert bits to bytes -#ifdef SCHEDULER_DEBUG - fprintf(stderr, "actor_turn BLOB: %s size=%zu, calling message_handle\n", actor->name ? actor->name : actor->id, size); -#endif - JSValue arg = js_new_blob_stoned_copy(actor->context, (void*)blob_data(l.blob_data), size); + size_t size = blob_length(l.blob_data) / 8; + JSValue arg = js_new_blob_stoned_copy(actor->context, + (void *)blob_data(l.blob_data), size); blob_destroy(l.blob_data); - result = JS_Call(actor->context, actor->message_handle_ref.val, JS_NULL, 1, &arg); + result = JS_Call(actor->context, actor->message_handle_ref.val, + JS_NULL, 1, &arg); + if (JS_IsSuspended(result)) { + actor->vm_suspended = 1; + actor->state = ACTOR_SLOW; + JS_FreeValue(actor->context, arg); + goto ENDTURN_SLOW; + } if (!uncaught_exception(actor->context, result)) actor->disrupt = 1; JS_FreeValue(actor->context, arg); } else if (l.type == LETTER_CALLBACK) { result = JS_Call(actor->context, l.callback, JS_NULL, 0, NULL); + if (JS_IsSuspended(result)) { + actor->vm_suspended = 1; + actor->state = ACTOR_SLOW; + JS_FreeValue(actor->context, l.callback); + goto ENDTURN_SLOW; + } if (!uncaught_exception(actor->context, result)) actor->disrupt = 1; JS_FreeValue(actor->context, l.callback); } if (actor->disrupt) goto ENDTURN; + actor->slow_strikes = 0; /* completed within fast timer */ - ENDTURN: +ENDTURN: actor->state = ACTOR_IDLE; if (actor->trace_hook) actor->trace_hook(actor->name, CELL_HOOK_EXIT); if (actor->disrupt) { - /* Actor must die. Unlock before freeing so actor_free can - lock/unlock/destroy the mutex without use-after-free. */ #ifdef SCHEDULER_DEBUG - fprintf(stderr, "actor_turn ENDTURN: %s disrupted, freeing\n", actor->name ? actor->name : actor->id); + fprintf(stderr, "actor_turn ENDTURN: %s disrupted, freeing\n", + actor->name ? actor->name : actor->id); #endif pthread_mutex_unlock(actor->mutex); actor_free(actor); @@ -697,10 +800,21 @@ void actor_turn(cell_rt *actor) } #ifdef SCHEDULER_DEBUG - fprintf(stderr, "actor_turn ENDTURN: %s has %ld letters, calling set_actor_state\n", actor->name ? actor->name : actor->id, (long)arrlen(actor->letters)); + fprintf(stderr, "actor_turn ENDTURN: %s has %ld letters, calling set_actor_state\n", + actor->name ? actor->name : actor->id, (long)arrlen(actor->letters)); #endif set_actor_state(actor); + pthread_mutex_unlock(actor->mutex); + return; +ENDTURN_SLOW: +#ifdef ACTOR_TRACE + fprintf(stderr, "[ACTOR_TRACE] %s: suspended mid-turn -> SLOW\n", + actor->name ? actor->name : actor->id); +#endif + if (actor->trace_hook) + actor->trace_hook(actor->name, CELL_HOOK_EXIT); + enqueue_actor_priority(actor); pthread_mutex_unlock(actor->mutex); } diff --git a/tests/actor_memory_abuse.ce b/tests/actor_memory_abuse.ce new file mode 100644 index 00000000..3f403ca9 --- /dev/null +++ b/tests/actor_memory_abuse.ce @@ -0,0 +1,12 @@ +// Test: actor that allocates too much memory is killed +// Parent starts a child that allocates arrays in a loop. +// The child should be killed when it exceeds the heap memory limit. +$start(function(event) { + if (event.type == 'greet') { + // child started, wait for it to die from memory abuse + } + if (event.type == 'disrupt') { + print("PASS: memory abusing actor killed") + $stop() + } +}, 'tests/hang_actor_memory') diff --git a/tests/actor_slow_killed.ce b/tests/actor_slow_killed.ce new file mode 100644 index 00000000..cd58acf0 --- /dev/null +++ b/tests/actor_slow_killed.ce @@ -0,0 +1,13 @@ +// Test: actor with infinite loop is killed by slow timer +// Parent starts a child that hangs forever. The child should be +// killed by the slow timer, triggering a 'disrupt' event. +$start(function(event) { + if (event.type == 'greet') { + // child started, just wait for it to die + } + if (event.type == 'disrupt') { + // child was killed by the timer — success + print("PASS: slow actor killed by timer") + $stop() + } +}, 'tests/hang_actor') diff --git a/tests/hang_actor_memory.ce b/tests/hang_actor_memory.ce new file mode 100644 index 00000000..7b7ac532 --- /dev/null +++ b/tests/hang_actor_memory.ce @@ -0,0 +1,5 @@ +// Actor that allocates memory until killed by memory limit +var big = [] +while (1) { + big[] = array(10000, 0) +}