kill actor when abusive

This commit is contained in:
2026-02-17 17:34:25 -06:00
parent 2df45b2acb
commit 5ee51198a7
11 changed files with 396 additions and 110 deletions

View File

@@ -270,6 +270,9 @@ void script_startup(cell_rt *prt)
JS_SetContextOpaque(js, prt);
prt->context = js;
/* Set per-actor heap memory limit */
JS_SetHeapMemoryLimit(js, ACTOR_MEMORY_LIMIT);
/* Register all GCRef fields so the Cheney GC can relocate them. */
JS_AddGCRef(js, &prt->idx_buffer_ref);
JS_AddGCRef(js, &prt->on_exception_ref);
@@ -759,11 +762,24 @@ void cell_trace_sethook(cell_hook)
int uncaught_exception(JSContext *js, JSValue v)
{
(void)v;
if (!JS_HasException(js))
int has_exc = JS_HasException(js);
int is_exc = JS_IsException(v);
if (!has_exc && !is_exc)
return 1;
/* Error message and backtrace were already printed to stderr
by JS_ThrowError2 / print_backtrace. Just clear the flag. */
JS_GetException(js);
if (has_exc)
JS_GetException(js);
cell_rt *crt = JS_GetContextOpaque(js);
if (crt && !JS_IsNull(crt->on_exception_ref.val)) {
/* Disable interrupt handler so actor_die can send messages
without being re-interrupted. */
JS_SetInterruptHandler(js, NULL, NULL);
JSValue err = JS_NewString(js, "interrupted");
JS_Call(js, crt->on_exception_ref.val, JS_NULL, 1, &err);
/* Clear any secondary exception from the callback. */
if (JS_HasException(js))
JS_GetException(js);
}
return 0;
}

View File

@@ -21,6 +21,14 @@ typedef struct letter {
#define ACTOR_EXHAUSTED 3 // Actor waiting for GC
#define ACTOR_RECLAIMING 4 // Actor running GC
#define ACTOR_SLOW 5 // Actor going slowly; deprioritize
#define ACTOR_REFRESHED 6 // GC finished, ready to resume
// #define ACTOR_TRACE
#define ACTOR_FAST_TIMER_NS (10ULL * 1000000) // 10ms per turn
#define ACTOR_SLOW_TIMER_NS (5000ULL * 1000000) // 5s for slow actors
#define ACTOR_SLOW_STRIKES_MAX 3 // consecutive slow turns -> kill
#define ACTOR_MEMORY_LIMIT (16ULL * 1024 * 1024) // 16MB heap cap
typedef struct cell_rt {
JSContext *context;
@@ -58,6 +66,12 @@ typedef struct cell_rt {
int main_thread_only;
int affinity;
uint64_t turn_start_ns; // cell_ns() when turn began
uint64_t turn_deadline_ns; // turn_start_ns + fast or slow timer
int is_slow_turn; // 1 if running under slow timer
int slow_strikes; // consecutive slow-completed turns
int vm_suspended; // 1 if VM is paused mid-turn
const char *name; // human friendly name
cell_hook trace_hook;
} cell_rt;
@@ -74,6 +88,7 @@ int uncaught_exception(JSContext *js, JSValue v);
int actor_exists(const char *id);
void set_actor_state(cell_rt *actor);
void enqueue_actor_priority(cell_rt *actor);
void actor_clock(cell_rt *actor, JSValue fn);
uint32_t actor_delay(cell_rt *actor, JSValue fn, double seconds);
JSValue actor_remove_timer(cell_rt *actor, uint32_t timer_id);
@@ -85,6 +100,8 @@ void actor_free(cell_rt *actor);
int scheduler_actor_count(void);
void scheduler_enable_quiescence(void);
JSValue JS_ResumeRegisterVM(JSContext *ctx);
uint64_t cell_ns();
void cell_sleep(double seconds);
int randombytes(void *buf, size_t n);

View File

@@ -678,14 +678,14 @@ static JSValue reg_vm_binop(JSContext *ctx, int op, JSValue a, JSValue b) {
return JS_ThrowTypeError(ctx, "type mismatch in binary operation");
}
/* Check for interrupt */
/* Check for interrupt — returns: 0 = continue, -1 = hard kill, 1 = suspend */
int reg_vm_check_interrupt(JSContext *ctx) {
if (--ctx->interrupt_counter <= 0) {
ctx->interrupt_counter = JS_INTERRUPT_COUNTER_INIT;
if (ctx->interrupt_handler) {
if (ctx->interrupt_handler(ctx->rt, ctx->interrupt_opaque)) {
return -1;
}
int r = ctx->interrupt_handler(ctx->rt, ctx->interrupt_opaque);
if (r < 0) return -1; /* hard kill */
if (r > 0) return 1; /* suspend request */
}
}
return 0;
@@ -731,6 +731,31 @@ void __asan_on_error(void) {
JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code,
JSValue this_obj, int argc, JSValue *argv,
JSValue env, JSValue outer_frame) {
JSGCRef frame_ref;
JSFrameRegister *frame;
uint32_t pc;
JSValue result;
/* Resume path: if VM was suspended, restore state and jump into dispatch */
if (ctx->suspended) {
ctx->suspended = 0;
JS_AddGCRef(ctx, &frame_ref);
frame_ref.val = ctx->suspended_frame_ref.val;
ctx->suspended_frame_ref.val = JS_NULL;
frame = (JSFrameRegister *)JS_VALUE_GET_PTR(frame_ref.val);
JSFunction *fn = JS_VALUE_GET_FUNCTION(frame->function);
code = fn->u.reg.code;
env = fn->u.reg.env_record;
pc = ctx->suspended_pc;
result = JS_NULL;
#ifdef HAVE_ASAN
__asan_js_ctx = ctx;
#endif
goto vm_dispatch;
}
{
/* Normal path: set up a new call */
/* Protect env and outer_frame from GC — alloc_frame_register can trigger
collection which moves heap objects, invalidating stack-local copies */
JSGCRef env_gc, of_gc;
@@ -752,7 +777,7 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code,
}
/* Allocate initial frame */
JSFrameRegister *frame = alloc_frame_register(ctx, code->nr_slots);
frame = alloc_frame_register(ctx, code->nr_slots);
if (!frame) {
for (int i = nargs_copy - 1; i >= 0; i--) JS_PopGCRef(ctx, &arg_gcs[i]);
JS_PopGCRef(ctx, &this_gc);
@@ -762,7 +787,6 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code,
}
/* Protect frame from GC */
JSGCRef frame_ref;
JS_AddGCRef(ctx, &frame_ref);
frame_ref.val = JS_MKPTR(frame);
#ifdef HAVE_ASAN
@@ -786,9 +810,11 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code,
JS_PopGCRef(ctx, &of_gc);
JS_PopGCRef(ctx, &env_gc);
uint32_t pc = code->entry_point;
JSValue result = JS_NULL;
pc = code->entry_point;
result = JS_NULL;
} /* end normal path block */
vm_dispatch:
/* Execution loop — 32-bit instruction dispatch */
for (;;) {
#ifndef NDEBUG
@@ -1379,9 +1405,13 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code,
VM_CASE(MACH_JMP): {
int offset = MACH_GET_sJ(instr);
pc = (uint32_t)((int32_t)pc + offset);
if (offset < 0 && reg_vm_check_interrupt(ctx)) {
result = JS_ThrowInternalError(ctx, "interrupted");
goto done;
if (offset < 0) {
int irc = reg_vm_check_interrupt(ctx);
if (irc < 0) {
result = JS_ThrowInternalError(ctx, "interrupted");
goto done;
}
if (irc > 0) goto suspend;
}
VM_BREAK();
}
@@ -1395,9 +1425,13 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code,
if (cond) {
int offset = MACH_GET_sBx(instr);
pc = (uint32_t)((int32_t)pc + offset);
if (offset < 0 && reg_vm_check_interrupt(ctx)) {
result = JS_ThrowInternalError(ctx, "interrupted");
goto done;
if (offset < 0) {
int irc = reg_vm_check_interrupt(ctx);
if (irc < 0) {
result = JS_ThrowInternalError(ctx, "interrupted");
goto done;
}
if (irc > 0) goto suspend;
}
}
VM_BREAK();
@@ -1412,9 +1446,13 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code,
if (!cond) {
int offset = MACH_GET_sBx(instr);
pc = (uint32_t)((int32_t)pc + offset);
if (offset < 0 && reg_vm_check_interrupt(ctx)) {
result = JS_ThrowInternalError(ctx, "interrupted");
goto done;
if (offset < 0) {
int irc = reg_vm_check_interrupt(ctx);
if (irc < 0) {
result = JS_ThrowInternalError(ctx, "interrupted");
goto done;
}
if (irc > 0) goto suspend;
}
}
VM_BREAK();
@@ -1927,11 +1965,13 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code,
/* C or bytecode function: args already in fr->slots (GC-protected via frame chain) */
ctx->reg_current_frame = frame_ref.val;
ctx->current_register_pc = pc > 0 ? pc - 1 : 0;
ctx->vm_call_depth++;
JSValue ret;
if (fn->kind == JS_FUNC_KIND_C)
ret = js_call_c_function(ctx, fn_val, fr->slots[0], c_argc, &fr->slots[1]);
else
ret = JS_CallInternal(ctx, fn_val, fr->slots[0], c_argc, &fr->slots[1], 0);
ctx->vm_call_depth--;
frame = (JSFrameRegister *)JS_VALUE_GET_PTR(frame_ref.val);
ctx->reg_current_frame = JS_NULL;
if (JS_IsException(ret)) goto disrupt;
@@ -2010,11 +2050,13 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code,
/* C/bytecode function: call it, then return result to our caller */
ctx->reg_current_frame = frame_ref.val;
ctx->current_register_pc = pc > 0 ? pc - 1 : 0;
ctx->vm_call_depth++;
JSValue ret;
if (fn->kind == JS_FUNC_KIND_C)
ret = js_call_c_function(ctx, fn_val, fr->slots[0], c_argc, &fr->slots[1]);
else
ret = JS_CallInternal(ctx, fn_val, fr->slots[0], c_argc, &fr->slots[1], 0);
ctx->vm_call_depth--;
frame = (JSFrameRegister *)JS_VALUE_GET_PTR(frame_ref.val);
ctx->reg_current_frame = JS_NULL;
if (JS_IsException(ret)) goto disrupt;
@@ -2144,6 +2186,17 @@ JSValue JS_CallRegisterVM(JSContext *ctx, JSCodeRegister *code,
}
}
suspend:
ctx->suspended = 1;
ctx->suspended_pc = pc;
ctx->suspended_frame_ref.val = frame_ref.val;
result = JS_SUSPENDED;
JS_DeleteGCRef(ctx, &frame_ref);
#ifdef HAVE_ASAN
__asan_js_ctx = NULL;
#endif
return result;
done:
#ifdef HAVE_ASAN
__asan_js_ctx = NULL;
@@ -2157,6 +2210,13 @@ done:
return result;
}
JSValue JS_ResumeRegisterVM(JSContext *ctx) {
if (!ctx->suspended)
return JS_ThrowInternalError(ctx, "no suspended VM to resume");
/* ctx->suspended is set; JS_CallRegisterVM will take the resume path */
return JS_CallRegisterVM(ctx, NULL, JS_NULL, 0, NULL, JS_NULL, JS_NULL);
}
/* ============================================================
MCODE Lowering — mcode JSON IR → MachInstr32
============================================================ */

View File

@@ -1138,6 +1138,13 @@ struct JSContext {
JSValue reg_current_frame; /* current JSFrameRegister being executed */
uint32_t current_register_pc; /* PC at exception time */
/* VM suspend/resume state */
int suspended; /* 1 = VM was suspended (not exception) */
JSGCRef suspended_frame_ref; /* GC-rooted saved frame for resume */
uint32_t suspended_pc; /* saved PC for resume */
int vm_call_depth; /* 0 = pure bytecode, >0 = C frames on stack */
size_t heap_memory_limit; /* 0 = no limit, else max heap bytes */
JSInterruptHandler *interrupt_handler;
void *interrupt_opaque;
@@ -1540,7 +1547,8 @@ void JS_ThrowInterrupted (JSContext *ctx);
static no_inline __exception int __js_poll_interrupts (JSContext *ctx) {
ctx->interrupt_counter = JS_INTERRUPT_COUNTER_INIT;
if (ctx->interrupt_handler) {
if (ctx->interrupt_handler (ctx->rt, ctx->interrupt_opaque)) {
int r = ctx->interrupt_handler (ctx->rt, ctx->interrupt_opaque);
if (r < 0) {
JS_ThrowInterrupted (ctx);
return -1;
}

View File

@@ -294,6 +294,12 @@ JS_IsShortFloat (JSValue v) {
#define JS_FALSE ((JSValue)JS_TAG_BOOL)
#define JS_TRUE ((JSValue)(JS_TAG_BOOL | (1 << 5)))
#define JS_EXCEPTION ((JSValue)JS_TAG_EXCEPTION)
#define JS_TAG_SUSPENDED 0x13 /* 10011 - distinct special tag */
#define JS_SUSPENDED ((JSValue)JS_TAG_SUSPENDED)
static inline JS_BOOL JS_IsSuspended(JSValue v) {
return JS_VALUE_GET_TAG(v) == JS_TAG_SUSPENDED;
}
#ifndef JS_DEFAULT_STACK_SIZE
#define JS_DEFAULT_STACK_SIZE (1024 * 1024)
@@ -333,6 +339,12 @@ void JS_SetMaxStackSize (JSContext *ctx, size_t stack_size);
used to check stack overflow. */
void JS_UpdateStackTop (JSContext *ctx);
/* Returns the current VM call depth (0 = pure bytecode, >0 = C frames) */
int JS_GetVMCallDepth(JSContext *ctx);
/* Set per-context heap memory limit (0 = no limit) */
void JS_SetHeapMemoryLimit(JSContext *ctx, size_t limit);
/* return != 0 if the JS code needs to be interrupted */
typedef int JSInterruptHandler (JSRuntime *rt, void *opaque);
void JS_SetInterruptHandler (JSContext *ctx, JSInterruptHandler *cb,

View File

@@ -1836,6 +1836,17 @@ int ctx_gc (JSContext *ctx, int allow_grow, size_t alloc_size) {
}
#endif
/* Check memory limit — kill actor if heap exceeds cap */
if (ctx->heap_memory_limit > 0 && ctx->current_block_size > ctx->heap_memory_limit) {
#ifdef ACTOR_TRACE
void *crt = ctx->user_opaque;
if (crt)
fprintf(stderr, "[ACTOR_TRACE] heap %zu > limit %zu, OOM\n",
ctx->current_block_size, ctx->heap_memory_limit);
#endif
return -1;
}
return 0;
}
@@ -1873,6 +1884,14 @@ void JS_SetInterruptHandler (JSContext *ctx, JSInterruptHandler *cb, void *opaqu
ctx->interrupt_opaque = opaque;
}
int JS_GetVMCallDepth(JSContext *ctx) {
return ctx->vm_call_depth;
}
void JS_SetHeapMemoryLimit(JSContext *ctx, size_t limit) {
ctx->heap_memory_limit = limit;
}
/* Allocate a string using bump allocation from context heap.
Note: the string contents are uninitialized */
JSText *js_alloc_string (JSContext *ctx, int max_len) {
@@ -1943,6 +1962,14 @@ JSContext *JS_NewContextRawWithHeapSize (JSRuntime *rt, size_t heap_size) {
ctx->reg_current_frame = JS_NULL;
ctx->c_call_root = NULL;
/* Initialize VM suspend/resume state */
ctx->suspended = 0;
ctx->suspended_pc = 0;
ctx->vm_call_depth = 0;
ctx->heap_memory_limit = 0;
JS_AddGCRef(ctx, &ctx->suspended_frame_ref);
ctx->suspended_frame_ref.val = JS_NULL;
/* Initialize per-context execution state (moved from JSRuntime) */
ctx->current_exception = JS_NULL;
ctx->actor_sym = JS_NULL;
@@ -2051,6 +2078,8 @@ void JS_FreeContext (JSContext *ctx) {
JSRuntime *rt = ctx->rt;
int i;
JS_DeleteGCRef(ctx, &ctx->suspended_frame_ref);
for (i = 0; i < JS_NATIVE_ERROR_COUNT; i++) {
}
for (i = 0; i < ctx->class_count; i++) {

View File

@@ -34,19 +34,23 @@ typedef struct {
static timer_node *timer_heap = NULL;
// Priority queue indices
#define PQ_READY 0
#define PQ_REFRESHED 1
#define PQ_EXHAUSTED 2
#define PQ_SLOW 3
#define PQ_COUNT 4
// --- 3. The Global Engine State ---
static struct {
pthread_mutex_t lock; // Protects queue, shutdown flag, and timers
pthread_cond_t wake_cond; // Wakes up workers
pthread_cond_t timer_cond; // Wakes up the timer thread
pthread_cond_t main_cond; // Wakes up the main thread
actor_node *head; // Ready Queue Head
actor_node *tail; // Ready Queue Tail
actor_node *main_head; // Main Thread Queue Head
actor_node *main_tail; // Main Thread Queue Tail
actor_node *q_head[PQ_COUNT], *q_tail[PQ_COUNT]; // worker queues
actor_node *mq_head[PQ_COUNT], *mq_tail[PQ_COUNT]; // main-thread queues
int shutting_down;
int quiescence_enabled; // set after bootstrap, before actor_loop
_Atomic int quiescent_count; // actors idle with no messages and no timers
@@ -56,6 +60,33 @@ static struct {
pthread_t timer_thread;
} engine;
static int state_to_pq(int state) {
switch (state) {
case ACTOR_REFRESHED: return PQ_REFRESHED;
case ACTOR_EXHAUSTED: return PQ_EXHAUSTED;
case ACTOR_SLOW: return PQ_SLOW;
default: return PQ_READY;
}
}
static actor_node *dequeue_priority(actor_node *heads[], actor_node *tails[]) {
for (int i = 0; i < PQ_COUNT; i++) {
if (heads[i]) {
actor_node *n = heads[i];
heads[i] = n->next;
if (!heads[i]) tails[i] = NULL;
return n;
}
}
return NULL;
}
static int has_any_work(actor_node *heads[]) {
for (int i = 0; i < PQ_COUNT; i++)
if (heads[i]) return 1;
return 0;
}
static pthread_mutex_t *actors_mutex;
static struct { char *key; cell_rt *value; } *actors = NULL;
@@ -196,33 +227,48 @@ void *timer_thread_func(void *arg) {
return NULL;
}
void enqueue_actor_priority(cell_rt *actor) {
actor_node *n = malloc(sizeof(actor_node));
n->actor = actor;
n->next = NULL;
int pq = state_to_pq(actor->state);
pthread_mutex_lock(&engine.lock);
if (actor->main_thread_only) {
if (engine.mq_tail[pq]) engine.mq_tail[pq]->next = n;
else engine.mq_head[pq] = n;
engine.mq_tail[pq] = n;
pthread_cond_signal(&engine.main_cond);
} else {
if (engine.q_tail[pq]) engine.q_tail[pq]->next = n;
else engine.q_head[pq] = n;
engine.q_tail[pq] = n;
pthread_cond_signal(&engine.wake_cond);
}
pthread_mutex_unlock(&engine.lock);
}
void *actor_runner(void *arg) {
while (1) {
pthread_mutex_lock(&engine.lock);
// Wait while queue is empty AND not shutting down
while (engine.head == NULL && !engine.shutting_down) {
pthread_cond_wait(&engine.wake_cond, &engine.lock);
}
while (1) {
pthread_mutex_lock(&engine.lock);
if (engine.shutting_down && engine.head == NULL) {
pthread_mutex_unlock(&engine.lock);
break; // Exit thread
}
// Pop from Linked List
actor_node *node = engine.head;
engine.head = node->next;
if (engine.head == NULL) engine.tail = NULL;
pthread_mutex_unlock(&engine.lock);
if (node) {
actor_turn(node->actor);
free(node);
}
while (!has_any_work(engine.q_head) && !engine.shutting_down) {
pthread_cond_wait(&engine.wake_cond, &engine.lock);
}
return NULL;
if (engine.shutting_down && !has_any_work(engine.q_head)) {
pthread_mutex_unlock(&engine.lock);
break;
}
actor_node *node = dequeue_priority(engine.q_head, engine.q_tail);
pthread_mutex_unlock(&engine.lock);
if (node) {
actor_turn(node->actor);
free(node);
}
}
return NULL;
}
void actor_initialize(void) {
@@ -230,12 +276,14 @@ void actor_initialize(void) {
pthread_cond_init(&engine.wake_cond, NULL);
pthread_cond_init(&engine.timer_cond, NULL);
pthread_cond_init(&engine.main_cond, NULL);
engine.shutting_down = 0;
engine.head = NULL;
engine.tail = NULL;
engine.main_head = NULL;
engine.main_tail = NULL;
for (int i = 0; i < PQ_COUNT; i++) {
engine.q_head[i] = NULL;
engine.q_tail[i] = NULL;
engine.mq_head[i] = NULL;
engine.mq_tail[i] = NULL;
}
actors_mutex = malloc(sizeof(pthread_mutex_t));
pthread_mutex_init(actors_mutex, NULL);
@@ -406,30 +454,7 @@ void set_actor_state(cell_rt *actor)
#endif
actor->state = ACTOR_READY;
actor->ar = 0;
actor_node *n = malloc(sizeof(actor_node));
n->actor = actor;
n->next = NULL;
pthread_mutex_lock(&engine.lock);
if (actor->main_thread_only) {
if (engine.main_tail) {
engine.main_tail->next = n;
} else {
engine.main_head = n;
}
engine.main_tail = n;
pthread_cond_signal(&engine.main_cond);
} else {
if (engine.tail) {
engine.tail->next = n;
} else {
engine.head = n;
}
engine.tail = n;
pthread_cond_signal(&engine.wake_cond);
}
pthread_mutex_unlock(&engine.lock);
enqueue_actor_priority(actor);
} else if (!hmlen(actor->timers)) {
// No messages AND no timers
@@ -531,26 +556,23 @@ cell_rt *get_actor(char *id)
void actor_loop()
{
while (!engine.shutting_down) { // Direct read safe enough here or use lock
while (!engine.shutting_down) {
pthread_mutex_lock(&engine.lock);
while (engine.main_head == NULL && !engine.shutting_down) {
while (!has_any_work(engine.mq_head) && !engine.shutting_down) {
pthread_cond_wait(&engine.main_cond, &engine.lock);
}
if (engine.shutting_down && engine.main_head == NULL) {
pthread_mutex_unlock(&engine.lock);
break;
if (engine.shutting_down && !has_any_work(engine.mq_head)) {
pthread_mutex_unlock(&engine.lock);
break;
}
actor_node *node = engine.main_head;
engine.main_head = node->next;
if (engine.main_head == NULL) engine.main_tail = NULL;
actor_node *node = dequeue_priority(engine.mq_head, engine.mq_tail);
pthread_mutex_unlock(&engine.lock);
if (node) {
actor_turn(node->actor);
free(node);
actor_turn(node->actor);
free(node);
}
}
}
@@ -608,7 +630,33 @@ const char *register_actor(const char *id, cell_rt *actor, int mainthread, doubl
int actor_interrupt_cb(JSRuntime *rt, cell_rt *crt)
{
return engine.shutting_down || crt->disrupt;
if (engine.shutting_down || crt->disrupt)
return -1;
if (crt->turn_deadline_ns == 0)
return 0; /* no timer set (e.g. during startup) */
uint64_t now = cell_ns();
if (now < crt->turn_deadline_ns)
return 0;
if (crt->is_slow_turn) {
#ifdef ACTOR_TRACE
fprintf(stderr, "[ACTOR_TRACE] %s: slow timer expired, killing\n",
crt->name ? crt->name : crt->id);
#endif
crt->disrupt = 1;
return -1;
}
/* Fast timer expired — check if we can suspend */
if (JS_GetVMCallDepth(crt->context) > 0) {
/* Can't suspend with C frames on stack, switch to slow timer */
crt->is_slow_turn = 1;
crt->turn_deadline_ns = now + ACTOR_SLOW_TIMER_NS;
return 0;
}
#ifdef ACTOR_TRACE
fprintf(stderr, "[ACTOR_TRACE] %s: fast timer expired, requesting suspend\n",
crt->name ? crt->name : crt->id);
#endif
return 1;
}
const char *send_message(const char *id, void *msg)
@@ -638,58 +686,113 @@ const char *send_message(const char *id, void *msg)
void actor_turn(cell_rt *actor)
{
pthread_mutex_lock(actor->mutex);
int prev_state = actor->state;
actor->state = ACTOR_RUNNING;
#ifdef ACTOR_TRACE
fprintf(stderr, "[ACTOR_TRACE] %s: %d -> RUNNING\n",
actor->name ? actor->name : actor->id, prev_state);
#endif
if (actor->trace_hook)
actor->trace_hook(actor->name, CELL_HOOK_ENTER);
pthread_mutex_lock(actor->msg_mutex);
JSValue result;
if (actor->vm_suspended) {
/* RESUME path: continue suspended turn under slow timer */
actor->turn_start_ns = cell_ns();
actor->turn_deadline_ns = actor->turn_start_ns + ACTOR_SLOW_TIMER_NS;
actor->is_slow_turn = 1;
result = JS_ResumeRegisterVM(actor->context);
actor->vm_suspended = 0;
if (JS_IsSuspended(result)) {
/* Still suspended after slow timer — shouldn't happen, handler returns -1 */
actor->disrupt = 1;
goto ENDTURN;
}
if (JS_IsException(result)) {
if (!uncaught_exception(actor->context, result))
actor->disrupt = 1;
}
actor->slow_strikes++;
#ifdef ACTOR_TRACE
fprintf(stderr, "[ACTOR_TRACE] %s: slow strike %d/%d\n",
actor->name ? actor->name : actor->id,
actor->slow_strikes, ACTOR_SLOW_STRIKES_MAX);
#endif
if (actor->slow_strikes >= ACTOR_SLOW_STRIKES_MAX) {
#ifdef ACTOR_TRACE
fprintf(stderr, "[ACTOR_TRACE] %s: %d slow strikes, killing\n",
actor->name ? actor->name : actor->id, actor->slow_strikes);
#endif
actor->disrupt = 1;
}
goto ENDTURN;
}
/* NORMAL path: pop a message and execute */
pthread_mutex_lock(actor->msg_mutex);
int pending = arrlen(actor->letters);
if (!pending) {
pthread_mutex_unlock(actor->msg_mutex);
goto ENDTURN;
}
#ifdef SCHEDULER_DEBUG
fprintf(stderr, "actor_turn: %s has %d letters, type=%d\n", actor->name ? actor->name : actor->id, pending, actor->letters[0].type);
fprintf(stderr, "actor_turn: %s has %d letters, type=%d\n",
actor->name ? actor->name : actor->id, pending, actor->letters[0].type);
#endif
letter l = actor->letters[0];
arrdel(actor->letters, 0);
pthread_mutex_unlock(actor->msg_mutex);
actor->turn_start_ns = cell_ns();
actor->turn_deadline_ns = actor->turn_start_ns + ACTOR_FAST_TIMER_NS;
actor->is_slow_turn = 0;
if (l.type == LETTER_BLOB) {
// Create a JS blob from the C blob
size_t size = blob_length(l.blob_data) / 8; // Convert bits to bytes
#ifdef SCHEDULER_DEBUG
fprintf(stderr, "actor_turn BLOB: %s size=%zu, calling message_handle\n", actor->name ? actor->name : actor->id, size);
#endif
JSValue arg = js_new_blob_stoned_copy(actor->context, (void*)blob_data(l.blob_data), size);
size_t size = blob_length(l.blob_data) / 8;
JSValue arg = js_new_blob_stoned_copy(actor->context,
(void *)blob_data(l.blob_data), size);
blob_destroy(l.blob_data);
result = JS_Call(actor->context, actor->message_handle_ref.val, JS_NULL, 1, &arg);
result = JS_Call(actor->context, actor->message_handle_ref.val,
JS_NULL, 1, &arg);
if (JS_IsSuspended(result)) {
actor->vm_suspended = 1;
actor->state = ACTOR_SLOW;
JS_FreeValue(actor->context, arg);
goto ENDTURN_SLOW;
}
if (!uncaught_exception(actor->context, result))
actor->disrupt = 1;
JS_FreeValue(actor->context, arg);
} else if (l.type == LETTER_CALLBACK) {
result = JS_Call(actor->context, l.callback, JS_NULL, 0, NULL);
if (JS_IsSuspended(result)) {
actor->vm_suspended = 1;
actor->state = ACTOR_SLOW;
JS_FreeValue(actor->context, l.callback);
goto ENDTURN_SLOW;
}
if (!uncaught_exception(actor->context, result))
actor->disrupt = 1;
JS_FreeValue(actor->context, l.callback);
}
if (actor->disrupt) goto ENDTURN;
actor->slow_strikes = 0; /* completed within fast timer */
ENDTURN:
ENDTURN:
actor->state = ACTOR_IDLE;
if (actor->trace_hook)
actor->trace_hook(actor->name, CELL_HOOK_EXIT);
if (actor->disrupt) {
/* Actor must die. Unlock before freeing so actor_free can
lock/unlock/destroy the mutex without use-after-free. */
#ifdef SCHEDULER_DEBUG
fprintf(stderr, "actor_turn ENDTURN: %s disrupted, freeing\n", actor->name ? actor->name : actor->id);
fprintf(stderr, "actor_turn ENDTURN: %s disrupted, freeing\n",
actor->name ? actor->name : actor->id);
#endif
pthread_mutex_unlock(actor->mutex);
actor_free(actor);
@@ -697,10 +800,21 @@ void actor_turn(cell_rt *actor)
}
#ifdef SCHEDULER_DEBUG
fprintf(stderr, "actor_turn ENDTURN: %s has %ld letters, calling set_actor_state\n", actor->name ? actor->name : actor->id, (long)arrlen(actor->letters));
fprintf(stderr, "actor_turn ENDTURN: %s has %ld letters, calling set_actor_state\n",
actor->name ? actor->name : actor->id, (long)arrlen(actor->letters));
#endif
set_actor_state(actor);
pthread_mutex_unlock(actor->mutex);
return;
ENDTURN_SLOW:
#ifdef ACTOR_TRACE
fprintf(stderr, "[ACTOR_TRACE] %s: suspended mid-turn -> SLOW\n",
actor->name ? actor->name : actor->id);
#endif
if (actor->trace_hook)
actor->trace_hook(actor->name, CELL_HOOK_EXIT);
enqueue_actor_priority(actor);
pthread_mutex_unlock(actor->mutex);
}